How can I send items to a Kotlin Flow (like a BehaviorSubject)?
If you want to get the latest value on subscription/collection you should use a ConflatedBroadcastChannel:
private val channel = ConflatedBroadcastChannel<Boolean>()
This replicates Rx's BehaviorSubject. To expose the channel as a Flow:
// Repository
fun observe(): Flow<Boolean> = channel.asFlow()
Now, to send an event/value to that exposed Flow, simply send it to this channel.
// Repository
// send() is a suspending function, so the caller must be suspending too.
suspend fun someLogicalOp() {
    channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
Console:
false
If you wish to only receive values after you start collecting, you should use a BroadcastChannel instead.
To make it clear:
BroadcastChannel behaves like Rx's PublishSubject:
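private val channel = BroadcastChannel<Boolean>(1)

suspend fun broadcastChannelTest() {
    // 1. Send event
    channel.send(true)

    // 2. Start collecting
    // Note: collect { } suspends until the channel is closed,
    // so in real code steps 2 and 3 run in separate coroutines.
    channel
        .asFlow()
        .collect { println(it) }

    // 3. Send another event
    channel.send(false)
}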
false
Only false gets printed, as the first event was sent before collect { }.
ConflatedBroadcastChannel behaves like Rx's BehaviorSubject:
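private val confChannel = ConflatedBroadcastChannel<Boolean>()

suspend fun conflatedBroadcastChannelTest() {
    // 1. Send event
    confChannel.send(true)

    // 2. Start collecting
    // Note: collect { } suspends until the channel is closed,
    // so in real code steps 2 and 3 run in separate coroutines.
    confChannel
        .asFlow()
        .collect { println(it) }

    // 3. Send another event
    confChannel.send(false)
}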
true
false
Both events are printed; you always get the latest value (if present).
I also want to mention the Kotlin team's work on DataFlow (name pending), which seems better suited to this use case (as it will be a cold stream).
Take a look at the MutableStateFlow documentation, as it is a replacement for ConflatedBroadcastChannel, which is going to be deprecated very soon.
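As a rough illustration of that replacement, here is a minimal sketch of the repository example rewritten with MutableStateFlow. The Repository class and the state property are hypothetical names chosen for this example, and it assumes kotlinx.coroutines 1.4.0 or newer (for asStateFlow). Note one difference from ConflatedBroadcastChannel: a MutableStateFlow always requires an initial value.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical repository; the names are illustrative only.
class Repository {
    // A MutableStateFlow must be created with an initial value.
    private val _state = MutableStateFlow(true)

    // Read-only view exposed to collectors (ViewModel/Presenter).
    val state: StateFlow<Boolean> = _state.asStateFlow()

    fun someLogicalOp() {
        // Assigning to value is a regular (non-suspending) call.
        _state.value = false
    }
}

fun main() = runBlocking<Unit> {
    val repository = Repository()
    repository.someLogicalOp()
    // A new collector immediately receives the latest value.
    println(repository.state.first()) // false
}
```

Because setting value does not suspend, someLogicalOp() no longer needs to be a suspending function.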
For more context, see the whole discussion in the original issue in Kotlin's repository on GitHub.
UPDATE:
Kotlin Coroutines 1.4.0 is now available with MutableSharedFlow, which replaces the need for Channel. MutableSharedFlow cleanup is also built in, so you don't need to manually open and close it, unlike Channel. Please use MutableSharedFlow if you need a Subject-like API for Flow.
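To make the Subject analogy concrete, here is a minimal sketch (not a definitive implementation) of how the replay parameter of MutableSharedFlow can approximate both subjects. The helper functions behaviorLikeFlow and publishLikeFlow are hypothetical names introduced only for this example; it assumes kotlinx.coroutines 1.4.0 or newer.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: BehaviorSubject-like flow that replays the latest value to new collectors.
fun <T> behaviorLikeFlow(): MutableSharedFlow<T> = MutableSharedFlow(replay = 1)

// Hypothetical helper: PublishSubject-like flow that replays nothing.
fun <T> publishLikeFlow(): MutableSharedFlow<T> = MutableSharedFlow(replay = 0)

fun main() = runBlocking<Unit> {
    val behaviorLike = behaviorLikeFlow<Boolean>()
    behaviorLike.emit(true)        // sent before anyone collects
    println(behaviorLike.first())  // a late collector still gets the latest value: true

    val publishLike = publishLikeFlow<Boolean>()
    publishLike.emit(true)         // no collectors yet, so this value is dropped

    // UNDISPATCHED starts the collector immediately, so it is subscribed
    // before the next emit() is called.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        publishLike.collect { println(it) }
    }
    publishLike.emit(false)        // delivered to the active collector
    job.cancel()                   // a SharedFlow never completes, so stop collecting manually
}
```

Unlike a StateFlow, a shared flow created with replay = 1 has no initial value, which is closer to a BehaviorSubject created without a default.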
ORIGINAL ANSWER
Since your question had the android tag, I'll add an Android implementation that allows you to easily create a BehaviorSubject or a PublishSubject that handles its own lifecycle.
This is relevant in Android because you don't want to forget to close the channel and leak memory. This implementation avoids the need to explicitly "dispose" of the reactive stream by tying it to the creation and destruction of the Fragment/Activity, similar to LiveData.
import android.util.Log
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleObserver
import androidx.lifecycle.OnLifecycleEvent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.launch

// Read side: exposes events as a Flow.
interface EventReceiver<Message> {
    val eventFlow: Flow<Message>
}

// Write side: posts events into the stream.
interface EventSender<Message> {
    fun postEvent(message: Message)
    val initialMessage: Message?
}

// Sender that opens and closes the channel along with the given lifecycle.
class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    override fun postEvent(message: Message) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch { channel.send(message) }
        } else {
            Log.e("LifecycleEventSender", "Channel is closed. Cannot send message: $message")
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        channel.openSubscription()
        initialMessage?.let { postEvent(it) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }
}

class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
    EventReceiver<Message> {
    override val eventFlow: Flow<Message> = channel.asFlow()
}

// Combines both sides by delegation; subclasses only choose the channel type.
abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
    EventSender<Message> by LifecycleEventSender<Message>(
        lifecycle,
        coroutineScope,
        channel,
        initialMessage
    )
By using the Lifecycle library from Android, I can now create a BehaviorSubject that cleans itself up after the activity/fragment has been destroyed:
class BehaviorSubject(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage: String = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)
Or I can create a PublishSubject by using a buffered BroadcastChannel:
class PublishSubject(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage: String = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)
And now I can do something like this:
// AppCompatActivity is a LifecycleOwner, which lifecycle and lifecycleScope require;
// the plain android.app.Activity is not.
class MyActivity : AppCompatActivity() {

    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        if (savedInstanceState == null) {
            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)
        }
    }

    override fun onResume() {
        super.onResume()
        behaviorSubject.postEvent("Next Message")
    }
}