diff --git a/acceptance/specs/admin.test.ts b/acceptance/specs/admin.test.ts index 18ee47aa9..3fd1a047c 100644 --- a/acceptance/specs/admin.test.ts +++ b/acceptance/specs/admin.test.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'node:crypto' import { ListBucketsCommand } from '@aws-sdk/client-s3' import { describeAcceptance, getAcceptanceConfig, requireConfigValue } from '../support/config' import { AcceptanceHttpClient, createAdminClient } from '../support/http' +import { uniqueBucketName, uniqueObjectKey } from '../support/resources' import { createAcceptanceS3Client } from '../support/s3' interface TenantSummary { @@ -71,6 +72,10 @@ interface MessageResponse { message?: string } +interface GenerateSignaturesResponse extends MessageResponse { + jobId?: string +} + interface TenantMigrationRunResponse { migrated?: boolean } @@ -503,6 +508,43 @@ describeAcceptance( } } }) + + it('covers object signature generation scheduling for a scoped object name', async () => { + const config = getAcceptanceConfig() + const client = createAdminClient() + const headers = { + apikey: requireConfigValue(config.adminApiKey, 'ACCEPTANCE_ADMIN_API_KEY'), + } + const tenantId = await resolveTenantId(client, headers) + const bucketName = uniqueBucketName('sig') + const objectKey = uniqueObjectKey('sig') + const expectScheduling = config.target === 'local' && process.env.PG_QUEUE_ENABLE === 'true' + + const scheduled = await client.request( + 'POST', + `/tenants/${tenantId}/storage/generate-signatures`, + { + body: { + bucketId: bucketName, + force: true, + objectNames: [objectKey], + }, + expectedStatus: expectScheduling ? 
200 : [200, 400], + headers, + } + ) + + if (scheduled.status === 200) { + expect(scheduled.json?.message).toBe('Object signature generation scheduled') + expect(typeof scheduled.json?.jobId).toBe('string') + expect(scheduled.json?.jobId?.length).toBeGreaterThan(0) + } else { + expect(expectScheduling).toBe(false) + expect(scheduled.json?.message).toMatch( + /^(Queue is not enabled|Tenant migrations must include add-objects-signature before generating signatures)$/ + ) + } + }) } ) diff --git a/migrations/tenant/0061-add-objects-signature.sql b/migrations/tenant/0061-add-objects-signature.sql new file mode 100644 index 000000000..4a2634896 --- /dev/null +++ b/migrations/tenant/0061-add-objects-signature.sql @@ -0,0 +1,90 @@ +ALTER TABLE storage.objects + ADD COLUMN IF NOT EXISTS signature bytea; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'objects_signature_length' + AND conrelid = 'storage.objects'::regclass + ) THEN + ALTER TABLE storage.objects + ADD CONSTRAINT objects_signature_length + CHECK (signature IS NULL OR octet_length(signature) = 32) + NOT VALID; + END IF; +END $$; + +DROP TRIGGER IF EXISTS update_objects_updated_at ON storage.objects; + +-- Keep this list in sync with all non-generated storage.objects columns except signature. +-- Generated columns such as path_tokens are intentionally omitted. +-- The explicit row comparison avoids converting every updated object row to jsonb. 
+CREATE TRIGGER update_objects_updated_at +BEFORE UPDATE ON storage.objects +FOR EACH ROW +WHEN ( + ROW( + NEW.id, + NEW.bucket_id, + NEW.name, + NEW.owner, + NEW.created_at, + NEW.updated_at, + NEW.last_accessed_at, + NEW.metadata, + NEW.version, + NEW.owner_id, + NEW.user_metadata + ) + IS DISTINCT FROM + ROW( + OLD.id, + OLD.bucket_id, + OLD.name, + OLD.owner, + OLD.created_at, + OLD.updated_at, + OLD.last_accessed_at, + OLD.metadata, + OLD.version, + OLD.owner_id, + OLD.user_metadata + ) +) +EXECUTE PROCEDURE update_updated_at_column(); + +CREATE OR REPLACE FUNCTION storage.enforce_objects_signature_client_writes() + RETURNS trigger +AS $$ +DECLARE + anon_role text = COALESCE(current_setting('storage.anon_role', true), 'anon'); + authenticated_role text = COALESCE(current_setting('storage.authenticated_role', true), 'authenticated'); + effective_role text = COALESCE(NULLIF(current_setting('role', true), 'none'), current_user); +BEGIN + IF effective_role = anon_role OR effective_role = authenticated_role THEN + IF TG_OP = 'INSERT' AND NEW.signature IS NOT NULL THEN + RAISE EXCEPTION 'Only storage service roles can set object signatures' + USING ERRCODE = '42501'; + END IF; + + IF TG_OP = 'UPDATE' + AND NEW.signature IS NOT NULL + AND NEW.signature IS DISTINCT FROM OLD.signature + THEN + RAISE EXCEPTION 'Only storage service roles can set object signatures' + USING ERRCODE = '42501'; + END IF; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS enforce_objects_signature_client_writes ON storage.objects; + +CREATE TRIGGER enforce_objects_signature_client_writes +BEFORE INSERT OR UPDATE ON storage.objects +FOR EACH ROW +EXECUTE FUNCTION storage.enforce_objects_signature_client_writes(); diff --git a/migrations/tenant/0062-add-objects-signature-index.sql b/migrations/tenant/0062-add-objects-signature-index.sql new file mode 100644 index 000000000..f3392b703 --- /dev/null +++ b/migrations/tenant/0062-add-objects-signature-index.sql @@ -0,0 
+1,6 @@ +-- postgres-migrations disable-transaction + +CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_objects_missing_signature_bucket_id_name + ON storage.objects (bucket_id, name) + INCLUDE (version) + WHERE signature IS NULL; diff --git a/src/admin-app.ts b/src/admin-app.ts index 6dbb898ac..fb99bf736 100644 --- a/src/admin-app.ts +++ b/src/admin-app.ts @@ -61,6 +61,7 @@ const build = (opts: buildOpts = {}): FastifyInstance => { app.register(routes.tenants, { prefix: 'tenants' }) app.register(routes.objects, { prefix: 'tenants' }) app.register(routes.jwks, { prefix: 'tenants' }) + app.register(routes.signatureGeneration, { prefix: 'tenants' }) app.register(routes.migrations, { prefix: 'migrations' }) if (isRunningUnderWatt) { app.register(routes.pprof, { prefix: 'debug/pprof' }) diff --git a/src/http/routes/admin/index.ts b/src/http/routes/admin/index.ts index 52f76b281..d145489ca 100644 --- a/src/http/routes/admin/index.ts +++ b/src/http/routes/admin/index.ts @@ -5,4 +5,5 @@ export { default as objects } from './objects' export { default as pprof } from './pprof' export { default as queue } from './queue' export { default as s3Credentials } from './s3' +export { default as signatureGeneration } from './signature-generation' export { default as tenants } from './tenants' diff --git a/src/http/routes/admin/signature-generation.test.ts b/src/http/routes/admin/signature-generation.test.ts new file mode 100644 index 000000000..3ee4c7f59 --- /dev/null +++ b/src/http/routes/admin/signature-generation.test.ts @@ -0,0 +1,395 @@ +import { tenantHasMigrations } from '@internal/database/migrations' +import { vi } from 'vitest' + +const { mockGenerateObjectSignaturesSend, mockTenantHasMigrations } = vi.hoisted(() => ({ + mockGenerateObjectSignaturesSend: vi.fn(), + mockTenantHasMigrations: vi.fn(), +})) + +vi.mock('@internal/database/migrations', () => ({ + tenantHasMigrations: mockTenantHasMigrations, +})) + +vi.mock('@storage/events', () => ({ + GenerateObjectSignatures: { + 
send: mockGenerateObjectSignaturesSend, + }, +})) + +describe('admin signature generation routes', () => { + beforeEach(() => { + vi.clearAllMocks() + vi.resetModules() + mockTenantHasMigrations.mockResolvedValue(true) + }) + + async function createApp(config: { pgQueueEnable: boolean }) { + const { mergeConfig } = await import('../../../config') + mergeConfig({ + adminApiKeys: 'test-admin-key', + ...config, + }) + + const fastify = (await import('fastify')).default + const { default: routes } = await import('./signature-generation') + + const app = fastify() + app.decorateRequest('sbReqId', undefined) + app.addHook('onRequest', (request, _reply, done) => { + request.sbReqId = + typeof request.headers['sb-request-id'] === 'string' + ? request.headers['sb-request-id'] + : undefined + done() + }) + app.register(routes, { prefix: '/tenants' }) + return app + } + + it('rejects signature generation requests when the queue is disabled', async () => { + const app = await createApp({ + pgQueueEnable: false, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + }, + }) + + expect(response.statusCode).toBe(400) + expect(response.json()).toEqual({ message: 'Queue is not enabled' }) + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('requires an admin api key', async () => { + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + }) + + expect(response.statusCode).toBe(401) + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('rejects invalid admin api keys', async () => { + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: 
'/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'wrong-admin-key', + }, + }) + + expect(response.statusCode).toBe(401) + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('requires a bucket id when object names are provided', async () => { + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + }, + payload: { + objectNames: ['a.txt'], + }, + }) + + expect(response.statusCode).toBe(400) + expect(response.json()).toEqual({ message: 'bucketId is required when objectNames is set' }) + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('rejects empty bucket ids', async () => { + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + }, + payload: { + bucketId: '', + }, + }) + + expect(response.statusCode).toBe(400) + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('rejects empty object names', async () => { + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + }, + payload: { + bucketId: 'bucket-a', + objectNames: [''], + }, + }) + + expect(response.statusCode).toBe(400) + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('rejects too many object names', async () => { + const app 
= await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + }, + payload: { + bucketId: 'bucket-a', + objectNames: Array.from({ length: 1001 }, (_, index) => `object-${index}`), + }, + }) + + expect(response.statusCode).toBe(400) + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('rejects requests before the tenant has the object signature migration', async () => { + vi.mocked(tenantHasMigrations).mockResolvedValue(false) + + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + }, + }) + + expect(response.statusCode).toBe(400) + expect(response.json()).toEqual({ + message: + 'Tenant migrations must include add-objects-signature before generating signatures', + }) + expect(tenantHasMigrations).toHaveBeenCalledWith('project-a', 'add-objects-signature') + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('rejects broad signature generation before the tenant has the signature index migration', async () => { + vi.mocked(tenantHasMigrations).mockImplementation(async (_tenantId, migration) => { + return migration === 'add-objects-signature' + }) + + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + }, + payload: { + bucketId: 'bucket-a', + }, + }) + + expect(response.statusCode).toBe(400) + expect(response.json()).toEqual({ + message: + 'Tenant migrations must include 
add-objects-signature-index before broad signature generation', + }) + expect(tenantHasMigrations).toHaveBeenCalledWith('project-a', 'add-objects-signature') + expect(tenantHasMigrations).toHaveBeenCalledWith('project-a', 'add-objects-signature-index') + expect(mockGenerateObjectSignaturesSend).not.toHaveBeenCalled() + } finally { + await app.close() + } + }) + + it('allows scoped object signature generation before the signature index migration', async () => { + vi.mocked(tenantHasMigrations).mockImplementation(async (_tenantId, migration) => { + return migration === 'add-objects-signature' + }) + mockGenerateObjectSignaturesSend.mockResolvedValue('job-id-scoped-no-index') + + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + }, + payload: { + bucketId: 'bucket-a', + objectNames: ['a.txt'], + }, + }) + + expect(response.statusCode).toBe(200) + expect(mockGenerateObjectSignaturesSend).toHaveBeenCalledWith({ + tenant: { ref: 'project-a', host: '' }, + bucketId: 'bucket-a', + objectNames: ['a.txt'], + force: false, + reqId: expect.any(String), + sbReqId: undefined, + }) + expect(tenantHasMigrations).toHaveBeenCalledWith('project-a', 'add-objects-signature') + expect(tenantHasMigrations).not.toHaveBeenCalledWith( + 'project-a', + 'add-objects-signature-index' + ) + } finally { + await app.close() + } + }) + + it('schedules tenant-wide signature generation from an empty json body', async () => { + mockGenerateObjectSignaturesSend.mockResolvedValue('job-id-empty-body') + + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + }, + payload: '', + }) + + 
expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ + message: 'Object signature generation scheduled', + jobId: 'job-id-empty-body', + }) + expect(mockGenerateObjectSignaturesSend).toHaveBeenCalledWith({ + tenant: { ref: 'project-a', host: '' }, + bucketId: undefined, + objectNames: undefined, + force: false, + reqId: expect.any(String), + sbReqId: undefined, + }) + } finally { + await app.close() + } + }) + + it('schedules a scoped object signature generation job', async () => { + mockGenerateObjectSignaturesSend.mockResolvedValue('job-id-1') + + const app = await createApp({ + pgQueueEnable: true, + }) + + try { + const response = await app.inject({ + method: 'POST', + url: '/tenants/project-a/storage/generate-signatures', + headers: { + apikey: 'test-admin-key', + 'content-type': 'application/json', + 'sb-request-id': 'sb-req-123', + }, + payload: { + bucketId: 'bucket-a', + objectNames: ['a.txt', 'folder/b.txt'], + force: true, + }, + }) + + expect(response.statusCode).toBe(200) + expect(response.json()).toEqual({ + message: 'Object signature generation scheduled', + jobId: 'job-id-1', + }) + expect(mockGenerateObjectSignaturesSend).toHaveBeenCalledWith({ + tenant: { ref: 'project-a', host: '' }, + bucketId: 'bucket-a', + objectNames: ['a.txt', 'folder/b.txt'], + force: true, + reqId: expect.any(String), + sbReqId: 'sb-req-123', + }) + } finally { + await app.close() + } + }) +}) diff --git a/src/http/routes/admin/signature-generation.ts b/src/http/routes/admin/signature-generation.ts new file mode 100644 index 000000000..538f4957f --- /dev/null +++ b/src/http/routes/admin/signature-generation.ts @@ -0,0 +1,111 @@ +import { tenantHasMigrations } from '@internal/database/migrations' +import { GenerateObjectSignatures } from '@storage/events' +import { FastifyInstance, RequestGenericInterface } from 'fastify' +import { FromSchema } from 'json-schema-to-ts' +import { getConfig } from '../../../config' +import { registerApiKeyAuth } from 
'../../plugins/apikey' +import { registerJsonParserAllowingEmptyBody } from '../../plugins/empty-json-body' + +const { pgQueueEnable } = getConfig() + +const MAX_OBJECT_NAMES = 1000 + +const generateSignaturesSchema = { + params: { + type: 'object', + properties: { + tenantId: { type: 'string' }, + }, + required: ['tenantId'], + }, + body: { + type: 'object', + properties: { + bucketId: { type: 'string', minLength: 1 }, + objectNames: { + type: 'array', + items: { type: 'string', minLength: 1 }, + minItems: 1, + maxItems: MAX_OBJECT_NAMES, + }, + force: { type: 'boolean', default: false }, + }, + additionalProperties: false, + }, +} as const + +interface GenerateSignaturesRequest extends RequestGenericInterface { + Params: FromSchema + Body: { + bucketId?: string + objectNames?: string[] + force?: boolean + } +} + +export default async function routes(fastify: FastifyInstance) { + registerApiKeyAuth(fastify) + + fastify.register(async (f) => { + registerJsonParserAllowingEmptyBody(f) + + f.post( + '/:tenantId/storage/generate-signatures', + { + schema: { ...generateSignaturesSchema, tags: ['object'] }, + preValidation: async (req) => { + req.body = req.body ?? 
{} + }, + }, + async (req, reply) => { + if (!pgQueueEnable) { + return reply.status(400).send({ message: 'Queue is not enabled' }) + } + + if (req.body?.objectNames && !req.body.bucketId) { + return reply.status(400).send({ message: 'bucketId is required when objectNames is set' }) + } + + const hasSignatureMigration = await tenantHasMigrations( + req.params.tenantId, + 'add-objects-signature' + ) + + if (!hasSignatureMigration) { + return reply.status(400).send({ + message: + 'Tenant migrations must include add-objects-signature before generating signatures', + }) + } + + if (!req.body?.objectNames) { + const hasSignatureIndexMigration = await tenantHasMigrations( + req.params.tenantId, + 'add-objects-signature-index' + ) + + if (!hasSignatureIndexMigration) { + return reply.status(400).send({ + message: + 'Tenant migrations must include add-objects-signature-index before broad signature generation', + }) + } + } + + const jobId = await GenerateObjectSignatures.send({ + tenant: { ref: req.params.tenantId, host: '' }, + bucketId: req.body?.bucketId, + objectNames: req.body?.objectNames, + force: req.body?.force, + reqId: req.id, + sbReqId: req.sbReqId, + }) + + return reply.send({ + message: 'Object signature generation scheduled', + jobId, + }) + } + ) + }) +} diff --git a/src/internal/database/migrations/types.ts b/src/internal/database/migrations/types.ts index 9e687e449..4cd952923 100644 --- a/src/internal/database/migrations/types.ts +++ b/src/internal/database/migrations/types.ts @@ -60,4 +60,6 @@ export const DBMigration = { 'operation-ergonomics': 58, 'drop-unused-functions': 59, 'optimize-existing-functions-again': 60, + 'add-objects-signature': 61, + 'add-objects-signature-index': 62, } as const diff --git a/src/internal/monitoring/logger.ts b/src/internal/monitoring/logger.ts index 6029ed4b9..ebeb44eab 100644 --- a/src/internal/monitoring/logger.ts +++ b/src/internal/monitoring/logger.ts @@ -104,12 +104,14 @@ export interface EventLog extends 
RequestLogContext { type: 'event' event: string payload: string + metadata?: string objectPath: string resources?: string[] } interface ErrorLog extends RequestLogContext { type: string + event?: string error?: Error | unknown metadata?: string } @@ -121,7 +123,7 @@ interface InfoLog extends RequestLogContext { export const logSchema = { info: (logger: FastifyBaseLogger, message: string, log: InfoLog) => logger.info(log, message), - warning: (logger: FastifyBaseLogger, message: string, log: InfoLog | ErrorLog) => + warning: (logger: FastifyBaseLogger, message: string, log: InfoLog | ErrorLog | EventLog) => logger.warn(log, message), request: (logger: FastifyBaseLogger, message: string, log: RequestLog) => { if (!log.res) { diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index 251ae8835..98e46db3d 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -28,6 +28,25 @@ export interface FindObjectFilters { dontErrorOnEmpty?: boolean } +export interface ObjectSignatureCursor { + bucketId: string + objectName: string +} + +export interface ObjectSignatureRow { + bucket_id: string + name: string + version?: string | null +} + +export interface ListObjectsForSignatureGenerationOptions { + bucketId?: string + objectNames?: string[] + force?: boolean + cursor?: ObjectSignatureCursor + limit: number +} + export interface TransactionOptions { isolation?: string retry?: number @@ -185,6 +204,17 @@ export interface Database { searchObjects(bucketId: string, prefix: string, options: SearchObjectOption): Promise + listObjectsForSignatureGeneration( + options: ListObjectsForSignatureGenerationOptions + ): Promise + + updateObjectSignature( + bucketId: string, + objectName: string, + version: string | undefined, + signature: Buffer + ): Promise + healthcheck(): Promise destroyConnection(): Promise diff --git a/src/storage/database/knex.test.ts b/src/storage/database/knex.test.ts index 04cd3ada3..98bd29610 100644 --- 
a/src/storage/database/knex.test.ts +++ b/src/storage/database/knex.test.ts @@ -1,6 +1,13 @@ import { TenantConnection } from '../../internal/database/connection' +import { DBMigration } from '../../internal/database/migrations' import { escapeLike, StorageKnexDB } from './knex' +interface QueryCall { + method: string + args: unknown[] + nested?: QueryCall[] +} + describe('escapeLike', () => { test('escapes SQL wildcard characters', () => { expect(escapeLike('%_abc')).toBe('\\%\\_abc') @@ -31,6 +38,88 @@ function createStorageKnexTestHarness() { return { db, connection, transaction } } +function createQueryBuilder(result: unknown) { + const calls: QueryCall[] = [] + const builder = { + select: vi.fn((...args: unknown[]) => { + calls.push({ method: 'select', args }) + return builder + }), + orderBy: vi.fn((...args: unknown[]) => { + calls.push({ method: 'orderBy', args }) + return builder + }), + limit: vi.fn((...args: unknown[]) => { + calls.push({ method: 'limit', args }) + return builder + }), + where: vi.fn((...args: unknown[]) => { + calls.push({ method: 'where', args }) + return builder + }), + whereIn: vi.fn((...args: unknown[]) => { + calls.push({ method: 'whereIn', args }) + return builder + }), + whereNull: vi.fn((...args: unknown[]) => { + calls.push({ method: 'whereNull', args }) + return builder + }), + andWhere: vi.fn((...args: unknown[]) => { + recordNestedCall(calls, 'andWhere', args) + return builder + }), + orWhere: vi.fn((...args: unknown[]) => { + recordNestedCall(calls, 'orWhere', args) + return builder + }), + update: vi.fn((...args: unknown[]) => { + calls.push({ method: 'update', args }) + return builder + }), + abortOnSignal: vi.fn((...args: unknown[]) => { + calls.push({ method: 'abortOnSignal', args }) + return Promise.resolve(result) + }), + } + + return { builder, calls } +} + +function recordNestedCall(calls: QueryCall[], method: string, args: unknown[]) { + if (typeof args[0] !== 'function') { + calls.push({ method, args }) + return + 
} + + const nested = createQueryBuilder(undefined) + ;(args[0] as (builder: unknown) => void)(nested.builder) + calls.push({ method, args: [], nested: nested.calls }) +} + +function createStorageKnexQueryHarness(result: unknown) { + const query = createQueryBuilder(result) + const transaction = { + from: vi.fn(() => query.builder), + once: vi.fn(), + commit: vi.fn().mockResolvedValue(undefined), + rollback: vi.fn().mockResolvedValue(undefined), + } + const connection = { + role: 'anon', + transactionProvider: vi.fn().mockReturnValue(async () => transaction), + setScope: vi.fn().mockResolvedValue(undefined), + getAbortSignal: vi.fn().mockReturnValue(undefined), + } as unknown as TenantConnection + + const db = new StorageKnexDB(connection, { + tenantId: 'test-tenant', + host: 'localhost', + }) + + return { db, query, transaction } +} + describe('StorageKnexDB.testPermission', () => { it('returns the callback result after rolling back the transaction', async () => { const { db, connection, transaction } = createStorageKnexTestHarness() @@ -62,3 +151,142 @@ describe('StorageKnexDB.testPermission', () => { expect(transaction.commit).not.toHaveBeenCalled() }) }) + +describe('StorageKnexDB migration column normalization', () => { + class TestStorageKnexDB extends StorageKnexDB { + normalizeForTest>(columns: T): T { + return this.normalizeColumns(columns) + } + } + + const makeDB = (latestMigration?: keyof typeof DBMigration) => + new TestStorageKnexDB({ role: 'anon' } as TenantConnection, { + tenantId: 'test-tenant', + host: 'localhost', + latestMigration, + }) + + it('strips signature columns before the signature migration', () => { + const db = makeDB('optimize-existing-functions-again') + + expect( + db.normalizeForTest({ + metadata: {}, + user_metadata: {}, + signature: null, + }) + ).toEqual({ + metadata: {}, + user_metadata: {}, + }) + expect(db.normalizeForTest('metadata,user_metadata,signature')).toBe('metadata,user_metadata') + }) + + it('keeps signature columns 
after the signature migration', () => { + const db = makeDB('add-objects-signature') + + expect( + db.normalizeForTest({ + metadata: {}, + user_metadata: {}, + signature: null, + }) + ).toEqual({ + metadata: {}, + user_metadata: {}, + signature: null, + }) + expect(db.normalizeForTest('metadata,user_metadata,signature')).toBe( + 'metadata,user_metadata,signature' + ) + }) +}) + +describe('StorageKnexDB object signature methods', () => { + it('applies lexicographic cursor filters when listing objects for signature generation', async () => { + const rows = [{ bucket_id: 'bucket-b', name: 'z.txt', version: 'v2' }] + const { db, query, transaction } = createStorageKnexQueryHarness(rows) + + await expect( + db.listObjectsForSignatureGeneration({ + bucketId: 'bucket-a', + objectNames: ['a.txt', 'z.txt'], + cursor: { bucketId: 'bucket-a', objectName: 'm.txt' }, + force: false, + limit: 25, + }) + ).resolves.toBe(rows) + + expect(transaction.from).toHaveBeenCalledWith('objects') + expect(query.calls).toEqual( + expect.arrayContaining([ + { method: 'select', args: ['bucket_id', 'name', 'version'] }, + { method: 'orderBy', args: ['bucket_id', 'asc'] }, + { method: 'orderBy', args: ['name', 'asc'] }, + { method: 'limit', args: [25] }, + { method: 'where', args: ['bucket_id', 'bucket-a'] }, + { method: 'whereIn', args: ['name', ['a.txt', 'z.txt']] }, + { method: 'whereNull', args: ['signature'] }, + ]) + ) + expect(query.calls.find((call) => call.method === 'andWhere')?.nested).toEqual([ + { method: 'where', args: ['bucket_id', '>', 'bucket-a'] }, + { + method: 'orWhere', + args: [], + nested: [ + { method: 'where', args: ['bucket_id', 'bucket-a'] }, + { method: 'andWhere', args: ['name', '>', 'm.txt'] }, + ], + }, + ]) + }) + + it('does not filter out already signed objects when force is true', async () => { + const rows = [{ bucket_id: 'bucket-a', name: 'a.txt', version: 'v1' }] + const { db, query } = createStorageKnexQueryHarness(rows) + + await expect( + 
db.listObjectsForSignatureGeneration({ + force: true, + limit: 25, + }) + ).resolves.toBe(rows) + + expect(query.calls).not.toContainEqual({ method: 'whereNull', args: ['signature'] }) + }) + + it('matches null object versions when updating a signature without a version', async () => { + const signature = Buffer.from('a'.repeat(64), 'hex') + const { db, query, transaction } = createStorageKnexQueryHarness(1) + + await expect(db.updateObjectSignature('bucket-a', 'a.txt', undefined, signature)).resolves.toBe( + true + ) + + expect(transaction.from).toHaveBeenCalledWith('objects') + expect(query.calls).toEqual([ + { method: 'where', args: ['bucket_id', 'bucket-a'] }, + { method: 'where', args: ['name', 'a.txt'] }, + { method: 'whereNull', args: ['version'] }, + { method: 'update', args: [{ signature }] }, + { method: 'abortOnSignal', args: [undefined] }, + ]) + }) + + it('matches explicit object versions when updating a signature with a version', async () => { + const signature = Buffer.from('b'.repeat(64), 'hex') + const { db, query, transaction } = createStorageKnexQueryHarness(1) + + await expect(db.updateObjectSignature('bucket-a', 'a.txt', 'v1', signature)).resolves.toBe(true) + + expect(transaction.from).toHaveBeenCalledWith('objects') + expect(query.calls).toEqual([ + { method: 'where', args: ['bucket_id', 'bucket-a'] }, + { method: 'where', args: ['name', 'a.txt'] }, + { method: 'where', args: ['version', 'v1'] }, + { method: 'update', args: [{ signature }] }, + { method: 'abortOnSignal', args: [undefined] }, + ]) + }) +}) diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 697355759..33c142841 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -22,6 +22,8 @@ import { FindBucketFilters, FindObjectFilters, ListBucketOptions, + ListObjectsForSignatureGenerationOptions, + ObjectSignatureRow, SearchObjectOption, TransactionOptions, } from './adapter' @@ -612,6 +614,7 @@ export class StorageKnexDB implements 
Database { version: data.version, owner: isUuid(data.owner || '') ? data.owner : undefined, owner_id: data.owner, + signature: null, }) ) .returning('*') @@ -913,6 +916,63 @@ export class StorageKnexDB implements Database { }) } + async listObjectsForSignatureGeneration( + options: ListObjectsForSignatureGenerationOptions + ): Promise { + return this.runQuery('ListObjectsForSignatureGeneration', async (knex, signal) => { + const query = knex + .from('objects') + .select('bucket_id', 'name', 'version') + .orderBy('bucket_id', 'asc') + .orderBy('name', 'asc') + .limit(options.limit) + + if (options.bucketId) { + query.where('bucket_id', options.bucketId) + } + + if (options.objectNames && options.objectNames.length > 0) { + query.whereIn('name', options.objectNames) + } + + if (!options.force) { + query.whereNull('signature') + } + + if (options.cursor) { + query.andWhere((builder) => { + builder.where('bucket_id', '>', options.cursor!.bucketId).orWhere((nested) => { + nested + .where('bucket_id', options.cursor!.bucketId) + .andWhere('name', '>', options.cursor!.objectName) + }) + }) + } + + return query.abortOnSignal(signal) + }) + } + + async updateObjectSignature( + bucketId: string, + objectName: string, + version: string | undefined, + signature: Buffer + ) { + return this.runQuery('UpdateObjectSignature', async (knex, signal) => { + const query = knex.from('objects').where('bucket_id', bucketId).where('name', objectName) + + if (version === undefined) { + query.whereNull('version') + } else { + query.where('version', version) + } + + const updated = await query.update({ signature }).abortOnSignal(signal) + return updated > 0 + }) + } + async createMultipartUpload( uploadId: string, bucketId: string, @@ -1051,10 +1111,13 @@ export class StorageKnexDB implements Database { return columns } - const rules = [{ migration: 'custom-metadata', newColumns: ['user_metadata'] }] + const rules: { migration: keyof typeof DBMigration; newColumns: string[] }[] = [ + { 
migration: 'custom-metadata', newColumns: ['user_metadata'] }, + { migration: 'add-objects-signature', newColumns: ['signature'] }, + ] rules.forEach((rule) => { - if (DBMigration[latestMigration] < DBMigration[rule.migration as keyof typeof DBMigration]) { + if (DBMigration[latestMigration] < DBMigration[rule.migration]) { const value = rule.newColumns if (typeof columns === 'string') { diff --git a/src/storage/events/index.ts b/src/storage/events/index.ts index dc83153c0..7bab56515 100644 --- a/src/storage/events/index.ts +++ b/src/storage/events/index.ts @@ -10,6 +10,7 @@ export * from './lifecycle/webhook' export * from './migrations/reset-migrations' export * from './migrations/run-migrations' export * from './objects/backup-object' +export * from './objects/generate-object-signature' export * from './objects/object-admin-delete' export * from './objects/object-admin-delete-all-before' export * from './pgboss/move-jobs' diff --git a/src/storage/events/objects/generate-object-signature.test.ts b/src/storage/events/objects/generate-object-signature.test.ts new file mode 100644 index 000000000..d5d1cf57b --- /dev/null +++ b/src/storage/events/objects/generate-object-signature.test.ts @@ -0,0 +1,881 @@ +import { createHash } from 'node:crypto' +import { Readable } from 'node:stream' +import { vi } from 'vitest' + +const { + mockCreateStorage, + mockDestroyConnection, + mockGetKeyLocation, + mockGetObject, + mockFindObject, + mockListObjectsForSignatureGeneration, + mockLogError, + mockLogEvent, + mockLogWarning, + mockUpdateObjectSignature, +} = vi.hoisted(() => ({ + mockCreateStorage: vi.fn(), + mockDestroyConnection: vi.fn(), + mockGetKeyLocation: vi.fn(), + mockGetObject: vi.fn(), + mockFindObject: vi.fn(), + mockListObjectsForSignatureGeneration: vi.fn(), + mockLogError: vi.fn(), + mockLogEvent: vi.fn(), + mockLogWarning: vi.fn(), + mockUpdateObjectSignature: vi.fn(), +})) + +vi.mock('../base-event', () => ({ + BaseEvent: class { + payload: unknown + + 
/**
 * Builds a minimal queue job fixture for the handlers under test.
 *
 * The job always has id `'job-1'` and a payload carrying the default tenant,
 * `reqId` and `sbReqId`; entries in `data` are spread last, so they extend or
 * override those defaults.
 *
 * @param data - Extra payload fields for the job under test.
 * @returns An object with `id` and the merged `data` payload.
 */
function makeJob(data: Record<string, unknown>) {
  return {
    id: 'job-1',
    data: {
      tenant: { ref: 'tenant-a', host: '' },
      reqId: 'req-123',
      sbReqId: 'sb-req-123',
      ...data,
    },
  }
}
body.length : 0, + }, + } +} + +describe('object signature generation events', () => { + beforeEach(() => { + vi.clearAllMocks() + mockCreateStorage.mockResolvedValue(makeStorage()) + mockDestroyConnection.mockResolvedValue(undefined) + mockGetKeyLocation.mockImplementation( + (locator: { tenantId: string; bucketId: string; objectName: string }) => + `${locator.tenantId}/${locator.bucketId}/${locator.objectName}` + ) + mockFindObject.mockResolvedValue({ version: 'v1' }) + }) + + it('fans out one signature generation job per listed object', async () => { + const { GenerateObjectSignature, GenerateObjectSignatures } = await import( + './generate-object-signature' + ) + const generateBatchSend = vi + .spyOn(GenerateObjectSignature, 'batchSend') + .mockResolvedValue(undefined) + + mockListObjectsForSignatureGeneration.mockResolvedValue([ + { bucket_id: 'bucket-a', name: 'a.txt', version: 'v1' }, + { bucket_id: 'bucket-a', name: 'b.txt', version: 'v2' }, + ]) + + await GenerateObjectSignatures.handle( + makeJob({ + bucketId: 'bucket-a', + force: false, + batchSize: 10, + }) as never + ) + + expect(mockListObjectsForSignatureGeneration).toHaveBeenCalledWith({ + bucketId: 'bucket-a', + cursor: undefined, + force: false, + limit: 10, + objectNames: undefined, + }) + expect(generateBatchSend).toHaveBeenCalledTimes(1) + expect(generateBatchSend.mock.calls[0][0].map((message) => message.payload)).toEqual([ + { + tenant: { ref: 'tenant-a', host: '' }, + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + reqId: 'req-123', + sbReqId: 'sb-req-123', + }, + { + tenant: { ref: 'tenant-a', host: '' }, + bucketId: 'bucket-a', + objectName: 'b.txt', + version: 'v2', + reqId: 'req-123', + sbReqId: 'sb-req-123', + }, + ]) + expect(mockDestroyConnection).toHaveBeenCalledTimes(1) + }) + + it('reschedules the coordinator when the batch is full', async () => { + const { GenerateObjectSignature, GenerateObjectSignatures } = await import( + './generate-object-signature' + ) + 
vi.spyOn(GenerateObjectSignature, 'batchSend').mockResolvedValue(undefined) + const generateSend = vi.spyOn(GenerateObjectSignatures, 'send').mockResolvedValue(undefined) + + mockListObjectsForSignatureGeneration.mockResolvedValue([ + { bucket_id: 'bucket-a', name: 'a.txt', version: 'v1' }, + { bucket_id: 'bucket-b', name: 'z.txt', version: 'v2' }, + ]) + + await GenerateObjectSignatures.handle( + makeJob({ + force: true, + batchSize: 2, + }) as never + ) + + expect(generateSend).toHaveBeenCalledWith({ + tenant: { ref: 'tenant-a', host: '' }, + cursor: { bucketId: 'bucket-b', objectName: 'z.txt' }, + force: true, + reqId: 'req-123', + sbReqId: 'sb-req-123', + batchSize: 2, + }) + }) + + it('logs coordinator throughput after processing a batch', async () => { + const { GenerateObjectSignature, GenerateObjectSignatures } = await import( + './generate-object-signature' + ) + vi.spyOn(GenerateObjectSignature, 'batchSend').mockResolvedValue(undefined) + vi.spyOn(GenerateObjectSignatures, 'send').mockResolvedValue(undefined) + + mockListObjectsForSignatureGeneration.mockResolvedValue([ + { bucket_id: 'bucket-a', name: 'a.txt', version: 'v1' }, + { bucket_id: 'bucket-b', name: 'z.txt', version: 'v2' }, + ]) + + await GenerateObjectSignatures.handle( + makeJob({ + bucketId: 'bucket-a', + objectNames: ['a.txt', 'z.txt'], + force: true, + batchSize: 2, + }) as never + ) + + expect(mockLogEvent).toHaveBeenCalledWith( + expect.anything(), + '[Admin]: GenerateObjectSignatures tenant-a processed 2 objects', + expect.objectContaining({ + event: 'GenerateObjectSignatures', + jobId: 'job-1', + objectPath: 'tenant-a', + tenantId: 'tenant-a', + project: 'tenant-a', + reqId: 'req-123', + sbReqId: 'sb-req-123', + }) + ) + + const [, , log] = mockLogEvent.mock.calls[0] + expect(JSON.parse(log.metadata)).toEqual({ + batchSize: 2, + bucketId: 'bucket-a', + cursor: null, + force: true, + objectNamesCount: 2, + objectsCount: 2, + rescheduled: true, + }) + }) + + it.each([ + { batchSize: 
Number.NaN, expectedLimit: 500 }, + { batchSize: 0, expectedLimit: 500 }, + { batchSize: -10, expectedLimit: 500 }, + { batchSize: 1200, expectedLimit: 1000 }, + ])('normalizes coordinator batch size $batchSize to $expectedLimit in query and reschedule payload', async (testCase) => { + const { GenerateObjectSignature, GenerateObjectSignatures } = await import( + './generate-object-signature' + ) + vi.spyOn(GenerateObjectSignature, 'batchSend').mockResolvedValue(undefined) + const generateSend = vi.spyOn(GenerateObjectSignatures, 'send').mockResolvedValue(undefined) + mockListObjectsForSignatureGeneration.mockResolvedValue( + Array.from({ length: testCase.expectedLimit }, (_, index) => ({ + bucket_id: 'bucket-a', + name: `object-${index}`, + version: `v${index}`, + })) + ) + + await GenerateObjectSignatures.handle( + makeJob({ + batchSize: testCase.batchSize, + }) as never + ) + + expect(mockListObjectsForSignatureGeneration).toHaveBeenCalledWith( + expect.objectContaining({ + limit: testCase.expectedLimit, + }) + ) + expect(generateSend).toHaveBeenCalledWith( + expect.objectContaining({ + batchSize: testCase.expectedLimit, + cursor: { bucketId: 'bucket-a', objectName: `object-${testCase.expectedLimit - 1}` }, + }) + ) + }) + + it('serializes coordinator fanout per tenant', async () => { + const { GenerateObjectSignatures } = await import('./generate-object-signature') + + expect(GenerateObjectSignatures.getQueueOptions()).toEqual({ + name: 'object:signatures:generate', + policy: 'singleton', + }) + expect( + GenerateObjectSignatures.getSendOptions({ + tenant: { ref: 'tenant-a', host: '' }, + bucketId: 'bucket-a', + objectNames: ['a.txt'], + }) + ).toEqual( + expect.objectContaining({ + singletonKey: 'tenant-a', + priority: 5, + retryLimit: 5, + retryDelay: 5, + }) + ) + }) + + it('limits per-object signature hashing worker concurrency', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + + 
expect(GenerateObjectSignature.getWorkerOptions()).toEqual( + expect.objectContaining({ + concurrentTaskCount: 8, + }) + ) + }) + + it('deduplicates queued per-object signature jobs by singleton key', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + + expect(GenerateObjectSignature.getQueueOptions()).toEqual({ + name: 'object:signature:generate', + policy: 'exactly_once', + }) + }) + + it('hashes backend bytes and updates the matching object version', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const payload = Buffer.from('hello world') + const expectedHex = createHash('sha256').update(payload).digest('hex') + + mockGetObject.mockResolvedValue({ + body: Readable.from([payload.subarray(0, 5), payload.subarray(5)]), + httpStatusCode: 200, + metadata: { + cacheControl: 'no-cache', + contentLength: payload.length, + eTag: 'etag', + mimetype: 'text/plain', + size: payload.length, + }, + }) + mockUpdateObjectSignature.mockResolvedValue(true) + + await GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + + expect(mockGetObject).toHaveBeenCalledWith( + 'stub-storage-bucket', + 'tenant-a/bucket-a/a.txt', + 'v1', + undefined, + undefined + ) + expect(mockUpdateObjectSignature).toHaveBeenCalledWith( + 'bucket-a', + 'a.txt', + 'v1', + Buffer.from(expectedHex, 'hex') + ) + expect(mockLogEvent).toHaveBeenCalledWith( + expect.anything(), + '[Admin]: GenerateObjectSignature tenant-a/bucket-a/a.txt', + expect.objectContaining({ + event: 'GenerateObjectSignature', + objectPath: 'tenant-a/bucket-a/a.txt', + tenantId: 'tenant-a', + project: 'tenant-a', + metadata: JSON.stringify({ version: 'v1' }), + }) + ) + expect(mockDestroyConnection).toHaveBeenCalledTimes(1) + }) + + it('passes the queue abort signal to the backend object read', async () => { + const { GenerateObjectSignature } = await 
import('./generate-object-signature') + const abortController = new AbortController() + + mockGetObject.mockResolvedValue(makeObjectResponse(Buffer.from('abortable object'))) + mockUpdateObjectSignature.mockResolvedValue(true) + + await GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never, + { signal: abortController.signal } + ) + + expect(mockGetObject).toHaveBeenCalledWith( + 'stub-storage-bucket', + 'tenant-a/bucket-a/a.txt', + 'v1', + undefined, + abortController.signal + ) + }) + + it('stops streaming signature hashing when the queue abort signal fires', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const abortController = new AbortController() + const abortError = new DOMException('worker stopped', 'AbortError') + const body = Readable.from( + (async function* () { + yield Buffer.from('first chunk') + abortController.abort(abortError) + yield Buffer.from('second chunk') + })() + ) + + mockGetObject.mockResolvedValue(makeObjectResponse(body)) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never, + { signal: abortController.signal } + ) + ).rejects.toBe(abortError) + + expect(mockUpdateObjectSignature).not.toHaveBeenCalled() + }) + + it('hashes and updates objects without a version', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const payload = Buffer.from('versionless object') + const expectedHex = createHash('sha256').update(payload).digest('hex') + + mockFindObject.mockResolvedValue({ version: null }) + mockGetObject.mockResolvedValue(makeObjectResponse(payload)) + mockUpdateObjectSignature.mockResolvedValue(true) + + await GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + }) as never + ) + + expect(mockGetObject).toHaveBeenCalledWith( + 
'stub-storage-bucket', + 'tenant-a/bucket-a/a.txt', + undefined, + undefined, + undefined + ) + expect(mockUpdateObjectSignature).toHaveBeenCalledWith( + 'bucket-a', + 'a.txt', + undefined, + Buffer.from(expectedHex, 'hex') + ) + }) + + it.each([ + { label: 'updated', currentObject: { version: 'v2' } }, + { label: 'deleted', currentObject: undefined }, + ])('skips $label stale object rows before reading backend bytes', async (testCase) => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + + mockFindObject.mockResolvedValue(testCase.currentObject) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).resolves.toBeUndefined() + + expect(mockFindObject).toHaveBeenCalledWith('bucket-a', 'a.txt', 'version', { + dontErrorOnEmpty: true, + }) + expect(mockGetObject).not.toHaveBeenCalled() + expect(mockUpdateObjectSignature).not.toHaveBeenCalled() + expect(mockLogEvent).not.toHaveBeenCalled() + expect(mockLogError).not.toHaveBeenCalled() + const [, message, log] = mockLogWarning.mock.calls[0] + expect(message).toBe( + '[Admin]: GenerateObjectSignature tenant-a/bucket-a/a.txt skipped stale object' + ) + expect(log).toEqual( + expect.objectContaining({ + event: 'GenerateObjectSignature', + objectPath: 'tenant-a/bucket-a/a.txt', + }) + ) + expect(JSON.parse(log.metadata)).toEqual({ + bucketId: 'bucket-a', + objectName: 'a.txt', + objectPath: 'tenant-a/bucket-a/a.txt', + version: 'v1', + }) + }) + + it('treats backend read failures as stale when the row changes after the pre-read check', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const error = new Error('backend object version was removed') + + mockFindObject.mockResolvedValueOnce({ version: 'v1' }).mockResolvedValueOnce({ version: 'v2' }) + mockGetObject.mockRejectedValue(error) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + 
bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).resolves.toBeUndefined() + + expect(mockFindObject).toHaveBeenNthCalledWith(1, 'bucket-a', 'a.txt', 'version', { + dontErrorOnEmpty: true, + }) + expect(mockFindObject).toHaveBeenNthCalledWith(2, 'bucket-a', 'a.txt', 'version', { + dontErrorOnEmpty: true, + }) + expect(mockUpdateObjectSignature).not.toHaveBeenCalled() + expect(mockLogError).not.toHaveBeenCalled() + const [, message] = mockLogWarning.mock.calls[0] + expect(message).toBe( + '[Admin]: GenerateObjectSignature tenant-a/bucket-a/a.txt skipped stale object' + ) + }) + + it('skips stale object rows when no object row is updated after hashing', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + + mockGetObject.mockResolvedValue(makeObjectResponse(Buffer.from('stale object'))) + mockUpdateObjectSignature.mockResolvedValue(false) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).resolves.toBeUndefined() + + expect(mockLogEvent).not.toHaveBeenCalled() + expect(mockLogError).not.toHaveBeenCalled() + const [, message, log] = mockLogWarning.mock.calls[0] + expect(message).toBe( + '[Admin]: GenerateObjectSignature tenant-a/bucket-a/a.txt skipped stale object' + ) + expect(log).toEqual( + expect.objectContaining({ + event: 'GenerateObjectSignature', + objectPath: 'tenant-a/bucket-a/a.txt', + }) + ) + expect(JSON.parse(log.metadata)).toEqual({ + bucketId: 'bucket-a', + objectName: 'a.txt', + objectPath: 'tenant-a/bucket-a/a.txt', + version: 'v1', + }) + }) + + it.each([ + { + label: 'Buffer', + body: Buffer.from('buffer body'), + expectedPayload: Buffer.from('buffer body'), + }, + { + label: 'Blob', + body: new Blob([Buffer.from('blob body')]), + expectedPayload: Buffer.from('blob body'), + }, + { + label: 'web ReadableStream', + body: new ReadableStream({ + start(controller) { + 
controller.enqueue(Buffer.from('web stream body')) + controller.close() + }, + }), + expectedPayload: Buffer.from('web stream body'), + }, + ])('hashes $label object bodies', async (testCase) => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const expectedHex = createHash('sha256').update(testCase.expectedPayload).digest('hex') + + mockGetObject.mockResolvedValue(makeObjectResponse(testCase.body)) + mockUpdateObjectSignature.mockResolvedValue(true) + + await GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + + expect(mockUpdateObjectSignature).toHaveBeenCalledWith( + 'bucket-a', + 'a.txt', + 'v1', + Buffer.from(expectedHex, 'hex') + ) + }) + + it('streams Blob object bodies instead of materializing them with arrayBuffer', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const body = new Blob([Buffer.from('streamed blob body')]) + const expectedHex = createHash('sha256').update(Buffer.from('streamed blob body')).digest('hex') + const arrayBufferSpy = vi + .spyOn(Blob.prototype, 'arrayBuffer') + .mockRejectedValue(new Error('arrayBuffer should not be called')) + + try { + mockGetObject.mockResolvedValue(makeObjectResponse(body)) + mockUpdateObjectSignature.mockResolvedValue(true) + + await GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + } finally { + arrayBufferSpy.mockRestore() + } + + expect(mockUpdateObjectSignature).toHaveBeenCalledWith( + 'bucket-a', + 'a.txt', + 'v1', + Buffer.from(expectedHex, 'hex') + ) + }) + + it('does not stringify undefined versions in the singleton key', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const singletonKey = createHash('sha256') + .update(JSON.stringify(['tenant-a', 'bucket-a', 'a.txt', null])) + .digest('hex') + + expect( + 
GenerateObjectSignature.getSendOptions({ + tenant: { ref: 'tenant-a', host: '' }, + bucketId: 'bucket-a', + objectName: 'a.txt', + }) + ).toEqual( + expect.objectContaining({ + expireInMinutes: 120, + singletonKey, + }) + ) + }) + + it('uses tuple encoding for singleton keys before hashing', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const tupleEncodedKey = createHash('sha256') + .update(JSON.stringify(['tenant-a', 'bucket-a', 'a/b.txt', 'v1'])) + .digest('hex') + + expect( + GenerateObjectSignature.getSendOptions({ + tenant: { ref: 'tenant-a', host: '' }, + bucketId: 'bucket-a', + objectName: 'a/b.txt', + version: 'v1', + }) + ).toEqual(expect.objectContaining({ singletonKey: tupleEncodedKey })) + }) + + it('rejects string chunks instead of hashing UTF-8 encoded text', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + + mockGetObject.mockResolvedValue({ + body: Readable.from(['hello world']), + httpStatusCode: 200, + metadata: { + cacheControl: 'no-cache', + contentLength: 11, + eTag: 'etag', + mimetype: 'text/plain', + size: 11, + }, + }) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).rejects.toThrow('Unsupported object body string chunk for SHA-256 hashing') + expect(mockUpdateObjectSignature).not.toHaveBeenCalled() + }) + + it('rejects missing object bodies', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + + mockGetObject.mockResolvedValue(makeObjectResponse(undefined)) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).rejects.toThrow('Object body is missing for SHA-256 hashing') + expect(mockUpdateObjectSignature).not.toHaveBeenCalled() + }) + + it('rejects unsupported object body types', async () => { + const 
{ GenerateObjectSignature } = await import('./generate-object-signature') + + mockGetObject.mockResolvedValue(makeObjectResponse(42)) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).rejects.toThrow('Unsupported object body type for SHA-256 hashing') + expect(mockUpdateObjectSignature).not.toHaveBeenCalled() + }) + + it('rejects objects with non-callable getReader as unsupported body types', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + + mockGetObject.mockResolvedValue(makeObjectResponse({ getReader: 'not-a-function' })) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).rejects.toThrow('Unsupported object body type for SHA-256 hashing') + expect(mockUpdateObjectSignature).not.toHaveBeenCalled() + }) + + it('logs scoped details when the coordinator fails', async () => { + const { GenerateObjectSignatures } = await import('./generate-object-signature') + const error = new Error('list failed') + + mockListObjectsForSignatureGeneration.mockRejectedValue(error) + + await expect( + GenerateObjectSignatures.handle( + makeJob({ + bucketId: 'bucket-a', + objectNames: ['a.txt', 'b.txt'], + force: true, + batchSize: 10, + cursor: { bucketId: 'bucket-a', objectName: 'a.txt' }, + }) as never + ) + ).rejects.toThrow(error) + + const [, message, log] = mockLogError.mock.calls[0] + expect(message).toBe('[Admin]: GenerateObjectSignatures tenant-a - FAILED') + expect(log).toEqual( + expect.objectContaining({ + error, + type: 'event', + event: 'GenerateObjectSignatures', + tenantId: 'tenant-a', + project: 'tenant-a', + reqId: 'req-123', + sbReqId: 'sb-req-123', + }) + ) + expect(JSON.parse(log.metadata)).toEqual({ + batchSize: 10, + bucketId: 'bucket-a', + cursor: { bucketId: 'bucket-a', objectName: 'a.txt' }, + force: true, + 
objectNamesCount: 2, + }) + }) + + it('logs object details when a signature job fails', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const error = new Error('read failed') + + mockGetObject.mockRejectedValue(error) + + await expect( + GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + ).rejects.toThrow(error) + + const [, message, log] = mockLogError.mock.calls[0] + expect(message).toBe('[Admin]: GenerateObjectSignature tenant-a/bucket-a/a.txt - FAILED') + expect(log).toEqual( + expect.objectContaining({ + error, + type: 'event', + event: 'GenerateObjectSignature', + tenantId: 'tenant-a', + project: 'tenant-a', + reqId: 'req-123', + sbReqId: 'sb-req-123', + }) + ) + expect(JSON.parse(log.metadata)).toEqual({ + bucketId: 'bucket-a', + objectName: 'a.txt', + objectPath: 'tenant-a/bucket-a/a.txt', + version: 'v1', + }) + }) + + it('logs only tenant context when connection disposal fails', async () => { + const { GenerateObjectSignature } = await import('./generate-object-signature') + const error = new Error('disconnect failed') + + mockGetObject.mockResolvedValue({ + body: Buffer.from('hello world'), + httpStatusCode: 200, + metadata: { + cacheControl: 'no-cache', + contentLength: 11, + eTag: 'etag', + mimetype: 'text/plain', + size: 11, + }, + }) + mockUpdateObjectSignature.mockResolvedValue(true) + mockDestroyConnection.mockRejectedValue(error) + + await GenerateObjectSignature.handle( + makeJob({ + bucketId: 'bucket-a', + objectName: 'a.txt', + version: 'v1', + }) as never + ) + await new Promise(process.nextTick) + + const [, message, log] = mockLogError.mock.calls[0] + expect(message).toBe('[Admin]: GenerateObjectSignature tenant-a - FAILED DISPOSING CONNECTION') + expect(log).toEqual( + expect.objectContaining({ + error, + type: 'event', + tenantId: 'tenant-a', + project: 'tenant-a', + reqId: 'req-123', + sbReqId: 'sb-req-123', + }) 
/**
 * Coordinator event for object signature generation.
 *
 * Each run lists one page of object rows, enqueues one
 * `GenerateObjectSignature` job per row, and re-enqueues itself with a cursor
 * pointing at the last row whenever the page came back full.
 */
export class GenerateObjectSignatures extends BaseEvent<GenerateObjectSignaturesPayload> {
  static queueName = 'object:signatures:generate'
  protected static allowSync = false

  /** Worker options for this queue; none are overridden. */
  static getWorkerOptions(): WorkOptions {
    return {}
  }

  /** Queue definition: this event's queue name with the `singleton` policy. */
  static getQueueOptions(): PgBossQueue {
    return {
      name: this.queueName,
      policy: 'singleton',
    } as const
  }

  /**
   * Send options for a coordinator job.
   *
   * @param payload - Coordinator payload; its tenant ref is the singleton key.
   */
  static getSendOptions(payload: GenerateObjectSignaturesPayload): SendOptions {
    return {
      singletonKey: payload.tenant.ref,
      priority: 5,
      retryLimit: 5,
      retryDelay: 5,
    }
  }

  /**
   * Processes one page of objects.
   *
   * @param job - Queue job whose data optionally scopes the run by `bucketId`
   *   and `objectNames`, and carries `force`, `cursor` and `batchSize`.
   * @throws Rethrows any failure after logging it with the run's scope.
   */
  static async handle(job: Job<GenerateObjectSignaturesPayload>) {
    let storage: Storage | undefined
    // Normalized before the try block so the catch handler can log it too.
    const batchSize = normalizeBatchSize(job.data.batchSize)

    try {
      storage = await this.createStorage(job.data)
      const objects = await storage.db.listObjectsForSignatureGeneration({
        bucketId: job.data.bucketId,
        objectNames: job.data.objectNames,
        force: Boolean(job.data.force),
        cursor: job.data.cursor,
        limit: batchSize,
      })

      if (objects.length > 0) {
        await GenerateObjectSignature.batchSend(
          objects.map(
            (object) =>
              new GenerateObjectSignature({
                tenant: job.data.tenant,
                bucketId: object.bucket_id,
                objectName: object.name,
                // A null row version becomes an absent payload version.
                version: object.version ?? undefined,
                reqId: job.data.reqId,
                sbReqId: job.data.sbReqId,
              })
          )
        )
      }

      // A full page means more rows may follow; continue after the last one.
      const shouldReschedule = objects.length >= batchSize

      if (shouldReschedule) {
        const last = objects[objects.length - 1]
        await GenerateObjectSignatures.send({
          tenant: job.data.tenant,
          bucketId: job.data.bucketId,
          objectNames: job.data.objectNames,
          force: job.data.force,
          cursor: { bucketId: last.bucket_id, objectName: last.name },
          reqId: job.data.reqId,
          sbReqId: job.data.sbReqId,
          batchSize,
        })
      }

      logSchema.event(
        logger,
        `[Admin]: GenerateObjectSignatures ${job.data.tenant.ref} processed ${objects.length} objects`,
        {
          jobId: job.id,
          type: 'event',
          event: 'GenerateObjectSignatures',
          payload: JSON.stringify(job.data),
          objectPath: job.data.tenant.ref,
          tenantId: job.data.tenant.ref,
          project: job.data.tenant.ref,
          reqId: job.data.reqId,
          sbReqId: job.data.sbReqId,
          metadata: JSON.stringify({
            batchSize,
            bucketId: job.data.bucketId ?? null,
            cursor: job.data.cursor ?? null,
            force: Boolean(job.data.force),
            objectNamesCount: job.data.objectNames?.length ?? 0,
            objectsCount: objects.length,
            rescheduled: shouldReschedule,
          }),
        }
      )
    } catch (error) {
      logSignatureGenerationError({
        error,
        eventName: 'GenerateObjectSignatures',
        target: job.data.tenant.ref,
        payload: job.data,
        metadata: {
          batchSize,
          bucketId: job.data.bucketId,
          cursor: job.data.cursor,
          force: job.data.force,
          objectNamesCount: job.data.objectNames?.length,
        },
      })
      throw error
    } finally {
      // Called without `await`; `storage` is still undefined here if
      // createStorage itself failed.
      disposeStorage(storage, 'GenerateObjectSignatures', job.data)
    }
  }
}
null, + ]) + ) + .digest('hex'), + expireInMinutes: 120, + priority: 5, + retryLimit: 5, + retryDelay: 5, + } + } + + static async handle( + job: Job, + opts: { signal?: AbortSignal } = {} + ) { + let storage: Storage | undefined + let objectPath = `${job.data.tenant.ref}/${job.data.bucketId}/${job.data.objectName}` + const signal = opts.signal + + try { + throwIfAborted(signal) + storage = await this.createStorage(job.data) + objectPath = storage.location.getKeyLocation({ + tenantId: job.data.tenant.ref, + bucketId: job.data.bucketId, + objectName: job.data.objectName, + }) + + if (await objectSignatureJobIsStale(storage, job.data)) { + logStaleObjectSignatureJob(job, objectPath) + return + } + + let response: ObjectResponse + try { + response = await storage.backend.getObject( + storageS3Bucket, + objectPath, + job.data.version, + undefined, + signal + ) + } catch (error) { + throwIfAborted(signal) + if (await objectSignatureJobIsStale(storage, job.data)) { + logStaleObjectSignatureJob(job, objectPath) + return + } + throw error + } + + const sha256 = await digestObjectBody(response.body, signal) + throwIfAborted(signal) + const updated = await storage.db.updateObjectSignature( + job.data.bucketId, + job.data.objectName, + job.data.version, + Buffer.from(sha256, 'hex') + ) + if (!updated) { + logStaleObjectSignatureJob(job, objectPath) + return + } + + logSchema.event(logger, `[Admin]: GenerateObjectSignature ${objectPath}`, { + jobId: job.id, + type: 'event', + event: 'GenerateObjectSignature', + payload: JSON.stringify(job.data), + objectPath, + resources: [`${job.data.bucketId}/${job.data.objectName}`], + tenantId: job.data.tenant.ref, + project: job.data.tenant.ref, + reqId: job.data.reqId, + sbReqId: job.data.sbReqId, + metadata: JSON.stringify({ version: job.data.version ?? 
/**
 * Clamps a requested coordinator batch size to a usable page limit.
 *
 * @param batchSize - Requested size; may be missing or not a finite number.
 * @returns `DEFAULT_BACKFILL_BATCH_SIZE` when the value is missing, falsy,
 *   non-finite or below 1; otherwise the floored value capped at
 *   `MAX_BACKFILL_BATCH_SIZE`.
 */
function normalizeBatchSize(batchSize: number | undefined): number {
  if (!batchSize || !Number.isFinite(batchSize) || batchSize < 1) {
    return DEFAULT_BACKFILL_BATCH_SIZE
  }

  return Math.min(Math.floor(batchSize), MAX_BACKFILL_BATCH_SIZE)
}

/**
 * Checks whether a queued per-object job no longer matches the stored row.
 *
 * @param storage - Storage instance whose `db` is used for the lookup.
 * @param payload - Queued job payload (bucket, object name, optional version).
 * @returns `true` when the row is missing or its version differs from the
 *   queued one.
 */
async function objectSignatureJobIsStale(
  storage: Storage,
  payload: GenerateObjectSignaturePayload
): Promise<boolean> {
  const currentObject = await storage.db.findObject(
    payload.bucketId,
    payload.objectName,
    'version',
    {
      // A missing row is an expected outcome here, not an error.
      dontErrorOnEmpty: true,
    }
  )

  return !currentObject || !objectVersionMatches(currentObject.version, payload.version)
}

/**
 * Compares a stored version with a queued version, treating `null` and
 * `undefined` as the same "no version" value.
 */
function objectVersionMatches(
  currentVersion: string | null | undefined,
  queuedVersion: string | undefined
): boolean {
  return (currentVersion ?? null) === (queuedVersion ?? null)
}

/**
 * Emits the warning log for a per-object job that was skipped as stale.
 *
 * @param job - The queue job being skipped.
 * @param objectPath - Object path used in the log message and fields.
 */
function logStaleObjectSignatureJob(
  job: Job<GenerateObjectSignaturePayload>,
  objectPath: string
): void {
  logSchema.warning(logger, `[Admin]: GenerateObjectSignature ${objectPath} skipped stale object`, {
    jobId: job.id,
    type: 'event',
    event: 'GenerateObjectSignature',
    payload: JSON.stringify(job.data),
    objectPath,
    resources: [`${job.data.bucketId}/${job.data.objectName}`],
    tenantId: job.data.tenant.ref,
    project: job.data.tenant.ref,
    reqId: job.data.reqId,
    sbReqId: job.data.sbReqId,
    metadata: JSON.stringify({
      bucketId: job.data.bucketId,
      objectName: job.data.objectName,
      objectPath,
      version: job.data.version,
    }),
  })
}

/**
 * Computes the SHA-256 digest of an object body.
 *
 * Accepts a `Buffer`, a `Blob` (read through `stream()`), a Node `Readable`,
 * or a value accepted by `isReadableStream`.
 *
 * @param body - The object body returned by the backend.
 * @param signal - Optional abort signal, checked before hashing and passed to
 *   the streaming helpers.
 * @returns The digest as a lowercase hex string.
 * @throws An internal error when the body is missing or of an unsupported
 *   type; whatever `throwIfAborted` raises when the signal has fired.
 */
async function digestObjectBody(
  body: ObjectResponse['body'],
  signal?: AbortSignal
): Promise<string> {
  if (!body) {
    throw ERRORS.InternalError(undefined, 'Object body is missing for SHA-256 hashing')
  }

  throwIfAborted(signal)
  const hash = createHash('sha256')

  if (Buffer.isBuffer(body)) {
    hash.update(body)
    throwIfAborted(signal)
    return hash.digest('hex')
  }

  if (body instanceof Blob) {
    await updateHashFromReadableStream(hash, body.stream(), signal)
    return hash.digest('hex')
  }

  if (body instanceof Readable) {
    await updateHashFromIterable(hash, body, signal)
    return hash.digest('hex')
  }

  if (isReadableStream(body)) {
    await updateHashFromReadableStream(hash, body, signal)
    return hash.digest('hex')
  }

  throw ERRORS.InternalError(undefined, 'Unsupported object body type for SHA-256 hashing')
}
(Buffer.isBuffer(chunk)) { + hash.update(chunk) + continue + } + + if (chunk instanceof Uint8Array) { + hash.update(chunk) + continue + } + + if (chunk instanceof ArrayBuffer) { + hash.update(new Uint8Array(chunk)) + continue + } + + throw ERRORS.InternalError(undefined, 'Unsupported object body chunk for SHA-256 hashing') + } + } catch (error) { + if (signal?.aborted) { + throwAbortReason(signal) + } + throw error + } finally { + signal?.removeEventListener('abort', abort) + } +} + +async function updateHashFromReadableStream( + hash: ReturnType, + body: ReadableStream, + signal?: AbortSignal +) { + const reader = body.getReader() + const abort = () => { + reader.cancel(getAbortReason(signal)).catch(() => { + // no-op: the digest loop observes the same signal and exits with its abort reason + }) + } + + signal?.addEventListener('abort', abort, { once: true }) + + try { + throwIfAborted(signal) + let result = await reader.read() + throwIfAborted(signal) + while (!result.done) { + throwIfAborted(signal) + if (typeof result.value === 'string') { + throw ERRORS.InternalError( + undefined, + 'Unsupported object body string chunk for SHA-256 hashing' + ) + } + + hash.update(result.value) + result = await reader.read() + throwIfAborted(signal) + } + } finally { + signal?.removeEventListener('abort', abort) + reader.releaseLock() + } +} + +function throwIfAborted(signal: AbortSignal | undefined): void { + if (signal?.aborted) { + throwAbortReason(signal) + } +} + +function throwAbortReason(signal: AbortSignal): never { + throw getAbortReason(signal) +} + +function getAbortReason(signal: AbortSignal | undefined): unknown { + return signal?.reason ?? 
new DOMException('The operation was aborted', 'AbortError') +} + +function getAbortError(signal: AbortSignal | undefined): Error { + const reason = getAbortReason(signal) + if (reason instanceof Error) { + return reason + } + return new DOMException('The operation was aborted', 'AbortError') +} + +function isReadableStream(body: unknown): body is ReadableStream { + return Boolean( + body && typeof body === 'object' && typeof (body as ReadableStream).getReader === 'function' + ) +} + +function disposeStorage(storage: Storage | undefined, eventName: string, payload: BasePayload) { + storage?.db.destroyConnection().catch((error) => { + logSignatureGenerationError({ + error, + eventName, + target: payload.tenant.ref, + payload, + suffix: 'FAILED DISPOSING CONNECTION', + }) + }) +} + +function logSignatureGenerationError({ + error, + eventName, + target, + payload, + metadata, + suffix = 'FAILED', +}: { + error: unknown + eventName: string + target: string + payload: BasePayload + metadata?: Record + suffix?: string +}) { + logSchema.error(logger, `[Admin]: ${eventName} ${target} - ${suffix}`, { + error, + type: 'event', + event: eventName, + ...(metadata ? 
{ metadata: JSON.stringify(metadata) } : {}), + tenantId: payload.tenant.ref, + project: payload.tenant.ref, + reqId: payload.reqId, + sbReqId: payload.sbReqId, + }) +} diff --git a/src/storage/events/workers.ts b/src/storage/events/workers.ts index 74b6bb9f6..e17f57836 100644 --- a/src/storage/events/workers.ts +++ b/src/storage/events/workers.ts @@ -7,6 +7,10 @@ import { Webhook } from './lifecycle/webhook' import { ResetMigrationsOnTenant } from './migrations/reset-migrations' import { RunMigrationsOnTenants } from './migrations/run-migrations' import { BackupObjectEvent } from './objects/backup-object' +import { + GenerateObjectSignature, + GenerateObjectSignatures, +} from './objects/generate-object-signature' import { ObjectAdminDelete } from './objects/object-admin-delete' import { ObjectAdminDeleteAllBefore } from './objects/object-admin-delete-all-before' import { MoveJobs } from './pgboss/move-jobs' @@ -19,6 +23,8 @@ export function registerWorkers() { Queue.register(ObjectAdminDeleteAllBefore) Queue.register(RunMigrationsOnTenants) Queue.register(BackupObjectEvent) + Queue.register(GenerateObjectSignatures) + Queue.register(GenerateObjectSignature) Queue.register(ResetMigrationsOnTenant) Queue.register(JwksCreateSigningSecret) Queue.register(JwksRollUrlSigningKey) diff --git a/src/test/database-protection.test.ts b/src/test/database-protection.test.ts index 137516013..946186016 100644 --- a/src/test/database-protection.test.ts +++ b/src/test/database-protection.test.ts @@ -129,4 +129,309 @@ describe('Database Protection Triggers', () => { }) }) }) + + describe('Object updated_at trigger', () => { + it('does not change updated_at when only the internal signature changes', async () => { + const db = tHelper.database.connection.pool.acquire() + const testObjectName = `test-signature-updated-at-${Date.now()}.txt` + const initialUpdatedAt = '2024-01-02T03:04:05.000Z' + const signature = Buffer.from('a'.repeat(64), 'hex') + + try { + await db.raw( + ` + INSERT 
INTO storage.objects (bucket_id, name, owner, version, metadata, updated_at) + VALUES (?, ?, ?, ?, ?::jsonb, ?) + `, + [testBucketName, testObjectName, null, '1', JSON.stringify({ size: 1 }), initialUpdatedAt] + ) + + await expect( + tHelper.database.updateObjectSignature(testBucketName, testObjectName, '1', signature) + ).resolves.toBe(true) + + const signed = await db.raw( + 'SELECT updated_at FROM storage.objects WHERE bucket_id = ? AND name = ?', + [testBucketName, testObjectName] + ) + expect(signed.rows[0].updated_at).toEqual(new Date(initialUpdatedAt)) + + await db.raw( + 'UPDATE storage.objects SET metadata = ?::jsonb WHERE bucket_id = ? AND name = ?', + [JSON.stringify({ size: 2 }), testBucketName, testObjectName] + ) + + const metadataUpdated = await db.raw( + 'SELECT updated_at FROM storage.objects WHERE bucket_id = ? AND name = ?', + [testBucketName, testObjectName] + ) + expect(metadataUpdated.rows[0].updated_at.getTime()).toBeGreaterThan( + new Date(initialUpdatedAt).getTime() + ) + } finally { + await withDeleteEnabled(db, async (db) => { + await db.raw('DELETE FROM storage.objects WHERE bucket_id = ? 
AND name = ?', [ + testBucketName, + testObjectName, + ]) + }) + } + }) + + it('allows authenticated users to clear signatures but not write them', async () => { + const db = tHelper.database.connection.pool.acquire() + const testObjectName = `test-signature-guard-${Date.now()}.txt` + const testInsertObjectName = `${testObjectName}.insert` + const policyPrefix = `signature_guard_${Date.now()}` + const selectPolicyName = `${policyPrefix}_select` + const updatePolicyName = `${policyPrefix}_update` + const insertPolicyName = `${policyPrefix}_insert` + const bucketLiteral = `'${testBucketName.replace(/'/g, "''")}'` + const signature = 'a'.repeat(64) + + try { + await db.raw( + `CREATE POLICY "${selectPolicyName}" + ON storage.objects + FOR SELECT + TO authenticated + USING (bucket_id = ${bucketLiteral})` + ) + await db.raw( + `CREATE POLICY "${updatePolicyName}" + ON storage.objects + FOR UPDATE + TO authenticated + USING (bucket_id = ${bucketLiteral}) + WITH CHECK (bucket_id = ${bucketLiteral})` + ) + await db.raw( + `CREATE POLICY "${insertPolicyName}" + ON storage.objects + FOR INSERT + TO authenticated + WITH CHECK (bucket_id = ${bucketLiteral})` + ) + + await db.raw( + ` + INSERT INTO storage.objects (bucket_id, name, owner, version, metadata, signature) + VALUES (?, ?, ?, ?, ?::jsonb, decode(?, 'hex')) + `, + [testBucketName, testObjectName, null, '1', JSON.stringify({ size: 1 }), signature] + ) + + await db.transaction(async (trx) => { + await trx.raw('SET LOCAL ROLE authenticated') + await trx.raw( + 'UPDATE storage.objects SET signature = NULL WHERE bucket_id = ? AND name = ?', + [testBucketName, testObjectName] + ) + }) + + const cleared = await db.raw( + 'SELECT signature FROM storage.objects WHERE bucket_id = ? 
AND name = ?', + [testBucketName, testObjectName] + ) + expect(cleared.rows[0].signature).toBeNull() + + await expect( + db.transaction(async (trx) => { + await trx.raw('SET LOCAL ROLE authenticated') + await trx.raw( + `UPDATE storage.objects + SET signature = decode(?, 'hex') + WHERE bucket_id = ? AND name = ?`, + [signature, testBucketName, testObjectName] + ) + }) + ).rejects.toMatchObject({ code: '42501' }) + + await expect( + db.transaction(async (trx) => { + await trx.raw('SET LOCAL ROLE authenticated') + await trx.raw( + ` + INSERT INTO storage.objects (bucket_id, name, owner, version, metadata, signature) + VALUES (?, ?, ?, ?, ?::jsonb, decode(?, 'hex')) + `, + [ + testBucketName, + testInsertObjectName, + null, + '1', + JSON.stringify({ size: 1 }), + signature, + ] + ) + }) + ).rejects.toMatchObject({ code: '42501' }) + } finally { + await db.raw(`DROP POLICY IF EXISTS "${selectPolicyName}" ON storage.objects`) + await db.raw(`DROP POLICY IF EXISTS "${updatePolicyName}" ON storage.objects`) + await db.raw(`DROP POLICY IF EXISTS "${insertPolicyName}" ON storage.objects`) + await withDeleteEnabled(db, async (db) => { + await db.raw('DELETE FROM storage.objects WHERE bucket_id = ? 
AND name IN (?, ?)', [ + testBucketName, + testObjectName, + testInsertObjectName, + ]) + }) + } + }) + + it('enforces signature write guards against the request-scoped role setting', async () => { + const db = tHelper.database.connection.pool.acquire() + const testObjectName = `test-signature-effective-role-${Date.now()}.txt` + const testInsertObjectName = `${testObjectName}.insert` + const policyPrefix = `signature_effective_role_${Date.now()}` + const selectPolicyName = `${policyPrefix}_select` + const updatePolicyName = `${policyPrefix}_update` + const insertPolicyName = `${policyPrefix}_insert` + const bucketLiteral = `'${testBucketName.replace(/'/g, "''")}'` + const signature = 'b'.repeat(64) + + try { + await db.raw( + `CREATE POLICY "${selectPolicyName}" + ON storage.objects + FOR SELECT + TO authenticated + USING (bucket_id = ${bucketLiteral})` + ) + await db.raw( + `CREATE POLICY "${updatePolicyName}" + ON storage.objects + FOR UPDATE + TO authenticated + USING (bucket_id = ${bucketLiteral}) + WITH CHECK (bucket_id = ${bucketLiteral})` + ) + await db.raw( + `CREATE POLICY "${insertPolicyName}" + ON storage.objects + FOR INSERT + TO authenticated + WITH CHECK (bucket_id = ${bucketLiteral})` + ) + + await db.raw( + ` + INSERT INTO storage.objects (bucket_id, name, owner, version, metadata, signature) + VALUES (?, ?, ?, ?, ?::jsonb, decode(?, 'hex')) + `, + [testBucketName, testObjectName, null, '1', JSON.stringify({ size: 1 }), signature] + ) + + await db.transaction(async (trx) => { + await trx.raw(`SELECT set_config('role', 'authenticated', true)`) + await trx.raw( + 'UPDATE storage.objects SET signature = NULL WHERE bucket_id = ? AND name = ?', + [testBucketName, testObjectName] + ) + }) + + await expect( + db.transaction(async (trx) => { + await trx.raw(`SELECT set_config('role', 'authenticated', true)`) + await trx.raw( + `UPDATE storage.objects + SET signature = decode(?, 'hex') + WHERE bucket_id = ? 
AND name = ?`, + [signature, testBucketName, testObjectName] + ) + }) + ).rejects.toMatchObject({ code: '42501' }) + + await expect( + db.transaction(async (trx) => { + await trx.raw(`SELECT set_config('role', 'authenticated', true)`) + await trx.raw( + ` + INSERT INTO storage.objects (bucket_id, name, owner, version, metadata, signature) + VALUES (?, ?, ?, ?, ?::jsonb, decode(?, 'hex')) + `, + [ + testBucketName, + testInsertObjectName, + null, + '1', + JSON.stringify({ size: 1 }), + signature, + ] + ) + }) + ).rejects.toMatchObject({ code: '42501' }) + } finally { + await db.raw(`DROP POLICY IF EXISTS "${selectPolicyName}" ON storage.objects`) + await db.raw(`DROP POLICY IF EXISTS "${updatePolicyName}" ON storage.objects`) + await db.raw(`DROP POLICY IF EXISTS "${insertPolicyName}" ON storage.objects`) + await withDeleteEnabled(db, async (db) => { + await db.raw('DELETE FROM storage.objects WHERE bucket_id = ? AND name IN (?, ?)', [ + testBucketName, + testObjectName, + testInsertObjectName, + ]) + }) + } + }) + + it('clears signatures on object content upsert but preserves them on metadata updates', async () => { + const db = tHelper.database.connection.pool.acquire() + const testObjectName = `test-signature-invalidation-${Date.now()}.txt` + const signature = Buffer.from('a'.repeat(64), 'hex') + const metadata = (size: number) => ({ + cacheControl: 'no-cache', + contentLength: size, + eTag: `etag-${size}`, + mimetype: 'text/plain', + size, + }) + + try { + await tHelper.database.createObject({ + bucket_id: testBucketName, + name: testObjectName, + owner: undefined, + version: 'v1', + metadata: metadata(1), + user_metadata: undefined, + }) + + await expect( + tHelper.database.updateObjectSignature(testBucketName, testObjectName, 'v1', signature) + ).resolves.toBe(true) + + await tHelper.database.updateObjectMetadata(testBucketName, testObjectName, metadata(2)) + + const metadataUpdated = await db.raw( + 'SELECT signature FROM storage.objects WHERE bucket_id = ? 
AND name = ?', + [testBucketName, testObjectName] + ) + expect(metadataUpdated.rows[0].signature).toEqual(signature) + + await tHelper.database.upsertObject({ + bucket_id: testBucketName, + name: testObjectName, + owner: undefined, + version: 'v2', + metadata: metadata(3), + user_metadata: undefined, + }) + + const contentUpdated = await db.raw( + 'SELECT signature FROM storage.objects WHERE bucket_id = ? AND name = ?', + [testBucketName, testObjectName] + ) + expect(contentUpdated.rows[0].signature).toBeNull() + } finally { + await withDeleteEnabled(db, async (db) => { + await db.raw('DELETE FROM storage.objects WHERE bucket_id = ? AND name = ?', [ + testBucketName, + testObjectName, + ]) + }) + } + }) + }) }) diff --git a/src/test/tus.test.ts b/src/test/tus.test.ts index ef0ad21c3..9f1f28590 100644 --- a/src/test/tus.test.ts +++ b/src/test/tus.test.ts @@ -333,6 +333,7 @@ describe.each([ owner: null, owner_id: null, path_tokens: [objectName], + signature: null, updated_at: expect.any(Date), version: expect.any(String), }) @@ -427,6 +428,7 @@ describe.each([ owner: null, owner_id: null, path_tokens: [objectName], + signature: null, updated_at: expect.any(Date), version: expect.any(String), }) @@ -582,6 +584,7 @@ describe.each([ owner: null, owner_id: null, path_tokens: [objectName], + signature: null, updated_at: expect.any(Date), version: expect.any(String), }) @@ -853,6 +856,7 @@ describe.each([ owner: null, owner_id: null, path_tokens: [objectName], + signature: null, updated_at: expect.any(Date), version: expect.any(String), }) @@ -919,6 +923,7 @@ describe.each([ owner: null, owner_id: 'some-owner-id', path_tokens: [objectName], + signature: null, updated_at: expect.any(Date), version: expect.any(String), })