1+ import logging
12import os
23import urllib .request
34from test .integration .conftest import get_region
910
1011import pytest
1112
12- from linode_api4 import LinodeClient , LogsStreamType , PaginatedList
13+ from linode_api4 import LinodeClient , LogsStreamType , PaginatedList , Region
1314from linode_api4 .objects import (
1415 ObjectStorageACL ,
1516 ObjectStorageBucket ,
2223 LogsStreamStatus ,
2324)
2425
26+ logger = logging .getLogger (__name__ )
27+
2528_RUN_ACLP_LOGS_STREAM_TESTS = "RUN_ACLP_LOGS_STREAM_TESTS"
2629_SKIP_STREAM_TESTS = pytest .mark .skipif (
2730 os .getenv (_RUN_ACLP_LOGS_STREAM_TESTS , "" ).strip ().lower ()
2831 not in {"yes" , "true" },
2932 reason = f"{ _RUN_ACLP_LOGS_STREAM_TESTS } environment variable must be set to 'yes' or 'true'" ,
3033)
34+ _STREAM_FIXTURE_CLEANUP_WAIT = 2700
35+ _STREAM_FIXTURE_PROVISIONING_WAIT = 3600
3136
3237
3338@pytest .fixture (scope = "session" )
34- def create_object_storage_key (test_linode_client : LinodeClient ):
39+ def region (test_linode_client : LinodeClient ):
40+ region = get_region (test_linode_client , {"Object Storage" })
41+ yield region
42+
43+
44+ @pytest .fixture (scope = "session" )
45+ def create_object_storage_key (test_linode_client : LinodeClient ,
46+ region : Region ):
3547 key = test_linode_client .object_storage .keys_create (
3648 label = get_test_label (),
49+ regions = [region .id ]
3750 )
3851 yield key
3952 key .delete ()
@@ -43,19 +56,19 @@ def create_object_storage_key(test_linode_client: LinodeClient):
4356def test_destination (
4457 test_linode_client : LinodeClient ,
4558 create_object_storage_key : ObjectStorageKeys ,
59+ region : Region ,
4660):
4761 dest , bucket = _create_destination_with_bucket (
48- test_linode_client , create_object_storage_key
62+ test_linode_client , create_object_storage_key , region
4963 )
5064 yield dest
5165 _delete_destination_with_bucket (test_linode_client , dest , bucket )
5266
5367
5468def _create_destination_with_bucket (
55- client : LinodeClient , key : ObjectStorageKeys
69+ client : LinodeClient , key : ObjectStorageKeys , region : Region
5670):
5771 """Helper that creates an OBJ bucket and a logs destination backed by it."""
58- region = get_region (client , {"Object Storage" })
5972 bucket = client .object_storage .bucket_create (
6073 cluster_or_region = region .id ,
6174 label = get_test_label (),
@@ -79,16 +92,38 @@ def _delete_destination_with_bucket(
7992 client : LinodeClient , dest : LogsDestination , bucket : ObjectStorageBucket
8093):
8194 """Helper that deletes a logs destination and its backing OBJ bucket."""
82- send_request_when_resource_available (timeout = 1800 , func = dest .delete )
95+ logger .info (
96+ f"Deleting logs destination %s (id=%s), waiting up to { _STREAM_FIXTURE_CLEANUP_WAIT } s..." ,
97+ dest .label ,
98+ dest .id ,
99+ )
100+ # Wait until no stream references this destination before attempting deletion.
101+ # create_secondary_destination may be torn down before create_stream due to
102+ # pytest's LIFO fixture teardown order, so the stream could still be alive.
103+ def no_stream_attached ():
104+ streams = client .monitor .streams ()
105+ return all (
106+ all (d .id != dest .id for d in s .destinations ) for s in streams
107+ )
108+
109+ logger .info (
110+ "Waiting for all streams referencing destination %s to be deleted..." ,
111+ dest .id ,
112+ )
113+ wait_for_condition (30 , _STREAM_FIXTURE_CLEANUP_WAIT , no_stream_attached )
114+ send_request_when_resource_available (timeout = 100 , func = dest .delete )
115+ logger .info ("Destination %s deleted, emptying bucket %s..." , dest .id , bucket .label )
83116 _empty_bucket (client , bucket )
84- send_request_when_resource_available (timeout = 1800 , func = bucket .delete )
117+ logger .info ("Deleting bucket %s, waiting up to 100s..." , bucket .label )
118+ send_request_when_resource_available (timeout = 100 , func = bucket .delete )
119+ logger .info ("Bucket %s deleted." , bucket .label )
85120
86121
87122def _skip_if_streams_exist (client : LinodeClient ):
88123 """Skip the current test if any streams already exist on the account.
89124 Only one stream can be present per account at a time."""
90125 existing_streams = client .monitor .streams ()
91- if len (existing_streams ) > 0 :
126+ if len (existing_streams ) > 10 :
92127 stream_labels = [s .label for s in existing_streams ]
93128 pytest .skip (
94129 f"Skipping: existing stream(s) found on this account "
@@ -304,9 +339,10 @@ def test_fails_to_create_stream_invalid_destination(invalid_destination_error):
304339def create_secondary_destination (
305340 test_linode_client : LinodeClient ,
306341 create_object_storage_key : ObjectStorageKeys ,
342+ region : Region
307343):
308344 dest , bucket = _create_destination_with_bucket (
309- test_linode_client , create_object_storage_key
345+ test_linode_client , create_object_storage_key , region
310346 )
311347 yield dest
312348 _delete_destination_with_bucket (test_linode_client , dest , bucket )
@@ -317,6 +353,7 @@ def create_stream(
317353 test_linode_client : LinodeClient ,
318354 test_destination : LogsDestination ,
319355 invalid_destination_error , # This ensures run order to keep negative test case deterministic
356+ create_secondary_destination : LogsDestination ,
320357):
321358 _skip_if_streams_exist (test_linode_client )
322359
@@ -328,7 +365,59 @@ def create_stream(
328365 assert stream .id is not None
329366 assert stream .status == LogsStreamStatus .provisioning
330367 yield stream
331- send_request_when_resource_available (timeout = 1800 , func = stream .delete )
368+ stream_teardown (test_linode_client , stream )
369+
370+
371+ def _wait_for_stream_updatable (client : LinodeClient , stream_id : int ):
372+ """
373+ Blocks until the stream with the given id reaches active or inactive status.
374+ Used both by the wait_for_updatable_status fixture (before tests) and by
375+ stream_teardown (before deletion) to ensure the stream is in a stable,
376+ operable state. Updating destinations or other attributes puts the stream
377+ back into a transitional state, and attempting to delete or modify it while
378+ transitioning results in [400] errors.
379+ """
380+
381+ def is_stream_updatable ():
382+ stream = client .load (LogsStream , stream_id )
383+ return stream .status in (
384+ LogsStreamStatus .active ,
385+ LogsStreamStatus .inactive ,
386+ )
387+
388+ wait_for_condition (30 , _STREAM_FIXTURE_PROVISIONING_WAIT , is_stream_updatable )
389+
390+
391+ def stream_teardown (test_linode_client : LinodeClient , stream : LogsStream ):
392+ # Wait for the stream to be in a stable state before deleting.
393+ # Updating destinations (or labels/status) puts the stream back into a
394+ # transitional state; deleting while transitioning returns [400].
395+ logger .info (
396+ "Teardown: waiting for stream %s (id=%s) to reach a stable state before deletion..." ,
397+ stream .label ,
398+ stream .id ,
399+ )
400+ _wait_for_stream_updatable (test_linode_client , stream .id )
401+ logger .info (
402+ "Teardown: deleting stream %s (id=%s)..." ,
403+ stream .label ,
404+ stream .id ,
405+ )
406+ send_request_when_resource_available (timeout = 100 , func = stream .delete )
407+
408+ # The delete request returns 200 but stream deletion is async on the backend.
409+ # Wait until the stream is fully gone before teardown continues, so that
410+ # dependent fixtures (e.g. create_secondary_destination) can proceed with teardown.
411+ logger .info (
412+ "Stream delete request accepted for id=%s, waiting for backend to process request" ,
413+ stream .id ,
414+ )
415+
416+ def is_stream_deleted ():
417+ existing = test_linode_client .monitor .streams ()
418+ return all (s .id != stream .id for s in existing )
419+
420+ wait_for_condition (30 , _STREAM_FIXTURE_CLEANUP_WAIT , is_stream_deleted )
332421
333422
334423@pytest .fixture (scope = "session" )
@@ -347,7 +436,7 @@ def is_stream_provisioned():
347436 LogsStreamStatus .inactive ,
348437 )
349438
350- wait_for_condition (60 , 3600 , is_stream_provisioned )
439+ wait_for_condition (60 , _STREAM_FIXTURE_PROVISIONING_WAIT , is_stream_provisioned )
351440
352441 yield test_linode_client .load (LogsStream , create_stream .id )
353442
@@ -360,15 +449,7 @@ def wait_for_updatable_status(
360449 Waits for the stream to be in an active or inactive state before a test runs.
361450 Streams can switch to `provisioning` state between updates. This makes sure the previous update is fully finished.
362451 """
363-
364- def is_stream_updatable ():
365- stream = test_linode_client .load (LogsStream , provisioned_stream .id )
366- return stream .status in (
367- LogsStreamStatus .active ,
368- LogsStreamStatus .inactive ,
369- )
370-
371- wait_for_condition (30 , 3600 , is_stream_updatable )
452+ _wait_for_stream_updatable (test_linode_client , provisioned_stream .id )
372453
373454
374455@_SKIP_STREAM_TESTS
0 commit comments