Skip to content

Commit e283033

Browse files
feat: batch support (#38)
1 parent b80998e commit e283033

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+2773
-1144
lines changed

example/pubspec.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ version: 1.0.0
44
publish_to: none
55

66
environment:
7-
sdk: ^3.2.5
7+
sdk: ^3.5.0
88

99
dependencies:
1010
nitric_sdk:
1111
path: ../
1212
uuid: ^4.3.3
1313

1414
dev_dependencies:
15-
lints: ^2.1.0
15+
lints: ^4.0.0
1616
test: ^1.24.0

lib/src/api/api.dart

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,11 @@ export 'secret.dart';
66
export 'topic.dart';
77
export 'proto.dart';
88
export 'queue.dart';
9+
export 'batch.dart';
10+
export 'sql.dart';
911

10-
typedef UseClientCallback<T extends Client, Resp> = Future<Resp> Function(T);
12+
typedef UseClientCallback<GrpcClient extends Client, Resp> = Future<Resp>
13+
Function(GrpcClient);
14+
15+
typedef ClientConstructor<GrpcClient extends Client> = GrpcClient Function(
16+
ClientChannel);

lib/src/api/batch.dart

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import 'dart:async';
2+
3+
import 'package:nitric_sdk/src/api/api.dart';
4+
import 'package:nitric_sdk/src/grpc_helper.dart';
5+
import 'package:nitric_sdk/src/nitric/proto/batch/v1/batch.pb.dart';
6+
import 'package:nitric_sdk/src/nitric/proto/batch/v1/batch.pbgrpc.dart' as $p;
7+
8+
class Job {
9+
String name;
10+
11+
Job(this.name);
12+
13+
Future<void> submit(Map<String, dynamic> message) async {
14+
final data = JobData(struct: Proto.structFromMap(message));
15+
16+
final req = $p.JobSubmitRequest(data: data, jobName: name);
17+
18+
await ClientChannelSingleton.useClient($p.BatchClient.new, (client) async {
19+
await client.submitJob(req);
20+
});
21+
}
22+
}

lib/src/api/bucket.dart

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import 'dart:async';
22
import 'dart:convert';
33

4-
import 'package:nitric_sdk/src/api/api.dart';
54
import 'package:nitric_sdk/src/context/common.dart';
65
import 'package:nitric_sdk/src/grpc_helper.dart';
76
import 'package:nitric_sdk/src/nitric/proto/storage/v1/storage.pbgrpc.dart'
@@ -11,44 +10,22 @@ import 'package:fixnum/fixnum.dart';
1110
import 'package:nitric_sdk/src/workers/common.dart';
1211

1312
class Bucket {
14-
late final $p.StorageClient? _storageClient;
15-
late final $p.StorageListenerClient? _storageListenerClient;
16-
1713
String name;
1814

19-
Bucket(this.name,
20-
{$p.StorageListenerClient? storageListenerClient,
21-
$p.StorageClient? client}) {
22-
_storageClient = client;
23-
_storageListenerClient = storageListenerClient;
24-
}
15+
Bucket(this.name);
2516

2617
/// Get a reference to a file by it's [key].
2718
File file(String key) {
2819
return File(this, key);
2920
}
3021

31-
Future<Resp> _useClient<Resp>(
32-
UseClientCallback<$p.StorageClient, Resp> callback) async {
33-
final client = _storageClient ??
34-
$p.StorageClient(ClientChannelSingleton.instance.clientChannel);
35-
36-
var resp = await callback(client);
37-
38-
if (_storageClient == null) {
39-
await ClientChannelSingleton.instance.release();
40-
}
41-
42-
return resp;
43-
}
44-
4522
/// Get a list of references to the files in the bucket. Optionally supply a [prefix] to filter by.
4623
Future<List<File>> files({String prefix = ""}) async {
4724
final request =
4825
$p.StorageListBlobsRequest(bucketName: name, prefix: prefix);
4926

50-
var resp =
51-
await _useClient((client) async => await client.listBlobs(request));
27+
var resp = await ClientChannelSingleton.useClient($p.StorageClient.new,
28+
(client) async => await client.listBlobs(request));
5229

5330
return resp.blobs.map((blob) => File(this, blob.key)).toList();
5431
}
@@ -72,8 +49,7 @@ class Bucket {
7249
final composedHandler =
7350
composeMiddleware([...middlewares, handler], FileEventContext.fromCtx);
7451

75-
var worker = FileEventWorker(registrationRequest, composedHandler, this,
76-
client: _storageListenerClient);
52+
var worker = FileEventWorker(registrationRequest, composedHandler, this);
7753

7854
await worker.start();
7955
}
@@ -95,7 +71,8 @@ class File {
9571
key: key,
9672
);
9773

98-
await _bucket._useClient((client) async => await client.delete(req));
74+
await ClientChannelSingleton.useClient(
75+
$p.StorageClient.new, (client) async => await client.delete(req));
9976
}
10077

10178
/// Read the file from the bucket.
@@ -105,8 +82,8 @@ class File {
10582
key: key,
10683
);
10784

108-
var resp =
109-
await _bucket._useClient((client) async => await client.read(req));
85+
var resp = await ClientChannelSingleton.useClient(
86+
$p.StorageClient.new, (client) async => await client.read(req));
11087

11188
return utf8.decode(resp.body);
11289
}
@@ -121,7 +98,8 @@ class File {
12198
body: bytes,
12299
);
123100

124-
await _bucket._useClient((client) async => await client.write(req));
101+
await ClientChannelSingleton.useClient(
102+
$p.StorageClient.new, (client) async => await client.write(req));
125103
}
126104

127105
/// Check whether the file exists in the bucket.
@@ -131,8 +109,8 @@ class File {
131109
key: key,
132110
);
133111

134-
var resp =
135-
await _bucket._useClient((client) async => await client.exists(req));
112+
var resp = await ClientChannelSingleton.useClient(
113+
$p.StorageClient.new, (client) async => await client.exists(req));
136114

137115
return resp.exists;
138116
}
@@ -164,8 +142,8 @@ class File {
164142
expiry: exp,
165143
);
166144

167-
var resp = await _bucket
168-
._useClient((client) async => await client.preSignUrl(req));
145+
var resp = await ClientChannelSingleton.useClient(
146+
$p.StorageClient.new, (client) async => await client.preSignUrl(req));
169147

170148
return resp.url;
171149
}

lib/src/api/keyvalue.dart

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,34 +7,17 @@ import 'package:nitric_sdk/src/nitric/proto/kvstore/v1/kvstore.pbgrpc.dart'
77

88
/// A Key Value Store.
99
class KeyValueStore {
10-
late final $p.KvStoreClient? _keyValueClient;
11-
1210
final String name;
1311

14-
KeyValueStore(this.name, {$p.KvStoreClient? client}) {
15-
_keyValueClient = client;
16-
}
17-
18-
Future<Resp> _useClient<Resp>(
19-
UseClientCallback<$p.KvStoreClient, Resp> callback) async {
20-
final client = _keyValueClient ??
21-
$p.KvStoreClient(ClientChannelSingleton.instance.clientChannel);
22-
23-
var resp = callback(client);
24-
25-
if (_keyValueClient == null) {
26-
await ClientChannelSingleton.instance.release();
27-
}
28-
29-
return resp;
30-
}
12+
KeyValueStore(this.name);
3113

3214
/// Get a reference to a [key] in the store.
3315
Future<Map<String, dynamic>> get(String key) async {
3416
var req =
3517
$p.KvStoreGetValueRequest(ref: $p.ValueRef(key: key, store: name));
3618

37-
var resp = await _useClient((client) async => await client.getValue(req));
19+
var resp = await ClientChannelSingleton.useClient(
20+
$p.KvStoreClient.new, (client) async => await client.getValue(req));
3821

3922
return Proto.mapFromStruct(resp.value.content);
4023
}
@@ -46,23 +29,26 @@ class KeyValueStore {
4629
var req = $p.KvStoreSetValueRequest(
4730
ref: $p.ValueRef(key: key, store: name), content: content);
4831

49-
await _useClient((client) async => await client.setValue(req));
32+
await ClientChannelSingleton.useClient(
33+
$p.KvStoreClient.new, (client) async => await client.setValue(req));
5034
}
5135

5236
/// Delete a [key] from the store.
5337
Future<void> delete(String key) async {
5438
var req =
5539
$p.KvStoreDeleteKeyRequest(ref: $p.ValueRef(key: key, store: name));
5640

57-
await _useClient((client) async => await client.deleteKey(req));
41+
await ClientChannelSingleton.useClient(
42+
$p.KvStoreClient.new, (client) async => await client.deleteKey(req));
5843
}
5944

6045
/// Get a stream of key values that match the [prefix].
6146
Future<Stream<String>> keys({String prefix = ""}) async {
6247
var req =
6348
$p.KvStoreScanKeysRequest(store: $p.Store(name: name), prefix: prefix);
6449

65-
var resp = await _useClient((client) async => client.scanKeys(req));
50+
var resp = await ClientChannelSingleton.useClient(
51+
$p.KvStoreClient.new, (client) async => client.scanKeys(req));
6652

6753
return resp.map((event) => event.key);
6854
}

lib/src/api/queue.dart

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,11 @@ import 'package:nitric_sdk/src/grpc_helper.dart';
55
import 'package:nitric_sdk/src/nitric/proto/queues/v1/queues.pbgrpc.dart' as $p;
66

77
class Queue {
8-
late final $p.QueuesClient? _queuesClient;
9-
108
/// The name of the queue.
119
String name;
1210

1311
/// Construct a new queue.
14-
Queue(this.name, {$p.QueuesClient? client}) {
15-
_queuesClient = client;
16-
}
17-
18-
Future<Resp> _useClient<Resp>(
19-
UseClientCallback<$p.QueuesClient, Resp> callback) async {
20-
final client = _queuesClient ??
21-
$p.QueuesClient(ClientChannelSingleton.instance.clientChannel);
22-
23-
var resp = callback(client);
24-
25-
if (_queuesClient == null) {
26-
await ClientChannelSingleton.instance.release();
27-
}
28-
29-
return resp;
30-
}
12+
Queue(this.name);
3113

3214
/// Enqueue a list of [messages] to the queue.
3315
Future<List<FailedMessage>> enqueue(
@@ -40,7 +22,8 @@ class Queue {
4022
queueName: name,
4123
);
4224

43-
var resp = await _useClient((client) async => await client.enqueue(req));
25+
var resp = await ClientChannelSingleton.useClient(
26+
$p.QueuesClient.new, (client) async => await client.enqueue(req));
4427

4528
return resp.failedMessages.map((fm) => FailedMessage(fm)).toList();
4629
}
@@ -49,7 +32,8 @@ class Queue {
4932
Future<List<DequeuedMessage>> dequeue({int depth = 1}) async {
5033
var req = $p.QueueDequeueRequest(queueName: name, depth: depth);
5134

52-
var resp = await _useClient((client) async => await client.dequeue(req));
35+
var resp = await ClientChannelSingleton.useClient(
36+
$p.QueuesClient.new, (client) async => await client.dequeue(req));
5337

5438
return resp.messages.map((m) => DequeuedMessage(this, m)).toList();
5539
}
@@ -73,7 +57,8 @@ class DequeuedMessage {
7357
var req =
7458
$p.QueueCompleteRequest(leaseId: _leaseId, queueName: _queue.name);
7559

76-
await _queue._useClient((client) async => await client.complete(req));
60+
await ClientChannelSingleton.useClient(
61+
$p.QueuesClient.new, (client) async => await client.complete(req));
7762
}
7863
}
7964

lib/src/api/secret.dart

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,12 @@ import 'package:nitric_sdk/src/grpc_helper.dart';
44
import 'package:nitric_sdk/src/nitric/proto/secrets/v1/secrets.pbgrpc.dart'
55
as $p;
66

7-
import 'api.dart';
8-
97
/// References an encrypted secret stored in a secret manager.
108
class Secret {
119
/// The name of the secret
1210
final String name;
13-
late final $p.SecretManagerClient? _secretClient;
14-
15-
Secret(this.name, {$p.SecretManagerClient? client}) {
16-
_secretClient = client;
17-
}
18-
19-
Future<Resp> _useClient<Resp>(
20-
UseClientCallback<$p.SecretManagerClient, Resp> callback) async {
21-
final client = _secretClient ??
22-
$p.SecretManagerClient(ClientChannelSingleton.instance.clientChannel);
2311

24-
var resp = callback(client);
25-
26-
if (_secretClient == null) {
27-
await ClientChannelSingleton.instance.release();
28-
}
29-
30-
return resp;
31-
}
12+
Secret(this.name);
3213

3314
/// Get a reference to a specific [version] of this secret.
3415
SecretVersion version(String version) {
@@ -43,7 +24,8 @@ class Secret {
4324
/// Put a new [value] to the secret, creating a new secret version and returning it.
4425
Future<SecretVersion> put(String value) async {
4526
var req = $p.SecretPutRequest(secret: _toWire(), value: utf8.encode(value));
46-
var resp = await _useClient((client) async => client.put(req));
27+
var resp = await ClientChannelSingleton.useClient(
28+
$p.SecretManagerClient.new, (client) async => client.put(req));
4729

4830
return SecretVersion._fromWire(this, resp.secretVersion);
4931
}
@@ -71,8 +53,8 @@ class SecretVersion {
7153
/// Access the value of this secret version.
7254
Future<SecretValue> access() async {
7355
var req = $p.SecretAccessRequest(secretVersion: _toWire());
74-
var resp =
75-
await _secret._useClient((client) async => await client.access(req));
56+
var resp = await ClientChannelSingleton.useClient(
57+
$p.SecretManagerClient.new, (client) async => await client.access(req));
7658

7759
return SecretValue(version, utf8.decode(resp.value));
7860
}

0 commit comments

Comments
 (0)