<menu id="w8yyk"><menu id="w8yyk"></menu></menu>
  • <dd id="w8yyk"><nav id="w8yyk"></nav></dd>
    <menu id="w8yyk"></menu>
    <menu id="w8yyk"><code id="w8yyk"></code></menu>
    <menu id="w8yyk"></menu>
    <xmp id="w8yyk">
    <xmp id="w8yyk"><nav id="w8yyk"></nav>
  • 網站首頁 > 物聯資訊 > 技術分享

    boost asio 異步實現tcp通訊

    2016-09-28 00:00:00 廣州睿豐德信息科技有限公司 閱讀
    睿豐德科技 專注RFID識別技術和條碼識別技術與管理軟件的集成項目。質量追溯系統、MES系統、金蝶與條碼系統對接、用友與條碼系統對接

    ---恢復內容開始---

    asioboost  

    目錄(?)[-]

    1. 一前言
    2. 二實現思路
      1. 通訊包數據結構
      2. 連接對象
      3. 連接管理器
      4. 服務器端的實現
      5. 對象串行化
     

    一、前言

    boost asio可算是一個簡單易用,功能又強大可跨平臺的C++通訊庫,效率也表現的不錯,linux環境是epoll實現的,而windows環境是iocp實現的。而tcp通訊是項目當中經常用到通訊方式之一,實現的方法有各式各樣,因此總結一套適用于自己項目的方法是很有必要,很可能下一個項目直接套上去就可以用了。

     

    二、實現思路

    1.通訊包數據結構

    RFID設備管理軟件

    Tag:檢查數據包是否合法,具體會在下面講解;

    Length:描述Body的長度;

    Command:表示數據包的類型,0表示心跳包(長連接需要心跳來檢測連接是否正常),1表示注冊包(客戶端連接上服務器之后要將相關信息注冊給服務器),2表示業務消息包;

    business_type:業務消息包類型,服務器會根據業務消息包類型將數據路由到對應的客戶端(客戶端是有業務類型分類的);

    app_id:客戶端唯一標識符;

    Data:消息數據;

    2.連接對象

    客戶端連接上服務器之后,雙方都會產生一個socket連接對象,通過這個對象可以收發數據,因此我定義為socket_session。

    //socket_session.h

     

    [cpp] view plaincopyprint?  
    1. #pragma once  
    2. #include <iostream>  
    3. #include <list>  
    4. #include <hash_map>  
    5. #include <boost/bind.hpp>  
    6. #include <boost/asio.hpp>  
    7. #include <boost/shared_ptr.hpp>  
    8. #include <boost/make_shared.hpp>  
    9. #include <boost/thread.hpp>  
    10. #include <boost/thread/mutex.hpp>  
    11. #include <boost/enable_shared_from_this.hpp>  
    12. #include <firebird/log/logger_log4.hpp>  
    13. #include <firebird/detail/config.hpp>  
    14. #include <firebird/socket_utils/message_archive.hpp>  
    15.   
    16. using boost::asio::ip::tcp;  
    17.   
    18. namespace firebird{  
    19.     enum command{ heartbeat = 0, regist, normal};  
    20.   
    21.     const std::string tag = "KDS";  
    22.   
    23.     class FIREBIRD_DECL socket_session;  
    24.     typedef boost::shared_ptr<socket_session> socket_session_ptr;  
    25.   
    26.     class FIREBIRD_DECL socket_session:  
    27.         public boost::enable_shared_from_this<socket_session>,  
    28.         private boost::noncopyable  
    29.     {  
    30.     public:  
    31.         typedef boost::function<void(socket_session_ptr)> close_callback;  
    32.         typedef boost::function<void(  
    33.             const boost::system::error_code&,   
    34.             socket_session_ptr, message&)> read_data_callback;  
    35.   
    36.         socket_session(boost::asio::io_service& io_service);  
    37.         ~socket_session(void);  
    38.   
    39.         DWORD id() { return m_id; }  
    40.         WORD get_business_type(){ return m_business_type; }  
    41.         void set_business_type(WORD type) { m_business_type = type; }  
    42.         DWORD get_app_id(){ return m_app_id; }  
    43.         void set_app_id(DWORD app_id) { m_app_id = app_id; }  
    44.         std::string& get_remote_addr() { return m_name; }  
    45.         void set_remote_addr(std::string& name) { m_name = name; }  
    46.         tcp::socket& socket() { return m_socket; }  
    47.   
    48.         void installCloseCallBack(close_callback cb){ close_cb = cb; }  
    49.         void installReadDataCallBack(read_data_callback cb) { read_data_cb = cb; }  
    50.   
    51.         void start();  
    52.         void close();  
    53.         void async_write(const std::string& sMsg);  
    54.         void async_write(message& msg);  
    55.   
    56.         bool is_timeout();  
    57.         void set_op_time(){std::time(&m_last_op_time);}  
    58.   
    59.     private:  
    60.         static boost::detail::atomic_count m_last_id;  
    61.   
    62.         DWORD m_id;  
    63.         WORD  m_business_type;  
    64.         DWORD m_app_id;  
    65.         std::string m_name;  
    66.         boost::array<char, 7> sHeader;  
    67.         std::string sBody;  
    68.   
    69.         tcp::socket m_socket;  
    70.         boost::asio::io_service& m_io_service;  
    71.   
    72.         std::time_t m_last_op_time;  
    73.   
    74.         close_callback close_cb;  
    75.         read_data_callback read_data_cb;  
    76.   
    77.         //發送消息  
    78.         void handle_write(const boost::system::error_code& e,   
    79.             std::size_t bytes_transferred, std::string* pmsg);  
    80.   
    81.         //讀消息頭  
    82.         void handle_read_header(const boost::system::error_code& error);  
    83.         //讀消息體  
    84.         void handle_read_body(const boost::system::error_code& error);  
    85.   
    86.         void handle_close();  
    87.     };  
    88. }  

    這里注意的是,定義了一個tag="KDS",目的是為了檢查收到的數據包是否有效,每一個數據包前3個字節不為“KDS”,那么就認為是非法的請求包,你也可以定義tag等于其它字符串,只要按協議發包就正常,當然這是比較簡單的數據包檢查方法了。比較嚴謹的方法是雙方使用哈希算法來檢查的,怎么做,這里先不做詳解。

     

     

    //socket_session.cpp

     

    [cpp] view plaincopyprint?  
    1. #include "socket_session.h"  
    2.   
    3. namespace firebird{  
    4.     boost::detail::atomic_count socket_session::m_last_id(0);  
    5.   
    6.     socket_session::socket_session(boost::asio::io_service& io_srv)  
    7.         :m_io_service(io_srv), m_socket(io_srv),   
    8.         m_business_type(0), m_app_id(0)  
    9.     {  
    10.         m_id = ++socket_session::m_last_id;  
    11.     }  
    12.   
    13.     socket_session::~socket_session(void)  
    14.     {  
    15.         m_socket.close();  
    16.     }  
    17.   
    18.     void socket_session::start()  
    19.     {  
    20.         m_socket.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));  
    21.         m_socket.set_option(boost::asio::socket_base::keep_alive(true));  
    22.         std::time(&m_last_op_time);  
    23.         const boost::system::error_code error;  
    24.         handle_read_header(error);  
    25.     }  
    26.   
    27.     void socket_session::handle_close()  
    28.     {  
    29.         try{  
    30.             m_socket.close();  
    31.             close_cb(shared_from_this());  
    32.         }  
    33.         catch(std::exception& e)  
    34.         {  
    35.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");  
    36.         }  
    37.         catch(...)  
    38.         {  
    39.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[未知異常]");  
    40.         }  
    41.     }  
    42.   
    43.     void socket_session::close()  
    44.     {             
    45.         //由于回調中有加鎖的情況,必須提交到另外一個線程去做,不然會出現死鎖  
    46.         m_io_service.post(boost::bind(&socket_session::handle_close, shared_from_this()));  
    47.     }  
    48.   
    49.     static int connection_timeout = 60;  
    50.   
    51.     bool socket_session::is_timeout()  
    52.     {  
    53.         std::time_t now;  
    54.         std::time(&now);      
    55.         return now - m_last_op_time > connection_timeout;  
    56.     }  
    57.   
    58.     //讀消息頭  
    59.     void socket_session::handle_read_header(const boost::system::error_code& error)  
    60.     {  
    61.         LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO  << "enter.");  
    62.   
    63.         try{  
    64.             if(error)  
    65.             {  
    66.                 LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO  << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[" << error.message().c_str() << "]");  
    67.                 close();  
    68.                 return;  
    69.             }  
    70.   
    71.             std::string data;  
    72.             data.swap(sBody);  
    73.             boost::asio::async_read(m_socket,   
    74.                 boost::asio::buffer(sHeader),  
    75.                 boost::bind(&socket_session::handle_read_body, shared_from_this(),  
    76.                 boost::asio::placeholders::error));  
    77.   
    78.             if (data.length() > 0 && data != "")  
    79.             {//讀到數據回調注冊的READ_DATA函數  
    80.                 message msg;  
    81.                 message_iarchive(msg, data);  
    82.   
    83.                 read_data_cb(error, shared_from_this(), msg);  
    84.             }  
    85.         }  
    86.         catch(std::exception& e)  
    87.         {  
    88.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");  
    89.             close();  
    90.         }  
    91.         catch(...)  
    92.         {  
    93.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[未知異常]");  
    94.             close();  
    95.         }  
    96.     }  
    97.   
    98.     //讀消息體  
    99.     void socket_session::handle_read_body(const boost::system::error_code& error)  
    100.     {  
    101.         LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");  
    102.   
    103.         try{  
    104.             if(error)  
    105.             {  
    106.                 LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[" << error.message().c_str() << "]");  
    107.                 close();  
    108.                 return;  
    109.             }  
    110.   
    111.             if (tag.compare(0, tag.length(), sHeader.data(), 0, tag.length()))  
    112.             {  
    113.                 LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO <<  "連接遠程地址:[" << get_remote_addr() << "],socket異常:[這是個非法連接!]");  
    114.                 close();  
    115.                 return;  
    116.             }  
    117.   
    118.             DWORD dwLength = 0;  
    119.   
    120.             char* len = (char*)&dwLength;  
    121.             memcpy(len, &sHeader[tag.length()], sizeof(dwLength));  
    122.   
    123.             sBody.resize(dwLength);  
    124.             char* pBody = &sBody[0];  
    125.   
    126.             boost::asio::async_read(m_socket,   
    127.                 boost::asio::buffer(pBody, dwLength),  
    128.                 boost::bind(&socket_session::handle_read_header, shared_from_this(),  
    129.                 boost::asio::placeholders::error));  
    130.         }  
    131.         catch(std::exception& e)  
    132.         {  
    133.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");  
    134.             close();  
    135.         }  
    136.         catch(...)  
    137.         {  
    138.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[未知異常]");  
    139.             close();  
    140.         }  
    141.     }  
    142.   
    143.     void socket_session::handle_write(const boost::system::error_code& error,   
    144.         std::size_t bytes_transferred, std::string* pmsg)  
    145.     {  
    146.         //數據發送成功就銷毀  
    147.         if (pmsg != NULL)  
    148.         {  
    149.             delete pmsg;  
    150.         }  
    151.   
    152.         if(error)  
    153.         {  
    154.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[" << error.message().c_str() << "]");  
    155.             close();  
    156.             return;  
    157.         }  
    158.     }  
    159.   
    160.     void socket_session::async_write(const std::string& sMsg)  
    161.     {  
    162.         LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO  << "enter.")  
    163.   
    164.         try  
    165.         {  
    166.             DWORD dwLength = sMsg.size();  
    167.             char* pLen = (char*)&dwLength;  
    168.   
    169.             //由于是異步發送,要保證數據發送完整時,才把數據銷毀  
    170.             std::string* msg = new std::string();  
    171.             msg->append(tag);  
    172.             msg->append(pLen, sizeof(dwLength));  
    173.             msg->append(sMsg);  
    174.   
    175.             boost::asio::async_write(m_socket,boost::asio::buffer(*msg, msg->size()),   
    176.                 boost::bind(&socket_session::handle_write, shared_from_this(),  
    177.                 boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred,  
    178.                 msg));  
    179.   
    180.         }  
    181.         catch(std::exception& e)  
    182.         {  
    183.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");  
    184.             close();  
    185.         }  
    186.         catch(...)  
    187.         {  
    188.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << get_remote_addr() << "],socket異常:[未知異常]");  
    189.             close();  
    190.         }  
    191.     }  
    192.   
    193.     void socket_session::async_write(message& msg)  
    194.     {  
    195.         std::string data;  
    196.         message_oarchive(data, msg);  
    197.           
    198.         async_write(data);  
    199.     }  
    200. }  

    接受數據時,socket_session會先讀取7個字節的head,比較前3個字節“KDS”,然后取得4個字節的Length,再讀出Length長度的數據,最后將該數據傳給read_data_cb回調函數處理,read_data_cb回調函數是在外部注冊的。

    3.連接管理器

    對于服務器來說,它同時服務多個客戶端,為了有效的管理,因此需要一個連接管理器,我定義為session_manager。session_manager主要是對socket_session的增刪改查,和有效性檢查。

    //session_manager.h

     

    [cpp] view plaincopyprint?  
    1. #pragma once  
    2. #include "socket_session.h"  
    3. #include "filter_container.h"  
    4. #include <boost/date_time/posix_time/posix_time.hpp>  
    5. #include <boost/multi_index_container.hpp>  
    6. #include <boost/multi_index/member.hpp>  
    7. #include <boost/multi_index/ordered_index.hpp>  
    8. #include <boost/typeof/typeof.hpp>  
    9. #include <boost/random.hpp>  
    10. #include <boost/pool/detail/singleton.hpp>  
    11.   
    12. namespace firebird{  
    13.     template<typename T>  
    14.     class var_gen_wraper  
    15.     {  
    16.     public:  
    17.         var_gen_wraper(): gen(boost::mt19937((boost::int32_t)std::time(0)),   
    18.             boost::uniform_smallint<>(1, 100)) {}  
    19.         typename T::result_type operator() () { return gen(); }  
    20.     private:  
    21.         T gen;  
    22.     };  
    23.   
    24.     struct  session_stu  
    25.     {  
    26.         DWORD   id;  
    27.         WORD    business_type;  
    28.         std::string address;  
    29.         DWORD   app_id;  
    30.         socket_session_ptr session;  
    31.     };  
    32.   
    33.     struct sid{};  
    34.     struct sbusiness_type{};  
    35.     struct saddress{};  
    36.     struct sapp_id{};  
    37.   
    38.     enum session_idx_member{ session_id = 0, session_business_type, session_address, app_id};  
    39. #define CLIENT 0  
    40. #define SERVER 1  
    41.   
    42.     typedef boost::multi_index::multi_index_container<  
    43.         session_stu,   
    44.         boost::multi_index::indexed_by<  
    45.         boost::multi_index::ordered_unique<  
    46.         boost::multi_index::tag<sid>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, id)>,  
    47.         boost::multi_index::ordered_non_unique<  
    48.         boost::multi_index::tag<sbusiness_type>, BOOST_MULTI_INDEX_MEMBER(session_stu, WORD, business_type)>,  
    49.         boost::multi_index::ordered_non_unique<  
    50.         boost::multi_index::tag<saddress>, BOOST_MULTI_INDEX_MEMBER(session_stu, std::string, address)>,  
    51.         boost::multi_index::ordered_non_unique<  
    52.         boost::multi_index::tag<sapp_id>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, app_id)>  
    53.         >  
    54.     > session_set;  
    55.   
    56. #define MULTI_MEMBER_CON(Tag) boost::multi_index::index<session_set,Tag>::type&  
    57. #define MULTI_MEMBER_ITR(Tag) boost::multi_index::index<session_set,Tag>::type::iterator  
    58.   
    59.     struct is_business_type {  
    60.         is_business_type(WORD type)  
    61.             :m_type(type)  
    62.         {  
    63.   
    64.         }  
    65.         bool operator()(const session_stu& s)   
    66.         {  
    67.             return (s.business_type == m_type);  
    68.         }  
    69.   
    70.         WORD m_type;  
    71.     };  
    72.   
    73.     class session_manager  
    74.     {  
    75.     public:  
    76.         typedef boost::shared_lock<boost::shared_mutex> readLock;  
    77.         typedef boost:: unique_lock<boost::shared_mutex> writeLock;  
    78.   
    79.         session_manager(boost::asio::io_service& io_srv, int type, int expires_time);  
    80.         ~session_manager();  
    81.   
    82.         void add_session(socket_session_ptr p);  
    83.         void update_session(socket_session_ptr p);  
    84.   
    85.         template<typename Tag, typename Member>  
    86.         void del_session(Member m)  
    87.         {  
    88.             writeLock lock(m_mutex);  
    89.             if (m_sessions.empty())  
    90.             {  
    91.                 return ;  
    92.             }  
    93.   
    94.             MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);  
    95.             //BOOST_AUTO(idx, boost::multi_index::get<Tag>(m_sessions));  
    96.             BOOST_AUTO(iter, idx.find(m));  
    97.   
    98.             if (iter != idx.end())  
    99.             {  
    100.                 idx.erase(iter);  
    101.             }  
    102.         }  
    103.   
    104.         //獲取容器中的第一個session  
    105.         template<typename Tag, typename Member>  
    106.         socket_session_ptr get_session(Member m)  
    107.         {  
    108.             readLock lock(m_mutex);  
    109.   
    110.             if (m_sessions.empty())  
    111.             {  
    112.                 return socket_session_ptr();  
    113.             }  
    114.   
    115.             MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);  
    116.             BOOST_AUTO(iter, idx.find(m));  
    117.             return iter != boost::end(idx) ? iter->session : socket_session_ptr();  
    118.         }  
    119.   
    120.         //隨機獲取容器中的session  
    121.         template<typename Tag>  
    122.         socket_session_ptr get_session_by_business_type(WORD m)  
    123.         {  
    124.             typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;  
    125.             readLock lock(m_mutex);  
    126.   
    127.             if (m_sessions.empty())  
    128.             {  
    129.                 return socket_session_ptr();  
    130.             }  
    131.   
    132.             MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);  
    133.   
    134.             //對容器的元素條件過濾  
    135.             is_business_type predicate(m);  
    136.             FilterContainer fc(predicate, idx.begin(), idx.end());  
    137.             FilterContainer::FilterIter iter = fc.begin();  
    138.   
    139.             if (fc.begin() == fc.end())  
    140.             {  
    141.                 return socket_session_ptr();  
    142.             }  
    143.   
    144.             //typedef boost::variate_generator<boost::mt19937, boost::uniform_smallint<>> var_gen;  
    145.             //typedef boost::details::pool::singleton_default<var_gen_wraper<var_gen>> s_var_gen;  
    146.             ////根據隨機數產生session  
    147.             //s_var_gen::object_type &gen = s_var_gen::instance();  
    148.             //int step = gen() % fc.szie();  
    149.   
    150.             int step = m_next_session % fc.szie();  
    151.             ++m_next_session;  
    152.   
    153.             for (int i = 0; i < step; ++i)  
    154.             {  
    155.                 iter++;  
    156.             }  
    157.   
    158.             return iter != fc.end() ? iter->session : socket_session_ptr();  
    159.         }  
    160.   
    161.         //根據類型和地址取session  
    162.         template<typename Tag>  
    163.         socket_session_ptr get_session_by_type_ip(WORD m, std::string& ip)  
    164.         {  
    165.             typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;  
    166.             readLock lock(m_mutex);  
    167.   
    168.             if (m_sessions.empty())  
    169.             {  
    170.                 return socket_session_ptr();  
    171.             }  
    172.   
    173.             MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);  
    174.   
    175.             //對容器的元素條件過濾  
    176.             is_business_type predicate(m);  
    177.             FilterContainer fc(predicate, idx.begin(), idx.end());  
    178.             FilterContainer::FilterIter iter = fc.begin();  
    179.   
    180.             if (fc.begin() == fc.end())  
    181.             {  
    182.                 return socket_session_ptr();  
    183.             }  
    184.   
    185.             while (iter != fc.end())  
    186.             {  
    187.                 if (iter->session->get_remote_addr().find(ip) != std::string::npos)  
    188.                 {  
    189.                     break;  
    190.                 }  
    191.   
    192.                 iter++;  
    193.             }  
    194.   
    195.             return iter != fc.end() ? iter->session : socket_session_ptr();  
    196.         }  
    197.   
    198.         //根據類型和app_id取session  
    199.         template<typename Tag>  
    200.         socket_session_ptr get_session_by_type_appid(WORD m, DWORD app_id)  
    201.         {  
    202.             typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;  
    203.             readLock lock(m_mutex);  
    204.   
    205.             if (m_sessions.empty())  
    206.             {  
    207.                 return socket_session_ptr();  
    208.             }  
    209.   
    210.             MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);  
    211.   
    212.             //對容器的元素條件過濾  
    213.             is_business_type predicate(m);  
    214.             FilterContainer fc(predicate, idx.begin(), idx.end());  
    215.             FilterContainer::FilterIter iter = fc.begin();  
    216.   
    217.             if (fc.begin() == fc.end())  
    218.             {  
    219.                 return socket_session_ptr();  
    220.             }  
    221.   
    222.             while (iter != fc.end())  
    223.             {  
    224.                 if (iter->session->get_app_id() == app_id)  
    225.                 {  
    226.                     break;  
    227.                 }  
    228.   
    229.                 iter++;  
    230.             }  
    231.   
    232.             return iter != fc.end() ? iter->session : socket_session_ptr();  
    233.         }  
    234.   
    235.     private:  
    236.         int m_type;  
    237.         int m_expires_time;  
    238.         boost::asio::io_service& m_io_srv;  
    239.         boost::asio::deadline_timer m_check_tick;  
    240.         boost::shared_mutex m_mutex;  
    241.         unsigned short m_next_session;  
    242.   
    243.         session_set m_sessions;  
    244.   
    245.         void check_connection();  
    246.     };  
    247. }  

     

     

    這里主要用到了boost的multi_index容器,這是一個非常有用方便的容器,可實現容器的多列索引,具體的使用方法,在這里不多做詳解。

     

    //session_manager.cpp

     

    [cpp] view plaincopyprint?  
    1. #include "session_manager.h"  
    2.   
    3. namespace firebird{  
    4.     session_manager::session_manager(boost::asio::io_service& io_srv, int type, int expires_time)  
    5.         :m_io_srv(io_srv), m_check_tick(io_srv), m_type(type), m_expires_time(expires_time),m_next_session(0)  
    6.     {  
    7.         check_connection();  
    8.     }  
    9.   
    10.     session_manager::~session_manager()  
    11.     {  
    12.   
    13.     }  
    14.   
    15.     //檢查服務器所有session的連接狀態  
    16.     void session_manager::check_connection()  
    17.     {  
    18.         try{  
    19.             writeLock lock(m_mutex);  
    20.   
    21.             session_set::iterator iter = m_sessions.begin();  
    22.             while (iter != m_sessions.end())  
    23.             {  
    24.                 LOG4CXX_DEBUG(firebird_log, "循環");  
    25.                 if (CLIENT == m_type)//客戶端的方式  
    26.                 {  
    27.                     if (!iter->session->socket().is_open())//已斷開,刪除已斷開的連接  
    28.                     {  
    29.                         LOG4CXX_INFO(firebird_log, "重新連接[" << iter->address << "]");  
    30.                         iter->session->close(); //通過關閉觸發客戶端重連  
    31.                     }  
    32.                     else{//連接中,發送心跳  
    33.                         message msg;  
    34.                         msg.command = heartbeat;  
    35.                         msg.business_type = iter->session->get_business_type();  
    36.                         msg.app_id = iter->session->get_app_id();  
    37.                         msg.data() = "H";  
    38.   
    39.                         iter->session->async_write(msg);  
    40.                         iter->session->set_op_time();  
    41.                     }  
    42.                 }  
    43.                 else if (SERVER == m_type)//服務器的方式  
    44.                 {  
    45.                     if (!iter->session->socket().is_open())//已斷開,刪除已斷開的連接  
    46.                     {  
    47.                         LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "刪除已關閉的session:[" << iter->session->get_remote_addr() << "]");  
    48.                         iter = m_sessions.erase(iter);  
    49.                         continue;  
    50.                     }  
    51.                     else{//連接中,設定每30秒檢查一次  
    52.                         if (iter->session->is_timeout()) //如果session已長時間沒操作,則關閉  
    53.                         {  
    54.                             LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "刪除已超時的session:[" << iter->session->get_remote_addr() << "]");  
    55.                             iter->session->close();//通過關閉觸發刪除session  
    56.                         }  
    57.                     }  
    58.   
    59.                     iter->session->set_op_time();  
    60.                 }  
    61.                 else{  
    62.                     LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown manager_type");  
    63.                 }  
    64.                 ++iter;  
    65.             }  
    66.   
    67.             LOG4CXX_DEBUG(firebird_log, "定時檢查");  
    68.             m_check_tick.expires_from_now(boost::posix_time::seconds(m_expires_time));  
    69.             m_check_tick.async_wait(boost::bind(&session_manager::check_connection, this));  
    70.         }  
    71.         catch(std::exception& e)  
    72.         {  
    73.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "[" << e.what() << "]");  
    74.         }  
    75.         catch(...)  
    76.         {  
    77.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown exception.");  
    78.         }  
    79.     }  
    80.   
    81.     void session_manager::add_session(socket_session_ptr p)  
    82.     {  
    83.         writeLock lock(m_mutex);  
    84.         session_stu stuSession;  
    85.         stuSession.id = p->id();  
    86.         stuSession.business_type = 0;  
    87.         stuSession.address = p->get_remote_addr();  
    88.         stuSession.app_id = p->get_app_id();  
    89.         stuSession.session = p;  
    90.         m_sessions.insert(stuSession);  
    91.     }  
    92.   
    93.     void session_manager::update_session(socket_session_ptr p)  
    94.     {  
    95.         writeLock lock(m_mutex);  
    96.         if (m_sessions.empty())  
    97.         {  
    98.             return ;  
    99.         }  
    100.   
    101.         MULTI_MEMBER_CON(sid) idx = boost::multi_index::get<sid>(m_sessions);  
    102.         BOOST_AUTO(iter, idx.find(p->id()));  
    103.   
    104.         if (iter != idx.end())  
    105.         {  
    106.             const_cast<session_stu&>(*iter).business_type = p->get_business_type();  
    107.             const_cast<session_stu&>(*iter).app_id = p->get_app_id();  
    108.         }  
    109.     }  
    110. }  

     


    這個時候,我就可以使用id、business_type、address、app_id當做key來索引socket_session了,單使用map容器是做不到的。

     

    還有索引時,需要的一個條件過濾器

    //filter_container.h

     

    [cpp] view plaincopyprint?  
    1. #pragma once  
    2. #include <boost/iterator/filter_iterator.hpp>  
    3.   
    4. namespace firebird{  
    5.     template <class Predicate, class Iterator>  
    6.     class filter_container  
    7.     {  
    8.     public:  
    9.         typedef boost::filter_iterator<Predicate, Iterator> FilterIter;  
    10.   
    11.         filter_container(Predicate p, Iterator begin, Iterator end)  
    12.             :m_begin(p, begin, end),  
    13.             m_end(p, end, end)  
    14.         {  
    15.   
    16.         }  
    17.         ~filter_container() {}  
    18.   
    19.         FilterIter begin() { return m_begin; }  
    20.         FilterIter end()   { return m_end; }  
    21.         int szie() {  
    22.             int i = 0;  
    23.             FilterIter fi = m_begin;  
    24.             while(fi != m_end)  
    25.             {  
    26.                 ++i;  
    27.                 ++fi;  
    28.             }  
    29.   
    30.             return i;  
    31.         }  
    32.   
    33.     private:  
    34.         FilterIter m_begin;  
    35.         FilterIter m_end;  
    36.     };  
    37. }  

    4.服務器端的實現

     

    服務器我定義為server_socket_utils,擁有一個session_manager,每當accept成功得到一個socket_session時,都會將其增加到session_manager去管理,注冊相關回調函數。

    read_data_callback   接收到數據的回調函數

    收到數據之后,也就是數據包的body部分,反序列化出command、business_type、app_id和data(我使用到了thrift),如果command==normal正常的業務包,會調用handle_read_data傳入data。

    close_callback 關閉socket_session觸發的回調函數

    根據id將該連接從session_manager中刪除掉

    //server_socket_utils.h

     

    [cpp] view plaincopyprint?  
    1. #pragma once  
    2. #include "socket_session.h"  
    3. #include "session_manager.h"  
    4. #include <boost/format.hpp>  
    5. #include <firebird/message/message.hpp>  
    6.   
    7. namespace firebird{  
    8.     using boost::asio::ip::tcp;  
    9.   
    10.     class FIREBIRD_DECL server_socket_utils  
    11.     {  
    12.     private:  
    13.         boost::asio::io_service m_io_srv;  
    14.         boost::asio::io_service::work m_work;  
    15.         tcp::acceptor m_acceptor;  
    16.   
    17.         void handle_accept(socket_session_ptr session, const boost::system::error_code& error);  
    18.   
    19.         void close_callback(socket_session_ptr session);  
    20.         void read_data_callback(const boost::system::error_code& e,   
    21.             socket_session_ptr session, message& msg);  
    22.   
    23.     protected:  
    24.         virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;  
    25.   
    26.     public:  
    27.         server_socket_utils(int port);  
    28.         ~server_socket_utils(void);  
    29.   
    30.         void start();  
    31.         boost::asio::io_service& get_io_service() { return m_io_srv; }  
    32.   
    33.         session_manager m_manager;  
    34.     };  
    35. }  

     

    //server_socket_utils.cpp

     

    [cpp] view plaincopyprint?  
    1. #include "server_socket_utils.h"  
    2.   
    3. namespace firebird{  
    4.     server_socket_utils::server_socket_utils(int port)  
    5.         :m_work(m_io_srv),  
    6.         m_acceptor(m_io_srv, tcp::endpoint(tcp::v4(), port)),  
    7.         m_manager(m_io_srv, SERVER, 3)  
    8.     {  
    9.         //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));  
    10.         //// 關閉連接前留0秒給客戶接收數據  
    11.         //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));  
    12.         //m_acceptor.set_option(boost::asio::ip::tcp::no_delay(true));  
    13.         //m_acceptor.set_option(boost::asio::socket_base::keep_alive(true));  
    14.         //m_acceptor.set_option(boost::asio::socket_base::receive_buffer_size(16384));  
    15.     }  
    16.   
    17.     server_socket_utils::~server_socket_utils(void)  
    18.     {  
    19.     }  
    20.   
    21.     void server_socket_utils::start()  
    22.     {  
    23.         try{  
    24.             socket_session_ptr new_session(new socket_session(m_io_srv));  
    25.             m_acceptor.async_accept(new_session->socket(),  
    26.                 boost::bind(&server_socket_utils::handle_accept, this, new_session,  
    27.                 boost::asio::placeholders::error));  
    28.         }  
    29.         catch(std::exception& e)  
    30.         {  
    31.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");  
    32.         }  
    33.         catch(...)  
    34.         {  
    35.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");  
    36.         }  
    37.     }  
    38.   
    39.     void server_socket_utils::handle_accept(socket_session_ptr session, const boost::system::error_code& error)  
    40.     {  
    41.         if (!error)  
    42.         {  
    43.             try{  
    44.                 socket_session_ptr new_session(new socket_session(m_io_srv));  
    45.                 m_acceptor.async_accept(new_session->socket(),  
    46.                     boost::bind(&server_socket_utils::handle_accept, this, new_session,  
    47.                     boost::asio::placeholders::error));  
    48.   
    49.                 if (session != NULL)  
    50.                 {  
    51.                     //注冊關閉回調函數  
    52.                     session->installCloseCallBack(boost::bind(&server_socket_utils::close_callback, this, _1));  
    53.                     //注冊讀到數據回調函數  
    54.                     session->installReadDataCallBack(boost::bind(&server_socket_utils::read_data_callback, this, _1, _2, _3));  
    55.   
    56.                     boost::format fmt("%1%:%2%");  
    57.                     fmt % session->socket().remote_endpoint().address().to_string();  
    58.                     fmt % session->socket().remote_endpoint().port();  
    59.                     session->set_remote_addr(fmt.str());  
    60.   
    61.                     session->start();  
    62.                     m_manager.add_session(session);  
    63.                 }  
    64.             }  
    65.             catch(std::exception& e)  
    66.             {  
    67.                 LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");  
    68.             }  
    69.             catch(...)  
    70.             {  
    71.                 LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");  
    72.             }  
    73.   
    74.         }  
    75.     }  
    76.   
    77.     void server_socket_utils::close_callback(socket_session_ptr session)  
    78.     {  
    79.         LOG4CXX_DEBUG(firebird_log, "close_callback");  
    80.         try{  
    81.             m_manager.del_session<sid>(session->id());  
    82.         }  
    83.         catch(std::exception& e)  
    84.         {  
    85.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");  
    86.         }  
    87.         catch(...)  
    88.         {  
    89.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");  
    90.         }  
    91.   
    92.     }  
    93.   
    94.     void server_socket_utils::read_data_callback(const boost::system::error_code& e,   
    95.         socket_session_ptr session, message& msg)  
    96.     {  
    97.         try{  
    98.             LOG4CXX_DEBUG(firebird_log, "command =[" << msg.command << "],["   
    99.                 << msg.business_type << "],[" << msg.data() << "]");  
    100.   
    101.             if (msg.command == heartbeat)  
    102.             {//心跳  
    103.                 session->async_write(msg);  
    104.             }  
    105.             else if (msg.command == regist)  
    106.             {//注冊  
    107.                 session->set_business_type(msg.business_type);  
    108.                 session->set_app_id(msg.app_id);  
    109.                 m_manager.update_session(session);  
    110.   
    111.                 session->async_write(msg);  
    112.                 LOG4CXX_FATAL(firebird_log, "遠程地址:[" << session->get_remote_addr() << "],服務器類型:[" <<  
    113.                     session->get_business_type() << "],服務器ID:[" << session->get_app_id() << "]注冊成功!");  
    114.             }  
    115.             else if (msg.command == normal)  
    116.             {//業務數據  
    117.                 handle_read_data(msg, session);  
    118.             }  
    119.             else   
    120.             {  
    121.                 LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "收到非法消息包!");  
    122.             }  
    123.         }  
    124.         catch(std::exception& e)  
    125.         {  
    126.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");  
    127.         }  
    128.         catch(...)  
    129.         {  
    130.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");  
    131.         }  
    132.     }  
    133. }  

     

    5.客戶端

     

    客戶端與服務器的邏輯也差不多,區別就是在于客戶端通過connect得到socket_session,而服務器是通過accept得到socket_session。

    //client_socket_utils.h

     

    [cpp] view plaincopyprint?  
    1. #pragma once  
    2. #include "socket_session.h"  
    3. #include "session_manager.h"  
    4. #include <boost/algorithm/string.hpp>  
    5. #include <firebird/message/message.hpp>  
    6.   
    7. namespace firebird{  
    8.     class FIREBIRD_DECL client_socket_utils  
    9.     {  
    10.     public:  
    11.         client_socket_utils();  
    12.         ~client_socket_utils();  
    13.   
    14.         void session_connect(std::vector<socket_session_ptr>& vSession);  
    15.         void session_connect(socket_session_ptr pSession);  
    16.         //socket_session_ptr get_session(std::string& addr);  
    17.         boost::asio::io_service& get_io_service() { return m_io_srv; }  
    18.   
    19.     protected:  
    20.         virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;  
    21.   
    22.     private:  
    23.         boost::asio::io_service m_io_srv;  
    24.         boost::asio::io_service::work m_work;  
    25.         session_manager m_manager;  
    26.   
    27.         void handle_connect(const boost::system::error_code& error,  
    28.             tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession);  
    29.   
    30.         void close_callback(socket_session_ptr session);  
    31.         void read_data_callback(const boost::system::error_code& e,   
    32.             socket_session_ptr session, message& msg);  
    33.     };  
    34. }  

    //client_socket_utils.cpp

     

     

    [cpp] view plaincopyprint?  
    1. #include "client_socket_utils.h"  
    2.   
    3. namespace firebird{  
    4.     client_socket_utils::client_socket_utils()  
    5.         :m_work(m_io_srv), m_manager(m_io_srv, CLIENT, 3)  
    6.     {  
    7.     }  
    8.   
    9.     client_socket_utils::~client_socket_utils()  
    10.     {  
    11.     }  
    12.   
    13.     void client_socket_utils::session_connect(std::vector<socket_session_ptr>& vSession)  
    14.     {  
    15.         for (int i = 0; i < vSession.size(); ++i)  
    16.         {  
    17.             session_connect(vSession[i]);  
    18.         }  
    19.     }  
    20.   
    21.     void client_socket_utils::session_connect(socket_session_ptr pSession)  
    22.     {  
    23.         std::string& addr = pSession->get_remote_addr();  
    24.         try{  
    25.             //注冊關閉回調函數  
    26.             pSession->installCloseCallBack(boost::bind(&client_socket_utils::close_callback, this, _1));  
    27.             //注冊讀到數據回調函數  
    28.             pSession->installReadDataCallBack(boost::bind(&client_socket_utils::read_data_callback, this, _1, _2, _3));  
    29.   
    30.             std::vector<std::string> ip_port;  
    31.             boost::split(ip_port, addr, boost::is_any_of(":"));  
    32.   
    33.             if (ip_port.size() < 2)  
    34.             {  
    35.                 //throw std::runtime_error("ip 格式不正確!");  
    36.                 LOG4CXX_ERROR(firebird_log, "[" << addr << "] ip 格式不正確!");  
    37.                 return;  
    38.             }  
    39.   
    40.             tcp::resolver resolver(pSession->socket().get_io_service());  
    41.             tcp::resolver::query query(ip_port[0], ip_port[1]);  
    42.             tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);  
    43.             //pSession->set_begin_endpoint(endpoint_iterator);//設置起始地址,以便重連  
    44.   
    45.             //由于客戶端是不斷重連的,即使還未連接也要保存該session  
    46.             m_manager.add_session(pSession);  
    47.   
    48.             tcp::endpoint endpoint = *endpoint_iterator;  
    49.             pSession->socket().async_connect(endpoint,  
    50.                 boost::bind(&client_socket_utils::handle_connect, this,  
    51.                 boost::asio::placeholders::error, ++endpoint_iterator, pSession));  
    52.         }  
    53.         catch(std::exception& e)  
    54.         {  
    55.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << addr << "],socket異常:[" << e.what() << "]");  
    56.         }  
    57.         catch(...)  
    58.         {  
    59.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << addr << "],socket異常:[未知異常]");  
    60.         }  
    61.     }  
    62.   
    63.     void client_socket_utils::handle_connect(const boost::system::error_code& error,  
    64.         tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession)  
    65.     {  
    66.         LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << " enter.");  
    67.         std::string sLog;  
    68.         try{  
    69.             if (!error)  
    70.             {  
    71.                 LOG4CXX_FATAL(firebird_log, "服務器:[" << pSession->get_business_type() <<"],連接遠程地址:[" << pSession->get_remote_addr().c_str() << "]成功!");  
    72.                 pSession->start();  
    73.   
    74.                 //向服務器注冊服務類型  
    75.                 message msg;  
    76.                 msg.command = regist;  
    77.                 msg.business_type = pSession->get_business_type();  
    78.                 msg.app_id = pSession->get_app_id();  
    79.                 msg.data() = "R";  
    80.   
    81.                 pSession->async_write(msg);  
    82.             }  
    83.             else if (endpoint_iterator != tcp::resolver::iterator())  
    84.             {  
    85.                 LOG4CXX_ERROR(firebird_log, "連接遠程地址:[" << pSession->get_remote_addr().c_str() << "]失敗,試圖重連下一個地址。");  
    86.                 pSession->socket().close();//此處用socket的close,不應用session的close觸發連接,不然會導致一直重連  
    87.                 tcp::endpoint endpoint = *endpoint_iterator;  
    88.                 pSession->socket().async_connect(endpoint,  
    89.                     boost::bind(&client_socket_utils::handle_connect, this,  
    90.                     boost::asio::placeholders::error, ++endpoint_iterator, pSession));  
    91.             }  
    92.             else  
    93.             {  
    94.                 LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << pSession->get_remote_addr().c_str() << "]失敗!");  
    95.                 pSession->socket().close();//此處用socket的close,不應用session的close觸發連接,不然會導致一直重連  
    96.             }  
    97.         }  
    98.         catch(std::exception& e)  
    99.         {  
    100.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << pSession->get_remote_addr().c_str() <<"],socket異常:[" << e.what() << "]");  
    101.         }  
    102.         catch(...)  
    103.         {  
    104.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << pSession->get_remote_addr().c_str() <<"],socket異常:[未知異常]");  
    105.         }  
    106.     }  
    107.   
    108.     void client_socket_utils::read_data_callback(const boost::system::error_code& e,   
    109.         socket_session_ptr session, message& msg)  
    110.     {  
    111.         LOG4CXX_DEBUG(firebird_log, "command =[" << msg.command << "],["   
    112.             << msg.business_type << "],[" << msg.data() << "]");  
    113.   
    114.         if (msg.command == heartbeat)  
    115.         {//心跳  
    116.         }  
    117.         else if (msg.command == regist)  
    118.         {//注冊  
    119.             LOG4CXX_FATAL(firebird_log, "服務器:[" << session->get_business_type() <<"]注冊成功。");  
    120.         }  
    121.         else if (msg.command == normal)  
    122.         {//業務數據  
    123.             handle_read_data(msg, session);  
    124.         }  
    125.         else   
    126.         {  
    127.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "收到非法消息包!");  
    128.         }  
    129.     }  
    130.   
    131.     //關閉session就會重連  
    132.     void client_socket_utils::close_callback(socket_session_ptr session)  
    133.     {  
    134.         LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");  
    135.   
    136.         try{  
    137.             //tcp::resolver::iterator endpoint_iterator = context.session->get_begin_endpoint();  
    138.   
    139.             std::string& addr = session->get_remote_addr();  
    140.   
    141.             std::vector<std::string> ip_port;  
    142.             boost::split(ip_port, addr, boost::is_any_of(":"));  
    143.   
    144.             if (ip_port.size() < 2)  
    145.             {  
    146.                 LOG4CXX_ERROR(firebird_log, "[" << addr << "] ip 格式不正確!");  
    147.                 return;  
    148.             }  
    149.   
    150.             tcp::resolver resolver(session->socket().get_io_service());  
    151.             tcp::resolver::query query(ip_port[0], ip_port[1]);  
    152.             tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);  
    153.   
    154.             tcp::endpoint endpoint = *endpoint_iterator;  
    155.             session->socket().async_connect(endpoint,  
    156.                 boost::bind(&client_socket_utils::handle_connect, this,  
    157.                 boost::asio::placeholders::error, ++endpoint_iterator, session));  
    158.         }  
    159.         catch(std::exception& e)  
    160.         {  
    161.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << session->get_remote_addr().c_str() <<"],socket異常:[" << e.what() << "]");  
    162.         }  
    163.         catch(...)  
    164.         {  
    165.             LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連接遠程地址:[" << session->get_remote_addr().c_str() <<"],socket異常:[未知異常]");  
    166.         }  
    167.     }  
    168. }  

    5.對象串行化

    socket_session發送和接收數據包的時候使用到了對象串行化,我這里是通過thrift實現的,其實boost的serialization庫也提供了這樣的功能,使用起來更為方便,但我在測試過程中,thrift相比之下性能會高很多,因此就堅持使用thrift了,感興趣的話可以看我之前寫的使用thrift串行化對象輕量級序列化庫boost serialization》

    5.1字符串與thrift對象的相互轉換

     

    [cpp] view plaincopyprint?  
    1. #pragma once  
    2. #include <boost/shared_ptr.hpp>  
    3. #include <transport/TBufferTransports.h>  
    4. #include <protocol/TProtocol.h>  
    5. #include <protocol/TBinaryProtocol.h>  
    6.   
    7. namespace firebird{  
    8.     using namespace apache::thrift;  
    9.     using namespace apache::thrift::transport;  
    10.     using namespace apache::thrift::protocol;  
    11.   
    12.     template<typename T>  
    13.     void thrift_iserialize(T& stu, std::string& s)  
    14.     {  
    15.         boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer((uint8_t*)&s[0], s.size()));  
    16.         boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));  
    17.         stu.read(proto.get());  
    18.     }  
    19.   
    20.     template<typename T>  
    21.     void thrift_oserialize(T& stu, std::string& s)  
    22.     {  
    23.         boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer());  
    24.         boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));  
    25.         stu.write(proto.get());  
    26.         s = trans->getBufferAsString();  
    27.     }  
    28. }  

    5.2通過thrift對象,普通的對象與字符串的相互轉換

     

     

    [cpp] view plaincopyprint?  
    1. #pragma once  
    2.   
    3. #include "message_archive.hpp"  
    4. #include <firebird/archive/thrift_archive.hpp>  
    5. #include <firebird/message/TMessage_types.h>  
    6.   
    7. namespace firebird  
    8. {  
    9.     /*** message to ThriftMessage ***/  
    10.     void msg_to_tmsg(TMessage& tmsg, message& msg)  
    11.     {  
    12.         //設置  
    13.         tmsg.command = msg.command;  
    14.         tmsg.business_type = msg.business_type;  
    15.         tmsg.app_id = msg.app_id;  
    16.         //設置context  
    17.         tmsg.context.cmdVersion = msg.context().cmdVersion;  
    18.         tmsg.context.cpid.swap(msg.context().cpid);  
    19.         tmsg.context.remote_ip.swap(msg.context().remote_ip);  
    20.         tmsg.context.wSerialNumber = msg.context().wSerialNumber;  
    21.         tmsg.context.session_id = msg.context().session_id;  
    22.   
    23.         //設置source  
    24.         for (int i = 0; i < msg.source().size(); ++i)  
    25.         {  
    26.             tmsg.source.push_back(msg.source()[i]);  
    27.         }  
    28.   
    29.         //設置destination  
    30.         for (int i = 0; i < msg.destination().size(); ++i)  
    31.         {  
    32.             tmsg.destination.push_back(msg.destination()[i]);  
    33.         }  
    34.   
    35.         //設置data  
    36.         tmsg.data = msg.data();  
    37.     }  
    38.   
    39.     /*** ThriftMessage to message ***/  
    40.     void tmsg_to_msg(message& msg, TMessage& tmsg)  
    41.     {  
    42.         //設置  
    43.         msg.command = tmsg.command;  
    44.         msg.business_type = tmsg.business_type;  
    45.         msg.app_id = tmsg.app_id;  
    46.   
    47.         //設置context  
    48.         msg.context().cmdVersion = tmsg.context.cmdVersion;  
    49.         msg.context().cpid = tmsg.context.cpid;  
    50.         msg.context().remote_ip = tmsg.context.remote_ip;  
    51.         msg.context().wSerialNumber = tmsg.context.wSerialNumber;  
    52.         msg.context().session_id = tmsg.context.session_id;  
    53.   
    54.         //設置source  
    55.         for (int i = 0; i < tmsg.source.size(); ++i)  
    56.         {  
    57.             msg.source() << tmsg.source[i];  
    58.         }  
    59.   
    60.         //設置destination  
    61.         for (int i = 0; i < tmsg.destination.size(); ++i)  
    62.         {  
    63.             msg.destination() << tmsg.destination[i];  
    64.         }  
    65.   
    66.         //設置data  
    67.         msg.data() = tmsg.data;  
    68.     }  
    69.   
    70.     void message_iarchive(message& msg, std::string& s)  
    71.     {  
    72.         TMessage tmsg;  
    73.         thrift_iserialize(tmsg, s);  
    74.         tmsg_to_msg(msg, tmsg);  
    75.     }  
    76.   
    77.     void message_oarchive(std::string& s, message& msg)  
    78.     {  
    79.         TMessage tmsg;  
    80.         msg_to_tmsg(tmsg, msg);  
    81.         thrift_oserialize(tmsg, s);  
    82.     }  
    83. }  

    ---恢復內容結束---

    RFID管理系統集成商 RFID中間件 條碼系統中間層 物聯網軟件集成
    最近免费观看高清韩国日本大全