From fd41976db0d32c37b223be81d8408fabc76026ad Mon Sep 17 00:00:00 2001 From: Bagaev Dmitry Date: Mon, 24 Jun 2024 10:54:57 +0200 Subject: [PATCH] Fix async pairwise --- src/operators/pairwise.jl | 7 ++++--- test/operators/test_operator_pairwise.jl | 16 ++++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/operators/pairwise.jl b/src/operators/pairwise.jl index cdde093d6..bf581f204 100644 --- a/src/operators/pairwise.jl +++ b/src/operators/pairwise.jl @@ -66,10 +66,11 @@ mutable struct PairwiseActor{L, A} <: Actor{L} end function on_next!(actor::PairwiseActor{L}, data::L) where L - if actor.previous !== nothing - next!(actor.actor, (actor.previous, data)) - end + previous = actor.previous actor.previous = data + if !isnothing(previous) + next!(actor.actor, (previous, data)) + end end on_error!(actor::PairwiseActor, err) = error!(actor.actor, err) diff --git a/test/operators/test_operator_pairwise.jl b/test/operators/test_operator_pairwise.jl index 71eff4413..09f96c885 100644 --- a/test/operators/test_operator_pairwise.jl +++ b/test/operators/test_operator_pairwise.jl @@ -17,6 +17,11 @@ include("../test_helpers.jl") values = @ts([ (1, 2), (2, 3), (3, 4), (4, 5), c ]), source_type = Tuple{Int, Int} ), + ( + source = from(1:5) |> async() |> pairwise(), + values = @ts([ (1, 2), (2, 3), (3, 4), (4, 5), c ]), + source_type = Tuple{Int, Int} + ), ( source = of(1) |> pairwise(), values = @ts(c), @@ -70,4 +75,15 @@ include("../test_helpers.jl") end +@testset "Issue #43" begin + subject = Subject(Int) + paired = subject |> pairwise() + values = [] + subscribe!(paired, (v) -> push!(values, v)) + @sync for i = 1:10 + @async next!(subject, i) + end + @test values == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10)] +end + end