Can RxJava reduce() be unsafe when parallelized?


The problem lies in the state shared between realizations (subscriptions) of the chain. This is pitfall #8 in my blog:

Shared state in an Observable chain

Let's assume you are dissatisfied with the performance or the type of the List that the toList() operator returns, and you want to roll your own aggregator instead. For a change, you want to do this with existing operators, and you find the reduce() operator:

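```java
import java.util.Vector;
import rx.Observable; // RxJava 1.x

Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    // The seed Vector is created once, when the chain is assembled
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });
list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);
```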

When you run the three subscribe() calls, the first prints what you'd expect, [1, 2, 3], but the second prints a vector in which the range 1-3 appears twice, and the third prints 9 elements!

The problem is not the reduce() operator itself but the expectation surrounding it. The new Vector<Integer>() argument is evaluated once, when the chain is assembled, so it acts as a 'global' instance that is shared by every evaluation (subscription) of the chain.
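To make the sharing visible, here is a minimal sketch, assuming RxJava 1.x is on the classpath. It reuses the chain above, grabs each reduced value with toBlocking().single(), and compares the returned objects by identity; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import rx.Observable;

public class SharedSeedDemo {

    // Same chain as above: the seed Vector is created once, at assembly time.
    static Observable<Vector<Integer>> sharedSeedChain() {
        return Observable
            .range(1, 3)
            .reduce(new Vector<Integer>(), (vector, value) -> {
                vector.add(value);
                return vector;
            });
    }

    // Subscribes three times; toBlocking().single() waits for the one reduced value.
    static List<Vector<Integer>> subscribeThreeTimes() {
        Observable<Vector<Integer>> list = sharedSeedChain();
        List<Vector<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            results.add(list.toBlocking().single());
        }
        return results;
    }

    public static void main(String[] args) {
        List<Vector<Integer>> results = subscribeThreeTimes();
        // All three subscriptions return the very same object...
        System.out.println(results.get(0) == results.get(1) && results.get(1) == results.get(2)); // true
        // ...which by now holds 3 + 3 + 3 elements.
        System.out.println(results.get(2).size()); // 9
    }
}
```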

Naturally, there is a way of fixing this without implementing a dedicated operator (which should be quite simple if you see the potential in the previous CounterOp):

```java
import java.util.Vector;
import rx.Observable; // RxJava 1.x

Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>) null, (vector, value) -> {
        // Lazily create a fresh Vector on the first value of each subscription
        if (vector == null) {
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
```

You need to start with a null seed and create the Vector inside the accumulator function; that Vector is then not shared between subscribers.

Alternatively, you can look into the collect() operator, which takes a factory callback for the initial value.
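A minimal sketch of the collect() variant, assuming RxJava 1.x, where collect() takes a Func0 state factory and an Action2 collector. Because the factory is called for every subscription, each subscriber fills its own Vector; CollectDemo and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import rx.Observable;

public class CollectDemo {

    // The factory lambda is invoked once per subscription, so no Vector is shared.
    static Observable<Vector<Integer>> collectChain() {
        return Observable
            .range(1, 3)
            .collect(() -> new Vector<Integer>(), (vector, value) -> vector.add(value));
    }

    // Subscribes three times and gathers each reduced Vector.
    static List<Vector<Integer>> subscribeThreeTimes() {
        Observable<Vector<Integer>> list = collectChain();
        List<Vector<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            results.add(list.toBlocking().single());
        }
        return results;
    }

    public static void main(String[] args) {
        // Each line prints [1, 2, 3]: no state leaks between subscriptions.
        subscribeThreeTimes().forEach(System.out::println);
    }
}
```

Unlike the null-seed trick, the accumulator here never has to check for a missing container, and an empty source still yields an empty Vector rather than null.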

The rule of thumb here: whenever you see an aggregator-like operator that takes a plain value, be cautious, because this 'initial value' will most likely be shared across all subscribers. If you plan to consume the resulting stream with multiple subscribers, they will clash and may give you unexpected results or even crash.
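One more way to follow this rule, not part of the original answer, is to wrap the whole chain in Observable.defer(). defer() calls its factory once per subscriber, so even an operator that only accepts a plain seed receives a fresh one each time. This is a sketch assuming RxJava 1.x; DeferDemo and deferredChain() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import rx.Observable;

public class DeferDemo {

    // defer() re-runs this factory for every subscriber, so the
    // new Vector<Integer>() seed is evaluated again each time.
    static Observable<Vector<Integer>> deferredChain() {
        return Observable.defer(() -> Observable
            .range(1, 3)
            .reduce(new Vector<Integer>(), (vector, value) -> {
                vector.add(value);
                return vector;
            }));
    }

    // Subscribes three times and gathers each reduced Vector.
    static List<Vector<Integer>> subscribeThreeTimes() {
        Observable<Vector<Integer>> list = deferredChain();
        List<Vector<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            results.add(list.toBlocking().single());
        }
        return results;
    }

    public static void main(String[] args) {
        // Each line prints [1, 2, 3].
        subscribeThreeTimes().forEach(System.out::println);
    }
}
```

The trade-off is that the chain is assembled anew for every subscriber, which is usually cheap compared with the bugs a shared mutable seed causes.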


According to the Observable contract, an Observable must not make onNext calls concurrently, so you have to modify your strings Observable to respect this. You can use the serialize() operator to achieve this.
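A minimal sketch of the serialize() fix, assuming RxJava 1.x. The question's strings Observable is not shown here, so concurrentStrings() is a hypothetical stand-in that deliberately calls onNext from two threads at once. Observable.create(OnSubscribe) is used for brevity and may produce a deprecation warning on later 1.x releases.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;

public class SerializeSketch {

    // Hypothetical source: two threads call onNext at the same time,
    // which violates the Observable contract.
    static Observable<String> concurrentStrings(int perThread) {
        return Observable.create(subscriber -> {
            List<Thread> workers = new ArrayList<>();
            for (int t = 0; t < 2; t++) {
                Thread worker = new Thread(() -> {
                    for (int i = 0; i < perThread; i++) {
                        subscriber.onNext("x");
                    }
                });
                workers.add(worker);
                worker.start();
            }
            try {
                // Complete only after both emitters have finished.
                for (Thread worker : workers) {
                    worker.join();
                }
                subscriber.onCompleted();
            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        });
    }

    // serialize() funnels the concurrent onNext calls into one sequential stream,
    // so the reduce() accumulator never runs on two threads at once.
    static int totalLength(int perThread) {
        return concurrentStrings(perThread)
            .serialize()
            .reduce(0, (sum, s) -> sum + s.length())
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        System.out.println(totalLength(10_000)); // 20000
    }
}
```

Without serialize(), the two threads would update the reduce() state concurrently and the total could come out wrong.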