Skip to content

Commit 7e332b9

Browse files
AnthonyMDevgh-action-runner
authored andcommitted
Implement continuation queue in AsyncReadWriteLock (apollographql/apollo-ios-dev#851)
1 parent 62377ef commit 7e332b9

File tree

2 files changed

+84
-12
lines changed

2 files changed

+84
-12
lines changed

Sources/Apollo/Caching/RecordSet.swift

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,19 @@ public struct RecordSet: Sendable, Hashable {
5555
}
5656

5757
@discardableResult public mutating func merge(record: Record) -> Set<CacheKey> {
58-
if var oldRecord = storage.removeValue(forKey: record.key) {
58+
if let oldRecord = storage[record.key] {
5959
var changedKeys: Set<CacheKey> = Set()
60-
60+
var updatedRecord = oldRecord
61+
6162
for (key, value) in record.fields {
6263
if let oldValue = oldRecord.fields[key], AnyHashable(oldValue) == AnyHashable(value) {
6364
continue
6465
}
65-
oldRecord[key] = value
66+
updatedRecord[key] = value
6667
changedKeys.insert([record.key, key].joined(separator: "."))
6768
}
68-
storage[record.key] = oldRecord
69+
70+
storage[record.key] = updatedRecord
6971
return changedKeys
7072
} else {
7173
storage[record.key] = record

Sources/Apollo/Internal Utilities/AsyncReadWriteLock.swift

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import Foundation
22

33
actor AsyncReadWriteLock {
4+
private enum State {
5+
case idle
6+
case reading
7+
case writing
8+
}
9+
410
private final class ReadTask: Sendable {
511
let task: Task<Void, any Swift.Error>
612

@@ -13,19 +19,32 @@ actor AsyncReadWriteLock {
1319

1420
private var currentReadTasks: [ObjectIdentifier: ReadTask] = [:]
1521
private var currentWriteTask: Task<Void, any Swift.Error>?
22+
private var queue: [(handle: CheckedContinuation<Void, Never>, isWriter: Bool)] = []
23+
24+
private var state: State {
25+
if currentWriteTask != nil { return .writing }
26+
if !currentReadTasks.isEmpty { return .reading }
27+
return .idle
28+
}
1629

1730
/// Waits for all current reads/writes to be completed, then calls the provided closure while preventing
1831
/// any other reads/writes from beginning.
1932
///
2033
/// This function should be `rethrows` but the compiler doesn't understand that when passing the `block` into a Task.
2134
/// If the `body` provided does not throw, this function will not throw.
2235
func write(_ body: @Sendable @escaping () async throws -> Void) async throws {
23-
while currentWriteTask != nil || !currentReadTasks.isEmpty {
24-
await Task.yield()
25-
continue
36+
switch state {
37+
case .writing, .reading:
38+
await addToQueueAndWait(isWriter: true)
39+
40+
case .idle:
41+
break
2642
}
2743

28-
defer { currentWriteTask = nil }
44+
defer {
45+
currentWriteTask = nil
46+
writeTaskDidFinish()
47+
}
2948
let writeTask = Task {
3049
try await body()
3150
}
@@ -40,17 +59,68 @@ actor AsyncReadWriteLock {
4059
/// This function should be `rethrows` but the compiler doesn't understand that when passing the `block` into a Task.
4160
/// If the `body` provided does not throw, this function will not throw.
4261
func read(_ body: @Sendable @escaping () async throws -> Void) async throws {
43-
while currentWriteTask != nil {
44-
await Task.yield()
45-
continue
62+
switch state {
63+
case .writing:
64+
await addToQueueAndWait(isWriter: false)
65+
66+
case .reading:
67+
// If we are currently reading, there will only be a queue if there is at least one waiting writer. If a writer
68+
// is waiting, we should queue the new reader for after the write.
69+
if !queue.isEmpty {
70+
await addToQueueAndWait(isWriter: false)
71+
}
72+
case .idle:
73+
break
4674
}
4775

4876
let readTask = ReadTask(body)
4977
let taskID = ObjectIdentifier(readTask)
50-
defer { currentReadTasks[taskID] = nil }
78+
defer {
79+
currentReadTasks[taskID] = nil
80+
readTaskDidFinish()
81+
}
5182
currentReadTasks[taskID] = readTask
5283

5384
try await readTask.task.value
5485
}
5586

87+
private func addToQueueAndWait(isWriter: Bool) async {
88+
await withCheckedContinuation { continuation in
89+
queue.append((handle: continuation, isWriter: isWriter))
90+
}
91+
}
92+
93+
private func readTaskDidFinish() {
94+
if state == .idle {
95+
wakeNext()
96+
}
97+
}
98+
99+
private func writeTaskDidFinish() {
100+
wakeNext()
101+
}
102+
103+
private func wakeNext() {
104+
guard !queue.isEmpty else {
105+
return
106+
}
107+
108+
let next = queue[0]
109+
next.handle.resume(returning: ())
110+
111+
if next.isWriter {
112+
queue.remove(at: 0)
113+
return
114+
115+
} else {
116+
var lastReader = 0
117+
for i in 1..<queue.count {
118+
guard !queue[i].isWriter else { break }
119+
queue[i].handle.resume(returning: ())
120+
lastReader = i
121+
}
122+
queue.removeSubrange(0...lastReader)
123+
}
124+
}
125+
56126
}

0 commit comments

Comments
 (0)