ragnarokkrr
9/28/2016 - 12:47 PM

Embedding Kafka and ZooKeeper in-process for testing purposes. Tested with Apache Kafka 0.8.

Embedding Kafka and ZooKeeper in-process for testing purposes. Tested with Apache Kafka 0.8.

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
 
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 
/**
 * Runs a standalone ZooKeeper server inside the current JVM on a background thread.
 *
 * <p>The server is started from the constructor and this class offers no way to
 * stop it again, so the hosting thread is marked as a daemon: it must never be
 * the only thing keeping a test JVM alive.
 */
public class ZooKeeperLocal {
	
	/** Server entry point driven by the background thread started in the constructor. */
	ZooKeeperServerMain zooKeeperServer;
	
	/**
	 * Parses the given properties into a ZooKeeper server configuration and starts
	 * the server asynchronously. The constructor returns as soon as the thread has
	 * been started; it does not wait for the server to accept connections.
	 *
	 * @param zkProperties ZooKeeper settings, handed to {@code QuorumPeerConfig.parseProperties}
	 * @throws RuntimeException if the properties cannot be parsed (original failure is the cause)
	 * @throws FileNotFoundException declared for backward compatibility with existing callers
	 * @throws IOException declared for backward compatibility with existing callers
	 */
	public ZooKeeperLocal(Properties zkProperties) throws FileNotFoundException, IOException{
		QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
		try {
		    quorumConfiguration.parseProperties(zkProperties);
		} catch(Exception e) {
		    // Keep the unchecked wrapper callers already see, but say what went wrong.
		    throw new RuntimeException("Unable to parse ZooKeeper properties", e);
		}
 
		zooKeeperServer = new ZooKeeperServerMain();
		final ServerConfig configuration = new ServerConfig();
		configuration.readFrom(quorumConfiguration);
		
		Thread serverThread = new Thread("zookeeper-local") {
		    @Override
		    public void run() {
		        try {
		            zooKeeperServer.runFromConfig(configuration);
		        } catch (IOException e) {
		            System.out.println("ZooKeeper Failed");
		            e.printStackTrace(System.err);
		        }
		    }
		};
		// Nothing ever shuts this server down, so a non-daemon thread would keep
		// the JVM running after the tests (and the Kafka broker) have finished.
		serverThread.setDaemon(true);
		serverThread.start();
	}
}
/**
 * Example JUnit test class that starts an embedded ZooKeeper + Kafka broker once,
 * before any test method runs.
 *
 * <p>NOTE(review): the broker is never stopped here; consider adding an
 * {@code @AfterClass} method that calls {@code kafka.stop()}.
 */
public class MyTest {
 
	/** Embedded broker shared by every test in this class. */
	static KafkaLocal kafka;
 
	/**
	 * Loads the broker and ZooKeeper settings from the classpath and starts the
	 * embedded broker. Any failure is printed and reported through {@code fail}.
	 */
	@BeforeClass
	public static void startKafka(){
		try {
			//load properties
			Properties kafkaProperties = loadProperties("/kafkalocal.properties");
			Properties zkProperties = loadProperties("/zklocal.properties");
			
			//start kafka
			kafka = new KafkaLocal(kafkaProperties, zkProperties);
			// Fixed pause after start-up; presumably gives the broker time to become ready.
			Thread.sleep(5000);
		} catch (Exception e){
			if (e instanceof InterruptedException) {
				// Do not swallow the interrupt: restore the flag for the test runner.
				Thread.currentThread().interrupt();
			}
			e.printStackTrace(System.out);
			fail("Error running local Kafka broker");
		}
		
		//do other things
	}
 
	@Test
	public void testSomething() {
	}
 
	/**
	 * Reads a properties file from the classpath and closes the stream afterwards.
	 *
	 * <p>The lookup goes through this test class rather than {@code Class.class}:
	 * {@code java.lang.Class} belongs to the JDK itself, and on modular JDKs a
	 * lookup through it does not see resources on the application classpath.
	 *
	 * @param resourcePath absolute classpath location, e.g. {@code "/zklocal.properties"}
	 * @return the loaded properties
	 * @throws java.io.IOException if the resource is missing or cannot be read
	 */
	private static Properties loadProperties(String resourcePath) throws java.io.IOException {
		java.io.InputStream in = MyTest.class.getResourceAsStream(resourcePath);
		if (in == null) {
			// Fail with the resource name instead of an NPE from Properties.load(null).
			throw new java.io.FileNotFoundException("Classpath resource not found: " + resourcePath);
		}
		try {
			Properties properties = new Properties();
			properties.load(in);
			return properties;
		} finally {
			in.close();
		}
	}
 
}
import java.io.IOException;
import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
 
 
/**
 * Starts an in-process ZooKeeper instance followed by a Kafka broker, printing
 * progress messages to standard output along the way.
 */
public class KafkaLocal {
 
	/** Broker handle; created and started by the constructor. */
	public KafkaServerStartable kafka;
	/** ZooKeeper instance created by the constructor before the broker is built. */
	public ZooKeeperLocal zookeeper;
	
	/**
	 * Builds the broker configuration, launches ZooKeeper, then creates and starts
	 * the broker.
	 *
	 * @param kafkaProperties settings used to build the {@code KafkaConfig}
	 * @param zkProperties settings passed on to {@code ZooKeeperLocal}
	 * @throws IOException declared by the {@code ZooKeeperLocal} constructor
	 * @throws InterruptedException declared as part of the existing signature
	 */
	public KafkaLocal(Properties kafkaProperties, Properties zkProperties) throws IOException, InterruptedException{
		// The broker config is built first, before ZooKeeper is launched.
		final KafkaConfig brokerConfig = new KafkaConfig(kafkaProperties);
		
		// ZooKeeper comes up before the broker object is created.
		announce("starting local zookeeper...");
		this.zookeeper = new ZooKeeperLocal(zkProperties);
		announce("done");
		
		// Broker: construct, then start.
		this.kafka = new KafkaServerStartable(brokerConfig);
		announce("starting local kafka broker...");
		this.kafka.startup();
		announce("done");
	}
	
	
	/**
	 * Shuts the Kafka broker down. Only the broker is stopped; this method does
	 * not touch the ZooKeeper instance.
	 */
	public void stop(){
		announce("stopping kafka...");
		this.kafka.shutdown();
		announce("done");
	}
	
	/** Writes one progress line to standard output. */
	private static void announce(String message) {
		System.out.println(message);
	}
	
}