Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Sources/CLI/Application.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ struct Application: AsyncParsableCommand {
let signals = AsyncSignalHandler.create(notify: Application.signalSet)
return try await withThrowingTaskGroup(of: Int32?.self, returning: Int32.self) { group in
let waitAdded = group.addTaskUnlessCancelled {
try await process.wait()
let code = try await process.wait()
try await io.wait()
return code
}

guard waitAdded else {
Expand Down
42 changes: 42 additions & 0 deletions Sources/CLI/RunCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import ArgumentParser
import ContainerClient
import Containerization
import ContainerizationError
import ContainerizationExtras
import ContainerizationOS
import Foundation
import NIOCore
Expand Down Expand Up @@ -169,6 +170,13 @@ struct ProcessIO {
let stdin: Pipe?
let stdout: Pipe?
let stderr: Pipe?
var ioTracker: IoTracker?

struct IoTracker {
let stream: AsyncStream<Void>
let cont: AsyncStream<Void>.Continuation
let configuredStreams: Int
}

let stdio: [FileHandle?]

Expand Down Expand Up @@ -224,7 +232,11 @@ struct ProcessIO {
}
return Pipe()
}()

var configuredStreams = 0
let (stream, cc) = AsyncStream<Void>.makeStream()
if let stdout {
configuredStreams += 1
let pout: FileHandle = {
if let current {
return current.handle
Expand All @@ -237,6 +249,7 @@ struct ProcessIO {
let data = handle.availableData
if data.isEmpty {
rout.readabilityHandler = nil
cc.yield()
return
}
try! pout.write(contentsOf: data)
Expand All @@ -251,25 +264,54 @@ struct ProcessIO {
return Pipe()
}()
if let stderr {
configuredStreams += 1
let perr: FileHandle = .standardError
let rerr = stderr.fileHandleForReading
rerr.readabilityHandler = { handle in
let data = handle.availableData
if data.isEmpty {
rerr.readabilityHandler = nil
cc.yield()
return
}
try! perr.write(contentsOf: data)
}
stdio[2] = stderr.fileHandleForWriting
}

var ioTracker: IoTracker? = nil
if configuredStreams > 0 {
ioTracker = .init(stream: stream, cont: cc, configuredStreams: configuredStreams)
}

return .init(
stdin: stdin,
stdout: stdout,
stderr: stderr,
ioTracker: ioTracker,
stdio: stdio,
console: current
)
}

public func wait() async throws {
guard let ioTracker = self.ioTracker else {
return
}
do {
try await Timeout.run(seconds: 3) {
var counter = ioTracker.configuredStreams
for await _ in ioTracker.stream {
counter -= 1
if counter == 0 {
ioTracker.cont.finish()
break
}
}
}
} catch {
log.error("Timeout waiting for IO to complete : \(error)")
throw error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to throw here since self is a struct and we cannot update self.ioTracker to be nil (without going throw a few hoops) - At least this way the client will know that something is off and they wont repeatedly call this wait method

}
}
}
175 changes: 104 additions & 71 deletions Sources/Services/ContainerSandboxService/SandboxService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,88 +154,111 @@ public actor SandboxService {
@Sendable
public func startProcess(_ message: XPCMessage) async throws -> XPCMessage {
self.log.info("`start` xpc handler")
return try await self.lock.withLock { _ in
return try await self.lock.withLock { lock in
let id = try message.id()
let stdio = message.stdio()
let containerInfo = try await self.getContainer()
let containerId = containerInfo.container.id
let container = containerInfo.container
let bundle = containerInfo.bundle
if id == containerId {
guard await self.state == .booted else {
throw ContainerizationError(
.invalidState,
message: "container expected to be in booted state, got: \(await self.state)"
)
}
let containerLog = try FileHandle(forWritingTo: bundle.containerLog)
let config = containerInfo.config
let stdout = {
if let h = stdio[1] {
return MultiWriter(handles: [h, containerLog])
}
return MultiWriter(handles: [containerLog])
}()
let stderr: MultiWriter? = {
if !config.initProcess.terminal {
if let h = stdio[2] {
return MultiWriter(handles: [h, containerLog])
}
return MultiWriter(handles: [containerLog])
}
return nil
}()
if let h = stdio[0] {
container.stdin = h
}
container.stdout = stdout
if let stderr {
container.stderr = stderr
}
await self.setState(.starting)
do {
try await container.start()
let waitFunc: ExitMonitor.WaitHandler = {
let code = try await container.wait()
return code
}
try await self.monitor.track(id: id, waitingOn: waitFunc)
} catch {
try? await self.cleanupContainer()
await self.setState(.created)
try await self.sendContainerEvent(.containerExit(id: id, exitCode: -1))
throw error
}
try await self.startInitProcess(stdio: stdio, lock: lock)
await self.setState(.running)
try await self.sendContainerEvent(.containerStart(id: id))
} else {
// we are starting a process other than the init process. Check if it exists
guard let processInfo = await self.processes[id] else {
throw ContainerizationError(.notFound, message: "Process with id \(id)")
try await self.startExecProcess(processId: id, stdio: stdio, lock: lock)
}
return message.reply()
}
}

private func startInitProcess(stdio: [FileHandle?], lock: AsyncLock.Context) async throws {
let info = try self.getContainer()
let container = info.container
let bundle = info.bundle
let id = container.id
guard self.state == .booted else {
throw ContainerizationError(
.invalidState,
message: "container expected to be in booted state, got: \(self.state)"
)
}
let containerLog = try FileHandle(forWritingTo: bundle.containerLog)
let config = info.config
let stdout = {
if let h = stdio[1] {
return MultiWriter(handles: [h, containerLog])
}
return MultiWriter(handles: [containerLog])
}()
let stderr: MultiWriter? = {
if !config.initProcess.terminal {
if let h = stdio[2] {
return MultiWriter(handles: [h, containerLog])
}
let ociConfig = self.configureProcessConfig(config: processInfo.config)
let stdin: ReaderStream? = {
if let h = stdio[0] {
return h
}
return nil
}()
let process = try await container.exec(
id,
configuration: ociConfig,
stdin: stdin,
stdout: stdio[1],
stderr: stdio[2]
)
try await self.setUnderlyingProcess(id, process)
try await process.start()
let waitFunc: ExitMonitor.WaitHandler = {
try await process.wait()
return MultiWriter(handles: [containerLog])
}
return nil
}()
if let h = stdio[0] {
container.stdin = h
}
container.stdout = stdout
if let stderr {
container.stderr = stderr
}
self.setState(.starting)
do {
try await container.start()
let waitFunc: ExitMonitor.WaitHandler = {
let code = try await container.wait()
if let out = stdio[1] {
try self.closeHandle(out.fileDescriptor)
}
if let err = stdio[2] {
try self.closeHandle(err.fileDescriptor)
}
try await self.monitor.track(id: id, waitingOn: waitFunc)
return code
}
return message.reply()
try await self.monitor.track(id: id, waitingOn: waitFunc)
} catch {
try? await self.cleanupContainer()
self.setState(.created)
try await self.sendContainerEvent(.containerExit(id: id, exitCode: -1))
throw error
}
}

private func startExecProcess(processId id: String, stdio: [FileHandle?], lock: AsyncLock.Context) async throws {
let container = try self.getContainer().container
guard let processInfo = self.processes[id] else {
throw ContainerizationError(.notFound, message: "Process with id \(id)")
}
let ociConfig = self.configureProcessConfig(config: processInfo.config)
let stdin: ReaderStream? = {
if let h = stdio[0] {
return h
}
return nil
}()
let process = try await container.exec(
id,
configuration: ociConfig,
stdin: stdin,
stdout: stdio[1],
stderr: stdio[2]
)
try self.setUnderlyingProcess(id, process)
try await process.start()
let waitFunc: ExitMonitor.WaitHandler = {
let code = try await process.wait()
if let out = stdio[1] {
try self.closeHandle(out.fileDescriptor)
}
if let err = stdio[2] {
try self.closeHandle(err.fileDescriptor)
}
return code
}
try await self.monitor.track(id: id, waitingOn: waitFunc)
}

/// Create a process inside the virtual machine for the container.
Expand Down Expand Up @@ -267,13 +290,14 @@ public actor SandboxService {
try await self.monitor.registerProcess(
id: id,
onExit: { id, code in
guard await self.processes[id] != nil else {
guard let process = await self.processes[id]?.process else {
throw ContainerizationError(.invalidState, message: "ProcessInfo missing for process \(id)")
}
for cc in await self.waiters[id] ?? [] {
cc.resume(returning: code)
}
await self.removeWaiters(for: id)
try await process.delete()
try await self.setProcessState(id: id, state: .stopped(code))
})
return message.reply()
Expand Down Expand Up @@ -664,6 +688,15 @@ public actor SandboxService {
return proc
}

private nonisolated func closeHandle(_ handle: Int32) throws {
guard close(handle) == 0 else {
guard let errCode = POSIXErrorCode(rawValue: errno) else {
fatalError("failed to convert errno to POSIXErrorCode")
}
throw POSIXError(errCode)
}
}

private nonisolated func modifyingEnvironment(_ config: ProcessConfiguration) -> [String] {
guard config.terminal else {
return config.environment
Expand Down