Falci
5/3/2017 - 8:07 PM

PessoaModem.java

package me.falci;

import io.reactivex.Observable;

/**
 * @author Falci
 *
 */
public class PessoaModem {


    public static void main( String[] args ) {

        new PessoaModem().run();
    }

    public void run() {

        final long millis = System.currentTimeMillis();

        endpointPessoa()
            .filter(p -> {
                if(p.getModens() != 1) {
                    throw new IllegalArgumentException("Não pode fazer o reset (N modens)");
                }

                return true;
            })
            .flatMap(pessoa -> this.endpointModem().map(modem -> {
                if(!modem) {
                    throw new IllegalArgumentException("reset falhou");
                }

                return pessoa;
            }))
            .map(p -> "Modem do " + p.getNome() + " resetado com sucesso")

        .doOnTerminate(() -> System.out.println((System.currentTimeMillis() - millis) + " ms"))
        .subscribe(System.out::println,
                        e -> System.err.println(e.getMessage()));
    }

    public Observable<Pessoa> endpointPessoa() {
        return Observable.create(emitter -> {
            Thread.sleep(1000);
            emitter.onNext(new Pessoa("John", 2));
            emitter.onComplete();
        });
    }

    public Observable<Boolean> endpointModem() {
        return Observable.create(emitter -> {
            Thread.sleep(1000);
            emitter.onNext(Boolean.FALSE);
            emitter.onComplete();
        });
    }

    public static class Pessoa {

        private final String nome;
        private final Integer modens;

        public Pessoa(String nome, Integer modens){

            this.nome = nome;
            this.modens = modens;
        }

        public String getNome() {
            return nome;
        }

        public Integer getModens() {
            return modens;
        }
    }
}