Skip to content

Commit

Permalink
Merge pull request #54 from ReactiveBayes/fix-43
Browse files Browse the repository at this point in the history
Fix async pairwise
  • Loading branch information
bvdmitri committed Jun 25, 2024
2 parents 26d3561 + fd41976 commit b443a7f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
7 changes: 4 additions & 3 deletions src/operators/pairwise.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ mutable struct PairwiseActor{L, A} <: Actor{L}
end

function on_next!(actor::PairwiseActor{L}, data::L) where L
if actor.previous !== nothing
next!(actor.actor, (actor.previous, data))
end
previous = actor.previous
actor.previous = data
if !isnothing(previous)
next!(actor.actor, (previous, data))
end
end

on_error!(actor::PairwiseActor, err) = error!(actor.actor, err)
Expand Down
16 changes: 16 additions & 0 deletions test/operators/test_operator_pairwise.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ include("../test_helpers.jl")
values = @ts([ (1, 2), (2, 3), (3, 4), (4, 5), c ]),
source_type = Tuple{Int, Int}
),
(
source = from(1:5) |> async() |> pairwise(),
values = @ts([ (1, 2), (2, 3), (3, 4), (4, 5), c ]),
source_type = Tuple{Int, Int}
),
(
source = of(1) |> pairwise(),
values = @ts(c),
Expand Down Expand Up @@ -70,4 +75,15 @@ include("../test_helpers.jl")

end

@testset "Issue #43" begin
subject = Subject(Int)
paired = subject |> pairwise()
values = []
subscribe!(paired, (v) -> push!(values, v))
@sync for i = 1:10
@async next!(subject, i)
end
@test values == [(1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10)]
end

end

0 comments on commit b443a7f

Please sign in to comment.