Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/optional.atomic batching #170

Merged
merged 3 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ _Supported Deno verisons:_ **^1.40.0**
- [Zod](#zod)
- [zodModel()](#zodmodel)
- [Kv-Schemas](#kv-schemas)
- [Blob Storage](#blob-storage)
- [Development](#development)
- [License](#license)

Expand Down Expand Up @@ -1293,6 +1294,25 @@ const PostSchema = z.object({
})
```

## Blob Storage

To store large blob sizes, and bypass the data limit of a single atomic
operation, a combination of serialized collections and the `atomicBatchSize`
option for write operations can be used.

```ts
import { collection, kvdex, model } from "jsr:@olli/kvdex"

const kv = await Deno.openKv()
const db = kvdex(kv, {
blobs: collection(model<Uint8Array>(), { serialize: "json" }),
})

const blob = // read from disk, etc.

const result = await db.blobs.add(blob, { atomicBatchSize: 10 })
```

## Development

Any contributions are welcomed and appreciated. How to contribute:
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@olli/kvdex",
"version": "0.33.0",
"version": "0.33.1",
"exports": {
".": "./mod.ts",
"./ext/zod": "./ext/zod.ts"
Expand Down
63 changes: 63 additions & 0 deletions src/atomic_pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import type { AtomicSetOptions } from "./types.ts"

export class AtomicPool implements Deno.AtomicOperation {
private pool: Array<(op: Deno.AtomicOperation) => Deno.AtomicOperation>

constructor() {
this.pool = []
}

set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) {
this.pool.push((op) => op.set(key, value, options))
return this
}

delete(key: Deno.KvKey) {
this.pool.push((op) => op.delete(key))
return this
}

mutate(...mutations: Deno.KvMutation[]) {
this.pool.push((op) => op.mutate(...mutations))
return this
}

check(...checks: Deno.AtomicCheck[]) {
this.pool.push((op) => op.check(...checks))
return this
}

sum(key: Deno.KvKey, n: bigint) {
this.pool.push((op) => op.sum(key, n))
return this
}

max(key: Deno.KvKey, n: bigint) {
this.pool.push((op) => op.max(key, n))
return this
}

min(key: Deno.KvKey, n: bigint): this {
this.pool.push((op) => op.min(key, n))
return this
}

enqueue(
value: unknown,
options?: {
delay?: number | undefined
keysIfUndelivered?: Deno.KvKey[] | undefined
} | undefined,
) {
this.pool.push((op) => op.enqueue(value, options))
return this
}

commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
throw Error("Not Implemented")
}

bindTo(atomic: Deno.AtomicOperation) {
this.pool.forEach((mutation) => mutation(atomic))
}
}
4 changes: 2 additions & 2 deletions src/atomic_wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ATOMIC_OPERATION_MUTATION_LIMIT } from "./constants.ts"
import type { SetOptions } from "./types.ts"
import type { AtomicSetOptions } from "./types.ts"
import { clamp } from "./utils.ts"

export class AtomicWrapper implements Deno.AtomicOperation {
Expand All @@ -24,7 +24,7 @@ export class AtomicWrapper implements Deno.AtomicOperation {
)
}

set(key: Deno.KvKey, value: unknown, options?: SetOptions) {
set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) {
this.addMutation((op) => op.set(key, value, options))
return this
}
Expand Down
80 changes: 71 additions & 9 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import type {
import {
allFulfilled,
checkIndices,
clamp,
compress,
createHandlerId,
createListOptions,
Expand All @@ -64,6 +65,7 @@ import {
setIndices,
} from "./utils.ts"
import {
ATOMIC_OPERATION_MUTATION_LIMIT,
DEFAULT_UPDATE_STRATEGY,
HISTORY_KEY_PREFIX,
ID_KEY_PREFIX,
Expand All @@ -75,6 +77,7 @@ import {
UNDELIVERED_KEY_PREFIX,
} from "./constants.ts"
import { AtomicWrapper } from "./atomic_wrapper.ts"
import { AtomicPool } from "./atomic_pool.ts"
import { Document } from "./document.ts"
import { model } from "./model.ts"
import { concat, deepMerge, ulid } from "./deps.ts"
Expand Down Expand Up @@ -1905,15 +1908,20 @@ export class Collection<
options: SetOptions | undefined,
): Promise<CommitResult<TOutput> | Deno.KvCommitError> {
// Initialize atomic operation and keys list
const atomic = this.kv.atomic()
const ids: KvId[] = []
let docValue: any = value
const isUint8Array = value instanceof Uint8Array
const timeId = ulid()

const atomicBatchSize = options?.atomicBatchSize &&
clamp(1, options?.atomicBatchSize, ATOMIC_OPERATION_MUTATION_LIMIT)

const operationPool = new AtomicPool()
const indexOperationPool = new AtomicPool()

// Check for id collision
if (!options?.overwrite) {
atomic.check({
operationPool.check({
key: idKey,
versionstamp: null,
})
Expand All @@ -1932,7 +1940,8 @@ export class Collection<
const part = compressed.subarray(i, i + UINT8ARRAY_LENGTH_LIMIT)
const key = extendKey(this._keys.segment, docId, index)
ids.push(index)
atomic.set(key, part, options)

operationPool.set(key, part, options)

// Set history segments if keeps history
if (this._keepsHistory) {
Expand All @@ -1943,7 +1952,7 @@ export class Collection<
index,
)

atomic.set(historySegmentKey, part)
operationPool.set(historySegmentKey, part)
}

index++
Expand All @@ -1959,7 +1968,7 @@ export class Collection<
}

// Set document entry
atomic.set(idKey, docValue, options)
operationPool.set(idKey, docValue, options)

// Set history entry if keeps history
if (this._keepsHistory) {
Expand All @@ -1971,7 +1980,7 @@ export class Collection<
value: docValue,
}

atomic.set(historyKey, historyEntry)
operationPool.set(historyKey, historyEntry)
}

// Set indices if is indexable
Expand All @@ -1980,24 +1989,77 @@ export class Collection<
docId,
value as KvObject,
docValue,
atomic,
indexOperationPool,
this,
options,
)
}

// Commit atomic operation
const cr = await atomic.commit()
// Initialize index check, commit result and atomic operation
let indexCheck = false
let cr: Deno.KvCommitResult | Deno.KvCommitError = { ok: false }

const atomic = atomicBatchSize
? new AtomicWrapper(this.kv, atomicBatchSize)
: this.kv.atomic()

// Perform index mutations first if operation is batched, else bind all mutations to main operation
if (atomicBatchSize) {
const indexAtomic = this.kv.atomic()
indexOperationPool.bindTo(indexAtomic)
const indexCr = await indexAtomic.commit()
indexCheck = indexCr.ok
} else {
indexOperationPool.bindTo(atomic)
}

// Bind remaining mutations to main operation
operationPool.bindTo(atomic)

// Commit operation if not batched or if index setters completed successfully
if (!atomicBatchSize || indexCheck) {
cr = await atomic.commit()
}

// Handle failed operation
if (!cr.ok) {
// Delete any entries upon failed batched operation
if (atomicBatchSize && indexCheck) {
const failedAtomic = new AtomicWrapper(this.kv, atomicBatchSize)

if (this._keepsHistory) {
const historyKey = extendKey(this._keys.history, docId, timeId)
failedAtomic.delete(historyKey)
}

if (this._isSerialized) {
const { ids } = docValue as SerializedEntry
ids.forEach((id) =>
failedAtomic.delete(extendKey(this._keys.segment, docId, id))
)
}

if (this._isIndexable) {
deleteIndices(
docId,
value as KvObject,
failedAtomic,
this,
)
}

await failedAtomic.commit()
}

// Return commit error if no remaining retry attempts
const retry = options?.retry ?? 0
if (!retry) {
return {
ok: false,
}
}

// Retry operation and decrement retry count
return await this.setDoc(docId, idKey, value, {
...options,
retry: retry - 1,
Expand Down
9 changes: 8 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ export type SetOptions = NonNullable<Parameters<Deno.Kv["set"]>["2"]> & {
* @default false
*/
overwrite?: boolean

/**
* The max number of mutations to be batched in a single atomic operation.
*
* If not set, the operation will not be batched, and will pool all mutations into a single atomic operation.
*/
atomicBatchSize?: number
}

export type ListOptions<T> = Deno.KvListOptions & {
Expand Down Expand Up @@ -393,7 +400,7 @@ export type HandleManyOptions<T> = ListOptions<T> & {
}

export type AtomicBatchOptions = {
/** Batch size of atomic operations where applicable */
/** Max number of mutations to be batched in a single atomic operation */
atomicBatchSize?: number
}

Expand Down
Loading