package me.falci;
import io.reactivex.Observable;
/**
* @author Falci
*
*/
public class PessoaModem {
public static void main( String[] args ) {
new PessoaModem().run();
}
public void run() {
final long millis = System.currentTimeMillis();
endpointPessoa()
.filter(p -> {
if(p.getModens() != 1) {
throw new IllegalArgumentException("Não pode fazer o reset (N modens)");
}
return true;
})
.flatMap(pessoa -> this.endpointModem().map(modem -> {
if(!modem) {
throw new IllegalArgumentException("reset falhou");
}
return pessoa;
}))
.map(p -> "Modem do " + p.getNome() + " resetado com sucesso")
.doOnTerminate(() -> System.out.println((System.currentTimeMillis() - millis) + " ms"))
.subscribe(System.out::println,
e -> System.err.println(e.getMessage()));
}
public Observable<Pessoa> endpointPessoa() {
return Observable.create(emitter -> {
Thread.sleep(1000);
emitter.onNext(new Pessoa("John", 2));
emitter.onComplete();
});
}
public Observable<Boolean> endpointModem() {
return Observable.create(emitter -> {
Thread.sleep(1000);
emitter.onNext(Boolean.FALSE);
emitter.onComplete();
});
}
public static class Pessoa {
private final String nome;
private final Integer modens;
public Pessoa(String nome, Integer modens){
this.nome = nome;
this.modens = modens;
}
public String getNome() {
return nome;
}
public Integer getModens() {
return modens;
}
}
}