hivefans
12/14/2015 - 1:00 PM

Elasticsearch 批量index性能测试

Elasticsearch 批量index性能测试

package kingsoft.com;

import org.apache.log4j.BasicConfigurator;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * Bulk-indexing throughput test for Elasticsearch.
 *
 * <p>A fixed pool of worker threads generates synthetic documents and feeds them into one shared
 * {@link BulkProcessor} backed by a {@link TransportClient}. When every worker has finished, the
 * processor is drained, the average per-thread submit time and a derived documents-per-second
 * figure are logged, the {@code bigdata} index is flushed and the client is closed.
 *
 * <p>Tunable through system properties:
 * <ul>
 *   <li>{@code thread.nums} - number of worker threads (default: 2 x available processors)</li>
 *   <li>{@code bulk.nums} - number of index actions per bulk request (default: 10000)</li>
 *   <li>{@code total.records} - total number of documents, split evenly across the workers
 *       (default: 1000000); any remainder of the division is not indexed</li>
 * </ul>
 *
 * Created by shidongjie on 16/4/19.
 */
public class EsPerformanceTest {

    private static final Logger logger = LoggerFactory.getLogger(EsPerformanceTest.class);

    /** Target index; also the index flushed at the end of the run. */
    private static final String INDEX_NAME = "bigdata";

    /** Mapping type the test documents are indexed under. */
    private static final String TYPE_NAME = "aaa";

    /** Field names of every generated document, in the order they are written. */
    private static final String[] FIELD_NAMES = {
        "S_MDN", "S_IMSI", "I_PAID_TYPE", "S_TRML_CODE", "I_FCTY_ID", "S_TM_OS",
        "I_MODL_GENR", "S_BSID", "S_BSC_CODE", "S_BTS_NAME", "S_CELL_ID", "I_CITY_OID"
    };

    /** Upper bound on how long to wait for outstanding bulk requests when closing the processor. */
    private static final long BULK_CLOSE_TIMEOUT_MINUTES = 10;

    private static ExecutorService executorService;

    private static long totalCount = 1000000;
    // Default matches the bulk size the processor was previously hard-wired to, so runs without
    // -Dbulk.nums behave as before now that the property is actually honoured.
    private static int bulkNum = 10000;
    private static int threads = 2 * Runtime.getRuntime().availableProcessors();

    /** Released once by {@link #start()} so that all workers begin at the same moment. */
    private static CountDownLatch startLatch;
    /** Counted down once per worker; {@link #start()} waits on it before reporting. */
    private static CountDownLatch endLatch;

    /** Sum over all workers of the milliseconds each spent submitting documents. */
    private static final AtomicLong totalTime = new AtomicLong(0);

    private BulkProcessor bulkProcessor;
    private boolean bulkProcessorClosed;
    private String lastIp;
    private TransportClient client;

    /** Number of documents successfully handed to the bulk processor. */
    private final LongAdder count = new LongAdder();

    /**
     * Builds the transport client and registers the two cluster nodes.
     *
     * @throws IllegalStateException if a node address cannot be resolved
     */
    private void initESClient() {
        logger.info("初始化 es client");
        Settings settings = Settings.settingsBuilder()
                .put("action.bulk.compress", true)
                .put("transport.tcp.compress", true)
                .put("cluster.name", "DC-ES-HZ-Cluster").build();
        // Assign the field before adding addresses so that the cleanup in start() can still
        // close the client if address resolution fails.
        client = TransportClient.builder().settings(settings).build();
        try {
            client.addTransportAddress(
                    new InetSocketTransportAddress(InetAddress.getByName("10.4.69.160"), 9300));
            client.addTransportAddress(
                    new InetSocketTransportAddress(InetAddress.getByName("10.4.69.161"), 9300));
        } catch (UnknownHostException e) {
            // Fail fast instead of continuing with a client that has no nodes.
            throw new IllegalStateException("Failed to resolve Elasticsearch node address", e);
        }
    }

    /**
     * Runs the complete test: connects, starts the workers, waits for them, drains the bulk
     * processor, logs the results and releases every resource.
     *
     * <p>Failures are logged rather than propagated; resources are released in all cases.
     */
    public void start() {
        try {
            initESClient();

            threads = Integer.parseInt(System.getProperty("thread.nums", String.valueOf(threads)));
            // Number of index actions submitted per bulk request.
            bulkNum = Integer.parseInt(System.getProperty("bulk.nums", String.valueOf(bulkNum)));
            totalCount = Long.parseLong(System.getProperty("total.records", String.valueOf(totalCount)));
            if (threads <= 0 || bulkNum <= 0 || totalCount < 0) {
                throw new IllegalArgumentException("thread.nums and bulk.nums must be positive and "
                        + "total.records non-negative, got thread.nums=" + threads
                        + ", bulk.nums=" + bulkNum + ", total.records=" + totalCount);
            }

            // The last segment of the local address is mixed into ids and field values.
            String ip = InetAddress.getLocalHost().getHostAddress();
            this.lastIp = ip.substring(ip.lastIndexOf('.') + 1);

            bulkProcessor = buildBulkProcessor();

            logger.info("测试线程数:{}, 总记录数:{}, 索引批量创建个数: {}", threads, totalCount, bulkNum);

            executorService = Executors.newFixedThreadPool(threads);
            startLatch = new CountDownLatch(1);
            endLatch = new CountDownLatch(threads);

            creatWorker();

            startLatch.countDown();
            logger.info("启动ES测试...");
            endLatch.await();
            logger.info("总记录数:{}", count.longValue());
            logger.info("关闭bulkProcessor...");
            closeBulkProcessor();

            logger.info("ES测试结束...");
            reportThroughput();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("ES test interrupted", e);
        } catch (Exception e) {
            logger.error("ES test failed", e);
        } finally {
            // Must not be preceded by System.exit(): exiting inside the try block would halt the
            // JVM before this cleanup (flush + client close) gets a chance to run.
            releaseResources();
        }
    }

    /** Creates the shared bulk processor; bulk failures are logged by its listener. */
    private BulkProcessor buildBulkProcessor() {
        return BulkProcessor.builder(
                client,
                new BulkProcessor.Listener() {
                    @Override
                    public void beforeBulk(long executionId, BulkRequest request) {
                        // Nothing to do before a bulk is sent.
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        if (response.hasFailures()) {
                            logger.warn("Bulk {} completed with failures: {}",
                                    executionId, response.buildFailureMessage());
                        }
                    }

                    @Override
                    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                        logger.error("Bulk {} with {} actions failed",
                                executionId, request.numberOfActions(), failure);
                    }
                })
                .setBulkActions(bulkNum)
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }

    /**
     * Flushes buffered requests and waits (bounded) for in-flight bulks to complete.
     * Safe to call more than once; only the first call does any work.
     */
    private void closeBulkProcessor() {
        if (bulkProcessor == null || bulkProcessorClosed) {
            return;
        }
        bulkProcessorClosed = true;
        try {
            if (!bulkProcessor.awaitClose(BULK_CLOSE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                logger.warn("Bulk requests still pending after {} minutes", BULK_CLOSE_TIMEOUT_MINUTES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for the bulk processor to close", e);
        }
    }

    /** Logs the average per-thread submit time in seconds and the derived documents per second. */
    private void reportThroughput() {
        long indexed = count.sum();
        // Floating-point division: integer division truncated sub-second runs to 0 and made the
        // tps computation throw ArithmeticException.
        double avgSeconds = totalTime.get() / 1000.0 / threads;
        logger.info("avg time:" + avgSeconds);
        if (avgSeconds > 0) {
            logger.info("tps:" + Math.round(indexed / avgSeconds));
        } else {
            logger.info("tps: n/a (elapsed time too short to measure)");
        }
    }

    /** Stops the workers, drains the bulk processor, flushes the index and closes the client. */
    private void releaseResources() {
        if (executorService != null) {
            // Interrupts workers that may still be parked on the start latch after a failure.
            executorService.shutdownNow();
        }
        closeBulkProcessor();
        if (client != null) {
            try {
                // Wait for the flush to finish before the client is closed underneath it.
                client.admin().indices().flush(new FlushRequest(INDEX_NAME)).actionGet();
            } catch (Exception e) {
                logger.warn("Flush of index {} failed", INDEX_NAME, e);
            } finally {
                client.close();
            }
        }
    }

    /**
     * Submits one indexing task per configured thread. Each task waits on the start latch,
     * indexes its share of documents, adds its elapsed time to the shared total and always
     * counts down the end latch.
     */
    public void creatWorker() {
        for (int i = 0; i < threads; i++) {
            final String prefix = "T" + i;
            executorService.submit(() -> {
                try {
                    startLatch.await();
                    long startNanos = System.nanoTime();

                    creatIndex(prefix);

                    totalTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.warn("Worker {} interrupted", prefix, e);
                } catch (Exception e) {
                    logger.error("Worker {} failed", prefix, e);
                } finally {
                    // Always release the latch, otherwise a failing worker would leave start()
                    // blocked in endLatch.await() forever.
                    endLatch.countDown();
                }
            });
        }
    }

    /**
     * Generates this worker's share ({@code totalCount / threads}) of documents and adds them to
     * the bulk processor.
     *
     * @param prefixKey worker-specific prefix of the document ids
     */
    private void creatIndex(String prefixKey) {
        long perThread = totalCount / threads;
        for (long j = 0; j < perThread; j++) {
            String suffix = this.lastIp + j;
            String key = prefixKey + "-" + suffix;
            IndexRequest iRequest = new IndexRequest(INDEX_NAME, TYPE_NAME, key);
            try {
                XContentBuilder contentBuilder = XContentFactory.jsonBuilder().startObject();
                for (String fieldName : FIELD_NAMES) {
                    contentBuilder.field(fieldName, fieldName + suffix);
                }
                contentBuilder.endObject();
                iRequest.source(contentBuilder);
                bulkProcessor.add(iRequest);
                // Count only documents that were actually handed to the processor.
                count.increment();
            } catch (IOException e) {
                logger.error("Failed to build document {}", key, e);
            }
        }
    }

    /** Logs that the test has been closed; performs no cleanup itself. */
    public void stop() {
        logger.info("closed");
    }

    /**
     * Entry point: configures log4j, runs the test once and terminates the JVM.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BasicConfigurator.configure();
        EsPerformanceTest estest = new EsPerformanceTest();
        estest.start();
        // Exit only after start() has returned, i.e. after its cleanup has run.
        System.exit(0);
    }

}
// Gradle build for the Elasticsearch bulk-index performance test.
apply plugin: 'java'

// Project coordinates.
group = 'kingsoft'
version = '1.0-SNAPSHOT'

// Source level for the Java compiler.
sourceCompatibility = JavaVersion.VERSION_1_8

// Artifact sources, consulted in declaration order.
repositories {
    mavenCentral()
    maven { url 'https://maven-central.storage.googleapis.com' }
}

dependencies {
    // Elasticsearch client library used by the test.
    compile group: 'org.elasticsearch', name: 'elasticsearch', version: '2.3.1'

    // SLF4J API and its log4j 1.2 binding.
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.21'
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.21'

    // Unit-test dependency.
    testCompile 'junit:junit:4.11'
}