mbohun
2/19/2013 - 12:33 PM

gistfile1.html

// g++ udp_mon.cpp -o udp_mon \
//                 -pthread \
//                 -I /usr/local/boost_1_51_0/include \
//                 -L /usr/local/boost_1_51_0/lib \
//                 -lboost_system -lboost_chrono -lboost_thread \
//                 `pkg-config libwebsockets --cflags --libs`
//
#include <cstdlib> //for strtol
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include "fifo.h"

#include <libwebsockets.h>

using boost::asio::ip::udp;

namespace {
	// Usage text; main() prints it to stderr when no monitoring port argument is given.
	const std::string MSG_HELP =
		"USAGE: ./udp_mon <monitoring input port> [<forward host> <forward port>]\n"
		"\n"
		"\tEXAMPLE:\n"
		"\t./udp_mon 2345                    monitor incoming UDP stream on port 2345\n"
		"\t./udp_mon 2345 192.168.0.86 2346  monitor incoming UDP stream on port 2345\n"
                "\t                                  and forward it to 192.168.0.86:2346\n";
	// Compile-time sizes for the datagram payload buffer and the packet pool.
	enum {
		// the max possible size of the UDP data chunk in bytes - ridiculous
		// (65535 - 20 - 8 = 65507)
		UDP_DATA_SIZE_MAX       = 65507,

		// cat /sys/class/net/eth0/mtu => 1500
		// NOTE(review): 20 and 8 are presumably the IPv4 and UDP header sizes; the
		//               extra 16 is not explained anywhere in this file - confirm.
		//               The result is 1456 bytes; receive() passes DATA_SIZE_MAX as
		//               its buffer size, so larger payloads will not fit.
		UDP_DATA_SIZE_MAX_MTU   = (1500 - (16 + 20 + 8)),

		// this is the static 'default' MAX for the datagram data/payload
		// (it sizes packet::data and the receive buffer)
		DATA_SIZE_MAX           = UDP_DATA_SIZE_MAX_MTU,

		// arbitrary size of statically pre-allocated 'pool' of struct packet
		// (main() creates this many packets and pushes them into the free fifo)
		STATIC_PACKET_POOL_SIZE = 100
	};

	// One received datagram plus its metadata. Instances are pre-allocated in
	// main() and passed between the worker threads as packet* through fifo<packet*>.
	struct packet {
		udp::endpoint      src;                     // sender endpoint filled in by receive_from() //TODO: this is a possible problem/BUG if we want a POD (for easy/fast use with memcpy)
		std::size_t        len;                     // number of valid bytes in data[], as returned by receive_from()
		unsigned long long time_stamp;              // receive() stores nanoseconds since the boost::chrono::system_clock epoch
		unsigned char      data[DATA_SIZE_MAX];     // datagram payload
	};

	// Receiver loop: takes a free packet from fifo_in, fills it with the next
	// datagram read from sock_in (payload, length, sender endpoint), stamps it
	// with the current system_clock time in nanoseconds and hands it to fifo_out.
	//
	// The loop only ends when something throws a std::exception; the exception
	// is reported on stderr and the function returns.
	void receive(udp::socket* sock_in, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
		namespace chrono = boost::chrono;

		try {
			while (true) {
				packet* const slot = fifo_in->pop();

				slot->len = sock_in->receive_from(boost::asio::buffer(slot->data, DATA_SIZE_MAX),
								  slot->src);

				// TODO: this is a little overhead - all we need is save start time before entering this loop, and then record the ns/us diff
				const chrono::system_clock::time_point now = chrono::system_clock::now();
				slot->time_stamp =
					chrono::duration_cast<chrono::nanoseconds>(now.time_since_epoch()).count();

				fifo_out->push(slot);
			}
		} catch (const std::exception& e) {
			std::cerr << "receive exception:" << e.what() << std::endl;
		}
	}

	// TODO: this is a bit naive, forwarding and monitoring datagram are read only operations,
	//       therefore there is no reason why they shouldn't run in parallel
	//
	// Forwarder loop: pops a received packet from fifo_in, sends its payload as
	// ONE datagram to *remote through sock_out, then passes the packet on to
	// fifo_out.
	//
	// A datagram is sent with a single send_to() call. The previous retry loop
	// re-sent from the start of p->data (no offset), so a short send would have
	// produced extra, wrong datagrams, and a 0-byte result would have looped
	// forever. A short count is now only reported.
	//
	// The loop ends when something throws a std::exception (reported on stderr).
	void forward(udp::socket* sock_out, udp::endpoint* remote, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
		try {
			for (;;) {
				packet *p = fifo_in->pop();

				const size_t sent_bytes =
					sock_out->send_to(boost::asio::buffer(p->data, p->len),
							  *remote);

				if (sent_bytes != p->len) {
					std::cerr << "forward: short send, " << sent_bytes
						  << " of " << p->len << " bytes" << std::endl;
				}

				fifo_out->push(p);
			}

		} catch (const std::exception& e) {
			std::cerr << "forward exception:" << e.what() << std::endl;
			return;
		}
	}

	// Session counters written by monitor() and read by update_session_stats();
	// both sides access them only while holding counter_mutex.
	boost::mutex counter_mutex;
	size_t counter_packet       = 0; // number of datagrams counted so far
	size_t counter_packet_bytes = 0; // sum of packet::len over the counted datagrams

	// TODO: SEPARATE the reading of datagram data FROM what to do with it
	//
	// Monitor loop: pops packets from fifo_in, updates the shared session
	// counters (under counter_mutex) and recycles every packet into fifo_out.
	//
	// The sender of the very first packet defines the 'session'. Datagrams from
	// any other endpoint are reported on stderr and recycled, but not counted.
	//
	// The loop previously stopped at the first foreign datagram. That ended the
	// only consumer of fifo_in, so packets were never recycled again and the
	// packet pool ran dry (assuming fifo::pop() blocks on empty - TODO confirm
	// against fifo.h). It now keeps running.
	void monitor(fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {

		// NOTE: the very first packet is used to setup/init a 'session'
		packet *p = fifo_in->pop();

		const udp::endpoint endpoint_remote(p->src);

		boost::mutex::scoped_lock lock(counter_mutex);
		counter_packet       = 1;
		counter_packet_bytes = p->len;
		lock.unlock();

		fifo_out->push(p);

		for (;;) {
			boost::this_thread::yield();
			p                         = fifo_in->pop();

			// read/copy datagram values, so the packet can be released before
			// the counters are touched
			const udp::endpoint ep(p->src);
			const size_t packet_bytes = p->len;
			const bool is_ts          = (0x47 == (unsigned char)p->data[0]); //NOTE: trivial but fully working example of payload 'analysis'
			(void)is_ts; // example only, not used yet

			fifo_out->push(p); //release datagram (recycle)
			boost::this_thread::yield();

			// verify remote endpoint - to avoid mixing datagrams stats from diff sources
			if (endpoint_remote != ep) {
				std::cerr << "receive DIFF endpoint:" << ep << std::endl;
				continue; //TODO: something more civilized - either post message or use some on_diff_endpoint_callback()
			}

			boost::mutex::scoped_lock lock(counter_mutex);
			counter_packet++;
			counter_packet_bytes += packet_bytes;
		}
	}

	// Maps a libwebsockets callback reason to the name of its enumerator, for
	// logging. Returns "UNKNOWN" for a reason that is not in the table.
	const char* reason2str(const enum libwebsocket_callback_reasons reason) {
		struct entry {
			enum libwebsocket_callback_reasons code;
			const char*                        label;
		};

		static const entry TABLE[] = {
			{ LWS_CALLBACK_ESTABLISHED,                              "LWS_CALLBACK_ESTABLISHED" },
			{ LWS_CALLBACK_CLIENT_CONNECTION_ERROR,                  "LWS_CALLBACK_CLIENT_CONNECTION_ERROR" },
			{ LWS_CALLBACK_CLIENT_ESTABLISHED,                       "LWS_CALLBACK_CLIENT_ESTABLISHED" },
			{ LWS_CALLBACK_CLOSED,                                   "LWS_CALLBACK_CLOSED" },
			{ LWS_CALLBACK_RECEIVE,                                  "LWS_CALLBACK_RECEIVE" },
			{ LWS_CALLBACK_CLIENT_RECEIVE,                           "LWS_CALLBACK_CLIENT_RECEIVE" },
			{ LWS_CALLBACK_CLIENT_RECEIVE_PONG,                      "LWS_CALLBACK_CLIENT_RECEIVE_PONG" },
			{ LWS_CALLBACK_CLIENT_WRITEABLE,                         "LWS_CALLBACK_CLIENT_WRITEABLE" },
			{ LWS_CALLBACK_SERVER_WRITEABLE,                         "LWS_CALLBACK_SERVER_WRITEABLE" },
			{ LWS_CALLBACK_HTTP,                                     "LWS_CALLBACK_HTTP" },
			{ LWS_CALLBACK_BROADCAST,                                "LWS_CALLBACK_BROADCAST" },
			{ LWS_CALLBACK_FILTER_NETWORK_CONNECTION,                "LWS_CALLBACK_FILTER_NETWORK_CONNECTION" },
			{ LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION,               "LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION" },
			{ LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS,   "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS" },
			{ LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS,   "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS" },
			{ LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION, "LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION" },
			{ LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER,           "LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER" },
			{ LWS_CALLBACK_CONFIRM_EXTENSION_OKAY,                   "LWS_CALLBACK_CONFIRM_EXTENSION_OKAY" },
			{ LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED,       "LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED" },
			/* external poll() management support */
			{ LWS_CALLBACK_ADD_POLL_FD,                              "LWS_CALLBACK_ADD_POLL_FD" },
			{ LWS_CALLBACK_DEL_POLL_FD,                              "LWS_CALLBACK_DEL_POLL_FD" },
			{ LWS_CALLBACK_SET_MODE_POLL_FD,                         "LWS_CALLBACK_SET_MODE_POLL_FD" },
			{ LWS_CALLBACK_CLEAR_MODE_POLL_FD,                       "LWS_CALLBACK_CLEAR_MODE_POLL_FD" }
		};

		// first match wins; the table is scanned front to back
		const entry* const table_end = TABLE + (sizeof(TABLE) / sizeof(TABLE[0]));
		for (const entry* e = TABLE; e != table_end; ++e) {
			if (reason == e->code) {
				return e->label;
			}
		}

		return "UNKNOWN";
	}

	// libwebsockets callback for the "http-only" protocol (protocols[0]).
	//
	// Logs every invocation on stderr and, for LWS_CALLBACK_HTTP, serves
	// ./favicon.ico when "/favicon.ico" is requested and ./test.html for every
	// other request. Always returns 0.
	//
	// Parameters follow the libwebsockets callback signature; context and len
	// are unused here.
	//
	// Fixes compared to the previous version:
	//   - user was cast to a socket fd for EVERY reason. It is only treated as
	//     an fd for LWS_CALLBACK_FILTER_NETWORK_CONNECTION now (that is how the
	//     libwebsockets test server of this API generation uses it - TODO
	//     confirm against the installed version).
	//   - the name/ip buffers are initialised, so the log line never prints
	//     uninitialised memory when no lookup was done.
	//   - in is only printed as a string for LWS_CALLBACK_HTTP, where this
	//     function already treats it as the requested URI; for other reasons it
	//     may be NULL or not a string.
	int callback_http(struct libwebsocket_context        *context,
			  struct libwebsocket                *wsi,
			  enum libwebsocket_callback_reasons  reason,
			  void                               *user,
			  void                               *in,
			  size_t len) {

		char client_name[128] = "";
		char client_ip[128]   = "";

		if (LWS_CALLBACK_FILTER_NETWORK_CONNECTION == reason) {
			libwebsockets_get_peer_addresses((int)(long)user,
							 client_name,
							 sizeof(client_name),
							 client_ip,
							 sizeof(client_ip));
		}

		// requested URI, or "" when this is not an HTTP request / no URI given
		const char* const uri =
			((LWS_CALLBACK_HTTP == reason) && in) ? (const char*)in : "";

		fprintf(stderr, "callback_http: host=%s, ip=%s, reason=%s, user=%p, in=%p (%s)\n",
			client_name,
			client_ip,
			reason2str(reason),
			user,
			in,
			uri);

		switch (reason) {
		case LWS_CALLBACK_HTTP:
			if (0 == strcmp(uri, "/favicon.ico")) {
				if (libwebsockets_serve_http_file(wsi, "./favicon.ico", "image/x-icon")) {
				 	fprintf(stderr, "Failed to send favicon\n");
				}

				break;
			}

			//TODO: move these hardcoded keys into some external config file
			if (libwebsockets_serve_http_file(wsi, "./test.html", "text/html")) {
				fprintf(stderr, "Failed to send HTTP file\n");
			}
			break;

		case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
			//return non-zero from here to kill the *NETWORK* connection
			break;

		default:
			break;
		}

		return 0;
	}

	// Payload capacity in bytes of the websocket send buffer declared in
	// update_session_stats() (excluding the libwebsockets pre/post padding).
	const size_t MSG_LEN_MAX  = 128;

	// Number of websocket sessions currently open on "udp-mon-test-protocol":
	// incremented on LWS_CALLBACK_ESTABLISHED and decremented on
	// LWS_CALLBACK_CLOSED in callback_udp_mon_test().
	size_t clients_ws_counter = 0;

	// libwebsockets callback for "udp-mon-test-protocol" (protocols[1]).
	//
	// Handles:
	//   - LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: logs the first three
	//     handshake tokens (user is interpreted as struct lws_tokens*)
	//   - LWS_CALLBACK_ESTABLISHED / LWS_CALLBACK_CLOSED: maintain
	//     clients_ws_counter
	//   - LWS_CALLBACK_BROADCAST: writes the broadcast payload (in, len) to this
	//     client as a text frame
	//   - LWS_CALLBACK_RECEIVE: a message starting with "quit" makes the server
	//     hang up on this client
	// Always returns 0.
	//
	// Fixes compared to the previous version:
	//   - len (size_t) was printed with %d and in (void*, possibly NULL and not
	//     NUL-terminated) with %s; the payload is now printed length-bounded
	//     and only for RECEIVE/BROADCAST
	//   - the "quit" check verifies in and len before reading 4 bytes
	//   - NULL handshake tokens are not passed to %s
	//   - LWS_CALLBACK_RECEIVE no longer falls through into the next case
	//   - clients_ws_counter cannot wrap below zero
	int callback_udp_mon_test(struct libwebsocket_context        *context,
				  struct libwebsocket                *wsi,
				  enum libwebsocket_callback_reasons  reason,
				  void                               *user,
				  void                               *in,
				  size_t                              len) {

		// at most this many payload bytes are echoed into the log line
		const size_t LOG_PAYLOAD_MAX = 256;

		const bool has_payload =
			(NULL != in) && (len > 0) &&
			((LWS_CALLBACK_RECEIVE == reason) || (LWS_CALLBACK_BROADCAST == reason));
		const int  shown_len   =
			has_payload ? (int)((len < LOG_PAYLOAD_MAX) ? len : LOG_PAYLOAD_MAX) : 0;

		fprintf(stderr, "callback_udp_mon_test: reason=%s, user=%p in=%p len=%lu (%.*s)\n",
			reason2str(reason),
			user,
			in,
			(unsigned long)len,
			shown_len,
			has_payload ? (const char*)in : "");

		struct lws_tokens *tokens = 0;
		int n                     = 0;

		switch (reason) {
		case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
			tokens = (struct lws_tokens *)user;
			if (tokens) {
				fprintf(stderr, "callback_udp_mon_test LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: %s, %s, %s\n",
					tokens[0].token ? tokens[0].token : "(null)",
					tokens[1].token ? tokens[1].token : "(null)",
					tokens[2].token ? tokens[2].token : "(null)");
			}
			break;

		case LWS_CALLBACK_ESTABLISHED:
			// ctor - setup (user) client/session here - the mem is automatically allocated according to the size requested in the protocols table
			// NOTE: we could agree with a ws client on some ticket/hash mechanism that would allow for resuming/reconnecting?
			clients_ws_counter++;
			break;

		case LWS_CALLBACK_CLOSED:
			// dtor
			if (clients_ws_counter > 0) {
				clients_ws_counter--;
			}
			break;

		case LWS_CALLBACK_BROADCAST:
			n = libwebsocket_write(wsi,
					       (unsigned char*)in,
					       len,
					       LWS_WRITE_TEXT);

			if (n < 0) {
				fprintf(stderr,
					"callback_udp_mon_test LWS_CALLBACK_BROADCAST: libwebsocket_write ERROR, n=%d\n",
					n); //TODO: what is the return val n?
			}

			break;

		// NOTE: ATM we do not use these
		case LWS_CALLBACK_RECEIVE: //libwebsocket_rx_flow_control(wsi, 0); //NOTE: with manual/write mode - - flow control off
			if (in && (len >= 4) && (0 == strncmp("quit", (const char*)in, 4))) {
				const int fd = libwebsocket_get_socket_fd(wsi);
				fprintf(stderr, "user=%p (fd=%d) requested QUIT!\n",
					user, fd);

				libwebsockets_hangup_on_client(context, fd);
			}
			break;

		case LWS_CALLBACK_SERVER_WRITEABLE:
			break;

		default:
			break;
		}

		return 0;
	}

	// TODO: clean it up
	//
	// Periodic stats reporter (runs forever, intended as a thread body).
	//
	// Every update_interval milliseconds it snapshots the shared session
	// counters under counter_mutex, derives the per-interval deltas and the
	// bit/s rate, prints them as one JSON object on stdout and - when the
	// websocket context could be created and at least one websocket client is
	// connected - broadcasts the same JSON text to all clients of
	// "udp-mon-test-protocol". The websocket server listens on port 7681.
	//
	// update_interval: reporting period in milliseconds (must not be 0, it is
	//                  used as a divisor).
	//
	// Fixes compared to the previous version:
	//   - the JSON was copied with strncpy() and measured with strlen(); a
	//     message of MSG_LEN_MAX bytes or more was left without terminator and
	//     strlen() read past the payload. The length is now taken from the
	//     string itself, and a message that does not fit is reported instead of
	//     being sent.
	//   - size_t was printed with %d
	//   - the rate is computed in double; float cannot represent byte/bit
	//     counts above 2^24 exactly
	//   - removed the unused interface_name buffer
	void update_session_stats(const unsigned int update_interval) {
		size_t counter_packet_bytes_last = 0;
		size_t counter_packet_last       = 0;

		const double per_sec = 1000.0 / update_interval;
		std::cout << "per_sec:" << per_sec << std::endl;

		const boost::posix_time::milliseconds update_interval_ms(update_interval);

		// NOTE: only tmp solution for the HTML5 WebSocket-s client will be moved (as planned) to a
		//       separate (plugin_websocket) file.
		//

		//first protocol must always be HTTP handler for ws handshake over HTTP
		static struct libwebsocket_protocols protocols[] = {
			{
				"http-only",   //name
				callback_http, //callback
				0              //per_session_data_size
			},
			{
				"udp-mon-test-protocol",
				callback_udp_mon_test,
				128 //number of bytes libwebsockets is going to allocate per session/user/client/connection and give you in the (void *user) var, we init/setup this block in LWS_CALLBACK_ESTABLISHED
			},
			{
				NULL, NULL, 0
			}
		};


		struct libwebsocket_context *context;
		// send buffer: libwebsockets needs padding before and after the payload
		unsigned char                buf[LWS_SEND_BUFFER_PRE_PADDING + MSG_LEN_MAX + LWS_SEND_BUFFER_POST_PADDING];
		char                        *msg                 = (char*)&buf[LWS_SEND_BUFFER_PRE_PADDING];
		int                          opts                = 0;
		const char                  *interface           = 0;
		int                          port                = 7681;

		context = libwebsocket_create_context(port,
						      interface,
						      protocols,
						      libwebsocket_internal_extensions,
						      NULL, //cert path for ssl
						      NULL, //key path for ssl
						      NULL,
						      -1,
						      -1,
						      opts,
						      NULL);

		bool use_ws = true;

		if (NULL == context) {
			std::cerr << "libwebsocket_create_context err, ws disabled..." << std::endl;
			use_ws = false;
		}

		for (;;) {
			boost::mutex::scoped_lock lock(counter_mutex); //1. lock the counters (or use some atomic access)
			const size_t cp  = counter_packet;             //2. copy the values (or swap pointers - if there is a bigger structure in future)
			const size_t cpb = counter_packet_bytes;       //   so the monitor thread is not waiting for no reason
			lock.unlock();                                 //3. and unlock

			// NOTE 1: There is *NO* need to do any calculation-s inside this app AT ALL, it could simply
			//         only collect and forward 'raw data' to the client-s (other processes) who could do
			//         their own calculations. Perhaps we could later add some --raw-mode config option
			//         that will do just that.
			//
			// NOTE 2: now do any 'calculations'/'updates', you want, ATM we have:
			//         - delta-s of received bytes and packet counters
			//           since the previous update
			//         - bit/s per seconds
			//
			const size_t p      = (cp - counter_packet_last);
			counter_packet_last = cp;

			const size_t b   = (cpb - counter_packet_bytes_last);
			counter_packet_bytes_last = cpb;
			const size_t bps = static_cast<size_t>((b * 8) * per_sec);

			// var update =
			//     window.JSON.parse('{ "udp_total":147701301, "bytes_total":1100854764, "udp":1710, "bytes":2250360, "bps":18002880 }')
			//
			std::stringstream ss;
			ss << "{ "
			   << "\"udp_total\":"   << cp  << ", "  // total number of UDP packets received
			   << "\"bytes_total\":" << cpb << ", "  // total number of bytes received
			   << "\"udp\":"         << p   << ", "  // number of UDP packets received since the last update
			   << "\"bytes\":"       << b   << ", "  // number of bytes received since the last update
			   << "\"bps\":"         << bps << " }"; // bit/s since the last update

			const std::string s = ss.str();
			std::cout << s << std::endl;

			// TODO: move this to the separate ws plugin - so this app does not contain any ws specific stuff
			if (use_ws) {
				fprintf(stderr, "clients_ws_counter=%lu\n", (unsigned long)clients_ws_counter);

				// collect stats for ws distribution *ONLY* if someone  is listening (at least 1 ws client)
				if (clients_ws_counter > 0) {
					if (s.size() > MSG_LEN_MAX) {
						// a truncated JSON object would be useless to the client
						fprintf(stderr, "stats message too long for ws buffer (%lu > %lu), not broadcast\n",
							(unsigned long)s.size(),
							(unsigned long)MSG_LEN_MAX);
					} else {
						memcpy(msg, s.data(), s.size());
						libwebsockets_broadcast(&protocols[1],
									(unsigned char*)msg,
									s.size());
					}
				}

				const int n = libwebsocket_service(context, 100);
				fprintf(stderr, "libwebsocket_service:%d\n", n);
			}

			boost::this_thread::sleep(update_interval_ms); //adjust later
		}

		// NOTE: not reached at the moment - the loop above has no exit
		if (use_ws) {
			libwebsocket_context_destroy(context);
		}
	}
}

int main(int argc, char* argv[]) {
	// TODO: register signal handlers
 
	if (argc < 2) {
		std::cerr << MSG_HELP << std::endl;
		return 0;
	}

	try {

		const unsigned short port = std::strtol(argv[1], NULL, 10);
		std::cout << "port:" << port << std::endl;

		boost::asio::io_service io;
		udp::socket             sock_in(io, udp::endpoint(udp::v4(), port));

		fifo<packet*> fifo_datagram;
		fifo<packet*> fifo_monitor;

		const unsigned int update_interval = 1000; //ms
		boost::thread thread_update(update_session_stats,
					    update_interval);

		boost::thread thread_monitor(monitor,
					     &fifo_monitor,
					     &fifo_datagram);

		// TODO: static pool is the default, add a config option for a dynamic one (size of packet, number of packets to prealloc)
		packet pool[STATIC_PACKET_POOL_SIZE];
		for (size_t i = 0; i < STATIC_PACKET_POOL_SIZE; i++) {
			// std::cout << i << ":" << sizeof (pool[i].data) << std::endl;
			fifo_datagram.push(&pool[i]);
		}

		if (argc == 4) {
			try {
				udp::resolver           resolver(io);
				udp::resolver::query    query(udp::v4(), argv[2], argv[3]);
				udp::endpoint           endpoint_forward(*resolver.resolve(query));
				std::cout << "endpoint_forward:" << endpoint_forward << std::endl;

				udp::socket sock_out(io, udp::v4());
				//TODO: sock_out.connect(endpoint_forward); ... sock_out.send(); doesn't work?

				fifo<packet*> fifo_forward;
				boost::thread thread_forward(forward,
							     &sock_out,
							     &endpoint_forward,
							     &fifo_forward,
							     &fifo_monitor);

				receive(&sock_in,
					&fifo_datagram,
					&fifo_forward);

			} catch (const std::exception& e) {
				std::cerr << "forward disabled:" << e.what() << std::endl;
			}

		}

		receive(&sock_in,
			&fifo_datagram,
			&fifo_monitor);

	} catch (const std::exception& e) {
		std::cerr << "main:" << e.what() << std::endl;
	}

	std::cout << "END" << std::endl;
}
<html>
  <head>
    <title>test udp_mon</title>
    <meta charset="utf-8" />
  </head>

  <body>
    <canvas id="udp_packets" width="400" height="200" data-test="blah"></canvas>
    <br/>

    input port:<input id="input_port" type="text" size="10" />
    <button type="button" id="button_send">SEND</button><button type="button" id="button_stop">STOP</button>
    <br/>

    <script type="text/javascript">


// Websocket client for udp_mon: connects back to the host that served this
// page using the "udp-mon-test-protocol" sub-protocol, draws a rolling bar
// chart of the "udp" value of every JSON update on the udp_packets canvas and
// wires up the SEND / STOP buttons.
//
// Fixes compared to the previous version:
//   - the SEND and STOP handlers no longer throw when there is no socket or
//     the socket is not open (for example after STOP, or while reconnecting)
//   - the bar position is computed with Math.floor() instead of parseInt() on
//     a number
(function () {

        // Derives the ws:// or wss:// URL of the host that served the page;
        // returns null for any other URL scheme.
        var get_ws_url = function (url) {
                if (url.substring(0, 5) == "https") {
                        return "wss://".concat(url.substr(8).split('/')[0]);

                } else if (url.substring(0, 4) == "http") {
                        return "ws://".concat(url.substr(7).split('/')[0]);

                } else {
                        return null; //or throw an exception uknown URL/URI type
                }
        };

        var udp_mon_ws_url = get_ws_url(document.URL),
        dummy = {};

        console.log("udp_mon_ws_url:" + udp_mon_ws_url);

        // true when there is a socket and it is ready to send
        var is_open = function (sock) {
                return !!sock && sock.readyState === WebSocket.OPEN;
        };

        var connect = function () {
                console.log("connect:" + udp_mon_ws_url);

                try {
                        // NOTE: ctor throws SECURITY_ERR if the port is blocked
                        dummy.sock = new WebSocket(udp_mon_ws_url, "udp-mon-test-protocol");

                        dummy.sock.onopen = function (event) {
                                console.log("dummy.sock.onopen:" + event);
                        };

                        dummy.sock.onmessage = (function () {
                                var ctx = document.getElementById('udp_packets').getContext("2d"),
                                fifo = [], //TODO: this is really a TypedArray
                                i;

                                // 200 samples, one per 2px column; 200 == empty bar
                                for (i=0; i<200; i++) {
                                        fifo[i] = 200;
                                }

                                return function (event) {
                                        var update = JSON.parse(event.data),
                                        u = update["udp"],
                                        x;

                                        //console.log("dummy.sock.onmessage:" + event.data);
                                        fifo.shift(); //discard 'oldest' element

                                        //NOTE: for example: given number of UDP packets received since last update is 1689,
                                        //      this should interpret it as 200 - 168 = 32; that would become the start y
                                        //      coordinate of the bar representing the UDP update.
                                        fifo.push(Math.floor(200 - (parseFloat(u)/10)));
                                        //console.log(fifo);

                                        ctx.fillStyle   = "rgb(0, 0, 200)";
                                        ctx.fillRect(0, 0, 400, 200);

                                        ctx.fillStyle   = "rgba(200, 0, 0, 0.5)";
                                        for (x=0; x<200; x++) {
                                                ctx.fillRect(x*2, fifo[x], 2, 200-fifo[x]);
                                        }

                                        ctx.font = "10pt DejaVu";
                                        ctx.fillStyle   = "rgb(200, 200, 200)";
                                        ctx.fillText(u, 200, 180);
                                };
                        })();

                        dummy.sock.onerror = function (event) {
                                console.log("dummy.sock.onerror:" + event);
                        };

                        dummy.sock.onclose = function (event) {
                                console.log("dummy.sock.onclose:" + event.reason);

                                // re-connect (after 10 sec)
                                // NOTE(review): this also fires after the STOP button closed the
                                //               socket, so STOP only pauses for 10 sec - confirm
                                //               whether that is intended
                                window.setTimeout(connect,
                                                  10000);
                        };

                } catch (ex) {
                        console.log(ex);
                        return; // socket setup failed; the button handlers check for a usable socket themselves
                }

        };

        connect();

        // attach the button event handlers
        document.getElementById('button_send').addEventListener("click",
                                                                function () {
                                                                        console.log("onclick_handler_send");
                                                                        var input_port = document.getElementById('input_port').value;
                                                                        console.log(input_port);
                                                                        if (!is_open(dummy.sock)) {
                                                                                console.log("onclick_handler_send: socket not open, nothing sent");
                                                                                return;
                                                                        }
                                                                        dummy.sock.send(input_port);
                                                                });

        document.getElementById('button_stop').addEventListener("click",
                                                                function () {
                                                                        console.log("onclick_handler_stop");
                                                                        var sock = dummy.sock;
                                                                        if (!sock) {
                                                                                console.log("onclick_handler_stop: no socket, nothing to stop");
                                                                                return;
                                                                        }

                                                                        if (is_open(sock)) {
                                                                                sock.send("quit"); // workaround the sock.close() BUG, requesting a server-side close
                                                                        }
                                                                        if (sock.close) {
                                                                                console.log("onclick_handler_stop calling dummy.sock.close()");
                                                                                sock.close();
                                                                        }

                                                                        // NOTE: because dummy.sock.close() works (as of 15-02-2013) only in opera.
                                                                        //       - google-chrome seems to be ignoring it completely
                                                                        //       - firefox is 'doing something' - the traffic stops and after couple of seconds it shows an error:
                                                                        //         "The connection to ws://mhormann-linux:7681/ was interrupted while the page was loading"
                                                                        //         blaming it on the line: dummy.sock = new WebDummy.Socket(udp_mon_ws_url, "udp-mon-test-protocol");
                                                                        sock.onmessage = function (event) {};
                                                                        delete dummy.sock;
                                                                });
        // only for testing
        console.log("init success...");

})();

    </script>

  </body>
</html>