How can I send items to a Kotlin Flow (like a BehaviorSubject)?


If you want to get the latest value on subscription/collection you should use a ConflatedBroadcastChannel:

    private val channel = ConflatedBroadcastChannel<Boolean>()

This replicates Rx's BehaviorSubject. To expose the channel as a Flow:

    // Repository
    fun observe(): Flow<Boolean> {
        return channel.asFlow()
    }

Now, to send an event/value to that exposed Flow, simply send it to this channel. Note that send is a suspending function, so it must be called from a coroutine or another suspending function.

    // Repository
    suspend fun someLogicalOp() {
        channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
    }

Console:

false

If you wish to only receive values after you start collecting you should use a BroadcastChannel instead.

To make it clear:

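Behaves like Rx's PublishSubject

    private val channel = BroadcastChannel<Boolean>(1)

    suspend fun broadcastChannelTest() = coroutineScope {
        // 1. Send event (nobody is collecting yet, so it is lost)
        channel.send(true)

        // 2. Start collecting in its own coroutine, since collect { } suspends.
        //    UNDISPATCHED opens the subscription before step 3 runs.
        launch(start = CoroutineStart.UNDISPATCHED) {
            channel.asFlow().collect { println(it) }
        }

        // 3. Send another event
        channel.send(false)
        // The collector keeps running until the surrounding scope is cancelled.
    }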

false

Only false gets printed as the first event was sent before collect { }.


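Behaves like Rx's BehaviorSubject

    private val confChannel = ConflatedBroadcastChannel<Boolean>()

    suspend fun conflatedBroadcastChannelTest() = coroutineScope {
        // 1. Send event (kept as the latest value)
        confChannel.send(true)

        // 2. Start collecting in its own coroutine, since collect { } suspends.
        //    UNDISPATCHED opens the subscription before step 3 runs.
        launch(start = CoroutineStart.UNDISPATCHED) {
            confChannel.asFlow().collect { println(it) }
        }

        // 3. Send another event
        confChannel.send(false)
        // The collector keeps running until the surrounding scope is cancelled.
    }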

true

false

Both events are printed; on collection you always get the latest value (if present).

Also worth mentioning is the Kotlin team's work on DataFlow (name pending), which seems better suited to this use case. At the time of writing it was expected to be a cold stream.

Take a look at the MutableStateFlow documentation, as it is the replacement for ConflatedBroadcastChannel, which is going to be deprecated very soon. Note that StateFlow is a hot flow that always holds a current value, much like a BehaviorSubject.
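As a rough sketch of how MutableStateFlow can stand in for the ConflatedBroadcastChannel used earlier: the `Repository` class here is hypothetical, `observe` and `someLogicalOp` simply mirror the names from the snippets above, and the initial value `true` is an assumption, since MutableStateFlow always needs one.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical repository; the Boolean payload is only illustrative.
class Repository {
    // Unlike ConflatedBroadcastChannel, MutableStateFlow requires an initial value.
    private val state = MutableStateFlow(true)

    // Expose a read-only view so callers cannot write to the flow.
    fun observe(): StateFlow<Boolean> = state.asStateFlow()

    // Assigning to value is a regular (non-suspending) call.
    fun someLogicalOp() {
        state.value = false
    }
}

fun main() = runBlocking {
    val repository = Repository()
    repository.someLogicalOp()

    // A collector that starts late still receives the latest value.
    println(repository.observe().first()) // false
}
```

Because a StateFlow conflates equal values, setting the same value twice in a row does not notify collectors again, which differs slightly from a BehaviorSubject.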

For more context, see the full discussion in the original issue in Kotlin's repository on GitHub.


UPDATE:

Kotlin Coroutines 1.4.0 is now available with MutableSharedFlow, which replaces the need for Channel. Cleanup is also built into MutableSharedFlow, so unlike a Channel you don't need to manually open and close it. Please use MutableSharedFlow if you need a Subject-like API for Flow.
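To illustrate, here is a minimal sketch of how the replay parameter of MutableSharedFlow can approximate both subject types. The helper `lateSubscriberSees` is a hypothetical name, and the mapping "replay = 0 is PublishSubject-like, replay = 1 is BehaviorSubject-like" is my reading of the API rather than an official equivalence.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what a subscriber sees when it starts
// collecting after the first event has already been emitted.
fun lateSubscriberSees(replay: Int): List<Boolean> = runBlocking {
    val events = MutableSharedFlow<Boolean>(replay = replay)

    // 1. Emit before anyone collects. With replay = 0 this value is dropped;
    //    with replay = 1 it is kept for future subscribers.
    events.emit(true)

    // 2. Start collecting. UNDISPATCHED runs the collector up to its first
    //    suspension, so it is subscribed before step 3. A shared flow never
    //    completes, so take(...) ends collection after the expected count.
    val expectedCount = if (replay > 0) 2 else 1
    val seen = async(start = CoroutineStart.UNDISPATCHED) {
        events.take(expectedCount).toList()
    }

    // 3. Emit after collection has started.
    events.emit(false)

    seen.await()
}

fun main() {
    println(lateSubscriberSees(replay = 0)) // [false]
    println(lateSubscriberSees(replay = 1)) // [true, false]
}
```

There is no close call anywhere: the collector simply stops when its coroutine finishes or is cancelled.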

ORIGINAL ANSWER

Since your question had the android tag, I'll add an Android implementation that allows you to easily create a BehaviorSubject or a PublishSubject that handles its own lifecycle.

This is relevant in Android because you don't want to forget to close the channel and leak memory. This implementation avoids the need to explicitly "dispose" of the reactive stream by tying it to the creation and destruction of the Fragment/Activity, similar to LiveData.

    import android.util.Log
    import androidx.lifecycle.Lifecycle
    import androidx.lifecycle.LifecycleObserver
    import androidx.lifecycle.OnLifecycleEvent
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.channels.BroadcastChannel
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.asFlow
    import kotlinx.coroutines.launch

    // Read side: what observers collect from.
    interface EventReceiver<Message> {
        val eventFlow: Flow<Message>
    }

    // Write side: how producers post events.
    interface EventSender<Message> {
        fun postEvent(message: Message)
        val initialMessage: Message?
    }

    // Sends events into the channel and closes it when the lifecycle is destroyed.
    class LifecycleEventSender<Message>(
        lifecycle: Lifecycle,
        private val coroutineScope: CoroutineScope,
        private val channel: BroadcastChannel<Message>,
        override val initialMessage: Message?
    ) : EventSender<Message>, LifecycleObserver {

        init {
            lifecycle.addObserver(this)
        }

        override fun postEvent(message: Message) {
            if (!channel.isClosedForSend) {
                coroutineScope.launch { channel.send(message) }
            } else {
                Log.e("LifecycleEventSender", "Channel is closed. Cannot send message: $message")
            }
        }

        @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
        fun create() {
            channel.openSubscription()
            initialMessage?.let { postEvent(it) }
        }

        @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
        fun destroy() {
            channel.close()
        }
    }

    // Exposes the channel as a Flow.
    class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
        EventReceiver<Message> {
        override val eventFlow: Flow<Message> = channel.asFlow()
    }

    // Combines both sides by delegation.
    abstract class EventRelay<Message>(
        lifecycle: Lifecycle,
        coroutineScope: CoroutineScope,
        channel: BroadcastChannel<Message>,
        initialMessage: Message? = null
    ) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
        EventSender<Message> by LifecycleEventSender<Message>(
            lifecycle,
            coroutineScope,
            channel,
            initialMessage
        )

By using the Lifecycle library from Android, I can now create a BehaviorSubject that cleans itself up after the activity/fragment has been destroyed:

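    // No type parameter here: <String> on the class would declare a new
    // generic parameter named String instead of using kotlin.String.
    class BehaviorSubject(
        lifecycle: Lifecycle,
        coroutineScope: CoroutineScope,
        initialMessage: String = "Initial Message"
    ) : EventRelay<String>(
        lifecycle,
        coroutineScope,
        ConflatedBroadcastChannel(),
        initialMessage
    )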

Or I can create a PublishSubject by using a buffered BroadcastChannel:

    class PublishSubject(
        lifecycle: Lifecycle,
        coroutineScope: CoroutineScope,
        initialMessage: String = "Initial Message"
    ) : EventRelay<String>(
        lifecycle,
        coroutineScope,
        BroadcastChannel(Channel.BUFFERED),
        initialMessage
    )

And now I can do something like this:

    // AppCompatActivity is a LifecycleOwner, so it provides lifecycle and
    // lifecycleScope; the plain android.app.Activity does not.
    class MyActivity : AppCompatActivity() {

        val behaviorSubject = BehaviorSubject(
            this@MyActivity.lifecycle,
            this@MyActivity.lifecycleScope
        )

        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)

            if (savedInstanceState == null) {
                behaviorSubject.eventFlow
                    .onEach { stringEvent ->
                        Log.d("BehaviorSubjectFlow", stringEvent)
                        // "BehaviorSubjectFlow: Initial Message"
                        // "BehaviorSubjectFlow: Next Message"
                    }
                    .flowOn(Dispatchers.Main)
                    .launchIn(this@MyActivity.lifecycleScope)
            }
        }

        override fun onResume() {
            super.onResume()
            behaviorSubject.postEvent("Next Message")
        }
    }
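For comparison, here is a minimal sketch of the same relay idea built on MutableSharedFlow, as recommended in the update. `SharedFlowRelay` and `relayDemo` are hypothetical names, a job started inside runBlocking stands in for Android's lifecycleScope, and replay = 1 is assumed to approximate BehaviorSubject behavior. It is plain Kotlin, so it can be tried outside an Android project.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical relay: no lifecycle observer and no close() are needed,
// because a shared flow holds no resources that must be released.
class SharedFlowRelay<Message>(initialMessage: Message? = null) {
    private val events = MutableSharedFlow<Message>(replay = 1)

    // Read-only view for collectors.
    val eventFlow: SharedFlow<Message> = events.asSharedFlow()

    init {
        // tryEmit does not suspend; with no subscribers yet, the value
        // just lands in the replay cache.
        initialMessage?.let { events.tryEmit(it) }
    }

    suspend fun postEvent(message: Message) = events.emit(message)
}

fun relayDemo(): List<String> = runBlocking {
    val received = mutableListOf<String>()
    val relay = SharedFlowRelay("Initial Message")

    // Stand-in for lifecycleScope: cancelling this job ends collection,
    // much like a destroyed Activity cancels its scope.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        relay.eventFlow.collect { received += it }
    }

    relay.postEvent("Next Message")
    yield() // give the collector a chance to process the new event
    collector.cancelAndJoin()

    received
}

fun main() {
    println(relayDemo()) // [Initial Message, Next Message]
}
```

On Android, the collecting coroutine would be launched in lifecycleScope instead, so collection stops when the Activity or Fragment is destroyed.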