TCP Proxy
#!/usr/bin/env escript
%% Delay, in milliseconds, inserted between the split segments of data that
%% was sent by downstream.
cDS_PACKET_RELAY_DELAY() ->
    1.

%% Delay, in milliseconds, inserted between the split segments of data that
%% was sent by upstream.
cUS_PACKET_RELAY_DELAY() ->
    1.

%% Segment length, in bytes, that downstream data is split into.
cSPLIT_DS_SEGMENT_LEN() ->
    1.

%% Segment length, in bytes, that upstream data is split into.
cSPLIT_US_SEGMENT_LEN() ->
    1.

%% Whether data sent by downstream is split into segments before it is
%% relayed to upstream.
cSPLIT_DS_PACKET() ->
    true.

%% Whether data sent by upstream is split into segments before it is relayed
%% to downstream.
cSPLIT_US_PACKET() ->
    true.

%% Whether splitting is forced to be accurate, i.e. nothing is relayed while
%% there is not enough data to fill a whole segment.
cFORCE_ACCURATE_SPLIT() ->
    false.

%% Whether the upstream/downstream connections are closed, instead of
%% delaying, while a packet is being split.
cCLOSE_INSTEAD_OF_DELAY() ->
    false.

%% Destination address used when the command line gives only a port.
cDEFAULT_DEST_ADDR() ->
    {127, 0, 0, 1}.

%% Default options of the listening socket.
cLSOCK_OPTS() ->
    [binary, {packet, 0}, {reuseaddr, true}].

%% Default options of the client sockets (upstream and downstream).
cCSOCK_OPTS() ->
    [binary, {packet, 0}, {nodelay, true}].
%% @doc escript entry point.
%% With exactly two arguments, the first is converted to the listening port
%% and the second is parsed by parse_addr/1 into the destination, then the
%% proxy is started. Any other argument list prints the usage text.
main(Args) ->
    case Args of
        [Src, Dest] ->
            ListenPort = list_to_integer(Src),
            {TargetAddr, TargetPort} = parse_addr(Dest),
            run_proxy(ListenPort, TargetAddr, TargetPort);
        _ ->
            usage()
    end.
%% @doc Announce the proxy route, open a passive listening socket on
%% `ListenPort' and enter the accept loop. Crashes with a badmatch if the
%% port cannot be listened on.
run_proxy(ListenPort, DestAddr, DestPort) ->
    io:format("Proxy tcp traffic from port ~b to ~p:~b~n", [ListenPort, DestAddr, DestPort]),
    ListenOpts = [{active, false} | cLSOCK_OPTS()],
    {ok, ListenSock} = gen_tcp:listen(ListenPort, ListenOpts),
    proxy_server_loop(ListenSock, DestAddr, DestPort).
%% @doc Accept loop: takes one downstream connection at a time, logs its
%% endpoints and hands the socket over to a freshly spawned worker process.
proxy_server_loop(LSock, DA, DP) ->
    {ok, Client} = gen_tcp:accept(LSock),
    {ok, {PeerAddr, PeerPort}} = inet:peername(Client),
    {ok, {LocalAddr, LocalPort}} = inet:sockname(Client),
    io:format("*** Incoming connection from ~p:~b to ~p:~b~n", [PeerAddr, PeerPort, LocalAddr, LocalPort]),
    Worker = spawn(fun() -> proxy_worker_init(Client, DA, DP) end),
    %% Ownership has to be transferred before the worker switches the socket
    %% to active mode, hence the explicit `start' handshake.
    ok = gen_tcp:controlling_process(Client, Worker),
    Worker ! start,
    proxy_server_loop(LSock, DA, DP).

%% Body of a worker process: waits for the `start' signal, switches the
%% downstream socket to active mode, runs the proxy session and finally
%% closes the downstream socket.
proxy_worker_init(Client, DA, DP) ->
    receive
        start -> ok
    end,
    ok = inet:setopts(Client, [{active, true} | cCSOCK_OPTS()]),
    run_proxy_client(Client, DA, DP),
    gen_tcp:close(Client).
%% @doc Run one proxy session: connect to the destination in active mode,
%% relay traffic between downstream socket `DS' and the new upstream socket
%% until the loop finishes, then close the upstream socket. Crashes with a
%% badmatch if the destination cannot be reached.
run_proxy_client(DS, DA, DP) ->
    ConnectOpts = [{active, true} | cCSOCK_OPTS()],
    {ok, Upstream} = gen_tcp:connect(DA, DP, ConnectOpts),
    proxy_client_loop(DS, Upstream),
    gen_tcp:close(Upstream).
%% @doc Session loop: takes the next message from the mailbox and dispatches
%% it. Returns `done' once either socket reports an error or is closed.
proxy_client_loop(DS, US) ->
    receive
        Msg -> handle_proxy_message(Msg, DS, US)
    end.

%% Dispatch a single mailbox message. Data is relayed to the opposite socket
%% and the loop continues; an error or close on one side flushes whatever is
%% still pending from that side and ends the loop; anything else is logged
%% and ignored.
handle_proxy_message({tcp, DS, Data}, DS, US) ->
    relay_downstream_packet(DS, US, Data, false),
    proxy_client_loop(DS, US);
handle_proxy_message({tcp_error, DS, Reason}, DS, US) ->
    io:format("*** Error occured on downstream socket: ~p~n", [Reason]),
    %% push all pending downstream data out to the upstream socket
    relay_downstream_packet(DS, US, <<>>, true),
    done;
handle_proxy_message({tcp_closed, DS}, DS, US) ->
    io:format("*** Downstream socket closed~n"),
    %% push all pending downstream data out to the upstream socket
    relay_downstream_packet(DS, US, <<>>, true),
    done;
handle_proxy_message({tcp, US, Data}, DS, US) ->
    relay_upstream_packet(DS, US, Data, false),
    proxy_client_loop(DS, US);
handle_proxy_message({tcp_error, US, Reason}, DS, US) ->
    io:format("*** Error occured on upstream socket: ~p~n", [Reason]),
    %% push all pending upstream data out to the downstream socket
    relay_upstream_packet(DS, US, <<>>, true),
    done;
handle_proxy_message({tcp_closed, US}, DS, US) ->
    io:format("*** Upstream socket closed~n"),
    %% push all pending upstream data out to the downstream socket
    relay_upstream_packet(DS, US, <<>>, true),
    done;
handle_proxy_message(Other, DS, US) ->
    io:format("*** Invalid message: ~p~n", [Other]),
    proxy_client_loop(DS, US).
%% @doc Print the command-line synopsis on standard output.
usage() ->
    io:format("Usage: etcproxy <src port> [<dest addr>:]<dest port>~n", []).
%% @doc Parse a destination given as "Port" or "Addr:Port".
%%
%% Returns `{Addr, Port}', where `Addr' is an IPv4 address tuple (the default
%% destination address when only a port is given) and `Port' is an integer.
%%
%% Raises `error:{invalid_destination, L}' when `L' does not consist of one
%% or two colon-separated fields. A malformed address or port crashes with
%% the badmatch/badarg of the failing conversion.
parse_addr(L) ->
    case string:tokens(L, ":") of
        [PortL] ->
            {cDEFAULT_DEST_ADDR(), list_to_integer(PortL)};
        [AddrL, PortL] ->
            %% public counterpart of the undocumented inet_parse:ipv4_address/1
            {ok, Addr} = inet:parse_ipv4_address(AddrL),
            {Addr, list_to_integer(PortL)};
        _ ->
            %% The second argument of erlang:error/2 is the argument list of
            %% the current function, so the destination has to be wrapped in
            %% a list to show up correctly in the stacktrace.
            erlang:error({invalid_destination, L}, [L])
    end.
%% @doc Relay data received from the downstream socket `DS' to the upstream
%% socket `US'.
%%
%% `Data' is merged with any buffered downstream data and, when downstream
%% splitting is enabled, cut into segments that are sent one by one with a
%% delay after each. With `Flush' set to true everything still buffered is
%% sent together with `Data'. Always returns `ok'; send results are ignored.
relay_downstream_packet(DS, US, Data, Flush) ->
    io:format("*** Proxy ~b bytes from downstream to upstream: ~n~p~n", [byte_size(Data), Data]),
    Split = cSPLIT_DS_PACKET(),
    SegSize = segment_size(Split, cSPLIT_DS_SEGMENT_LEN()),
    BinL = merge_packet('downstream', Data, SegSize, cFORCE_ACCURATE_SPLIT(), Flush),
    relay_segments(US, {DS, US}, BinL, Split, SegSize, cDS_PACKET_RELAY_DELAY()),
    io:format("*** Transferred~n"),
    ok.

%% @doc Relay data received from the upstream socket `US' to the downstream
%% socket `DS'. Mirror image of relay_downstream_packet/4, driven by the
%% upstream split settings. Always returns `ok'; send results are ignored.
relay_upstream_packet(DS, US, Data, Flush) ->
    io:format("*** Proxy ~b bytes from upstream to downstream: ~n~p~n", [byte_size(Data), Data]),
    Split = cSPLIT_US_PACKET(),
    SegSize = segment_size(Split, cSPLIT_US_SEGMENT_LEN()),
    BinL = merge_packet('upstream', Data, SegSize, cFORCE_ACCURATE_SPLIT(), Flush),
    relay_segments(DS, {DS, US}, BinL, Split, SegSize, cUS_PACKET_RELAY_DELAY()),
    io:format("*** Transferred~n"),
    ok.

%% Effective segment size: the configured length when splitting is enabled,
%% otherwise 0, which means "do not split".
segment_size(true, Len) -> Len;
segment_size(_, _) -> 0.

%% Send the prepared binaries `BinL' to socket `To' according to the
%% configured relay mode.
relay_segments(To, Socks, BinL, Split, SegSize, Delay) ->
    CloseInsteadOfDelay = cCLOSE_INSTEAD_OF_DELAY(),
    if
        Split andalso CloseInsteadOfDelay ->
            %% send the first segment, then close both connections
            close_after_first_segment(To, Socks, BinL);
        SegSize =:= 0 ->
            %% no splitting happened: send the data at once, without delay
            gen_tcp:send(To, BinL);
        true ->
            %% send every segment, emulating network delay after each one
            lists:foreach(
                fun(Bin) ->
                    gen_tcp:send(To, Bin),
                    sleep(Delay)
                end,
                BinL
            )
    end.

%% Close mode: send only the first segment and then close both sockets. With
%% accurate splitting there may be no complete segment yet; in that case
%% nothing is sent and the connections stay open until more data arrives
%% (taking hd/1 of the empty list used to crash the session here).
close_after_first_segment(_To, _Socks, []) ->
    ok;
close_after_first_segment(To, {DS, US}, [First | _]) ->
    gen_tcp:send(To, First),
    gen_tcp:close(US),
    gen_tcp:close(DS).
%% @doc Merge a new upstream / downstream packet with the data left over from
%% previous calls and return the list of binaries that can be sent on now.
%%
%% The leftover is kept in the process dictionary under `Key'.
%%
%% <ul>
%% <li>`Flush' is true: the leftover and `Data' are returned as one binary
%%     and the buffer is emptied.</li>
%% <li>`SegSize' greater than 0: the merged data is cut into segments of
%%     `SegSize' bytes. With `AccuSplit' true only complete segments are
%%     returned and the incomplete tail stays buffered; otherwise a non-empty
%%     tail is returned as a final, shorter element.</li>
%% <li>otherwise: the merged data is returned as one binary.</li>
%% </ul>
%% @end
merge_packet(Key, Data, SegSize, AccuSplit, Flush) ->
    Buffered =
        case get(Key) of
            undefined -> <<>>;
            Prev -> Prev
        end,
    Pending = iolist_to_binary([Buffered, Data]),
    {Out, Keep} = partition_pending(Pending, SegSize, AccuSplit, Flush),
    put(Key, Keep),
    Out.

%% Decide what is sent now and what stays buffered; returns `{Out, Keep}'.
partition_pending(Pending, _SegSize, _AccuSplit, true) ->
    %% flush: everything goes out in one piece
    {[Pending], <<>>};
partition_pending(Pending, SegSize, AccuSplit, _Flush) when SegSize > 0 ->
    {Segs, Remain} = split_to_segments(SegSize, Pending),
    case AccuSplit of
        true ->
            %% accurate splitting: hold the incomplete tail back
            {Segs, Remain};
        _ ->
            {with_remainder(Segs, Remain), <<>>}
    end;
partition_pending(Pending, _SegSize, _AccuSplit, _Flush) ->
    %% no limit on the data length
    {[Pending], <<>>}.

%% Append the incomplete tail to the segment list. An empty tail after at
%% least one segment is dropped, because sending it would only cost an empty
%% write plus a relay delay. The result is never the empty list.
with_remainder([], Remain) -> [Remain];
with_remainder(Segs, <<>>) -> Segs;
with_remainder(Segs, Remain) -> Segs ++ [Remain].
%% @doc Cut `Bin' into consecutive segments of `SegLen' bytes.
%% Returns `{Segments, Rest}', where `Rest' is the tail that is shorter than
%% one segment (`<<>>' when `Bin' divides evenly).
split_to_segments(SegLen, Bin) when SegLen > 0 ->
    split_to_segments(SegLen, Bin, {[], <<>>}).

%% Worker: segments are accumulated in reverse order; the second element of
%% the accumulator is the tail reported once the input is exhausted.
split_to_segments(_SegLen, <<>>, {Acc, Leftover}) ->
    {lists:reverse(Acc), Leftover};
split_to_segments(SegLen, Bin, {Acc, _}) when byte_size(Bin) >= SegLen ->
    <<Seg:SegLen/binary, Rest/binary>> = Bin,
    split_to_segments(SegLen, Rest, {[Seg | Acc], <<>>});
split_to_segments(_SegLen, Bin, {Acc, _}) ->
    %% fewer bytes than one segment are left: they become the remainder
    {lists:reverse(Acc), Bin}.
%% @doc Suspend the calling process for `Millis' milliseconds and return
%% `ok'. No message is consumed: the receive has no clauses, only a timeout.
sleep(Millis) when Millis >= 0 ->
    receive
    after Millis ->
        ok
    end.
#include <ctype.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <netdb.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
/* Logging macros. All three expand to nothing, so their arguments are never
 * compiled or evaluated. */
#define LOG_ERROR(...)
#define LOG_INFO(...)
#define LOG_DEBUG(...)
/* Tested by forward_data(): the payload dump is printed when this is 0.
 * NOTE(review): dumping when DEBUG is *off* looks inverted - confirm intent. */
#define DEBUG 0
/* Size in bytes of the buffer forward_data() receives into. */
#define BUFFER_SIZE 16384
/* Number of payload bytes shown per row by dumpBufferRaw(). */
#define LINE_SIZE 32
/* Command-line parameters, filled in by parse_options(). */
int listen_port = 0;
int remote_port = 0;
char * remote_host = NULL;
/* Tested by main() before forking into the background.
 * NOTE(review): already 1 by default, so -f changes nothing - confirm intent. */
int foreground = 1;
/* Global state: the listening socket created by server_listen(). */
int listen_socket;
/* Print the command-line synopsis for `program` on standard output. */
void help(char * program) {
    fprintf(stdout,
            "Usage syntax: %s -l listen_port -r remote_host -p remote_port [-f (in foreground)]\n",
            program);
}
/*
 * Print `len` bytes of `buffer` on standard output as a hex/ASCII table.
 *
 * The table starts with a row of column indices and a rule and ends with a
 * rule. Each data row shows up to LINE_SIZE bytes: two upper-case hex digits
 * plus a space per byte, then, starting at a fixed column, the same bytes as
 * characters with every non-printable byte shown as '.'.
 */
void dumpBufferRaw(const unsigned char buffer[], size_t len) {
    static const char hex_digits[] = "0123456789ABCDEF";
    enum { ASCII_COLUMN = LINE_SIZE * 3 + 4 };  /* where the character view starts */
    char row[LINE_SIZE * 4 + 10];
    size_t pos = 0;  /* index of the next byte of buffer */
    size_t col = 0;  /* number of bytes already placed in the current row */
    int k;

    /* header: column indices, then a rule */
    for (k = 0; k < LINE_SIZE; k++) {
        printf("%02d ", k);
    }
    printf("\n");
    for (k = 0; k < LINE_SIZE; k++) {
        printf("---");
    }
    printf("\n");

    while (pos < len) {
        unsigned char byte = buffer[pos++];
        if (col == 0) {
            /* fresh row: blank it so unused cells print as spaces */
            memset(row, ' ', sizeof(row));
        }
        row[col * 3] = hex_digits[byte >> 4];
        row[col * 3 + 1] = hex_digits[byte & 0x0F];
        row[ASCII_COLUMN + col] = isprint(byte) ? (char)byte : '.';
        col++;
        if (col >= LINE_SIZE || pos == len) {
            /* row is full or the input is exhausted: terminate and print it */
            row[ASCII_COLUMN + col] = '\0';
            printf("%s\n", row);
            col = 0;
        }
    }

    /* trailer: closing rule */
    for (k = 0; k < LINE_SIZE; k++) {
        printf("---");
    }
    printf("\n");
}
/*
 * Create the listening socket.
 *
 * Opens an IPv4 TCP socket, enables SO_REUSEADDR, binds it to listen_port on
 * all interfaces and starts listening with a backlog of 5. The descriptor is
 * stored in the global listen_socket.
 *
 * Returns 0 on success and -1 on failure. On failure no descriptor is left
 * open: a socket that was already created is closed again and listen_socket
 * is set to -1.
 */
int server_listen() {
    int optval = 1;
    struct sockaddr_in listen_addr;
    LOG_DEBUG("Entry of server_listen");
    if ((listen_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        LOG_ERROR("Cannot create listen socket, errno=[%d], errstr=[%s]", errno, strerror(errno));
        return -1;
    }
    if (setsockopt(listen_socket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        LOG_ERROR("Cannot set listen socket option, errno=[%d], errstr=[%s]", errno, strerror(errno));
        goto fail;
    }
    memset(&listen_addr, 0, sizeof(listen_addr));
    listen_addr.sin_family = AF_INET;
    listen_addr.sin_port = htons(listen_port);
    listen_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(listen_socket, (struct sockaddr *)&listen_addr, sizeof(listen_addr)) != 0) {
        LOG_ERROR("Cannot bind listen socket, errno=[%d], errstr=[%s]", errno, strerror(errno));
        goto fail;
    }
    if (listen(listen_socket, 5) < 0) {
        LOG_ERROR("Cannot listen socket, errno=[%d], errstr=[%s]", errno, strerror(errno));
        goto fail;
    }
    LOG_DEBUG("Server listen on port: %d", listen_port);
    return 0;

fail:
    /* the error was logged before this point, so close() cannot disturb the
     * reported errno */
    close(listen_socket);
    listen_socket = -1;
    return -1;
}
/**
 * Receive one chunk (at most BUFFER_SIZE bytes) from `fromsocket` and send
 * all of it to `tosocket`.
 *
 * send() may accept fewer bytes than requested; the remainder is then sent
 * with further calls instead of being treated as an error.
 *
 * return > 0 : success, number of bytes forwarded
 *        = 0 : `fromsocket` was closed by the peer
 *        < 0 : recv or send failed
 */
ssize_t forward_data(int fromsocket, int tosocket) {
    unsigned char buffer[BUFFER_SIZE];
    ssize_t sent = 0;
    ssize_t n = recv(fromsocket, buffer, sizeof(buffer), 0);
    if (n == 0) {
        LOG_INFO("socket closed: %d", fromsocket);
        return 0;
    }
    if (n < 0) {
        LOG_ERROR("Cannot call recv, errno=[%d], errstr=[%s]", errno, strerror(errno));
        return n;
    }
    LOG_DEBUG("data socket: %d -> %d, size: %zd", fromsocket, tosocket, n);
    /* NOTE(review): the dump runs when DEBUG is 0 - confirm this is intended */
    if (!DEBUG) {
        dumpBufferRaw(buffer, (size_t)n);
    }
    while (sent < n) {
        ssize_t m = send(tosocket, buffer + sent, (size_t)(n - sent), 0);
        if (m < 0) {
            LOG_ERROR("Cannot call send, errno=[%d], errstr=[%s]", errno, strerror(errno));
            return m;
        }
        if (m == 0) {
            /* no progress at all: report an error instead of spinning */
            LOG_ERROR("Cannot call send, return %zd, expected %zd", m, n - sent);
            return -1;
        }
        sent += m;
    }
    return n;
}
/* connect_remote() is defined further down in this file; declare it before
 * its first use. */
int connect_remote(void);

/*
 * Serve one client connection.
 *
 * Connects to the remote host, then forwards data in both directions until
 * either side closes its connection or an error occurs. Both sockets are
 * closed before returning; when the remote connection cannot be established
 * only the client socket is closed.
 */
void message_loop(int client_socket) {
    fd_set fdsets;
    int remote_socket = connect_remote();
    /* 0 is a valid descriptor; only negative values signal failure */
    if (remote_socket >= 0) {
        /* select() only has to look at descriptors up to the larger one */
        int nfds = (client_socket > remote_socket ? client_socket : remote_socket) + 1;
        LOG_INFO("Remote connection, remote socket=%d", remote_socket);
        while (1) {
            /* select() modifies the set, so it is rebuilt on every pass */
            FD_ZERO(&fdsets);
            FD_SET(client_socket, &fdsets);
            FD_SET(remote_socket, &fdsets);
            int ret = select(nfds, &fdsets, NULL, NULL, NULL);
            if (ret < 0) {
                LOG_ERROR("Cannot call select, errno=[%d], errstr=[%s]", errno, strerror(errno));
                break;
            }
            else if (ret == 0) {
                continue;
            }
            if (FD_ISSET(client_socket, &fdsets)) {
                if (forward_data(client_socket, remote_socket) <= 0) {
                    break;
                }
            }
            if (FD_ISSET(remote_socket, &fdsets)) {
                if (forward_data(remote_socket, client_socket) <= 0) {
                    break;
                }
            }
        }
        close(remote_socket);
    }
    close(client_socket);
}
/*
 * Accept loop; never returns.
 *
 * Every accepted connection is handed to a forked child, which closes the
 * listening socket, runs message_loop() and exits. The parent closes its
 * copy of the client socket in every case - also when fork() fails, so a
 * failed fork no longer leaks the descriptor.
 */
void server_loop() {
    while (1) {
        struct sockaddr client_addr;
        socklen_t client_size = sizeof(client_addr);
        int client_socket = accept(listen_socket, (struct sockaddr *)&client_addr, &client_size);
        if (client_socket < 0) {
            LOG_ERROR("Cannot accept client connection, errno=[%d], errstr=[%s]", errno, strerror(errno));
            continue;
        }
        LOG_INFO("Connection accept, client socket=%d", client_socket);
        pid_t pid = fork();
        if (pid == 0) { // child process
            close(listen_socket);
            LOG_INFO("close listen socket in child process");
            message_loop(client_socket);
            exit(0);
        }
        if (pid < 0) {
            LOG_ERROR("Cannot fork client process, errno=[%d], errstr=[%s]", errno, strerror(errno));
        }
        else { // parent process
            LOG_INFO("close client socket in parent process");
        }
        /* parent, or failed fork: this process has no further use for the client socket */
        close(client_socket);
    }
}
/*
 * Open a TCP connection to remote_host:remote_port.
 *
 * The host name is resolved with gethostbyname() and the first address of
 * the result is used.
 *
 * Returns the connected socket (>= 0) on success and -1 on failure. No
 * descriptor is left open on failure.
 */
int connect_remote() {
    struct sockaddr_in remote_addr;
    struct hostent * remote_server;
    int remote_socket;
    /* resolve before creating the socket, so a failed lookup has nothing to clean up */
    if ((remote_server = gethostbyname(remote_host)) == NULL) {
        LOG_ERROR("Cannot get host by remote host name %s, h_errno=[%d]", remote_host, h_errno);
        return -1;
    }
    /* the address is copied into a 4-byte IPv4 field: refuse anything else */
    if (remote_server->h_addrtype != AF_INET
            || remote_server->h_length != (int)sizeof(remote_addr.sin_addr)
            || remote_server->h_addr_list[0] == NULL) {
        LOG_ERROR("Remote host %s has no IPv4 address", remote_host);
        return -1;
    }
    if ((remote_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        LOG_ERROR("Cannot create remote socket, errno=[%d], errstr=[%s]", errno, strerror(errno));
        return -1;
    }
    memset(&remote_addr, 0, sizeof(remote_addr));
    remote_addr.sin_family = AF_INET;
    memcpy(&remote_addr.sin_addr, remote_server->h_addr_list[0], sizeof(remote_addr.sin_addr));
    remote_addr.sin_port = htons(remote_port);
    if (connect(remote_socket, (struct sockaddr *) &remote_addr, sizeof(remote_addr)) < 0) {
        LOG_ERROR("Cannot connect to remote host %s:%d, errno=[%d], errstr=[%s]", remote_host, remote_port, errno, strerror(errno));
        close(remote_socket);
        return -1;
    }
    return remote_socket;
}
int parse_options(int argc, char *argv[]) {
int c = 0;
while ((c = getopt(argc, argv, "l:r:p:f:h")) != -1) {
switch(c) {
case 'l':
listen_port = atoi(optarg);
break;
case 'r':
remote_host = optarg;
break;
case 'p':
remote_port = atoi(optarg);
break;
case 'f':
foreground = 1;
break;
case 'h':
help(argv[0]);
exit(0);
default:
fprintf(stderr, "ERROR: unsupported parameter %c\n", c);
return -1;
}
}
if (listen_port == 0 || remote_host == NULL || remote_port == 0) {
return -1;
}
LOG_INFO("Server configuration, listen on %d, remote host: [%s], remote port: %d", listen_port, remote_host, remote_port);
return 0;
}
/*
 * SIGCHLD handler: reap every child that has already terminated, without
 * blocking.
 *
 * waitpid() may modify errno, so the value seen by the interrupted code is
 * saved on entry and restored on exit.
 */
void sigchld_handler(int signal) {
    int saved_errno = errno;
    (void)signal;  /* the signal number is not needed */
    while (waitpid(-1, NULL, WNOHANG) > 0) {
        /* keep reaping until no terminated child is left */
    }
    errno = saved_errno;
}
int main(int argc, char *argv[]) {
if (parse_options(argc, argv) < 0) {
help(argv[0]);
return -1;
}
if (server_listen() < 0) { // start server
LOG_ERROR("Cannot start server listen on port %d", listen_port);
return -1;
}
signal(SIGCHLD, sigchld_handler); // prevent ended children from becoming zombies
if (!foreground) {
pid_t pid = fork();
if (pid > 0) { // parent
exit(0);
}
else if (pid == 0) { // // deamonized child
}
else { // error
LOG_ERROR("Cannot create listen socket, errno=[%d], errstr=[%s]", errno, strerror(errno));
return -1;
}
}
server_loop();
return 0;
}