Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/internal/database/pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,25 @@ describe('PoolManager cache lifecycle', () => {
expect(recreated).not.toBe(first)
})

test('recreates cached tenant pool with updated max connections after rebalance', async () => {
const poolModule = await loadPoolModule(10_000)
const poolManager = new poolModule.PoolManager()
const pool = poolManager.getPool(createPoolSettings('tenant-max-connections-rebalance'))

try {
const originalKnex = pool.acquire()
expect((originalKnex.client.pool as { max: number }).max).toBe(10)

pool.rebalance({ maxConnections: 14 })

const rebalancedKnex = pool.acquire()
expect(rebalancedKnex).not.toBe(originalKnex)
expect((rebalancedKnex.client.pool as { max: number }).max).toBe(14)
} finally {
await poolManager.destroyAll()
}
})

test('propagates explicit destroy failures without double-destroying pools', async () => {
const poolModule = await loadPoolModule(10_000)

Expand Down
31 changes: 22 additions & 9 deletions src/internal/database/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,14 @@ export interface PoolStats {
total: number
}

export interface PoolRebalanceOptions {
clusterSize?: number
maxConnections?: number
}

export interface PoolStrategy {
acquire(): Knex
rebalance(options: { clusterSize: number }): void
rebalance(options: PoolRebalanceOptions): void
destroy(): Promise<void>
getPoolStats(): PoolStats | null
}
Expand Down Expand Up @@ -255,12 +260,10 @@ export class PoolManager {
}
}

rebalance(tenantId: string, data: { clusterSize: number }) {
rebalance(tenantId: string, data: PoolRebalanceOptions) {
const pool = tenantPools.get(tenantId)
if (pool) {
pool.rebalance({
clusterSize: data.clusterSize,
})
pool.rebalance({ ...data })
}
}

Expand Down Expand Up @@ -383,14 +386,24 @@ class TenantPool implements PoolStrategy {
}
}

rebalance(options: { clusterSize: number }) {
if (options.clusterSize === 0) {
rebalance(options: PoolRebalanceOptions) {
let shouldReplacePool = false

if (options.clusterSize !== undefined && options.clusterSize !== 0) {
this.options.clusterSize = options.clusterSize
shouldReplacePool = true
}

if (options.maxConnections !== undefined) {
this.options.maxConnections = options.maxConnections
shouldReplacePool = true
}

if (!shouldReplacePool) {
return
}

const originalPool = this.pool

this.options.clusterSize = options.clusterSize
this.pool = undefined

if (originalPool) {
Expand Down
19 changes: 15 additions & 4 deletions src/internal/database/tenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ const {
icebergEnabled,
vectorEnabled,
multitenantDatabaseQueryTimeout,
databaseMaxConnections,
} = getConfig()

export const TENANT_CONFIG_CACHE_MAX_ITEMS = 16384
Expand Down Expand Up @@ -421,7 +422,7 @@ export async function listenForTenantUpdate(pubSub: PubSubAdapter): Promise<void
* Handles the tenant config change event
* @param cacheKey
*/
async function onTenantConfigChange(cacheKey: string) {
export async function onTenantConfigChange(cacheKey: string) {
const oldConfig = tenantConfigCache.get(cacheKey, { recordMetrics: false })
tenantConfigCache.delete(cacheKey)

Expand All @@ -443,10 +444,12 @@ async function onTenantConfigChange(cacheKey: string) {
})
}

// Rebalance the pool if the max connections changed
if (newConfig.maxConnections && newConfig.maxConnections !== oldConfig.maxConnections) {
if (
normalizeMaxConnections(newConfig.maxConnections) !==
normalizeMaxConnections(oldConfig.maxConnections)
) {
TenantConnection.poolManager.rebalance(cacheKey, {
clusterSize: newConfig.maxConnections,
maxConnections: resolveMaxConnections(newConfig.maxConnections),
})
}
} catch {
Expand All @@ -456,3 +459,11 @@ async function onTenantConfigChange(cacheKey: string) {
// before we could get the new config
}
}

function normalizeMaxConnections(maxConnections: number | null | undefined): number | null {
return maxConnections ?? null
}

function resolveMaxConnections(maxConnections: number | null | undefined): number {
return maxConnections ?? databaseMaxConnections
}
62 changes: 61 additions & 1 deletion src/test/tenant.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { encrypt, signJWT } from '@internal/auth'
import { TENANT_CONFIG_CACHE_NAME } from '@internal/cache'
import { jwksManager } from '@internal/database'
import { jwksManager, TenantConnection } from '@internal/database'
import { DBMigration } from '@internal/database/migrations'
import {
deleteTenantConfig,
getFeatures,
getFileSizeLimit,
getServiceKey,
getTenantConfig,
onTenantConfigChange,
} from '@internal/database/tenant'
import { cacheRequestsTotal } from '@internal/monitoring/metrics'
import dotenv from 'dotenv'
Expand Down Expand Up @@ -765,6 +766,65 @@ describe('Tenant configs', () => {
}
})

test('Tenant config maxConnections change rebalances cached pool without destroying it', async () => {
const tenantId = 'pool-max-connections-change'
const encryptedTenant = {
anon_key: encrypt('anon'),
database_url: encrypt('postgres://tenant'),
database_pool_mode: 'recycled',
file_size_limit: 1,
jwt_secret: encrypt('jwt-secret'),
jwks: null,
service_key: encrypt('service-key'),
feature_purge_cache: false,
feature_image_transformation: false,
feature_s3_protocol: false,
feature_iceberg_catalog: false,
feature_iceberg_catalog_max_catalogs: 0,
feature_iceberg_catalog_max_namespaces: 0,
feature_iceberg_catalog_max_tables: 0,
feature_vector_buckets: false,
feature_vector_buckets_max_buckets: 0,
feature_vector_buckets_max_indexes: 0,
image_transformation_max_resolution: null,
database_pool_url: null,
max_connections: 20,
migrations_version: migrationVersion,
migrations_status: 'COMPLETED',
tracing_mode: null,
disable_events: null,
}
const queryBuilder = {
first: vi.fn().mockReturnThis(),
where: vi.fn().mockReturnThis(),
abortOnSignal: vi
.fn()
.mockResolvedValueOnce(encryptedTenant)
.mockResolvedValueOnce({
...encryptedTenant,
max_connections: 40,
}),
}
const knexTableSpy = vi.spyOn(multitenantKnex, 'table')
const destroySpy = vi.spyOn(TenantConnection.poolManager, 'destroy').mockResolvedValue()
const rebalanceSpy = vi.spyOn(TenantConnection.poolManager, 'rebalance')

try {
knexTableSpy.mockReturnValue(queryBuilder as unknown as TenantQueryBuilder)

await getTenantConfig(tenantId)
await onTenantConfigChange(tenantId)

expect(rebalanceSpy).toHaveBeenCalledWith(tenantId, { maxConnections: 40 })
expect(destroySpy).not.toHaveBeenCalled()
} finally {
deleteTenantConfig(tenantId)
knexTableSpy.mockRestore()
destroySpy.mockRestore()
rebalanceSpy.mockRestore()
}
})

test('Get tenant config records one cache request per logical lookup', async () => {
const knexTableSpy = vi.spyOn(multitenantKnex, 'table')
const addSpy = vi.spyOn(cacheRequestsTotal, 'add')
Expand Down