IOCP数据中间件
每包最大8K(8192字节),超过8187字节的数据要分包传输
首包有5个字节的包头:4字节数据长度(告诉对方,此次总共将传输几字节数据) + 1字节命令字(告诉对方,此次请求的何种命令)
命令分类:1)请求查询数据,应答查询数据;2)请求提交数据,应答提交数据;3)请求上传文件,应答上传文件;4)请求下载文件,应答下载文件;5)请求发送字符串消息,应答发送字符串消息。
unit uFun;

// Application protocol declarations for the IOCP data middleware.
// cxg 2016-9-23

interface

uses
  SysUtils,
  Classes,
  PeachCtrl.Net.IocpTcpServer,
  System.Generics.Collections;

const
  // Packet length in bytes.
  pack_len = 8192;

  // Command categories.
  cmd_qry_req       = 1;
  cmd_qry_res       = 2;
  cmd_post_req      = 3;
  cmd_post_res      = 4;
  cmd_up_file_req   = 5;
  cmd_up_file_res   = 6;
  cmd_down_file_req = 7;
  cmd_down_file_res = 8;
  cmd_data          = 9;

type
  // Packet header. Declared packed, so the field order below is the exact
  // byte layout that ProcessRecved peeks out of the receive buffer.
  THead = packed record
    cmd: Byte;        // command byte; validated as >= 1
    len: Integer;     // length field; must exceed SizeOf(THead) to be valid
    packNo: Integer;  // 1-based number of this packet within its batch
    packQty: Integer; // number of packets in the batch
    ver: Byte;        // NOTE(review): presumably a protocol version; not read in this unit
  end;

  PTTask = ^TTask;

  // One unit of work: the payload received from a single connection.
  TTask = record
    context: Integer; // Integer(...) cast of the per-handle context that produced the task
    body: TBytes;     // payload bytes with the packet headers stripped
  end;

var
  // Task queue. It is not created anywhere in this unit.
  g_TaskList: TList<PTTask>;

function ValidHead(AHead: THead): Boolean;
function GetTask(AContext: TCustomIocpTcpServer.TPerHandleData): PTTask;
procedure ProcessRecved(AContext: TCustomIocpTcpServer.TPerHandleData);

implementation
var
  // Multi-packet transfers that are still being reassembled, keyed by the
  // Integer(...) cast of the connection context. Private to this unit and
  // guarded by TMonitor on the dictionary instance itself.
  g_Pending: TDictionary<Integer, PTTask>;

// Appends a finished task to the shared queue while holding the same monitor
// that GetTask takes for reading.
procedure EnqueueTask(ATask: PTTask);
begin
  System.TMonitor.Enter(g_TaskList);
  try
    g_TaskList.Add(ATask);
  finally
    System.TMonitor.Exit(g_TaskList);
  end;
end;

// Registers ATask as the in-progress transfer for AKey. An unfinished transfer
// already registered under the same key is disposed so that it does not leak.
procedure StorePending(AKey: Integer; ATask: PTTask);
var
  stale: PTTask;
begin
  System.TMonitor.Enter(g_Pending);
  try
    if g_Pending.TryGetValue(AKey, stale) then
      Dispose(stale);
    g_Pending.AddOrSetValue(AKey, ATask);
  finally
    System.TMonitor.Exit(g_Pending);
  end;
end;

// Returns the in-progress transfer registered for AKey, or nil when none exists.
function FindPending(AKey: Integer): PTTask;
begin
  System.TMonitor.Enter(g_Pending);
  try
    if not g_Pending.TryGetValue(AKey, Result) then
      Result := nil;
  finally
    System.TMonitor.Exit(g_Pending);
  end;
end;

// Forgets the in-progress transfer for AKey without disposing the task.
procedure RemovePending(AKey: Integer);
begin
  System.TMonitor.Enter(g_Pending);
  try
    g_Pending.Remove(AKey);
  finally
    System.TMonitor.Exit(g_Pending);
  end;
end;

// Disposes every transfer that never completed and frees the map itself.
procedure FreePending;
var
  task: PTTask;
begin
  if g_Pending = nil then
    Exit;
  for task in g_Pending.Values do
    Dispose(task);
  FreeAndNil(g_Pending);
end;

// Checks the basic sanity of a packet header.
// Returns True when the command byte is at least 1, the length field exceeds
// the header size, and both the packet number and the packet count are >= 1.
function ValidHead(AHead: THead): Boolean;
begin
  Result := (AHead.cmd >= 1)
    and (AHead.len > SizeOf(THead))
    and (AHead.packNo >= 1)
    and (AHead.packQty >= 1);
end;

// Looks up the first task in g_TaskList that was produced by AContext.
// Returns nil when AContext is nil or no queued task matches.
// NOTE(review): the context is compared through an Integer(...) cast, which
// keeps only 32 bits of the reference - confirm this is acceptable on Win64.
function GetTask(AContext: TCustomIocpTcpServer.TPerHandleData): PTTask;
var
  i: Integer;
begin
  Result := nil;
  if AContext = nil then
    Exit;
  // The list is inspected only while the monitor is held, including the
  // emptiness test (an empty list simply skips the loop).
  System.TMonitor.Enter(g_TaskList);
  try
    for i := 0 to g_TaskList.Count - 1 do
      if g_TaskList.Items[i].context = Integer(AContext) then
      begin
        Result := g_TaskList.Items[i];
        Exit;
      end;
  finally
    System.TMonitor.Exit(g_TaskList);
  end;
end;

// Consumes at most one complete packet from AContext.RingBuffer.
//
// * Single-packet batch (packQty = 1): the body is copied into a new task,
//   which is queued on g_TaskList immediately.
// * Multi-packet batch: the first packet allocates the whole body buffer and
//   registers the task as in progress; each later packet is copied to its
//   slot; the task is queued on g_TaskList only once the last packet arrives.
//
// The routine returns without consuming anything when the buffer does not yet
// hold a full packet, when the header is invalid, or when the header's lengths
// would make a copy run outside the task body.
// NOTE(review): rejected data stays in the ring buffer, exactly as for an
// invalid header - the caller presumably has to drop such a connection.
procedure ProcessRecved(AContext: TCustomIocpTcpServer.TPerHandleData);
var
  pTask: PTTask;
  buf: TBytes;
  head: THead;
  key: Integer;
  headLen: Integer;
  bodyLen: Integer;
  fullBodyLen: Integer;
  totalBodyLen: Int64;
  offset: Int64;
begin
  headLen := SizeOf(THead);
  if AContext.RingBuffer.NoProcessBufLen < headLen then
    Exit;
  AContext.RingBuffer.Peep(head, headLen); // read the header without consuming it
  if not uFun.ValidHead(head) then
    Exit;

  key := Integer(AContext);
  fullBodyLen := pack_len - headLen; // payload bytes carried by a full-size packet

  if head.packQty = 1 then
  begin
    // The batch consists of this packet only; head.len is the packet length.
    if AContext.RingBuffer.NoProcessBufLen < head.len then
      Exit;
    bodyLen := head.len - headLen; // > 0, guaranteed by ValidHead
    SetLength(buf, head.len);
    AContext.RingBuffer.Pop(buf[0], head.len);
    New(pTask);
    pTask^.context := key;
    SetLength(pTask^.body, bodyLen);
    Move(buf[headLen], pTask^.body[0], bodyLen);
    EnqueueTask(pTask);
  end
  else if head.packNo = 1 then
  begin
    // First packet of a batch: always a full pack_len packet, and head.len is
    // the length of the whole batch including every packet's header.
    if AContext.RingBuffer.NoProcessBufLen < pack_len then
      Exit;
    // Int64 arithmetic so that a hostile packQty cannot overflow the product.
    totalBodyLen := Int64(head.len) - Int64(head.packQty) * headLen;
    if totalBodyLen < fullBodyLen then
      Exit; // body too small to hold even this packet's payload
    SetLength(buf, pack_len);
    AContext.RingBuffer.Pop(buf[0], pack_len);
    New(pTask);
    pTask^.context := key;
    SetLength(pTask^.body, totalBodyLen); // allocate the full body once
    Move(buf[headLen], pTask^.body[0], fullBodyLen);
    // Keep the task reachable so that the following packets can find it.
    StorePending(key, pTask);
  end
  else
  begin
    // Follow-up packet (packNo > 1): head.len is this packet's own length.
    if AContext.RingBuffer.NoProcessBufLen < head.len then
      Exit;
    pTask := FindPending(key);
    if pTask = nil then
      Exit; // no first packet was seen for this connection
    bodyLen := head.len - headLen;
    // Every earlier packet carried a full payload, so the slot position is a
    // fixed stride; the length of this (possibly shorter) packet must not be
    // used to compute it.
    offset := Int64(head.packNo - 1) * fullBodyLen;
    if offset + bodyLen > Length(pTask^.body) then
      Exit; // the copy would run past the end of the body
    SetLength(buf, head.len);
    AContext.RingBuffer.Pop(buf[0], head.len);
    Move(buf[headLen], pTask^.body[offset], bodyLen);
    if head.packNo = head.packQty then
    begin
      // All packets received: hand the task over to the queue.
      RemovePending(key);
      EnqueueTask(pTask);
    end;
  end;
end;

initialization
  g_Pending := TDictionary<Integer, PTTask>.Create;

finalization
  FreePending;

end.