diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 00000000..5772f4b1 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,55 @@ +name: Run tests + +on: + push: + branches: + - '**' + tags-ignore: + - '*' + +jobs: + build-test-deploy: + name: Build and test + runs-on: ubuntu-latest + services: + postgres: + image: postgres:12 + env: + POSTGRES_HOST_AUTH_METHOD: trust + ports: + # maps container port 5432 to the same fixed host port 5432 + - 5432:5432 + # needed because the postgres container does not provide a healthcheck + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + steps: + - name: Checkout + uses: actions/checkout@v2 + + - uses: actions/setup-dotnet@v1 + with: + dotnet-version: '6.0.x' + + - name: Install message-db + env: + MESSAGE_DB_VERSION: 1.3.0 + PGHOST: localhost + PGUSER: postgres + PGPASSWORD: postgres + PGPORT: '5432' + run: | + mkdir -p /tmp/eventide + curl -L https://github.com/message-db/message-db/archive/refs/tags/v$MESSAGE_DB_VERSION.tar.gz -o /tmp/eventide/message-db.tgz + tar -xf /tmp/eventide/message-db.tgz --directory /tmp/eventide + (cd /tmp/eventide/message-db-${MESSAGE_DB_VERSION}/database && ./install.sh) + + - name: Restore + run: dotnet restore Propulsion.sln + + - name: Build + run: dotnet build Propulsion.sln --configuration Release --no-restore + + - name: Run Tests + env: + MSG_DB_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=message_store" + CHECKPOINT_CONNECTION_STRING: "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" + run: dotnet test Propulsion.sln --no-restore --verbosity minimal diff --git a/Propulsion.sln b/Propulsion.sln index 4c71986f..4f54b74c 100644 --- a/Propulsion.sln +++ b/Propulsion.sln @@ -17,6 +17,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".project", ".project", "{6E LICENSE = LICENSE README.md = README.md SECURITY.md = SECURITY.md + 
.github\workflows\ci.yaml = .github\workflows\ci.yaml EndProjectSection EndProject Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion", "src\Propulsion\Propulsion.fsproj", "{0F72360F-1C14-46E3-9A60-B6BF87BD726D}" diff --git a/build.proj b/build.proj index c00ac9e5..2d273597 100644 --- a/build.proj +++ b/build.proj @@ -34,6 +34,7 @@ + diff --git a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs index 5ca6acbb..5429794c 100644 --- a/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs +++ b/src/Propulsion.CosmosStore/ChangeFeedProcessor.fs @@ -152,16 +152,15 @@ type ChangeFeedProcessor = let estimator = monitored.GetChangeFeedEstimator(processorName_, leases) let emitLagMetrics (ct : CancellationToken) = task { while not ct.IsCancellationRequested do - let feedIteratorMap (map : ChangeFeedProcessorState -> 'u) : IAsyncEnumerable<'u> = taskSeq { + let feedIteratorMap (map : ChangeFeedProcessorState -> 'u) : Task<'u seq> = task { // earlier versions, such as 3.9.0, do not implement IDisposable; see linked issue for detail on when SDK team added it use query = estimator.GetCurrentStateIterator() // see https://github.com/jet/equinox/issues/225 - in the Cosmos V4 SDK, all this is managed IAsyncEnumerable + let result = ResizeArray() while query.HasMoreResults do let! res = query.ReadNextAsync(ct) - for x in res do - yield map x } - let! leasesState = - feedIteratorMap (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag) - |> TaskSeq.toArrayAsync + for x in res do result.Add(map x) + return result :> 'u seq } + let! leasesState = feedIteratorMap (fun s -> leaseTokenToPartitionId s.LeaseToken, s.EstimatedLag) do! lagMonitorCallback (Seq.sortBy fst leasesState |> List.ofSeq) } emitLagMetrics) let wrap (f : unit -> Task) () = task { return! 
f () } diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs index 4dd5fd5c..690091c7 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs @@ -2,6 +2,7 @@ open Equinox.DynamoStore open FSharp.Control +open Propulsion.Infrastructure open Propulsion.Internal open System open System.Threading @@ -44,10 +45,10 @@ module private Impl = // Includes optional hydrating of events with event bodies and/or metadata (controlled via hydrating/maybeLoad args) let materializeIndexEpochAsBatchesOfStreamEvents (log : Serilog.ILogger, sourceId, storeLog) (hydrating, maybeLoad : _ -> _ -> (CancellationToken -> Task<_>) voption, loadDop) batchCutoff (context : DynamoStoreContext) - (AppendsPartitionId.Parse pid) (Checkpoint.Parse (epochId, offset)) ct = taskSeq { + (AppendsPartitionId.Parse pid) (Checkpoint.Parse (epochId, offset)) ct = AsyncSeq.toAsyncEnum(asyncSeq { let epochs = AppendsEpoch.Reader.Config.create storeLog context let sw = Stopwatch.start () - let! _maybeSize, version, state = epochs.Read(pid, epochId, offset) |> Async.startImmediateAsTask ct + let! _maybeSize, version, state = epochs.Read(pid, epochId, offset) let totalChanges = state.changes.Length sw.Stop() let totalStreams, chosenEvents, totalEvents, streamEvents = @@ -103,15 +104,15 @@ module private Impl = for i, spans in state.changes do let pending = spans |> Array.filter (fun (span : AppendsEpoch.Events.StreamSpan) -> streamEvents.ContainsKey(span.p)) if buffer.Count <> 0 && buffer.Count + pending.Length > batchCutoff then - let! hydrated = materializeSpans ct + let! hydrated = materializeSpans ct |> Async.AwaitTaskCorrect report (Some i) hydrated.Length yield struct (sw.Elapsed, sliceBatch epochId i hydrated) // not i + 1 as the batch does not include these changes sw.Reset() buffer.Clear() buffer.AddRange(pending) - let! hydrated = materializeSpans ct + let! 
hydrated = materializeSpans ct |> Async.AwaitTaskCorrect report None hydrated.Length - yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) } + yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) }) /// Defines the strategy to use for hydrating the events prior to routing them to the Handler [] diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index 963bb6a3..08099139 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -6,6 +6,7 @@ + diff --git a/src/Propulsion.Feed/FeedReader.fs b/src/Propulsion.Feed/FeedReader.fs index 577fa8c5..0c49a533 100644 --- a/src/Propulsion.Feed/FeedReader.fs +++ b/src/Propulsion.Feed/FeedReader.fs @@ -2,6 +2,7 @@ namespace Propulsion.Feed.Core open FSharp.Control open Propulsion.Feed +open Propulsion.Infrastructure open Propulsion.Internal open Serilog open System @@ -125,7 +126,7 @@ type FeedReader renderPos, ?logCommitFailure, // If supplied, an isTail Batch stops the reader loop and waits for supplied cleanup function. Default is a perpetual read loop. - ?awaitIngesterShutdown) = + ?awaitIngesterShutdown: CancellationToken -> Task) = let stats = Stats(partition, source, tranche, renderPos) @@ -161,10 +162,13 @@ type FeedReader stats.UpdateCommittedPosition(initialPosition) let mutable currentPos, lastWasTail = initialPosition, false while not (ct.IsCancellationRequested || (lastWasTail && Option.isSome awaitIngesterShutdown)) do - for readLatency, batch in crawl (lastWasTail, currentPos) ct do - do! submitPage (readLatency, batch) - currentPos <- batch.checkpoint - lastWasTail <- batch.isTail + do! AsyncSeq.ofAsyncEnum (crawl (lastWasTail, currentPos) ct) + |> AsyncSeq.iterAsync (fun struct(readLatency, batch) -> async { + do! 
submitPage (readLatency, batch) |> Async.AwaitTaskCorrect + currentPos <- batch.checkpoint + lastWasTail <- batch.isTail }) + |> Async.startImmediateAsTask ct + |> Task.ignore match awaitIngesterShutdown with | Some a when not ct.IsCancellationRequested -> let completionTimer = Stopwatch.start () diff --git a/src/Propulsion.Feed/FeedSource.fs b/src/Propulsion.Feed/FeedSource.fs index afb0ca4b..891ea356 100644 --- a/src/Propulsion.Feed/FeedSource.fs +++ b/src/Propulsion.Feed/FeedSource.fs @@ -267,24 +267,25 @@ type TailingFeedSource inherit FeedSourceBase(log, statsInterval, sourceId, checkpoints, establishOrigin, sink, renderPos, ?logCommitFailure = logCommitFailure, ?readersStopAtTail = readersStopAtTail) - let crawl trancheId (wasLast, startPos) ct = taskSeq { - if wasLast then do! Task.delay tailSleepInterval ct - try let batches = crawl.Invoke(trancheId, startPos, ct) + let crawl trancheId (wasLast, startPos) ct = AsyncSeq.toAsyncEnum(asyncSeq { + if wasLast then do! Async.Sleep tailSleepInterval + try let batches = crawl.Invoke(trancheId, startPos, ct) |> AsyncSeq.ofAsyncEnum for batch in batches do yield batch with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log match logReadFailure with None -> log.ForContext("tranche", trancheId).ForContext().Warning(e, "Read failure") | Some l -> l e - match readFailureSleepInterval with None -> () | Some interval -> do! Task.delay interval ct } + match readFailureSleepInterval with None -> () | Some interval -> do! Async.Sleep interval + }) member _.Pump(readTranches, ct) = base.Pump(readTranches, crawl, ct) module TailingFeedSource = - let readOne readBatch cat pos ct = taskSeq { + let readOne readBatch cat pos ct = AsyncSeq.toAsyncEnum (asyncSeq { let sw = Stopwatch.start () - let! b = readBatch struct (cat, pos, ct) - yield struct (sw.Elapsed, b) } + let! 
b = readBatch struct (cat, pos, ct) |> Async.AwaitTaskCorrect + yield struct (sw.Elapsed, b) }) /// Drives reading and checkpointing from a source that aggregates data from multiple streams as a singular source /// without shards/physical partitions (tranches), such as the SqlStreamStore and EventStoreDB $all feeds @@ -341,6 +342,7 @@ open Propulsion.Internal open System open System.Threading open System.Threading.Tasks +open Propulsion.Infrastructure [] type Page<'F> = { items : FsCodec.ITimelineEvent<'F>[]; checkpoint : Position; isTail : bool } @@ -357,13 +359,13 @@ type FeedSource let crawl (readPage: Func>>) trancheId = let streamName = FsCodec.StreamName.compose "Messages" [SourceId.toString sourceId; TrancheId.toString trancheId] - fun (wasLast, pos) ct -> taskSeq { + fun (wasLast, pos) ct -> AsyncSeq.toAsyncEnum(asyncSeq { if wasLast then - do! Task.delay tailSleepInterval ct + do! Async.Sleep tailSleepInterval let readTs = Stopwatch.timestamp () - let! page = readPage.Invoke(trancheId, pos, ct) + let! 
page = (readPage.Invoke(trancheId, pos, ct)) |> Async.AwaitTaskCorrect let items' = page.items |> Array.map (fun x -> struct (streamName, x)) - yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) } + yield struct (Stopwatch.elapsed readTs, ({ items = items'; checkpoint = page.checkpoint; isTail = page.isTail } : Core.Batch<_>)) }) member internal _.Pump(readTranches: Func>, readPage: Func>>, ct): Task = diff --git a/src/Propulsion.Feed/PeriodicSource.fs b/src/Propulsion.Feed/PeriodicSource.fs index 124ec2ee..e4e05a45 100644 --- a/src/Propulsion.Feed/PeriodicSource.fs +++ b/src/Propulsion.Feed/PeriodicSource.fs @@ -56,11 +56,11 @@ type PeriodicSource inherit Core.FeedSourceBase(log, statsInterval, sourceId, checkpoints, None, sink, defaultArg renderPos DateTimeOffsetPosition.render) // We don't want to checkpoint for real until we know the scheduler has handled the full set of pages in the crawl. - let crawlInternal (read : Func<_, IAsyncEnumerable>) trancheId (_wasLast, position) ct : IAsyncEnumerable)> = taskSeq { + let crawlInternal (read : Func<_, IAsyncEnumerable>) trancheId (_wasLast, position) ct : IAsyncEnumerable)> = AsyncSeq.toAsyncEnum(asyncSeq { let startDate = DateTimeOffsetPosition.getDateTimeOffset position let dueDate = startDate + refreshInterval match dueDate - DateTimeOffset.UtcNow with - | waitTime when waitTime.Ticks > 0L -> do! Task.delay waitTime ct + | waitTime when waitTime.Ticks > 0L -> do! 
Async.Sleep waitTime | _ -> () let basePosition = DateTimeOffset.UtcNow |> DateTimeOffsetPosition.ofDateTimeOffset @@ -70,7 +70,7 @@ type PeriodicSource let buffer = ResizeArray() let mutable index = 0L let mutable elapsed = TimeSpan.Zero - for ts, xs in read.Invoke trancheId do + for ts, xs in AsyncSeq.ofAsyncEnum (read.Invoke trancheId) do elapsed <- elapsed + ts let streamEvents : Propulsion.Sinks.StreamEvent seq = seq { for si in xs -> @@ -91,7 +91,7 @@ type PeriodicSource match buffer.ToArray() with | [||] as noItems -> noItems, basePosition | finalItem -> finalItem, let struct (_s, e) = Array.last finalItem in e |> Core.TimelineEvent.toCheckpointPosition - yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true } : Core.Batch<_>) } + yield elapsed, ({ items = items; checkpoint = checkpoint; isTail = true } : Core.Batch<_>) }) member internal _.Pump(readTranches: Func>, // The TaskSeq is expected to manage its own resilience strategy (retries etc).
diff --git a/src/Propulsion.Feed/Propulsion.Feed.fsproj b/src/Propulsion.Feed/Propulsion.Feed.fsproj index 97d8fdee..0ff17e3a 100644 --- a/src/Propulsion.Feed/Propulsion.Feed.fsproj +++ b/src/Propulsion.Feed/Propulsion.Feed.fsproj @@ -5,6 +5,9 @@ + + Infrastructure.fs + @@ -12,9 +15,8 @@ + - - diff --git a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs index 9ca8283a..cabdd058 100644 --- a/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs +++ b/src/Propulsion.SqlStreamStore/ReaderCheckpoint.fs @@ -4,6 +4,7 @@ open Dapper open FSharp.Control open Microsoft.Data.SqlClient open Propulsion.Feed +open Propulsion.Internal open System open System.Data diff --git a/src/Propulsion/Internal.fs b/src/Propulsion/Internal.fs index 73e3e26d..fbb9b29c 100644 --- a/src/Propulsion/Internal.fs +++ b/src/Propulsion/Internal.fs @@ -116,6 +116,11 @@ module Task = let parallelUnlimited ct xs : Task<'t []> = parallel_ 0 ct xs + let inline ignore (a: Task<'T>): Task = task { + let! 
_ = a + return () + } + type Sem(max) = let inner = new SemaphoreSlim(max) member _.HasCapacity = inner.CurrentCount <> 0 diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index 4da2ebb2..5de46a98 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -27,8 +27,15 @@ let createStreamMessage streamName = cmd.Parameters.AddWithValue("Data", NpgsqlDbType.Jsonb, """{"name": "world"}""") |> ignore cmd -[] -let ConnectionString = "Host=localhost; Port=5433; Username=message_store; Password=;" +let ConnectionString = + match Environment.GetEnvironmentVariable "MSG_DB_CONNECTION_STRING" with + | null -> "Host=localhost; Database=message_store; Port=5432; Username=message_store" + | s -> s +let CheckpointConnectionString = + match Environment.GetEnvironmentVariable "CHECKPOINT_CONNECTION_STRING" with + | null -> "Host=localhost; Database=message_store; Port=5432; Username=postgres; Password=postgres" + | s -> s + let connect () = task { let conn = new NpgsqlConnection(ConnectionString) @@ -54,10 +61,9 @@ let stats log = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, T member _.HandleExn(log, x) = () } let makeCheckpoints consumerGroup = task { - let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5433; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) + let checkpoints = ReaderCheckpoint.CheckpointStore(CheckpointConnectionString, "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) do! checkpoints.CreateSchemaIfNotExists() - return checkpoints -} + return checkpoints } [] let ``It processes events for a category`` () = task { @@ -136,7 +142,6 @@ let ``It doesn't read the tail event again`` () = task { checkpoints, sink, [| category |]) use capture = new ActivityCapture() - use _src = source.Start() do! 
source.RunUntilCaughtUp(TimeSpan.FromSeconds(10), stats.StatsInterval) :> Task