RXJS control observable invocation


With RxJS 4's controlled, the Observable is still invoked when you subscribe

The controlled operator in RxJS 4 was really just controlling the flow of the Observable after the operator. Up to that point, it all pumps through and buffers at that operator. Consider this:

(RxJS 4) http://jsbin.com/yaqabe/1/edit?html,js,console

    const source = Rx.Observable.range(0, 5)
      .do(x => console.log('do' + x))
      .controlled();

    source.subscribe(x => console.log(x));

    setTimeout(() => {
      console.log('requesting');
      source.request(2);
    }, 1000);

You'll notice all five values from the Observable.range(0, 5) are emitted by the do immediately... then a one second (1000ms) pause before you get your two values.

So, it's really the illusion of backpressure control. In the end, there's an unbounded buffer in that operator: an array that collects everything the Observable "above" it sends down and holds it until you dequeue it by calling request(n).
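
To make the unbounded buffer concrete, here is a minimal TypeScript sketch of the idea. It is not the RxJS 4 source: ControlledBuffer, push, and drain are hypothetical names chosen for illustration, and remembering unmet demand between calls is an assumption of this sketch.

```typescript
// Hypothetical stand-in for the buffering inside controlled(); not RxJS code.
class ControlledBuffer<T> {
  private queue: T[] = [];   // unbounded: grows with every upstream value
  private requested = 0;     // outstanding demand (assumption of this sketch)
  private deliver: (value: T) => void;

  constructor(deliver: (value: T) => void) {
    this.deliver = deliver;
  }

  // Called by the upstream producer; a value is never refused.
  push(value: T): void {
    this.queue.push(value);
    this.drain();
  }

  // Called by the consumer to release up to `count` buffered values.
  request(count: number): void {
    this.requested += count;
    this.drain();
  }

  get buffered(): number {
    return this.queue.length;
  }

  private drain(): void {
    while (this.requested > 0 && this.queue.length > 0) {
      this.requested--;
      this.deliver(this.queue.shift() as T);
    }
  }
}

const received: number[] = [];
const buffer = new ControlledBuffer<number>(x => received.push(x));

// The upstream emits everything immediately, as range(0, 5) does.
for (let i = 0; i < 5; i++) {
  buffer.push(i);
}
const bufferedBeforeRequest = buffer.buffered;

// Only now does the consumer ask for two values.
buffer.request(2);
```

The producer is never slowed down here. Everything it emits sits in the queue until requested, which is why this is buffering rather than backpressure.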


RxJS 5.0.0-beta.2 replicating controlled

At the time of this answer, the controlled operator does not exist in RxJS 5, for two reasons: 1. nobody has requested it, and 2. its name is clearly confusing (hence this question on StackOverflow).

How to replicate the behavior in RxJS 5 (for now): http://jsbin.com/metuyab/1/edit?html,js,console

    // A subject we'll use to zip with the source
    const controller = new Rx.Subject();

    // A request function to next values into the subject
    function request(count) {
      for (let i = 0; i < count; i++) {
        controller.next(count);
      }
    }

    // We'll zip our source with the subject, we don't care about what
    // comes out of the Subject, so we'll drop that.
    const source = Rx.Observable.range(0, 5).zip(controller, (x, _) => x);

    // Same effect as above Rx 4 example
    source.subscribe(x => console.log(x));

    // Same effect as above Rx 4 example
    request(3);

Backpressure control

For "real backpressure control" right now, one solution is an iterator of promises (IoP). IoP isn't without its problems, though. For one thing, there's an object allocation at each turn: every value has a Promise associated with it. For another, cancellation isn't there, because they're promises.
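
As a rough illustration of the iterator-of-promises idea, the sketch below uses a plain generator that yields one Promise per value. The names promisedRange, produced, and consumeAll are hypothetical and exist only for this example. The point it tries to show is that the producer does no work until the consumer calls next(), and that each step allocates a new Promise.

```typescript
// Records which values the producer has actually computed so far.
const produced: number[] = [];

// Hypothetical producer: each next() call computes one value and
// wraps it in a freshly allocated Promise.
function* promisedRange(start: number, count: number): Generator<Promise<number>> {
  for (let i = start; i < start + count; i++) {
    produced.push(i);
    yield Promise.resolve(i);
  }
}

// The consumer pulls; nothing is produced beyond what was asked for.
const iterator = promisedRange(0, 5);
const first = iterator.next();
const second = iterator.next();

// A consumer that waits for each value before pulling the next one.
// It is defined here but not invoked, so `produced` stays untouched.
async function consumeAll(
  it: Iterator<Promise<number>>,
  handle: (value: number) => void
): Promise<void> {
  for (let step = it.next(); !step.done; step = it.next()) {
    handle(await step.value);
  }
}
```

Calling consumeAll(iterator, x => console.log(x)) would pull the remaining values one at a time. There is still no way to cancel a Promise that has already been handed out.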

A better, Rx-based approach is to have a Subject that "feeds" the top of your observable chain, and you compose in the rest.

Something like this: http://jsbin.com/qeqaxo/2/edit?js,console

    // start with 5 values
    const controller = new Rx.BehaviorSubject(5);

    // some observable source, in this case, an interval.
    const source = Rx.Observable.interval(100);

    const controlled = controller.flatMap(
        // map your count into a set of values
        (count) => source.take(count),
        // additional mapping for metadata about when the block is done
        (count, value, _, index) => {
          return { value: value, done: count - index === 1 };
        })
      // when the block is done, request 5 more.
      .do(({done}) => done && controller.next(5))
      // we only care about the value for output
      .map(({value}) => value);

    // start our subscription
    controlled.subscribe(x => {
      console.log(x);
    });

... we have some plans for a flowable observable type with real backpressure control in the near future, too. That will be more exciting and much better for this sort of scenario.


You can separate the start of the observable from the subscription to it by publishing the observable. The published observable only starts once you call connect on it.

Note that all subscribers will share a single subscription to the observable sequence.

    var published = Observable.of(42).publish();

    // subscription does not start the observable sequence
    published.subscribe(value => console.log('received: ', value));

    // connect starts the sequence; subscribers will now receive values
    published.connect();
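
For readers who want to see why subscribing alone does nothing, here is a small self-contained sketch of the publish/connect idea. It is not how RxJS implements it: ConnectableSketch and everything inside it are hypothetical names, and error handling, completion, and unsubscription are left out on purpose.

```typescript
type Observer<T> = (value: T) => void;

// Hypothetical miniature of publish()/connect(); not the RxJS implementation.
class ConnectableSketch<T> {
  private observers: Observer<T>[] = [];
  private producer: (emit: Observer<T>) => void;

  constructor(producer: (emit: Observer<T>) => void) {
    this.producer = producer;
  }

  // Subscribing only registers the observer; the producer is not started.
  subscribe(observer: Observer<T>): void {
    this.observers.push(observer);
  }

  // connect() runs the producer once and fans each value out to every observer.
  connect(): void {
    this.producer(value => this.observers.forEach(observer => observer(value)));
  }
}

let producerRuns = 0;
const log: string[] = [];

const publishedSketch = new ConnectableSketch<number>(emit => {
  producerRuns++;
  emit(42);
});

publishedSketch.subscribe(v => log.push('a received: ' + v));
publishedSketch.subscribe(v => log.push('b received: ' + v));
const runsBeforeConnect = producerRuns; // the producer has not run yet

publishedSketch.connect();
```

Both observers receive 42, but the producer function runs only once, which mirrors the single shared subscription described above.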