Description
Enhancement: Add ConnectableFlow to the Flow API.
Each time an observer of a Flow starts collecting, the source of the Flow is executed, much like a call to subscribe on a Flowable in RxJava executes the Flowable's source.
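The following minimal sketch makes the "source runs once per collector" behaviour concrete. It uses the plain kotlinx.coroutines flow builder and is not code from the proposal; coldFlow and sourceExecutions are illustrative names. Collecting the same cold flow twice runs its block twice.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustrative counter: how many times the flow's source block has run.
var sourceExecutions = 0

// A cold flow: its block is executed anew for every collector.
val coldFlow: Flow<Int> = flow {
    sourceExecutions++ // stands in for "start/create the resource"
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val first = coldFlow.toList()
    val second = coldFlow.toList()
    // Each toList() call is a separate collection, so the source ran twice.
    println("$first $second executions=$sourceExecutions")
}
```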
This change defers the execution of the source of the Flow until a specific point in time, possibly after one or more observers have started collecting the 'shared' Flow.
The use case for deferring the execution of the source of a Flow is (cold) Flows whose data source is a resource that should not be started/created or stopped/destroyed by each and every call to collect, and that should instead be managed explicitly by a call to a function (connect, for example). It differs from using broadcastIn in that publish returns a Flow, not a BroadcastChannel.
E.g.
val dataFlow = flowViaChannel<MyData> { channel ->
    val resource = getResource(channel)
    channel.invokeOnClose {
        resource.close()
    }
    resource.startReceivingData()
}
val sharedFlow = dataFlow.publish()
// ...
val observer1 = launch {
    sharedFlow.collect { ... }
}
// ...
val observer2 = launch {
    sharedFlow.collect { ... }
}
// ...
// Start the flow right now.
val connection = sharedFlow.connect(scope)
// ...
// Cancel the flow here.
// Note that when 'scope' is cancelled, this 'connection' would be cancelled as well.
connection.close()
// ...
I propose creating these new classes and extension functions, or something similar (they are modeled after RxJava's ConnectableObservable):
/**
 * A [Flow] of type [T] that only starts emitting values after its [connect] method is called.
 *
 * If this flow's [Connection] is still connected, the current [Connection] will be returned when
 * [connect] is called and the flow will not be restarted.
 *
 * When its [collect] method is called, this flow will not immediately start collecting. Only after
 * [connect] is called do the emission and actual collection of values start.
 */
interface ConnectableFlow<out T> : Flow<T> {
    /**
     * Connects this shared [Flow] to start emitting values.
     *
     * @param scope The [CoroutineScope] in which the emissions will take place.
     * @return The [Connection] that can be closed to stop this shared [Flow].
     */
    fun connect(scope: CoroutineScope): Connection
}
and
/**
 * A connection returned by a call to [ConnectableFlow.connect].
 */
interface Connection {
    /**
     * Returns true if this connection is connected and active.
     */
    suspend fun isConnected(): Boolean

    /**
     * Closes this connection in an orderly fashion.
     */
    suspend fun close()
}
/**
 * Publishes and shares an upstream [Flow] of type [T] and returns a [ConnectableFlow] of type [T].
 *
 * The upstream [Flow] begins emissions only after [ConnectableFlow.connect] has been called.
 *
 * @return The [ConnectableFlow] that represents the shared [Flow] of this receiver.
 */
fun <T> Flow<T>.publish(): ConnectableFlow<T>

/**
 * Creates a [Flow] of type [T] from this [ConnectableFlow] that automatically connects (i.e. calls
 * [ConnectableFlow.connect]) when the first [numberOfCollectors] observers have started collecting (i.e. called [Flow.collect])
 * and automatically cancels this [ConnectableFlow] when the last observer stops collecting.
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @param numberOfCollectors The number of observers that need to start collecting before the connection (re)starts.
 * @return The shared reference-counted [Flow].
 */
fun <T> ConnectableFlow<T>.refCount(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T>

/**
 * Shares this [Flow] of type [T] with multiple observers without restarting it when each observer starts
 * collecting. This is the same as calling [Flow.publish] and then [ConnectableFlow.refCount].
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @return The new [Flow] that shares this [Flow].
 */
fun <T> Flow<T>.share(scope: CoroutineScope): Flow<T> = publish().refCount(scope)
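One possible way publish() and connect could behave is sketched below. It assumes kotlinx.coroutines 1.6 or later, where MutableSharedFlow is available and the Flow interface can be implemented directly. This is not the implementation from the linked gist; PublishedFlow and JobConnection are hypothetical names, and refCount is left out. Collectors subscribe to a MutableSharedFlow, and the upstream is only collected once connect is called.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

interface ConnectableFlow<out T> : Flow<T> {
    fun connect(scope: CoroutineScope): Connection
}

interface Connection {
    suspend fun isConnected(): Boolean
    suspend fun close()
}

// Hypothetical implementation: collectors subscribe to `shared`; the upstream
// is collected only after connect() is called.
private class PublishedFlow<T>(private val upstream: Flow<T>) : ConnectableFlow<T> {
    // No replay: values emitted while nobody is subscribed are dropped.
    // With subscribers present, emit() suspends once the buffer is full.
    private val shared = MutableSharedFlow<T>(extraBufferCapacity = 16)
    private var current: JobConnection? = null

    override suspend fun collect(collector: FlowCollector<T>) {
        shared.collect(collector)
    }

    // Returns the existing connection while it is still active, otherwise (re)starts the upstream.
    @Synchronized
    override fun connect(scope: CoroutineScope): Connection {
        current?.takeIf { it.job.isActive }?.let { return it }
        val job = scope.launch { upstream.collect { shared.emit(it) } }
        return JobConnection(job).also { current = it }
    }
}

// A connection is just the Job that collects the upstream.
private class JobConnection(val job: Job) : Connection {
    override suspend fun isConnected() = job.isActive
    override suspend fun close() = job.cancelAndJoin()
}

fun <T> Flow<T>.publish(): ConnectableFlow<T> = PublishedFlow(this)

fun main() = runBlocking {
    val published = flowOf(1, 2, 3).publish()
    // UNDISPATCHED: both collectors subscribe before connect() is called.
    val a = async(start = CoroutineStart.UNDISPATCHED) { published.take(3).toList() }
    val b = async(start = CoroutineStart.UNDISPATCHED) { published.take(3).toList() }
    val connection = published.connect(this)
    println("${a.await()} ${b.await()}")
    connection.close()
}
```

Because the shared flow has no replay, a collector that subscribes after connect can miss the first items. This is the same "late collector" concern raised later in the thread.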
This is my first stab at an initial/draft/try-out implementation:
https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381
Update:
I took autoConnect out; it is more about 'replay' and 'caching'. If needed, this should be addressed in a separate issue.
Activity
elizarov commented on Apr 11, 2019
Can you please add some actual use case, as in "here is what the application is trying to do and that is what it wants to achieve", without tying this use case to the actual solution in the form of ConnectableFlow? There are tons of methods here. All of them need a use case (refCounting, autoConnect, etc.).
streetsofboston commented on Apr 12, 2019
Use case for ConnectableFlow:
Cold Streams are often Unicast. When an observer/consumer starts observing, the source of the Stream is started again. The Flow API currently allows for cold unicasts, where the source is (re)started each time collect is called.
Hot Streams are often Multicast, where observers/consumers can come and go without restarting anything. The Channel and BroadcastChannel APIs support this, and the method broadcastIn already exists.
Sometimes it is desirable to have Cold Streams that are Multicast. The source of the stream may not always be active (it may be expensive to have the stream active all the time or to start a new one each time), and starting the stream does not depend on whether any observers/consumers are actually observing. Starting and stopping the cold multicast stream needs to be managed explicitly.
The proposed ConnectableFlow would implement such a Cold Multicast Flow, where the source of the Flow (re)starts each time its connect method is called and stops when this connection is closed.
Examples of such cold multicast streams are BLE (Bluetooth Low Energy) characteristics that notify the observer of data changing on an external device, e.g. a BLE thermometer or any other continuous monitoring device. Starting a characteristic keeps a connection open between the observer and the BLE device, and this can be somewhat expensive. It is best to manage this explicitly, e.g. have the user click a 'connect' and 'disconnect' button, or to manage it implicitly by only starting the connection when observers in the UI are observing the device.
Make a cold unicast Flow a cold multicast ConnectableFlow:
fun <T> Flow<T>.publish(): ConnectableFlow<T>
Implicitly manage the connection of a ConnectableFlow by reference-counting: this allows a cold multicast Flow to be active only when observers are listening/collecting. E.g. keep a BLE Characteristic notification stream active while the user is looking at a UI that needs it.
fun <T> ConnectableFlow<T>.refCount(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T>
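The reference-counting behaviour (start the source on the first collector, stop it after the last one leaves) can be approximated with the shareIn operator and SharingStarted.WhileSubscribed(). Both were added to kotlinx.coroutines later (1.4+). This swaps in a library operator: it is not the proposed refCount and has no numberOfCollectors parameter. sensorReadings and upstreamStarts are hypothetical stand-ins for an expensive source such as a BLE notification stream.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Counts how often the (hypothetical) expensive upstream has been started.
var upstreamStarts = 0

// Hypothetical stand-in for an expensive source, e.g. BLE notifications.
val sensorReadings: Flow<Int> = flow {
    upstreamStarts++
    var reading = 0
    while (true) {
        emit(reading++)
        delay(10)
    }
}

// Two concurrent collectors each take three readings from one shared upstream.
fun collectTwice(): Pair<List<Int>, List<Int>> = runBlocking {
    // Child Job for the sharing coroutine, so it can be cancelled at the end.
    val sharingJob = Job(coroutineContext[Job])
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    // WhileSubscribed(): start the upstream on the first subscriber, stop it when the last one leaves.
    val shared = sensorReadings.shareIn(sharingScope, SharingStarted.WhileSubscribed())

    // UNDISPATCHED makes both collectors subscribe before the upstream emits anything.
    val a = async(start = CoroutineStart.UNDISPATCHED) { shared.take(3).toList() }
    val b = async(start = CoroutineStart.UNDISPATCHED) { shared.take(3).toList() }
    val result = a.await() to b.await()
    sharingJob.cancel()
    result
}

fun main() {
    val (a, b) = collectTwice()
    println("$a $b upstreamStarts=$upstreamStarts")
}
```

Both collectors see the same readings, and the upstream is started only once.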
elizarov commented on Apr 12, 2019
Can you please provide an example with an actual application scenario (as in "here is the actual application I'm writing and here is why I need it") where a cold stream needs to be multicast but cannot be always active (so you cannot just use an always-active BroadcastChannel), and where, at the same time, starting/stopping the stream does not depend on the presence of observers, so that it needs to be managed explicitly?
LouisCAD commented on Apr 12, 2019
@streetsofboston To me, that Bluetooth GATT notifications use case is not a cold stream at all, but a hot one with manual start and stop, so effectively just a channel (possibly broadcast), and two custom functions to start/subscribe and stop/unsubscribe.
streetsofboston commented on Apr 12, 2019
@LouisCAD You're correct.
But the two custom functions to start and stop the stream could also be handled by the ConnectableFlow's connect method and the close method on the returned Connection, much like a ConnectableObservable in Rx (connect and dispose). The connect call would make the underlying stream hot (i.e. it would start it, and the stream would emit values until close is called on the returned connection).
I also believe that adding ConnectableFlow to the core Flow API is not required and this can be done manually (I implemented a ConnectableFlow myself in the gist I linked, using the public Flow API). But it can be convenient for other devs. Maybe ConnectableFlow can be part of an extension library?
I'm not working on any BLE app right now, but in the past I have worked on apps that used plain callbacks and the RxAndroidBLE library. There we made use of ConnectableObservables. But that was a while ago and I need to dig into the past a little to find a good use case. :-)
LouisCAD commented on Apr 12, 2019
@streetsofboston Actually, I made a library for Bluetooth Low Energy with coroutines a while ago (and I keep it updated). Notifications support works with channels, although I've not needed notifications support myself. If you think that part of the API may be improved, feel free to open an issue there!
matejdro commented on Apr 18, 2019
All that is true, but wrapping GATT in a cold stream provides the benefit of automatic state management. A programmer can easily forget to call a manual start/stop method (especially stop). With a flow, this is done automatically (start on the collect call, stop when the coroutine is cancelled).
Another use case:
Developing a mobile app that uses GPS location at various points. Location is exposed as a flow. Whenever a part of the app needs the location, it starts collecting the flow, the GPS receiver activates and the location is transmitted. If another part needs the location, it can collect the same flow. Since the stream is already active, it just multicasts new location data to all subscribers. When a part no longer needs the location, it cancels the collecting coroutine. Once all collectors are cancelled, the GPS receiver shuts off, saving battery.
inorichi commented on Apr 18, 2019
This last example might be somewhat related to #1097, but using Flows instead. Not sure if they could share a common implementation, though.
Edit: although it's only the analogue of the .refCount() operator. .connect(N) must be cancelled manually, or with the scope (what OP has asked for).
elizarov commented on Apr 19, 2019
Let me summarize what I'm getting out of this thread so far. I see a bunch of use cases here for an operator that automatically activates a flow on the first collector, shares the emitted events with all the other collectors, and cancels the flow instance as soon as the last collector is done. Easy, useful, no chance of resource leakage, no need to introduce any new types like ConnectableFlow -- it is just an operator. The only question is how we should name it. Can we name it just share()?
A kind of manual activation/deactivation of the flow sounds like a use case for a channel to me. You can already do flow.produceIn(scope) to activate a flow, and we might even provide a scope-less flow.produce() variant. produce activates a flow and returns a channel. You can cancel this channel when you no longer need it -- that is your manual activation, even when you don't have any collectors.
Does this sound like a plan?
matejdro commented on Apr 19, 2019
For my use case, this sounds perfect.
nhaarman commented on Apr 19, 2019
Besides just share()-ing the subscription, new subscribers often want to immediately receive the most recent value that was emitted. In Rx this is done with the replay(1) operator.
Take, for example, the way Firebase Database provides lists of items. Firebase emits events like 'item added', 'item removed', 'item moved', etc. Clients can construct the entire list from these events.
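The Firebase-style example can be sketched as shown here. It assumes kotlinx.coroutines 1.4+, where shareIn accepts a replay parameter; replay = 1 plays the role of Rx replay(1). ListEvent and toItemList are hypothetical names, and no real Firebase API is used.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical list-change events, loosely modelled on the Firebase example.
sealed class ListEvent {
    data class Added(val item: String) : ListEvent()
    data class Removed(val item: String) : ListEvent()
}

// Folds the events into the current list; scan also emits the initial empty list.
fun Flow<ListEvent>.toItemList(): Flow<List<String>> =
    scan(emptyList<String>()) { list, event ->
        when (event) {
            is ListEvent.Added -> list + event.item
            is ListEvent.Removed -> list - event.item
        }
    }

fun latestForLateSubscriber(): List<String> = runBlocking {
    val events = flowOf(ListEvent.Added("a"), ListEvent.Added("b"), ListEvent.Removed("a"))
    val sharingJob = Job(coroutineContext[Job])
    // replay = 1: a late subscriber still gets the most recent list.
    val shared = events.toItemList()
        .shareIn(CoroutineScope(coroutineContext + sharingJob), SharingStarted.Eagerly, replay = 1)
    delay(100) // let the upstream finish before anyone subscribes
    val latest = shared.first()
    sharingJob.cancel()
    latest
}

fun main() {
    println(latestForLateSubscriber())
}
```

With replay = 0, the late first() call would suspend forever, because the upstream has already finished emitting by the time it subscribes.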
A Flow can scan these events and construct the list, sharing the result:
If replay(1) is replaced with publish(), new subscribers potentially never receive any emissions.
streetsofboston commented on Apr 19, 2019
@elizarov
I updated my gist with some examples and use cases for ConnectableFlow and share().
https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381
(see the 2nd file in this gist, called Example.kt)
I also tried to implement the UseCasesForConnectableFlow.manage_expensive_start_and_stop_of_resource() use case/example using the produceIn function instead, and even tried it using the broadcastIn function. I was not able to do this in a way that produced the same results as using ConnectableFlow from my implementation. In this use case, the ExpensiveResource should be started about 3 seconds after the call to fun manage_expensive_start_and_stop_of_resource(), not earlier.
The main issue with using produceIn is that the listeners (collectors/consumers) need to be able to get a reference to a Flow (or a ReceiveChannel, BroadcastChannel, etc.) before the cold stream is activated, but produceIn returns an already-activated ReceiveChannel...
Maybe I'm overlooking something and missing an implementation that works without introducing something new like a ConnectableFlow.
zach-klippenstein commented on Jun 19, 2019
I know this issue has been all but superseded by #1261, but FYI RxJava is considering changing their Connectable API to make the state machine more explicit when reconnecting: ReactiveX/RxJava#5628
akarnokd commented on Jul 28, 2019
If I understand suspension correctly, there could be some trouble with connecting and collecting, because you have to launch { } them all so they don't get suspended (as connect() has to call collect on the upstream).
With a publish type of sharing, if there are no collectors, the upstream data may get dropped. On the other hand, collectors may never appear, so waiting for some could be equally troubling. Could this be solved within the coroutine conceptual framework?
elizarov commented on Aug 5, 2019
@akarnokd I'm not sure I understand this question. Can you, please, clarify.
pacher commented on Feb 12, 2020
If a Flow.share discussed in #1261 automatically activates a flow on the first collector, then only that first collector is guaranteed to receive all the events. If I understand correctly, other collectors might miss a few events at the beginning of the sequence due to concurrency. As per the example from #1261:
Collectors are launched concurrently. The first one activates the flow. The second might see all of it if it subscribes fast enough, but could miss a few if out of luck.
That is why, in my opinion, the ConnectableFlow story is different from the share operator. Here we want to share an expensive cold source of data, where losing data items is not acceptable, unlike hot sources such as mouse clicks.
Here is an example:
Imagine I have a huge log file, which is naturally represented as a cold Flow of lines/records.
I need to read and analyze it in multiple ways. All these tasks are very convenient to code separately as reactive operator chains, like map{...}.filter{...}. E.g. collectorA is looking for apples, collectorB is looking for oranges, collectorC is counting ducks, etc.
So at some point in time I have a selected bunch of collectors and want to run them to get results.
If I just collect my cold flow, every collector will initiate reading of the file from disk, which is slow and wasteful.
I want to share it, so that the file is read only once, but I want to be sure that each collector gets log items from the beginning. I can't imagine how this can be achieved without some form of delayed explicit connect. Also, the file is huge but collectors are expected to find what they are looking for at some point, so reading the file should stop when there is no more need for it.
I would imagine some kind of StartableFlow (ColdFlow, heh) which is not started until an explicit start call. It would be reasonable to make it start-once-only and not accept any new collectors after it is started, similar to how consumeAsFlow results in a flow which can be collected only once.
The closest solution from the Rx world I found is the publish operator with a "selector" function:
https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#publish-java.util.function.Function-
http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#publish-io.reactivex.functions.Function-
I use it quite a lot in many different circumstances, and it would be great to have something like this for Flow. The key difference from share is that all subscriptions happen within the provided lambda, so the operator can safely connect to the upstream after everybody has subscribed, and no events are lost.
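A rough sketch of a publish(Function)-style operator for Flow: every consumer gets its own view of the upstream, and the upstream is collected once, only after all consumers have subscribed. replicate and Done are hypothetical names, and this is neither an existing kotlinx.coroutines API nor a design taken from the thread. It assumes each consumer collects the flow it is given exactly once. A consumer that never collects makes the operator wait forever, and the upstream is not stopped early if every consumer finishes early.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Sentinel that marks the end of the upstream for the replicated copies.
private object Done

// Hypothetical operator: collects the upstream once, only after every consumer has
// subscribed, so no consumer misses the first items.
suspend fun <T, R> Flow<T>.replicate(vararg consumers: suspend (Flow<T>) -> R): List<R> =
    coroutineScope {
        // No replay and no buffer: emit() suspends until every current subscriber has the value.
        val shared = MutableSharedFlow<Any?>()
        @Suppress("UNCHECKED_CAST")
        val copy: Flow<T> = shared.takeWhile { it !== Done }.map { it as T }
        val results = consumers.map { consumer -> async { consumer(copy) } }
        // Wait until all consumers are subscribed (assumes each one collects `copy` once).
        shared.subscriptionCount.first { it >= consumers.size }
        this@replicate.collect { shared.emit(it) }
        shared.emit(Done)
        results.awaitAll()
    }

fun main() = runBlocking {
    val (all, evens) = flowOf(1, 2, 3, 4).replicate<Int, List<Int>>(
        { it.toList() },
        { it.filter { x -> x % 2 == 0 }.toList() },
    )
    println("$all $evens")
}
```

The vararg makes the number of downstream collectors explicit at the call site, which is what allows the operator to know when everybody has subscribed.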
akarnokd commented on Feb 12, 2020
The problem is that it is generally unclear (to me) when the collector(s) are all lined up to receive items. For example, the multicast operator equivalent to RxJava's publish(Function) has to collect the returned Flow asynchronously; otherwise the scope wouldn't progress to the connect phase.
In RxJava, consumers are by default non-blocking and synchronous; thus, we generally know that all the consumers of the subject have lined up and are ready to receive items the moment subscribe returns.
elizarov commented on Feb 12, 2020
@pacher This is quite a valid concern. There are two different use cases here:
You have a stream of events coming from some source that is expensive to establish a connection with, so you want to establish a connection once (and only when needed) and share events with all the subscribers. This is what the share discussed in Flow.share operator #1261 is about. Here "missing events" is not an issue since, by definition of an "event source", you only start receiving them from some point in time.
Your use case with data processing. It definitely needs some other approach, configuration, or even a different operator. You should have an easy way to ensure that all the collectors are guaranteed to receive all the items. Indeed, this is not trivial and requires a separate design effort. One way it could be made to work is via some kind of dedicated operator/DSL designed to replicate the flow into multiple copies. This DSL can be arranged in such a way as to make the number of downstream collectors explicit and clear. For example, we might have something like this:
pacher commented on Feb 12, 2020
Another example of a similar feature is the shiny new teeing collector from Java 12.
@elizarov Exactly! I would formulate the difference as follows:
share: subscribers can come and go at any arbitrary moment, so it is normal and expected that some late subscribers miss events. It is then natural that there is one first collector, and any other collector can be "late" even if it subscribed on the next line of code.
replicate: there is a predefined number of subscribers at the start, and they shall receive from the beginning. Late subscriptions could be forbidden if required.
My example of data processing is just one use case. As I mentioned, publish(Function) is actually very powerful and I use it a lot. That's because it's not terminal, but an operator which returns a Flowable/Flux. A couple of examples:
publish returns a stream of original events to process further, while this secondary stream silently keeps the connection healthy somewhere in the background.
It is amazing how far you can get with it.
This is probably already another use-case territory, but as usual I just want you to keep it in mind during the design and discussion phase (maybe replicate should return a flow as well).
@akarnokd Totally agree. It is tricky and I don't see a simple solution either; otherwise I would just code something for myself instead of bothering all of you.
I am just trying to keep the discussion and thinking going instead of dismissing it as resolved by #1261.
elizarov commented on May 18, 2020
There is a design for SharedFlow that, I believe, covers most of the needs of advanced use cases of ConnectableFlow (full control over the upstream, observability of the number of downstream connections, etc.) and provides a framework to implement easy-to-use sharing operators for simpler use cases. See #2034.
elizarov commented on May 22, 2020
Basic use cases described herein are now taken into account, too, in the design of sharing operators as described in #2047, so I'm closing this issue.