eyalgo
2/12/2015 - 1:27 PM

ReadKeys.java

import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

import rx.Observable;

import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.CouchbaseCluster;

/**
 * Command-line utility that reads document ids from a text file (one id per
 * line), requests each id from a Couchbase bucket through the asynchronous
 * API, and writes the id of every document the bucket emits to an output
 * file, one id per line.
 */
public class ReadKeys {

	public ReadKeys() {
	}

	/**
	 * Runs the lookup and blocks until every request has completed.
	 *
	 * @param args ignored
	 * @throws IOException if the input file cannot be read or the output file
	 *         cannot be opened or closed
	 * @throws InterruptedException retained in the signature for backward
	 *         compatibility; no longer thrown by this implementation
	 */
	public static void main(String[] args) throws IOException, InterruptedException {
		CouchbaseCluster cluster = CouchbaseCluster.create("127.0.0.1");
		try {
			AsyncBucket bucket = cluster.openBucket("some-bucket").async();
			String fileLocation = "some location of insert keys";
			// readAllLines closes the underlying file itself, unlike Files.lines,
			// whose stream must be closed explicitly.
			List<String> lines = Files.readAllLines(Paths.get(fileLocation));

			// try-with-resources closes (and therefore flushes) the writer even
			// when a lookup or a write fails.
			try (FileWriter fw = new FileWriter("output-couch-keys")) {
				//@formatter:off
				Observable
					.from(lines)
					.flatMap(bucket::get)
					// Block until the stream completes instead of sleeping for a
					// fixed time; errors are rethrown on the calling thread.
					.toBlocking()
					.forEach(document -> writeToFile(fw, document.id()));
				//@formatter:on
			}
		} finally {
			// Release the connections held by the cluster.
			cluster.disconnect();
		}
	}

	/**
	 * Appends {@code key} followed by the platform line separator to {@code fw}.
	 *
	 * @param fw  writer to append to
	 * @param key text to write on its own line
	 * @throws RuntimeException wrapping the {@link IOException} if the write fails
	 */
	private static void writeToFile(FileWriter fw, String key) {
		try {
			fw.write(String.format("%s%n", key));
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}