How to handle errors in Spring reactor Mono or Flux? How to handle errors in Spring reactor Mono or Flux? spring spring

How to handle errors in Spring reactor Mono or Flux?


I think the first error is in the title: "Mono or Flux" is not related with the error handling.

  • Mono can only emit one item at the most (streams one element)
  • Flux can emit more complex stuff (i.e. List)

To handle errors you can follow this example:

return webClient.get()                .uri(url)                .retrieve()                .bodyToMono(ModelYouAreRetrieving.class)                .doOnError(throwable -> logger.error("Failed for some reason", throwable))                .onErrorReturn(new ModelYouAreRetrieving(...))                .block();


DoOnError will only perform side effects and assuming the findById are will return a Mono.Error() if it fails something like this should work.

return userRepository.findById(id)    .flatMap ( user ->         barRepository.findByUserId(user.getId())        .map((user,bar)-> Foo.builder().msg("Already exists").build())          .switchIfEmpty(barRepository.save(Bar.builder().userId(user.getId()).build())        .map(bar-> Foo.builder().msg("Created").build())    ))    .onErrorReturn(throwable -> Mono.just(handleError(throwable)));

The try catch will only work if you either call a blocking operation of the chain, or a runtime error occurs before you enter the reactive chain. the doOn operations do not modify the chain, they are used for side effects only. Since flatMap expects a producer, you will need to return a Mono from the call, and in this case if an error occurs, then it will just propagate the error. In all reactive chains the error will propagate unless otherwise handled.


Use Exceptions.propagate(e) which wraps a checked exception into a special runtime exception that can be handled by onError

Below Code tries to covers User attributes in upper case. Now, when it encounters kyle the checked exception is throws and MIKE is returned from onErrorReturn

@Testvoid Test19() {    Flux.fromIterable(Arrays.asList(new User("jhon", "10000"),            new User("kyle", "bot")))        .map(x -> {            try {                return toUpper(x);            } catch (TestException e) {                throw Exceptions.propagate(e);            }        })        .onErrorReturn(new User("MIKE", "BOT")).subscribe(x -> System.out.println(x));}protected final class TestException extends Exception {    private static final long serialVersionUID = -831485594512095557L;}private User toUpper(User user) throws TestException{    if (user.getName().equals("kyle")) {        throw new TestException();    }    return new User(user.getName().toUpperCase(), user.getProfession().toUpperCase());}

Output

User [name=JHON, profession=10000]User [name=MIKE, profession=BOT]