線程安全的環形緩沖區實現
睿豐德科技 專注RFID識別技術和條碼識別技術與管理軟件的集成項目。質量追溯系統、MES系統、金蝶與條碼系統對接、用友與條碼系統對接
來源:http://blog.csdn.net/lezhiyong
應用背景:線程1將每次數量不一的音頻采樣點(PCM音頻數據)寫入環形緩沖區,線程2每次取固定數量采樣點送音頻編碼器,線程1線程2在平均時間內的讀寫數據量相等。(倒入桶中的水量有時大有時小,但每次取一瓢喝:)
該環形緩沖區借鑒CoolPlayer音頻播放器中的環形緩沖區代碼實現,在讀寫操作函數中加了鎖,允許多線程同時操作。CPs_CircleBuffer基于內存段的讀寫,比用模板實現的環形緩沖隊列適用的數據類型更廣些, CPs_CircleBuffer修改成C++中基于對象的實現,加上詳細注釋,m_csCircleBuffer鎖變量為自用的lock類型(將CRITICAL_SECTION封裝起來),調用lock()加鎖,調用unlock()解鎖。使用效果良好,分享出來。
CPs_CircleBuffer環形緩沖還不具備當待寫數據量超出空余緩沖時自動分配內存的功能,這個將在后續進行優化。
CPs_CircleBuffer使用步驟:
[cpp] view plaincopy
- 1、創建對象
- CPs_CircleBuffer* m_pCircleBuffer;
- m_pCircleBuffer = new CPs_CircleBuffer(bufsize);
- 2、寫
- if (m_pCircleBuffer->GetFreeSize() < CIC_READCHUNKSIZE)
- {
- Sleep(20);
- continue;
- }
- m_pCircleBuffer->Write(internetbuffer.lpvBuffer,internetbuffer.dwBufferLength);
- 3、讀
- m_pCircleBuffer->Read(pDestBuffer,iBytesToRead, piBytesRead);
- 4、其他調用
- if(m_pCircleBuffer->IsComplete())
- break;
- iUsedSpace =m_pCircleBuffer->GetUsedSize();
- m_pCircleBuffer->SetComplete();
CPs_CircleBuffer修改為類的定義:
[cpp] view plaincopy- class CPs_CircleBuffer
- {
- public:
- CPs_CircleBuffer(const unsigned int iBufferSize);
- ~CPs_CircleBuffer();
- public:
- // Public functions
- void Uninitialise();
- void Write(const void* pSourceBuffer, const unsigned int iNumBytes);
- bool Read(void* pDestBuffer, const size_t iBytesToRead, size_t* pbBytesRead);
- void Flush();
- unsigned int GetUsedSize();
- unsigned int GetFreeSize();
- void SetComplete();
- bool IsComplete();
- private:
- unsigned char* m_pBuffer;
- unsigned int m_iBufferSize;
- unsigned int m_iReadCursor;
- unsigned int m_iWriteCursor;
- HANDLE m_evtDataAvailable;
- Vlock m_csCircleBuffer;
- bool m_bComplete;
- };
CPs_CircleBuffer修改為類的實現:
[cpp] view plaincopy- #define CIC_WAITTIMEOUT 3000
- CPs_CircleBuffer::CPs_CircleBuffer(const unsigned int iBufferSize)
- {
- m_iBufferSize = iBufferSize;
- m_pBuffer = (unsigned char*)malloc(iBufferSize);
- m_iReadCursor = 0;
- m_iWriteCursor = 0;
- m_bComplete = false;
- m_evtDataAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);
- }
- CPs_CircleBuffer::~CPs_CircleBuffer()
- {
- Uninitialise();
- }
- // Public functions
- void CPs_CircleBuffer::Uninitialise()//沒有必要public這個接口函數,long120817
- {
- CloseHandle(m_evtDataAvailable);
- free(m_pBuffer);
- }
- //Write前一定要調用m_pCircleBuffer->GetFreeSize(),如果FreeSize不夠需要等待,long120817
- void CPs_CircleBuffer::Write(const void* _pSourceBuffer, const unsigned int _iNumBytes)
- {
- unsigned int iBytesToWrite = _iNumBytes;
- unsigned char* pSourceReadCursor = (unsigned char*)_pSourceBuffer;
- //CP_ASSERT(iBytesToWrite <= GetFreeSize());//修改為沒有足夠空間就返回,write前一定要加GetFreeSize判斷,否則進入到這里相當于丟掉數據, // long120817
- if (iBytesToWrite > GetFreeSize())
- {
- return;
- }
- _ASSERT(m_bComplete == false);
- m_csCircleBuffer.Lock();
- if (m_iWriteCursor >= m_iReadCursor)
- {
- // 0 m_iBufferSize
- // |-----------------|===========|--------------|
- // pR-> pW->
- // 計算尾部可寫空間iChunkSize,long120817
- unsigned int iChunkSize = m_iBufferSize - m_iWriteCursor;
- if (iChunkSize > iBytesToWrite)
- {
- iChunkSize = iBytesToWrite;
- }
- // Copy the data
- memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iChunkSize);
- pSourceReadCursor += iChunkSize;
- iBytesToWrite -= iChunkSize;
- // 更新m_iWriteCursor
- m_iWriteCursor += iChunkSize;
- if (m_iWriteCursor >= m_iBufferSize)//如果m_iWriteCursor已經到達末尾
- m_iWriteCursor -= m_iBufferSize;//返回到起點0位置,long120817
- }
- //剩余數據從Buffer起始位置開始寫
- if (iBytesToWrite)
- {
- memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iBytesToWrite);
- m_iWriteCursor += iBytesToWrite;
- _ASSERT(m_iWriteCursor < m_iBufferSize);//這個斷言沒什么意思,應該_ASSERT(m_iWriteCursor <= m_iReadCursor);long20120817
- }
- SetEvent(m_evtDataAvailable);//設置數據寫好信號量
- m_csCircleBuffer.UnLock();
- }
- bool CPs_CircleBuffer::Read(void* pDestBuffer, const size_t _iBytesToRead, size_t* pbBytesRead)
- {
- size_t iBytesToRead = _iBytesToRead;
- size_t iBytesRead = 0;
- DWORD dwWaitResult;
- bool bComplete = false;
- while (iBytesToRead > 0 && bComplete == false)
- {
- dwWaitResult = WaitForSingleObject(m_evtDataAvailable, CIC_WAITTIMEOUT);//等待數據寫好,long120817
- if (dwWaitResult == WAIT_TIMEOUT)
- {
- //TRACE_INFO2("Circle buffer - did not fill in time!");
- *pbBytesRead = iBytesRead;
- return FALSE;//等待超時則返回
- }
- m_csCircleBuffer.Lock();
- if (m_iReadCursor > m_iWriteCursor)
- {
- // 0 m_iBufferSize
- // |=================|-----|===========================|
- // pW-> pR->
- unsigned int iChunkSize = m_iBufferSize - m_iReadCursor;
- if (iChunkSize > iBytesToRead)
- iChunkSize = (unsigned int)iBytesToRead;
- //讀取操作
- memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);
- iBytesRead += iChunkSize;
- iBytesToRead -= iChunkSize;
- m_iReadCursor += iChunkSize;
- if (m_iReadCursor >= m_iBufferSize)//如果m_iReadCursor已經到達末尾
- m_iReadCursor -= m_iBufferSize;//返回到起點0位置,long120817
- }
- if (iBytesToRead && m_iReadCursor < m_iWriteCursor)
- {
- unsigned int iChunkSize = m_iWriteCursor - m_iReadCursor;
- if (iChunkSize > iBytesToRead)
- iChunkSize = (unsigned int)iBytesToRead;
- //讀取操作
- memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);
- iBytesRead += iChunkSize;
- iBytesToRead -= iChunkSize;
- m_iReadCursor += iChunkSize;
- }
- //如果有更多的數據要寫
- if (m_iReadCursor == m_iWriteCursor)
- {
- if (m_bComplete)//跳出下一個while循環,該值通過SetComplete()設置,此邏輯什么意思?long120817
- bComplete = true;
- }
- else//還有數據可以讀,SetEvent,在下一個while循環開始可以不用再等待,long120817
- SetEvent(m_evtDataAvailable);
- m_csCircleBuffer.UnLock();
- }
- *pbBytesRead = iBytesRead;
- return bComplete ? false : true;
- }
- // 0 m_iBufferSize
- // |------------------------------------------------|
- // pR
- // pW
- //讀寫指針歸零
- void CPs_CircleBuffer::Flush()
- {
- m_csCircleBuffer.Lock();
- m_iReadCursor = 0;
- m_iWriteCursor = 0;
- m_csCircleBuffer.UnLock();
- }
- //獲取已經寫的內存
- unsigned int CPs_CircleBuffer::GetUsedSize()
- {
- return m_iBufferSize - GetFreeSize();
- }
- unsigned int CPs_CircleBuffer::GetFreeSize()
- {
- unsigned int iNumBytesFree;
- m_csCircleBuffer.Lock();
- if (m_iWriteCursor < m_iReadCursor)
- {
- // 0 m_iBufferSize
- // |=================|-----|===========================|
- // pW-> pR->
- iNumBytesFree = (m_iReadCursor - 1) - m_iWriteCursor;
- }
- else if (m_iWriteCursor == m_iReadCursor)
- {
- iNumBytesFree = m_iBufferSize;
- }
- else
- {
- // 0 m_iBufferSize
- // |-----------------|=====|---------------------------|
- // pR-> pW->
- iNumBytesFree = (m_iReadCursor - 1) + (m_iBufferSize - m_iWriteCursor);
- }
- m_csCircleBuffer.UnLock();
- return iNumBytesFree;
- }
- //該函數什么時候調用?long120817
- void CPs_CircleBuffer::SetComplete()
- {
- m_csCircleBuffer.Lock();
- m_bComplete = true;
- SetEvent(m_evtDataAvailable);
- m_csCircleBuffer.UnLock();
- }
附自動初始化和摧毀的鎖對象Vlock的實現:
[cpp] view plaincopy- #ifdef WIN32
- #include <windows.h>
- #define V_MUTEX CRITICAL_SECTION //利用臨界區實現的鎖變量
- #define V_MUTEX_INIT(m) InitializeCriticalSection(m)
- #define V_MUTEX_LOCK(m) EnterCriticalSection(m)
- #define V_MUTEX_UNLOCK(m) LeaveCriticalSection(m)
- #define V_MUTEX_DESTORY(m) DeleteCriticalSection(m)
- #else
- #define V_MUTEX pthread_mutex_t
- #define V_MUTEX_INIT(m) pthread_mutex_init(m,NULL)
- #define V_MUTEX_LOCK(m) pthread_mutex_Lock(m)
- #define V_MUTEX_UNLOCK(m) pthread_mutex_unLock(m)
- #define V_MUTEX_DESTORY(m) pthread_mutex_destroy(m)
- #endif
- class Vlock
- {
- public:
- Vlock(void)
- {
- V_MUTEX_INIT(&m_Lock);
- }
- ~Vlock(void)
- {
- V_MUTEX_DESTORY(&m_Lock);
- }
- public:
- void Lock(){V_MUTEX_LOCK(&m_Lock);}
- void UnLock(){V_MUTEX_UNLOCK(&m_Lock);}
- private:
- V_MUTEX m_Lock;
- };