Elasticsearch 批量index性能测试
package kingsoft.com;
import org.apache.log4j.BasicConfigurator;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
/**
* Created by shidongjie on 16/4/19.
*/
public class EsPerformanceTest {
private static final Logger logger = LoggerFactory.getLogger(EsPerformanceTest.class);
private static ExecutorService executorService;
private static long totalCount = 1000000;
private static int bulkNum = 1000;
private static int threads = 2 * Runtime.getRuntime().availableProcessors();
private static CountDownLatch startLatch;
private static CountDownLatch endLatch;
private static AtomicLong totalTime = new AtomicLong(0);
private BulkProcessor bulkProcessor;
private String lastIp;
private TransportClient client;
private LongAdder count = new LongAdder();
private void initESClient(){
logger.info("初始化 es client");
Settings settings = Settings.settingsBuilder()
.put("action.bulk.compress", true)
.put("transport.tcp.compress", true)
.put("cluster.name", "DC-ES-HZ-Cluster").build();
try {
client = TransportClient.builder().settings(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.4.69.160"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.4.69.161"), 9300));
}
catch(Exception e) {
e.printStackTrace();
}
//Client client = TransportClient.builder().settings(settings).build();
}
public void start(){
initESClient();
try {
threads = Integer.valueOf(System.getProperty("thread.nums", String.valueOf(threads)));
bulkNum = Integer.valueOf(System.getProperty("bulk.nums", String.valueOf(bulkNum))); // 创建索引批量提交的个数
totalCount = Long.valueOf(System.getProperty("total.records", String.valueOf(totalCount)));
InetAddress netAddress = InetAddress.getLocalHost();
String ip = netAddress.getHostAddress();
this.lastIp = ip.substring(ip.lastIndexOf(".") + 1);
bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { }
@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { }
@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { }
})
.setBulkActions(10000)
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
.setFlushInterval(TimeValue.timeValueSeconds(5))
.setConcurrentRequests(1)
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
logger.info("测试线程数:" + threads + ", 总记录数:" + totalCount + ", 索引批量创建个数: " + bulkNum);
executorService = Executors.newFixedThreadPool(threads);
startLatch = new CountDownLatch(1);
endLatch = new CountDownLatch(threads);
creatWorker();
startLatch.countDown();
logger.info("启动ES测试...");
endLatch.await();
logger.info("总记录数:{}", count.longValue());
logger.info("关闭bulkProcessor...");
bulkProcessor.close();
logger.info("ES测试结束...");
logger.info("avg time:" + totalTime.longValue()/1000/threads);
logger.info("tps:" + (totalCount/(totalTime.longValue()/1000/threads)));
executorService.shutdown();
System.exit(0);
} catch(Exception e) {
e.printStackTrace();
}finally{
if(client != null) {
FlushRequest request = new FlushRequest("bigdata");
client.admin().indices().flush(request);
client.close();
}
}
}
public void creatWorker() {
for(int i = 0; i < threads; i++) {
final int index = i;
executorService.submit(new Runnable() {
public void run() {
try {
startLatch.await();
long startTime = System.currentTimeMillis();
creatIndex("T" + index);
long time = System.currentTimeMillis() - startTime;
totalTime.addAndGet(time);
endLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
private void creatIndex(String prefixKey) {
for(long j = 0; j < totalCount/threads; j++) {
count.increment();
String key = prefixKey + "-" + this.lastIp + j;
IndexRequest iRequest = new IndexRequest("bigdata", "aaa", key);
try {
XContentBuilder contentBuilder = XContentFactory.jsonBuilder().startObject();
contentBuilder.field("S_MDN", "S_MDN" + this.lastIp + j);
contentBuilder.field("S_IMSI", "S_IMSI" + this.lastIp + j);
contentBuilder.field("I_PAID_TYPE", "I_PAID_TYPE" + this.lastIp + j);
contentBuilder.field("S_TRML_CODE", "S_TRML_CODE" + this.lastIp + j);
contentBuilder.field("I_FCTY_ID", "I_FCTY_ID" + this.lastIp + j);
contentBuilder.field("S_TM_OS", "S_TM_OS" + this.lastIp + j);
contentBuilder.field("I_MODL_GENR", "I_MODL_GENR" + this.lastIp + j);
contentBuilder.field("S_BSID", "S_BSID" + this.lastIp + j);
contentBuilder.field("S_BSC_CODE", "S_BSC_CODE" + this.lastIp + j);
contentBuilder.field("S_BTS_NAME", "S_BTS_NAME" + this.lastIp + j);
contentBuilder.field("S_CELL_ID", "S_CELL_ID" + this.lastIp + j);
contentBuilder.field("I_CITY_OID", "I_CITY_OID" + this.lastIp + j);
contentBuilder.endObject();
iRequest.source(contentBuilder);
bulkProcessor.add(iRequest);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void stop() {
logger.info("closed");
}
public static void main(String[] args) {
BasicConfigurator.configure();
EsPerformanceTest estest = new EsPerformanceTest();
estest.start();
}
}
group 'kingsoft'
version '1.0-SNAPSHOT'
apply plugin: 'java'
sourceCompatibility = 1.8
repositories {
mavenCentral()
maven {
url "https://maven-central.storage.googleapis.com"
}
}
dependencies {
compile 'org.elasticsearch:elasticsearch:2.3.1'
compile 'org.slf4j:slf4j-api:1.7.21'
compile 'org.slf4j:slf4j-log4j12:1.7.21'
testCompile group: 'junit', name: 'junit', version: '4.11'
}