Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 30 additions & 34 deletions server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,11 +631,6 @@ async def post_vcon(
Stores the vCon in Redis and indexes it for searching. The vCon is added to a sorted
set for timestamp-based retrieval and indexed by party information for searching.
Optionally adds the vCon UUID to specified ingress lists for immediate processing.

The vCon is stored with a default TTL of VCON_REDIS_EXPIRY seconds (default 3600s/1 hour).
This means vCons will automatically expire from Redis cache unless persisted to a
storage backend or the expiry is updated. Configure VCON_REDIS_EXPIRY environment
variable to change the default expiry time.

Args:
inbound_vcon: The vCon to store
Expand All @@ -659,16 +654,12 @@ async def post_vcon(

logger.debug(f"Storing vCon {inbound_vcon.uuid} ({len(dict_vcon)} bytes)")
await redis_async.json().set(key, "$", dict_vcon)

# Set default expiry on newly created vCons
await redis_async.expire(key, VCON_REDIS_EXPIRY)
logger.debug(f"Set TTL of {VCON_REDIS_EXPIRY}s on vCon {inbound_vcon.uuid}")


logger.debug(f"Adding vCon {inbound_vcon.uuid} to sorted set")
await add_vcon_to_set(key, timestamp)

logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
await index_vcon(inbound_vcon.uuid)
await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])

# Add to ingress lists if specified
if ingress_lists:
Expand Down Expand Up @@ -720,9 +711,7 @@ async def external_ingress_vcon(
- Multiple API keys can be configured for the same ingress list

The submitted vCon is stored, indexed, and automatically queued for processing
in the specified ingress list. The vCon is stored with a default TTL of
VCON_REDIS_EXPIRY seconds (default 3600s/1 hour), after which it will expire
from Redis cache unless persisted to a storage backend.
in the specified ingress list.

Args:
request: FastAPI Request object for accessing headers
Expand Down Expand Up @@ -760,16 +749,12 @@ async def external_ingress_vcon(
f"Storing vCon {inbound_vcon.uuid} ({len(dict_vcon)} bytes) via external ingress"
)
await redis_async.json().set(key, "$", dict_vcon)

# Set default expiry on newly created vCons
await redis_async.expire(key, VCON_REDIS_EXPIRY)
logger.debug(f"Set TTL of {VCON_REDIS_EXPIRY}s on vCon {inbound_vcon.uuid}")

logger.debug(f"Adding vCon {inbound_vcon.uuid} to sorted set")
await add_vcon_to_set(key, timestamp)

logger.debug(f"Indexing vCon {inbound_vcon.uuid}")
await index_vcon(inbound_vcon.uuid)
await index_vcon_parties(str(inbound_vcon.uuid), dict_vcon["parties"])

# Always add to the specified ingress list (required for this endpoint)
vcon_uuid_str = str(inbound_vcon.uuid)
Expand Down Expand Up @@ -1057,25 +1042,17 @@ async def get_dlq_vcons(
raise HTTPException(status_code=500, detail="Failed to read DLQ")


async def index_vcon(uuid: UUID) -> None:
"""Index a vCon for searching.
async def index_vcon_parties(vcon_uuid: str, parties: list) -> None:
"""Index a vCon's parties for searching.

Adds the vCon to the sorted set and indexes it by party information
(tel, mailto, name) for searching. All indexed keys will expire after
VCON_INDEX_EXPIRY seconds.
Indexes by party information (tel, mailto, name). All indexed keys
will expire after VCON_INDEX_EXPIRY seconds.

Args:
uuid: UUID of the vCon to index
vcon_uuid: UUID string of the vCon
parties: List of party dicts from the vCon
"""
key = f"vcon:{uuid}"
vcon = await redis_async.json().get(key)
created_at = datetime.fromisoformat(vcon["created_at"])
timestamp = int(created_at.timestamp())
vcon_uuid = vcon["uuid"]
await add_vcon_to_set(key, timestamp)

# Index by party information with expiration
for party in vcon["parties"]:
for party in parties:
if party.get("tel"):
tel_key = f"tel:{party['tel']}"
await redis_async.sadd(tel_key, vcon_uuid)
Expand All @@ -1090,6 +1067,25 @@ async def index_vcon(uuid: UUID) -> None:
await redis_async.expire(name_key, VCON_INDEX_EXPIRY)


async def index_vcon(uuid: UUID) -> None:
"""Index a vCon for searching (reads from Redis).

Reads the vCon from Redis, adds it to the sorted set, and indexes
by party information. Used for bulk re-indexing. For the ingest path,
use index_vcon_parties() directly to avoid redundant Redis reads.

Args:
uuid: UUID of the vCon to index
"""
key = f"vcon:{uuid}"
vcon = await redis_async.json().get(key)
created_at = datetime.fromisoformat(vcon["created_at"])
timestamp = int(created_at.timestamp())
vcon_uuid = vcon["uuid"]
await add_vcon_to_set(key, timestamp)
await index_vcon_parties(vcon_uuid, vcon["parties"])


@api_router.get(
"/index_vcons",
status_code=200,
Expand Down
12 changes: 7 additions & 5 deletions server/tests/test_external_ingress.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ def test_successful_submission_single_api_key(
# Configure mocks
mock_get_ingress_auth.return_value = {self.ingress_list: self.valid_api_key}

# Mock Redis client properly
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
mock_redis = MagicMock()
mock_json = MagicMock()
mock_json.set = AsyncMock()
mock_redis.json.return_value = mock_json
mock_redis.expire = AsyncMock()
mock_redis.sadd = AsyncMock()
mock_redis.rpush = AsyncMock()

# Set the global redis_async directly in the api module
Expand All @@ -77,12 +78,10 @@ def test_successful_submission_single_api_key(

# Verify Redis operations were called
mock_json.set.assert_called_once()
mock_redis.expire.assert_called_once() # Verify expiry was set
mock_redis.rpush.assert_called_once_with(
self.ingress_list, self.test_vcon["uuid"]
)
mock_add_vcon_to_set.assert_called_once()
mock_index_vcon.assert_called_once()

finally:
# Clean up the global variable
Expand All @@ -100,12 +99,13 @@ def test_successful_submission_multiple_api_keys(
self.ingress_list: ["partner-1-key", self.valid_api_key, "partner-3-key"]
}

# Mock Redis client properly
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
mock_redis = MagicMock()
mock_json = MagicMock()
mock_json.set = AsyncMock()
mock_redis.json.return_value = mock_json
mock_redis.expire = AsyncMock()
mock_redis.sadd = AsyncMock()
mock_redis.rpush = AsyncMock()

# Set the global redis_async directly in the api module
Expand Down Expand Up @@ -233,6 +233,7 @@ def test_redis_failure_handling(self, mock_get_ingress_auth):
mock_json.set = AsyncMock(side_effect=Exception("Redis connection failed"))
mock_redis.json.return_value = mock_json
mock_redis.expire = AsyncMock()
mock_redis.sadd = AsyncMock()
mock_redis.rpush = AsyncMock()

# Set the global redis_async directly in the api module
Expand Down Expand Up @@ -266,12 +267,13 @@ def test_multiple_ingress_lists_isolation(
"shared_ingress": ["partner-a-key", "partner-b-key-1", "shared-key"],
}

# Mock Redis client properly
# Mock Redis client properly (sadd/expire used by index_vcon_parties)
mock_redis = MagicMock()
mock_json = MagicMock()
mock_json.set = AsyncMock()
mock_redis.json.return_value = mock_json
mock_redis.expire = AsyncMock()
mock_redis.sadd = AsyncMock()
mock_redis.rpush = AsyncMock()

# Set the global redis_async directly in the api module
Expand Down
Loading