DIOCP open source project - an encoder and decoder and how to transfer in compres

>>>>>>DIOCP discussion group: 320641073

>>>>>>SVN source and DEMO Download: HTTPS://code.google.com/p/diocp/

When network bandwidth is limited, compressing data before transmission makes better use of the bandwidth and improves transmission efficiency.

To compress transmitted data in DIOCP, you need to modify the encoder and decoder. First, a look at what these two components do and how they are used.

For example, suppose we ship a computer by express delivery between school and home: home is the server (S), the computer is the object (O) we want to send, and the express delivery is the TCP transmission process.

In this process, the client uses the encoder to send the object (the computer), and the server uses the decoder to receive it.

The earlier DIOCP examples use a JSonStream object for transmission. The TJSonStream class holds two main parts of data: the first part contains the JSON string data, and the second part contains the Stream data.

In the three-tier data example, the client request is placed in the JSON part. The server receives the data and uses the decoder to restore it into a JSonStream object on the server side. After the business logic runs, the server writes its reply into a JSonStream object and hands it to the server encoder, which encodes and sends it. The client decoder receives the data and restores it into a JSonStream object.

On the server side, the CDS data packet is converted to an XML string and written into JSonStream.Stream. With compression of the Stream enabled, debugging showed a 70K data packet shrinking to about 7K, so compression works very well on text.

*Of course, we are free to define our own protocol and write our own encoder and decoder. We can define our own TStrStream or TXMLStream, for example, and then write the corresponding encoder and decoder.

Code analysis

The client code:

var
  lvJSonStream, lvRecvObject:TJsonStream;
  lvStream:TStream;
  lvData:AnsiString;
  l, j, x:Integer;
begin
  lvJSonStream := TJsonStream.Create;
  try
    lvJSonStream.JSon := SO();
    lvJSonStream.JSon.I['cmdIndex'] := 1001;   // Command: run a SQL script and fetch the data
    lvJSonStream.Json.S['sql'] := mmoSQL.Lines.Text;

    FClientSocket.sendObject(lvJSonStream);
  finally
    lvJSonStream.Free;
  end;

First a TJSonStream object is created and its fields are set; because the request only carries a SQL statement, there is no Stream data. Then FClientSocket.sendObject(lvJSonStream) sends the object over the socket:

procedure TD10ClientSocket.sendObject(pvObject:TObject);
begin
  if FCoder = nil then raise Exception.Create('No encoder/decoder registered (registerCoder)!');

  if not Active then Exit;

  FCoder.Encode(Self, pvObject);
end;

As shown, sendObject calls the Encode function of the registered encoder.


The Encode function of the client encoder is as follows:

procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject:
    TObject);
var
  lvJSonStream:TJsonStream;
  lvJSonLength:Integer;
  lvStreamLength:Integer;
  sData, lvTemp:String;
  lvStream:TStream;
  lvTempBuf:PAnsiChar;

  lvBytes, lvTempBytes:TBytes;
  
  l:Integer;
  lvBufBytes:array[0..1023] of byte;
begin
  if pvObject = nil then exit;
  lvJSonStream := TJsonStream(pvObject);
  
  // Compress the stream if required
  if (lvJSonStream.Stream <> nil) then
  begin
    if lvJSonStream.Json.O['config.stream.zip'] <> nil then
    begin
      if lvJSonStream.Json.B['config.stream.zip'] then
      begin
        // The flag is set explicitly: compress the stream
        TZipTools.compressStreamEx(lvJSonStream.Stream);
      end;
    end else if lvJSonStream.Stream.Size > 0 then
    begin
      // No flag given: compress a non-empty stream and record that in the JSON
      TZipTools.compressStreamEx(lvJSonStream.Stream);
      lvJSonStream.Json.B['config.stream.zip'] := true;
    end;
  end;

  sData := lvJSonStream.JSon.AsJSon(True);

  lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData);

  lvJSonLength := Length(lvBytes);
  lvStream := lvJSonStream.Stream;

  lvJSonLength := TNetworkTools.htonl(lvJSonLength);

  if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit;


  if lvStream <> nil then
  begin
    lvStreamLength := lvStream.Size;
  end else
  begin
    lvStreamLength := 0;
  end;

  lvStreamLength := TNetworkTools.htonl(lvStreamLength);
  if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit;

  // Send the JSON bytes
  if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit;

  // Send the stream data in 1K blocks; lvStream is nil when there is no stream
  if (lvStream <> nil) and (lvStream.Size > 0) then
  begin
    lvStream.Position := 0;
    repeat
      l := lvStream.Read(lvBufBytes, SizeOf(lvBufBytes));
      if (l > 0) and (pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR) then Exit;
    until (l = 0);
  end;
end;

This function does the following:

1 Decide whether the Stream data needs to be compressed.

2 Send the length of the JSON data and the length of the Stream data.

3 Send the JSON data.

4 Send the Stream data.
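
The steps above imply a simple wire layout. As a minimal sketch, assuming the packet is laid out as [JSON length][stream length][JSON bytes][stream bytes] with both lengths as 32-bit big-endian values, the following program assembles such a packet in memory. BuildFrame and PutUInt32BE are hypothetical helpers written for this illustration, not DIOCP functions, and the program assumes Delphi 2009 or later (or Free Pascal 3.x) for TEncoding.

```pascal
program FrameSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Writes a 32-bit value into Buf at Offset in network (big-endian) byte order.
procedure PutUInt32BE(var Buf: TBytes; Offset: Integer; Value: Cardinal);
begin
  Buf[Offset]     := Byte((Value shr 24) and $FF);
  Buf[Offset + 1] := Byte((Value shr 16) and $FF);
  Buf[Offset + 2] := Byte((Value shr 8) and $FF);
  Buf[Offset + 3] := Byte(Value and $FF);
end;

// Hypothetical helper: lays out one packet as
// [JSON length][stream length][JSON bytes][stream bytes].
function BuildFrame(const Json, Stream: TBytes): TBytes;
begin
  SetLength(Result, 8 + Length(Json) + Length(Stream));
  PutUInt32BE(Result, 0, Cardinal(Length(Json)));
  PutUInt32BE(Result, 4, Cardinal(Length(Stream)));
  if Length(Json) > 0 then
    Move(Json[0], Result[8], Length(Json));
  if Length(Stream) > 0 then
    Move(Stream[0], Result[8 + Length(Json)], Length(Stream));
end;

var
  Json, Stream, Frame: TBytes;
  I: Integer;
begin
  // Example payload: a small JSON request plus three bytes of stream data.
  Json := TEncoding.UTF8.GetBytes('{"cmdIndex":1001}');
  SetLength(Stream, 3);
  Stream[0] := $AA;
  Stream[1] := $BB;
  Stream[2] := $CC;

  Frame := BuildFrame(Json, Stream);

  // Print the total size and the 8-byte header (the two length fields).
  Writeln('frame size: ', Length(Frame));
  for I := 0 to 7 do
    Write(IntToHex(Frame[I], 2), ' ');
  Writeln;
end.
```

Unlike the real encoder, which sends each part with its own sendBuffer call, this sketch builds the whole packet in one buffer only to make the layout visible.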


Explanation:

lvJSonLength := TNetworkTools.htonl(lvJSonLength);

lvStreamLength := TNetworkTools.htonl(lvStreamLength);

lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData);

These three lines exist for compatibility with a Java Netty server, to make decoding easier on that side: the two lengths are converted to network byte order and the JSON text to UTF-8. The conversion is optional. The data can also be sent as-is, as long as the server decodes it the same way. The protocol is yours to define.
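
To illustrate the byte-order conversion, here is a small sketch. It assumes that on a little-endian host (x86/x64) htonl and ntohl amount to reversing the four bytes of a 32-bit value. SwapBytes32 is a hypothetical stand-in written for this example and is not the TNetworkTools implementation.

```pascal
program ByteOrderSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Reverses the four bytes of a 32-bit value.
// Assumption: this matches what htonl/ntohl do on a little-endian host.
function SwapBytes32(Value: Cardinal): Cardinal;
begin
  Result := ((Value and $000000FF) shl 24) or
            ((Value and $0000FF00) shl 8) or
            ((Value and $00FF0000) shr 8) or
            ((Value and $FF000000) shr 24);
end;

var
  HostValue, WireValue: Cardinal;
begin
  HostValue := 70000;                    // e.g. a stream length in bytes
  WireValue := SwapBytes32(HostValue);   // sender side, in the role of htonl

  Writeln(IntToHex(Int64(HostValue), 8), ' -> ', IntToHex(Int64(WireValue), 8));

  // Receiver side, in the role of ntohl: the swap is its own inverse.
  Writeln('round trip ok: ', SwapBytes32(WireValue) = HostValue);
end.
```

Because the swap is its own inverse, the sender and receiver can use the same operation, which is why the encoder's htonl and the decoder's ntohl pair up.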


Next, the server's IOCP queue is signaled when data arrives, and the server receives the data.

function TIOCPObject.processIOQueued: Integer;
var
  BytesTransferred:Cardinal;
  lvResultStatus:BOOL;
  lvRet:Integer;
  lvIOData:POVERLAPPEDEx;

  lvDataObject:TObject;

  lvClientContext:TIOCPClientContext;
begin
  Result := IOCP_RESULT_OK;

  // Worker threads block in GetQueuedCompletionStatus until data arrives.
  lvResultStatus := GetQueuedCompletionStatus(FIOCoreHandle,
 
  .......
    if lvIOData.IO_TYPE = IO_TYPE_Accept then  //Connection request
    begin
      TIODataMemPool.instance.giveBackIOData(lvIOData);
      PostWSARecv(lvClientContext);
    end else if lvIOData.IO_TYPE = IO_TYPE_Recv then
    begin
      // Append the data to this socket's buffer and run the processing logic
      lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
        lvIOData.Overlapped.InternalHigh);

      TIODataMemPool.instance.giveBackIOData(lvIOData);

      // Post the next receive request
      PostWSARecv(lvClientContext);
    end;    
  .........
end;

// Append the data to this socket's buffer and run the processing logic

lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,

  lvIOData.Overlapped.InternalHigh);

// This call lets the decoder attempt to decode

procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
var
  lvObject:TObject;
begin
  FCS.Enter;
  try
    // Append the data to this socket's buffer
    FBuffers.AddBuffer(buf, len);

    // Call the registered decoder to try decoding
    lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
    if lvObject <> nil then
    try
      try
        // Decoding succeeded: invoke the business logic
        dataReceived(lvObject);
      except
        on E:Exception do
        begin
          TIOCPFileLogger.logErrMessage('Exception in the processing logic! ' + e.Message);
        end;
      end; 
      // Release the allocated memory blocks once no unread data remains
      if FBuffers.validCount = 0 then
      begin
        FBuffers.clearBuffer;
      end;
    finally
      lvObject.Free;
    end;
  finally
    FCS.Leave;
  end;
end;

The earlier demo uses the TIOCPJSonStreamDecoder decoder:


function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject;
var
  lvJSonLength, lvStreamLength:Integer;
  lvData:String;
  lvBuffer:array of Char;
  lvBufData:PAnsiChar;
  lvStream:TMemoryStream;
  lvJsonStream:TJsonStream;
  lvBytes:TBytes;
  lvValidCount:Integer;
begin
  Result := nil;

  // If the buffer holds less than the packet header (JSON length + stream length), decoding fails
  lvValidCount := inBuf.validCount;
  if (lvValidCount <SizeOf(Integer) + SizeOf(Integer)) then
  begin
    Exit;
  end;

  // Remember the current read position
  inBuf.markReaderIndex;
  inBuf.readBuffer(@lvJSonLength, SizeOf(Integer));
  inBuf.readBuffer(@lvStreamLength, SizeOf(Integer));

  lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
  lvStreamLength := TNetworkTools.ntohl(lvStreamLength);

  // If the buffer does not yet hold the full JSON and stream data (not everything has arrived), decoding fails
  lvValidCount := inBuf.validCount;
  if lvValidCount <(lvJSonLength + lvStreamLength) then
  begin
    // Restore the buffer's read position
    inBuf.restoreReaderIndex;
    exit;
  end else if (lvJSonLength + lvStreamLength) = 0 then
  begin
    // Both lengths are 0: the client can send such a packet for automatic reconnection
    TIOCPFileLogger.logDebugMessage('Received a [00] packet!');
    Exit;
  end;



  // Enough data has arrived: build the result object
  lvJsonStream := TJsonStream.Create;
  Result := lvJsonStream;

  //Read the JSON string
  if lvJSonLength > 0 then
  begin
    SetLength(lvBytes, lvJSonLength);
    ZeroMemory(@lvBytes[0], lvJSonLength);
    inBuf.readBuffer(@lvBytes[0], lvJSonLength);

    lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

    lvJsonStream.Json := SO(lvData);
  end else
  begin
    TFileLogger.instance.logMessage('Received a request whose JSON data is empty!', 'IOCP_ALERT_');
  end;


  // Read the stream data
  if lvStreamLength > 0 then
  begin
    GetMem(lvBufData, lvStreamLength);
    try
      inBuf.readBuffer(lvBufData, lvStreamLength);
      lvJsonStream.Stream.Size := 0;
      lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength);

      // Decompress the stream if it was compressed
      if lvJsonStream.Json.B['config.stream.zip'] then
      begin
        // Decompress
        TZipTools.unCompressStreamEX(lvJsonStream.Stream);
      end;
    finally
      FreeMem(lvBufData, lvStreamLength);
    end;
  end;
end;

These three lines in the server decoder mirror the conversions made by the client encoder:

lvJSonLength := TNetworkTools.ntohl(lvJSonLength);

lvStreamLength := TNetworkTools.ntohl(lvStreamLength);

lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

The decoder does the following:

0 Check whether the buffered data can be decoded; if not, exit, and decoding fails for now.

1 Read the JSON length and the stream data length.

2 Read the JSON data into JsonStream.json and the stream data into JsonStream.stream.

3 Based on config.stream.zip in the JSON, decide whether the stream data in JsonStream.stream needs to be decompressed.

4 On success, return a JsonStream object.
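
The "not enough data yet" check is the part that lets the decoder cope with TCP delivering a packet in pieces. Below is a minimal sketch of that idea over a plain byte array, assuming the same 8-byte header of two big-endian lengths. TryDecode and GetUInt32BE are hypothetical helpers for illustration; the real decoder works on a TBufferLink and uses markReaderIndex/restoreReaderIndex instead.

```pascal
program DecodeSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Reads a big-endian 32-bit value from Buf at Offset.
function GetUInt32BE(const Buf: TBytes; Offset: Integer): Cardinal;
begin
  Result := (Cardinal(Buf[Offset]) shl 24) or
            (Cardinal(Buf[Offset + 1]) shl 16) or
            (Cardinal(Buf[Offset + 2]) shl 8) or
            Cardinal(Buf[Offset + 3]);
end;

// Hypothetical helper: tries to cut one packet off the front of Buf.
// Returns False when the header or the body has not fully arrived yet.
function TryDecode(const Buf: TBytes; out Json, Stream: TBytes;
  out Consumed: Integer): Boolean;
var
  JsonLen, StreamLen: Int64;
begin
  Result := False;
  Consumed := 0;
  Json := nil;
  Stream := nil;

  if Length(Buf) < 8 then Exit;                  // header incomplete

  JsonLen := GetUInt32BE(Buf, 0);
  StreamLen := GetUInt32BE(Buf, 4);
  if Int64(Length(Buf)) - 8 < JsonLen + StreamLen then Exit;  // body incomplete

  Json := Copy(Buf, 8, Integer(JsonLen));
  Stream := Copy(Buf, 8 + Integer(JsonLen), Integer(StreamLen));
  Consumed := 8 + Integer(JsonLen) + Integer(StreamLen);
  Result := True;
end;

var
  Frame, Part, Json, Stream: TBytes;
  Consumed: Integer;
  Ok: Boolean;
begin
  // Header: JSON length 2, stream length 1. Body: '{}' plus one stream byte.
  SetLength(Frame, 11);          // SetLength zero-fills the new elements
  Frame[3] := 2;
  Frame[7] := 1;
  Frame[8] := Ord('{');
  Frame[9] := Ord('}');
  Frame[10] := $AA;

  Part := Copy(Frame, 0, 9);     // simulate a partial TCP read
  Ok := TryDecode(Part, Json, Stream, Consumed);
  Writeln('partial decoded:  ', Ok);

  Ok := TryDecode(Frame, Json, Stream, Consumed);
  Writeln('complete decoded: ', Ok, ', bytes consumed: ', Consumed);
end.
```

The caller keeps appending received bytes to its buffer and retries after each receive, which matches how RecvBuffer calls Decode every time new data arrives.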


Once decoding completes, RecvBuffer continues:

lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);

if lvObject <> nil then

try // Decoding succeeded: invoke the business logic

dataReceived(lvObject);

………

When decoding succeeds, dataReceived is called to run the business logic.


Summary:

   The client encoder pairs with the server decoder, and the server encoder pairs with the client decoder.

Posted by Jerome at December 07, 2013 - 8:06 PM