在设计CDistributeQueue类时,通常有两种方案值得考虑:
- 1、 本地队列预先创建好,当有线程访问时就可以直接根据线程编号去访问对应的本地队列。
- 2、 不预先创建本地队列,当线程第一次访问分布式队列时,由于获取不到线程编号,由此可以断定本线程是第1次访问分布式队列,此时才创建本地队列。
方案1和方案2可以说各有优缺点。方案1中,在事先不知道有多少线程会访问分布式队列的情况下,预先创建好本地队列会造成程序初始化时间过长,并且可能有一些创建好的队列得不到使用。
方案2中,采用线程访问分布式队列时才创建本地队列,初始化时比较简单,并且不会造成多创建了本地队列的情况。缺点是编程时,队列的操作代码会变复杂一些,效率会有所降低。
typedef int (*GetThreadIdFunc)(void *pArg);
template <class T, class LocalQueue, class SharedQueue>
class
CDistributedQueue {
LocalQueue ** m_ppLocalQueue; // 本地队列数组
SharedQueue * m_pSharedQueue; // 共享队列池或共享队列
DWORD m_dwTlsIndex; //线程本地存储索引
LONG volatile m_lThreadIdIndex; //线程编号最大值
GetThreadIdFunc m_GetThreadIdFunc; //获取线程编号回调函数,如果由外面
void * m_pThreadIdFuncArg; //获取线程编号回调函数的参数
CFastLock m_LocalQueueResizeLock; //专为下面的ResizeLocalQueue函数使用
void ResizeLocalQueue(); //将m_ppLocalQueue数组的大小扩大一倍
m_GetThreadIdFunc = NULL;
m_pThreadIdFuncArg = NULL;
void Create( int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount);
void Create( int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount,
GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg);
virtual ~CDistributedQueue();
void PushToLocalQueue(T &Data);
void PushToLocalQueue(T &Data, int nIndex);
int PopFromLocalQueue(T &Data);
SharedQueue *GetSharedQueue() { return m_pSharedQueue; };
int PrivatizeSharedQueue(int nSharedQueueIndex);
说明一下:CDistributedQueue类中有三个模板参数,第1个模板参数T是表示数据类型;第2个模板参数是表示本地队列类的类型,为一 个不需要使用锁的普通队列,比如环形队列等;第3个模板参数是表示一个需要使用锁的共享队列类,可以是一个队列池类,也可以是普通的使用锁的共享队列类。
@param int nLocalQueueSize - 本地子队列的大小
@param int nLocalQueueCount - 本地队列的个数(数组的大小)
@param int nSharedQueueSize - 共享子队列的大小
@param int nSharedQueueCount - 共享子队列的个数
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::Create( int nLocalQueueSize, int nLocalQueueCount, int nSharedQueueSize, int nSharedQueueCount) m_nLocalQueueSize = nLocalQueueSize;
m_nSharedQueueSize = nSharedQueueSize;
if ( nLocalQueueCount != 0 )
m_nLocalQueueCount = nLocalQueueCount;
m_nLocalQueueCount = omp_get_num_procs();
if ( nSharedQueueCount != 0 )
m_nSharedQueueCount = nSharedQueueCount;
m_nSharedQueueCount = omp_get_num_procs();
m_ppLocalQueue = new LocalQueue *[m_nLocalQueueCount];
for ( i = 0; i < m_nLocalQueueCount; i++ )
m_ppLocalQueue[i] = NULL;
m_pSharedQueue = new SharedQueue(m_nSharedQueueCount, m_nSharedQueueSize);
m_dwTlsIndex = TlsAlloc();
@param int nLocalQueueSize - 本地子队列的大小
@param int nLocalQueueCount - 本地队列的个数(数组的大小)
@param int nSharedQueueSize - 共享子队列的大小
@param int nSharedQueueCount - 共享子队列的个数
@param GetThreadIdFunc GetThreadId - 获取线程Id回调函数
@param void * pThreadIdFuncArg - GetThreadId回调函数的参数
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::Create( int nLocalQueueSize, int nLocalQueueCount, int nSharedQueueSize, int nSharedQueueCount, GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg) m_GetThreadIdFunc = GetThreadId;
m_pThreadIdFuncArg = pThreadIdFuncArg;
Create(nLocalQueueSize, nLocalQueueCount, nSharedQueueSize, nSharedQueueCount);
template <class T, class LocalQueue, class SharedQueue>
CDistributedQueue<T, LocalQueue, SharedQueue>::~CDistributedQueue() for ( i = 0; i < m_nLocalQueueCount; i++ )
if ( m_ppLocalQueue[i] != NULL )
delete m_ppLocalQueue[i];
delete [] m_ppLocalQueue;
这个函数主要是考虑有可能程序升级后,访问的线程数量可能大于本地队列数组的大小的情况,此时采取将本地队列数组扩大一倍的策略。
/** 分布式队列的将本地队列数组扩大一倍的内部成员函数
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::ResizeLocalQueue() //将本地队列数组扩大一倍, 防止线程数量多于队列数量,以保证程序安全
LocalQueue **ppQueue = new LocalQueue *[m_nLocalQueueCount * 2];
for ( i = 0; i < m_nLocalQueueCount; i++ )
ppQueue[i] = m_ppLocalQueue[i];
for ( i = m_nLocalQueueCount; i < m_nLocalQueueCount * 2; i++ )
delete [] m_ppLocalQueue;
m_ppLocalQueue = ppQueue;
//使用原子操作避免m_nLocalQueueCount的数据竞争问题
AtomicWrite((LONG volatile *)&m_nLocalQueueCount, m_nLocalQueueCount * 2);
获取线程Id成员函数中,这个函数中完成本地队列的创建和分派工作。先是判断获取的线程Id是否为0,如果为0则表明还没有创建本地队列,此时需要给线程进行编号,并创建一个新的本地队列放到数组中下标等于线程编号的位置上。
如果m_GetThreadIdFunc回调函数不为空,则使用它获取Id
template <class T, class LocalQueue, class SharedQueue>
LONG CDistributedQueue<T, LocalQueue, SharedQueue>::ThreadIdGet() LocalQueue *pQueue = NULL;
if ( m_GetThreadIdFunc != NULL )
Id = (*m_GetThreadIdFunc)(m_pThreadIdFuncArg);
if ( Id >= m_nLocalQueueCount )
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
if ( Id >= m_nLocalQueueCount )
if ( m_ppLocalQueue[Id] == NULL )
m_ppLocalQueue[Id] = new LocalQueue(m_nLocalQueueSize);
Id = (LONG )TlsGetValue(m_dwTlsIndex);
Id = AtomicIncrement(&m_lThreadIdIndex);
TlsSetValue(m_dwTlsIndex, (void *)Id);
pQueue = new LocalQueue(m_nLocalQueueSize);
if ( Id >= m_nLocalQueueCount)
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
if ( Id >= m_nLocalQueueCount )
m_ppLocalQueue[Id] = pQueue;
1、本地队列空时进入本地队列,、共享队列未满时进入共享队列
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::EnQueue(T &Data) if ( m_ppLocalQueue[nId]->IsEmpty() )
m_ppLocalQueue[nId]->EnQueue(Data);
else if ( m_pSharedQueue->Push(Data) != CAPI_SUCCESS )
m_ppLocalQueue[nId]->EnQueue(Data);
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue( m_ppLocalQueue[nId]->EnQueue(Data);
使用这个函数要特别小心,必须保证不会发生数据竞争问题
@param int nIndex - 本地队列的序号
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue( if ( nIndex >= m_nLocalQueueCount * 2)
if ( nIndex >= m_nLocalQueueCount )
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
if ( nIndex >= m_nLocalQueueCount )
if ( m_ppLocalQueue[nIndex] == NULL )
m_ppLocalQueue[nIndex] = new LocalQueue(m_nLocalQueueSize);
m_ppLocalQueue[nIndex]->EnQueue(Data);
@return int - 出队成功返回CAPI_SUCCESS, 失败(队列为空)返回CAPI_FAILED.
template <class T, class LocalQueue, class SharedQueue>
int CDistributedQueue<T, LocalQueue, SharedQueue>::PopFromLocalQueue( return m_ppLocalQueue[nId]->DeQueue(Data);
出队操作策略为,先从本地队列中出队,如果失败则从共享队列中出队
@return int - 成功返回CAPI_SUCCESS, 失败返回CAPI_FAILED.
template <class T, class LocalQueue, class SharedQueue>
int CDistributedQueue<T, LocalQueue, SharedQueue>::DeQueue(T &Data) nRet = m_ppLocalQueue[nId]->DeQueue(Data);
if ( nRet == CAPI_FAILED )
nRet = m_pSharedQueue->Pop(Data);
本文转自Intel_ISN 51CTO博客,原文链接:http://blog.51cto.com/intelisn/130452,如需转载请自行联系原作者