--分析的程序源于<<VC++系统开发实例精粹>>Example1
我们要建立一个基本结构,它代表了一个单一任务,结构如下:
class WorkItemBase
{
virtual
void DoWork(void*
pThreadContext)
= 0;
virtual
void Abort () = 0;
friend CWorkQueue;
};
定义任务队列结构,方便其他类使用
typedef
std::queue<WorkItemBase*>
WorkItemQueue,*PWorkItemQueue;
类结构:
class CWorkQueue
{
public:
virtual
~CWorkQueue(){};
bool Create(const unsigned
int nNumberOfThreads,
void* *pThreadDataArray
= NULL);
bool
InsertWorkItem(WorkItemBase* pWorkItem);
void Destroy();
private:
static unsigned long __stdcall
ThreadFunc( void* pParam );
WorkItemBase*
RemoveWorkItem();
enum{
ABORT_EVENT_INDEX = 0,
SEMAPHORE_INDEX,
NUMBER_OF_SYNC_OBJ,
};
PHANDLE m_phThreads;
//线程数组,保存创建的N个线程句柄
unsigned
int m_nNumberOfThreads;
//保存线程数目
void* m_pThreadDataArray; //线程参数
HANDLE m_phSincObjectsArray[NUMBER_OF_SYNC_OBJ];
CRITICAL_SECTION m_CriticalSection;
PWorkItemQueue
m_pWorkItemQueue;
//任务队列
};
//Create方法,主要是创建线程(N个),还有,初始化事件对象和工作队列
//初始化工作队列
bool CWorkQueue::Create(const unsigned int
nNumberOfThreads,
void*
*ThreadData
)
{
//初始化工作队列
m_pWorkItemQueue = new
WorkItemQueue();
if(NULL == m_pWorkItemQueue
)
{
return false;
}
//创建Semaphore对象
m_phSincObjectsArray[SEMAPHORE_INDEX] =
CreateSemaphore(NULL,0,LONG_MAX,NULL);
if(m_phSincObjectsArray[SEMAPHORE_INDEX] == NULL)
{
delete
m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
return
false;
}
//创建event 事件对象
m_phSincObjectsArray[ABORT_EVENT_INDEX] =
CreateEvent(NULL,TRUE,FALSE,NULL);
if(m_phSincObjectsArray[ABORT_EVENT_INDEX] ==
NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
return false;
}
//创建临界区以保护工作队列
InitializeCriticalSection(&m_CriticalSection);
//分配线程句柄数组
m_phThreads = new
HANDLE[nNumberOfThreads];
if(m_phThreads ==
NULL)
{
delete m_pWorkItemQueue;
m_pWorkItemQueue = NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
return false;
}
unsigned int i;
m_nNumberOfThreads =
nNumberOfThreads;
DWORD dwThreadId;
PTHREAD_CONTEXT
pThreadsContext ;
//创建所有的线程
for(i = 0 ; i <
nNumberOfThreads ; i++ )
{
//初始化每个线程的上下文,用于传递给线程函数
pThreadsContext = new
THREAD_CONTEXT;
//这里通过线程的参数将任务队列告诉了线程,使得线程可以处理任务队列的任务函数
pThreadsContext->pWorkQueue =
this;
pThreadsContext->pThreadData = ThreadData == NULL?
NULL :
ThreadData[i];
//创建线程
m_phThreads[i] =
CreateThread(NULL,
0,
CWorkQueue::ThreadFunc,
pThreadsContext,
0,
&dwThreadId);
if(m_phThreads[i] == NULL)
{
delete pThreadsContext;
m_nNumberOfThreads = i;
Destroy();
return false;
}
}
return true;
}
bool CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem)
{
assert(pWorkItem !=
NULL);
//锁住
EnterCriticalSection(&m_CriticalSection);
//插入队列
m_pWorkItemQueue->push(pWorkItem);
//解锁
LeaveCriticalSection(&m_CriticalSection);
//触发线程
if
(!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL))
{
assert(false);
return
false;
}
return true;
}
WorkItemBase* CWorkQueue::RemoveWorkItem()
{
WorkItemBase* pWorkItem;
//锁住
EnterCriticalSection(&m_CriticalSection);
//从队列中移除任务
pWorkItem =
m_pWorkItemQueue->front();
m_pWorkItemQueue->pop();
//解锁
LeaveCriticalSection(&m_CriticalSection);
assert(pWorkItem !=
NULL);
return pWorkItem;
}
unsigned long __stdcall CWorkQueue::ThreadFunc(
void* pParam )
{
PTHREAD_CONTEXT
pThreadContext =
(PTHREAD_CONTEXT)pParam;//线程的传入参数
WorkItemBase*
pWorkItem
= NULL;
CWorkQueue*
pWorkQueue
= pThreadContext->pWorkQueue;//工作队列指针
void*
pThreadData =
pThreadContext->pThreadData;
DWORD dwWaitResult;
for(;;)
{
//等待两个事件****
dwWaitResult =
WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ,pWorkQueue->m_phSincObjectsArray,FALSE,INFINITE);
switch(dwWaitResult - WAIT_OBJECT_0)
{
case ABORT_EVENT_INDEX: //离开线程事件
delete pThreadContext;
return 0;
case SEMAPHORE_INDEX://工作任务事件
//得到工作队列的第一个工作任务
pWorkItem =
pWorkQueue->RemoveWorkItem();
if(pWorkItem == NULL)
{
assert(false);
break;
}
//调用相应的工作函数*****
pWorkItem->DoWork(pThreadData);
break;
default:
assert(false);
delete
pThreadContext;
return 0;
}
}
delete pThreadContext;
return 1;
}
void CWorkQueue::Destroy()
{
//设置退出事件
因为事件在创建时选择了手动重置,故在没有重置的情况下,可以让所有的线程检测到这个事件,从而退出.
if(!SetEvent(m_phSincObjectsArray[ABORT_EVENT_INDEX]))
{
assert(false);
return;
}
//等待所有的线程结束
WaitForMultipleObjects(m_nNumberOfThreads,m_phThreads,true,INFINITE);
//清除队列
while(!m_pWorkItemQueue->empty())
{
m_pWorkItemQueue->front()->Abort();
m_pWorkItemQueue->pop();
}
delete m_pWorkItemQueue;
m_pWorkItemQueue =
NULL;
CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
DeleteCriticalSection(&m_CriticalSection);
//关闭所有的线程句柄
for(int i = 0 ; i
< m_nNumberOfThreads ; i++)
CloseHandle(m_phThreads[i]);
delete[] m_phThreads;
}
加载中,请稍候......