Skip to content

Commit

Permalink
feat: batch support (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
HomelessDinosaur authored Oct 3, 2024
1 parent b80998e commit e283033
Show file tree
Hide file tree
Showing 74 changed files with 2,773 additions and 1,144 deletions.
4 changes: 2 additions & 2 deletions example/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ version: 1.0.0
publish_to: none

environment:
sdk: ^3.2.5
sdk: ^3.5.0

dependencies:
nitric_sdk:
path: ../
uuid: ^4.3.3

dev_dependencies:
lints: ^2.1.0
lints: ^4.0.0
test: ^1.24.0
8 changes: 7 additions & 1 deletion lib/src/api/api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,11 @@ export 'secret.dart';
export 'topic.dart';
export 'proto.dart';
export 'queue.dart';
export 'batch.dart';
export 'sql.dart';

typedef UseClientCallback<T extends Client, Resp> = Future<Resp> Function(T);
typedef UseClientCallback<GrpcClient extends Client, Resp> = Future<Resp>
Function(GrpcClient);

typedef ClientConstructor<GrpcClient extends Client> = GrpcClient Function(
ClientChannel);
22 changes: 22 additions & 0 deletions lib/src/api/batch.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import 'dart:async';

import 'package:nitric_sdk/src/api/api.dart';
import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/batch/v1/batch.pb.dart';
import 'package:nitric_sdk/src/nitric/proto/batch/v1/batch.pbgrpc.dart' as $p;

class Job {
String name;

Job(this.name);

Future<void> submit(Map<String, dynamic> message) async {
final data = JobData(struct: Proto.structFromMap(message));

final req = $p.JobSubmitRequest(data: data, jobName: name);

await ClientChannelSingleton.useClient($p.BatchClient.new, (client) async {
await client.submitJob(req);
});
}
}
50 changes: 14 additions & 36 deletions lib/src/api/bucket.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:async';
import 'dart:convert';

import 'package:nitric_sdk/src/api/api.dart';
import 'package:nitric_sdk/src/context/common.dart';
import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/storage/v1/storage.pbgrpc.dart'
Expand All @@ -11,44 +10,22 @@ import 'package:fixnum/fixnum.dart';
import 'package:nitric_sdk/src/workers/common.dart';

class Bucket {
late final $p.StorageClient? _storageClient;
late final $p.StorageListenerClient? _storageListenerClient;

String name;

Bucket(this.name,
{$p.StorageListenerClient? storageListenerClient,
$p.StorageClient? client}) {
_storageClient = client;
_storageListenerClient = storageListenerClient;
}
Bucket(this.name);

/// Get a reference to a file by it's [key].
File file(String key) {
return File(this, key);
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.StorageClient, Resp> callback) async {
final client = _storageClient ??
$p.StorageClient(ClientChannelSingleton.instance.clientChannel);

var resp = await callback(client);

if (_storageClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}

/// Get a list of references to the files in the bucket. Optionally supply a [prefix] to filter by.
Future<List<File>> files({String prefix = ""}) async {
final request =
$p.StorageListBlobsRequest(bucketName: name, prefix: prefix);

var resp =
await _useClient((client) async => await client.listBlobs(request));
var resp = await ClientChannelSingleton.useClient($p.StorageClient.new,
(client) async => await client.listBlobs(request));

return resp.blobs.map((blob) => File(this, blob.key)).toList();
}
Expand All @@ -72,8 +49,7 @@ class Bucket {
final composedHandler =
composeMiddleware([...middlewares, handler], FileEventContext.fromCtx);

var worker = FileEventWorker(registrationRequest, composedHandler, this,
client: _storageListenerClient);
var worker = FileEventWorker(registrationRequest, composedHandler, this);

await worker.start();
}
Expand All @@ -95,7 +71,8 @@ class File {
key: key,
);

await _bucket._useClient((client) async => await client.delete(req));
await ClientChannelSingleton.useClient(
$p.StorageClient.new, (client) async => await client.delete(req));
}

/// Read the file from the bucket.
Expand All @@ -105,8 +82,8 @@ class File {
key: key,
);

var resp =
await _bucket._useClient((client) async => await client.read(req));
var resp = await ClientChannelSingleton.useClient(
$p.StorageClient.new, (client) async => await client.read(req));

return utf8.decode(resp.body);
}
Expand All @@ -121,7 +98,8 @@ class File {
body: bytes,
);

await _bucket._useClient((client) async => await client.write(req));
await ClientChannelSingleton.useClient(
$p.StorageClient.new, (client) async => await client.write(req));
}

/// Check whether the file exists in the bucket.
Expand All @@ -131,8 +109,8 @@ class File {
key: key,
);

var resp =
await _bucket._useClient((client) async => await client.exists(req));
var resp = await ClientChannelSingleton.useClient(
$p.StorageClient.new, (client) async => await client.exists(req));

return resp.exists;
}
Expand Down Expand Up @@ -164,8 +142,8 @@ class File {
expiry: exp,
);

var resp = await _bucket
._useClient((client) async => await client.preSignUrl(req));
var resp = await ClientChannelSingleton.useClient(
$p.StorageClient.new, (client) async => await client.preSignUrl(req));

return resp.url;
}
Expand Down
32 changes: 9 additions & 23 deletions lib/src/api/keyvalue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,17 @@ import 'package:nitric_sdk/src/nitric/proto/kvstore/v1/kvstore.pbgrpc.dart'

/// A Key Value Store.
class KeyValueStore {
late final $p.KvStoreClient? _keyValueClient;

final String name;

KeyValueStore(this.name, {$p.KvStoreClient? client}) {
_keyValueClient = client;
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.KvStoreClient, Resp> callback) async {
final client = _keyValueClient ??
$p.KvStoreClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_keyValueClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}
KeyValueStore(this.name);

/// Get a reference to a [key] in the store.
Future<Map<String, dynamic>> get(String key) async {
var req =
$p.KvStoreGetValueRequest(ref: $p.ValueRef(key: key, store: name));

var resp = await _useClient((client) async => await client.getValue(req));
var resp = await ClientChannelSingleton.useClient(
$p.KvStoreClient.new, (client) async => await client.getValue(req));

return Proto.mapFromStruct(resp.value.content);
}
Expand All @@ -46,23 +29,26 @@ class KeyValueStore {
var req = $p.KvStoreSetValueRequest(
ref: $p.ValueRef(key: key, store: name), content: content);

await _useClient((client) async => await client.setValue(req));
await ClientChannelSingleton.useClient(
$p.KvStoreClient.new, (client) async => await client.setValue(req));
}

/// Delete a [key] from the store.
Future<void> delete(String key) async {
var req =
$p.KvStoreDeleteKeyRequest(ref: $p.ValueRef(key: key, store: name));

await _useClient((client) async => await client.deleteKey(req));
await ClientChannelSingleton.useClient(
$p.KvStoreClient.new, (client) async => await client.deleteKey(req));
}

/// Get a stream of key values that match the [prefix].
Future<Stream<String>> keys({String prefix = ""}) async {
var req =
$p.KvStoreScanKeysRequest(store: $p.Store(name: name), prefix: prefix);

var resp = await _useClient((client) async => client.scanKeys(req));
var resp = await ClientChannelSingleton.useClient(
$p.KvStoreClient.new, (client) async => client.scanKeys(req));

return resp.map((event) => event.key);
}
Expand Down
29 changes: 7 additions & 22 deletions lib/src/api/queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,11 @@ import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/queues/v1/queues.pbgrpc.dart' as $p;

class Queue {
late final $p.QueuesClient? _queuesClient;

/// The name of the queue.
String name;

/// Construct a new queue.
Queue(this.name, {$p.QueuesClient? client}) {
_queuesClient = client;
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.QueuesClient, Resp> callback) async {
final client = _queuesClient ??
$p.QueuesClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_queuesClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}
Queue(this.name);

/// Enqueue a list of [messages] to the queue.
Future<List<FailedMessage>> enqueue(
Expand All @@ -40,7 +22,8 @@ class Queue {
queueName: name,
);

var resp = await _useClient((client) async => await client.enqueue(req));
var resp = await ClientChannelSingleton.useClient(
$p.QueuesClient.new, (client) async => await client.enqueue(req));

return resp.failedMessages.map((fm) => FailedMessage(fm)).toList();
}
Expand All @@ -49,7 +32,8 @@ class Queue {
Future<List<DequeuedMessage>> dequeue({int depth = 1}) async {
var req = $p.QueueDequeueRequest(queueName: name, depth: depth);

var resp = await _useClient((client) async => await client.dequeue(req));
var resp = await ClientChannelSingleton.useClient(
$p.QueuesClient.new, (client) async => await client.dequeue(req));

return resp.messages.map((m) => DequeuedMessage(this, m)).toList();
}
Expand All @@ -73,7 +57,8 @@ class DequeuedMessage {
var req =
$p.QueueCompleteRequest(leaseId: _leaseId, queueName: _queue.name);

await _queue._useClient((client) async => await client.complete(req));
await ClientChannelSingleton.useClient(
$p.QueuesClient.new, (client) async => await client.complete(req));
}
}

Expand Down
28 changes: 5 additions & 23 deletions lib/src/api/secret.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,12 @@ import 'package:nitric_sdk/src/grpc_helper.dart';
import 'package:nitric_sdk/src/nitric/proto/secrets/v1/secrets.pbgrpc.dart'
as $p;

import 'api.dart';

/// References an encrypted secret stored in a secret manager.
class Secret {
/// The name of the secret
final String name;
late final $p.SecretManagerClient? _secretClient;

Secret(this.name, {$p.SecretManagerClient? client}) {
_secretClient = client;
}

Future<Resp> _useClient<Resp>(
UseClientCallback<$p.SecretManagerClient, Resp> callback) async {
final client = _secretClient ??
$p.SecretManagerClient(ClientChannelSingleton.instance.clientChannel);

var resp = callback(client);

if (_secretClient == null) {
await ClientChannelSingleton.instance.release();
}

return resp;
}
Secret(this.name);

/// Get a reference to a specific [version] of this secret.
SecretVersion version(String version) {
Expand All @@ -43,7 +24,8 @@ class Secret {
/// Put a new [value] to the secret, creating a new secret version and returning it.
Future<SecretVersion> put(String value) async {
var req = $p.SecretPutRequest(secret: _toWire(), value: utf8.encode(value));
var resp = await _useClient((client) async => client.put(req));
var resp = await ClientChannelSingleton.useClient(
$p.SecretManagerClient.new, (client) async => client.put(req));

return SecretVersion._fromWire(this, resp.secretVersion);
}
Expand Down Expand Up @@ -71,8 +53,8 @@ class SecretVersion {
/// Access the value of this secret version.
Future<SecretValue> access() async {
var req = $p.SecretAccessRequest(secretVersion: _toWire());
var resp =
await _secret._useClient((client) async => await client.access(req));
var resp = await ClientChannelSingleton.useClient(
$p.SecretManagerClient.new, (client) async => await client.access(req));

return SecretValue(version, utf8.decode(resp.value));
}
Expand Down
Loading

0 comments on commit e283033

Please sign in to comment.