<menu id="w8yyk"><menu id="w8yyk"></menu></menu>
  • <dd id="w8yyk"><nav id="w8yyk"></nav></dd>
    <menu id="w8yyk"></menu>
    <menu id="w8yyk"><code id="w8yyk"></code></menu>
    <menu id="w8yyk"></menu>
    <xmp id="w8yyk">
    <xmp id="w8yyk"><nav id="w8yyk"></nav>
  • 網站首頁 > 物聯資訊 > 技術分享

    boost中asio網絡庫多線程并發處理實現,以及asio在多線程模型中線程的調度情況和線程安全。

    2016-09-28 00:00:00 廣州睿豐德信息科技有限公司 閱讀
    睿豐德科技 專注RFID識別技術和條碼識別技術與管理軟件的集成項目。質量追溯系統、MES系統、金蝶與條碼系統對接、用友與條碼系統對接

    1、實現多線程方法:

    其實就是多個線程同時調用io_service::run

            for (int i = 0; i != m_nThreads; ++i)
            {
                boost::shared_ptr<boost::thread> pTh(new boost::thread(
                    boost::bind(&boost::asio::io_service::run,&m_ioService)));
                m_listThread.push_back(pTh);
            }

    2、多線程調度情況:

    asio規定:只能在調用io_service::run的線程中才能調用事件完成處理器。

    注:事件完成處理器就是你async_accept、async_write等注冊的句柄,類似于回調的東西。

    單線程:

    如果只有一個線程調用io_service::run,根據asio的規定,事件完成處理器也只能在這個線程中執行。也就是說,你所有代碼都在同一個線程中運行,因此變量的訪問是安全的。

    多線程:

    如果有多個線程同時調用io_service::run以實現多線程并發處理。對于asio來說,這些線程都是平等的,沒有主次之分。如果你投遞的一個請求比如async_write完成時,asio將隨機的激活調用io_service::run的線程。并在這個線程中調用事件完成處理器(async_write當時注冊的句柄)。如果你的代碼耗時較長,這個時候你投遞的另一個async_write請求完成時,asio將不等待你的代碼處理完成,它將在另外的一個調用io_service::run線程中,調用async_write當時注冊的句柄。也就是說,你注冊的事件完成處理器有可能同時在多個線程中調用。

    當然你可以使用 boost::asio::io_service::strand讓完成事件處理器的調用,在同一時間只有一個, 比如下面的的代碼:

      socket_.async_read_some(boost::asio::buffer(buffer_),
          strand_.wrap(
            boost::bind(&connection::handle_read, shared_from_this(),
              boost::asio::placeholders::error,
              boost::asio::placeholders::bytes_transferred)));

    ...

    boost::asio::io_service::strand strand_;

     

    此時async_read_som完成后掉用handle_read時,必須等待其它handle_read調用完成時才能被執行(async_read_som引起的handle_read調用)。

          多線程調用時,還有一個重要的問題,那就是無序化。比如說,你短時間內投遞多個async_write,那么完成處理器的調用并不是按照你投遞async_write的順序調用的。asio第一次調用完成事件處理器,有可能是第二次async_write返回的結果,也有可能是第3次的。使用strand也是這樣的。strand只是保證同一時間只運行一個完成處理器,但它并不保證順序。

     

    代碼測試:

    服務器:

    將下面的代碼編譯以后,使用cmd命令提示符下傳人參數<IP> <port> <threads>調用

    比如:test.exe 0.0.0.0 3005 10   

    客服端 使用windows自帶的telnet

    cmd命令提示符:

    telnet 127.0.0.1 3005

     

    原理:客戶端連接成功后,同一時間調用100次boost::asio::async_write給客戶端發送數據,并且在完成事件處理器中打印調用序號,和線程ID。

    核心代碼:

        void start()
        {
            for (int i = 0; i != 100; ++i)
            {
                boost::shared_ptr<string> pStr(new string);
                *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
                *pStr += "\r\n";
                boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
                    boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
                     boost::asio::placeholders::error,
                     boost::asio::placeholders::bytes_transferred,
                     pStr,i)
                    );
            }
        }

    //去掉 boost::mutex::scoped_lock lk(m_ioMutex); 效果更明顯。

        void HandleWrite(const boost::system::error_code& error
            ,std::size_t bytes_transferred
            ,boost::shared_ptr<string> pStr,int nIndex)
        {
            if (!error)
            {
                boost::mutex::scoped_lock lk(m_ioMutex);
                cout << "發送序號=" << nIndex << ",線程id=" << boost::this_thread::get_id() << endl;
            }
            else
            {
                cout << "連接斷開" << endl;
            }
        }

     

    完整代碼:

    #include <boost/bind.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/asio.hpp>
    #include <boost/lexical_cast.hpp>
    #include <boost/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <string>
    #include <iostream>


    using std::cout;
    using std::endl;
    using std::string;
    using boost::asio::ip::tcp;


    class CMyTcpConnection
        : public boost::enable_shared_from_this<CMyTcpConnection>
    {
    public:
        CMyTcpConnection(boost::asio::io_service &ser)
            :m_nSocket(ser)
        {
        }
        typedef boost::shared_ptr<CMyTcpConnection> CPMyTcpCon;


        static CPMyTcpCon CreateNew(boost::asio::io_service& io_service)
        {
            return CPMyTcpCon(new CMyTcpConnection(io_service));
        }


       
    public:
        void start()
        {
            for (int i = 0; i != 100; ++i)
            {
                boost::shared_ptr<string> pStr(new string);
                *pStr = boost::lexical_cast<string>(boost::this_thread::get_id());
                *pStr += "\r\n";
                boost::asio::async_write(m_nSocket,boost::asio::buffer(*pStr),
                    boost::bind(&CMyTcpConnection::HandleWrite,shared_from_this(),
                     boost::asio::placeholders::error,
                     boost::asio::placeholders::bytes_transferred,
                     pStr,i)
                    );
            }
        }
        tcp::socket& socket()
        {
            return m_nSocket;
        }
    private:
        void HandleWrite(const boost::system::error_code& error
            ,std::size_t bytes_transferred
            ,boost::shared_ptr<string> pStr,int nIndex)
        {
            if (!error)
            {
                boost::mutex::scoped_lock lk(m_ioMutex);
                cout << "發送序號=" << nIndex << ",線程id=" << boost::this_thread::get_id() << endl;
            }
            else
            {
                cout << "連接斷開" << endl;
            }
        }
    private:
        tcp::socket m_nSocket;
        boost::mutex m_ioMutex;
    };


    class CMyService
        : private boost::noncopyable
    {
    public:
        CMyService(string const &strIP,string const &strPort,int nThreads)
            :m_tcpAcceptor(m_ioService)
            ,m_nThreads(nThreads)
        {
            tcp::resolver resolver(m_ioService);
            tcp::resolver::query query(strIP,strPort);
            tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
            boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
            m_tcpAcceptor.open(endpoint.protocol());
            m_tcpAcceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
            m_tcpAcceptor.bind(endpoint);
            m_tcpAcceptor.listen();


            StartAccept();
        }
        ~CMyService(){Stop();}
    public:
        void Stop() 
        { 
            m_ioService.stop();
            for (std::vector<boost::shared_ptr<boost::thread>>::const_iterator it = m_listThread.cbegin();
                it != m_listThread.cend(); ++ it)
            {
                (*it)->join();
            }
        }
        void Start()
        {
            for (int i = 0; i != m_nThreads; ++i)
            {
                boost::shared_ptr<boost::thread> pTh(new boost::thread(
                    boost::bind(&boost::asio::io_service::run,&m_ioService)));
                m_listThread.push_back(pTh);
            }
        }
    private:
        void HandleAccept(const boost::system::error_code& error
            ,boost::shared_ptr<CMyTcpConnection> newConnect)
        {
            if (!error)
            {
                newConnect->start();
            }
            StartAccept();
        }


        void StartAccept()
        {
            CMyTcpConnection::CPMyTcpCon newConnect = CMyTcpConnection::CreateNew(m_tcpAcceptor.get_io_service());
            m_tcpAcceptor.async_accept(newConnect->socket(),
                boost::bind(&CMyService::HandleAccept, this,
                boost::asio::placeholders::error,newConnect));
        }
    private:
        boost::asio::io_service m_ioService;
        boost::asio::ip::tcp::acceptor m_tcpAcceptor;
        std::vector<boost::shared_ptr<boost::thread>> m_listThread;
        std::size_t m_nThreads;
    };


    int main(int argc, char* argv[])
    {
        try
        {
            if (argc != 4)
            {
                std::cerr << "<IP> <port> <threads>\n";
                return 1;
            }
            int nThreads = boost::lexical_cast<int>(argv[3]);
            CMyService mySer(argv[1],argv[2],nThreads);
            mySer.Start();
            getchar();
            mySer.Stop();
        }
        catch (std::exception& e)
        {
            std::cerr << "Exception: " << e.what() << "\n";
        }
        return 0;
    }

     

     

    測試發現和上面的理論是一致的,發送序號是亂的,線程ID也不是同一個。

     

    asio多線程中線程的合理個數:

    作為服務器,在不考慮省電的情況下,應該盡可能的使用cpu。也就是說,為了讓cpu都忙起來,你的線程個數應該大于等于你電腦的cpu核心數(一個核心運行一個線程)。具體的值沒有最優方案,大多數人使用cpu核心數*2 + 2的這種方案,但它不一定適合你的情況。

    asio在windows xp等系統中的實現:

    asio在windows下使用完成端口,如果你投遞的請求沒有完成,那么這些線程都在等待GetQueuedCompletionStatus的返回,也就是等待內核對象,此時線程是不占用cpu時間的。

    RFID管理系統集成商 RFID中間件 條碼系統中間層 物聯網軟件集成
    最近免费观看高清韩国日本大全