How does .group().intoMultis() work? #1084
Hi, I can't find documentation about intoMultis(). I have been doing some testing and I can't understand what is going on. Example code:

1. // some random strings are fired every second
2. @Incoming("in")
3. @Outgoing("out")
4. Multi<String> process(Multi<String> stream) {
5. return stream
6. .onItem().invoke(s -> System.out.println("before buffering: " + s)) //
7. .group().intoMultis().every(Duration.ofMillis(10000))
8. .onItem().invoke(s -> System.out.println("sending multi: " + Instant.now().toString()))
9. .onItem().transformToMulti(multi -> multi
10. .onItem().invoke(it -> System.out.println("inside transformToMulti: " + it))
11. .filter(__ -> true)
12. ).merge(200)
13. .onItem().invoke(i -> System.out.println(" ---> sending final: " + i));
14. }

My expectation is to see ONLY the prints from line 6 for 10 seconds; in other words, items are buffered for 10 seconds in line 7. Then I should see prints from line 8 and line 10 (for the items that have been buffered). Line 11 filters all items, so I should NEVER see prints from line 13. What I see is something totally erratic (the format of the item string is irrelevant):
As you can see, everything is printed as if no buffering (line 7) or filtering (line 11) were happening at all; every item just passes straight through ("---> sending final ..."). I do see a multi being emitted every 10 seconds ("sending multi: ..."), but that's pretty much it; this piece of code would work exactly the same without the .group().intoMultis() call. Is there a bug here?
Replies: 2 comments
Items are not buffered. It creates the Multi and emits the items into that Multi, and when the time duration passes, it creates another multi.
There is an error in my code above. I should have added
If you want to buffer, use intoLists().
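To make the difference concrete, here is a minimal plain-Java sketch of the two behaviours described in the reply. It does not use Mutiny at all: the class `WindowSketch`, its two methods, and the log format are hypothetical names made up for illustration, and time is simulated with arrival timestamps instead of a real clock. It also skips windows in which no item arrives, which may not match what the library does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Mutiny code: simulated timestamps stand in for a real clock.
class WindowSketch {

    // Models group().intoMultis().every(window) as described in the reply:
    // every item is forwarded at its own arrival time; the window only
    // decides which inner stream the item belongs to.
    static List<String> intoMultisModel(long[] arrivalMs, String[] items, long windowMs) {
        List<String> log = new ArrayList<>();
        long currentWindow = -1;
        for (int i = 0; i < items.length; i++) {
            long window = arrivalMs[i] / windowMs;
            if (window != currentWindow) {
                currentWindow = window;
                log.add("t=" + window * windowMs + " new inner stream #" + window);
            }
            log.add("t=" + arrivalMs[i] + " item " + items[i]);
        }
        return log;
    }

    // Models group().intoLists().every(window): items are held back and
    // released together as one list when their window closes.
    static List<String> intoListsModel(long[] arrivalMs, String[] items, long windowMs) {
        List<String> log = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        long currentWindow = -1;
        for (int i = 0; i < items.length; i++) {
            long window = arrivalMs[i] / windowMs;
            if (window != currentWindow && !buffer.isEmpty()) {
                // The previous window has closed: emit everything it collected.
                log.add("t=" + (currentWindow + 1) * windowMs + " list " + buffer);
                buffer = new ArrayList<>();
            }
            currentWindow = window;
            buffer.add(items[i]);
        }
        if (!buffer.isEmpty()) {
            log.add("t=" + (currentWindow + 1) * windowMs + " list " + buffer);
        }
        return log;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000, 2000, 11000};
        String[] items = {"a", "b", "c"};
        intoMultisModel(arrivals, items, 10000).forEach(System.out::println);
        intoListsModel(arrivals, items, 10000).forEach(System.out::println);
    }
}
```

With arrivals at 1 s, 2 s and 11 s and a 10 s window, the first model logs "a" and "b" at t=1000 and t=2000, as soon as they arrive, while the second holds them until t=10000 and emits them together as the list [a, b]. That matches the output in the question: with intoMultis() the only thing that happens every 10 seconds is the switch to a new inner stream.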