编程知识:C#高性能TCP服务的实现方案

标签:
apm方式tap方式saea方式rio方式socketasynceventargs |
分类: FA与PA:离散与过程自动化 |
【前言】
本篇文章的主旨是使用
-
APM 方式,即 Asynchronous Programming Model
-
TAP 方式,即 Task-based Asynchronous Pattern
-
SAEA 方式,即 SocketAsyncEventArgs
-
RIO 方式,即 Registered I/O
在 .NET/C# 中对于 Socket 的支持均是基于
-
在任何 TCP Server 的实现中,一定存在一个 Accept Socket Loop,用于接收 Client 端的 Connect 请求以建立 TCP Connection。
-
在任何 TCP Server 的实现中,一定存在一个 Read Socket Loop,用于接收 Client 端 Write 过来的数据。
如果 Accept 循环阻塞,则会导致无法快速的建立连接,服务端 Pending Backlog 满,进而导致 Client 端收到 Connect Timeout 的异常。如果 Read 循环阻塞,则显然会导致无法及时收到 Client 端发过来的数据,进而导致 Client 端 Send Buffer 满,无法再发送数据。从实现细节的角度看,能够导致服务阻塞的位置可能在:
-
Accept 到新的
Socket,构建新的 Connection 需要分配各种资源,分配资源慢; -
Accept 到新的
Socket,没有及时触发下一次 Accept; -
Read 到新的
Buffer,判定 Payload 消息长度,判定过程长; -
Read 到新的
Buffer,发现 Payload 还没有收全,继续 Read,则可能会导致一次 Buffer Copy; -
Payload 接收完毕,进行 Serialization 转成可识别的 Protocol Message,序列化慢;
-
由 Business Module 来处理相应的 Protocol Message,处理过程慢;
1-2 涉及到 Accept 过程和 Connection 的建立过程,3-4 涉及到 ReceiveBuffer
的处理过程,5-6 涉及到应用逻辑侧的实现。Java 中著名的 Netty 网络库从 4.0 版本开始对于 Buffer
部分做了全新的尝试,采用了名叫
【APM 方式:TcpSocketServer】
TcpSocketServer
-
BeginAccept -> EndAccept->
BeginAccept -> EndAccept -> BeginAccept -> …
每一个建立成功的 Connection
由
-
BeginRead -> EndRead -> BeginRead -> EndRead -> BeginRead -> …
TcpSocketServer 通过暴露 Event 来实现 Connection 的建立与断开和数据接收的通知。
event
EventHandler ClientConnected; event
EventHandler ClientDisconnected; event
EventHandler ClientDataReceived;
使用也是简单直接,直接订阅事件通知。
private
static void StartServer() {
_server = new TcpSocketServer(22222);
_server.ClientConnected += server_ClientConnected;
_server.ClientDisconnected += server_ClientDisconnected;
_server.ClientDataReceived += server_ClientDataReceived;
_server.Listen(); }
static
void server_ClientConnected(object sender, TcpClientConnectedEventA rgs e) {
Console.WriteLine(string.Format("TCP client {0} has connected {1}.", e.Session.RemoteEndPoint, e.Session)); }
static
void server_ClientDisconnected(object sender, TcpClientDisconnectedEve ntArgs e) {
Console.WriteLine(string.Format("TCP client {0} has disconnected.", e.Session)); }
static
void server_ClientDataReceived(object sender, TcpClientDataReceivedEve ntArgs e) {
var text = Encoding.UTF8.GetString(e.Data, e.DataOffset, e.DataLength);
Console.Write(string.Format("Client : {0} {1} --> ", e.Session.RemoteEndPoint, e.Session));
Console.WriteLine(string.Format("{0}", text));
_server.Broadcast(Encoding.UTF8.GetBytes(text)); }
【TAP
方式:AsyncTcpSocketServer】
AsyncTcpSocketServer
//************* Task-based async public methods *************************
[HostProtection(ExternalThreading =
true)] public
Task AcceptSocketAsync() {
return Task.Factory.FromAsync(BeginAcceptSocket, EndAcceptSocket, null); }
[HostProtection(ExternalThreading =
true)] public
Task AcceptTcpClientAsync() {
return Task.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null); }
AsyncTcpSocketServer
while
(IsListening) {
var tcpClient = await _listener.AcceptTcpClientAsync(); }
每一个建立成功的 Connection
由
while
(State == TcpSocketConnectionState .Connected) {
int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length); }
为了将
public
interface IAsyncTcpSocketServerMes sageDispatcher {
Task OnSessionStarted(AsyncTcpSocketSession session);
Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count);
Task OnSessionClosed(AsyncTcpSocketSession session); }
使用时仅需将一个实现了该接口的对象注入到 AsyncTcpSocketServer 的构造函数中即可。
public
class SimpleMessageDispatcher : IAsyncTcpSocketServerMes sageDispatcher {
public async Task OnSessionStarted(AsyncTcpSocketSession session)
{
Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
await Task.CompletedTask;
}
public async Task OnSessionDataReceived(AsyncTcpSocketSession session, byte[] data, int offset, int count)
{
var text = Encoding.UTF8.GetString(data, offset, count);
Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
Console.WriteLine(string.Format("{0}", text));
await session.SendAsync(Encoding.UTF8.GetBytes(text));
}
public async Task OnSessionClosed(AsyncTcpSocketSession session)
{
Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
await Task.CompletedTask;
} }
当然,对于接口的实现也不是强制了,也可以在构造函数中直接注入方法的实现。
public
AsyncTcpSocketServer(
IPEndPoint listenedEndPoint,
Funcbyte[], int, int, Task> onSessionDataReceived = null,
Func onSessionStarted = null,
Func onSessionClosed = null,
AsyncTcpSocketServerConf iguration configuration = null) {}
【SAEA
方式:TcpSocketSaeaServer】
SAEA
是
The main feature of these enhancements is the
avoidance of the repeated allocation and synchronization of objects during high-volume asynchronous socket I/O. The Begin/End design pattern currently implemented by the Socket class for asynchronous socket I/O requires a System.IAsyncResult object be allocated for each asynchronous socket operation.
也就是说,优点就是无需为每次调用都生成
使用 SocketAsyncEventArgs 的推荐步骤如下:
-
Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
-
Set properties on the context object to the operation about to be performed (the callback delegate method and data buffer, for example).
-
Call the appropriate socket method (xxxAsync) to initiate the asynchronous operation.
-
If the asynchronous socket method (xxxAsync) returns true in the callback, query the context properties for completion status.
-
If the asynchronous socket method (xxxAsync) returns false in the callback, the operation completed synchronously. The context properties may be queried for the operation result.
-
Reuse the context for another operation, put it back in the pool, or discard it.
重点在于池化(Pooling),池化的目的就是为了重用和减少运行时分配和垃圾回收的压力。
TcpSocketSaeaServer
public
static SaeaAwaitable AcceptAsync(this Socket socket, SaeaAwaitable awaitable) public
static SaeaAwaitable ConnectAsync(this Socket socket, SaeaAwaitable awaitable) public
static SaeaAwaitable DisonnectAsync(this Socket socket, SaeaAwaitable awaitable) public
static SaeaAwaitable ReceiveAsync(this Socket socket, SaeaAwaitable awaitable) public
static SaeaAwaitable SendAsync(this Socket socket, SaeaAwaitable awaitable)
SaeaPool 则是一个 QueuedObjectPool 的衍生实现,用于池化 SaeaAwaitable
实例。同时,为了减少
TcpSocketSaeaServer
while
(IsListening) {
var saea = _acceptSaeaPool.Take();
var socketError = await _listener.AcceptAsync(saea);
if (socketError == SocketError.Success)
{
var acceptedSocket = saea.Saea.AcceptSocket;
}
_acceptSaeaPool.Return(saea); }
每一个建立成功的 Connection
由
var saea = _saeaPool.Take();
saea.Saea.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
while
(State == TcpSocketConnectionState .Connected) {
saea.Saea.SetBuffer(0, _receiveBuffer.Length);
var socketError = await _socket.ReceiveAsync(saea);
if (socketError != SocketError.Success)
break;
var receiveCount = saea.Saea.BytesTransferred;
if (receiveCount == 0)
break; }
同样,TcpSocketSaeaServer
public
interface ITcpSocketSaeaServerMess ageDispatcher {
Task OnSessionStarted(TcpSocketSaeaSession session);
Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count);
Task OnSessionClosed(TcpSocketSaeaSession session); }
使用起来也是简单直接:
public
class SimpleMessageDispatcher : ITcpSocketSaeaServerMess ageDispatcher {
public async Task OnSessionStarted(TcpSocketSaeaSession session)
{
Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
await Task.CompletedTask;
}
public async Task OnSessionDataReceived(TcpSocketSaeaSession session, byte[] data, int offset, int count)
{
var text = Encoding.UTF8.GetString(data, offset, count);
Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
Console.WriteLine(string.Format("{0}", text));
await session.SendAsync(Encoding.UTF8.GetBytes(text));
}
public async Task OnSessionClosed(TcpSocketSaeaSession session)
{
Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
await Task.CompletedTask;
} }
【RIO
方式:TcpSocketRioServer】
从 Windows 8.1 / Windows Server 2012 R2
开始,微软推出了
The following functions are supported for Windows Store apps on Windows 8.1, Windows Server 2012 R2, and later. Microsoft Visual Studio 2013 Update 3 or later is required for Windows Store apps.
-
RIOCloseCompletionQueue
-
RIOCreateCompletionQueue
-
RIOCreateRequestQueue
-
RIODequeueCompletion
-
RIODeregisterBuffer
-
RIONotify
-
RIOReceive
-
RIOReceiveEx
-
RIORegisterBuffer
-
RIOResizeCompletionQueue
-
RIOResizeRequestQueue
-
RIOSend
-
RIOSendEx
到目前为止,.NET Framework 还没有推出对 RIO 的支持,所以若想在 C# 中实现 RIO 则只能通过
P/Invoke 方式,RioSharp
Cowboy.Sockets
同样,通过
_listener.OnAccepted = (acceptedSocket) =>
{
Task.Run(async () =>
{
await Process(acceptedSocket);
})
.Forget(); };
通过
while
(State == TcpSocketConnectionState .Connected) {
int receiveCount = await _stream.ReadAsync(_receiveBuffer, 0, _receiveBuffer.Length);
if (receiveCount == 0)
break; }
测试代码一如既往的类似:
public
class SimpleMessageDispatcher : ITcpSocketRioServerMessa geDispatcher {
public async Task OnSessionStarted(TcpSocketRioSession session)
{
//Console.WriteLine(string.Format("TCP session {0} has connected {1}.", session.RemoteEndPoint, session));
Console.WriteLine(string.Format("TCP session has connected {0}.", session));
await Task.CompletedTask;
}
public async Task OnSessionDataReceived(TcpSocketRioSession session, byte[] data, int offset, int count)
{
var text = Encoding.UTF8.GetString(data, offset, count);
//Console.Write(string.Format("Client : {0} --> ", session.RemoteEndPoint));
Console.Write(string.Format("Client : --> "));
Console.WriteLine(string.Format("{0}", text));
await session.SendAsync(Encoding.UTF8.GetBytes(text));
}
public async Task OnSessionClosed(TcpSocketRioSession session)
{
Console.WriteLine(string.Format("TCP session {0} has disconnected.", session));
await Task.CompletedTask;
} }
【参考资料】
-
Asynchronous Programming Model (APM)
-
Task-based Asynchronous Pattern (TAP)
-
Event-based Asynchronous Pattern (EAP)
-
SocketAsyncEventArgs
-
Registered I/O
-
Netty:
Reference counted objects -
Socket Performance Enhancements in Version 3.5
-
What’s New for Windows Sockets
for Windows 8.1 and Windows Server 2012 R2 -
RIO_EXTENSION_FUNCTION_TABLE structure
-
Windows 8 Registered I/O Networking Extensions
来源:Dennis Gao
链接:http://www.cnblogs.com/gaochundong/p/csharp_tcp_service_models.html