Skip to content
7 changes: 7 additions & 0 deletions .changeset/claim-scoped-write-tokens.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@electric-ax/agents-server': patch
'@electric-ax/agents-runtime': patch
'@electric-ax/agents-server-conformance-tests': patch
---

Replace static entity write tokens with claim-scoped tokens. Write tokens are now issued when a consumer claims a wake and revoked on done, preventing leaked credentials from granting permanent write access. Removes `writeToken` from webhook notifications and spawn response headers.
8 changes: 3 additions & 5 deletions packages/agents-runtime/src/process-wake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ export async function processWebhookWake(
// Create producer BEFORE the StreamDB so state actions can write through it.
const producer = new IdempotentProducer(stream, `entity-${entityUrl}`, {
epoch,
autoClaim: true,
fetch: (input, init) => {
const headers = new Headers(init?.headers)
if (writeToken) {
Expand Down Expand Up @@ -670,11 +671,7 @@ export async function processWebhookWake(

if (!claimed.ok) return null
claimedWake = true
writeToken =
claimed.writeToken ??
notification.writeToken ??
notification.entity?.writeToken ??
``
writeToken = claimed.writeToken ?? ``

// 3b. Start heartbeat once this worker owns the wake
heartbeat = setInterval(() => {
Expand Down Expand Up @@ -856,6 +853,7 @@ export async function processWebhookWake(
`shared-state-${entityUrl}-${ssId}`,
{
epoch,
autoClaim: true,
onError: (error) => {
failBackgroundWake(error, `WRITE_FAILED`)
},
Expand Down
2 changes: 0 additions & 2 deletions packages/agents-runtime/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,6 @@ export interface WebhookNotification {
triggeredBy?: Array<string>
callback: string
claimToken: string
writeToken?: string
triggerEvent?: string
wakeEvent?: WakeEvent
entity?: {
Expand All @@ -592,7 +591,6 @@ export interface WebhookNotification {
streams: { main: string; error: string }
tags?: Record<string, string>
spawnArgs?: Record<string, unknown>
writeToken?: string
}
}

Expand Down
60 changes: 59 additions & 1 deletion packages/agents-runtime/test/process-wake.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { createTransaction } from '@durable-streams/state'
import { getCronSourceRef } from '../src/cron-utils'
import { manifestSourceKey } from '../src/manifest-helpers'
import { db } from '../src/observation-sources'
import { processWake } from '../src/process-wake'
import { clearRegistry, defineEntity } from '../src/define-entity'
import { entityStateSchema } from '../src/entity-schema'
import { entityStateSchema, passthrough } from '../src/entity-schema'
import { runtimeLog } from '../src/log'
import { ev } from './helpers/event-fixtures'
import { createLocalOnlyTestCollection } from './helpers/local-only'
Expand All @@ -21,6 +22,7 @@ const {
mockProducerAppend,
mockProducerFlush,
mockProducerDetach,
mockConstructedProducers,
mockDbClose,
mockDbPreload,
mockStreamSubscribeJson,
Expand All @@ -35,6 +37,10 @@ const {
mockProducerAppend: vi.fn(),
mockProducerFlush: vi.fn().mockResolvedValue(undefined),
mockProducerDetach: vi.fn().mockResolvedValue(undefined),
mockConstructedProducers: [] as Array<{
producerId: string
opts?: Record<string, unknown>
}>,
mockDbClose: vi.fn(),
mockDbPreload: vi.fn().mockResolvedValue(undefined),
mockStreamSubscribeJson: vi.fn().mockReturnValue(() => {}),
Expand Down Expand Up @@ -72,6 +78,14 @@ vi.mock(`@durable-streams/client`, async (importOriginal) => {
head = mockStreamHead
}
class MockIdempotentProducer {
constructor(
_stream: unknown,
producerId: string,
opts?: Record<string, unknown>
) {
mockConstructedProducers.push({ producerId, opts })
}

append = mockProducerAppend
flush = mockProducerFlush
detach = mockProducerDetach
Expand Down Expand Up @@ -321,6 +335,14 @@ const BASE_CONFIG: ProcessWakeConfig = {
idleTimeout: 100,
}

const sharedFindingsSchema = {
findings: {
schema: passthrough<Record<string, unknown>>(),
type: `finding`,
primaryKey: `key`,
},
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
Expand All @@ -332,6 +354,7 @@ describe(`processWake`, () => {
vi.clearAllMocks()
vi.useRealTimers()
clearRegistry()
mockConstructedProducers.length = 0
mockDbPreload.mockResolvedValue(undefined)
mockStreamOffset.value = `10_100`
mockDbOffset.value = `10_100`
Expand Down Expand Up @@ -861,6 +884,41 @@ describe(`processWake`, () => {
expect(mockProducerDetach).toHaveBeenCalledOnce()
})

it(`creates the main entity producer with autoClaim enabled`, async () => {
defineEntity(`test-agent`, {
handler: () => {},
})

await processWake(makeNotification(), BASE_CONFIG)

expect(mockConstructedProducers).toContainEqual({
producerId: `entity-http://localhost:3000/test-agent/agent-1`,
opts: expect.objectContaining({
epoch: 1,
autoClaim: true,
}),
})
})

it(`creates shared-state producers with autoClaim enabled`, async () => {
defineEntity(`test-agent`, {
handler: async (ctx) => {
ctx.mkdb(`board-1`, sharedFindingsSchema)
await ctx.observe(db(`board-1`, sharedFindingsSchema))
},
})

await processWake(makeNotification(), BASE_CONFIG)

expect(mockConstructedProducers).toContainEqual({
producerId: `shared-state-http://localhost:3000/test-agent/agent-1-board-1`,
opts: expect.objectContaining({
epoch: 1,
autoClaim: true,
}),
})
})

it(`returns persisted manifest rows when manifest is non-empty`, async () => {
defineEntity(`test-agent`, {
handler: async (ctx) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1519,7 +1519,7 @@ async function executeStep(ctx: RunContext, step: Step): Promise<void> {
main: string
error: string
}
ctx.currentWriteToken = res.headers.get(`x-write-token`) ?? null
ctx.currentWriteToken = null

ctx.history.push({
type: `entity_spawned`,
Expand Down Expand Up @@ -2187,12 +2187,9 @@ export type ElectricAgentsAction =
| `delete_type`
| `spawn`
| `send`
| `write`
| `kill`
| `set_tag`
| `check_status`
| `list`
| `writeStateProtocol`

/**
* Model of a single entity type's state.
Expand Down Expand Up @@ -2232,9 +2229,7 @@ export interface ElectricAgentsWorldModel {
* - delete_type: when entity types exist and no running entities use them
* - spawn: when at least one entity type is registered (up to a cap)
* - send: when at least one entity is running
* - write: when at least one entity is running
* - kill: when at least one entity is running
* - set_tag: when at least one entity is running
* - check_status: when at least one entity exists
* - list: always
*/
Expand Down Expand Up @@ -2263,14 +2258,7 @@ export function enabledElectricAgentsActions(

const hasRunning = model.entities.some((e) => e.status === `running`)
if (hasRunning) {
enabled.push(
`send`,
`write`,
`kill`,
`set_tag`,
`check_status`,
`writeStateProtocol`
)
enabled.push(`send`, `kill`, `check_status`)
}

const hasAny = model.entities.length > 0
Expand Down Expand Up @@ -2353,9 +2341,6 @@ export function applyElectricAgentsAction(
entities[targetIdx] = { ...e, status: `stopped` }
return { ...model, entities }
}
case `write`:
case `writeStateProtocol`:
case `set_tag`:
case `check_status`:
case `list`:
return model
Expand Down
Loading
Loading