mutoo
1/14/2016 - 1:02 PM

NetworkMgr

NetworkMgr

using System.Collections.Generic;
using System.Net.Sockets;
using System;

public class SendWindow
{
	private IList<ArraySegment<byte>>[] _bufferLists;

	// 发送缓冲区:一个简单的双缓冲
	public SendWindow ()
	{
		_bufferLists = new IList<ArraySegment<byte>>[]{
			new List<ArraySegment<byte>> (),
			new List<ArraySegment<byte>> ()
		};
	}
	
	// 双缓冲位: 0 或 1
	private int switcher = 0;

	// 初始化双缓冲
	public void initBuffer (SocketAsyncEventArgs args)
	{
		args.BufferList = _bufferLists [switcher];
		args.SetBuffer (null, 0, 0);
	}

	// 将要发送的数据放到当前缓冲区
	public void addToBuffer (byte[] bytes)
	{
		_bufferLists [switcher].Add (new ArraySegment<byte> (bytes));
	}

	// 检查当前缓冲区是否有要发送的数据
	public bool hasDataToSend {
		get {
			return _bufferLists [switcher].Count > 0;
		}
	}

	// 交换缓冲区
	public void swapBuffer (SocketAsyncEventArgs args)
	{
		// 将当前缓冲区放到异步发送对象
		args.BufferList = _bufferLists [switcher];

		// 切换开关
		// 1 - 0 = 1;
		// 1 - 1 = 0;
		switcher = 1 - switcher;
		_bufferLists [switcher].Clear ();
	}
}
using UnityEngine;
using System.Collections;
using System.Net.Sockets;

public class ReceiveWindow
{
	// 模拟循环数组,两次取模用于处理负数情况
	// -1 %% 10 = 9
	public static byte GetByte (byte[] buffer, int index)
	{
		int size = buffer.Length;
		return buffer [(index % size + size) % size];
	}

	private byte[] _buffer;
	private int _bufferSize;
	private int _offset;

	// 建立缓冲区
	public ReceiveWindow (int size)
	{
		// 双缓冲模式的循环数组
		// 可使用的部分为 [0, size - 1]
		// 以及 [size, size * 2 - 1]
		_buffer = new byte[size * 2];
		_bufferSize = size;

		// 表示数据开始的位置
		// 初始为 0 交换后为 size
		_offset = 0;
	}

	// 初始化缓冲区
	public void initBuffer (SocketAsyncEventArgs args)
	{
		args.SetBuffer (_buffer, _offset, _bufferSize);

		// UserToken 为数字时表示前一个缓冲区剩下多少有效数据
		// 初始化时为 0
		args.UserToken = 0;
	}

	// 双缓冲模式,交找缓冲区
	public void swapBuffer (SocketAsyncEventArgs args)
	{
		// 在 0 和 bufferSize 之前切换
		_offset = (_offset + _bufferSize) % (_bufferSize * 2);

		// 修改偏移,但不修改 buffer,避免多余开销
		args.SetBuffer (_offset, _bufferSize);
	}

	// 协议头 2(len) + 1(zip) + 2(cmd) + 2(idx) = 7
	private static int PROTOCOL_HEADER_SIZE = 7;
	
	public Packet[] ParsePackets (SocketAsyncEventArgs args)
	{
		ArrayList packets = new ArrayList ();
		
		int head = args.Offset;
		int tail = args.Offset + args.BytesTransferred;

		// 如果 UserToken 是一个 Packet 对象
		// 表示有未处理完的包
		Packet packet = args.UserToken as Packet;
		
		if (packet == null) {
			// 否则 packet 是一个数字,表示前一次剩余的有效数据长度
			// head 需要前移 n 位,有可能变为负数
			int remain = (int)args.UserToken;
			head -= remain;
		}

		// 开始处理数据
		int seek = head;
		
		do {
			// 如果缓冲区够大,可能会有多个包粘在一起
			// 所以需要循环处理数据

			if (packet == null) {
				// 解析一个新的包

				if (tail - seek < PROTOCOL_HEADER_SIZE)
					// 如果当前的数据不足以满足协议头,则终止
					break;

				// 解析包头
				int packetLen = GetByte (args.Buffer, seek + 0) << 8 | GetByte (args.Buffer, seek + 1);
				byte zip = GetByte (args.Buffer, seek + 2); // 压缩位,暂时无用
				int cmd = GetByte (args.Buffer, seek + 3) << 8 | GetByte (args.Buffer, seek + 4);
				int idx = GetByte (args.Buffer, seek + 5) << 8 | GetByte (args.Buffer, seek + 6);
				
				seek += PROTOCOL_HEADER_SIZE;
				
				// 创建一个新包
				byte[] data = new byte[packetLen - 5];
				packet = new Packet ((ushort)cmd, (ushort)idx, data);
			}

			// 使用余下的数据填充包
			seek += packet.fillCycle (args.Buffer, seek, tail);
			
			// 看看数据包是否填充完成
			if (packet.isVaild) {
				// 如果填充完成
				// 将剩下的数据长度存到 UserToken 供之后使用
				args.UserToken = tail - seek;
				
				packets.Add (packet);
				packet = null;
				
			} else {
				// 如果未填充完成
				// 将 packet 放到 UserToken 待下次收到数据继续填充
				args.UserToken = packet;
			}

			// 如果数据还够长,可以立即开始解析下一个包
		} while (tail - seek >= PROTOCOL_HEADER_SIZE);

		// 一种错误的情况:不应该在包未填满缓冲区的时候出现多余的数据
		if (tail - seek > 0 && args.BytesTransferred != 10) {
			Debug.LogError ("receive data error");
			args.UserToken = 0;
		}
		
		return (Packet[])packets.ToArray (typeof(Packet));
	}
}
using UnityEngine;
using System.Collections;
using System;

public class Packet
{
	// 协议号 2 bytes
	private ushort _cmd;
	
	public ushort cmd { 
		get {
			return _cmd;
		} 
	}

	// 数据包编号 2 bytes
	private ushort _idx;
	
	public ushort idx { 
		get {
			return _idx;
		} 
	}

	// 数据
	private byte[] _data;
	
	public byte[] data {
		get {
			return _data;
		}
	}

	// 填充进度
	private ushort _seek;
	
	public Packet (ushort cmd, ushort idx, byte[] data)
	{
		_cmd = cmd;
		_idx = idx;
		_data = data;
		_seek = 0;
	}

	// 包是否完整
	public bool isVaild {
		get {
			return this._seek == this._data.Length;
		}
	}

	// 使用接收缓冲区的内容填充数据
	public ushort fillCycle (byte[] buffer, int start, int end)
	{
		int need = _data.Length - _seek;
		int count = end - start;
		
		if (count >= 0) {
			ushort len = (ushort)Mathf.Min (count, need);
			
			for (int i = 0; i < len; i++) {
				this._data[_seek + i] = ReceiveWindow.GetByte(buffer, start + i);
			}
			
			_seek += len;
			return len;
			
		} else {
			throw new OverflowException ();
		}
	}

	// 转成 lua 使用的数据
	public LuaStringBuffer getLuaStringBuffer()
	{
		return new LuaStringBuffer(_data);
	}
}

using UnityEngine;
using System.Collections;
using System.Collections.Generic;
using System.Net.Sockets;
using System;
using System.Net;

public class NetworkMgr : MonoBehaviour
{
	
	// NetworkMgr 单例模式
	private static NetworkMgr _inst;
	
	public static NetworkMgr Inst {
		get {
			return _inst;
		}
	}
	
	void Awake ()
	{
		if (_inst != null) {
			Debug.LogWarning ("NetworkMgr is singleton.");
			return;
		}
		
		_inst = this;
	}
	
	private Socket _socket;

	// 三个异步操作对象
	// _conn 用于连接
	// _recv 用于接收
	// _send 用于发送
	private SocketAsyncEventArgs _connSocketArgs;
	private SocketAsyncEventArgs _recvSocketArgs;
	private SocketAsyncEventArgs _sendSocketArgs;

	// 两个缓冲区
	// _recv 用于接收数据
	// _send 用于发送数据
	private ReceiveWindow _recvWin;
	private SendWindow _sendWin;

	// 四个回调
	// onConnected 连接建立
	// onDisconnected 连接断开
	// onData 收到数据
	// onError 所有异常
	public Action onConnected;
	public Action onDisconnected;
	public Action<Packet> onData;
	public Action<SocketError> onError;

	// 初始化
	public void Init ()
	{		
		_socket = new Socket (AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
		
		_connSocketArgs = new SocketAsyncEventArgs ();
		_connSocketArgs.Completed += OnSocketIO;

		_recvSocketArgs = new SocketAsyncEventArgs ();
		_recvWin = new ReceiveWindow (1024);
		_recvWin.initBuffer (_recvSocketArgs);
		
		_sendSocketArgs = new SocketAsyncEventArgs ();
		_sendWin = new SendWindow ();
		_sendWin.initBuffer (_sendSocketArgs);

		// 使用 UnityThreadHelper 和主线程通讯
		UnityThreadHelper.EnsureHelper ();

		Debug.Log ("NetworkMgr Initialized.");
	}

	// 释放资源
	public void Dispose ()
	{
		_connSocketArgs.Completed -= OnSocketIO;
		_recvSocketArgs.Completed -= OnReceive;
		_sendSocketArgs.Completed -= OnSend;
		
		_connSocketArgs = null;
		_recvSocketArgs = null;
		_sendSocketArgs = null;
		
		_socket.Close ();
		
		_socket = null;
		
		Debug.Log ("NetworkMgr Disposed.");
	}

	// 由于异步 Socket 的回调都是在另一个线程触发的,
	// 但是回调逻辑需要在主线程执行,所以使用 UnityThreadHelper 进行转发
	private void InvokeOnMainThread(Action act)
	{
		if (act != null) {
			UnityThreadHelper.Dispatcher.Dispatch (() => {
				act.Invoke ();
			});
		}
	}

	// 带参数的回调,其他同上
	private void InvokeOnMainThread<T>(Action<T> act, T arg)
	{
		if (act != null) {
			UnityThreadHelper.Dispatcher.Dispatch (() => {
				act.Invoke (arg);
			});
		}
	}

	// 检查 Socket 对象
	private void CheckSocket ()
	{
		if (_socket == null)
			throw new NullReferenceException ("Socket not found");
	}

	// 是否连接(基于最后一次操作)
	public bool isConnected {
		get {
			CheckSocket ();
			return _socket.Connected;
		}
	}

	// 连接和断开使用同一个异步对象,这里做一下转发
	private void OnSocketIO (object sender, SocketAsyncEventArgs args)
	{
		switch (args.LastOperation) {
		case SocketAsyncOperation.Connect:
			OnConnect (sender, args);
			break;

		case SocketAsyncOperation.Disconnect:
			OnDisconnect (sender, args);
			break;
		}
	}

	private bool isConnecting = false;

    // 连接服务器
	public void Connect (string host, int port, int timeout)
	{
		CheckSocket ();

		if (isConnected) {
			InvokeOnMainThread<SocketError>(onError.Invoke, SocketError.IsConnected);
			return;
		}

		lock (_connSocketArgs) {
			if (isConnecting) {
				return;
			}

			isConnecting = true;
		}

		// 异步操作使用 RemoteEndPoint 来指定服务器地址
		_connSocketArgs.RemoteEndPoint = new IPEndPoint (IPAddress.Parse (host), port);
		bool pending = _socket.ConnectAsync (_connSocketArgs);
		if (!pending) {
			OnConnect (null, _connSocketArgs);
			return;
		}
		
		// TODO: handle timeout
		// CancelConnectAsync 只能在高版本的 .net 上使用
		// 这里得想下其它办法
	}

	// 连接结果
	private void OnConnect (object sender, SocketAsyncEventArgs args)
	{
		CheckSocket ();
		
		if (args.SocketError == SocketError.Success) {

			lock(_connSocketArgs) {
				isConnecting = false;
			}

			InvokeOnMainThread(onConnected);

			// 设置回调
			_recvSocketArgs.Completed += OnReceive;
			_sendSocketArgs.Completed += OnSend;

			// 开始接收数据(循环)
			StartRecieve ();
			
		} else {
			Debug.LogWarning (DateTime.Now + " OnConnect error: " + args.SocketError);
			InvokeOnMainThread<SocketError>(onError, args.SocketError);
		}
	}

	private bool isDisconnecting = false;

	// 断开连接
	public void Disconnect ()
	{
		CheckSocket ();

		if (!isConnected) {
			InvokeOnMainThread<SocketError>(onError.Invoke, SocketError.NotConnected);
			return;
		}

		lock (_connSocketArgs) {
			if (isDisconnecting) {
				return;
			}
			
			isDisconnecting = true;
		}

		// 把回调清掉,不然 _recv 会触发一个 SocketError.NotSocket
		_recvSocketArgs.Completed -= OnReceive;
		_sendSocketArgs.Completed -= OnSend;

		_socket.Shutdown (SocketShutdown.Both);

		_connSocketArgs.DisconnectReuseSocket = true;
		bool pending = _socket.DisconnectAsync (_connSocketArgs);
		if (!pending) {
			OnDisconnect (null, _connSocketArgs);
			return;
		}
	}

	// 断开连接结果
	private void OnDisconnect (object sender, SocketAsyncEventArgs args)
	{
		CheckSocket ();
		
		if (args.SocketError == SocketError.Success) {

			lock(_connSocketArgs) {
				isDisconnecting = false;
			}

			InvokeOnMainThread(onDisconnected);
			
		} else {
			Debug.LogWarning (DateTime.Now + " OnDisconnect error: " + args.SocketError);
			InvokeOnMainThread<SocketError>(onError, args.SocketError);
		}
	}

	// 开始接收数据
	private void StartRecieve ()
	{
		CheckSocket ();

		// buffer 在 Init 中初始化,这里不需要再处理
		// 在 OnReceive 会准备好下一次 recv 用的 buffer
		bool pending = _socket.ReceiveAsync (_recvSocketArgs);
		if (!pending) {
			OnReceive (null, _recvSocketArgs);
			return;
		}
	}

	// 接收数据结果
	private void OnReceive (object sender, SocketAsyncEventArgs args)
	{
		CheckSocket ();
		
		if (args.SocketError == SocketError.Success) {

			if (args.BytesTransferred > 0) {
				// 如果有数据就进行处理

				Debug.Log (DateTime.Now + " Receive bytes: " + args.BytesTransferred);

				// 解包过程交给接收缓冲区,在里面处理粘包等问题
				// 如果解出完整的包,在主线程调用数据回调函数
				Packet[] packets = _recvWin.ParsePackets (args);
				foreach (Packet packet in packets) {
					InvokeOnMainThread<Packet>(onData, packet);
				}

				// 交换缓冲区
				_recvWin.swapBuffer (args);

				// 继续接收数据,直到服务器下线
				StartRecieve ();

			} else {
				// 基于 stream 的 TCP 如果没有数据了,说明服务器主动断开

				Debug.Log (DateTime.Now + " Connection was closed by server.");
				Disconnect ();
			}
			
		} else {
			Debug.LogWarning (DateTime.Now + " OnReceive error: " + args.SocketError);
			InvokeOnMainThread<SocketError>(onError, args.SocketError);
		}
	}

	// 发送中标记
	private bool isSending = false;

	// 发送数据到服务器
	public void Send (byte[] bytes)
	{
		// 先把数据放到发送缓冲区,然后开始发送
		_sendWin.addToBuffer (bytes);
		StartSend ();
	}

	// 开始发送数据
	private void StartSend ()
	{
		CheckSocket ();

		// 检查是否连接
		if (!_socket.Connected) {
			Debug.LogWarning ("Socket is not connected.");
			onError (SocketError.NotConnected);
			return;
		}

		// 有没有数据需要发送
		if (!_sendWin.hasDataToSend) {
			return;
		}

		// 检查是否正在发送
		// 若正在发送数据,则当前操作取消
		// 等到当前发送结束后,会重新调用 StartSend
		lock (_sendWin) {
			// 这里加锁是因为 OnSend 在另一个线程中运行
        	// 且会修改 isSending 的值

			if (isSending) {
				return;
			}

			isSending = true;
		}

		// 双缓冲模式
		// 交换缓冲区
		// 旧的数据将被发送
		// 新的数据填充到另一个缓冲区
		_sendWin.swapBuffer (_sendSocketArgs);

		bool pending = _socket.SendAsync (_sendSocketArgs);
		if (!pending) {
			OnSend (null, _sendSocketArgs);
			return;
		}
	}

	// 发送结果
	private void OnSend (object sender, SocketAsyncEventArgs args)
	{
		CheckSocket ();
		
		if (args.SocketError == SocketError.Success) {
			Debug.Log (DateTime.Now + " Send bytes: " + args.BytesTransferred);

			// 发送完成
			lock (_sendWin) {
				// 这里加锁,防止主线程发数据的时候误判
				isSending = false;
			}

			// 异步发送期间可能有新的数据被加到缓冲区
			// 继续检查并发送数据
			StartSend ();

		} else {
			Debug.LogWarning (DateTime.Now + " OnSend error: " + args.SocketError);
			InvokeOnMainThread<SocketError>(onError, args.SocketError);
		}
	}
}
using UnityEngine;
using System.Collections;
using System.Net.Sockets;
using System;

public class Main : MonoBehaviour
{
	static byte[] GetBytes (string str)
	{
		byte[] bytes = new byte[str.Length * sizeof(char)];
		System.Buffer.BlockCopy (str.ToCharArray (), 0, bytes, 0, bytes.Length);
		return bytes;
	}

	// Use this for initialization
	IEnumerator Start ()
	{
		NetworkMgr.Inst.Init ();

		NetworkMgr.Inst.onConnected += () => {
			Debug.Log (DateTime.Now + " connected");
			NetworkMgr.Inst.Send (GetBytes ("test\n"));
			//NetworkMgr.Inst.Disconnect();
		};

		NetworkMgr.Inst.onError += (SocketError errorCode) => {
			Debug.Log (DateTime.Now + " socket error: " + errorCode.ToString ());
		};

		NetworkMgr.Inst.onDisconnected += () => {
			Debug.Log (DateTime.Now + " disconnected");
		};
		
		Debug.Log (DateTime.Now + " connecting");
		NetworkMgr.Inst.Connect ("192.168.1.57", 9999, 1000);

//		yield return new WaitForSeconds (2f);

//		Debug.Log (DateTime.Now + " sending");
//		byte[] largeBytes = new byte[8*1024*1024];
//		for (int i=0; i<8*1024*1024; i++) {
//			largeBytes[i] = 96;
//		}

//		yield return new WaitForSeconds (2f);

//		Debug.Log (DateTime.Now + " isConnected: " + NetworkMgr.Inst.isConnected);
//		NetworkMgr.Inst.Disconnect ();

		yield break;
	}

	void OnApplicationQuit ()
	{
		NetworkMgr.Inst.Dispose ();
	}
}
public class CycleBuffer
{
	public static byte GetByte(byte[] buffer, int index) {
		int size = buffer.Length;
		return buffer[(index % size + size) % size];
	}
}