swuecho
11/19/2016 - 5:16 AM

Tarantool Quick Test

Tarantool Quick Test

// Tarantool quick test
// Copyright, Dennis Anikin 2016
//
// Quick disclaimer:
//
// This test shows 500K-1000K transactions per second on one CPU core
// and 600K-1600K queries per second on one CPU core.
//
// Based on the $6.57 per-month-price for the AWS t2.micro instance we can afford the tremendous number of 630bln queries for just $1
//
// A typical output of this test should look like this:
// https://cloclo18.datacloudmail.ru/weblink/view/28zo6HFUdUrh/img-2016-03-10-14-43-48.png?etag=E3D0131D9EFE2C928BD2CC819096535E09F4AFF1&key=98e4a8efef63f289046072a10c02a508fb204c82
//
// Here is a quick instruction how to run this test on Centos 6.x machines. All you need to run this
// test is a couple of such machines.
//
// Try to do the following things from point 1 to point 37
//
// Getting ready:
//
// 1. Launch two AWS t2.micro instances with AWS Linux: https://aws.amazon.com or
//    two Google 1v CPU instances on Centos 6.x: https://console.cloud.google.com. Otherwise you
//    can use any Centos 6.x instances on any cloud platform or in your own datacenter.
// 2. Connect via SSH terminal to both of them.
// 3. Head over to http://tarantool.org/download.html
// 4. Click Amazon Linux.
// 5. Copy the command set.
// 6. Open the server terminal
// 7. Paste everything to the console and press Enter.
// 8. Press “y” and then enter whenever the console asks you to do so.
// 9. sudo ln -sf /etc/tarantool/instances.available/example.lua /etc/tarantool/instances.enabled/example.lua
// 10. sudo sed "s/space:create_index('primary')/space:create_index('primary', {type='HASH'})/g" -i /etc/tarantool/instances.enabled/example.lua
// 11. sudo sed 's/wal_mode = ".*"/wal_mode = "write"/g' -i /etc/tarantool/instances.enabled/example.lua
// 11.5 sudo sed "s/listen = 'localhost:3301';/listen = '*:3301';/g" -i /etc/tarantool/instances.enabled/example.lua
// 12. sudo tarantoolctl start example
// 13. Open the client terminal
// 14. Head over to http://tarantool.org/download.html
// 15. Click Amazon Linux.
// 16. Copy the command set except the last line.
// 17. Paste everything to the console and press Enter.
// 18. Press “y” and then Enter whenever the console asks you to do so.
// 19. sudo yum -y install tarantool-c tarantool-c-devel
// 20. Press “y” and then Enter whenever the console asks you to do so.
// 20.5. Copy-paste the following command to the console and press Enter

/*

sudo tee /etc/yum.repos.d/tarantool_1_6.repo <<- EOF
[tarantool_1_6]
name=EnterpriseLinux-\$releasever - Tarantool
baseurl=http://download.tarantool.org/tarantool/1.6/el/6/\$basearch/
gpgkey=http://download.tarantool.org/tarantool/1.6/gpgkey
repo_gpgcheck=1
gpgcheck=0
enabled=1

[tarantool_1_6-source]
name=EnterpriseLinux-\$releasever - Tarantool Sources
baseurl=http://download.tarantool.org/tarantool/1.6/el/6/SRPMS
gpgkey=http://download.tarantool.org/tarantool/1.6/gpgkey
repo_gpgcheck=1
gpgcheck=0
EOF

*/

//
// The test
//
// in the client console
//
// 21. mkdir tar_test
// 22. cd tar_test
// 23. cat >tar_test.c // Then copy and paste the whole file tar_test.c that you're reading to the console and press CTRL+D
// 24. sudo yum install gcc
// 25. cc -g -lpthread -O3 -std=gnu99 tar_test.c -ltarantool -o tar_test
//
// In the server console
//
// 26. ifconfig
// 27. Copy IP address.
//
// In the client console
//
// 28. ./tar_test 0.0.0.0:3301 write 3 10000000 // Substitute 0.0.0.0 with the IP address pasted from the clipboard.
// 
// 29. See one million transactions per second!!!
//
// In the server console
//
// 30. top // CPU is the bottleneck.
// 31. sudo ls -la /var/lib/tarantool/example // Number of xlogs is growing and xlogs size is growing.
// 32. sudo yum install sysstat
// 33. iostat -x 3 // Disk is 15-20% busy, 100-120K sectore per second
// 34. fdisk -l // The sector size is 512 bytes. Which means that totally we commit transactions at the speed of 50-60Mb per second.
//
// In the client console
//
// 35. ./tar_test 0.0.0.0:3301 read 5 10000000 // Substitute 0.0.0.0 with the server IP that you got above and you'll get
//     700K-1600K queries per second.
//
// In the server console
//
// 36. top // 30-40% CPU is used.
// 37. iostat -x 3 // No disk activity.
//
// The bottom line:
// One million transactions per second on one CPU core!
// 700K-1600K queries per second on 1/3 CPU core!

#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <sys/time.h>
#include <tarantool/tarantool.h>
#include <tarantool/tnt_net.h>
#include <tarantool/tnt_opt.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

// General parameters
typedef struct test_params
{
	char address[256];
	int is_read_test;
	int num_ops;
} test_params;

// Parameters per thread
typedef struct thread_data
{
	test_params *tp;
	long long num_requests, num_responses;

	struct tnt_stream *tnt;
} thread_data;

int NUM_THREADS = 30;

// The thread that reads responses
void *read_thread(void *ctx)
{
	thread_data *td = (thread_data*)ctx;
	struct tnt_stream *tnt = td->tnt;

	struct tnt_reply reply;
	tnt_reply_init(&reply);

	int num_errs = 0;

	// Iterate through all the operations
	for (int i = 0; i < td->tp->num_ops; ++i)
	{
		// Read a reply
                int r = tnt->read_reply(tnt, &reply);
                if (r == 1)
                {
                	// This means that we're reading responses way to fast and there is nothing in the socket
                	// Let's just wait a little bit
                	// It is better to wait 1 ms one in a while than to use mutex every time in terms of CPU  usage
                        --i;
                        usleep(1000);
                        continue;
                }
                else
                if (r == -1)
		{
			--i;
			printf("Read reply failed %s\n", reply.error);
			++num_errs;

			// To many errors
			if (num_errs >= 5)
				break;
		}
		else
		{
			num_errs = 0;
			++td->num_responses;
		}

 		tnt_reply_free(&reply);
	}
}

void *do_test(void *ctx)
{
	thread_data *td = (thread_data*)ctx;

	struct tnt_stream *tnt = tnt_net(NULL);

	td->tnt = tnt;

	tnt_set(tnt, TNT_OPT_URI, td->tp->address);

	// Connect to the Tarantool server
	if (tnt_connect(tnt) < 0)
	{
		printf("Connection refused\n");
       		exit(-1);
	}

	struct tnt_stream *tuple = tnt_object(NULL);

        int k = (int)time(NULL);

	struct timeval tv, prev_tv;

	// Create the thread that reads responses
	pthread_t read_pid;
        int r = pthread_create(&read_pid, NULL, &read_thread, td);
        if (r < 0)
        {
        	fprintf(stderr, "multithread_test: could not create thread, r=%d, errno='%s'\n", r, strerror(errno));
                fflush(stderr);
        }

	// Iterate through all the operations
	for (int i = 0; i < td->tp->num_ops; ++i)
        {
		// Form a request to a server
                if (td->tp->is_read_test)
                {
                        tnt_object_add_array(tuple, 1);
                        tnt_object_add_int(tuple, k + i);

                        tnt_select(tnt, 512, 0, UINT32_MAX, 0, 0, tuple);
               	}
                else
                {
                        tnt_object_add_array(tuple, 2);
                        tnt_object_add_int(tuple, k + i);

                   	tnt_object_add_int(tuple, k + i);

                        tnt_replace(tnt, 512, tuple);
                }

		// Send a request to the server
		tnt_flush(tnt);

		++td->num_requests;

                tnt_object_reset(tuple);
	}

	// Join the read thread before clear all the resources
	void *v;
	pthread_join(read_pid, &v);

        tnt_close(tnt); 
        tnt_stream_free(tuple);
        tnt_stream_free(tnt);
}

// The thread that prints statistics one in a second
void *timer_thread(void *ctx)
{
	thread_data *tds = (thread_data*)ctx;

	long long prev_reqs = 0, prev_resps = 0;
	while (1)
	{
		sleep(1);

		long long reqs = 0, resps = 0;
		for (int i = 0;i < NUM_THREADS;++i)
		{
			reqs += tds[i].num_requests;
			resps += tds[i].num_responses;
		}
		
		long long rps = reqs - prev_reqs, rps2 = resps - prev_resps;

		// If nothing is happening and the pending queue is empty then stop
		if (!rps && !rps2 && reqs == resps)
			break;

		// Latency is very rough here. See RPS instead. The main target of this test was RPS
                printf("Requests per second: %d, Responses per second: %d, Pending requests: %d, Latency: %f ms\n",
                        (int)rps, (int)rps2, (int)(reqs-resps), 1000.0 * (reqs-resps) / rps2);
        	fflush(stdout);

		prev_reqs = reqs;
		prev_resps = resps;
	}
}

int main(int argc, char *argv[])
{
	if (argc < 5)
	{
		printf("Usage: tar_test address:port read|write num_threads num_ops\n");
		return 0;
	}

	test_params tp;
	if (strlen(argv[1]) < 255)
		strcpy(tp.address, argv[1]);
	else
	{
		fprintf(stderr, "Address is too long\n");
		return 1;
	}

	tp.is_read_test = !strcmp(argv[2], "read");
	NUM_THREADS = atoi(argv[3]);
	tp.num_ops = atoi(argv[4]);

        pthread_t pids[NUM_THREADS];

	thread_data tds[NUM_THREADS];

	// Start all the threads
        for (int i = 0;i < NUM_THREADS;++i)
        {
		tds[i].tp = &tp;
		tds[i].num_requests = tds[i].num_responses = 0;
                int r = pthread_create(pids + i, NULL, &do_test, tds + i);
                if (r < 0)
                {
                        fprintf(stderr, "multithread_test: could not create thread, i=%d, r=%d, errno='%s'\n", i, r, strerror(errno));
                        fflush(stderr);
                }
        }

	// Start a timer thread
	pthread_t timer_pid;
	int r = pthread_create(&timer_pid, NULL, &timer_thread, tds);
        if (r < 0)
        {
                fprintf(stderr, "multithread_test: could not create thread, r=%d, errno='%s'\n", r, strerror(errno));
                fflush(stderr);
        } 

	// Join all the threads
        for (int i = 0;i < NUM_THREADS;++i)
        {
                void *v;
                int r = pthread_join(pids[i], &v);
                if (r < 0)
                {
                        fprintf(stderr, "multithread_test: could not join thread, i=%d, r=%d, errno='%s'\n", i, r, strerror(errno));
                        fflush(stderr);
                }
        }
        
        // Join the timer thread
	void *v;
	pthread_join(timer_pid, &v);

	return 0;
}