C++實現線程池 .
代碼地址:https://github.com/ithzhang/ThreadpoolLib.git
本文介紹的線程池采用C++語言,在windows平臺下實現。此版本為Version 1.0,以后還會推出功能更完備的后續版本。本著技術分享的精神寫作本文同時公布源代碼。歡迎大家指出該線程池存在的問題并對當前性能進行討論。
適用場景:
1.需要大量的線程來完成任務,且完成任務的時間比較短。
2.對性能要求苛刻的應用,比如要求服務器迅速相應客戶請求。
3.接受突發性的大量請求,但不至于使服務器因此產生大量線程的應用。
不適合在以下場景下使用:
1.可能會長時間運行的任務。
2.具有良好的優先級控制。(本線程池僅僅實現了簡單的優先級控制,有兩種優先級:普通級和高級)。
使用到的數據結構:
任務隊列:任務緩沖區,用于存儲要執行任務的隊列。可以調用線程池成員函數向該隊列中增加任務。
空閑線程堆棧:用于存儲空閑線程。空閑線程堆棧中會被壓入指定數量的線程類對象指針。線程對象個數等于創建線程時初始線程個數。
活動線程鏈表:用以存儲當前正在執行任務的線程。當有任務到來時,線程會從空閑堆棧轉移到活動鏈表中。任務完成,且任務隊列中沒有任務時,會從活動鏈表轉移到空閑堆棧中。本文中我稱其為線程狀態轉換。
調度機制:
1.向任務隊列添加任務后,會檢查此時空閑線程堆棧中是否有空閑線程,如有則從任務隊列隊首取出任務執行。
2.當線程執行完當前任務,準備轉移到空閑堆棧時,也會檢查當前任務隊列是否為空。若不為空,則繼續取出任務執行。否則,轉換到空閑線程堆棧。
除上述兩種調度機制外,沒有采用其他機制。
在創建線程池時會指定一個初始線程個數。此處我采取的是:一次性創建用戶指定的線程,并加入到空閑線程堆棧。以后這個數量無法更改,且不會隨著任務的多寡而增添或減少。
所有處于空閑隊列中的線程都由于等待事件對象觸發而處于阻塞態。等待事件對象成功的線程會進入到活動線程鏈表中。
使用到的類:
CTask類:任務基類。每個任務應繼承自此類,并實現taskProc成員函數。
CMyThread類:工作線程類。每個類管理一個線程。同時關聯一個任務類對象。
CThreadPool類:線程池類,用以創建并管理線程池,同時實現對線程池內線程的調度。
CMyStack類:空閑線程堆棧,用以存儲空閑的工作線程。
CMyList類:活動線程隊列。用以存儲目前正在執行任務的線程。
CTaskQueue類:任務隊列。用以存儲要執行的任務。
CMyMutex類:互斥類。用于實現線程互斥訪問。CMyStack,CMyList和CMyQueue內部都使用了CMyMutex類。它們是線程安全的。
MyThread類和CThreadPool類為核心類。其余為輔助類。
CTask類
CTask是任務基類,所以非常簡單,僅僅提供接口。
其聲明如下:
[cpp] view plaincopyprint?- class CTask
- {
- public:
- CTask(int id);
- ~CTask(void);
- public:
- virtual void taskProc()=0;
- bool getID();
- private:
- int m_ID;
- };
具體的任務類應繼承自此基類,并實現taskProc函數。在該函數實現需要線程池執行的任務。
如:
- //TestTask.h
- #include "task.h"
- class CTestTask :
- public CTask
- {
- public:
- CTestTask(int id);
- ~CTestTask(void);
- public:
- virtual void taskProc();
- };
- //TestTask.cpp
- #include "TestTask.h"
- CTestTask::CTestTask(int id)
- :CTask(id)
- {
- }
- CTestTask::~CTestTask(void)
- {
- }
- void CTestTask::taskProc()
- {
- //模擬任務。
- for(int i=0;i<10000;i++)
- {
- for(int j=0;j<10000;j++)
- {
- int temp=1;
- temp++;
- }
- }
- }
CMyStack空閑線程堆棧類
CMyStack類用以存儲空閑線程。內部采用stack實現。之所以采用棧來存儲線程類對象,是因為:當一個線程執行完任務后,如果此時任務隊列沒有新任務,該線程就被壓入到空閑線程棧。此后當有新任務到來時,棧頂元素,也就是剛剛被壓入的線程會被彈出執行新任務。由于該線程是最近才被壓入,其對應內存空間位于內存中的概率比其他線程的概率要大。這在一定程度上可以節省從系統頁交換文件交換到物理內存的開銷。
- //MyStack.h
- #pragma once
- #include<stack>
- #include "MyMutex.h"
- class CMyThread ;
- class CMyStack
- {
- public:
- CMyStack(void);
- ~CMyStack(void);
- public:
- CMyThread* pop();
- bool push(CMyThread*);
- int getSize();
- bool isEmpty();
- bool clear();
- private:
- std::stack<CMyThread*> m_stack;
- CMyMutex m_mutext;
- };
CMyList活動線程鏈表類
CMyList類用以存儲正在執行任務的線程。內部采用list實現。活動線程在執行完任務后,可以被隨時從活動鏈表中刪除。之所以使用鏈表是因為在鏈表中刪除某一元素的開銷很小。
- //MyList.h
- #pragma once
- #include <list>
- #include "MyMutex.h"
- class CMyThread;
- class CMyList
- {
- public:
- CMyList(void);
- ~CMyList(void);
- public:
- bool addThread(CMyThread*t);
- bool removeThread(CMyThread*t);
- int getSize();
- bool isEmpty();
- bool clear();
- private:
- std::list<CMyThread*>m_list;
- CMyMutex m_mutex;
- };
CMyQueue任務隊列類
CMyQueue用以存儲要執行的任務。內部采用雙向隊列實現。具有簡單的優先級控制機制。當普通的優先級任務到來時,會正常入隊。當高優先級任務到來時會插入到對首。線程池在調度時會簡單的從隊首取出任務并執行。
- //MyQueue.h
- #pragma once
- #include<deque>
- #include"MyMutex.h"
- class CTask;
- class CMyQueue
- {
- public:
- CMyQueue(void);
- ~CMyQueue(void);
- public:
- CTask*pop();
- bool push(CTask*t);
- bool pushFront(CTask*t);、
- bool isEmpty();
- bool clear();
- private:
- std::deque<CTask*>m_TaskQueue;
- CMyMutex m_mutex;
- };
CMyMutex互斥類
CMyMutex類用于控制線程互斥訪問。內部采用CRITICAL_SECTION實現 。在對活動線程鏈表、空閑線程堆棧、任務隊列進行訪問時都需要進行互斥訪問控制。防止多線程同時訪問導致的狀態不一致的情況出現。
類聲明如下:
[cpp] view plaincopyprint?- //MyMutex.h
- #pragma once
- #include "windows.h"
- class CMyMutex
- {
- public:
- CMyMutex(void);
- ~CMyMutex(void);
- public:
- bool Lock();
- bool Unlock();
- private:
- CRITICAL_SECTION m_cs;
- };
CMyThread工作線程類
CMyThread類用于管理一個線程。該類內部有一個CTask*成員和一個事件對象。CTask*成員為與該線程關聯的任務。調用assignTask可以為該線程設置對應的任務。
類聲明如下:
[cpp] view plaincopyprint?- //MyThread.h
- #pragma once
- #include "windows.h"
- class CTask;
- class CBaseThreadPool;
- class CMyThread
- {
- public:
- CMyThread(CBaseThreadPool*threadPool);
- ~CMyThread(void);
- public:
- bool startThread();
- bool suspendThread();
- bool resumeThread();
- bool assignTask(CTask*pTask);
- bool startTask();
- static DWORD WINAPI threadProc(LPVOID pParam);
- DWORD m_threadID;
- HANDLE m_hThread;
- private:
- HANDLE m_hEvent;
- CTask*m_pTask;
- CBaseThreadPool*m_pThreadPool;
- };
startThread用于創建入口函數為threadProc的線程。在該線程內部會循環等待一個事件對象。當沒有任務到來時,線程就會在該事件對象上掛起。當新任務到來,線程池會將該線程對應的事件對象觸發,然后執行其對應的任務。
[cpp] view plaincopyprint?- DWORD WINAPI CMyThread::threadProc( LPVOID pParam )
- {
- CMyThread *pThread=(CMyThread*)pParam;
- while(!pThread->m_bIsExit)
- {
- DWORD ret=WaitForSingleObject(pThread->m_hEvent,INFINITE);
- if(ret==WAIT_OBJECT_0)
- {
- if(pThread->m_pTask)
- {
- pThread->m_pTask->taskProc();、
- delete pThread->m_pTask;
- pThread->m_pTask=NULL;
- pThread->m_pThreadPool->SwitchActiveThread(pThread);
- }
- }
- }
- return 0;
- }
當任務執行完之后,線程內部會調用線程池的SwitchActiveThread成員函數,該函數用以將線程從活動狀態轉變為空閑態。也就是從活動線程鏈表轉移到空閑線程棧中。同時線程繼續等待事件對象觸發。
在此函數內部,在轉換之前會檢查任務隊列中是否還有任務,如果有任務,線程會繼續從任務隊列取出任務繼續執行,而不會切換到空閑態。直到任務隊列中沒有任務時才會執行狀態切換操作。
CMyThreadPool線程池類
任務隊列、活動線程鏈表、空閑線程隊列都作為線程池的成員變量,由線程池維護。
類聲明如下:
[cpp] view plaincopyprint?- //MyThreadPool.h
- #pragma once
- #include<list>
- #include "MyMutex.h"
- #include "MyStack.h"
- #include "MyList.h"
- #include"MyQueue.h"
- class CMyThread;
- class CTask;
- enum PRIORITY
- {
- NORMAL,
- HIGH
- };
- class CBaseThreadPool
- {
- public:
- virtual CMyThread* PopIdleThread()=0;
- virtual CTask*GetNewTask()=0;
- //virtual bool ExecuteNewTask(CTask *task)=0;
- virtual bool SwitchActiveThread(CMyThread*)=0;
- };
- class CMyThreadPool:public CBaseThreadPool
- {
- public:
- CMyThreadPool(int num);
- ~CMyThreadPool(void);
- public:
- virtual CMyThread* PopIdleThread();
- virtual bool SwitchActiveThread(CMyThread*);
- virtual CTask*GetNewTask();
- public:
- //priority為優先級。高優先級的任務將被插入到隊首。
- bool addTask(CTask*t,PRIORITY priority);
- bool start();//開始調度。
- bool destroyThreadPool();
- private:
- int m_nThreadNum;
- bool m_bIsExit;
- CMyStack m_IdleThreadStack;
- CMyList m_ActiveThreadList;
- CMyQueue m_TaskQueue;
- };
addTask函數用于向任務隊列中添加任務。添加任務后,會檢查空閑線程堆棧中是否為空,如不為空則彈出棧頂線程執行任務。
[cpp] view plaincopyprint?- bool CMyThreadPool::addTask( CTask*t,PRIORITY priority )
- {
- assert(t);
- if(!t||m_bIsExit)
- return false;
- CTask *task=NULL;
- if(priority==PRIORITY::NORMAL)
- {
- m_TaskQueue.push(t);//壓入任務隊列尾部。
- }
- else if(PRIORITY::HIGH)
- {
- m_TaskQueue.pushFront(t);//高優先級任務,壓到隊首。
- }
- if(!m_IdleThreadStack.isEmpty())//存在空閑線程。調用空閑線程處理任務。
- {
- task=m_TaskQueue.pop();//取出列頭任務。
- if(task==NULL)
- {
- //std::cout<<"任務取出出錯。"<<std::endl;
- return 0;
- }
- CMyThread*pThread=PopIdleThread();
- m_ActiveThreadList.addThread(pThread);//加入到活動鏈表。
- pThread->assignTask(task);//將任務與線程關聯。
- pThread->startTask();//開始任務,內部對事件對象進行觸發。
- }
- }
switchActiveThread函數用以在線程結束任務之后,將自己切換到空閑態。在切換之前會檢查任務隊列是否有任務,如有任務,則取出繼續執行。直到任務隊列為空時,才將自己切換到空閑態。由各線程類對象調用。
[cpp] view plaincopyprint?- bool CMyThreadPool::SwitchActiveThread( CMyThread*t)
- {
- if(!m_TaskQueue.isEmpty())//任務隊列不為空,繼續取任務執行。
- {
- CTask *pTask=NULL;
- pTask=m_TaskQueue.pop();
- t->assignTask(pTask);
- t->startTask();
- }
- else//任務隊列為空,該線程掛起。
- {
- m_ActiveThreadList.removeThread(t);
- m_IdleThreadStack.push(t);
- }
- return true;
- }
代碼地址:https://github.com/ithzhang/ThreadpoolLib.git