Closed
Labels
type/bug: A general bug
Description
Expected behavior
Late subscribers should receive onComplete if the upstream has already completed.
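To make the expectation concrete, here is a minimal sketch of the semantics I have in mind, written against the JDK's `java.util.concurrent.Flow` interfaces only. `HotSource`, `Recorder`, and `LateSubscriberDemo` are hypothetical names made up for this illustration; this is not how Reactor's `publish().autoConnect()` is implemented, and it ignores backpressure. It only shows a source that remembers it has completed and signals onComplete to anyone who subscribes afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical stdlib-only model of the expected behavior; not Reactor code. */
public class LateSubscriberDemo {

    /** A hot source that remembers its terminal state for late subscribers. */
    static final class HotSource<T> implements Flow.Publisher<T> {
        private final List<Flow.Subscriber<? super T>> subscribers = new ArrayList<>();
        private boolean completed;

        @Override
        public synchronized void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* demand is ignored in this sketch */ }
                @Override public void cancel() { remove(subscriber); }
            });
            if (completed) {
                // Late subscriber: the upstream is already done, so complete immediately.
                subscriber.onComplete();
            } else {
                subscribers.add(subscriber);
            }
        }

        private synchronized void remove(Flow.Subscriber<? super T> subscriber) {
            subscribers.remove(subscriber);
        }

        /** Pushes one item to every current subscriber; dropped once completed. */
        synchronized void emit(T item) {
            if (completed) {
                return;
            }
            // Iterate over a copy so a subscriber may cancel from inside onNext.
            for (Flow.Subscriber<? super T> s : new ArrayList<>(subscribers)) {
                s.onNext(item);
            }
        }

        /** Completes current subscribers and records the terminal state. */
        synchronized void complete() {
            if (completed) {
                return;
            }
            completed = true;
            for (Flow.Subscriber<? super T> s : new ArrayList<>(subscribers)) {
                s.onComplete();
            }
            subscribers.clear();
        }
    }

    /** Records every signal it receives into a shared event list. */
    static final class Recorder implements Flow.Subscriber<String> {
        private final String name;
        private final List<String> events;

        Recorder(String name, List<String> events) {
            this.name = name;
            this.events = events;
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { events.add(name + ":onNext(" + item + ")"); }
        @Override public void onError(Throwable t) { events.add(name + ":onError"); }
        @Override public void onComplete() { events.add(name + ":onComplete"); }
    }

    /** One subscriber arrives before completion, one after. */
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        HotSource<String> source = new HotSource<>();
        source.subscribe(new Recorder("early", events));
        source.emit("ALPHA");
        source.complete();
        source.subscribe(new Recorder("late", events)); // subscribes after completion
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the "late" recorder gets onComplete right away instead of waiting forever, which is what I would expect from the second `incoming.next()` subscription in the steps to reproduce.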
Actual behavior
onComplete does not appear to be forwarded to late subscribers:
Steps to reproduce
Flux<Flux<String>> incoming = incomingProcessor.log("stream.proc").publish().autoConnect()
    .log("stream.incoming");

Flux.just("ALPHA", "BRAVO", "CHARLIE", "DELTA", "ALPHA", "BRAVO", "CHARLIE", "DELTA", "ALPHA", "BRAVO", "CHARLIE", "DELTA")
    .log("stream.incoming")
    .windowWhile(s -> !"DELTA".equals(s))
    .subscribe(incomingProcessor);

AtomicInteger windowIndex = new AtomicInteger(0);
AtomicInteger nextIndex = new AtomicInteger(0);

System.out.println("ZERO");
incoming
    .next()
    .flatMapMany(flux -> flux
        // .takeWhile(s -> !"CHARLIE".equals(s))
        .log(String.format("stream.window.%d", windowIndex.getAndIncrement())))
    .log(String.format("stream.next.%d", nextIndex.getAndIncrement()))
    .as(StepVerifier::create)
    .expectNextCount(3)
    .verifyComplete();

System.out.println("ONE");
incoming.next()
    .flatMapMany(flux -> flux
        // .takeWhile(s -> !"CHARLIE".equals(s))
        .log(String.format("stream.window.%d", windowIndex.getAndIncrement())))
    .log(String.format("stream.next.%d", nextIndex.getAndIncrement()))
    .as(StepVerifier::create)
    .expectNextCount(3)
    .verifyComplete();
Reactor Core version
3.1.2