Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Sources/CLI/Application.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ struct Application: AsyncParsableCommand {
let signals = AsyncSignalHandler.create(notify: Application.signalSet)
return try await withThrowingTaskGroup(of: Int32?.self, returning: Int32.self) { group in
let waitAdded = group.addTaskUnlessCancelled {
try await process.wait()
let code = try await process.wait()
try await io.wait()
return code
}

guard waitAdded else {
Expand Down
42 changes: 42 additions & 0 deletions Sources/CLI/RunCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import ArgumentParser
import ContainerClient
import Containerization
import ContainerizationError
import ContainerizationExtras
import ContainerizationOS
import Foundation
import NIOCore
Expand Down Expand Up @@ -169,6 +170,13 @@ struct ProcessIO {
let stdin: Pipe?
let stdout: Pipe?
let stderr: Pipe?
var ioTracker: IoTracker?

struct IoTracker {
let stream: AsyncStream<Void>
let cont: AsyncStream<Void>.Continuation
let configuredStreams: Int
}

let stdio: [FileHandle?]

Expand Down Expand Up @@ -224,7 +232,11 @@ struct ProcessIO {
}
return Pipe()
}()

var configuredStreams = 0
let (stream, cc) = AsyncStream<Void>.makeStream()
if let stdout {
configuredStreams += 1
let pout: FileHandle = {
if let current {
return current.handle
Expand All @@ -237,6 +249,7 @@ struct ProcessIO {
let data = handle.availableData
if data.isEmpty {
rout.readabilityHandler = nil
cc.yield()
return
}
try! pout.write(contentsOf: data)
Expand All @@ -251,25 +264,54 @@ struct ProcessIO {
return Pipe()
}()
if let stderr {
configuredStreams += 1
let perr: FileHandle = .standardError
let rerr = stderr.fileHandleForReading
rerr.readabilityHandler = { handle in
let data = handle.availableData
if data.isEmpty {
rerr.readabilityHandler = nil
cc.yield()
return
}
try! perr.write(contentsOf: data)
}
stdio[2] = stderr.fileHandleForWriting
}

var ioTracker: IoTracker? = nil
if configuredStreams > 0 {
ioTracker = .init(stream: stream, cont: cc, configuredStreams: configuredStreams)
}

return .init(
stdin: stdin,
stdout: stdout,
stderr: stderr,
ioTracker: ioTracker,
stdio: stdio,
console: current
)
}

public func wait() async throws {
guard let ioTracker = self.ioTracker else {
return
}
do {
try await Timeout.run(seconds: 3) {
var counter = ioTracker.configuredStreams
for await _ in ioTracker.stream {
counter -= 1
if counter == 0 {
ioTracker.cont.finish()
break
}
}
}
} catch {
log.error("Timeout waiting for IO to complete : \(error)")
throw error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to throw here since self is a struct and we cannot update self.ioTracker to be nil (without going throw a few hoops) - At least this way the client will know that something is off and they wont repeatedly call this wait method

}
}
}
175 changes: 104 additions & 71 deletions Sources/Services/ContainerSandboxService/SandboxService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,88 +154,111 @@ public actor SandboxService {
@Sendable
public func startProcess(_ message: XPCMessage) async throws -> XPCMessage {
self.log.info("`start` xpc handler")
return try await self.lock.withLock { _ in
return try await self.lock.withLock { lock in
let id = try message.id()
let stdio = message.stdio()
let containerInfo = try await self.getContainer()
let containerId = containerInfo.container.id
let container = containerInfo.container
let bundle = containerInfo.bundle
if id == containerId {
guard await self.state == .booted else {
throw ContainerizationError(
.invalidState,
message: "container expected to be in booted state, got: \(await self.state)"
)
}
let containerLog = try FileHandle(forWritingTo: bundle.containerLog)
let config = containerInfo.config
let stdout = {
if let h = stdio[1] {
return MultiWriter(handles: [h, containerLog])
}
return MultiWriter(handles: [containerLog])
}()
let stderr: MultiWriter? = {
if !config.initProcess.terminal {
if let h = stdio[2] {
return MultiWriter(handles: [h, containerLog])
}
return MultiWriter(handles: [containerLog])
}
return nil
}()
if let h = stdio[0] {
container.stdin = h
}
container.stdout = stdout
if let stderr {
container.stderr = stderr
}
await self.setState(.starting)
do {
try await container.start()
let waitFunc: ExitMonitor.WaitHandler = {
let code = try await container.wait()
return code
}
try await self.monitor.track(id: id, waitingOn: waitFunc)
} catch {
try? await self.cleanupContainer()
await self.setState(.created)
try await self.sendContainerEvent(.containerExit(id: id, exitCode: -1))
throw error
}
try await self.startInitProcess(stdio: stdio, lock: lock)
await self.setState(.running)
try await self.sendContainerEvent(.containerStart(id: id))
} else {
// we are starting a process other than the init process. Check if it exists
guard let processInfo = await self.processes[id] else {
throw ContainerizationError(.notFound, message: "Process with id \(id)")
try await self.startExecProcess(processId: id, stdio: stdio, lock: lock)
}
return message.reply()
}
}

private func startInitProcess(stdio: [FileHandle?], lock: AsyncLock.Context) async throws {
let info = try self.getContainer()
let container = info.container
let bundle = info.bundle
let id = container.id
guard self.state == .booted else {
throw ContainerizationError(
.invalidState,
message: "container expected to be in booted state, got: \(self.state)"
)
}
let containerLog = try FileHandle(forWritingTo: bundle.containerLog)
let config = info.config
let stdout = {
if let h = stdio[1] {
return MultiWriter(handles: [h, containerLog])
}
return MultiWriter(handles: [containerLog])
}()
let stderr: MultiWriter? = {
if !config.initProcess.terminal {
if let h = stdio[2] {
return MultiWriter(handles: [h, containerLog])
}
let ociConfig = self.configureProcessConfig(config: processInfo.config)
let stdin: ReaderStream? = {
if let h = stdio[0] {
return h
}
return nil
}()
let process = try await container.exec(
id,
configuration: ociConfig,
stdin: stdin,
stdout: stdio[1],
stderr: stdio[2]
)
try await self.setUnderlyingProcess(id, process)
try await process.start()
let waitFunc: ExitMonitor.WaitHandler = {
try await process.wait()
return MultiWriter(handles: [containerLog])
}
return nil
}()
if let h = stdio[0] {
container.stdin = h
}
container.stdout = stdout
if let stderr {
container.stderr = stderr
}
self.setState(.starting)
do {
try await container.start()
let waitFunc: ExitMonitor.WaitHandler = {
let code = try await container.wait()
if let out = stdio[1] {
try self.closeHandle(out.fileDescriptor)
}
if let err = stdio[2] {
try self.closeHandle(err.fileDescriptor)
}
try await self.monitor.track(id: id, waitingOn: waitFunc)
return code
}
return message.reply()
try await self.monitor.track(id: id, waitingOn: waitFunc)
} catch {
try? await self.cleanupContainer()
self.setState(.created)
try await self.sendContainerEvent(.containerExit(id: id, exitCode: -1))
throw error
}
}

private func startExecProcess(processId id: String, stdio: [FileHandle?], lock: AsyncLock.Context) async throws {
let container = try self.getContainer().container
guard let processInfo = self.processes[id] else {
throw ContainerizationError(.notFound, message: "Process with id \(id)")
}
let ociConfig = self.configureProcessConfig(config: processInfo.config)
let stdin: ReaderStream? = {
if let h = stdio[0] {
return h
}
return nil
}()
let process = try await container.exec(
id,
configuration: ociConfig,
stdin: stdin,
stdout: stdio[1],
stderr: stdio[2]
)
try self.setUnderlyingProcess(id, process)
try await process.start()
let waitFunc: ExitMonitor.WaitHandler = {
let code = try await process.wait()
if let out = stdio[1] {
try self.closeHandle(out.fileDescriptor)
}
if let err = stdio[2] {
try self.closeHandle(err.fileDescriptor)
}
return code
}
try await self.monitor.track(id: id, waitingOn: waitFunc)
}

/// Create a process inside the virtual machine for the container.
Expand Down Expand Up @@ -267,13 +290,14 @@ public actor SandboxService {
try await self.monitor.registerProcess(
id: id,
onExit: { id, code in
guard await self.processes[id] != nil else {
guard let process = await self.processes[id]?.process else {
throw ContainerizationError(.invalidState, message: "ProcessInfo missing for process \(id)")
}
for cc in await self.waiters[id] ?? [] {
cc.resume(returning: code)
}
await self.removeWaiters(for: id)
try await process.delete()
try await self.setProcessState(id: id, state: .stopped(code))
})
return message.reply()
Expand Down Expand Up @@ -664,6 +688,15 @@ public actor SandboxService {
return proc
}

private nonisolated func closeHandle(_ handle: Int32) throws {
guard close(handle) == 0 else {
guard let errCode = POSIXErrorCode(rawValue: errno) else {
fatalError("failed to convert errno to POSIXErrorCode")
}
throw POSIXError(errCode)
}
}

private nonisolated func modifyingEnvironment(_ config: ProcessConfiguration) -> [String] {
guard config.terminal else {
return config.environment
Expand Down