diff --git a/cli/src/cursor/cursorAcpRemoteLauncher.test.ts b/cli/src/cursor/cursorAcpRemoteLauncher.test.ts index 761e86c387..d4b801aa80 100644 --- a/cli/src/cursor/cursorAcpRemoteLauncher.test.ts +++ b/cli/src/cursor/cursorAcpRemoteLauncher.test.ts @@ -248,7 +248,7 @@ describe('cursorAcpRemoteLauncher', () => { const session = makeSession('old-stream-json-id'); await expect(cursorAcpRemoteLauncher(session)).rejects.toThrow( - /Legacy stream-json sessions cannot be loaded via ACP/ + /Failed to resume Cursor ACP session \(session not found\)/ ); expect(harness.loadSessionCalled).toBe(true); @@ -256,6 +256,13 @@ describe('cursorAcpRemoteLauncher', () => { expect(legacyLauncher).not.toHaveBeenCalled(); }); + it('sends ready after successful ACP bootstrap for hub resume handshake', async () => { + const session = makeSession('resume-thread-1'); + await cursorAcpRemoteLauncher(session); + + expect(session.client.sendSessionEvent).toHaveBeenCalledWith({ type: 'ready' }); + }); + it('throws when resume id is set but session/load is unsupported', async () => { harness.supportsLoadSession = false; const session = makeSession('some-session-id'); diff --git a/cli/src/cursor/cursorAcpRemoteLauncher.ts b/cli/src/cursor/cursorAcpRemoteLauncher.ts index 5e3eaf75b5..9e2240a9d5 100644 --- a/cli/src/cursor/cursorAcpRemoteLauncher.ts +++ b/cli/src/cursor/cursorAcpRemoteLauncher.ts @@ -115,8 +115,9 @@ class CursorAcpRemoteLauncher extends RemoteLauncherBase { }); } catch (error) { logger.warn('[cursor-acp] session/load failed', formatAcpLoadError(error)); + const detail = error instanceof Error ? error.message : String(error); throw new Error( - 'Failed to resume Cursor ACP session. Legacy stream-json sessions cannot be loaded via ACP.' + `Failed to resume Cursor ACP session (${detail}). Legacy stream-json sessions cannot be loaded via ACP.` ); } } else if (resumeSessionId) { @@ -179,6 +180,9 @@ class CursorAcpRemoteLauncher extends RemoteLauncherBase { session.sendSessionEvent({ type: 'ready' }); }; + // Hub reopen/resume waits for this before merging archived rows (#917). + sendReady(); + while (!this.shouldExit) { const waitSignal = this.abortController.signal; const batch = await session.queue.waitForMessagesAndGetAsString(waitSignal); diff --git a/cli/src/cursor/utils/cursorProtocol.ts b/cli/src/cursor/utils/cursorProtocol.ts index 64936f7c39..c85a9e69de 100644 --- a/cli/src/cursor/utils/cursorProtocol.ts +++ b/cli/src/cursor/utils/cursorProtocol.ts @@ -1,23 +1,5 @@ -import type { Metadata } from '@hapi/protocol/schemas'; - -export type CursorSessionProtocol = 'acp' | 'stream-json'; - -export function isLegacyCursorSession(metadata: Metadata | null | undefined): boolean { - if (metadata?.flavor !== 'cursor') { - return false; - } - if (metadata.cursorSessionProtocol === 'acp') { - return false; - } - if (metadata.cursorSessionProtocol === 'stream-json') { - return Boolean(metadata.cursorSessionId); - } - return Boolean(metadata.cursorSessionId); -} - -export function resolveCursorRemoteProtocol(metadata: Metadata | null | undefined): CursorSessionProtocol { - if (isLegacyCursorSession(metadata)) { - return 'stream-json'; - } - return 'acp'; -} +export { + isLegacyCursorSession, + resolveCursorRemoteProtocol, + type CursorSessionProtocol +} from '@hapi/protocol/cursorProtocol' diff --git a/hub/src/sync/messageService.ts b/hub/src/sync/messageService.ts index a740f12d70..9ee8f9e33b 100644 --- a/hub/src/sync/messageService.ts +++ b/hub/src/sync/messageService.ts @@ -14,6 +14,7 @@ import type { Server } from 'socket.io' import { randomUUID } from 'node:crypto' import type { Store, CancelQueuedMessageResult } from '../store' import { EventPublisher } from './eventPublisher' +import { isSessionReadyMessage } from './sessionActivity' type StoredMessageForDelivery = ReturnType[number] @@ -91,6 +92,11 @@ export class MessageService { return toVisibleDecryptedMessages(stored) } + hasSessionReadyEvent(sessionId: string, limit: number = 100): boolean { + const stored = this.store.messages.getMessages(sessionId, limit) + return stored.some((message) => isSessionReadyMessage(message.content)) + } + getSessionExport( sessionId: string, session: Session, diff --git a/hub/src/sync/sessionActivity.ts b/hub/src/sync/sessionActivity.ts index 08c2f45e20..e56e44241c 100644 --- a/hub/src/sync/sessionActivity.ts +++ b/hub/src/sync/sessionActivity.ts @@ -36,6 +36,15 @@ function isReadyEventContent(content: unknown): boolean { return data?.type === 'ready' } +export function isSessionReadyMessage(content: unknown): boolean { + const message = unwrapRoleWrappedRecordEnvelope(content) + if (!message || message.role !== 'agent') { + return false + } + + return isReadyEventContent(message.content) +} + export function shouldRecordSessionActivity(content: unknown): boolean { const message = unwrapRoleWrappedRecordEnvelope(content) if (!message) { diff --git a/hub/src/sync/sessionCache.ts b/hub/src/sync/sessionCache.ts index 304b01ccae..7afb6bf9fa 100644 --- a/hub/src/sync/sessionCache.ts +++ b/hub/src/sync/sessionCache.ts @@ -1,10 +1,12 @@ import { AgentStateSchema, MetadataSchema, TeamStateSchema } from '@hapi/protocol/schemas' +import { resolveCursorRemoteProtocol } from '@hapi/protocol/cursorProtocol' import type { CodexCollaborationMode, PermissionMode, Session, SessionPatch } from '@hapi/protocol/types' import type { Store } from '../store' import { clampAliveTime } from './aliveTime' import { EventPublisher } from './eventPublisher' import { extractTodoWriteTodosFromMessageContent, TodosSchema } from './todos' import { extractBackgroundTaskDelta } from './backgroundTasks' +import { isSessionReadyMessage } from './sessionActivity' const QUEUED_MESSAGE_THINKING_GRACE_MS = 15_000 // tiann/hapi#919: metadata writers (renameSession, clearSessionArchiveMetadata, @@ -696,6 +698,61 @@ export class SessionCache { throw new Error('Session was modified concurrently. Please try again.') } + /** + * Stamp `cursorSessionProtocol: 'stream-json'` on pre-#799 Cursor rows without + * clearing archive metadata. Used during archived reopen (#917): we defer + * `clearSessionArchiveMetadata` until resume succeeds, but the spawned CLI + * reads protocol from the existing row when bootstrapping a `--resume` spawn. + */ + async stampLegacyCursorSessionProtocol(sessionId: string): Promise<{ cursorSessionProtocol: 'stream-json' }> { + for (let attempt = 0; attempt < METADATA_RETRY_ATTEMPTS; attempt += 1) { + const session = this.sessions.get(sessionId) ?? this.refreshSession(sessionId) + if (!session) { + throw new Error('Session not found') + } + + const currentMetadata = session.metadata + if (!currentMetadata) { + throw new Error('Session metadata missing') + } + + if ( + currentMetadata.flavor !== 'cursor' + || typeof currentMetadata.cursorSessionId !== 'string' + || currentMetadata.cursorSessionId.length === 0 + || currentMetadata.cursorSessionProtocol !== undefined + ) { + throw new Error('Session is not a legacy Cursor row needing protocol stamp') + } + + const next: Record = { + ...currentMetadata, + cursorSessionProtocol: 'stream-json' + } + + const result = this.store.sessions.updateSessionMetadata( + sessionId, + next, + session.metadataVersion, + session.namespace, + { touchUpdatedAt: false } + ) + + if (result.result === 'error') { + throw new Error('Failed to update session metadata') + } + + if (result.result === 'success') { + this.refreshSession(sessionId) + return { cursorSessionProtocol: 'stream-json' } + } + + this.refreshSession(sessionId) + } + + throw new Error('Session was modified concurrently. Please try again.') + } + /** * Restore archive-related metadata fields that were captured before a reopen attempt. * Used when `resumeSession` fails after `clearSessionArchiveMetadata` already ran so the @@ -1165,6 +1222,13 @@ export class SessionCache { mergeAgentState: false }) } else { + const candidateMetadata = candidate?.metadata + const isArchivedCursorAcp = candidateMetadata?.lifecycleState === 'archived' + && candidateMetadata?.flavor === 'cursor' + && resolveCursorRemoteProtocol(candidateMetadata) === 'acp' + if (isArchivedCursorAcp && !this.hasSessionReadyEvent(targetId)) { + continue + } await this.mergeSessions(id, targetId, targetNamespace) } } catch { @@ -1177,4 +1241,9 @@ export class SessionCache { this.deduplicatePending.delete(agentId.value) } } + + private hasSessionReadyEvent(sessionId: string): boolean { + return this.store.messages.getMessages(sessionId, 100) + .some((message) => isSessionReadyMessage(message.content)) + } } diff --git a/hub/src/sync/sessionModel.test.ts b/hub/src/sync/sessionModel.test.ts index 8fa18c074a..7f76b95cee 100644 --- a/hub/src/sync/sessionModel.test.ts +++ b/hub/src/sync/sessionModel.test.ts @@ -2215,6 +2215,35 @@ describe('session model', () => { expect(meta?.cursorSessionProtocol).toBe('stream-json') }) + it('stamps stream-json without clearing archive metadata for legacy rows', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const session = cache.getOrCreateSession( + 'session-cursor-legacy-prestamp', + { + path: '/tmp/project', + host: 'localhost', + flavor: 'cursor', + cursorSessionId: 'legacy-prestamp-id', + lifecycleState: 'archived', + archivedBy: 'cli', + archiveReason: 'Hub restart' + }, + null, + 'default' + ) + + const result = await cache.stampLegacyCursorSessionProtocol(session.id) + + expect(result.cursorSessionProtocol).toBe('stream-json') + const meta = cache.getSession(session.id)?.metadata as Record | null | undefined + expect(meta?.cursorSessionProtocol).toBe('stream-json') + expect(meta?.lifecycleState).toBe('archived') + expect(meta?.archiveReason).toBe('Hub restart') + }) + it('keeps an existing acp protocol intact when clearing archive metadata', async () => { const store = new Store(':memory:') const events: SyncEvent[] = [] @@ -2466,7 +2495,6 @@ describe('session model', () => { })).resolves.toBeUndefined() }) }) - // tiann/hapi#916: when the CLI is gone, the kill-RPC throws // RpcTargetMissingError. markSessionArchivedFromHub writes the archive // metadata directly so the row's lifecycleState still flips to 'archived'. @@ -2727,4 +2755,360 @@ describe('session model', () => { expect(meta?.name).toBe('parallel-rename') }) }) + + describe('cursor ACP reopen handshake (#917)', () => { + it('does not merge cursor ACP resume until ready handshake completes', async () => { + const store = new Store(':memory:') + const engine = new SyncEngine( + store, + {} as never, + new RpcRegistry(), + { broadcast() {} } as never + ) + + try { + const oldSession = engine.getOrCreateSession( + 'session-cursor-acp-resume-old', + { + path: '/tmp/project', + host: 'localhost', + machineId: 'machine-1', + flavor: 'cursor', + cursorSessionId: 'cursor-acp-1', + cursorSessionProtocol: 'acp', + lifecycleState: 'archived', + archivedBy: 'cli', + archiveReason: 'Hub restart' + }, + null, + 'default' + ) + engine.getOrCreateMachine( + 'machine-1', + { host: 'localhost', platform: 'linux', happyCliVersion: '0.1.0' }, + null, + 'default' + ) + engine.handleMachineAlive({ machineId: 'machine-1', time: Date.now() }) + + const spawnedSession = engine.getOrCreateSession( + 'session-cursor-acp-resume-new', + { + path: '/tmp/project', + host: 'localhost', + machineId: 'machine-1', + flavor: 'cursor', + cursorSessionId: 'cursor-acp-1', + cursorSessionProtocol: 'acp' + }, + null, + 'default' + ) + + let mergeCalls = 0 + const sessionCache = (engine as any).sessionCache + const mergeSessions = sessionCache.mergeSessions.bind(sessionCache) + sessionCache.mergeSessions = async (oldSessionId: string, newSessionId: string, namespace: string) => { + mergeCalls += 1 + return mergeSessions(oldSessionId, newSessionId, namespace) + } + ;(engine as any).rpcGateway.spawnSession = async () => { + engine.handleSessionAlive({ sid: spawnedSession.id, time: Date.now() }) + return { type: 'success', sessionId: spawnedSession.id } + } + ;(engine as any).waitForSessionActive = async () => true + ;(engine as any).waitForSessionReady = async () => 'timeout' + + const result = await engine.resumeSession(oldSession.id, 'default') + + expect(result.type).toBe('error') + if (result.type === 'error') { + expect(result.code).toBe('resume_failed') + } + expect(mergeCalls).toBe(0) + expect(engine.getSession(oldSession.id)).toBeDefined() + } finally { + engine.stop() + } + }) + + it('skips ACP resume handshake for pre-#799 legacy cursor rows without cursorSessionProtocol', async () => { + const priorAutoMigrate = process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE + process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE = '0' + const store = new Store(':memory:') + const engine = new SyncEngine( + store, + {} as never, + new RpcRegistry(), + { broadcast() {} } as never + ) + + try { + const oldSession = engine.getOrCreateSession( + 'session-cursor-legacy-resume-old', + { + path: '/tmp/project', + host: 'localhost', + machineId: 'machine-1', + flavor: 'cursor', + cursorSessionId: 'legacy-cursor-1', + lifecycleState: 'archived', + archivedBy: 'cli', + archiveReason: 'Hub restart' + }, + null, + 'default' + ) + engine.getOrCreateMachine( + 'machine-1', + { host: 'localhost', platform: 'linux', happyCliVersion: '0.1.0' }, + null, + 'default' + ) + engine.handleMachineAlive({ machineId: 'machine-1', time: Date.now() }) + + const spawnedSession = engine.getOrCreateSession( + 'session-cursor-legacy-resume-new', + { + path: '/tmp/project', + host: 'localhost', + machineId: 'machine-1', + flavor: 'cursor', + cursorSessionId: 'legacy-cursor-1' + }, + null, + 'default' + ) + + let handshakeCalls = 0 + ;(engine as any).rpcGateway.spawnSession = async () => { + engine.handleSessionAlive({ sid: spawnedSession.id, time: Date.now() }) + return { type: 'success', sessionId: spawnedSession.id } + } + ;(engine as any).waitForSessionActive = async () => true + ;(engine as any).waitForSessionReady = async () => { + handshakeCalls += 1 + return 'timeout' + } + + const result = await engine.resumeSession(oldSession.id, 'default') + + expect(handshakeCalls).toBe(0) + expect(result.type).toBe('success') + } finally { + if (priorAutoMigrate === undefined) { + delete process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE + } else { + process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE = priorAutoMigrate + } + engine.stop() + } + }) + + it('pre-stamps legacy protocol before spawn while keeping the row archived', async () => { + const priorAutoMigrate = process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE + process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE = '0' + const store = new Store(':memory:') + const engine = new SyncEngine( + store, + {} as never, + new RpcRegistry(), + { broadcast() {} } as never + ) + + try { + const session = engine.getOrCreateSession( + 'session-cursor-legacy-reopen-prestamp', + { + path: '/tmp/project', + host: 'localhost', + machineId: 'machine-1', + flavor: 'cursor', + cursorSessionId: 'legacy-reopen-prestamp', + lifecycleState: 'archived', + archivedBy: 'cli', + archiveReason: 'Hub restart' + }, + null, + 'default' + ) + engine.getOrCreateMachine( + 'machine-1', + { host: 'localhost', platform: 'linux', happyCliVersion: '0.1.0' }, + null, + 'default' + ) + engine.handleMachineAlive({ machineId: 'machine-1', time: Date.now() }) + + let metadataAtSpawn: Record | null | undefined + ;(engine as any).rpcGateway.spawnSession = async () => { + metadataAtSpawn = engine.getSessionByNamespace(session.id, 'default')?.metadata as Record | null | undefined + engine.handleSessionAlive({ sid: session.id, time: Date.now() }) + return { type: 'success', sessionId: session.id } + } + ;(engine as any).waitForSessionActive = async () => true + + const result = await engine.reopenSession(session.id, 'default') + + expect(result.type).toBe('success') + expect(metadataAtSpawn?.cursorSessionProtocol).toBe('stream-json') + expect(metadataAtSpawn?.lifecycleState).toBe('archived') + } finally { + if (priorAutoMigrate === undefined) { + delete process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE + } else { + process.env.HAPI_CURSOR_LEGACY_AUTO_MIGRATE = priorAutoMigrate + } + engine.stop() + } + }) + + it('restores archived metadata when cursor ACP resume handshake fails', async () => { + const store = new Store(':memory:') + const engine = new SyncEngine( + store, + {} as never, + new RpcRegistry(), + { broadcast() {} } as never + ) + + try { + const session = engine.getOrCreateSession( + 'session-cursor-acp-reopen-rollback', + { + path: '/tmp/project', + host: 'localhost', + machineId: 'machine-1', + flavor: 'cursor', + cursorSessionId: 'cursor-acp-rollback', + cursorSessionProtocol: 'acp', + lifecycleState: 'archived', + archivedBy: 'cli', + archiveReason: 'Session crashed', + lifecycleStateSince: 1000 + }, + null, + 'default' + ) + engine.getOrCreateMachine( + 'machine-1', + { host: 'localhost', platform: 'linux', happyCliVersion: '0.1.0' }, + null, + 'default' + ) + engine.handleMachineAlive({ machineId: 'machine-1', time: Date.now() }) + + const spawnedSession = engine.getOrCreateSession( + 'session-cursor-acp-reopen-spawned', + { + path: '/tmp/project', + host: 'localhost', + machineId: 'machine-1', + flavor: 'cursor', + cursorSessionId: 'cursor-acp-rollback', + cursorSessionProtocol: 'acp' + }, + null, + 'default' + ) + ;(engine as any).rpcGateway.spawnSession = async () => { + engine.handleSessionAlive({ sid: spawnedSession.id, time: Date.now() }) + return { type: 'success', sessionId: spawnedSession.id } + } + ;(engine as any).waitForSessionActive = async () => true + ;(engine as any).waitForSessionReady = async () => 'timeout' + + const result = await engine.reopenSession(session.id, 'default') + + expect(result.type).toBe('error') + const restored = engine.getSessionByNamespace(session.id, 'default')?.metadata as Record | null | undefined + expect(restored?.lifecycleState).toBe('archived') + expect(restored?.archiveReason).toBe('Session crashed') + } finally { + engine.stop() + } + }) + + it('defers dedup merge of archived cursor ACP rows until ready is emitted', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const archived = cache.getOrCreateSession( + 'cursor-archived-dup', + { + path: '/tmp/project', + host: 'localhost', + flavor: 'cursor', + cursorSessionId: 'cursor-dedup-1', + cursorSessionProtocol: 'acp', + lifecycleState: 'archived', + archivedBy: 'cli', + archiveReason: 'Hub restart' + }, + null, + 'default' + ) + const active = cache.getOrCreateSession( + 'cursor-active-dup', + { + path: '/tmp/project', + host: 'localhost', + flavor: 'cursor', + cursorSessionId: 'cursor-dedup-1', + cursorSessionProtocol: 'acp' + }, + null, + 'default' + ) + cache.handleSessionAlive({ sid: active.id, time: Date.now() }) + + await cache.deduplicateByAgentSessionId(active.id) + expect(cache.getSession(archived.id)).toBeDefined() + + store.messages.addMessage(active.id, { + role: 'agent', + content: { type: 'event', data: { type: 'ready' } } + }, 'ready-local') + + await cache.deduplicateByAgentSessionId(active.id) + expect(cache.getSession(archived.id)).toBeUndefined() + }) + + it('merges archived legacy cursor duplicates without waiting for ACP ready', async () => { + const store = new Store(':memory:') + const events: SyncEvent[] = [] + const cache = new SessionCache(store, createPublisher(events)) + + const archived = cache.getOrCreateSession( + 'cursor-legacy-archived-dup', + { + path: '/tmp/project', + host: 'localhost', + flavor: 'cursor', + cursorSessionId: 'legacy-dedup-1', + lifecycleState: 'archived', + archivedBy: 'cli', + archiveReason: 'Hub restart' + }, + null, + 'default' + ) + const active = cache.getOrCreateSession( + 'cursor-legacy-active-dup', + { + path: '/tmp/project', + host: 'localhost', + flavor: 'cursor', + cursorSessionId: 'legacy-dedup-1' + }, + null, + 'default' + ) + cache.handleSessionAlive({ sid: active.id, time: Date.now() }) + + await cache.deduplicateByAgentSessionId(active.id) + expect(cache.getSession(archived.id)).toBeUndefined() + }) + }) }) diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index d59f58fd47..2437439621 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -8,6 +8,7 @@ */ import { isKnownFlavor, type LocalResumeTarget, type ResumableSession } from '@hapi/protocol' +import { isLegacyCursorSession, resolveCursorRemoteProtocol } from '@hapi/protocol/cursorProtocol' import type { CursorMigrateOutcome, CursorMigrateToAcpRequest, SlashCommandsResponse } from '@hapi/protocol/apiTypes' import type { AgentFlavor, CodexCollaborationMode, DecryptedMessage, PermissionMode, Session, SyncEvent } from '@hapi/protocol/types' import { unwrapRoleWrappedRecordEnvelope } from '@hapi/protocol/messages' @@ -1206,7 +1207,7 @@ export class SyncEngine { const needsReadyBeforeMerge = spawnResult.sessionId !== access.sessionId && flavor === 'cursor' - && metadata.cursorSessionProtocol === 'acp' + && resolveCursorRemoteProtocol(metadata) === 'acp' if (needsReadyBeforeMerge) { const readyResult = await this.waitForSessionReady(spawnResult.sessionId) if (readyResult !== 'ready') { @@ -1287,40 +1288,49 @@ export class SyncEngine { } } - const archiveSnapshot = { - lifecycleState: metadata.lifecycleState, - archivedBy: metadata.archivedBy, - archiveReason: metadata.archiveReason, - lifecycleStateSince: metadata.lifecycleStateSince - } - - let applied: { cursorSessionProtocol?: 'acp' | 'stream-json' } - try { - applied = await this.sessionCache.clearSessionArchiveMetadata(access.sessionId) - } catch (error) { - const message = error instanceof Error ? error.message : 'Failed to clear archive metadata' - return { type: 'error', message, code: 'metadata_conflict' } + let preapplied: { cursorSessionProtocol?: 'acp' | 'stream-json' } | undefined + if (isLegacyCursorSession(metadata)) { + try { + preapplied = await this.sessionCache.stampLegacyCursorSessionProtocol(access.sessionId) + } catch (error) { + const message = error instanceof Error ? error.message : 'Failed to stamp legacy Cursor protocol' + return { type: 'error', message, code: 'metadata_conflict' } + } } const resumeResult = await this.resumeSession(access.sessionId, namespace) if (resumeResult.type === 'error') { - // Resume failed - put the archive flags back so the row stays archived in the UI - // and the operator can retry. Best-effort: a concurrent metadata write that - // succeeded between clear and restore (e.g. an unrelated rename) wins, in - // which case we surface the original resume error rather than masking it. + // Old row stays archived — we defer clearSessionArchiveMetadata until success + // so dedup cannot merge away the recovery path during handshake (#917). + return resumeResult + } + + let applied: { cursorSessionProtocol?: 'acp' | 'stream-json' } | undefined + const clearTargets = resumeResult.sessionId === access.sessionId + ? [resumeResult.sessionId] + : [resumeResult.sessionId, access.sessionId] + for (const targetId of clearTargets) { + if (!this.sessionCache.getSessionByNamespace(targetId, namespace)) { + continue + } try { - await this.sessionCache.restoreSessionArchiveMetadata(access.sessionId, archiveSnapshot) - } catch { - // Swallow restore failures - the resume error is the more important signal. + applied = await this.sessionCache.clearSessionArchiveMetadata(targetId) + break + } catch (error) { + if (targetId === clearTargets[clearTargets.length - 1]) { + const message = error instanceof Error ? error.message : 'Failed to clear archive metadata' + return { type: 'error', message, code: 'metadata_conflict' } + } } - return resumeResult } return { type: 'success', sessionId: resumeResult.sessionId, resumed: true, - ...(applied.cursorSessionProtocol ? { cursorSessionProtocol: applied.cursorSessionProtocol } : {}) + ...((applied?.cursorSessionProtocol ?? preapplied?.cursorSessionProtocol) + ? { cursorSessionProtocol: applied?.cursorSessionProtocol ?? preapplied?.cursorSessionProtocol } + : {}) } } diff --git a/shared/package.json b/shared/package.json index 1513cb3cf3..14e246a27a 100644 --- a/shared/package.json +++ b/shared/package.json @@ -11,6 +11,7 @@ "./messages": "./src/messages.ts", "./slashCommands": "./src/slashCommands.ts", "./buildInfo": "./src/buildInfo.ts", + "./cursorProtocol": "./src/cursorProtocol.ts", "./modes": "./src/modes.ts", "./rpcMethods": "./src/rpcMethods.ts", "./schemas": "./src/schemas.ts", diff --git a/shared/src/cursorProtocol.test.ts b/shared/src/cursorProtocol.test.ts new file mode 100644 index 0000000000..c1758ab487 --- /dev/null +++ b/shared/src/cursorProtocol.test.ts @@ -0,0 +1,58 @@ +import { describe, expect, it } from 'bun:test' +import { isLegacyCursorSession, resolveCursorRemoteProtocol } from './cursorProtocol' + +const baseMetadata = { flavor: 'cursor' as const, path: '/tmp', host: 'test' } + +describe('cursorProtocol', () => { + it('routes new sessions to ACP', () => { + expect(resolveCursorRemoteProtocol(baseMetadata)).toBe('acp') + expect(isLegacyCursorSession(baseMetadata)).toBe(false) + }) + + it('routes legacy stream-json sessions by metadata', () => { + const metadata = { + ...baseMetadata, + cursorSessionId: 'old-session', + cursorSessionProtocol: 'stream-json' as const + } + expect(resolveCursorRemoteProtocol(metadata)).toBe('stream-json') + expect(isLegacyCursorSession(metadata)).toBe(true) + }) + + it('routes sessions with cursorSessionId but no protocol to legacy', () => { + const metadata = { + ...baseMetadata, + cursorSessionId: 'old-session' + } + expect(resolveCursorRemoteProtocol(metadata)).toBe('stream-json') + }) + + it('routes explicit ACP metadata to ACP even with session id', () => { + const metadata = { + ...baseMetadata, + cursorSessionId: 'acp-session', + cursorSessionProtocol: 'acp' as const + } + expect(resolveCursorRemoteProtocol(metadata)).toBe('acp') + expect(isLegacyCursorSession(metadata)).toBe(false) + }) + + it('never treats non-cursor flavor as legacy', () => { + expect(isLegacyCursorSession({ ...baseMetadata, flavor: 'claude', cursorSessionId: 'x' })).toBe(false) + expect(resolveCursorRemoteProtocol({ ...baseMetadata, flavor: 'claude', cursorSessionId: 'x' })).toBe('acp') + }) + + it('does not use legacy when stream-json protocol is set but cursorSessionId is missing', () => { + const metadata = { + ...baseMetadata, + cursorSessionProtocol: 'stream-json' as const + } + expect(isLegacyCursorSession(metadata)).toBe(false) + expect(resolveCursorRemoteProtocol(metadata)).toBe('acp') + }) + + it('defaults null/undefined metadata to ACP (new session)', () => { + expect(resolveCursorRemoteProtocol(null)).toBe('acp') + expect(resolveCursorRemoteProtocol(undefined)).toBe('acp') + }) +}) diff --git a/shared/src/cursorProtocol.ts b/shared/src/cursorProtocol.ts new file mode 100644 index 0000000000..38e33da3be --- /dev/null +++ b/shared/src/cursorProtocol.ts @@ -0,0 +1,23 @@ +import type { Metadata } from './schemas' + +export type CursorSessionProtocol = 'acp' | 'stream-json' + +export function isLegacyCursorSession(metadata: Metadata | null | undefined): boolean { + if (metadata?.flavor !== 'cursor') { + return false + } + if (metadata.cursorSessionProtocol === 'acp') { + return false + } + if (metadata.cursorSessionProtocol === 'stream-json') { + return Boolean(metadata.cursorSessionId) + } + return Boolean(metadata.cursorSessionId) +} + +export function resolveCursorRemoteProtocol(metadata: Metadata | null | undefined): CursorSessionProtocol { + if (isLegacyCursorSession(metadata)) { + return 'stream-json' + } + return 'acp' +}