一個Windows C++的線程池的實現 2016-09-28 00:00:00 廣州睿豐德信息科技有限公司 閱讀 睿豐德科技 專注RFID識別技術和條碼識別技術與管理軟件的集成項目。質量追溯系統、MES系統、金蝶與條碼系統對接、用友與條碼系統對接 此線程池所依賴的線程類,請參看《一個Windows C++的線程類實現》: http://blog.csdn.net/huyiyang2010/archive/2010/08/10/5801597.aspx ThreadPoolExecutor.h [cpp] view plaincopy #ifndef __THREAD_POOL_EXECUTOR__ #define __THREAD_POOL_EXECUTOR__ #include "Thread.h" #include <set> #include <list> #include <windows.h> class CThreadPoolExecutor { public: CThreadPoolExecutor(void); ~CThreadPoolExecutor(void); /** 初始化線程池,創建minThreads個線程 **/ bool Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTaskse); /** 執行任務,若當前任務列表沒有滿,將此任務插入到任務列表,返回true 若當前任務列表滿了,但當前線程數量小于最大線程數,將創建新線程執行此任務,返回true 若當前任務列表滿了,但當前線程數量等于最大線程數,將丟棄此任務,返回false **/ bool Execute(Runnable * pRunnable); /** 終止線程池,先制止塞入任務, 然后等待直到任務列表為空, 然后設置最小線程數量為0, 等待直到線程數量為空, 清空垃圾堆中的任務 **/ void Terminate(); /** 返回線程池中當前的線程數量 **/ unsigned int GetThreadPoolSize(); private: /** 獲取任務列表中的任務,若任務列表為空,返回NULL **/ Runnable * GetTask(); static unsigned int WINAPI StaticThreadFunc(void * arg); private: class CWorker : public CThread { public: CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask = NULL); ~CWorker(); void Run(); private: CThreadPoolExecutor * m_pThreadPool; Runnable * m_pFirstTask; volatile bool m_bRun; }; typedef std::set<CWorker *> ThreadPool; typedef std::list<Runnable *> Tasks; typedef Tasks::iterator TasksItr; typedef ThreadPool::iterator ThreadPoolItr; ThreadPool m_ThreadPool; ThreadPool m_TrashThread; Tasks m_Tasks; CRITICAL_SECTION m_csTasksLock; CRITICAL_SECTION m_csThreadPoolLock; volatile bool m_bRun; volatile bool m_bEnableInsertTask; volatile unsigned int m_minThreads; volatile unsigned int m_maxThreads; volatile unsigned int m_maxPendingTasks; }; #endif ThreadPoolExecutor.cpp [cpp] view plaincopy #include "ThreadPoolExecutor.h" CThreadPoolExecutor::CWorker::CWorker(CThreadPoolExecutor * pThreadPool, Runnable * pFirstTask) : m_pThreadPool(pThreadPool), m_pFirstTask(pFirstTask), m_bRun(true) { } CThreadPoolExecutor::CWorker::~CWorker() { } /** 執行任務的工作線程。 當前沒有任務時, 如果當前線程數量大于最小線程數量,減少線程, 否則,執行清理程序,將線程類給釋放掉 **/ void CThreadPoolExecutor::CWorker::Run() { Runnable * pTask = NULL; while(m_bRun) { if(NULL == m_pFirstTask) { pTask = m_pThreadPool->GetTask(); } else { pTask = m_pFirstTask; m_pFirstTask = NULL; } if(NULL == pTask) { EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock)); if(m_pThreadPool->GetThreadPoolSize() > m_pThreadPool->m_minThreads) { ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this); if(itr != m_pThreadPool->m_ThreadPool.end()) { m_pThreadPool->m_ThreadPool.erase(itr); m_pThreadPool->m_TrashThread.insert(this); } m_bRun = false; } else { ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin(); while(itr != m_pThreadPool->m_TrashThread.end()) { (*itr)->Join(); delete (*itr); m_pThreadPool->m_TrashThread.erase(itr); itr = m_pThreadPool->m_TrashThread.begin(); } } LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock)); continue; } else { pTask->Run(); pTask = NULL; } } } ///////////////////////////////////////////////////////////////////////////////////////////// CThreadPoolExecutor::CThreadPoolExecutor(void) : m_bRun(false), m_bEnableInsertTask(false) { InitializeCriticalSection(&m_csTasksLock); InitializeCriticalSection(&m_csThreadPoolLock); } CThreadPoolExecutor::~CThreadPoolExecutor(void) { Terminate(); DeleteCriticalSection(&m_csTasksLock); DeleteCriticalSection(&m_csThreadPoolLock); } bool CThreadPoolExecutor::Init(unsigned int minThreads, unsigned int maxThreads, unsigned int maxPendingTasks) { if(minThreads == 0) { return false; } if(maxThreads < minThreads) { return false; } m_minThreads = minThreads; m_maxThreads = maxThreads; m_maxPendingTasks = maxPendingTasks; unsigned int i = m_ThreadPool.size(); for(; i<minThreads; i++) { //創建線程 CWorker * pWorker = new CWorker(this); if(NULL == pWorker) { return false; } EnterCriticalSection(&m_csThreadPoolLock); m_ThreadPool.insert(pWorker); LeaveCriticalSection(&m_csThreadPoolLock); pWorker->Start(); } m_bRun = true; m_bEnableInsertTask = true; return true; } bool CThreadPoolExecutor::Execute(Runnable * pRunnable) { if(!m_bEnableInsertTask) { return false; } if(NULL == pRunnable) { return false; } if(m_Tasks.size() >= m_maxPendingTasks) { if(m_ThreadPool.size() < m_maxThreads) { CWorker * pWorker = new CWorker(this, pRunnable); if(NULL == pWorker) { return false; } EnterCriticalSection(&m_csThreadPoolLock); m_ThreadPool.insert(pWorker); LeaveCriticalSection(&m_csThreadPoolLock); pWorker->Start(); } else { return false; } } else { EnterCriticalSection(&m_csTasksLock); m_Tasks.push_back(pRunnable); LeaveCriticalSection(&m_csTasksLock); } return true; } Runnable * CThreadPoolExecutor::GetTask() { Runnable * Task = NULL; EnterCriticalSection(&m_csTasksLock); if(!m_Tasks.empty()) { Task = m_Tasks.front(); m_Tasks.pop_front(); } LeaveCriticalSection(&m_csTasksLock); return Task; } unsigned int CThreadPoolExecutor::GetThreadPoolSize() { return m_ThreadPool.size(); } void CThreadPoolExecutor::Terminate() { m_bEnableInsertTask = false; while(m_Tasks.size() > 0) { Sleep(1); } m_bRun = false; m_minThreads = 0; m_maxThreads = 0; m_maxPendingTasks = 0; while(m_ThreadPool.size() > 0) { Sleep(1); } EnterCriticalSection(&m_csThreadPoolLock); ThreadPoolItr itr = m_TrashThread.begin(); while(itr != m_TrashThread.end()) { (*itr)->Join(); delete (*itr); m_TrashThread.erase(itr); itr = m_TrashThread.begin(); } LeaveCriticalSection(&m_csThreadPoolLock); } 用法: #include "Thread.h"#include "ThreadPoolExecutor.h"class R : public Runnable{public: ~R() { } void Run() { printf("Hello World/n"); }};int _tmain(int argc, _TCHAR* argv[]){ CThreadPoolExecutor * pExecutor = new CThreadPoolExecutor(); pExecutor->Init(1, 10, 50); R r; for(int i=0;i<100;i++) { while(!pExecutor->Execute(&r)) { } } pExecutor->Terminate(); delete pExecutor; getchar(); return 0;} 測試結果: 機器: Intel(R) Core(TM)2 Duo CPU E8400 @ 3.00GHz 2G內存 對于100個任務并且每個任務包含10000000個循環,任務中無等待: 單線程執行耗時:2281時間片 單線程池執行耗時:2219時間片 2個線程的線程池耗時:1156時間片 5個線程的線程池耗時:1166時間片 10個線程的線程池耗時:1157時間片 100個線程的線程池耗時:1177時間片 from:RFID管理系統集成商 RFID中間件 條碼系統中間層 物聯網軟件集成