KeyVal

Key-value store inspired by Deno KV, with composite keys, TTL, atomic transactions (OCC), FIFO queues with DLQ, full-text search, and real-time watch via SSE. Current storage persists through the plugin-turso service boundary.

For plugin model details (lifecycle, provides, getPlugin, manifest.yaml), see Plugin System.

The plugin exposes the Kv class as a service, plus five subsystems:

| Component | Responsibility |
| --- | --- |
| Kv | CRUD (get/set/delete), list, count, paginate, factory for atomic() and transaction() |
| AtomicOperation | Builder for checks + mutations with versionstamps (OCC) |
| KvTransaction | Read-modify-write with conflict detection (no automatic retry; see Limitations) |
| KvQueue | FIFO queue with delay, retry/backoff, DLQ |
| KvFts | Search indexes and full-text-like search with where filters |
| KvMetrics | In-memory counters and Prometheus-format export |

Key aspects:

  • Persistent mode. Routes mounted in plugin.ts run on the main thread (required for SSE in watch and queue listen).
  • Storage. Current implementation stores everything in SQL through @buntime/plugin-turso; KeyVal owns the kv_* schema and uses a local KeyValSqlAdapter over TursoService.
  • Deno KV compatibility. Types and semantics mirror Deno.Kv (versionstamps, sum/max/min/append/prepend mutations).
@buntime/plugin-keyval
├── Kv (core CRUD + list/paginate)
│ ├── AtomicOperation (checks + mutations)
│ └── KvTransaction (read-modify-write)
├── KvQueue (FIFO + DLQ)
├── KvFts (Search + where)
└── KvMetrics (Prom/JSON)
└──> plugin-turso provider

All options live in manifest.yaml (or are passed programmatically via keyvalExtension({...})).

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| metrics.persistent | boolean | false | Persist counters to the database (survives restarts) |
| metrics.flushInterval | number (ms) | 30000 | Flush interval when persistent: true |
| queue.cleanupInterval | number (ms) | 60000 | Expired lock cleanup; 0 disables |
| queue.lockDuration | number (ms) | 30000 | Lock duration between dequeue and ack/nack |

Current minimal example:

name: "@buntime/plugin-keyval"
enabled: true
dependencies:
- "@buntime/plugin-turso"

Production configuration:

metrics:
  persistent: true
  flushInterval: 30000
queue:
  cleanupInterval: 60000
  lockDuration: 30000

The SPA UI lives at /keyval/ (Overview, Entries, Queue, Search, Watch, Atomic, Metrics) and is registered via menus in the manifest.

  • The current implementation obtains ctx.getPlugin<TursoService>("@buntime/plugin-turso") in onInit, wraps it in TursoKeyValAdapter, and initializes kv_* tables through that adapter.
  • schema.ts creates kv_entries, kv_queue, kv_dlq, FTS auxiliary tables, and (optionally) the metrics table.
  • Keys are binary-encoded with a type prefix, ensuring lexicographic ordering Uint8Array < string < number < bigint < boolean.
  • where filters are translated to SQL via where-to-sql.ts, using json_extract(value, '$.field') for nested fields.
  • Kv is exposed via provides(). Other plugins can consume it with ctx.getPlugin<Kv>("@buntime/plugin-keyval") when they explicitly need a generic KV service. plugin-gateway and plugin-proxy no longer use KeyVal for their own operational state; they use plugin-turso directly.
  • onShutdown flushes metrics and stops the queue cleanup.
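
The type ordering mentioned in the list above can be illustrated with a plain comparator. This is a minimal sketch, not the plugin's binary encoding (which this page does not show): the names typeRank, compareParts, and compareKeys are hypothetical, and the rules for comparing values of the same type and for shorter keys sorting before their extensions are assumptions.

```typescript
type KvKeyPart = bigint | boolean | number | string | Uint8Array;
type KvKey = KvKeyPart[];

// Rank per type, following the documented order:
// Uint8Array < string < number < bigint < boolean.
function typeRank(part: KvKeyPart): number {
  if (part instanceof Uint8Array) return 0;
  if (typeof part === "string") return 1;
  if (typeof part === "number") return 2;
  if (typeof part === "bigint") return 3;
  return 4; // boolean
}

// Compare two key parts: first by type rank, then by value within a type.
// The within-type rules are assumptions made for this sketch.
function compareParts(a: KvKeyPart, b: KvKeyPart): number {
  const rank = typeRank(a) - typeRank(b);
  if (rank !== 0) return rank;
  if (a instanceof Uint8Array && b instanceof Uint8Array) {
    const n = Math.min(a.length, b.length);
    for (let i = 0; i < n; i++) {
      const d = (a[i] ?? 0) - (b[i] ?? 0);
      if (d !== 0) return d;
    }
    return a.length - b.length;
  }
  if (typeof a === "string" && typeof b === "string") return a < b ? -1 : a > b ? 1 : 0;
  if (typeof a === "number" && typeof b === "number") return a - b;
  if (typeof a === "bigint" && typeof b === "bigint") return a < b ? -1 : a > b ? 1 : 0;
  return Number(a) - Number(b); // booleans: false before true
}

// Compare composite keys part by part; a key sorts before its own extensions (assumed).
function compareKeys(a: KvKey, b: KvKey): number {
  const n = Math.min(a.length, b.length);
  for (let i = 0; i < n; i++) {
    const c = compareParts(a[i] as KvKeyPart, b[i] as KvKeyPart);
    if (c !== 0) return c;
  }
  return a.length - b.length;
}
```

Under these assumptions, ["users"] sorts before ["users", "123"], and a string part sorts before a number part in the same position.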

Keys are arrays whose elements can be string, number, bigint, boolean, or Uint8Array. Each entry has a key, value (JSON), and versionstamp (null when absent).

["users", "123"]
["users", 42, "profile"]
["metrics", "2024-01-01", "cpu"]

| Limit | Value |
| --- | --- |
| Key depth | 20 parts |
| Recommended value size | < 1 MB |
| Maximum expiresIn (TTL) | 2,147,483,647 ms (~24.8 days) |
| Batch get/delete | 1,000 keys |
| List limit | 1,000 entries (default 100) |
| Prefix watch | 1,000 keys |

In the REST API, keys become paths: /keyval/api/keys/users/123 maps to ["users", "123"]. In POST endpoints (/keys/list, /atomic, /keys/batch, etc.), keys are JSON arrays in the request body.
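
The path mapping described above can be sketched for string-only keys. This is an illustration under assumptions: keyToPath and pathToKey are hypothetical helpers, percent-encoding of each part is assumed, and how the server represents number, bigint, boolean, or Uint8Array parts in a path is not covered here.

```typescript
const DEFAULT_BASE = "/keyval/api/keys";

// Build a REST path from a key made only of string parts.
function keyToPath(key: string[], base: string = DEFAULT_BASE): string {
  return `${base}/${key.map((part) => encodeURIComponent(part)).join("/")}`;
}

// Recover the string parts from a REST path under the same base.
function pathToKey(path: string, base: string = DEFAULT_BASE): string[] {
  const rest = path.slice(base.length).replace(/^\/+/, "");
  return rest === "" ? [] : rest.split("/").map((part) => decodeURIComponent(part));
}
```

For keys with non-string parts, the POST endpoints that take JSON arrays in the body avoid the ambiguity.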

All routes are under {base}/api/* (default /keyval/api/*). Errors use { "error": string } with status 400 (validation), 404 (not found), or 500.

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /api/keys/* | Get by key path |
| PUT | /api/keys/*?expiresIn= | Set with optional TTL |
| DELETE | /api/keys/* | Delete (prefix + children by default; see modes) |
| GET | /api/keys?prefix=&start=&end=&limit=&reverse= | List by prefix |
| POST | /api/keys/list | List with where filters |
| GET | /api/keys/count?prefix= | Count entries |
| GET | /api/keys/paginate?prefix=&cursor=&limit= | Cursor-based pagination |
| POST | /api/keys/batch | Batch get ({ keys: KvKey[] }) |
| POST | /api/keys/delete-batch | Batch delete ({ keys, exact?, where? }) |

DELETE supports three modes:

  1. Prefix (default): deletes the key and all descendants.
  2. Exact ({"exact": true}): deletes only the specified key.
  3. Filtered ({"where": {...}}): the key becomes a prefix; only matching entries are deleted.

curl -X PUT /keyval/api/keys/users/123 \
  -H "Content-Type: application/json" \
  -d '{"name":"Alice","age":30}'

curl -X POST /keyval/api/keys/list \
  -H "Content-Type: application/json" \
  -d '{"prefix":["users"],"where":{"status":{"$eq":"active"}}}'

curl -X DELETE /keyval/api/keys/tasks \
  -H "Content-Type: application/json" \
  -d '{"where":{"status":"completed"}}'

| Method | Endpoint | Body |
| --- | --- | --- |
| POST | /api/atomic | { checks?: KvCheck[], mutations: KvMutation[] } |

Response: { "ok": true, "versionstamp": "..." } on success, { "ok": false } when any check fails (always HTTP 200).

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /api/watch?keys=&initial= | SSE for specific keys (CSV of paths) |
| GET | /api/watch/poll?keys=&versionstamps= | Polling without streaming |
| GET | /api/watch/prefix?prefix=&initial=&limit= | SSE by prefix |
| GET | /api/watch/prefix/poll?prefix=&versionstamps=&limit= | Polling by prefix |

SSE events: change (data is a JSON array of entries; a null value and null versionstamp indicate deletion) and ping (heartbeat).
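
A consumer of the watch stream has to split the SSE text into events and then separate updates from deletions. The following is a minimal sketch that assumes standard SSE framing (event: and data: lines, blank line between events) and that each entry carries key, value, and versionstamp; parseSse and splitChanges are hypothetical names, and a real client would also need to buffer partial chunks.

```typescript
interface SseEvent { event: string; data: string }
interface WatchEntry { key: unknown[]; value: unknown; versionstamp: string | null }

// Parse a complete SSE text buffer into events. Blocks without data are skipped.
function parseSse(text: string): SseEvent[] {
  const events: SseEvent[] = [];
  for (const block of text.split(/\r?\n\r?\n/)) {
    let event = "message";
    const data: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line.startsWith("event:")) event = line.slice(6).trim();
      else if (line.startsWith("data:")) data.push(line.slice(5).replace(/^ /, ""));
    }
    if (data.length > 0) events.push({ event, data: data.join("\n") });
  }
  return events;
}

// Keep only change events; an entry with null value and null versionstamp is a deletion.
function splitChanges(events: SseEvent[]): { updated: WatchEntry[]; deleted: WatchEntry[] } {
  const updated: WatchEntry[] = [];
  const deleted: WatchEntry[] = [];
  for (const e of events) {
    if (e.event !== "change") continue; // ping and anything else is ignored
    for (const entry of JSON.parse(e.data) as WatchEntry[]) {
      const isDeletion = entry.value === null && entry.versionstamp === null;
      (isDeletion ? deleted : updated).push(entry);
    }
  }
  return { updated, deleted };
}
```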

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /api/queue/enqueue | { value, options? } |
| GET | /api/queue/listen | SSE with auto-dequeue |
| GET | /api/queue/poll | Dequeue one message |
| POST | /api/queue/ack | { id }: success |
| POST | /api/queue/nack | { id }: retry/DLQ |
| GET | /api/queue/stats | { pending, processing, dlq, total } |
| GET | /api/queue/dlq?limit=&offset= | List DLQ |
| GET | /api/queue/dlq/:id | Inspect a DLQ message |
| POST | /api/queue/dlq/:id/requeue | Requeue to the main queue |
| DELETE | /api/queue/dlq/:id | Delete one |
| DELETE | /api/queue/dlq | Purge all |

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /api/indexes | Create index ({ prefix, options: { fields, tokenize? } }) |
| GET | /api/indexes | List indexes |
| DELETE | /api/indexes?prefix= | Remove index |
| GET | /api/search?prefix=&query=&limit= | Simple search |
| POST | /api/search | Search + where ({ prefix, query, options: { limit?, where? } }) |

| Method | Endpoint | Description |
| --- | --- | --- |
| GET | /api/metrics | JSON with operations, queue, storage |
| GET | /api/metrics/prometheus | Prometheus text format (text/plain; version=0.0.4) |

Atomic combines checks (expected versionstamp per key) and mutations in an all-or-nothing commit. If any check fails, nothing is applied and the response is { ok: false } (status 200, not an error).

| Mutation | Parameters | Behavior |
| --- | --- | --- |
| set | key, value, expiresIn? | Sets value (with optional TTL) |
| delete | key | Removes |
| sum | key, value: number or bigint | Adds; missing key treated as 0 |
| max | key, value: number or bigint | max(current, new) |
| min | key, value: number or bigint | min(current, new) |
| append | key, value: any[] | Concatenates to array; creates if absent |
| prepend | key, value: any[] | Inserts at the start; creates if absent |
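
The mutation semantics in the table can be modeled as a pure function over the current value. This is an in-memory sketch for illustration only: applyMutation is a hypothetical name, it handles plain numbers rather than bigint, and treating a missing key as "use the new value" for max and min is an assumption (the table only states the missing-key rule for sum).

```typescript
type KvMutationType = "append" | "delete" | "max" | "min" | "prepend" | "set" | "sum";

const toNumber = (v: unknown): number => (typeof v === "number" ? v : 0);
const toArray = (v: unknown): unknown[] => (Array.isArray(v) ? v : []);

// Returns the value that would be stored after the mutation; undefined means "removed".
function applyMutation(current: unknown, type: KvMutationType, value?: unknown): unknown {
  const missing = current === undefined || current === null;
  switch (type) {
    case "set":
      return value;
    case "delete":
      return undefined;
    case "sum":
      return toNumber(current) + toNumber(value); // missing key counts as 0
    case "max":
      return missing ? toNumber(value) : Math.max(toNumber(current), toNumber(value));
    case "min":
      return missing ? toNumber(value) : Math.min(toNumber(current), toNumber(value));
    case "append":
      return [...toArray(current), ...toArray(value)]; // creates the array if absent
    case "prepend":
      return [...toArray(value), ...toArray(current)];
  }
}
```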

Read-modify-write pattern:

# 1. Read and capture versionstamp
curl /keyval/api/keys/users/123
# {"key":[...],"value":{...},"versionstamp":"00000001"}

# 2. Update atomically
curl -X POST /keyval/api/atomic \
  -H "Content-Type: application/json" \
  -d '{
    "checks":[{"key":["users","123"],"versionstamp":"00000001"}],
    "mutations":[
      {"type":"set","key":["users","123"],"value":{"name":"Alice","age":31}},
      {"type":"sum","key":["counters","updates"],"value":1}
    ]
  }'

Create-if-not-exists uses versionstamp: null in the check. UUIDv7 placeholders (kv.uuidv7()) resolve to the same value across all mutations in a single commit, useful for creating cross-references and time-ordered indexes in one atomic operation.
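
A create-if-not-exists request body can be built from the KvCheck and KvMutation shapes. The sketch below only constructs the JSON payload for POST /api/atomic; createIfAbsent is a hypothetical helper, and the key is limited to JSON-friendly parts for simplicity.

```typescript
type JsonKey = (boolean | number | string)[];
interface KvCheck { key: JsonKey; versionstamp: string | null }
interface KvMutation { type: "set"; key: JsonKey; value: unknown; expiresIn?: number }
interface AtomicRequest { checks?: KvCheck[]; mutations: KvMutation[] }

// versionstamp: null in the check means "commit only if the key does not exist yet".
function createIfAbsent(key: JsonKey, value: unknown): AtomicRequest {
  return {
    checks: [{ key, versionstamp: null }],
    mutations: [{ type: "set", key, value }],
  };
}
```

Sending JSON.stringify(createIfAbsent(["users", "123"], { name: "Alice" })) as the request body should yield { "ok": false } when the key already exists, per the response contract described earlier.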

KvTransaction is a wrapper that caches reads, automatically builds the checks, and returns { ok: false, error: "conflict" | "error" } on failure. It currently does not retry automatically; see Limitations.
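
Because conflicts are not retried automatically, callers typically wrap the read-modify-write in their own loop. The following is a generic sketch with exponential backoff; withRetry, its parameters, and the CommitResult shape are assumptions for illustration, and attempt stands for any function that re-reads the entry and commits again.

```typescript
interface CommitResult { ok: boolean; versionstamp?: string }

const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run attempt once, then retry up to maxRetries times while it reports ok: false.
// The delay doubles on each retry: baseDelayMs, 2x, 4x, ...
async function withRetry(
  attempt: () => Promise<CommitResult>,
  maxRetries = 3,
  baseDelayMs = 50,
): Promise<CommitResult> {
  let result = await attempt();
  for (let i = 0; !result.ok && i < maxRetries; i++) {
    await sleep(baseDelayMs * 2 ** i);
    result = await attempt(); // attempt must re-read current versionstamps itself
  }
  return result;
}
```

The important detail is that each attempt re-reads the entry; retrying with a stale versionstamp would fail the check again.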

Each prefix supports at most one index. Recreating replaces the previous one. Synchronization is automatic on set/delete/atomic.

| Tokenizer | When to use |
| --- | --- |
| unicode61 (default) | Multilingual content / general use |
| porter | English text with stemming (run/running/runs → run) |
| ascii | ASCII identifiers and codes only |

Fields accept dot-notation for nested properties (details.manufacturer.name).

# Create index
curl -X POST /keyval/api/indexes \
  -H "Content-Type: application/json" \
  -d '{
    "prefix":["products"],
    "options":{"fields":["name","description","category"],"tokenize":"porter"}
  }'

# Search + where (the search index narrows the results, then the where filter is applied)
curl -X POST /keyval/api/search \
  -H "Content-Type: application/json" \
  -d '{
    "prefix":["products"],
    "query":"smartphone",
    "options":{"limit":50,"where":{"price":{"$lt":500},"inStock":{"$eq":true}}}
  }'

Results come from the KeyVal search table for the prefix; where is applied afterward on the full value.

FIFO with at-least-once delivery, delay, configurable backoff, and DLQ.

[*] ──enqueue──> pending
pending ──dequeue (lock)──> processing
processing ──ack──> removed
processing ──nack (retries remaining + delay)──> pending
processing ──nack (retries exhausted)──> DLQ
processing ──lock timeout──> pending
DLQ ──requeue──> pending
DLQ ──delete / purge──> removed

Enqueue options:

| Field | Default | Description |
| --- | --- | --- |
| delay | 0 | Delay (ms) before the message becomes available |
| backoffSchedule | [1000, 5000, 10000] | Per-attempt delays; length = number of retries |
| keysIfUndelivered | | KV keys where the value is stored when all retries are exhausted |

# Job with 3 increasing retries and KV fallback
curl -X POST /keyval/api/queue/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "value":{"task":"webhook","url":"https://api.example.com/hook"},
    "options":{
      "backoffSchedule":[1000,5000,30000],
      "keysIfUndelivered":[["failed-webhooks","hook-123"]]
    }
  }'

# SSE consumer
curl -N /keyval/api/queue/listen

After dequeue, the message is locked for queue.lockDuration. If the consumer does not ack/nack before the lock expires, it returns to pending on the next cleanup run (queue.cleanupInterval).
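
The message lifecycle described in this section can be modeled as a small state machine. This is an in-memory sketch, not the plugin's SQL-backed queue: the QueueMessage fields and the function names are hypothetical, and counting retries at nack time is an assumption.

```typescript
type QueueState = "pending" | "processing" | "dlq" | "removed";

interface QueueMessage {
  state: QueueState;
  retries: number;           // retries already consumed
  availableAt: number;       // ms timestamp when the message may be dequeued
  lockedUntil: number;       // ms timestamp when the processing lock expires
  backoffSchedule: number[]; // per-attempt delays; length = number of retries
}

// pending -> processing, taking a lock for lockDuration ms.
function dequeue(msg: QueueMessage, now: number, lockDuration: number): QueueMessage {
  if (msg.state !== "pending" || msg.availableAt > now) return msg;
  return { ...msg, state: "processing", lockedUntil: now + lockDuration };
}

// processing -> removed.
function ack(msg: QueueMessage): QueueMessage {
  return msg.state === "processing" ? { ...msg, state: "removed" } : msg;
}

// processing -> pending (with the next backoff delay) or -> dlq when retries are exhausted.
function nack(msg: QueueMessage, now: number): QueueMessage {
  if (msg.state !== "processing") return msg;
  const delay = msg.backoffSchedule[msg.retries];
  if (delay === undefined) return { ...msg, state: "dlq" };
  return { ...msg, state: "pending", retries: msg.retries + 1, availableAt: now + delay };
}

// processing -> pending once the lock has expired (what a cleanup run would do).
function expireLock(msg: QueueMessage, now: number): QueueMessage {
  if (msg.state !== "processing" || msg.lockedUntil > now) return msg;
  return { ...msg, state: "pending", availableAt: now };
}
```

Since an expired lock returns the message to pending while the original consumer may still be working, this model also shows why delivery is at-least-once and handlers should be idempotent.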

where is translated to SQL using json_extract on the value column. Supports dot-notation and logical nesting.

| Category | Operators |
| --- | --- |
| Comparison | $eq, $ne, $gt, $gte, $lt, $lte, $between |
| Array | $in, $nin |
| Case-sensitive string | $contains, $notContains, $startsWith, $endsWith |
| Case-insensitive string | $containsi, $notContainsi, $startsWithi, $endsWithi |
| Existence | $null, $empty, $notEmpty |
| Logical | $and, $or, $not |
| Timestamp | $now (with $offset in ms: positive for future, negative for past) |

Shorthand: { "field": "value" } is equivalent to { "field": { "$eq": "value" } }. Multiple conditions at the same level are implicitly AND.

{
  "$and": [
    { "status": { "$eq": "active" } },
    { "$or": [{ "role": { "$eq": "admin" } }, { "role": { "$eq": "moderator" } }] },
    { "lastLogin": { "$gt": { "$now": true, "$offset": -2592000000 } } },
    { "$not": { "suspended": { "$eq": true } } }
  ]
}

$now is resolved on the server, avoiding clock skew. $null: true matches null/absent; $empty: true matches null, "", or [].
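
To make the filter semantics concrete, here is an in-memory evaluator for a small subset of the operators. It is a sketch only: the plugin translates filters to SQL rather than evaluating them in JavaScript, matches is a hypothetical name, only $eq, $ne, $gt, $gte, $lt, $lte, $and, $or, $not, and $now are handled, and resolving $now to a millisecond timestamp is an assumption.

```typescript
type Filter = Record<string, unknown>;

const isObj = (v: unknown): v is Record<string, unknown> =>
  typeof v === "object" && v !== null && !Array.isArray(v);

// Dot-notation lookup, e.g. "details.manufacturer.name".
function getPath(doc: unknown, path: string): unknown {
  let cur: unknown = doc;
  for (const part of path.split(".")) {
    if (!isObj(cur)) return undefined;
    cur = cur[part];
  }
  return cur;
}

// Resolve { $now: true, $offset?: ms } to a timestamp; other operands pass through.
function operand(v: unknown, now: number): unknown {
  if (isObj(v) && v["$now"] === true) {
    const offset = v["$offset"];
    return now + (typeof offset === "number" ? offset : 0);
  }
  return v;
}

const bothNumbers = (a: unknown, b: unknown): boolean =>
  typeof a === "number" && typeof b === "number";

// Conditions at the same level are combined with AND.
function matches(doc: unknown, filter: Filter, now: number = Date.now()): boolean {
  return Object.entries(filter).every(([key, cond]) => {
    if (key === "$and") return (cond as Filter[]).every((f) => matches(doc, f, now));
    if (key === "$or") return (cond as Filter[]).some((f) => matches(doc, f, now));
    if (key === "$not") return !matches(doc, cond as Filter, now);
    const actual = getPath(doc, key);
    // Shorthand: { field: value } means { field: { $eq: value } }.
    const ops = isObj(cond) ? cond : { $eq: cond };
    return Object.entries(ops).every(([op, raw]) => {
      const expected = operand(raw, now);
      switch (op) {
        case "$eq": return actual === expected;
        case "$ne": return actual !== expected;
        case "$gt": return bothNumbers(actual, expected) && (actual as number) > (expected as number);
        case "$gte": return bothNumbers(actual, expected) && (actual as number) >= (expected as number);
        case "$lt": return bothNumbers(actual, expected) && (actual as number) < (expected as number);
        case "$lte": return bothNumbers(actual, expected) && (actual as number) <= (expected as number);
        default: throw new Error(`operator not covered by this sketch: ${op}`);
      }
    });
  });
}
```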

  • Hard: @buntime/plugin-turso. KeyVal owns the kv_* schema and KV semantics while using plugin-turso for durable SQL connection, sync, MVCC setup, and retry helpers.
  • Consumers: plugins that explicitly need persistent KV storage should use ctx.getPlugin<Kv>("@buntime/plugin-keyval"). Gateway and proxy operational state should not use KeyVal as infrastructure; see plugin-turso and Storage.

type KvKeyPart = bigint | boolean | number | string | Uint8Array;
type KvKey = KvKeyPart[];
interface KvEntry<T> { key: KvKey; value: T | null; versionstamp: string | null }
interface KvCheck { key: KvKey; versionstamp: string | null }
type KvMutationType = "append" | "delete" | "max" | "min" | "prepend" | "set" | "sum";
interface KvMutation { key: KvKey; type: KvMutationType; value?: unknown; expiresIn?: number }
interface KvEnqueueOptions {
  delay?: number;
  backoffSchedule?: number[];
  keysIfUndelivered?: KvKey[];
}
interface KvQueueMessage<T> { id: string; value: T; attempts: number }
type KvFtsTokenizer = "ascii" | "porter" | "unicode61";
interface KvCreateIndexOptions { fields: string[]; tokenize?: KvFtsTokenizer }
interface KvSearchOptions { limit?: number; where?: KvWhereFilter }
interface KvPaginateResult<T> { entries: KvEntry<T>[]; cursor: string | null; hasMore: boolean }

| Area | Limitation | Mitigation |
| --- | --- | --- |
| sum/max/min | BigInt is converted to Number; precision is lost outside ±2^53 | For very large counters, store as string and use set with manual OCC |
| KvTransaction | Does not retry automatically despite the maxRetries/retryDelay field names | Implement retry in the caller (loop with exponential backoff) |
| Watch | ~100 ms polling per key; no batching | Keep to < 100 keys; for many keys, use list in a loop |
| TTL | Cleanup is blocking on the main thread; effective granularity ~1 s | Distribute TTLs with jitter to avoid expiration spikes |
| Queue | Crash between dequeue and ack leaves the message locked until lockDuration | Idempotent handler + lockDuration tuned to the actual processing time |
| DLQ | No automatic cleanup | Periodic job that deletes entries with old failedAt |
| Where filter | field is interpolated into SQL (paths are not escaped) | Validate fields against an allowlist when exposed to external input |
| Scale | Single writer (SQLite/libSQL), ~1k–5k writes/s; no sharding | Postgres/manual sharding above ~100 GB |
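
For the where-filter limitation listed above, one possible caller-side mitigation is to reject any filter that names a field outside a known set before it reaches the API. This is a minimal sketch: assertAllowedFields is a hypothetical helper, and it treats every key other than $and, $or, and $not as a field name.

```typescript
const LOGICAL_OPERATORS = new Set(["$and", "$or", "$not"]);

// Throws if the filter references a field that is not in the allowlist.
// Logical operators are traversed; field conditions are not inspected further.
function assertAllowedFields(filter: Record<string, unknown>, allowed: ReadonlySet<string>): void {
  for (const [key, value] of Object.entries(filter)) {
    if (LOGICAL_OPERATORS.has(key)) {
      const nested = Array.isArray(value) ? value : [value];
      for (const f of nested) {
        if (typeof f === "object" && f !== null) {
          assertAllowedFields(f as Record<string, unknown>, allowed);
        }
      }
    } else if (!allowed.has(key)) {
      throw new Error(`where field not allowed: ${key}`);
    }
  }
}
```

A caller would run this on untrusted input, for example assertAllowedFields(body.where, new Set(["status", "role"])), and only then forward the filter.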

| Symptom | Likely cause | Action |
| --- | --- | --- |
| SQLITE_BUSY: database is locked | Multiple processes writing to the same file | Enable WAL (PRAGMA journal_mode=WAL), use libSQL server, increase busy_timeout |
| atomic().commit() returns { ok: false } | Concurrency: versionstamp changed between read and write | Re-read the entry and implement retry with backoff; for counters, prefer sum (no check needed) |
| Messages stuck in processing | Worker crashed; cleanup did not run | Force UPDATE kv_queue SET status='pending' WHERE locked_until < now; adjust cleanupInterval |
| DLQ growing | Handler always failing or poison messages | Inspect errorMessage in the DLQ; implement periodic cleanup; use requeue for recoverable messages |
| Duplicate messages | Lock expired during long processing | Increase lockDuration; make handler idempotent (mark processed:<id> with TTL) |
| Watch consuming CPU | Many watched keys (100 ms polling each) | Cancel unused watchers; consolidate into manual list poll; keep < 100 watchers |
| Expired entries not removed | Cleanup did not run or large volume | Run DELETE FROM kv_entries WHERE expires_at <= unixepoch() LIMIT 1000 in batches |
| no such table: kv_entries | Different database than configured or schema did not initialize | Verify the selected adapter and onInit logs; run initSchema manually |

Useful endpoints for diagnostics:

curl /keyval/api/metrics # aggregated counters (JSON)
curl /keyval/api/metrics/prometheus # same, Prometheus format
curl /keyval/api/queue/stats # pending/processing/dlq/total

For durable SQL provider behavior, see plugin-turso. For the client package, see @buntime/keyval.