加载中…
个人资料
  • 博客等级:
  • 博客积分:
  • 博客访问:
  • 关注人气:
  • 获赠金笔:0支
  • 赠出金笔:0支
  • 荣誉徽章:
正文 字体大小:

N个线程对M个任务的处理模型

(2011-09-09 10:49:20)
标签:

it

--分析的程序源于<<VC++系统开发实例精粹>>Example1

我们要建立一个基本结构,它代表了一个单一任务,结构如下:

class WorkItemBase
{
   virtual void   DoWork(void* pThreadContext)    = 0;
   virtual void   Abort () = 0;
   friend CWorkQueue;
};

定义任务队列结构,方便其他类使用

typedef std::queue<WorkItemBase*>           WorkItemQueue,*PWorkItemQueue;

 


类结构:

class  CWorkQueue
{
public:

   virtual ~CWorkQueue(){};
  
   bool Create(const unsigned int  nNumberOfThreads,
               void*     *pThreadDataArray             = NULL);
  
   bool InsertWorkItem(WorkItemBase* pWorkItem);
  
   void Destroy();

private:
  
   static unsigned long __stdcall ThreadFunc( void* pParam );
   WorkItemBase* RemoveWorkItem();
  
  
  
   enum{
      ABORT_EVENT_INDEX = 0,
      SEMAPHORE_INDEX,
      NUMBER_OF_SYNC_OBJ,
   };

   PHANDLE     m_phThreads;                            //线程数组,保存创建的N个线程句柄
   unsigned int    m_nNumberOfThreads;                 //保存线程数目
   void*     m_pThreadDataArray;                       //线程参数
   
   HANDLE     m_phSincObjectsArray[NUMBER_OF_SYNC_OBJ];

   CRITICAL_SECTION   m_CriticalSection; 
   PWorkItemQueue           m_pWorkItemQueue;          //任务队列
};

 

//Create方法,主要是创建线程(N个),还有,初始化事件对象和工作队列

 //初始化工作队列
bool CWorkQueue::Create(const unsigned int  nNumberOfThreads,
                              void*         *ThreadData      )
{
     
   //初始化工作队列
   m_pWorkItemQueue = new WorkItemQueue();
  
   if(NULL == m_pWorkItemQueue )
      
      return false;
   }
  
   //创建Semaphore对象  
   m_phSincObjectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL,0,LONG_MAX,NULL);
  
   if(m_phSincObjectsArray[SEMAPHORE_INDEX] == NULL)
      
    delete m_pWorkItemQueue;
    m_pWorkItemQueue = NULL;
    return false;
   }
  
   //创建event 事件对象
   m_phSincObjectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL,TRUE,FALSE,NULL);

  
   if(m_phSincObjectsArray[ABORT_EVENT_INDEX]  == NULL)
      
      delete m_pWorkItemQueue;
      m_pWorkItemQueue = NULL;
      CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
      return false;
   }
  

   //创建临界区以保护工作队列
   InitializeCriticalSection(&m_CriticalSection);

   //分配线程句柄数组
   m_phThreads = new HANDLE[nNumberOfThreads];

   if(m_phThreads == NULL)
    
      delete m_pWorkItemQueue;
      m_pWorkItemQueue = NULL;
      CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
   CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
   DeleteCriticalSection(&m_CriticalSection);   
      return false;
   }

   unsigned int i;

   m_nNumberOfThreads = nNumberOfThreads;

   DWORD dwThreadId;
   PTHREAD_CONTEXT pThreadsContext ;
 
   //创建所有的线程
   for(i = 0 ; i < nNumberOfThreads ; i++ )
  
    //初始化每个线程的上下文,用于传递给线程函数


   pThreadsContext = new THREAD_CONTEXT;

   //这里通过线程的参数将任务队列告诉了线程,使得线程可以处理任务队列的任务函数
   pThreadsContext->pWorkQueue  = this;
   pThreadsContext->pThreadData = ThreadData == NULL? NULL : ThreadData[i];   
   //创建线程
   m_phThreads[i] = CreateThread(NULL,
    0,
    CWorkQueue::ThreadFunc,
    pThreadsContext,
    0,
    &dwThreadId);

      if(m_phThreads[i] == NULL)
     
   delete pThreadsContext;
         m_nNumberOfThreads = i;
         Destroy();
         return false;
      }
   }

   return true;
}


bool CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem)
{

   assert(pWorkItem != NULL);
   //锁住
   EnterCriticalSection(&m_CriticalSection);
   //插入队列
   m_pWorkItemQueue->push(pWorkItem);
   //解锁
   LeaveCriticalSection(&m_CriticalSection);
   //触发线程
   if (!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL))
   {
      assert(false);
      return false;    
   }
  
 return true;
 
}

 

 


WorkItemBase*  CWorkQueue::RemoveWorkItem()
{
  
   WorkItemBase* pWorkItem;
   //锁住
   EnterCriticalSection(&m_CriticalSection); 
   //从队列中移除任务 
   pWorkItem = m_pWorkItemQueue->front();
   m_pWorkItemQueue->pop();
   //解锁
   LeaveCriticalSection(&m_CriticalSection);
   assert(pWorkItem != NULL);
   return pWorkItem;
}

 


unsigned long __stdcall CWorkQueue::ThreadFunc( void*  pParam )
{
   PTHREAD_CONTEXT       pThreadContext =  (PTHREAD_CONTEXT)pParam;//线程的传入参数
   WorkItemBase*         pWorkItem      = NULL;
   CWorkQueue*           pWorkQueue     = pThreadContext->pWorkQueue;//工作队列指针
   void*                 pThreadData    = pThreadContext->pThreadData;
   DWORD dwWaitResult;
   for(;;)
   {
   //等待两个事件****
   dwWaitResult = WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ,pWorkQueue->m_phSincObjectsArray,FALSE,INFINITE);

      switch(dwWaitResult - WAIT_OBJECT_0)
      {

      case ABORT_EVENT_INDEX: //离开线程事件
      delete pThreadContext;
         return 0;
      case SEMAPHORE_INDEX://工作任务事件
         //得到工作队列的第一个工作任务
         pWorkItem = pWorkQueue->RemoveWorkItem();    
         if(pWorkItem == NULL)
         {
            assert(false);
            break;
           
         //调用相应的工作函数*****
         pWorkItem->DoWork(pThreadData);    
         break;
      default:
          assert(false);
    delete pThreadContext;
          return 0;

         
   }
   delete pThreadContext;
   return 1;
}



void CWorkQueue::Destroy()
{
  //设置退出事件    因为事件在创建时选择了手动重置,故在没有重置的情况下,可以让所有的线程检测到这个事件,从而退出.
   if(!SetEvent(m_phSincObjectsArray[ABORT_EVENT_INDEX]))
     
      assert(false);
      return;
   }
   //等待所有的线程结束
   WaitForMultipleObjects(m_nNumberOfThreads,m_phThreads,true,INFINITE);
        
   //清除队列
   while(!m_pWorkItemQueue->empty())
   {
      m_pWorkItemQueue->front()->Abort();
   m_pWorkItemQueue->pop();
  
   delete m_pWorkItemQueue;
   m_pWorkItemQueue = NULL;  
   CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
   CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
   DeleteCriticalSection(&m_CriticalSection);
   //关闭所有的线程句柄
   for(int i = 0 ; i < m_nNumberOfThreads ; i++)
    CloseHandle(m_phThreads[i]);
   delete[] m_phThreads;
}

0

阅读 收藏 喜欢 打印举报/Report
  

新浪BLOG意见反馈留言板 欢迎批评指正

新浪简介 | About Sina | 广告服务 | 联系我们 | 招聘信息 | 网站律师 | SINA English | 产品答疑

新浪公司 版权所有