Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: automatic atomic batching #172

Merged
merged 1 commit into from
Feb 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -972,9 +972,6 @@ Delete all documents across all collections.

```ts
await db.deleteAll()

// Explicitly set the atomic batch size between 1-1000
await db.deleteAll({ atomicBatchSize: 500 })
```

### wipe()
Expand All @@ -983,9 +980,6 @@ Delete all kvdex entries, including undelivered and history entries.

```ts
await db.wipe()

// Explicitly set the atomic batch size between 1-1000
await db.wipe({ atomicBatchSize: 500 })
```

### deleteUndelivered()
Expand Down Expand Up @@ -1297,8 +1291,9 @@ const PostSchema = z.object({
## Blob Storage

To store large blob sizes, and bypass the data limit of a single atomic
operation, a combination of serialized collections and the `atomicBatchSize`
option for write operations can be used.
operation, a combination of serialized collections and batching atomic
operations can be used. By default, batching is disabled to ensure consistency
and improve performance.

```ts
import { collection, kvdex, model } from "jsr:@olli/kvdex"
Expand All @@ -1310,7 +1305,7 @@ const db = kvdex(kv, {

const blob = // read from disk, etc.

const result = await db.blobs.add(blob, { atomicBatchSize: 10 })
const result = await db.blobs.add(blob, { batched: true })
```

## Development
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@olli/kvdex",
"version": "0.33.1",
"version": "0.34.0",
"exports": {
".": "./mod.ts",
"./ext/zod": "./ext/zod.ts"
Expand Down
109 changes: 73 additions & 36 deletions src/atomic_wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,61 +1,85 @@
import { ATOMIC_OPERATION_MUTATION_LIMIT } from "./constants.ts"
import {
ATOMIC_OPERATION_MUTATION_LIMIT,
ATOMIC_OPERATION_SIZE_LIMIT,
ATOMIC_OPERTION_CHECK_LIMIT,
} from "./constants.ts"
import type { AtomicSetOptions } from "./types.ts"
import { clamp } from "./utils.ts"

export class AtomicWrapper implements Deno.AtomicOperation {
private kv: Deno.Kv
private current: Deno.AtomicOperation
private atomics: Deno.AtomicOperation[]
private count: number
private atomicBatchSize: number
private currentAtomic: Deno.AtomicOperation
private currentCount: number
private currentCheckCount: number
private currentSize: number

constructor(
kv: Deno.Kv,
atomicBatchSize = ATOMIC_OPERATION_MUTATION_LIMIT / 4,
) {
constructor(kv: Deno.Kv) {
this.kv = kv
this.current = kv.atomic()
this.atomics = []
this.count = 0
this.atomicBatchSize = clamp(
1,
atomicBatchSize,
ATOMIC_OPERATION_MUTATION_LIMIT,
)
this.currentAtomic = kv.atomic()
this.currentCount = 0
this.currentCheckCount = 0
this.currentSize = 0
}

set(key: Deno.KvKey, value: unknown, options?: AtomicSetOptions) {
this.addMutation((op) => op.set(key, value, options))
this.addMutation((op) => op.set(key, value, options), 67, false)
return this
}

delete(key: Deno.KvKey) {
this.addMutation((op) => op.delete(key))
this.addMutation((op) => op.delete(key), 3, false)
return this
}

mutate(...mutations: Deno.KvMutation[]) {
this.addMutation((op) => op.mutate(...mutations))
mutations.forEach((mut) => {
switch (mut.type) {
case "delete": {
this.delete(mut.key)
break
}
case "max": {
this.max(mut.key, mut.value.value)
break
}
case "min": {
this.min(mut.key, mut.value.value)
break
}
case "sum": {
this.sum(mut.key, mut.value.value)
break
}
case "set": {
this.set(mut.key, mut.value)
break
}
}
})

return this
}

check(...checks: Deno.AtomicCheck[]) {
this.addMutation((op) => op.check(...checks))
checks.forEach((check) =>
this.addMutation((op) => op.check(check), 3, true)
)
return this
}

sum(key: Deno.KvKey, n: bigint) {
this.addMutation((op) => op.sum(key, n))
this.addMutation((op) => op.sum(key, n), 3, false)
return this
}

max(key: Deno.KvKey, n: bigint) {
this.addMutation((op) => op.max(key, n))
this.addMutation((op) => op.max(key, n), 3, false)
return this
}

min(key: Deno.KvKey, n: bigint): this {
this.addMutation((op) => op.min(key, n))
this.addMutation((op) => op.min(key, n), 3, false)
return this
}

Expand All @@ -66,13 +90,15 @@ export class AtomicWrapper implements Deno.AtomicOperation {
keysIfUndelivered?: Deno.KvKey[] | undefined
} | undefined,
) {
this.addMutation((op) => op.enqueue(value, options))
this.addMutation((op) => op.enqueue(value, options), 96, false)
return this
}

async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
// Add current operation to atomics list
this.atomics.push(this.current)
if (this.currentCount > 0) {
this.atomics.push(this.currentAtomic)
}

// Commit all operations
const settled = await Promise.allSettled(
Expand All @@ -84,11 +110,9 @@ export class AtomicWrapper implements Deno.AtomicOperation {

// If successful, return commit result
if (success) {
const cr = (settled.at(0) as any).value as Deno.KvCommitResult
const versionstamp = cr.versionstamp
return {
ok: true,
versionstamp: versionstamp ?? "0",
versionstamp: (settled.at(0) as any)?.value.versionstamp ?? "0",
}
}

Expand All @@ -107,15 +131,28 @@ export class AtomicWrapper implements Deno.AtomicOperation {
*/
private addMutation(
mutation: (op: Deno.AtomicOperation) => Deno.AtomicOperation,
size: number,
isCheck: boolean,
) {
// Add atomic mutation and increment count
this.current = mutation(this.current)
this.count++

// Add current operation to atomics list if batch size is reached, reset current and count
if (this.count % this.atomicBatchSize === this.atomicBatchSize - 1) {
this.atomics.push(this.current)
this.current = this.kv.atomic()
this.currentSize += size
this.currentCount++

if (isCheck) {
this.currentCheckCount++
}

if (
this.currentCount > ATOMIC_OPERATION_MUTATION_LIMIT ||
this.currentSize > ATOMIC_OPERATION_SIZE_LIMIT ||
this.currentCheckCount > ATOMIC_OPERTION_CHECK_LIMIT
) {
this.atomics.push(this.currentAtomic)
this.currentAtomic = this.kv.atomic()
this.currentCount = 0
this.currentCheckCount = 0
this.currentSize = 0
}

mutation(this.currentAtomic)
}
}
20 changes: 7 additions & 13 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import type {
import {
allFulfilled,
checkIndices,
clamp,
compress,
createHandlerId,
createListOptions,
Expand All @@ -65,7 +64,6 @@ import {
setIndices,
} from "./utils.ts"
import {
ATOMIC_OPERATION_MUTATION_LIMIT,
DEFAULT_UPDATE_STRATEGY,
HISTORY_KEY_PREFIX,
ID_KEY_PREFIX,
Expand Down Expand Up @@ -1203,7 +1201,7 @@ export class Collection<
const iter = this.kv.list({ prefix: this._keys.base }, options)

const keys: Deno.KvKey[] = []
const atomic = new AtomicWrapper(this.kv, options?.atomicBatchSize)
const atomic = new AtomicWrapper(this.kv)

// Collect all collection entry keys
for await (const { key } of iter) {
Expand Down Expand Up @@ -1912,10 +1910,6 @@ export class Collection<
let docValue: any = value
const isUint8Array = value instanceof Uint8Array
const timeId = ulid()

const atomicBatchSize = options?.atomicBatchSize &&
clamp(1, options?.atomicBatchSize, ATOMIC_OPERATION_MUTATION_LIMIT)

const operationPool = new AtomicPool()
const indexOperationPool = new AtomicPool()

Expand Down Expand Up @@ -1999,12 +1993,12 @@ export class Collection<
let indexCheck = false
let cr: Deno.KvCommitResult | Deno.KvCommitError = { ok: false }

const atomic = atomicBatchSize
? new AtomicWrapper(this.kv, atomicBatchSize)
const atomic = options?.batched
? new AtomicWrapper(this.kv)
: this.kv.atomic()

// Perform index mutations first if operation is batched, else bind all mutations to main operation
if (atomicBatchSize) {
if (options?.batched) {
const indexAtomic = this.kv.atomic()
indexOperationPool.bindTo(indexAtomic)
const indexCr = await indexAtomic.commit()
Expand All @@ -2017,15 +2011,15 @@ export class Collection<
operationPool.bindTo(atomic)

// Commit operation if not batched or if index setters completed successfully
if (!atomicBatchSize || indexCheck) {
if (!options?.batched || indexCheck) {
cr = await atomic.commit()
}

// Handle failed operation
if (!cr.ok) {
// Delete any entries upon failed batched operation
if (atomicBatchSize && indexCheck) {
const failedAtomic = new AtomicWrapper(this.kv, atomicBatchSize)
if (options?.batched && indexCheck) {
const failedAtomic = new AtomicWrapper(this.kv)

if (this._keepsHistory) {
const historyKey = extendKey(this._keys.history, docId, timeId)
Expand Down
4 changes: 4 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ export const HISTORY_KEY_PREFIX = "__history__"
// Fixed limits
export const ATOMIC_OPERATION_MUTATION_LIMIT = 1_000

export const ATOMIC_OPERATION_SIZE_LIMIT = 800

export const ATOMIC_OPERTION_CHECK_LIMIT = 10

export const GET_MANY_KEY_LIMIT = 10

export const UINT8ARRAY_LENGTH_LIMIT = 65_536
Expand Down
30 changes: 7 additions & 23 deletions src/kvdex.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type {
AtomicBatchOptions,
CollectionOptions,
CollectionSelector,
CountAllOptions,
Expand Down Expand Up @@ -185,16 +184,10 @@ export class Kvdex<const TSchema extends Schema<SchemaDefinition>> {
* await db.deleteAll()
* ```
*
* @example
* ```ts
* await db.deleteAll({ atomicBatchSize: 500 })
* ```
*
* @param options - Atomic batch options.
* @returns Promise resolving to void.
*/
async deleteAll(options?: AtomicBatchOptions): Promise<void> {
await _deleteAll(this.kv, this.schema, options)
async deleteAll(): Promise<void> {
await _deleteAll(this.kv, this.schema)
}

/**
Expand All @@ -205,14 +198,9 @@ export class Kvdex<const TSchema extends Schema<SchemaDefinition>> {
* await db.wipe()
* ```
*
* @example
* ```ts
* await db.wipe({ atomicBatchSize: 500 })
* ```
*
* @param options - Atomic batch options.
* @returns Promise resolving to void.
*/
async wipe(options?: AtomicBatchOptions): Promise<void> {
async wipe(): Promise<void> {
// Create iterator
const iter = this.kv.list({ prefix: [KVDEX_KEY_PREFIX] })

Expand All @@ -223,7 +211,7 @@ export class Kvdex<const TSchema extends Schema<SchemaDefinition>> {
}

// Delete all entries
const atomic = new AtomicWrapper(this.kv, options?.atomicBatchSize)
const atomic = new AtomicWrapper(this.kv)
keys.forEach((key) => atomic.delete(key))
await atomic.commit()
}
Expand Down Expand Up @@ -677,26 +665,22 @@ async function _countAll(
*
* @param kv - Deno KV instance.
* @param schemaOrCollection - Schema or Collection object.
* @param options - DeleteAll options.
* @returns Promise resolving to void.
*/
async function _deleteAll(
kv: Deno.Kv,
schemaOrCollection:
| Schema<SchemaDefinition>
| Collection<KvValue, KvValue, CollectionOptions<KvValue>>,
options?: AtomicBatchOptions,
): Promise<void> {
// If input is a collection, perform deleteMany
if (schemaOrCollection instanceof Collection) {
await schemaOrCollection.deleteMany(options)
await schemaOrCollection.deleteMany()
return
}

// Recursively perform delete for all schemas/collections
await allFulfilled(
Object.values(schemaOrCollection).map((val) =>
_deleteAll(kv, val, options)
),
Object.values(schemaOrCollection).map((val) => _deleteAll(kv, val)),
)
}
Loading
Loading