Skip to content

Commit c7c88c2

Browse files
authored
[Build] Do not use unbounded DispatchIO readers for tar tranfers (#257)
- Addresses #166 - Memory utilization explodes since there is no mechanism for backpressure - Using a synchronous buffered reader seem to provide similar performance without the memory explosion issue - 4MB buffer seems to provide the best results | Metric | 1MB Buffer | 4MB Buffer | Unbounded Zero-Copy | |--------------------------|------------|------------|---------------------| | Build Time | 149.33s | 138.57s | 139.79s | | Max RAM Used | 2.16 GB | 3.02 GB | 3.52 GB | | Peak Memory Footprint | 8.30 GB | 8.17 GB | 10.21 GB | | Page Reclaims | 1,085,559 | 1,039,677 | 1,619,943 | | Page Faults | 115 | 148 | 143 | | CPU Usage (User+Sys) | 53.71s | 53.12s | 60.44s |
1 parent 4a6a1f1 commit c7c88c2

File tree

2 files changed

+144
-1
lines changed

2 files changed

+144
-1
lines changed

Sources/ContainerBuild/BuildFSSync.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ actor BuildFSSync: BuildPipelineHandler {
229229
pathInArchive: URL(fileURLWithPath: rel))
230230
}
231231

232-
for await chunk in try tarURL.zeroCopyReader() {
232+
for try await chunk in try tarURL.bufferedCopyReader() {
233233
let part = BuildTransfer(
234234
id: packet.id,
235235
source: tarURL.path,

Sources/ContainerBuild/URL+Extensions.swift

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,147 @@ extension URL {
138138
}
139139
}
140140
}
141+
142+
func bufferedCopyReader(chunkSize: Int = 4 * 1024 * 1024) throws -> BufferedCopyReader {
143+
try BufferedCopyReader(url: self, chunkSize: chunkSize)
144+
}
145+
}
146+
147+
/// A synchronous buffered reader that reads one chunk at a time from a file
148+
/// Uses a configurable buffer size (default 4MB) and only reads when nextChunk() is called
149+
/// Implements AsyncSequence for use with `for await` loops
150+
public final class BufferedCopyReader: AsyncSequence {
151+
public typealias Element = Data
152+
public typealias AsyncIterator = BufferedCopyReaderIterator
153+
154+
private let inputStream: InputStream
155+
private let chunkSize: Int
156+
private var isFinished: Bool = false
157+
private let reusableBuffer: UnsafeMutablePointer<UInt8>
158+
159+
/// Initialize a buffered copy reader for the given URL
160+
/// - Parameters:
161+
/// - url: The file URL to read from
162+
/// - chunkSize: Size of each chunk to read (default: 4MB)
163+
public init(url: URL, chunkSize: Int = 4 * 1024 * 1024) throws {
164+
guard let stream = InputStream(url: url) else {
165+
throw CocoaError(.fileReadNoSuchFile)
166+
}
167+
self.inputStream = stream
168+
self.chunkSize = chunkSize
169+
self.reusableBuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: chunkSize)
170+
self.inputStream.open()
171+
}
172+
173+
deinit {
174+
inputStream.close()
175+
reusableBuffer.deallocate()
176+
}
177+
178+
/// Create an async iterator for this sequence
179+
public func makeAsyncIterator() -> BufferedCopyReaderIterator {
180+
BufferedCopyReaderIterator(reader: self)
181+
}
182+
183+
/// Read the next chunk of data from the file
184+
/// - Returns: Data chunk, or nil if end of file reached
185+
/// - Throws: Any file reading errors
186+
public func nextChunk() throws -> Data? {
187+
guard !isFinished else { return nil }
188+
189+
// Read directly into our reusable buffer
190+
let bytesRead = inputStream.read(reusableBuffer, maxLength: chunkSize)
191+
192+
// Check for errors
193+
if bytesRead < 0 {
194+
if let error = inputStream.streamError {
195+
throw error
196+
}
197+
throw CocoaError(.fileReadUnknown)
198+
}
199+
200+
// If we read no data, we've reached the end
201+
if bytesRead == 0 {
202+
isFinished = true
203+
return nil
204+
}
205+
206+
// If we read less than the chunk size, this is the last chunk
207+
if bytesRead < chunkSize {
208+
isFinished = true
209+
}
210+
211+
// Create Data object only with the bytes actually read
212+
return Data(bytes: reusableBuffer, count: bytesRead)
213+
}
214+
215+
/// Check if the reader has finished reading the file
216+
public var hasFinished: Bool {
217+
isFinished
218+
}
219+
220+
/// Reset the reader to the beginning of the file
221+
/// Note: InputStream doesn't support seeking, so this recreates the stream
222+
/// - Throws: Any file opening errors
223+
public func reset() throws {
224+
inputStream.close()
225+
// Note: InputStream doesn't provide a way to get the original URL,
226+
// so reset functionality is limited. Consider removing this method
227+
// or storing the original URL if reset is needed.
228+
throw CocoaError(
229+
.fileReadUnsupportedScheme,
230+
userInfo: [
231+
NSLocalizedDescriptionKey: "Reset not supported with InputStream-based implementation"
232+
])
233+
}
234+
235+
/// Get the current file offset
236+
/// Note: InputStream doesn't provide offset information
237+
/// - Returns: Current position in the file
238+
/// - Throws: Unsupported operation error
239+
public func currentOffset() throws -> UInt64 {
240+
throw CocoaError(
241+
.fileReadUnsupportedScheme,
242+
userInfo: [
243+
NSLocalizedDescriptionKey: "Offset tracking not supported with InputStream-based implementation"
244+
])
245+
}
246+
247+
/// Seek to a specific offset in the file
248+
/// Note: InputStream doesn't support seeking
249+
/// - Parameter offset: The byte offset to seek to
250+
/// - Throws: Unsupported operation error
251+
public func seek(to offset: UInt64) throws {
252+
throw CocoaError(
253+
.fileReadUnsupportedScheme,
254+
userInfo: [
255+
NSLocalizedDescriptionKey: "Seeking not supported with InputStream-based implementation"
256+
])
257+
}
258+
259+
/// Close the input stream explicitly (called automatically in deinit)
260+
public func close() {
261+
inputStream.close()
262+
isFinished = true
263+
}
264+
}
265+
266+
/// AsyncIteratorProtocol implementation for BufferedCopyReader
267+
public struct BufferedCopyReaderIterator: AsyncIteratorProtocol {
268+
public typealias Element = Data
269+
270+
private let reader: BufferedCopyReader
271+
272+
init(reader: BufferedCopyReader) {
273+
self.reader = reader
274+
}
275+
276+
/// Get the next chunk of data asynchronously
277+
/// - Returns: Next data chunk, or nil when finished
278+
/// - Throws: Any file reading errors
279+
public mutating func next() async throws -> Data? {
280+
// Yield control to allow other tasks to run, then read synchronously
281+
await Task.yield()
282+
return try reader.nextChunk()
283+
}
141284
}

0 commit comments

Comments
 (0)