NetworkMgr
using System.Collections.Generic;
using System.Net.Sockets;
using System;
public class SendWindow
{
private IList<ArraySegment<byte>>[] _bufferLists;
// 发送缓冲区:一个简单的双缓冲
public SendWindow ()
{
_bufferLists = new IList<ArraySegment<byte>>[]{
new List<ArraySegment<byte>> (),
new List<ArraySegment<byte>> ()
};
}
// 双缓冲位: 0 或 1
private int switcher = 0;
// 初始化双缓冲
public void initBuffer (SocketAsyncEventArgs args)
{
args.BufferList = _bufferLists [switcher];
args.SetBuffer (null, 0, 0);
}
// 将要发送的数据放到当前缓冲区
public void addToBuffer (byte[] bytes)
{
_bufferLists [switcher].Add (new ArraySegment<byte> (bytes));
}
// 检查当前缓冲区是否有要发送的数据
public bool hasDataToSend {
get {
return _bufferLists [switcher].Count > 0;
}
}
// 交换缓冲区
public void swapBuffer (SocketAsyncEventArgs args)
{
// 将当前缓冲区放到异步发送对象
args.BufferList = _bufferLists [switcher];
// 切换开关
// 1 - 0 = 1;
// 1 - 1 = 0;
switcher = 1 - switcher;
_bufferLists [switcher].Clear ();
}
}
using UnityEngine;
using System.Collections;
using System.Net.Sockets;
public class ReceiveWindow
{
// 模拟循环数组,两次取模用于处理负数情况
// -1 %% 10 = 9
public static byte GetByte (byte[] buffer, int index)
{
int size = buffer.Length;
return buffer [(index % size + size) % size];
}
private byte[] _buffer;
private int _bufferSize;
private int _offset;
// 建立缓冲区
public ReceiveWindow (int size)
{
// 双缓冲模式的循环数组
// 可使用的部分为 [0, size - 1]
// 以及 [size, size * 2 - 1]
_buffer = new byte[size * 2];
_bufferSize = size;
// 表示数据开始的位置
// 初始为 0 交换后为 size
_offset = 0;
}
// 初始化缓冲区
public void initBuffer (SocketAsyncEventArgs args)
{
args.SetBuffer (_buffer, _offset, _bufferSize);
// UserToken 为数字时表示前一个缓冲区剩下多少有效数据
// 初始化时为 0
args.UserToken = 0;
}
// 双缓冲模式,交找缓冲区
public void swapBuffer (SocketAsyncEventArgs args)
{
// 在 0 和 bufferSize 之前切换
_offset = (_offset + _bufferSize) % (_bufferSize * 2);
// 修改偏移,但不修改 buffer,避免多余开销
args.SetBuffer (_offset, _bufferSize);
}
// 协议头 2(len) + 1(zip) + 2(cmd) + 2(idx) = 7
private static int PROTOCOL_HEADER_SIZE = 7;
public Packet[] ParsePackets (SocketAsyncEventArgs args)
{
ArrayList packets = new ArrayList ();
int head = args.Offset;
int tail = args.Offset + args.BytesTransferred;
// 如果 UserToken 是一个 Packet 对象
// 表示有未处理完的包
Packet packet = args.UserToken as Packet;
if (packet == null) {
// 否则 packet 是一个数字,表示前一次剩余的有效数据长度
// head 需要前移 n 位,有可能变为负数
int remain = (int)args.UserToken;
head -= remain;
}
// 开始处理数据
int seek = head;
do {
// 如果缓冲区够大,可能会有多个包粘在一起
// 所以需要循环处理数据
if (packet == null) {
// 解析一个新的包
if (tail - seek < PROTOCOL_HEADER_SIZE)
// 如果当前的数据不足以满足协议头,则终止
break;
// 解析包头
int packetLen = GetByte (args.Buffer, seek + 0) << 8 | GetByte (args.Buffer, seek + 1);
byte zip = GetByte (args.Buffer, seek + 2); // 压缩位,暂时无用
int cmd = GetByte (args.Buffer, seek + 3) << 8 | GetByte (args.Buffer, seek + 4);
int idx = GetByte (args.Buffer, seek + 5) << 8 | GetByte (args.Buffer, seek + 6);
seek += PROTOCOL_HEADER_SIZE;
// 创建一个新包
byte[] data = new byte[packetLen - 5];
packet = new Packet ((ushort)cmd, (ushort)idx, data);
}
// 使用余下的数据填充包
seek += packet.fillCycle (args.Buffer, seek, tail);
// 看看数据包是否填充完成
if (packet.isVaild) {
// 如果填充完成
// 将剩下的数据长度存到 UserToken 供之后使用
args.UserToken = tail - seek;
packets.Add (packet);
packet = null;
} else {
// 如果未填充完成
// 将 packet 放到 UserToken 待下次收到数据继续填充
args.UserToken = packet;
}
// 如果数据还够长,可以立即开始解析下一个包
} while (tail - seek >= PROTOCOL_HEADER_SIZE);
// 一种错误的情况:不应该在包未填满缓冲区的时候出现多余的数据
if (tail - seek > 0 && args.BytesTransferred != 10) {
Debug.LogError ("receive data error");
args.UserToken = 0;
}
return (Packet[])packets.ToArray (typeof(Packet));
}
}
using UnityEngine;
using System.Collections;
using System;
public class Packet
{
// 协议号 2 bytes
private ushort _cmd;
public ushort cmd {
get {
return _cmd;
}
}
// 数据包编号 2 bytes
private ushort _idx;
public ushort idx {
get {
return _idx;
}
}
// 数据
private byte[] _data;
public byte[] data {
get {
return _data;
}
}
// 填充进度
private ushort _seek;
public Packet (ushort cmd, ushort idx, byte[] data)
{
_cmd = cmd;
_idx = idx;
_data = data;
_seek = 0;
}
// 包是否完整
public bool isVaild {
get {
return this._seek == this._data.Length;
}
}
// 使用接收缓冲区的内容填充数据
public ushort fillCycle (byte[] buffer, int start, int end)
{
int need = _data.Length - _seek;
int count = end - start;
if (count >= 0) {
ushort len = (ushort)Mathf.Min (count, need);
for (int i = 0; i < len; i++) {
this._data[_seek + i] = ReceiveWindow.GetByte(buffer, start + i);
}
_seek += len;
return len;
} else {
throw new OverflowException ();
}
}
// 转成 lua 使用的数据
public LuaStringBuffer getLuaStringBuffer()
{
return new LuaStringBuffer(_data);
}
}
using UnityEngine;
using System.Collections;
using System.Collections.Generic;
using System.Net.Sockets;
using System;
using System.Net;
public class NetworkMgr : MonoBehaviour
{
// NetworkMgr 单例模式
private static NetworkMgr _inst;
public static NetworkMgr Inst {
get {
return _inst;
}
}
void Awake ()
{
if (_inst != null) {
Debug.LogWarning ("NetworkMgr is singleton.");
return;
}
_inst = this;
}
private Socket _socket;
// 三个异步操作对象
// _conn 用于连接
// _recv 用于接收
// _send 用于发送
private SocketAsyncEventArgs _connSocketArgs;
private SocketAsyncEventArgs _recvSocketArgs;
private SocketAsyncEventArgs _sendSocketArgs;
// 两个缓冲区
// _recv 用于接收数据
// _send 用于发送数据
private ReceiveWindow _recvWin;
private SendWindow _sendWin;
// 四个回调
// onConnected 连接建立
// onDisconnected 连接断开
// onData 收到数据
// onError 所有异常
public Action onConnected;
public Action onDisconnected;
public Action<Packet> onData;
public Action<SocketError> onError;
// 初始化
public void Init ()
{
_socket = new Socket (AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_connSocketArgs = new SocketAsyncEventArgs ();
_connSocketArgs.Completed += OnSocketIO;
_recvSocketArgs = new SocketAsyncEventArgs ();
_recvWin = new ReceiveWindow (1024);
_recvWin.initBuffer (_recvSocketArgs);
_sendSocketArgs = new SocketAsyncEventArgs ();
_sendWin = new SendWindow ();
_sendWin.initBuffer (_sendSocketArgs);
// 使用 UnityThreadHelper 和主线程通讯
UnityThreadHelper.EnsureHelper ();
Debug.Log ("NetworkMgr Initialized.");
}
// 释放资源
public void Dispose ()
{
_connSocketArgs.Completed -= OnSocketIO;
_recvSocketArgs.Completed -= OnReceive;
_sendSocketArgs.Completed -= OnSend;
_connSocketArgs = null;
_recvSocketArgs = null;
_sendSocketArgs = null;
_socket.Close ();
_socket = null;
Debug.Log ("NetworkMgr Disposed.");
}
// 由于异步 Socket 的回调都是在另一个线程触发的,
// 但是回调逻辑需要在主线程执行,所以使用 UnityThreadHelper 进行转发
private void InvokeOnMainThread(Action act)
{
if (act != null) {
UnityThreadHelper.Dispatcher.Dispatch (() => {
act.Invoke ();
});
}
}
// 带参数的回调,其他同上
private void InvokeOnMainThread<T>(Action<T> act, T arg)
{
if (act != null) {
UnityThreadHelper.Dispatcher.Dispatch (() => {
act.Invoke (arg);
});
}
}
// 检查 Socket 对象
private void CheckSocket ()
{
if (_socket == null)
throw new NullReferenceException ("Socket not found");
}
// 是否连接(基于最后一次操作)
public bool isConnected {
get {
CheckSocket ();
return _socket.Connected;
}
}
// 连接和断开使用同一个异步对象,这里做一下转发
private void OnSocketIO (object sender, SocketAsyncEventArgs args)
{
switch (args.LastOperation) {
case SocketAsyncOperation.Connect:
OnConnect (sender, args);
break;
case SocketAsyncOperation.Disconnect:
OnDisconnect (sender, args);
break;
}
}
private bool isConnecting = false;
// 连接服务器
public void Connect (string host, int port, int timeout)
{
CheckSocket ();
if (isConnected) {
InvokeOnMainThread<SocketError>(onError.Invoke, SocketError.IsConnected);
return;
}
lock (_connSocketArgs) {
if (isConnecting) {
return;
}
isConnecting = true;
}
// 异步操作使用 RemoteEndPoint 来指定服务器地址
_connSocketArgs.RemoteEndPoint = new IPEndPoint (IPAddress.Parse (host), port);
bool pending = _socket.ConnectAsync (_connSocketArgs);
if (!pending) {
OnConnect (null, _connSocketArgs);
return;
}
// TODO: handle timeout
// CancelConnectAsync 只能在高版本的 .net 上使用
// 这里得想下其它办法
}
// 连接结果
private void OnConnect (object sender, SocketAsyncEventArgs args)
{
CheckSocket ();
if (args.SocketError == SocketError.Success) {
lock(_connSocketArgs) {
isConnecting = false;
}
InvokeOnMainThread(onConnected);
// 设置回调
_recvSocketArgs.Completed += OnReceive;
_sendSocketArgs.Completed += OnSend;
// 开始接收数据(循环)
StartRecieve ();
} else {
Debug.LogWarning (DateTime.Now + " OnConnect error: " + args.SocketError);
InvokeOnMainThread<SocketError>(onError, args.SocketError);
}
}
private bool isDisconnecting = false;
// 断开连接
public void Disconnect ()
{
CheckSocket ();
if (!isConnected) {
InvokeOnMainThread<SocketError>(onError.Invoke, SocketError.NotConnected);
return;
}
lock (_connSocketArgs) {
if (isDisconnecting) {
return;
}
isDisconnecting = true;
}
// 把回调清掉,不然 _recv 会触发一个 SocketError.NotSocket
_recvSocketArgs.Completed -= OnReceive;
_sendSocketArgs.Completed -= OnSend;
_socket.Shutdown (SocketShutdown.Both);
_connSocketArgs.DisconnectReuseSocket = true;
bool pending = _socket.DisconnectAsync (_connSocketArgs);
if (!pending) {
OnDisconnect (null, _connSocketArgs);
return;
}
}
// 断开连接结果
private void OnDisconnect (object sender, SocketAsyncEventArgs args)
{
CheckSocket ();
if (args.SocketError == SocketError.Success) {
lock(_connSocketArgs) {
isDisconnecting = false;
}
InvokeOnMainThread(onDisconnected);
} else {
Debug.LogWarning (DateTime.Now + " OnDisconnect error: " + args.SocketError);
InvokeOnMainThread<SocketError>(onError, args.SocketError);
}
}
// 开始接收数据
private void StartRecieve ()
{
CheckSocket ();
// buffer 在 Init 中初始化,这里不需要再处理
// 在 OnReceive 会准备好下一次 recv 用的 buffer
bool pending = _socket.ReceiveAsync (_recvSocketArgs);
if (!pending) {
OnReceive (null, _recvSocketArgs);
return;
}
}
// 接收数据结果
private void OnReceive (object sender, SocketAsyncEventArgs args)
{
CheckSocket ();
if (args.SocketError == SocketError.Success) {
if (args.BytesTransferred > 0) {
// 如果有数据就进行处理
Debug.Log (DateTime.Now + " Receive bytes: " + args.BytesTransferred);
// 解包过程交给接收缓冲区,在里面处理粘包等问题
// 如果解出完整的包,在主线程调用数据回调函数
Packet[] packets = _recvWin.ParsePackets (args);
foreach (Packet packet in packets) {
InvokeOnMainThread<Packet>(onData, packet);
}
// 交换缓冲区
_recvWin.swapBuffer (args);
// 继续接收数据,直到服务器下线
StartRecieve ();
} else {
// 基于 stream 的 TCP 如果没有数据了,说明服务器主动断开
Debug.Log (DateTime.Now + " Connection was closed by server.");
Disconnect ();
}
} else {
Debug.LogWarning (DateTime.Now + " OnReceive error: " + args.SocketError);
InvokeOnMainThread<SocketError>(onError, args.SocketError);
}
}
// 发送中标记
private bool isSending = false;
// 发送数据到服务器
public void Send (byte[] bytes)
{
// 先把数据放到发送缓冲区,然后开始发送
_sendWin.addToBuffer (bytes);
StartSend ();
}
// 开始发送数据
private void StartSend ()
{
CheckSocket ();
// 检查是否连接
if (!_socket.Connected) {
Debug.LogWarning ("Socket is not connected.");
onError (SocketError.NotConnected);
return;
}
// 有没有数据需要发送
if (!_sendWin.hasDataToSend) {
return;
}
// 检查是否正在发送
// 若正在发送数据,则当前操作取消
// 等到当前发送结束后,会重新调用 StartSend
lock (_sendWin) {
// 这里加锁是因为 OnSend 在另一个线程中运行
// 且会修改 isSending 的值
if (isSending) {
return;
}
isSending = true;
}
// 双缓冲模式
// 交换缓冲区
// 旧的数据将被发送
// 新的数据填充到另一个缓冲区
_sendWin.swapBuffer (_sendSocketArgs);
bool pending = _socket.SendAsync (_sendSocketArgs);
if (!pending) {
OnSend (null, _sendSocketArgs);
return;
}
}
// 发送结果
private void OnSend (object sender, SocketAsyncEventArgs args)
{
CheckSocket ();
if (args.SocketError == SocketError.Success) {
Debug.Log (DateTime.Now + " Send bytes: " + args.BytesTransferred);
// 发送完成
lock (_sendWin) {
// 这里加锁,防止主线程发数据的时候误判
isSending = false;
}
// 异步发送期间可能有新的数据被加到缓冲区
// 继续检查并发送数据
StartSend ();
} else {
Debug.LogWarning (DateTime.Now + " OnSend error: " + args.SocketError);
InvokeOnMainThread<SocketError>(onError, args.SocketError);
}
}
}
using UnityEngine;
using System.Collections;
using System.Net.Sockets;
using System;
public class Main : MonoBehaviour
{
static byte[] GetBytes (string str)
{
byte[] bytes = new byte[str.Length * sizeof(char)];
System.Buffer.BlockCopy (str.ToCharArray (), 0, bytes, 0, bytes.Length);
return bytes;
}
// Use this for initialization
IEnumerator Start ()
{
NetworkMgr.Inst.Init ();
NetworkMgr.Inst.onConnected += () => {
Debug.Log (DateTime.Now + " connected");
NetworkMgr.Inst.Send (GetBytes ("test\n"));
//NetworkMgr.Inst.Disconnect();
};
NetworkMgr.Inst.onError += (SocketError errorCode) => {
Debug.Log (DateTime.Now + " socket error: " + errorCode.ToString ());
};
NetworkMgr.Inst.onDisconnected += () => {
Debug.Log (DateTime.Now + " disconnected");
};
Debug.Log (DateTime.Now + " connecting");
NetworkMgr.Inst.Connect ("192.168.1.57", 9999, 1000);
// yield return new WaitForSeconds (2f);
// Debug.Log (DateTime.Now + " sending");
// byte[] largeBytes = new byte[8*1024*1024];
// for (int i=0; i<8*1024*1024; i++) {
// largeBytes[i] = 96;
// }
// yield return new WaitForSeconds (2f);
// Debug.Log (DateTime.Now + " isConnected: " + NetworkMgr.Inst.isConnected);
// NetworkMgr.Inst.Disconnect ();
yield break;
}
void OnApplicationQuit ()
{
NetworkMgr.Inst.Dispose ();
}
}
public class CycleBuffer
{
public static byte GetByte(byte[] buffer, int index) {
int size = buffer.Length;
return buffer[(index % size + size) % size];
}
}