2.0 Design: Hot and Cold #2785

Closed
benjchristensen opened this issue Feb 28, 2015 · 10 comments
@benjchristensen
Member

RxJava needs to continue natively supporting both "cold" and "hot" data sources and make it easy to create both and then compose flow control and backpressure.

Reactive Streams lends itself to "cold" data sources by requiring every Publisher to only emit in response to demand received via Subscription.request(n).
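As a rough illustration of what "emit only in response to demand" means, here is a minimal single-threaded sketch built on the JDK's `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams ones. The `ColdRange` class, its `range` method, and the `Recorder` subscriber are hypothetical names invented for this example, not RxJava API, and the sketch skips thread-safety and several spec rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ColdRange {
    /** Hypothetical cold source: emits start..start+count-1, but only as demand arrives. */
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            int next = start;
            final int end = start + count;
            long demand;       // outstanding request(n) not yet satisfied
            boolean emitting;  // guards against re-entrant request() calls from onNext
            boolean done;

            @Override public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be > 0"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                if (emitting) return;
                emitting = true;
                while (!done && demand > 0 && next < end) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                // Simplification: completion is only signalled from within request().
                if (!done && next == end) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** Subscriber that records what it receives and exposes its subscription. */
    static class Recorder implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        boolean completed;
        Flow.Subscription subscription;
        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { throw new AssertionError(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Recorder r = new Recorder();
        range(0, 5).subscribe(r);
        System.out.println(r.items);        // [] (nothing requested yet)
        r.subscription.request(2);
        System.out.println(r.items);        // [0, 1]
        r.subscription.request(10);
        System.out.println(r.items + " completed=" + r.completed); // [0, 1, 2, 3, 4] completed=true
    }
}
```

Subscribing alone produces nothing; every `onNext` is paid for by an earlier `request(n)`.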

ReactiveX implementations, on the other hand, lend themselves to "hot" data sources, which start emitting immediately without the complications of "demand".

RxJava 1.0 is in the middle where it supports both "cold" and "hot", but it takes effort to implement a "cold" source with the Observable.Producer (similar to the RS Subscription).

In RxJava 2.0 we need to embrace the RS Subscription and "cold" behavior but allow creating "hot" data sources as easily as in RxJava 1.0.

benjchristensen added this to the 2.0 milestone Feb 28, 2015
@benjchristensen
Member Author

Related to this, it should not be required to provide a strategy to a source Observable of what to do when backpressure occurs if it is a hot datasource. It should be up to the consumer, and the consumer may not have access or be involved in the construction of the source Observable.

The default behavior of an Observable should be that it can emit hot, and if the consumer does not apply flow control then it will fail, as with the MissingBackpressureException in RxJava 1.x. We will not buffer or drop by default, since that would defeat the purpose of having RS backpressure semantics.

@benjchristensen
Member Author

Here is a contrived hot source:

Observable<Integer> o = Observable.create(s -> {
    // Emits as fast as the loop runs, regardless of downstream demand.
    for (int i = 0; i < 1000000; i++) {
        s.onNext(i);
    }
    s.onComplete();
});

This doesn't obey backpressure, which would be silly for a range source like this, but many legitimate cases behave this way. (Ignore that this trivial implementation does not support cancellation.)

A downstream consumer should be able to subscribe like this without giving a backpressure strategy:

o.subscribe(System.out::println);

In this case it is synchronous, so it works fine.

If they did something like this it would likely fail and force the developer to choose a flow control solution:

o.observeOn(someThread).subscribe(System.out::println);

That is what we want – no assumption of default flow control strategy, fail and force the developer to choose one. It should be composable such as this:

o.onBackpressureDrop().observeOn(someThread).subscribe(System.out::println);
o.sample(1, TimeUnit.MILLISECONDS).observeOn(someThread).subscribe(System.out::println);
o.onBackpressureBuffer().observeOn(someThread).subscribe(System.out::println);
// etc

The principle of this is decoupling the source Observable and the sink Subscriber without requiring the Observable to have a predefined flow control strategy.
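To make the decoupling concrete, here is a hedged sketch of a drop-style stage written against the JDK's `java.util.concurrent.Flow` interfaces rather than RxJava itself. `DropWhenNoDemand` and `dropping` are hypothetical names; the stage only approximates what an `onBackpressureDrop()`-like operator might do and ignores request overflow and terminal signals. The hot source knows nothing about the strategy. It simply calls the push callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class DropWhenNoDemand {
    /**
     * Hypothetical drop stage: returns a push callback for a hot source that forwards
     * a value only while the downstream has outstanding demand, and drops it otherwise.
     */
    static <T> Consumer<T> dropping(Flow.Subscriber<? super T> downstream) {
        AtomicLong demand = new AtomicLong();
        AtomicBoolean cancelled = new AtomicBoolean();
        downstream.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) {
                // Overflow of the demand counter is ignored in this sketch.
                if (n > 0 && !cancelled.get()) demand.addAndGet(n);
            }
            @Override public void cancel() { cancelled.set(true); }
        });
        return value -> {
            if (cancelled.get()) return;
            long current;
            do {
                current = demand.get();
                if (current <= 0) return;            // no demand: drop the value
            } while (!demand.compareAndSet(current, current - 1));
            downstream.onNext(value);                // one unit of demand consumed
        };
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        Flow.Subscription[] sub = new Flow.Subscription[1];
        Consumer<Integer> push = dropping(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { sub[0] = s; s.request(3); }
            @Override public void onNext(Integer i) { received.add(i); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        for (int i = 0; i < 10; i++) push.accept(i);   // hot: emits regardless of demand
        sub[0].request(2);                             // consumer asks for two more
        for (int i = 10; i < 15; i++) push.accept(i);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1, 2, 10, 11]
    }
}
```

Swapping the drop branch for a queue would give a buffer-style strategy instead, again without touching the source.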

@dpsm
Contributor

dpsm commented Feb 28, 2015

@benjchristensen this somewhat relates to #2765. I'd appreciate your thoughts on it!

@akarnokd
Member

RS requires backpressure to be honored, especially in the case when the downstream hasn't requested anything yet. In RxJava, for example, just and AsyncSubject violate this, not to mention the other Subjects. I was able to make just work properly but not yet the Subjects: AsyncSubject needs single-item drain support (i.e., hold off emitting the value on completion if not yet requested), but BehaviorSubject and PublishSubject would need to support buffering until requested (or require the use of onBackpressureXXX methods). ReplaySubject can naturally support backpressure (replay only the amount requested; I had a prototype for RxJava some time ago).
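A minimal sketch of the "hold off emitting the value on completion if not yet requested" idea, again using the JDK's `java.util.concurrent.Flow` interfaces. `DeferredSingle` and its methods are hypothetical names made up for this example, not the actual AsyncSubject implementation. The assumption is a single subscriber and a single final value, emitted by whichever of `request` or `complete` arrives second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferredSingle<T> implements Flow.Subscription {
    // State bits; the value is emitted by whichever of request()/complete() happens second.
    private static final int REQUESTED = 1, HAS_VALUE = 2, CANCELLED = 4;
    private final AtomicInteger state = new AtomicInteger();
    private final Flow.Subscriber<? super T> downstream;
    private volatile T value;

    private DeferredSingle(Flow.Subscriber<? super T> downstream) { this.downstream = downstream; }

    static <T> DeferredSingle<T> subscribe(Flow.Subscriber<? super T> downstream) {
        DeferredSingle<T> s = new DeferredSingle<>(downstream);
        downstream.onSubscribe(s);
        return s;
    }

    /** Sets a bit and returns the state as it was before. */
    private int mark(int bit) { return state.getAndUpdate(s -> s | bit); }

    @Override public void request(long n) {
        // Emit only if the value was already there and nobody has requested or cancelled yet.
        if (n > 0 && mark(REQUESTED) == HAS_VALUE) emit();
    }

    /** Called by the producer side with the final value. */
    public void complete(T last) {
        value = last;
        if (mark(HAS_VALUE) == REQUESTED) emit();
    }

    @Override public void cancel() { mark(CANCELLED); }

    private void emit() { downstream.onNext(value); downstream.onComplete(); }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Subscription[] sub = new Flow.Subscription[1];
        DeferredSingle<String> source = subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { sub[0] = s; }
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        source.complete("last");       // value is ready, but nothing was requested yet
        events.add("requesting");
        sub[0].request(1);             // only now is the value delivered
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [requesting, next:last, complete]
    }
}
```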

Btw, the o.sample above may still throw MissingBackpressureException with small time values, so adding onBackpressureDrop too may be required.

@benjchristensen
Member Author

> not to mention the other Subjects

So does Reactive Streams Processor if not carefully used. It is hot since it exposes onNext directly. It only works according to the contract if the Publisher to it is correctly obeying the contract. But if someone uses it like a Subject then it will break the contract.

We should adjust the Rx Subject to match the RS Processor, though, so that backpressure can be woven through.

@benjchristensen
Member Author

> RS requires backpressure to be honored

So does RxJava 1.x. If it isn't honored, a MissingBackpressureException is thrown. The difference is that we only assert this in the operators where it matters. We are less strict (for backwards compatibility reasons).

In 2.0 we can adopt a stricter stance. That does not, however, mean we do not want to support hot data sources and composable flow control and backpressure, just like RxJava 1.x.

@akarnokd
Member

akarnokd commented Mar 5, 2015

In my reimplementation, I don't throw MissingBackpressureException up but send it down to the downstream and cancel the upstream as with any other onError exception.

@benjchristensen
Member Author

> I don't throw MissingBackpressureException up but send it down to the downstream and cancel the upstream as with any other onError exception.

That's how it works in v1 (if something does throw, it gets caught and passed downstream anyways), and how I'd expect it in v2 as well. So I think we agree on that.

@benjchristensen
Member Author

I think the work in RxJava v1 to improve the create methods relates to this, allowing generation of an Observable that emits data correctly for the "hot" and "cold" scenarios.

These are the cases:

  • synchronous cold -> very easy, either an Iterable or similar to an Iterable
  • asynchronous cold -> tricky to do right from scratch as it typically means ordered batches and concurrency of requests, emission and request(n), but we can address this elegantly with generator functions and behavior like "eager concat"
  • hot -> very easy to write as it calls onNext whenever it wants. This would work fine as a generator to an Observable/Publisher, and will cause a MissingBackpressureException right at the source (rather than in each operator) if it emits more than is requested.
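The hot case above could be sketched roughly as follows. This is an illustration on the JDK's `java.util.concurrent.Flow` interfaces, not a proposed RxJava API: `StrictHotSource`, `hot`, and `OverrunException` (a stand-in for MissingBackpressureException) are hypothetical names, and the code assumes single-threaded use. The emitter calls the push callback whenever it wants; the wrapper checks demand right at the source, sends the error downstream, and ignores anything the emitter pushes afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

public class StrictHotSource {
    /** Hypothetical stand-in for RxJava's MissingBackpressureException. */
    static class OverrunException extends RuntimeException {
        OverrunException(String message) { super(message); }
    }

    /**
     * Wraps a hot emitter. The emitter receives a push callback and may call it at any time;
     * pushing without outstanding demand terminates the stream with an error at the source.
     */
    static <T> Flow.Publisher<T> hot(Consumer<Consumer<T>> emitter) {
        return downstream -> {
            long[] demand = {0};
            boolean[] done = {false};
            downstream.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { if (n > 0) demand[0] += n; }
                @Override public void cancel() { done[0] = true; }
            });
            emitter.accept(value -> {
                if (done[0]) return;                 // terminated or cancelled: ignore
                if (demand[0] == 0) {
                    done[0] = true;                  // stop accepting further values
                    downstream.onError(new OverrunException("emitted more than requested"));
                    return;
                }
                demand[0]--;
                downstream.onNext(value);
            });
        };
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Publisher<Integer> source = StrictHotSource.<Integer>hot(push -> {
            for (int i = 0; i < 5; i++) push.accept(i);   // emits five values unconditionally
        });
        source.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(2); }
            @Override public void onNext(Integer i) { events.add("next:" + i); }
            @Override public void onError(Throwable t) { events.add("error:" + t.getClass().getSimpleName()); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [next:0, next:1, error:OverrunException]
    }
}
```

Because the check lives in the wrapper, downstream operators would not each need their own overrun assertion.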

We can have Observable.create* methods to handle each of these. I think our API design should guide to these being the default entry points, and the raw behavior of getting a full OnSubscribe like RxJava v1 has should be considered an "unsafe" piece of functionality, since we almost ALL do it wrong.

@akarnokd
Member

I guess this can be closed. We have Subjects and FlowableProcessors to act like hot sources.


3 participants