// g++ udp_mon.cpp -o udp_mon \
// -pthread \
// -I /usr/local/boost_1_51_0/include \
// -L /usr/local/boost_1_51_0/lib \
// -lboost_system -lboost_chrono -lboost_thread \
// `pkg-config libwebsockets --cflags --libs`
//
#include <cstdlib> //for strtol
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include "fifo.h"
#include <libwebsockets.h>
using boost::asio::ip::udp;
namespace {
// Usage text; main() writes it to std::cerr when no monitoring port argument is given.
const std::string MSG_HELP =
"USAGE: ./udp_mon <monitoring input port> [<forward host> <forward port>]\n"
"\n"
"\tEXAMPLE:\n"
"\t./udp_mon 2345 monitor incoming UDP stream on port 2345\n"
"\t./udp_mon 2345 192.168.0.86 2346 monitor incoming UDP stream on port 2345\n"
"\t and forward it to 192.168.0.86:2346\n";
// Compile-time sizing constants for datagram buffers and the packet pool.
enum {
    // The largest payload a single IPv4 UDP datagram can carry:
    // 65535 (max IP packet) - 20 (IPv4 header) - 8 (UDP header).
    UDP_DATA_SIZE_MAX = 65507,
    // The largest UDP payload that still fits one unfragmented IPv4 packet on a
    // link with the usual 1500 byte MTU (cat /sys/class/net/eth0/mtu => 1500).
    // The MTU already excludes the link-layer (Ethernet) header, so only the
    // IPv4 (20) and UDP (8) headers are subtracted: 1500 - 28 = 1472.
    UDP_DATA_SIZE_MAX_MTU = (1500 - (20 + 8)),
    // The static 'default' MAX for the datagram data/payload; this is the size
    // of packet::data and of the buffer handed to receive_from().
    DATA_SIZE_MAX = UDP_DATA_SIZE_MAX_MTU,
    // Arbitrary size of the statically pre-allocated 'pool' of struct packet.
    STATIC_PACKET_POOL_SIZE = 100
};
// One received datagram plus its bookkeeping. Instances are passed between the
// receive/forward/monitor stages by pointer through fifo<packet*> queues.
struct packet {
udp::endpoint src; // sender endpoint, filled in by receive_from() in receive(). TODO: this is a possible problem/BUG if we want a POD (for easy/fast use with memcpy)
std::size_t len; // number of payload bytes stored in data (the value returned by receive_from())
unsigned long long time_stamp; // nanoseconds since the boost::chrono::system_clock epoch, stamped in receive() right after the datagram arrives
unsigned char data[DATA_SIZE_MAX]; // payload buffer; only the first len bytes are meaningful
};
// Receiver stage: endlessly takes an empty packet from fifo_in, fills it with
// the next datagram read from sock_in (payload, length, sender endpoint),
// stamps it with the current system_clock time in nanoseconds since the epoch
// and hands it over to fifo_out.
//
// sock_in  - socket the datagrams are read from
// fifo_in  - source of free packets to fill
// fifo_out - destination for filled packets
//
// Returns only when a std::exception is raised inside the loop; the exception
// text is written to std::cerr.
void receive(udp::socket* sock_in, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
    namespace bc = boost::chrono;
    try {
        while (true) {
            packet* const slot = fifo_in->pop();
            slot->len = sock_in->receive_from(
                boost::asio::buffer(slot->data, DATA_SIZE_MAX),
                slot->src);
            // TODO: this is a little overhead - saving a start time before the loop
            // and recording only the ns/us diff would be enough
            const bc::system_clock::time_point arrived = bc::system_clock::now();
            slot->time_stamp =
                bc::duration_cast<bc::nanoseconds>(arrived.time_since_epoch()).count();
            fifo_out->push(slot);
        }
    } catch (const std::exception& e) {
        std::cerr << "receive exception:" << e.what() << std::endl;
    }
}
// TODO: this is a bit naive - forwarding and monitoring a datagram are both
// read-only operations, so there is no reason why they shouldn't run in parallel.
//
// Forwarder stage: endlessly takes a filled packet from fifo_in, sends its
// payload (p->len bytes of p->data) to *remote through sock_out and then passes
// the packet on to fifo_out.
//
// sock_out - socket used for sending
// remote   - destination endpoint
// fifo_in  - source of filled packets
// fifo_out - destination for packets once they have been sent
//
// Returns only when a std::exception is raised inside the loop; the exception
// text is written to std::cerr.
void forward(udp::socket* sock_out, udp::endpoint* remote, fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
    try {
        for (;;) {
            packet *p = fifo_in->pop();
            std::size_t sent_total = 0;
            // do/while: a zero-length datagram is still forwarded once.
            do {
                // Continue at the first unsent byte; restarting at p->data would
                // re-send the head of the payload after a partial send.
                const std::size_t sent_bytes =
                    sock_out->send_to(boost::asio::buffer(p->data + sent_total,
                                                          p->len - sent_total),
                                      *remote);
                if (0 == sent_bytes) {
                    break; // no progress - do not spin on this packet forever
                }
                sent_total += sent_bytes;
            } while (sent_total < p->len);
            fifo_out->push(p);
        }
    } catch (const std::exception& e) {
        std::cerr << "forward exception:" << e.what() << std::endl;
        return;
    }
}
// Running totals of the monitored stream. monitor() writes them and
// update_session_stats() reads them; both lock counter_mutex while doing so.
boost::mutex counter_mutex;
size_t counter_packet = 0; // number of datagrams counted by monitor()
size_t counter_packet_bytes = 0; // sum of packet::len over the counted datagrams
// TODO: SEPARATE the reading of datagram data FROM what to do with it
//
// Monitor stage: counts datagrams and payload bytes coming from one sender.
// The first packet popped from fifo_in fixes the sender endpoint of the
// 'session' and (re)initialises the shared counters. Every later packet is
// copied out, handed to fifo_out straight away and then counted under
// counter_mutex. As soon as a packet from a different endpoint shows up the
// endpoint is reported on std::cerr and the function returns.
//
// fifo_in  - source of filled packets
// fifo_out - destination for packets that are no longer needed (recycling)
void monitor(fifo<packet*> *fifo_in, fifo<packet*> *fifo_out) {
    // NOTE: the very first packet is used to setup/init a 'session'
    packet *first = fifo_in->pop();
    const udp::endpoint endpoint_remote(first->src);
    {
        boost::mutex::scoped_lock guard(counter_mutex);
        counter_packet = 1;
        counter_packet_bytes = first->len;
    }
    fifo_out->push(first);

    bool same_source = true;
    while (same_source) {
        boost::this_thread::yield();
        packet *current = fifo_in->pop();
        // take copies of everything we need so the packet can be released early
        const udp::endpoint sender(current->src);
        const size_t payload_size = current->len;
        // NOTE: trivial but fully working example of payload 'analysis'
        const bool is_ts = (0x47 == (unsigned char)current->data[0]);
        fifo_out->push(current); // release datagram (recycle)
        boost::this_thread::yield();
        // only datagrams of the session endpoint are counted - this avoids
        // mixing datagram stats from diff sources
        if (sender == endpoint_remote) {
            boost::mutex::scoped_lock guard(counter_mutex);
            counter_packet++;
            counter_packet_bytes += payload_size;
        } else {
            std::cerr << "receive DIFF endpoint:" << sender << std::endl;
            // TODO: something more civilized - either post message or use some on_diff_endpoint_callback()
            same_source = false;
        }
    }
}
// Maps a libwebsockets callback reason to the name of its enumerator.
//
// reason - value to translate
//
// Returns a pointer to a static string: the enumerator name when the reason is
// listed in the table below, "UNKNOWN" otherwise.
const char* reason2str(const enum libwebsocket_callback_reasons reason) {
    struct reason_name {
        enum libwebsocket_callback_reasons code;
        const char* text;
    };
    static const reason_name KNOWN[] = {
        { LWS_CALLBACK_ESTABLISHED, "LWS_CALLBACK_ESTABLISHED" },
        { LWS_CALLBACK_CLIENT_CONNECTION_ERROR, "LWS_CALLBACK_CLIENT_CONNECTION_ERROR" },
        { LWS_CALLBACK_CLIENT_ESTABLISHED, "LWS_CALLBACK_CLIENT_ESTABLISHED" },
        { LWS_CALLBACK_CLOSED, "LWS_CALLBACK_CLOSED" },
        { LWS_CALLBACK_RECEIVE, "LWS_CALLBACK_RECEIVE" },
        { LWS_CALLBACK_CLIENT_RECEIVE, "LWS_CALLBACK_CLIENT_RECEIVE" },
        { LWS_CALLBACK_CLIENT_RECEIVE_PONG, "LWS_CALLBACK_CLIENT_RECEIVE_PONG" },
        { LWS_CALLBACK_CLIENT_WRITEABLE, "LWS_CALLBACK_CLIENT_WRITEABLE" },
        { LWS_CALLBACK_SERVER_WRITEABLE, "LWS_CALLBACK_SERVER_WRITEABLE" },
        { LWS_CALLBACK_HTTP, "LWS_CALLBACK_HTTP" },
        { LWS_CALLBACK_BROADCAST, "LWS_CALLBACK_BROADCAST" },
        { LWS_CALLBACK_FILTER_NETWORK_CONNECTION, "LWS_CALLBACK_FILTER_NETWORK_CONNECTION" },
        { LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION, "LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION" },
        { LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS, "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS" },
        { LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS, "LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS" },
        { LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION, "LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION" },
        { LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER, "LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER" },
        { LWS_CALLBACK_CONFIRM_EXTENSION_OKAY, "LWS_CALLBACK_CONFIRM_EXTENSION_OKAY" },
        { LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED, "LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED" },
        /* external poll() management support */
        { LWS_CALLBACK_ADD_POLL_FD, "LWS_CALLBACK_ADD_POLL_FD" },
        { LWS_CALLBACK_DEL_POLL_FD, "LWS_CALLBACK_DEL_POLL_FD" },
        { LWS_CALLBACK_SET_MODE_POLL_FD, "LWS_CALLBACK_SET_MODE_POLL_FD" },
        { LWS_CALLBACK_CLEAR_MODE_POLL_FD, "LWS_CALLBACK_CLEAR_MODE_POLL_FD" }
    };
    // the table length is derived from the array itself, no terminator entry needed
    const size_t known_count = sizeof(KNOWN) / sizeof(KNOWN[0]);
    for (size_t idx = 0; idx != known_count; ++idx) {
        if (reason == KNOWN[idx].code) {
            return KNOWN[idx].text;
        }
    }
    return "UNKNOWN";
}
// libwebsockets callback of the "http-only" protocol entry.
// Logs every invocation to stderr and, for LWS_CALLBACK_HTTP, serves
// ./favicon.ico when "/favicon.ico" is requested and ./test.html for anything
// else.
//
// context - libwebsockets context (unused here)
// wsi     - connection the callback is invoked for
// reason  - why the callback is invoked
// user    - reason dependent; it is handed to
//           libwebsockets_get_peer_addresses() as a socket fd
// in      - reason dependent; for LWS_CALLBACK_HTTP the requested URI
// len     - reason dependent (unused here)
//
// Always returns 0.
int callback_http(struct libwebsocket_context *context,
                  struct libwebsocket *wsi,
                  enum libwebsocket_callback_reasons reason,
                  void *user,
                  void *in,
                  size_t len) {
    // Start with empty strings so the log line below never prints uninitialised
    // memory when the peer lookup leaves the buffers untouched.
    char client_name[128] = "";
    char client_ip[128] = "";
    // NOTE(review): `user` is presumably a socket fd only for some reasons (the
    // libwebsockets test server does this lookup for
    // LWS_CALLBACK_FILTER_NETWORK_CONNECTION only) - confirm against the
    // library version in use.
    libwebsockets_get_peer_addresses(static_cast<int>(reinterpret_cast<long>(user)),
                                     client_name,
                                     sizeof(client_name),
                                     client_ip,
                                     sizeof(client_ip));
    // `in` is only known to be a C string for LWS_CALLBACK_HTTP (see the strcmp
    // below); for any other reason, or when it is NULL, it must not be given to %s.
    const char *uri = "";
    if ((LWS_CALLBACK_HTTP == reason) && (NULL != in)) {
        uri = static_cast<const char*>(in);
    }
    fprintf(stderr, "callback_http: host=%s, ip=%s, reason=%s, user=%p, in=%p (%s)\n",
            client_name,
            client_ip,
            reason2str(reason),
            user,
            in,
            uri);
    switch (reason) {
    case LWS_CALLBACK_HTTP:
        if (in && (0 == strcmp((const char*)in, "/favicon.ico"))) {
            if (libwebsockets_serve_http_file(wsi, "./favicon.ico", "image/x-icon")) {
                fprintf(stderr, "Failed to send favicon\n");
            }
            break;
        }
        //TODO: move these hardcoded keys into some external config file
        if (libwebsockets_serve_http_file(wsi, "./test.html", "text/html")) {
            fprintf(stderr, "Failed to send HTTP file\n");
        }
        break;
    case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
        //return non-zero from here to kill the *NETWORK* connection
        break;
    default:
        break;
    }
    return 0;
}
// Size in bytes of the payload area of the ws broadcast buffer that
// update_session_stats() builds (between the libwebsockets pre/post padding).
const size_t MSG_LEN_MAX = 128;
// Number of ws sessions currently counted as open: callback_udp_mon_test()
// increments it on LWS_CALLBACK_ESTABLISHED and decrements it on
// LWS_CALLBACK_CLOSED; update_session_stats() broadcasts only while it is > 0.
size_t clients_ws_counter = 0;
int callback_udp_mon_test(struct libwebsocket_context *context,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason,
void *user,
void *in,
size_t len) {
fprintf(stderr, "callback_udp_mon_test: reason=%s, user=%p in=%p len=%d (%s)\n",
reason2str(reason),
user,
in,
len,
in);
struct lws_tokens *tokens = 0;
int n = 0;
switch (reason) {
case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
tokens = (struct lws_tokens *)user;
fprintf(stderr, "callback_udp_mon_test LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: %s, %s, %s\n",
tokens[0].token,
tokens[1].token,
tokens[2].token);
break;
case LWS_CALLBACK_ESTABLISHED:
// ctor - setup (user) client/session here - the mem is automatically allocated according to the size requested in the protocols table
// NOTE: we could agree with a ws client on some ticket/hash mechanism that would allow for resuming/reconnecting?
clients_ws_counter++;
break;
case LWS_CALLBACK_CLOSED:
// dtor
clients_ws_counter--;
break;
case LWS_CALLBACK_BROADCAST:
n = libwebsocket_write(wsi,
(unsigned char*)in,
len,
LWS_WRITE_TEXT);
if (n < 0) {
fprintf(stderr,
"callback_udp_mon_test LWS_CALLBACK_BROADCAST: libwebsocket_write ERROR, n=%d\n",
n); //TODO: what is the return val n?
}
break;
// NOTE: ATM we do not use these
case LWS_CALLBACK_RECEIVE: //libwebsocket_rx_flow_control(wsi, 0); //NOTE: with manual/write mode - - flow control off
if (0 == strncmp("quit", (const char*)in, 4)) {
const int fd = libwebsocket_get_socket_fd(wsi);
fprintf(stderr, "user=%p (fd=%d) requested QUIT!\n",
user, fd);
libwebsockets_hangup_on_client(context, fd);
}
case LWS_CALLBACK_SERVER_WRITEABLE:
break;
default:
break;
}
return 0;
}
// TODO: clean it up
//
// Statistics/reporting loop (runs in its own thread, see main()).
// Once per iteration it snapshots the shared packet/byte counters under
// counter_mutex, derives the deltas since the previous iteration plus a bit/s
// figure, prints the result as one JSON line on std::cout and - when the
// websocket context could be created and at least one ws client is counted -
// broadcasts the same JSON to the "udp-mon-test-protocol" clients. It then
// services the websocket context and sleeps for update_interval milliseconds.
//
// update_interval - reporting period in milliseconds; must be non-zero because
//                   it is used as a divisor
//
// The loop has no exit, so this function does not return.
//
// NOTE(review): the bit/s figure assumes exactly update_interval ms between two
// snapshots, while the real period is the sleep plus the time spent in the loop
// body (including libwebsocket_service) - measure the elapsed time if accuracy
// matters.
void update_session_stats(const unsigned int update_interval) {
    size_t counter_packet_bytes_last = 0;
    size_t counter_packet_last = 0;
    // double: a float has only 24 bits of mantissa, which is not enough to hold
    // the bit counts of a busy stream exactly
    const double per_sec = 1000.0 / update_interval;
    std::cout << "per_sec:" << per_sec << std::endl;
    const boost::posix_time::milliseconds update_interval_ms(update_interval);
    // NOTE: only tmp solution for the HTML5 WebSocket-s client will be moved (as planned) to a
    // separate (plugin_websocket) file.
    //
    //first protocol must always be HTTP handler for ws handshake over HTTP
    static struct libwebsocket_protocols protocols[] = {
        {
            "http-only",   //name
            callback_http, //callback
            0              //per_session_data_size
        },
        {
            "udp-mon-test-protocol",
            callback_udp_mon_test,
            128 //number of bytes libwebsockets is going to allocate per session/user/client/connection and give you in the (void *user) var, we init/setup this block in LWS_CALLBACK_ESTABLISHED
        },
        {
            NULL, NULL, 0
        }
    };
    // broadcast buffer: the payload area sits between the padding areas
    unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + MSG_LEN_MAX + LWS_SEND_BUFFER_POST_PADDING];
    unsigned char *msg = &buf[LWS_SEND_BUFFER_PRE_PADDING];
    const int opts = 0;
    const char *interface = 0;
    const int port = 7681;
    struct libwebsocket_context *context =
        libwebsocket_create_context(port,
                                    interface,
                                    protocols,
                                    libwebsocket_internal_extensions,
                                    NULL, //cert path for ssl
                                    NULL, //key path for ssl
                                    NULL,
                                    -1,
                                    -1,
                                    opts,
                                    NULL);
    bool use_ws = true;
    if (NULL == context) {
        std::cerr << "libwebsocket_create_context err, ws disabled..." << std::endl;
        use_ws = false;
    }
    for (;;) {
        boost::mutex::scoped_lock lock(counter_mutex); //1. lock the counters (or use some atomic access)
        const size_t cp = counter_packet;              //2. copy the values (or swap pointers - if there is a bigger structure in future)
        const size_t cpb = counter_packet_bytes;       //   so the monitor thread is not waiting for no reason
        lock.unlock();                                 //3. and unlock
        // NOTE 1: There is *NO* need to do any calculation-s inside this app AT ALL, it could simply
        //         only collect and forward 'raw data' to the client-s (other processes) who could do
        //         their own calculations. Perhaps we could later add some --raw-mode config option
        //         that will do just that.
        //
        // NOTE 2: now do any 'calculations'/'updates', you want, ATM we have:
        //         - delta-s of received bytes and packet counters
        //           since the previous update
        //         - bit/s per seconds
        //
        const size_t p = (cp - counter_packet_last);
        counter_packet_last = cp;
        const size_t b = (cpb - counter_packet_bytes_last);
        counter_packet_bytes_last = cpb;
        const size_t bps = static_cast<size_t>((b * 8) * per_sec);
        // var update =
        // window.JSON.parse('{ "udp_total":147701301, "bytes_total":1100854764, "udp":1710, "bytes":2250360, "bps":18002880 }')
        //
        std::stringstream ss;
        ss << "{ "
           << "\"udp_total\":" << cp << ", "    // total number of UDP packets received
           << "\"bytes_total\":" << cpb << ", " // total number of bytes received
           << "\"udp\":" << p << ", "           // number of UDP packets received since the last update
           << "\"bytes\":" << b << ", "         // number of bytes received since the last update
           << "\"bps\":" << bps << " }";        // bit/s since the last update
        const std::string s = ss.str();
        std::cout << s << std::endl;
        // TODO: move this to the separate ws plugin - so this app does not contain any ws specific stuff
        if (use_ws) {
            // size_t is printed via unsigned long/%lu, never %d
            fprintf(stderr, "clients_ws_counter=%lu\n",
                    static_cast<unsigned long>(clients_ws_counter));
            // collect stats for ws distribution *ONLY* if someone is listening (at least 1 ws client)
            if (clients_ws_counter > 0) {
                if (s.size() < MSG_LEN_MAX) {
                    // copy including the terminating NUL and pass the known
                    // length instead of re-measuring the buffer
                    std::memcpy(msg, s.c_str(), s.size() + 1);
                    libwebsockets_broadcast(&protocols[1],
                                            msg,
                                            s.size());
                } else {
                    // A cut-off JSON document is useless to the client and an
                    // unterminated buffer must never be measured with strlen.
                    fprintf(stderr,
                            "stats message too long for ws broadcast (%lu >= %lu), skipped\n",
                            static_cast<unsigned long>(s.size()),
                            static_cast<unsigned long>(MSG_LEN_MAX));
                }
            }
            const int n = libwebsocket_service(context, 100);
            fprintf(stderr, "libwebsocket_service:%d\n", n);
        }
        boost::this_thread::sleep(update_interval_ms); //adjust later
    }
    // not reached while the loop above has no exit; kept for the day it gets one
    if (use_ws) {
        libwebsocket_context_destroy(context);
    }
}
}
int main(int argc, char* argv[]) {
// TODO: register signal handlers
if (argc < 2) {
std::cerr << MSG_HELP << std::endl;
return 0;
}
try {
const unsigned short port = std::strtol(argv[1], NULL, 10);
std::cout << "port:" << port << std::endl;
boost::asio::io_service io;
udp::socket sock_in(io, udp::endpoint(udp::v4(), port));
fifo<packet*> fifo_datagram;
fifo<packet*> fifo_monitor;
const unsigned int update_interval = 1000; //ms
boost::thread thread_update(update_session_stats,
update_interval);
boost::thread thread_monitor(monitor,
&fifo_monitor,
&fifo_datagram);
// TODO: static pool is the default, add a config option for a dynamic one (size of packet, number of packets to prealloc)
packet pool[STATIC_PACKET_POOL_SIZE];
for (size_t i = 0; i < STATIC_PACKET_POOL_SIZE; i++) {
// std::cout << i << ":" << sizeof (pool[i].data) << std::endl;
fifo_datagram.push(&pool[i]);
}
if (argc == 4) {
try {
udp::resolver resolver(io);
udp::resolver::query query(udp::v4(), argv[2], argv[3]);
udp::endpoint endpoint_forward(*resolver.resolve(query));
std::cout << "endpoint_forward:" << endpoint_forward << std::endl;
udp::socket sock_out(io, udp::v4());
//TODO: sock_out.connect(endpoint_forward); ... sock_out.send(); doesn't work?
fifo<packet*> fifo_forward;
boost::thread thread_forward(forward,
&sock_out,
&endpoint_forward,
&fifo_forward,
&fifo_monitor);
receive(&sock_in,
&fifo_datagram,
&fifo_forward);
} catch (const std::exception& e) {
std::cerr << "forward disabled:" << e.what() << std::endl;
}
}
receive(&sock_in,
&fifo_datagram,
&fifo_monitor);
} catch (const std::exception& e) {
std::cerr << "main:" << e.what() << std::endl;
}
std::cout << "END" << std::endl;
}