Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ConcatMap operator #68

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

ohitsdaniel
Copy link

@ohitsdaniel ohitsdaniel commented Nov 22, 2020

Resolves #41.

@freak4pc mentioned in #41, that we could try using .flatMap(maxPublishers: 1, transform:) to achieve the functionality of ConcatMap. I added some tests to verify if this is true, but unfortunately it's not that easy. ;)

Unexpected behaviour:

  • .flatMap(maxPublishers: 1, transform:) does not buffer mapped publishers, discarding all publishers if there is an active one, leading to missing values.

@codecov
Copy link

codecov bot commented Nov 22, 2020

Codecov Report

Merging #68 (4caf28f) into main (730d272) will increase coverage by 0.02%.
The diff coverage is 97.63%.

❗ Current head 4caf28f differs from pull request most recent head 80ba6dd. Consider uploading reports for the commit 80ba6dd to get more accurate results
Impacted file tree graph

@@            Coverage Diff             @@
##             main      #68      +/-   ##
==========================================
+ Coverage   97.12%   97.15%   +0.02%     
==========================================
  Files          62       64       +2     
  Lines        3336     3546     +210     
==========================================
+ Hits         3240     3445     +205     
- Misses         96      101       +5     
Impacted Files Coverage Δ
Sources/Operators/ConcatMap.swift 94.73% <94.73%> (ø)
Sources/Common/DemandBuffer.swift 100.00% <100.00%> (ø)
Tests/ConcatMapTests.swift 100.00% <100.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 730d272...80ba6dd. Read the comment docs.

@ohitsdaniel
Copy link
Author

ohitsdaniel commented Nov 22, 2020

A gave it a go and implemented Publishers.ConcatMap. As this is my first custom combine operator / Publisher, I'm happy about any kind of feedback. 😸

Copy link
Member

@freak4pc freak4pc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still didn't go through the tests but have some questions. Thank you for all the hard work!

Sources/Operators/ConcatMap.swift Outdated Show resolved Hide resolved
public extension Publisher {
/// Transforms an output value into a new publisher, and flattens the stream of events from these multiple upstream publishers to appear as if they were coming from a single stream of events.
///
/// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.
/// Mapping to a new publisher will keep the subscription to the previous one alive until it completes and only then subscribe to the new one. This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.

This also means that all values sent by the new publisher are not forwarded as long as the previous one hasn't completed.

Not sure about this - why would it emit at all if it's only subscribed to after the completion of the previous one?

Copy link
Author

@ohitsdaniel ohitsdaniel Apr 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new, inner publisher could potentially emit before being subscribed to (Like a non-deferred future, i.e. hot observable)?

Example:

let passthrough = PassthroughSubject<Int, Never>()

let mapped = passthrough.map { value in
    Future<Int, Never> { completion in
        completion(.success(value))
    }
    .delay(for: .seconds(1), scheduler: DispatchQueue.main)
  }
  .compactMap { $0 }

  let cancellable = mapped.sink(receiveValue: { value in
    print(value)
  })

  passthrough.send(1)
  passthrough.send(2)     

In this case, concatMap does not buffer 2 as the Publisher 1 hadn't completed when 2 was put onto the stream.

See test_ignores_values_of_subsequent_while_previous_hasNot_completed in CompactMapTests.swift

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Rx, the concatMap assumes that inner Observables are cold and that the closure creates the Observable that is being subscribed to. I think it's correct to make that assumption here as well.

Sources/Operators/ConcatMap.swift Show resolved Hide resolved
bufferedPublishers.append(mapped)
}

return .unlimited
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure returning unlimited demand here is right. Any reason?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason, but I'm also not 100% sure what to return here. 🤔

defer { lock.unlock() }
activePublisher = publisher

publisher.sink(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I'm a fan of using sink inside a custom publisher. I mean, you're alraedy holding your own subscription etc so why use a built-in subscriber that creates a subscription to do this ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem is that we have an outer publisher of publishers and we want to flatten the value of the inner publisher into one publisher of values.

I checked how RxSwift solves this and it relies on MergeLimitedSink which subscribes a MergeLimitedSinkIter (inner sink) to each inner publisher and then checks if the outer sink has a 'next' publisher when the inner publisher completes.

The only benefit that I see from this is that it reuses 'active sinks' and subscribes them directly to the next publisher. Maybe you see some other benefits in this approach? I'm not 100% familiar with the RxSwift codebase.

Copy link
Author

@ohitsdaniel ohitsdaniel Apr 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realized that having an outer / inner sink would solve the whole uncertainty around which demand to return. I'll rewrite it to look a bit more like the RxSwift equivalent.

@freak4pc
Copy link
Member

freak4pc commented Mar 6, 2021

Hey @ohitsdaniel - are you interested in finishing this PR? Thanks :)

@ohitsdaniel
Copy link
Author

Yes, just didn't find the time yet as I have been focusing on other projects. I'll see if I can squeeze it in next week. :)

@ohitsdaniel
Copy link
Author

Quick update from my side: I scheduled some time this Friday to work on this. :)

@ohitsdaniel
Copy link
Author

ohitsdaniel commented Apr 12, 2021

@freak4pc

Re-implemented it, pretty closely following the RxSwift implementation while not over-generifying the implementation. Main idea is that we have an inner and outer sink.

The OuterSink observes the stream of upstream values and has .unlimited demand. Whenever it receives a value, it transforms it into a new publisher (NewPublisher) and hands it to the inner sink.

The InnerSink observes a stream of NewPublisher values and manages the demand. Whenever it receives a completion, it subscribes to the next publisher, if there is one. Whenever the outer sink hands in a NewPublisher, the inner sink either directly subscribes to the new publisher, if it currently has no active subscription, or queues it, so that it is subscribed to whenever the currently active subscription ends (i.e. the inner sink receives a successful completion).

Had to add a remainingDemand field to DemandBuffer, so that the inner sink can request more values from the next subscription if the downstream demand hasn't been fulfilled.

@filblue
Copy link

filblue commented May 12, 2021

@freak4pc Any plans to move on with this?

@kerrmarin-lvmh
Copy link

Hey, are there any plans to merge this?

@Evertt
Copy link

Evertt commented Feb 9, 2024

bump

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add concatMap
6 participants