Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import PackageDescription
let releaseVersion = ProcessInfo.processInfo.environment["RELEASE_VERSION"] ?? "0.0.0"
let gitCommit = ProcessInfo.processInfo.environment["GIT_COMMIT"] ?? "unspecified"
let builderShimVersion = "0.7.0"
let scVersion = "0.13.0"
let scVersion = "0.11.0"

let package = Package(
name: "container",
Expand Down
11 changes: 8 additions & 3 deletions Sources/ContainerClient/Core/ClientImage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ extension ClientImage {
})
}

public static func pull(reference: String, platform: Platform? = nil, scheme: RequestScheme = .auto, progressUpdate: ProgressUpdateHandler? = nil) async throws -> ClientImage {
public static func pull(reference: String, platform: Platform? = nil, scheme: RequestScheme = .auto, progressUpdate: ProgressUpdateHandler? = nil, maxConcurrentDownloads: Int = 3) async throws -> ClientImage {
guard maxConcurrentDownloads > 0 else {
throw ContainerizationError(.invalidArgument, message: "maxConcurrentDownloads must be greater than 0, got \(maxConcurrentDownloads)")
}

let client = newXPCClient()
let request = newRequest(.imagePull)

Expand All @@ -234,6 +238,7 @@ extension ClientImage {

let insecure = try scheme.schemeFor(host: host) == .http
request.set(key: .insecureFlag, value: insecure)
request.set(key: .maxConcurrentDownloads, value: Int64(maxConcurrentDownloads))

var progressUpdateClient: ProgressUpdateClient?
if let progressUpdate {
Expand Down Expand Up @@ -293,7 +298,7 @@ extension ClientImage {
return (digests, size)
}

public static func fetch(reference: String, platform: Platform? = nil, scheme: RequestScheme = .auto, progressUpdate: ProgressUpdateHandler? = nil) async throws -> ClientImage
public static func fetch(reference: String, platform: Platform? = nil, scheme: RequestScheme = .auto, progressUpdate: ProgressUpdateHandler? = nil, maxConcurrentDownloads: Int = 3) async throws -> ClientImage
{
do {
let match = try await self.get(reference: reference)
Expand All @@ -307,7 +312,7 @@ extension ClientImage {
guard err.isCode(.notFound) else {
throw err
}
return try await Self.pull(reference: reference, platform: platform, scheme: scheme, progressUpdate: progressUpdate)
return try await Self.pull(reference: reference, platform: platform, scheme: scheme, progressUpdate: progressUpdate, maxConcurrentDownloads: maxConcurrentDownloads)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions Sources/ContainerClient/Flags.swift
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,8 @@ public struct Flags {

@Option(name: .long, help: ArgumentHelp("Progress type (format: none|ansi)", valueName: "type"))
public var progress: ProgressType = .ansi

@Option(name: .long, help: "Maximum number of concurrent layer downloads (default: 3)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adityaramani Should we use the word "layer" or "blob" here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use just "concurrent downloads"

public var maxConcurrentDownloads: Int = 3
}
}
2 changes: 1 addition & 1 deletion Sources/ContainerCommands/Image/ImagePull.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ extension Application {
let taskManager = ProgressTaskCoordinator()
let fetchTask = await taskManager.startTask()
let image = try await ClientImage.pull(
reference: processedReference, platform: p, scheme: scheme, progressUpdate: ProgressTaskCoordinator.handler(for: fetchTask, from: progress.handler)
reference: processedReference, platform: p, scheme: scheme, progressUpdate: ProgressTaskCoordinator.handler(for: fetchTask, from: progress.handler), maxConcurrentDownloads: self.progressFlags.maxConcurrentDownloads
)

progress.set(description: "Unpacking image")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum ImagesServiceXPCKeys: String {
case ociPlatform
case insecureFlag
case garbageCollect
case maxConcurrentDownloads

/// ContentStore
case digest
Expand All @@ -54,6 +55,10 @@ extension XPCMessage {
self.set(key: key.rawValue, value: value)
}

public func set(key: ImagesServiceXPCKeys, value: Int64) {
self.set(key: key.rawValue, value: value)
}

public func set(key: ImagesServiceXPCKeys, value: Data) {
self.set(key: key.rawValue, value: value)
}
Expand All @@ -78,6 +83,10 @@ extension XPCMessage {
self.uint64(key: key.rawValue)
}

public func int64(key: ImagesServiceXPCKeys) -> Int64 {
self.int64(key: key.rawValue)
}

public func bool(key: ImagesServiceXPCKeys) -> Bool {
self.bool(key: key.rawValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ public actor ImagesService {
return try await imageStore.list().map { $0.description.fromCZ }
}

public func pull(reference: String, platform: Platform?, insecure: Bool, progressUpdate: ProgressUpdateHandler?) async throws -> ImageDescription {
self.log.info("ImagesService: \(#function) - ref: \(reference), platform: \(String(describing: platform)), insecure: \(insecure)")
public func pull(reference: String, platform: Platform?, insecure: Bool, progressUpdate: ProgressUpdateHandler?, maxConcurrentDownloads: Int = 3) async throws -> ImageDescription {
self.log.info("ImagesService: \(#function) - ref: \(reference), platform: \(String(describing: platform)), insecure: \(insecure), maxConcurrentDownloads: \(maxConcurrentDownloads)")
let img = try await Self.withAuthentication(ref: reference) { auth in
try await self.imageStore.pull(
reference: reference, platform: platform, insecure: insecure, auth: auth, progress: ContainerizationProgressAdapter.handler(from: progressUpdate))
reference: reference, platform: platform, insecure: insecure, auth: auth, progress: ContainerizationProgressAdapter.handler(from: progressUpdate), maxConcurrentDownloads: maxConcurrentDownloads)
}
guard let img else {
throw ContainerizationError(.internalError, message: "failed to pull image \(reference)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ public struct ImagesServiceHarness: Sendable {
platform = try JSONDecoder().decode(ContainerizationOCI.Platform.self, from: platformData)
}
let insecure = message.bool(key: .insecureFlag)
let maxConcurrentDownloads = message.int64(key: .maxConcurrentDownloads)

let progressUpdateService = ProgressUpdateService(message: message)
let imageDescription = try await service.pull(reference: ref, platform: platform, insecure: insecure, progressUpdate: progressUpdateService?.handler)
let imageDescription = try await service.pull(reference: ref, platform: platform, insecure: insecure, progressUpdate: progressUpdateService?.handler, maxConcurrentDownloads: Int(maxConcurrentDownloads))

let imageData = try JSONEncoder().encode(imageDescription)
let reply = message.reply()
Expand Down
13 changes: 7 additions & 6 deletions Sources/Services/ContainerSandboxService/SandboxService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public actor SandboxService {
try bundle.createLogFile()

var config = try bundle.configuration

let vmm = VZVirtualMachineManager(
kernel: try bundle.kernel,
initialFilesystem: bundle.initialFilesystem.asMount,
Expand Down Expand Up @@ -850,19 +851,19 @@ public actor SandboxService {
czConfig.process.terminal = process.terminal
czConfig.process.workingDirectory = process.workingDirectory
czConfig.process.rlimits = process.rlimits.map {
.init(type: $0.limit, hard: $0.hard, soft: $0.soft)
POSIXRlimit(type: $0.limit, hard: $0.hard, soft: $0.soft)
}
switch process.user {
case .raw(let name):
czConfig.process.user = .init(
czConfig.process.user = ContainerizationOCI.User(
uid: 0,
gid: 0,
umask: nil,
additionalGids: process.supplementalGroups,
username: name
)
case .id(let uid, let gid):
czConfig.process.user = .init(
czConfig.process.user = ContainerizationOCI.User(
uid: uid,
gid: gid,
umask: nil,
Expand Down Expand Up @@ -892,19 +893,19 @@ public actor SandboxService {
proc.terminal = config.terminal
proc.workingDirectory = config.workingDirectory
proc.rlimits = config.rlimits.map {
.init(type: $0.limit, hard: $0.hard, soft: $0.soft)
POSIXRlimit(type: $0.limit, hard: $0.hard, soft: $0.soft)
}
switch config.user {
case .raw(let name):
proc.user = .init(
proc.user = ContainerizationOCI.User(
uid: 0,
gid: 0,
umask: nil,
additionalGids: config.supplementalGroups,
username: name
)
case .id(let uid, let gid):
proc.user = .init(
proc.user = ContainerizationOCI.User(
uid: uid,
gid: gid,
umask: nil,
Expand Down
109 changes: 109 additions & 0 deletions testConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env swift

import Foundation

func testConcurrentDownloads() async throws {
print("Testing concurrent download behavior...\n")

// Track concurrent task count
actor ConcurrencyTracker {
var currentCount = 0
var maxObservedCount = 0
var completedTasks = 0

func taskStarted() {
currentCount += 1
maxObservedCount = max(maxObservedCount, currentCount)
}

func taskCompleted() {
currentCount -= 1
completedTasks += 1
}

func getStats() -> (max: Int, completed: Int) {
return (maxObservedCount, completedTasks)
}

func reset() {
currentCount = 0
maxObservedCount = 0
completedTasks = 0
}
}

let tracker = ConcurrencyTracker()

// Test with different concurrency limits
for maxConcurrent in [1, 3, 6] {
await tracker.reset()

// Simulate downloading 20 layers
let layerCount = 20
let layers = Array(0..<layerCount)

print("Testing maxConcurrent=\(maxConcurrent) with \(layerCount) layers...")

let startTime = Date()

try await withThrowingTaskGroup(of: Void.self) { group in
var iterator = layers.makeIterator()

// Start initial batch based on maxConcurrent
for _ in 0..<maxConcurrent {
if iterator.next() != nil {
group.addTask {
await tracker.taskStarted()
try await Task.sleep(nanoseconds: 10_000_000)
await tracker.taskCompleted()
}
}
}
for try await _ in group {
if iterator.next() != nil {
group.addTask {
await tracker.taskStarted()
try await Task.sleep(nanoseconds: 10_000_000)
await tracker.taskCompleted()
}
}
}
}

let duration = Date().timeIntervalSince(startTime)
let stats = await tracker.getStats()

print(" ✓ Completed: \(stats.completed)/\(layerCount)")
print(" ✓ Max concurrent: \(stats.max)")
print(" ✓ Duration: \(String(format: "%.3f", duration))s")

guard stats.max <= maxConcurrent + 1 else {
throw TestError.concurrencyLimitExceeded
}

guard stats.completed == layerCount else {
throw TestError.incompleteTasks
}

print(" ✅ PASSED\n")
}

print("All tests passed!")
}

enum TestError: Error {
case concurrencyLimitExceeded
case incompleteTasks
}

Task {
do {
try await testConcurrentDownloads()
exit(0)
} catch {
print("Test failed: \(error)")
exit(1)
}
}

RunLoop.main.run()
91 changes: 91 additions & 0 deletions testParameterFlow.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env swift

import Foundation

print("Testing parameter flow...\n")

print("1. CLI flag parsing...")
struct ProgressFlags {
var disableProgressUpdates = false
var maxConcurrentDownloads: Int = 3
}

let defaultFlags = ProgressFlags()
print(" ✓ Default: \(defaultFlags.maxConcurrentDownloads)")

let customFlags = ProgressFlags(disableProgressUpdates: false, maxConcurrentDownloads: 6)
print(" ✓ Custom: \(customFlags.maxConcurrentDownloads)")
print(" PASSED\n")

print("2. XPC key...")
enum ImageServiceXPCKeys: String {
case maxConcurrentDownloads
}

let key = ImageServiceXPCKeys.maxConcurrentDownloads
print(" ✓ Key exists: \(key.rawValue)")
print(" PASSED\n")

print("3. Function signatures...")
func mockClientImagePull(
reference: String,
maxConcurrentDownloads: Int = 3
) -> String {
return "pull(\(reference), maxConcurrent=\(maxConcurrentDownloads))"
}

_ = mockClientImagePull(reference: "nginx:latest")
_ = mockClientImagePull(reference: "nginx:latest", maxConcurrentDownloads: 6)
print(" ✓ Compiles")
print(" PASSED\n")

print("4. Parameter propagation...")

struct MockXPCMessage {
var values: [String: Any] = [:]

mutating func set(key: String, value: Int64) {
values[key] = value
}

func int64(key: String) -> Int64 {
return values[key] as? Int64 ?? 3
}
}

func simulateFlow(maxConcurrent: Int) -> Int {
let flags = ProgressFlags(maxConcurrentDownloads: maxConcurrent)
var xpcMessage = MockXPCMessage()
xpcMessage.set(key: "maxConcurrentDownloads", value: Int64(flags.maxConcurrentDownloads))
return Int(xpcMessage.int64(key: "maxConcurrentDownloads"))
}

for testValue in [1, 3, 6] {
guard simulateFlow(maxConcurrent: testValue) == testValue else {
print(" ✗ Failed")
exit(1)
}
}
print(" ✓ Values propagate correctly")
print(" PASSED\n")

print("5. Implementation verification...")

let filesToCheck = [
"Sources/ContainerClient/Flags.swift",
"Sources/ContainerClient/Core/ClientImage.swift",
"Sources/Services/ContainerImagesService/Server/ImageService.swift",
]

for file in filesToCheck {
if let content = try? String(contentsOf: URL(fileURLWithPath: file), encoding: .utf8),
content.contains("maxConcurrentDownloads") {
continue
}
print(" ✗ Missing in \(file)")
exit(1)
}
print(" ✓ Found in implementation")
print(" PASSED\n")

print("All tests passed!")
Loading