<menu id="w8yyk"><menu id="w8yyk"></menu></menu>
  • <dd id="w8yyk"><nav id="w8yyk"></nav></dd>
    <menu id="w8yyk"></menu>
    <menu id="w8yyk"><code id="w8yyk"></code></menu>
    <menu id="w8yyk"></menu>
    <xmp id="w8yyk">
    <xmp id="w8yyk"><nav id="w8yyk"></nav>
  • 網站首頁 > 物聯資訊 > 技術分享

    <摘錄>詳談高性能TCP服務器的開發

    2016-09-28 00:00:00 廣州睿豐德信息科技有限公司 閱讀
    睿豐德科技 專注RFID識別技術和條碼識別技術與管理軟件的集成項目。質量追溯系統、MES系統、金蝶與條碼系統對接、用友與條碼系統對接

    對于開發一款高性能服務器程序,廣大服務器開發人員在一直為之奮斗和努力.其中一個影響服務器的重要瓶頸就是服務器的網絡處理模塊.如果一款服務器程序不能及時的處理用戶的數據.則服務器的上層業務邏輯再高效也是徒勞.所以一個服務器程序的網絡處理能力直接影響到整個服務器的性能, 本文主要介紹在windows平臺下開發高性能的網絡處理模塊以及自己在設計開發服務器網絡模塊遇到的一些問題和開發心得.本篇主要介紹TCP服務器的設計, 下一篇將主要介紹UDP服務器的設計.

      眾所周知, 對于服務器來說windows下網絡I/O處理的最佳方式就是完成端口, 因此本服務器的開發主要基于完成端口的模式.完成端口(completion port)是應用程序使用線程池處理異步I/O請求的一種機制.將創建好的socket和完成端口綁定后就可以向該socket上投遞相應的I/O操作, 當操作完成后I/O系統會向完成端口發送一個通知包;應用程序通過GetQueuedCompletionStatus()函數獲取這些通知包并進行相應的處理.下面切入正題談談TCP服務器的開發.

      本人在開發TCP服務器的經過了兩個階段, 第一次設計出來的TCP服務器網絡層只能支持5000 – 8000個在線用戶同時和服務器交互, 但是會出現一些莫名其妙的系統異常.所以網絡層不是很穩定.這次開發主要用到一個系統的I/O線程池函數BindIoCompletionCallback() 該函數在win2000以后都支持, BindIoCompletion-

    Callback()是一個I/O線程池函數,其主要功能是采用系統線程池進行I/O處理,優點是用戶無需自己創建完成端口和線程池,完成端口和工作者線程池的創建和管理都由系統維護.給用戶帶了很大的方便.用戶只需要將自己創建的socket和I/O回調函數傳遞給BindIoCompletionCallback()函數即可, 然后就可以在該socket上投遞相應的操作.當操作完成后系統會調用用戶的回調函數通知用戶.這種方式給開發者帶來了很大的方便, 開發者甚至不需要去了解完成端口的工作機制就可以開發出一個較高性能的網絡程序.但同時也帶來了很多麻煩,用戶無法知道在完成端口上到底有多少個工作者線程, 而且當連接到服務器上的用戶量過大時會出現線程堆棧錯誤等異常,同時有1000-2000個用戶斷開連接后, 服務器就無法讓后續用戶連接到服務器. 在這種方式下的服務器網絡層最多只支持4000 – 5000用戶同時連接到服務器.用戶量再大時就會出現一些系統異常而且無法解決.

      借鑒于第一次開發的經驗和教訓, 在第二次開發服務器TCP層時決定自己創建完成端口和工作者線程池, 并對其進行維護和管理.這樣做的好處是出了問題好定位和處理.下面將我開發的代碼拿出來和大家切磋切磋, 如果什么地方寫得問題還希望能夠指正出來, 歡迎郵件聯系我: siqiang0312@163.com, QQ: 24633959, MSN: beifangying@hotmail.com

    1.         首先介紹網絡上下文(NET_CONTEXT)的定義:

     class NET_CONTEXT

     {

     public:

      WSAOVERLAPPED m_ol;

      SOCKET m_hSock; 

      CHAR* m_pBuf;  //接收或發送數據的緩沖區

      INT m_nOperation; //在該網絡上下文上進行的操作 OP_ACCEPT…

      static DWORD S_PAGE_SIZE;  //緩沖區的最大容量 

      NET_CONTEXT();

      virtual ~NET_CONTEXT();

     

      static void InitReource();

      static void ReleaseReource();

     

     private:

      void* operator new (size_t nSize);

      void operator delete(void* p);

      static HANDLE s_hDataHeap;

      static vector<char * > s_IDLQue;  //無效數據緩沖區的隊列

      static CRITICAL_SECTION s_IDLQueLock;  //訪問s_IDLQue的互斥鎖

     };

     NET_CONTEXT 是所有網絡上下文的基類, 對于TCP的recv, send, accep, connect的上下文都繼承自該類.UDP的send和recv的網絡上下文也繼承自該類. m_ol 必須放置在第一個位置否則當從完成封包取net_context不能得到正確的結果. S_PAGE_SIZE 為數據緩沖區m_pBuf的大小,其大小和相應的操作系統平臺有關, win32下其值為4096, win64下其值為8192, 即為操作系統的一個內存頁的大小.設置為一個內存頁的原因是因為在投遞重疊操作時系統會鎖定投遞的緩沖區, 在鎖定時是按照內存頁的邊界來鎖定的.因此即使你只發送一個1K字節數據系統也會鎖定整個內存頁(4096/8192). s_hDataHeap 為自定義的BUF申請的堆.其優點是用戶可以自己對堆進行管理和操作. s_IDLQue 為用過的BUF隊列, 當用戶用完相應的NET_CONTEXT后在執行析構操作時并不會真正把m_pBuf所占的內存釋放掉而是將其放入到s_IDLQue隊列中, 當下次申請新的NET_CONTEXT時只需從s_IDLQue中取出就可以使用, 避免頻繁的new和delete操作.

    2.         數據包頭的定義:

     struct PACKET_HEAD

     {

      LONG nTotalLen;   //數據包的總長度

      ULONG nSerialNum;  //數據包的序列號

      WORD nCurrentLen;  //當前數據包的長度

      WORD nType;    //數據包的類型

     };

    數據包頭位于每一個接收到的或待發送的數據包的首部,用于確定接收到的數據包是否合法以及該數據包是做什么用的.用戶可以定義自己包頭.

    3.         TCP_CONTEXT主要用于定義接收和發送數據的緩沖區, 繼承自NET_CONTEXT

     class TCP_CONTEXT : public NET_CONTEXT

     {

      friend class TcpServer;

     protected:

      DWORD m_nDataLen;  //TCP模式下累計發送和接收數據的長度

     

      TCP_CONTEXT()

       : m_nDataLen(0)

      { 

      }

      virtual ~TCP_CONTEXT() {} 

      void* operator new(size_t nSize);

      void operator delete(void* p); 

      enum

      {

       E_TCP_HEAP_SIZE = 1024 * 1024* 10,

       MAX_IDL_DATA = 20000,

      };

     private:

      static vector<TCP_CONTEXT* > s_IDLQue;  //無效的數據隊列

      static CRITICAL_SECTION s_IDLQueLock;  //訪問s_IDLQue的互斥鎖

      static HANDLE s_hHeap; //TCP_CONTEXT的數據申請堆

     };

    TCP_CONTEXT類主要用在網絡上發送和接收數據的上下文.每個連接到服務器的SOCKET都會有一個發送和接收數據的TCP_CONTEXT.這里重載了new和delete函數.這樣做的優點在于當申請一個新的TCP_CONTEXT對象時會先判斷無效的數據隊列中是否有未使用的TCP_CONTEXT,若有則直接取出來使用否則從s_hHeap堆上新申請一個.new 函數的定義如下

     void* TCP_CONTEXT::operator new(size_t nSize)

     {

      void* pContext = NULL; 

      try

      {

       if (NULL == s_hHeap)

       {

        throw ((long)(__LINE__));

       } 

       //為新的TCP_CONTEXT申請內存, 先從無效隊列中找, 如無效隊列為空則從堆上申請      

       EnterCriticalSection(&s_IDLQueLock);

       vector<TCP_CONTEXT* >::iterator iter = s_IDLQue.begin(); 

       if (iter != s_IDLQue.end())

       {

        pContext = *iter;

        s_IDLQue.erase(iter);   

       }

       else

       {

        pContext = HeapAlloc(s_hHeap, HEAP_ZERO_MEMORY | HEAP_NO_SERIALIZE, nSize);

       }

       LeaveCriticalSection(&s_IDLQueLock); 

       if (NULL == pContext)

       {

        throw ((long)(__LINE__));

       }

      }

      catch (const long& iErrCode)

      {

       pContext = NULL;

       _TRACE("\r\nExcept : %s--%ld", __FILE__, iErrCode);

      } 

      return pContext;

     }

    當使用完TCP_CONTEXT時調用delete函數進行對內存回收, 在進行內存回收時先查看無效隊列已存放的數據是否達到MAX_IDL_DATA, 若沒有超過MAX_IDL_DATA則將其放入到s_IDLQue中否則將其釋放掉.delete函數的實現如下:

     void TCP_CONTEXT::operator delete(void* p)

     {

      if (p) 

      {

       //若空閑隊列的長度小于MAX_IDL_DATA, 則將其放入無效隊列中否則釋

    //放之

     

       EnterCriticalSection(&s_IDLQueLock);

       const DWORD QUE_SIZE = (DWORD)(s_IDLQue.size());

       TCP_CONTEXT* const pContext = (TCP_CONTEXT*)p; 

       if (QUE_SIZE <= MAX_IDL_DATA)

       {

        s_IDLQue.push_back(pContext);

       }

       else

       {

        HeapFree(s_hHeap, HEAP_NO_SERIALIZE, p);

       }

       LeaveCriticalSection(&s_IDLQueLock); 

      } 

      return;

     }

    4.         ACCEPT_CONTEXT 主要用于投遞AcceptEx操作, 繼承自NET_CONTEXT類

     class ACCEPT_CONTEXT : public NET_CONTEXT

     {

      friend class TcpServer;

     protected:

      SOCKET m_hRemoteSock;   //連接本服務器的客戶端SOCKET 

      ACCEPT_CONTEXT()

       : m_hRemoteSock(INVALID_SOCKET)

      { 

      } 

      virtual ~ACCEPT_CONTEXT() {} 

      void* operator new(size_t nSize);

      void operator delete(void* p);

     private:

      static vector<ACCEPT_CONTEXT* > s_IDLQue;  //無效的數據隊列

      static CRITICAL_SECTION s_IDLQueLock;   //訪問s_IDLQueµ互斥鎖

      static HANDLE s_hHeap; //ACCEPT_CONTEXT的自定義堆

     };

    5.         TCP_RCV_DATA, 當服務器的某個socket從網絡上收到數據后并且數據合法便為收到的數據申請一個新的TCP_RCV_DATA實例存儲收到的數據.其定義如下:

     class DLLENTRY TCP_RCV_DATA

     {

      friend class TcpServer;

     public:

      SOCKET m_hSocket;  //與該數據相關的socket

      CHAR* m_pData;   //數據緩沖區地址

      INT m_nLen;    //收到的數據的長度

     

      TCP_RCV_DATA(SOCKET hSock, const CHAR* pBuf, INT nLen);

      ~TCP_RCV_DATA();

     

      void* operator new(size_t nSize);

      void operator delete(void* p);

     

      enum

      {

       HEAP_SIZE = 1024 *1024* 50, 

       DATA_HEAP_SIZE = 1024 *1024 * 100,

       MAX_IDL_DATA = 100000,

      };

     

     private:

      static vector<TCP_RCV_DATA* > s_IDLQue;  //無效數據隊列

      static CRITICAL_SECTION s_IDLQueLock;  //訪問s_IDLQue的互斥鎖

      static HANDLE s_hHeap; 

      static HANDLE s_DataHeap;

     };

    6.         前面講的相關的數據結構都是為下面要探討的TcpServer類服務的. TcpServer類是本文要探討的核心數據結構;主要用于啟動服務, 管理連接等操作.

    class DLLENTRY TcpServer

     {

     public:

      TcpServer();

      ~TcpServer();

     

      /************************************************************************

      * Desc : 初始化相關靜態資源,在申請TCP實例之前必須先調用該方法對相關資

      * 源進行初始化

      ************************************************************************/

      static void InitReource();

     

      /************************************************************************

      * Desc : 釋放相應的靜態資源

      ************************************************************************/

      static void ReleaseReource();

     

      /****************************************************

      * Name : StartServer()

      * Desc : 啟動TCP服務

      ****************************************************/

      BOOL StartServer(

       const char *szIp //要啟動服務的本地地址, 若為NULL則采用默認地址

       , INT nPort //要啟動服務的端口

       , LPCLOSE_ROUTINE pCloseFun  //客戶端socket關閉的通知函數

       , LPVOID pParam     //close函數的參數

       );

     

      /****************************************************

      * Name : CloseServer()

      * Desc : 關閉TCP服務

      ****************************************************/

      void CloseServer();

     

      /****************************************************

      * Name : SendData()

      * Desc : 對客戶端hSock發送長度為nDataLen的數據

      ****************************************************/

      BOOL SendData(SOCKET hSock, const CHAR* szData, INT nDataLen);

     

      /****************************************************

      * Name : GetRcvData()

      * Desc : 從接收數據隊列中獲取一個接收數據包

      * pQueLen 不為NULL時返回其長度

      ****************************************************/

      TCP_RCV_DATA* GetRcvData(

       DWORD* const pQueLen

       );

     protected:

      enum

      {

       LISTEN_EVENTS = 2,     //監聽socket的事件個數

       MAX_ACCEPT = 50,      //每次最多投遞的accept操作的個數

       _SOCK_NO_RECV = 0xf0000000, //客戶端socket已連接上但為發送數據

       _SOCK_RECV = 0xf0000001 //客戶端socket已連接上并也收到數據  

      };

     

      vector<TCP_RCV_DATA* > m_RcvDataQue;  //接收到的數據緩沖區隊列

      CRITICAL_SECTION m_RcvQueLock; //訪問m_RcvDataQue的互斥鎖

     

      vector<SOCKET> m_SocketQue; //連接本服務器的客戶端socket隊列

      CRITICAL_SECTION m_SockQueLock;  //訪問m_SocketQue的互斥鎖

     

      LPCLOSE_ROUTINE m_pCloseFun; //客戶端socket關閉的通知函數

      LPVOID m_pCloseParam; //傳遞給m_pCloseFun的用戶參數

     

      SOCKET m_hSock;    //要進行服務器監聽的socket

      long volatile m_bThreadRun; //是否允許后臺線程繼續運行

      long volatile m_nAcceptCount;    //當前已投遞的accept操作的個數

      BOOL m_bSerRun; //服務是否正在運行

     

      //accept的事件

      HANDLE m_ListenEvents[LISTEN_EVENTS];

      HANDLE *m_pThreads;    //創建的后臺線程的句柄

      HANDLE m_hCompletion;     //完成端口句柄

     

      static LPFN_ACCEPTEX s_pfAcceptEx;    //AcceptEx地址

      // GetAcceptExSockaddrs的地址

      static LPFN_GETACCEPTEXSOCKADDRS s_pfGetAddrs;

     

     

      /****************************************************

      * Name : AcceptCompletionProc()

      * Desc : acceptEx操作完成后回調函數

      ****************************************************/

      void AcceptCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);

     

      /****************************************************

      * Name : RecvCompletionProc()

      * Desc : 接收操作完成后的回調函數

      ****************************************************/

      void RecvCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);

     

      /****************************************************

      * Name : SendCompletionProc()

      * Desc : 發送操作完成后的回調函數

      ****************************************************/

       void SendCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);

     

     

      /****************************************************

      * Name : ListenThread()

      * Desc : 監聽線程

      ****************************************************/

      static UINT WINAPI ListenThread(LPVOID lpParam);

     

      /****************************************************

      * Name : WorkThread()

      * Desc : 在完成端口上工作的后臺線程

      ****************************************************/

      static UINT WINAPI WorkThread(LPVOID lpParam);

     

      /****************************************************

      * Name : AideThread()

      * Desc :  后臺輔助線程

      ****************************************************/

      static UINT WINAPI AideThread(LPVOID lpParam);

     };

    下面將對相關實現細節作詳細介紹.

    也許您已經注意到本類只提供了客戶端socket關閉的接口, 而沒有提供客戶端連接到服務器的相關接口;這樣做的主要原因是因為當一個客戶端連接成功需要在完成端口的I/O線程中進行通知, 若用戶在該接口中進行復雜的運算操作將會使I/O工作線程阻塞.所以此處沒有提供連接成功的通知接口, 其實用戶可以根據客戶端發來的特定數據包(例如登陸數據包)確定用戶是否連接到本服務器.

      當有客戶端連接服務器投遞的accept操作就會完成, m_ListenEvents[1] 事件對象就會授信這時ListenThread線程將被喚醒并投遞一個accept操作. 若有大量的客戶端連接到本服務器而沒有足夠的accept接受連接此時m_ListenEvents[0]事件就會受信此時ListenThread線程會再次投遞MAX_ACCEPT個accept操作已接受更多的連接.

       ListenThread線程主要用來投遞aeecptex操作, 當m_ListenEvents[0]或者m_ListenEvents[1]受信時就會投遞一定量的AcceptEx操作以接受更多的客戶端連接.

      WorkThread 線程工作在完成端口上, 當相關的操作完成時該線程組負責從完成端口隊列上取得相應的完成封包進行處理. AideThread線程主要用于維護連接本服務器的socket隊列, 如果客戶端連接到服務器但長時間沒有進行發送數據便斷開該客戶端, 防止客戶端惡意連接.當有客戶端斷開連接時也在該線程中調用關閉接口通知用戶.

    相關函數介紹如下:

    l  TcpServer(), 該函數主要對相關成員變量進行初始化, 創建完成端口和相關線程.

     TcpServer::TcpServer()

      : m_pCloseFun(NULL)

      , m_hSock(INVALID_SOCKET)

      , m_pCloseParam(NULL)

      , m_bThreadRun(TRUE)

      , m_bSerRun(FALSE)

      , m_nAcceptCount(0)

     {

      m_RcvDataQue.reserve(10000 * sizeof(void *));

      m_SocketQue.reserve(50000 * sizeof(SOCKET));

     

      InitializeCriticalSection(&m_RcvQueLock);

      InitializeCriticalSection(&m_SockQueLock);

     

      //創建監聽事件

      for (int nIndex = 0; nIndex < LISTEN_EVENTS; nIndex ++)

      {

       m_ListenEvents[nIndex] = CreateEvent(NULL, FALSE, FALSE, NULL);

      }

      //創建完成端口

      m_hCompletion = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

     

      //創建輔助線程, 監聽線程, 工作者線程. 工作者線程的數目為CPU數目*2+2

      SYSTEM_INFO sys_info;

      GetSystemInfo(&sys_info);

     

      const DWORD MAX_THREAD = sys_info.dwNumberOfProcessors * 2 +2 + 2;

      m_pThreads = new HANDLE[MAX_THREAD];

      assert(m_pThreads);

     

      m_pThreads[0] = (HANDLE)_beginthreadex(NULL, 0, ListenThread, this, 0, NULL);

      m_pThreads[1] = (HANDLE)_beginthreadex(NULL, 0, AideThread, this, 0, NULL);

     

      for (DWORD nIndex = 2; nIndex < MAX_THREAD; nIndex++)

      {

       m_pThreads[nIndex] = (HANDLE)_beginthreadex(NULL, 0, WorkThread, this, 0, NULL);

      }

     }

    l  StartServer(), 該函數主要啟動服務并投遞MAX_ACCEPT個操作接受客戶端連接.

        BOOL TcpServer::StartServer( const char *szIp , INT nPort , LPCLOSE_ROUTINE pCloseFun , LPVOID pParam )

     {

      BOOL bSucc = TRUE;

      int nRet = 0;

      DWORD dwBytes = 0;

      ULONG ul = 1;

      int nOpt = 1;

     

      try

      {

       //若服務已運行則不允許啟動新的服務

       if (m_bSerRun || m_nAcceptCount)

       {

        THROW_LINE;

       }

     

       m_pCloseFun = pCloseFun;

       m_pCloseParam = pParam;

       m_bSerRun = TRUE;

     

       //創建監聽socket

       m_hSock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

       if (INVALID_SOCKET == m_hSock)

       {

        THROW_LINE;

       }

     

       //加載AcceptEx函數

       GUID guidProc = WSAID_ACCEPTEX;

       if (NULL == s_pfAcceptEx)

       {

        nRet = WSAIoctl(m_hSock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidProc, sizeof(guidProc)

         , &s_pfAcceptEx, sizeof(s_pfAcceptEx), &dwBytes, NULL, NULL);

       }

       if (NULL == s_pfAcceptEx || SOCKET_ERROR == nRet)

       {

        THROW_LINE;

       }

     

       //加載GetAcceptExSockaddrs函數

       GUID guidGetAddr = WSAID_GETACCEPTEXSOCKADDRS;

       dwBytes = 0;

       if (NULL == s_pfGetAddrs)

       {

        nRet = WSAIoctl(m_hSock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guidGetAddr, sizeof(guidGetAddr)

         , &s_pfGetAddrs, sizeof(s_pfGetAddrs), &dwBytes, NULL, NULL);

       }

       if (NULL == s_pfGetAddrs)

       {

        THROW_LINE;

       }

     

       ioctlsocket(m_hSock, FIONBIO, &ul);

     

       //設置地址重用, 當服務關閉后可以立即在該端口上啟動服務  

       setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt));

     

       sockaddr_in LocalAddr;

       LocalAddr.sin_family = AF_INET;

       LocalAddr.sin_port = htons(nPort);

       if (szIp)

       {

        LocalAddr.sin_addr.s_addr = inet_addr(szIp);

       }  

       else

       {

        LocalAddr.sin_addr.s_addr = htonl(INADDR_ANY);

       }

     

       nRet = bind(m_hSock, (sockaddr*)&LocalAddr, sizeof(LocalAddr));

       if (SOCKET_ERROR == nRet)

       {

        THROW_LINE;

       }

     

       nRet = listen(m_hSock, 200);

       if (SOCKET_ERROR == nRet)

       {

        THROW_LINE;

       }

     

       //將監聽socket綁定完成端口上

       CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0, 0);

       WSAEventSelect(m_hSock, m_ListenEvents[0], FD_ACCEPT);

     

       //投遞MAX_ACCEPT個AcceptEx操作

       for (int nIndex = 0; nIndex < MAX_ACCEPT; )

       {

        SOCKET hClient = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

        if (INVALID_SOCKET == hClient)

        {

         continue;

        }

     

        ul = 1;

        ioctlsocket(hClient, FIONBIO, &ul);

     

        ACCEPT_CONTEXT* pAccContext = new ACCEPT_CONTEXT();

        if (NULL == pAccContext)

        {

         THROW_LINE;

        }

     

        pAccContext->m_hSock = m_hSock;

        pAccContext->m_hRemoteSock = hClient;

        pAccContext->m_nOperation = OP_ACCEPT;

     

        nRet = s_pfAcceptEx(m_hSock, hClient, pAccContext->m_pBuf, 0

         , sizeof(sockaddr_in) +16, sizeof(sockaddr_in) +16, &dwBytes, &(pAccContext->m_ol));

     

        if (FALSE == nRet && ERROR_IO_PENDING != WSAGetLastError())

        {

         closesocket(hClient);

         delete pAccContext;

         pAccContext = NULL;

         THROW_LINE;

        }

        else

        {

         InterlockedExchangeAdd(&m_nAcceptCount, 1);

        }

     

        nIndex++;

       }

      }

      catch (const long &lErrLine)

      {

       bSucc = FALSE;

       m_bSerRun = FALSE;

       _TRACE("Exp : %s -- %ld", __FILE__, lErrLine);

      }

      return bSucc;

     }

    l  ListenThread() 該函數用于投遞AcceptEx操作以接受客戶端的連接.

     UINT WINAPI TcpServer::ListenThread(LPVOID lpParam)

     {

      TcpServer *pThis = (TcpServer *)lpParam;

      try

      {

       int nRet = 0;

       DWORD nEvents = 0;

       DWORD dwBytes = 0;

       int nAccept = 0;

     

       while (TRUE)

       {   

        nEvents = WSAWaitForMultipleEvents(LISTEN_EVENTS, pThis->m_ListenEvents, FALSE, WSA_INFINITE, FALSE);  

      

        //等待失敗線程退出

        if (WSA_WAIT_FAILED == nEvents)

        {

         THROW_LINE;

        }

        else

        {

         nEvents = nEvents - WAIT_OBJECT_0;

         if (0 == nEvents)

         {

          nAccept = MAX_ACCEPT;

         }

         else if (1 == nEvents)

         {

          nAccept = 1;

         }

     

         //最多只能投遞200個AcceptEx操作

         if (InterlockedExchangeAdd(&(pThis->m_nAcceptCount), 0) > 200)

         {

          nAccept = 0;

         }

     

         for (int nIndex = 0; nIndex < nAccept; )

         {

          SOCKET hClient = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

          if (INVALID_SOCKET == hClient)

          {

           continue;

          }

     

          ULONG ul = 1;

          ioctlsocket(hClient, FIONBIO, &ul);

     

          ACCEPT_CONTEXT* pAccContext = new ACCEPT_CONTEXT();

          if (pAccContext && pAccContext->m_pBuf)

          {

           pAccContext->m_hSock = pThis->m_hSock;

           pAccContext->m_hRemoteSock = hClient;

           pAccContext->m_nOperation = OP_ACCEPT;

     

           nRet = s_pfAcceptEx(pThis->m_hSock, hClient, pAccContext->m_pBuf, 0

            , sizeof(sockaddr_in) +16, sizeof(sockaddr_in) +16, &dwBytes, &(pAccContext->m_ol));

     

           if (FALSE == nRet && ERROR_IO_PENDING != WSAGetLastError())

           {

            closesocket(hClient);

            delete pAccContext;

            pAccContext = NULL;

           }

           else

           {

            InterlockedExchangeAdd(&(pThis->m_nAcceptCount), 1);

           }

          }

          else

          {

           delete pAccContext;

          }

          nIndex++;

         }

        }

     

        if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))

        {

         THROW_LINE;

        }

       }

      }

      catch ( const long &lErrLine)

      {

       _TRACE("Exp : %s -- %ld", __FILE__, lErrLine);

      }

      return 0;

     }

    l  CloseServer(), 關閉服務

     void TcpServer::CloseServer()

     {

      //關閉所有的socket

      closesocket(m_hSock);

     

      EnterCriticalSection(&m_SockQueLock);

     

      for (vector<SOCKET>::iterator iter_sock = m_SocketQue.begin(); m_SocketQue.end() != iter_sock; iter_sock++)

      {

       closesocket(*iter_sock);

      }

     

      LeaveCriticalSection(&m_SockQueLock);

     

      m_bSerRun = FALSE;

     }

    l  SendData() 發送數據

     BOOL TcpServer::SendData(SOCKET hSock, const CHAR* szData, INT nDataLen)

     {

    #ifdef _XML_NET_

      //數據長度非法

      if (((DWORD)nDataLen > TCP_CONTEXT::S_PAGE_SIZE) || (NULL == szData))

      {

       return FALSE;

      }

    #else

      //數據長度非法

      if ((nDataLen > (int)(TCP_CONTEXT::S_PAGE_SIZE)) || (NULL == szData) || (nDataLen < sizeof(PACKET_HEAD)))

      {

       return FALSE;

      }

    #endif //#ifdef _XML_NET_

     

      BOOL bResult = TRUE;

      DWORD dwBytes = 0;

      WSABUF SendBuf;

      TCP_CONTEXT *pSendContext = new TCP_CONTEXT();

      if (pSendContext && pSendContext->m_pBuf)

      {

       pSendContext->m_hSock = hSock;

       pSendContext->m_nDataLen = 0;

       pSendContext->m_nOperation = OP_WRITE;

       memcpy(pSendContext->m_pBuf, szData, nDataLen);

     

       SendBuf.buf = pSendContext->m_pBuf;

       SendBuf.len = nDataLen;

     

       assert(szData); 

       INT iErr = WSASend(pSendContext->m_hSock, &SendBuf, 1, &dwBytes, 0, &(pSendContext->m_ol), NULL);

     

       if (SOCKET_ERROR == iErr && ERROR_IO_PENDING != WSAGetLastError())

       {

        delete pSendContext;

        pSendContext = NULL;

        _TRACE("\r\n%s : %ld LAST_ERROR = %ld", __FILE__, __LINE__, WSAGetLastError());    

        bResult = FALSE;

       }

      }

      else

      {

       delete pSendContext;

       bResult = FALSE;

      }

     

      return bResult;

     }

    l  GetRcvData(), 從接收到的數據隊列中取出數據.

     TCP_RCV_DATA * TcpServer::GetRcvData( DWORD* const pQueLen )

     {

      TCP_RCV_DATA* pRcvData = NULL;

     

      EnterCriticalSection(&m_RcvQueLock);

      vector<TCP_RCV_DATA*>::iterator iter = m_RcvDataQue.begin();

      if (m_RcvDataQue.end() != iter)

      {

       pRcvData = *iter;

       m_RcvDataQue.erase(iter);

      }

     

      if (NULL != pQueLen)

      {

       *pQueLen = (DWORD)(m_RcvDataQue.size());

      }

      LeaveCriticalSection(&m_RcvQueLock);

     

      return pRcvData;

     }

    l  WorkThread(), 工作者線程

     UINT WINAPI TcpServer::WorkThread(LPVOID lpParam)

     {

      TcpServer *pThis = (TcpServer *)lpParam;

      DWORD dwTrans = 0, dwKey = 0, dwSockSize = 0;

      LPOVERLAPPED pOl = NULL;

      NET_CONTEXT *pContext = NULL;

      BOOL bRun = TRUE;

     

      while (TRUE)

      {

       BOOL bOk = GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);

     

       pContext = CONTAINING_RECORD(pOl, NET_CONTEXT, m_ol);

       if (pContext)

       {

        switch (pContext->m_nOperation)

        {

        case OP_ACCEPT:

         pThis->AcceptCompletionProc(bOk, dwTrans, pOl);

         break;

        case OP_READ:

         pThis->RecvCompletionProc(bOk, dwTrans, pOl);

         break;

        case OP_WRITE:

         pThis->SendCompletionProc(bOk, dwTrans, pOl);

         break;

        }

       }

     

       EnterCriticalSection(&(pThis->m_SockQueLock));

     

       dwSockSize = (DWORD)(pThis->m_SocketQue.size());

       if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0) && 0 == dwSockSize

        && 0 == InterlockedExchangeAdd(&(pThis->m_nAcceptCount), 0))

       {

        bRun = FALSE;

       }

     

       LeaveCriticalSection(&(pThis->m_SockQueLock));

     

       if (FALSE == bRun)

       {   

        break;

       }

      }

     

      return 0;

     }

    l  AcceptCompletionProc(), 當客戶端連接到服務器時調用該函數.

     void TcpServer::AcceptCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)

     {

      ACCEPT_CONTEXT *pContext = CONTAINING_RECORD(lpOverlapped, ACCEPT_CONTEXT, m_ol);

      INT nZero = 0;

      int nPro = _SOCK_NO_RECV;

      IP_ADDR* pClientAddr = NULL;

      IP_ADDR* pLocalAddr = NULL;

      INT nClientLen = 0;

      INT nLocalLen = 0;

      int iErrCode;

      DWORD nFlag = 0;  

      DWORD nBytes = 0;

      WSABUF RcvBuf;

     

      if (bSuccess)

      {

       setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));

       setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero));

       setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&(pContext->m_hSock), sizeof(pContext->m_hSock));

       setsockopt(pContext->m_hRemoteSock, SOL_SOCKET, SO_GROUP_PRIORITY, (char *)&nPro, sizeof(nPro));

     

       s_pfGetAddrs(pContext->m_pBuf, 0, sizeof(sockaddr_in) +16, sizeof(sockaddr_in) +16

        , (LPSOCKADDR*)&pLocalAddr, &nLocalLen, (LPSOCKADDR*)&pClientAddr, &nClientLen);

     

       //為新來的連接投遞讀操作

       TCP_CONTEXT *pRcvContext = new TCP_CONTEXT;

       if (pRcvContext && pRcvContext->m_pBuf)

       {

        pRcvContext->m_hSock = pContext->m_hRemoteSock;

        pRcvContext->m_nOperation = OP_READ;

        CreateIoCompletionPort((HANDLE)(pRcvContext->m_hSock), m_hCompletion, NULL, 0);

     

        RcvBuf.buf = pRcvContext->m_pBuf;

        RcvBuf.len = TCP_CONTEXT::S_PAGE_SIZE;

     

        iErrCode = WSARecv(pRcvContext->m_hSock, &RcvBuf, 1, &nBytes, &nFlag, &(pRcvContext->m_ol), NULL);

     

        //投遞失敗

        if (SOCKET_ERROR == iErrCode && WSA_IO_PENDING != WSAGetLastError())

        {

         closesocket(pRcvContext->m_hSock);

         delete pRcvContext;

         pRcvContext = NULL;

         _TRACE("\r\n%s : %ld  SOCKET = 0x%x LAST_ERROR = %ld", __FILE__, __LINE__, pContext->m_hRemoteSock, WSAGetLastError());

        }

        else

        {

         EnterCriticalSection(&m_SockQueLock);

         m_SocketQue.push_back(pRcvContext->m_hSock);

         LeaveCriticalSection(&m_SockQueLock);

        }

       }

       else

       {

        delete pRcvContext;

       }

      

       SetEvent(m_ListenEvents[1]);

      }

      else

      {

       closesocket(pContext->m_hRemoteSock);

       _TRACE("\r\n %s -- %ld accept 操作失敗_FILE__, __LINE__);

      }

     

      InterlockedExchangeAdd(&m_nAcceptCount, -1); 

      delete pContext;

      pContext = NULL;

     }

    l  RecvCompletionProc(),讀操作完成后的回調函數

     void TcpServer::RecvCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)

     {

      TCP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, TCP_CONTEXT, m_ol);

      DWORD dwFlag = 0;

      DWORD dwBytes = 0;

      WSABUF RcvBuf;

      int nErrCode = 0;

      int nPro = _SOCK_RECV;

     

      try

      {

       if ((FALSE == bSuccess || 0 == dwNumberOfBytesTransfered) && (WSA_IO_PENDING != WSAGetLastError()))

       {

        closesocket(pRcvContext->m_hSock);

        THROW_LINE;

       }

     

       setsockopt(pRcvContext->m_hSock, SOL_SOCKET, SO_GROUP_PRIORITY, (char *)&nPro, sizeof(nPro));

     

    #ifndef _XML_NET_  //處理二進制流

       //非法而客戶端發來的數據包, 關閉該客戶端.

       if (0 == pRcvContext->m_nDataLen && dwNumberOfBytesTransfered < sizeof(PACKET_HEAD))

       {

        THROW_LINE;

       }

    #endif //#ifndef _XML_NET_

     

    #ifdef _XML_NET_ //處理XML流

       TCP_RCV_DATA* pRcvData = new TCP_RCV_DATA(

        pRcvContext->m_hSock

        , pRcvContext->m_pBuf

        , dwNumberOfBytesTransfered

        );

     

       if (pRcvData && pRcvData->m_pData)

       {    

        EnterCriticalSection(&m_RcvQueLock);

        m_RcvDataQue.push_back(pRcvData);

        LeaveCriticalSection(&m_RcvQueLock);

       }

     

       pRcvContext->m_nDataLen = 0;

       RcvBuf.buf = pRcvContext->m_pBuf;

       RcvBuf.len = TCP_CONTEXT::S_PAGE_SIZE;

     

    #else    //處理二進制數據流

     

       //解析數據包頭信息中應接收的數據包的長度

       pRcvContext->m_nDataLen += dwNumberOfBytesTransfered;

       

       PACKET_HEAD* pHeadInfo = (PACKET_HEAD*)(pRcvContext->m_pBuf);

     

       //數據包長度合法才處理

       if ((pHeadInfo->nCurrentLen <= TCP_CONTEXT::S_PAGE_SIZE)

        //&& (0 == dwErrorCode)

        && ((WORD)(pRcvContext->m_nDataLen) <= pHeadInfo->nCurrentLen + sizeof(PACKET_HEAD)))

       {

        //該包的所有數據以讀取完畢, 將其放入到數據隊列中

        if ((WORD)(pRcvContext->m_nDataLen) == pHeadInfo->nCurrentLen + sizeof(PACKET_HEAD))

        {

         TCP_RCV_DATA* pRcvData = new TCP_RCV_DATA(

          pRcvContext->m_hSock

          , pRcvContext->m_pBuf

          , pRcvContext->m_nDataLen

          );

        

         if (pRcvData && pRcvData->m_pData)

         {

          EnterCriticalSection(&m_RcvQueLock);

          m_RcvDataQue.push_back(pRcvData);

          LeaveCriticalSection(&m_RcvQueLock);

         }

     

         pRcvContext->m_nDataLen = 0;

         RcvBuf.buf = pRcvContext->m_pBuf;

         RcvBuf.len = TCP_CONTEXT::S_PAGE_SIZE;

        }

        //數據沒有接收完畢繼續接收

        else

        {

         RcvBuf.buf = pRcvContext->m_pBuf +pRcvContext->m_nDataLen;

         RcvBuf.len = pHeadInfo->nCurrentLen - pRcvContext->m_nDataLen +sizeof(PACKET_HEAD);

        }

       }

       //數據非法, 直接進行下一次讀操作

       else

       {

        pRcvContext->m_nDataLen = 0;

        RcvBuf.buf = pRcvContext->m_pBuf;

        RcvBuf.len = TCP_CONTEXT::S_PAGE_SIZE;

       }

    #endif //#ifdef _XML_NET_

     

       //繼續投遞讀操作

       nErrCode = WSARecv(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, &(pRcvContext->m_ol), NULL);

      

       if (SOCKET_ERROR == nErrCode && WSA_IO_PENDING != WSAGetLastError())

       {

        closesocket(pRcvContext->m_hSock);

        THROW_LINE;

       }

      }

      catch (const long &lErrLine)

      {

       _TRACE("Exp : %s -- %ld SOCKET = 0x%x ERR_CODE = 0x%x", __FILE__, lErrLine, pRcvContext->m_hSock, WSAGetLastError());

       delete pRcvContext;  

      }

     }

    l  SendCompletionProc(), 當發送操作完成后調用該接口

     void TcpServer::SendCompletionProc(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)

     {

      TCP_CONTEXT* pSendContext = CONTAINING_RECORD(lpOverlapped, TCP_CONTEXT, m_ol);

      delete pSendContext;

      pSendContext = NULL;

     }

    l  AideThread(), 后臺輔助線程主要負責對連接本服務器的客戶端SOCKET隊列進行管理

    UINT WINAPI TcpServer::AideThread(LPVOID lpParam)

     {

      TcpServer *pThis = (TcpServer *)lpParam;

      try

      {

       const int SOCK_CHECKS = 10000;

       int nSockTime = 0;

       int nPro = 0;

       int nTimeLen = 0;  

       vector<SOCKET>::iterator sock_itre = pThis->m_SocketQue.begin();

     

       while (TRUE)

       {

        for (int index = 0; index < SOCK_CHECKS; index++)

        {

         nPro = 0;

         nSockTime = 0x0000ffff;

         // 檢查socket隊列

         EnterCriticalSection(&(pThis->m_SockQueLock));

     

         if (pThis->m_SocketQue.end() != sock_itre)

         {

          nTimeLen = sizeof(nPro);

          getsockopt(*sock_itre, SOL_SOCKET, SO_GROUP_PRIORITY, (char *)&nPro, &nTimeLen);

          if (_SOCK_RECV != nPro)

          {

           nTimeLen = sizeof(nSockTime);

           getsockopt(*sock_itre, SOL_SOCKET, SO_CONNECT_TIME, (char *)&nSockTime, &nTimeLen);

     

           if (nSockTime > 120)

           {

            closesocket(*sock_itre);

     

            pThis->m_pCloseFun(pThis->m_pCloseParam, *sock_itre);

            pThis->m_SocketQue.erase(sock_itre);

     

            _TRACE("%s -- %ld SOCKET = 0x%x出現錯誤S_ERR = 0x%x, nPro = 0x%x, TIME = %ld", __FILE__, __LINE__, *sock_itre, WSAGetLastError(), nPro, nSockTime);

           }

           else

           {

            sock_itre++;

           }

          }

          else

          {

           sock_itre ++;     

          }  

         }

         else

         {

          sock_itre = pThis->m_SocketQue.begin();

          LeaveCriticalSection(&(pThis->m_SockQueLock));

          break;

         }

     

         LeaveCriticalSection(&(pThis->m_SockQueLock));

        }

     

        if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))

        {

         THROW_LINE;

        }

     

        Sleep(100);

       }

      }

      catch (const long &lErrLine)

      {

       _TRACE("Exp : %s -- %ld", __FILE__, lErrLine);

      }

       return 0;

     }

    本服務器的測試程序在WindowsXP的32位平臺下測試時可以同時接受20K個客戶端同時連接到服務器, CPU只占10%, 內存占20M左右.

    RFID管理系統集成商 RFID中間件 條碼系統中間層 物聯網軟件集成
    最近免费观看高清韩国日本大全