morristech
12/29/2016 - 11:52 AM

OkHttp Observable Callback

OkHttp Observable Callback

    /**
     * Resolves every {@code Thing} in {@code things} to a local file and emits them all
     * together as a single {@code FilesWrapper}.
     *
     * <p>For each thing the target file is {@code getExternalCacheDir()/<thing name>}. If
     * that file already exists it is emitted as-is and no request is made; otherwise the
     * thing's URL is fetched asynchronously through {@code client} and the response body is
     * streamed into the file.
     *
     * <p>Failure handling: a transport error, a non-successful HTTP status, or an I/O error
     * while writing terminates the returned observable with an error that carries the
     * offending {@code Thing} (or the {@code Request}, for transport errors) as its last
     * cause. A partially written file is deleted so that a later call does not mistake it
     * for a complete cached download.
     *
     * @param things the items to download; each must provide a name and a URL
     * @return an observable that emits one {@code FilesWrapper} holding a file per thing
     */
    public Observable<FilesWrapper> download(List<Thing> things) {

        return Observable.from(things)
                .flatMap(thing -> {
                    File file = new File(getExternalCacheDir() + File.separator + thing.getName());
                    if (file.exists()) {
                        // Already downloaded: skip the network entirely.
                        return Observable.just(file);
                    }

                    Request request = new Request.Builder().url(thing.getUrl()).build();

                    final ObservableCallback callback = new ObservableCallback();
                    client.newCall(request).enqueue(callback);

                    return callback.getObservable().map(response -> {
                        // The body must always be closed, even on failure, or the
                        // underlying connection is leaked.
                        try (ResponseBody body = response.body()) {
                            if (!response.isSuccessful()) {
                                // Do not cache an HTTP error page as if it were the file.
                                throw new IOException("Unexpected response " + response);
                            }
                            try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {
                                sink.writeAll(body.source());
                            }
                        } catch (IOException io) {
                            // Best-effort cleanup: a truncated file would otherwise satisfy
                            // the exists() check above on the next call. The result is
                            // ignored because the file may never have been created.
                            file.delete();
                            throw OnErrorThrowable.from(OnErrorThrowable.addValueAsLastCause(io, thing));
                        }

                        return file;
                    });
                })
                .toList()
                .map(files -> new FilesWrapper(files));
    }
    
    /**
     * Adapts an OkHttp {@code Callback} to an {@code Observable<Response>}.
     *
     * <p>The outcome of the call is pushed into an {@code AsyncSubject}: a response is
     * emitted and the stream is then completed, while a failure terminates the stream
     * with an error that carries the failed {@code Request} as its last cause.
     */
    private static class ObservableCallback implements Callback {
        private final AsyncSubject<Response> responseSubject = AsyncSubject.create();

        /**
         * @return a read-only view of the subject that receives the call's outcome
         */
        public Observable<Response> getObservable() {
            return responseSubject.asObservable();
        }

        /**
         * Emits the response and immediately completes the stream.
         */
        @Override
        public void onResponse(Response response) throws IOException {
            responseSubject.onNext(response);
            responseSubject.onCompleted();
        }

        /**
         * Terminates the stream with the failure, tagging it with the request that failed.
         */
        @Override
        public void onFailure(Request request, IOException e) {
            final Throwable tagged = OnErrorThrowable.addValueAsLastCause(e, request);
            responseSubject.onError(OnErrorThrowable.from(tagged));
        }
    }