jinxuan
8/21/2015 - 6:34 AM

rangequery.java

package Examples;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import com.vividsolutions.jts.geom.Envelope;
import com.vividsolutions.jts.geom.Point;

import GeoSpark.PointRDD;
import GeoSpark.RectangleRDD;
import GeoSpark.SpatialPairRDD;

import com.vividsolutions.jts.geom.Envelope;

public class rangeQuery {
	public static void main(String[] args) throws IOException {
		String IP=args[0];
		String set1="hdfs://"+IP+":9000/"+args[1];
		Integer partitions1=Integer.parseInt(args[2]);
		Integer offset1=Integer.parseInt(args[3]);
		String Splitter1=args[4];
		String outputlocation="hdfs://"+IP+":9000/test/tempResult.txt";
		URI uri=URI.create(outputlocation);
		Path pathhadoop=new Path(uri);
		SparkConf conf=new SparkConf().setAppName("Geospark_Range").setMaster("spark://"+IP+":7077");//.set("spark.cores.max", "136").set("spark.executor.memory", "50g").set("spark.executor.cores", "8").set("spark.local.ip", IP).set("spark.driver.memory", "13g").set("spark.driver.cores", "8").set("spark.driver.host", IP).set("spark.eventLog.enabled","true").set("spark.eventLog.dir", "/tmp/spark-events/").set("spark.history.retainedApplications", "1000");//.set("spark.reducer.maxSizeInFlight","1000m" );//.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(kryo).set("spark.shuffle.memoryFraction", "0.2").set("spark.storage.memoryFraction", "0.6");//.set("spark.akka.failuredetector.threshold", "300000").set("spark.akka.threads", "300").set("spark.akka.timeout", "30000").set("spark.akka.askTimeout", "300").set("spark.storage.blockManagerTimeoutIntervalMs", "180000").set("spark.network.timeout", "10000").set("spark.blockManagerHeartBeatMs", "80000");
		JavaSparkContext spark=new JavaSparkContext(conf);
		Configuration confhadoop=spark.hadoopConfiguration();
		confhadoop.set("fs.hdfs.impl", 
			        org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
			    );
		confhadoop.set("fs.file.impl",
			        org.apache.hadoop.fs.LocalFileSystem.class.getName()
			    );
		FileSystem filehadoop =FileSystem.get(uri, confhadoop);
		filehadoop.delete(pathhadoop, true);
		System.out.println("Old output file has been deleted!");
		spark.addJar("target/GeoSpark-0.1-GeoSpark.jar");
		Envelope envelope = new Envelope(-100.0, 100.0, -100.0, 100.0);
		PointRDD pointRDD=new PointRDD(spark,set1,offset1,Splitter1,partitions1);
		PointRDD rangeQueryRDD=pointRDD.SpatialRangeQuery(envelope, 0);
		rangeQueryRDD.getPointRDD().saveAsTextFile(outputlocation);
		spark.stop();
		
	
	}
}