first commit

This commit is contained in:
2019-08-19 19:19:58 +08:00
parent 7bb6b54204
commit 2b71bf8135
145 changed files with 23208 additions and 0 deletions

371
inc/hgl/thread/Workflow.h Normal file
View File

@@ -0,0 +1,371 @@
#ifndef HGL_WORKFLOW_INCLUDE
#define HGL_WORKFLOW_INCLUDE
#include<hgl/thread/Thread.h>
#include<hgl/thread/ThreadMutex.h>
#include<hgl/thread/SwapData.h>
#include<hgl/thread/DataPost.h>
#include<hgl/type/List.h>
namespace hgl
{
/**
* 工作流名字空间<br>
* 工作流是一种对工作的多线安排管理机制,它适用于按业务分配多线程的工作环境<br>
* 开发者需要为每一种工作指定一定的线程数量,但每一种工作确只有一个工作分配入口和分发出口。<br>
* 由其它程序提交工作任务到入口,开发者可以自行重载分配入口的分配函数。
*/
namespace workflow
{
/**
* 工作处理基类模板
* @param W 工作对象
*/
template<typename W> class WorkProc
{
public:
virtual ~WorkProc()=default;
public: //投递工作线程所需调用的方法
virtual void Post(W *w)=0; ///<投递一个工作
virtual void Post(W **w,int count)=0; ///<投递一批工作
public: //需用户重载实现的真正执行工作的方法
/**
* 单个工作执行事件函数,此函数需用户重载实现
*/
virtual void OnWork(const uint,W *)=0;
public: //由工作线程调用的执行工作事件函数
/**
* 工作执行处理函数
*/
virtual bool OnExecuteWork(const uint)=0;
};//template<typename W> class WorkProc
/**
* 单体工作处理<br>
* 该类可以由多个线程投递工作,但只能被一个工作线程获取工作
*/
template<typename W> class SingleWorkProc:public WorkProc<W>
{
public:
using WorkList=List<W *>;
private:
SemSwapData<WorkList> work_list; ///<工程列表
protected:
double time_out;
public:
SingleWorkProc()
{
time_out=5;
}
virtual ~SingleWorkProc()=default;
void SetTimeOut(const double to) ///<设置超时时间
{
if(to<=0)time_out=0;
else time_out=to;
}
virtual void Post(W *w) override ///<投递一个工作
{
WorkList &wl=work_list.GetPost();
wl.Add(w);
work_list.ReleasePost();
}
virtual void Post(W **w,int count) override ///<投递一批工作
{
WorkList &wl=work_list.GetPost();
wl.Add(w,count);
work_list.ReleasePost();
}
virtual void ToWork() ///<将堆积的工作列表发送给工作线程
{
work_list.PostSem(1);
}
public:
/**
* 当前工作序列完成事件函数,如需使用请重载此函数
*/
virtual void OnFinish(const uint wt_index)
{
}
/**
* 开始执行工作函数
*/
virtual bool OnExecuteWork(const uint wt_index) override
{
//为什么不使用TrySemSwap使用TrySemSwap固然会立即返回结果但会引起线程频繁刷新造成CPU的流费。
//使用WaitSemSwap目前唯一坏处是在退出时需要等待超时时间。
if(!work_list.WaitSemSwap(time_out))
return(false);
WorkList &wl=work_list.GetReceive();
const int count=wl.GetCount();
if(count>0)
{
W **p=wl.GetData();
for(int i=0;i<count;i++)
{
this->OnWork(wt_index,*p);
++p;
}
this->OnFinish(wt_index);
wl.ClearData();
}
return(true);
}
};//template<typename W> class SingleWorkProc:public WorkProc<W>
/**
* 多体工作处理<br>
* 该类可以由多个线程投递工作,也可以同时被多个工作线程获取工作
*/
template<typename W> class MultiWorkProc:public WorkProc<W>
{
protected:
SemDataPost<W> work_list; ///<工程列表
protected:
double time_out;
public:
MultiWorkProc()
{
time_out=5;
}
virtual ~MultiWorkProc()=default;
void SetTimeOut(const double to) ///<设置超时时间
{
if(to<=0)time_out=0;
else time_out=to;
}
virtual void Post(W *w) override ///<投递一个工作
{
if(!w)return;
work_list.Post(w);
work_list.PostSem(1);
}
virtual void Post(W **w,int count) override ///<投递一批工作
{
if(!w||count<=0)return;
work_list.Post(w,count);
work_list.PostSem(count);
}
public:
/**
* 开始执行工作函数
*/
virtual bool OnExecuteWork(const uint wt_index) override
{
//为什么不使用TrySemReceive使用TrySemReceive固然会立即返回结果但会引起线程频繁刷新造成CPU的流费。
//使用WaitSemReceive目前唯一坏处是在退出时需要等待超时时间。
W *obj=work_list.WaitSemReceive(time_out);
if(!obj)
return(false);
this->OnWork(wt_index,obj);
return(true);
}
};//template<typename W> class MultiWorkProc:public WorkProc<W>
/**
* 工作线程,用于真正处理事务
*/
template<typename W> class WorkThread:public Thread
{
protected:
using WorkList=List<W *>;
WorkProc<W> *work_proc;
uint work_thread_index;
bool force_close;
public:
WorkThread(WorkProc<W> *wp)
{
work_proc=wp;
work_thread_index=0;
force_close=false;
}
#ifndef _DEBUG
virtual ~WorkThread()=default;
#else
virtual ~WorkThread()
{
LOG_INFO(U8_TEXT("WorkThread Destruct [")+thread_addr_string+U8_TEXT("]"));
}
#endif//_DEBUG
bool IsExitDelete()const override{return false;} ///<返回在退出线程时,不删除本对象
void SetWorkThreadIndex(const uint index)
{
work_thread_index=index;
}
void ExitWork(const bool fc)
{
force_close=fc;
Thread::WaitExit();
}
virtual void ProcEndThread() override
{
if(!force_close) //不是强退
while(work_proc->OnExecuteWork(work_thread_index)); //把工作全部做完
#ifdef _DEBUG
{
LOG_INFO(U8_TEXT("WorkThread Finish [")+thread_addr_string+U8_TEXT("]"));
}
#endif//_DEBUG
}
virtual bool Execute() override
{
if(!work_proc)
RETURN_FALSE;
work_proc->OnExecuteWork(work_thread_index);
return(true);
}
};//template<typename W> class WorkThread:public Thread
/**
* 工作组<br>
* 用于管理一组的工作线程以及投递器<br>
* 注:可以一组工作线程共用一个投递器,也可以每个工作线程配一个投递器。工作组管理只为方便统一清理
*/
template<typename WP,typename WT> class WorkGroup
{
ObjectList<WP> wp_list; ///<投递器列表
ObjectList<WT> wt_list; ///<工作线程列表
bool run=false;
public:
virtual ~WorkGroup()
{
Close();
}
virtual bool Add(WP *wp)
{
if(!wp)return(false);
wp_list.Add(wp);
return(true);
}
virtual bool Add(WP **wp,const int count)
{
if(!wp)return(false);
wp_list.Add(wp,count);
return(true);
}
virtual bool Add(WT *wt)
{
if(!wt)return(false);
int index=wt_list.Add(wt);
wt->SetWorkThreadIndex(index);
return(true);
}
virtual bool Add(WT **wt,const int count)
{
if(!wt)return(false);
int index=wt_list.Add(wt,count);
for(int i=0;i<count;i++)
{
(*wt)->SetWorkThreadIndex(index);
++index;
++wt;
}
return(true);
}
virtual bool Start()
{
int count=wt_list.GetCount();
if(count<=0)
RETURN_FALSE;
WT **wt=wt_list.GetData();
for(int i=0;i<count;i++)
wt[i]->Start();
run=true;
return(true);
}
virtual void Close(bool force_close=false)
{
if(!run)return;
int count=wt_list.GetCount();
WT **wt=wt_list.GetData();
for(int i=0;i<count;i++)
wt[i]->ExitWork(force_close);
run=false;
}
};//template<typename WP,typename WT> class WorkGroup
}//namespace workflow
}//namespace hgl
#endif//HGL_WORKFLOW_INCLUDE