当前位置:  编程技术>c/c++/嵌入式

c++版线程池和任务池示例

    来源: 互联网  发布时间:2014-10-24

    本文导语:  commondef.h 代码如下://单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁const int CHECK_IDLE_TASK_INTERVAL = 300;//单位秒,任务自动销毁时间间隔const int TASK_DESTROY_INTERVAL = 60; //监控线程池是...

commondef.h

代码如下:

//单位秒,监测空闲列表时间间隔,在空闲队列中超过TASK_DESTROY_INTERVAL时间的任务将被自动销毁
const int CHECK_IDLE_TASK_INTERVAL = 300;
//单位秒,任务自动销毁时间间隔
const int TASK_DESTROY_INTERVAL = 60;

//监控线程池是否为空时间间隔,微秒
const int IDLE_CHECK_POLL_EMPTY = 500;

//线程池线程空闲自动退出时间间隔 ,5分钟
const int  THREAD_WAIT_TIME_OUT = 300;

taskpool.cpp

代码如下:

#include "taskpool.h"

#include

#include
#include

    TaskPool::TaskPool(const int & poolMaxSize)
    : m_poolSize(poolMaxSize)
      , m_taskListSize(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_lock, NULL);
    pthread_mutex_init(&m_idleMutex, NULL);
    pthread_cond_init(&m_idleCond, NULL);

    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); // 让线程独立运行
    pthread_create(&m_idleId, &attr, CheckIdleTask, this); //创建监测空闲任务进程
    pthread_attr_destroy(&attr);
}

TaskPool::~TaskPool()
{
    if(!m_bStop)
    {
        StopPool();
    }
    if(!m_taskList.empty())
    {
        std::list::iterator it = m_taskList.begin();
        for(; it != m_taskList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_taskList.clear();
        m_taskListSize = 0;
    }
    if(!m_idleList.empty())
    {
        std::list::iterator it = m_idleList.begin();
        for(; it != m_idleList.end(); ++it)
        {
            if(*it != NULL)
            {
                delete *it;
                *it = NULL;
            }
        }
        m_idleList.clear();
    }


    pthread_mutex_destroy(&m_lock);
    pthread_mutex_destroy(&m_idleMutex);
    pthread_cond_destroy(&m_idleCond);
}

void * TaskPool::CheckIdleTask(void * arg)
{
    TaskPool * pool = (TaskPool*)arg;
    while(1)
    {
        pool->LockIdle();
        pool->RemoveIdleTask();
        if(pool->GetStop())
        {
            pool->UnlockIdle();
            break;
        }
        pool->CheckIdleWait();
        pool->UnlockIdle();
    }
}

void TaskPool::StopPool()
{
    m_bStop = true;
    LockIdle();
    pthread_cond_signal(&m_idleCond); //防止监控线程正在等待,而引起无法退出的问题
    UnlockIdle();
    pthread_join(m_idleId, NULL);
}

bool TaskPool::GetStop()
{
    return m_bStop;
}

void TaskPool::CheckIdleWait()
{
    struct timespec timeout;
    memset(&timeout, 0, sizeof(timeout));
    timeout.tv_sec = time(0) + CHECK_IDLE_TASK_INTERVAL;
    timeout.tv_nsec = 0;
    pthread_cond_timedwait(&m_idleCond, &m_idleMutex, &timeout);
}

int TaskPool::RemoveIdleTask()
{
    int iRet = 0;
    std::list::iterator it, next;
    std::list::reverse_iterator rit = m_idleList.rbegin();
    time_t curTime = time(0);
    for(; rit != m_idleList.rend(); )
    {
        it = --rit.base();
        if(difftime(curTime,((*it)->last_time)) >= TASK_DESTROY_INTERVAL)
        {
            iRet++;
            delete *it;
            *it = NULL;
            next = m_idleList.erase(it);
            rit = std::list::reverse_iterator(next);
        }
        else
        {
            break;   
        }
    }
}

int TaskPool::AddTask(task_fun fun, void *arg)
{
    int iRet = 0;
    if(0 != fun)
    {
        pthread_mutex_lock(&m_lock);
        if(m_taskListSize >= m_poolSize)
        {
            pthread_mutex_unlock(&m_lock);
            iRet = -1; //task pool is full;
        }
        else
        {
            pthread_mutex_unlock(&m_lock);
            Task * task = GetIdleTask();
            if(NULL == task)
            {
                task = new Task;
            }
            if(NULL == task)
            {
                iRet = -2; // new failed
            }
            else
            {
                task->fun = fun;
                task->data = arg;
                pthread_mutex_lock(&m_lock);
                m_taskList.push_back(task);
                ++m_taskListSize;
                pthread_mutex_unlock(&m_lock);
            }
        }
    }
    return iRet;
}

Task* TaskPool::GetTask()
{
    Task *task = NULL;
    pthread_mutex_lock(&m_lock);
    if(!m_taskList.empty())
    {
        task =  m_taskList.front();
        m_taskList.pop_front();
        --m_taskListSize;
    }
    pthread_mutex_unlock(&m_lock);
    return task;
}

void TaskPool::LockIdle()
{
    pthread_mutex_lock(&m_idleMutex);
}

void TaskPool::UnlockIdle()
{
    pthread_mutex_unlock(&m_idleMutex);
}

Task * TaskPool::GetIdleTask()
{
    LockIdle();
    Task * task = NULL;
    if(!m_idleList.empty())
    {
        task = m_idleList.front();
        m_idleList.pop_front();
    }
    UnlockIdle();
    return task;
}

void TaskPool::SaveIdleTask(Task*task)
{
    if(NULL != task)
    {
        task->fun = 0;
        task->data = NULL;
        task->last_time = time(0);
        LockIdle();
        m_idleList.push_front(task);
        UnlockIdle();
    }
}

taskpool.h

代码如下:

#ifndef TASKPOOL_H
#define TASKPOOL_H
/* purpose @ 任务池,主要是缓冲外部高并发任务数,有manager负责调度任务
 *          任务池可自动销毁长时间空闲的Task对象
 *          可通过CHECK_IDLE_TASK_INTERVAL设置检查idle空闲进程轮训等待时间
 *          TASK_DESTROY_INTERVAL 设置Task空闲时间,超过这个时间值将会被CheckIdleTask线程销毁
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include
#include
#include "commondef.h"

//所有的用户操作为一个task,
typedef void (*task_fun)(void *);
struct Task
{
    task_fun fun; //任务处理函数
    void* data; //任务处理数据
    time_t last_time; //加入空闲队列的时间,用于自动销毁
};

//任务池,所有任务会投递到任务池中,管理线程负责将任务投递给线程池
class TaskPool
{
public:
 /* pur @ 初始化任务池,启动任务池空闲队列自动销毁线程
     * para @ maxSize 最大任务数,大于0
    */
    TaskPool(const int & poolMaxSize);
    ~TaskPool();

    /* pur @ 添加任务到任务队列的尾部
     * para @ task, 具体任务
     * return @ 0 添加成功,负数 添加失败
    */   
    int AddTask(task_fun fun, void* arg);

    /* pur @ 从任务列表的头获取一个任务
     * return @  如果列表中有任务则返回一个Task指针,否则返回一个NULL
    */   
    Task* GetTask();

    /* pur @ 保存空闲任务到空闲队列中
     * para @ task 已被调用执行的任务
     * return @
    */
    void SaveIdleTask(Task*task);

    void StopPool();
public:
    void LockIdle();
    void UnlockIdle();
    void CheckIdleWait();
    int RemoveIdleTask();
    bool GetStop();
private:
    static void * CheckIdleTask(void *);
    /* pur @ 获取空闲的task
     * para @
     * para @
     * return @ NULL说明没有空闲的,否则从m_idleList中获取一个
    */
    Task* GetIdleTask();
    int GetTaskSize();
private:
    int m_poolSize; //任务池大小
    int m_taskListSize; // 统计taskList的大小,因为当List的大小会随着数量的增多而耗时增加
    bool m_bStop; //是否停止
    std::list m_taskList;//所有待处理任务列表
    std::list m_idleList;//所有空闲任务列表
    pthread_mutex_t m_lock; //对任务列表进行加锁,保证每次只能取一个任务
    pthread_mutex_t m_idleMutex; //空闲任务队列锁
    pthread_cond_t m_idleCond; //空闲队列等待条件
    pthread_t m_idleId;;
};
#endif

threadpool.cpp

代码如下:

/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)
 * date    @ 2014.01.03
 * author  @ haibin.wang
 */

#include "threadpool.h"
#include
#include

/*
#include
#include
*/

Thread::Thread(bool detach, ThreadPool * pool)
    : m_pool(pool)
{
    pthread_attr_init(&m_attr);
    if(detach)
    {
        pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_DETACHED ); // 让线程独立运行
    }
    else
    {
         pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE );
    }

    pthread_mutex_init(&m_mutex, NULL); //初始化互斥量
    pthread_cond_init(&m_cond, NULL); //初始化条件变量
    task.fun = 0;
    task.data = NULL;
}

Thread::~Thread()
{
    pthread_cond_destroy(&m_cond);
    pthread_mutex_destroy(&m_mutex);
    pthread_attr_destroy(&m_attr);
}

    ThreadPool::ThreadPool()
    : m_poolMax(0)
    , m_idleNum(0)
    , m_totalNum(0)
      , m_bStop(false)
{
    pthread_mutex_init(&m_mutex, NULL);
    pthread_mutex_init(&m_runMutex,NULL);
    pthread_mutex_init(&m_terminalMutex, NULL);
    pthread_cond_init(&m_terminalCond, NULL);
    pthread_cond_init(&m_emptyCond, NULL);
}

ThreadPool::~ThreadPool()
{
    /*if(!m_threads.empty())
    {
        std::list::iterator it = m_threads.begin();
        for(; it != m_threads.end(); ++it)
        {
            if(*it != NULL)
            {
                pthread_cond_destroy( &((*it)->m_cond) );
                pthread_mutex_destroy( &((*it)->m_mutex) );
                delete *it;
                *it = NULL;
            }
        }
        m_threads.clear();
    }*/
    pthread_mutex_destroy(&m_runMutex);
    pthread_mutex_destroy(&m_terminalMutex);
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_terminalCond);
    pthread_cond_destroy(&m_emptyCond);
}

int ThreadPool::InitPool(const int & poolMax, const int & poolPre)
{
    if(poolMax < poolPre
            || poolPre < 0
            || poolMax task.fun = fun;
    thread->task.data = arg;       
    pthread_cond_signal(&thread->m_cond); //触发线程WapperFun循环执行
    pthread_mutex_unlock( &thread->m_mutex );
}

int ThreadPool::Run(task_fun fun, void * arg)
{
    pthread_mutex_lock(&m_runMutex); //保证每次只能由一个线程执行
    int iRet = 0;
    if(m_totalNum m_threadId,&thread->m_attr, ThreadPool::WapperFun , thread); //通过WapperFun将线程加入到空闲队列中
        if(0 != iret)
        {
            delete thread;
            thread = NULL;
        }
    }
    return thread;
}

void * ThreadPool::WapperFun(void*arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    pool->IncreaseTotalNum();
    struct timespec abstime;
    memset(&abstime, 0, sizeof(abstime));
    while(1)
    {
        if(0 != thread->task.fun)
        {
            thread->task.fun(thread->task.data);
        }

        if( true == pool->GetStop() ) 
        {
            break; //确定当前任务执行完毕后再判定是否退出线程
        }
        pthread_mutex_lock( &thread->m_mutex );
        pool->SaveIdleThread(thread); //将线程加入到空闲队列中
        abstime.tv_sec = time(0) + THREAD_WAIT_TIME_OUT;
        abstime.tv_nsec = 0;
        if(ETIMEDOUT  == pthread_cond_timedwait( &thread->m_cond, &thread->m_mutex, &abstime )) //等待线程被唤醒 或超时自动退出
        {
            pthread_mutex_unlock( &thread->m_mutex );
            break;
        }
        pthread_mutex_unlock( &thread->m_mutex );
    }

    pool->LockMutex();
    pool->DecreaseTotalNum();
    if(thread != NULL)
    {
        pool->RemoveThread(thread);
        delete thread;
        thread = NULL;
    }
    pool->UnlockMutex();
    return 0;
}

void ThreadPool::SaveIdleThread(Thread * thread )
{
    if(thread)
    {
        thread->task.fun = 0;
        thread->task.data = NULL;
        LockMutex();
        if(m_threads.empty())
        {
            pthread_cond_broadcast(&m_emptyCond); //发送不空的信号,告诉run函数线程队列已经不空了
        }
        m_threads.push_front(thread);
        UnlockMutex();
    }
}

int ThreadPool::TotalThreads()
{
    return m_totalNum;
}


void ThreadPool::SendSignal()
{
    LockMutex();
    std::list::iterator it = m_threads.begin();
    for(; it!= m_threads.end(); ++it)
    {
        pthread_mutex_lock( &(*it)->m_mutex );
        pthread_cond_signal(&((*it)->m_cond));
        pthread_mutex_unlock( &(*it)->m_mutex );
    }
    UnlockMutex();
}

void * ThreadPool::TerminalCheck(void* arg)
{
    Thread * thread = (Thread*)arg;
    if(NULL == thread || NULL == thread->m_pool)
    {
        return NULL;
    }
    ThreadPool * pool = thread->m_pool;
    while((false == pool->GetStop()) || pool->TotalThreads() >0 )
    {
        pool->SendSignal();

        usleep(IDLE_CHECK_POLL_EMPTY);
    }
    //pool->TerminalCondSignal();
    return 0;
}

void ThreadPool::TerminalCondSignal()
{
    pthread_cond_signal(&m_terminalCond);
}

void ThreadPool::RemoveThread(Thread* thread)
{
    m_threads.remove(thread);
}

void ThreadPool::LockMutex()
{
    pthread_mutex_lock( &m_mutex);
}

void ThreadPool::UnlockMutex()
{
    pthread_mutex_unlock( &m_mutex );
}

void ThreadPool::IncreaseTotalNum()
{
    LockMutex();
    m_totalNum++;
    UnlockMutex();
}
void ThreadPool::DecreaseTotalNum()
{
    m_totalNum--;
}

threadpool.h

代码如下:

#ifndef THREADPOOL_H
#define THREADPOOL_H
/* purpose @ 线程池类,负责线程的创建与销毁,实现线程超时自动退出功能(半驻留)a
 *          当线程池退出时创建TerminalCheck线程,负责监测线程池所有线程退出
 * date    @ 2013.12.23
 * author  @ haibin.wang
 */

#include
#include
#include "taskpool.h"
//通过threadmanager来控制任务调度进程
//threadpool的TerminalCheck线程负责监测线程池所有线程退出


class ThreadPool;
class Thread
{
public:
    Thread(bool detach, ThreadPool * pool);
    ~Thread();
    pthread_t  m_threadId; //线程id
    pthread_mutex_t m_mutex; //互斥锁
    pthread_cond_t m_cond; //条件变量
    pthread_attr_t m_attr; //线程属性
 Task  task; //
    ThreadPool * m_pool; //所属线程池
};

//线程池,负责创建线程处理任务,处理完毕后会将线程加入到空闲队列中,从任务池中
class ThreadPool
{
public:
    ThreadPool();
    ~ThreadPool();

    /* pur @ 初始化线程池
     * para @ poolMax 线程池最大线程数
     * para @ poolPre 预创建线程数
     * return @ 0:成功
     *          -1: parameter error, must poolMax > poolPre >=0
     *          -2: 创建线程失败
    */
    int InitPool(const int & poolMax, const int & poolPre);

    /* pur @ 执行一个任务
     * para @ task 任务指针
     * return @ 0任务分配成功,负值 任务分配失败,-1,创建新线程失败
    */
    int Run(task_fun fun, void* arg);

 /* pur @ 设置是否停止线程池工作
     * para @ bStop true停止,false不停止
    */
 void StopPool(bool bStop);

public: //此公有函数主要用于静态函数调用
    /* pur @ 获取进程池的启停状态
     * return @
    */
    bool GetStop();   
 void SaveIdleThread(Thread * thread );
    void LockMutex();
    void UnlockMutex();
    void DecreaseTotalNum();
    void IncreaseTotalNum();
    void RemoveThread(Thread* thread);
    void TerminalCondSignal();
    int TotalThreads();
    void SendSignal();
private:
 /* pur @ 创建线程
     * return @ 非空 成功,NULL失败,
    */
 Thread * CreateThread();

    /* pur @ 从线程池中获取一个一个线程运行任务
     * para @ fun 函数指针
     * para @ arg 函数参数
     * return @
    */
    void GetThreadRun(task_fun fun, void* arg);

 static void * WapperFun(void*);
 static void * TerminalCheck(void*);//循环监测是否所有线程终止线程

private:
    int m_poolMax;//线程池最大线程数
    int m_idleNum; //空闲线程数
    int m_totalNum; //当前线程总数 小于最大线程数 
 bool m_bStop; //是否停止线程池
 pthread_mutex_t m_mutex; //线程列表锁
 pthread_mutex_t m_runMutex; //run函数锁

    pthread_mutex_t m_terminalMutex; //终止所有线程互斥量
    pthread_cond_t  m_terminalCond; //终止所有线程条件变量
    pthread_cond_t  m_emptyCond; //空闲线程不空条件变量

    std::list m_threads; // 线程列表
};
#endif

threadpoolmanager.cpp

代码如下:

#include "threadpoolmanager.h"
#include "threadpool.h"
#include "taskpool.h"

#include
#include

/*#include
#include
#include */
 //   struct timeval time_beg, time_end;
ThreadPoolManager::ThreadPoolManager()
    : m_threadPool(NULL)
    , m_taskPool(NULL)
    , m_bStop(false)
{
    pthread_mutex_init(&m_mutex_task,NULL);
    pthread_cond_init(&m_cond_task, NULL);

   /* memset(&time_beg, 0, sizeof(struct timeval));
    memset(&time_end, 0, sizeof(struct timeval));
    gettimeofday(&time_beg, NULL);*/
}

ThreadPoolManager::~ThreadPoolManager()
{
    StopAll();
    if(NULL != m_threadPool)
    {
        delete m_threadPool;
        m_threadPool = NULL;
    }
    if(NULL != m_taskPool)
    {
        delete m_taskPool;
        m_taskPool = NULL;
    }

    pthread_cond_destroy( &m_cond_task);
    pthread_mutex_destroy( &m_mutex_task );

    /*gettimeofday(&time_end, NULL);
    long total = (time_end.tv_sec - time_beg.tv_sec)*1000000 + (time_end.tv_usec - time_beg.tv_usec);
    printf("manager total time = %dn", total);
    gettimeofday(&time_beg, NULL);*/
}

int ThreadPoolManager::Init(
        const int &tastPoolSize,
        const int &threadPoolMax,
        const int &threadPoolPre)
{
    m_threadPool = new ThreadPool();
    if(NULL == m_threadPool)
    {
        return -1;
    }
    m_taskPool = new TaskPool(tastPoolSize);
    if(NULL == m_taskPool)
    {
        return -2;
    }

    if(0>m_threadPool->InitPool(threadPoolMax, threadPoolPre))
    {
        return -3;
    }
    //启动线程池
    //启动任务池
    //启动任务获取线程,从任务池中不断拿任务到线程池中
    pthread_attr_t attr;
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
    pthread_create(&m_taskThreadId, &attr, TaskThread, this); //创建获取任务进程
    pthread_attr_destroy(&attr);
    return 0;
}

void ThreadPoolManager::StopAll()
{
    m_bStop = true;
    LockTask();
    pthread_cond_signal(&m_cond_task);
    UnlockTask();
    pthread_join(m_taskThreadId, NULL);
    //等待当前所有任务执行完毕
    m_taskPool->StopPool();
    m_threadPool->StopPool(true); // 停止线程池工作
}

void ThreadPoolManager::LockTask()
{
    pthread_mutex_lock(&m_mutex_task);
}

void ThreadPoolManager::UnlockTask()
{
    pthread_mutex_unlock(&m_mutex_task);
}

void* ThreadPoolManager::TaskThread(void* arg)
{
    ThreadPoolManager * manager = (ThreadPoolManager*)arg;
    while(1)
    {
        manager->LockTask(); //防止任务没有执行完毕发送了停止信号
        while(1) //将任务队列中的任务执行完再退出
        {
            Task * task = manager->GetTaskPool()->GetTask();
            if(NULL == task)
            {
                break;
            }
            else
            {
                manager->GetThreadPool()->Run(task->fun, task->data);
                manager->GetTaskPool()->SaveIdleTask(task);
            }
        }

        if(manager->GetStop())
        {
            manager->UnlockTask();
            break;
        }
        manager->TaskCondWait(); //等待有任务的时候执行
        manager->UnlockTask();
    }
    return 0;
}

ThreadPool * ThreadPoolManager::GetThreadPool()
{
    return m_threadPool;
}

TaskPool * ThreadPoolManager::GetTaskPool()
{
    return m_taskPool;
}

int  ThreadPoolManager::Run(task_fun fun,void* arg)
{
    if(0 == fun)
    {
        return 0;
    }
    if(!m_bStop)
    {  
        int iRet =  m_taskPool->AddTask(fun, arg);

        if(iRet == 0 && (0 == pthread_mutex_trylock(&m_mutex_task)) )
        {
            pthread_cond_signal(&m_cond_task);
            UnlockTask();
        }
        return iRet;
    }
    else
    {
        return -3;
    }
}

bool ThreadPoolManager::GetStop()
{
    return m_bStop;
}

void ThreadPoolManager::TaskCondWait()
{
    struct timespec to;
    memset(&to, 0, sizeof to);
    to.tv_sec = time(0) + 60;
    to.tv_nsec = 0;

    pthread_cond_timedwait( &m_cond_task, &m_mutex_task, &to); //60秒超时
}

threadpoolmanager.h

代码如下:

#ifndef THREADPOOLMANAGER_H
#define THREADPOOLMANAGER_H
/* purpose @
 *      基本流程:
 *          管理线程池和任务池,先将任务加入任务池,然后由TaskThread负责从任务池中将任务取出放入到线程池中
 *      基本功能:
 *          1、工作线程可以在业务不忙的时候自动退出部分长时间不使用的线程
 *          2、任务池可以在业务不忙的时候自动释放长时间不使用的资源(可通过commondef.h修改)
 *          3、当程序退时不再向任务池中添加任务,当任务池中所有任务执行完毕后才退出相关程序(做到程序的安全退出)
 *      线程资源:
 *          如果不预分配任何处理线程的话,ThreadPool只有当有任务的时候才实际创建需要的线程,最大线程创建数为用户指定
 *          当manager销毁的时候,manager会创建一个监控所有任务执行完毕的监控线程,只有当所有任务执行完毕后manager才销毁
 *          线程最大数为:1个TaskPool线程 + 1个manager任务调度线程 + ThreadPool最大线程数 + 1个manager退出监控线程 + 1线程池所有线程退出监控线程
 *          线程最小数为:1个TaskPool创建空闲任务资源销毁监控线程 + 1个manager创建任务调度线程
 *      使用方法:
 *          ThreadPoolManager manager;
 *          manager.Init(100000, 50, 5);//初始化一个任务池为10000,线程池最大线程数50,预创建5个线程的管理器
 *          manager.run(fun, data); //添加执行任务到manager中,fun为函数指针,data为fun需要传入的参数,data可以为NULL
 *
 * date    @ 2013.12.23
 * author  @ haibin.wang
 *
 *  详细参数控制可以修改commondef.h中的相关变量值
 */

#include
typedef void (*task_fun)(void *);

class ThreadPool;
class TaskPool;

class ThreadPoolManager
{
public:
    ThreadPoolManager();
    ~ThreadPoolManager();

    /* pur @ 初始化线程池与任务池,threadPoolMax > threadPoolPre > threadPoolMin >= 0
     * para @ tastPoolSize 任务池大小
     * para @ threadPoolMax 线程池最大线程数
     * para @ threadPoolPre 预创建线程数
     * return @ 0:初始化成功,负数 初始化失败
     *          -1:创建线程池失败
     *          -2:创建任务池失败
     *          -3:线程池初始化失败
    */
    int Init(const int &tastPoolSize,
            const int &threadPoolMax,
            const int &threadPoolPre);

    /* pur @ 执行一个任务
     * para @ fun 需要执行的函数指针
     * para @ arg fun需要的参数,默认为NULL
     * return @ 0 任务分配成功,负数 任务分配失败
     *          -1:任务池满
     *          -2:任务池new失败
     *          -3:manager已经发送停止信号,不再接收新任务
    */
    int Run(task_fun fun,void* arg=NULL);

public: //以下public函数主要用于静态函数调用
    bool GetStop();
    void TaskCondWait();
    TaskPool * GetTaskPool();
    ThreadPool * GetThreadPool();
    void LockTask();
    void UnlockTask();
    void LockFull();

private:
 static void * TaskThread(void*); //任务处理线程
 void StopAll();

private:
    ThreadPool *m_threadPool; //线程池
    TaskPool * m_taskPool; //任务池
    bool m_bStop; // 是否终止管理器

    pthread_t m_taskThreadId; // TaskThread线程id
 pthread_mutex_t m_mutex_task;
    pthread_cond_t m_cond_task;
};
#endif

main.cpp

代码如下:

#include
#include
#include "threadpoolmanager.h"
#include
#include
#include
#include


using namespace std;
int seq = 0;
int billNum =0;
int inter = 1;
pthread_mutex_t m_mutex;
void myFunc(void*arg)
{
    pthread_mutex_lock(&m_mutex);
    seq++;
    if(seq%inter == 0 )
    {
        cout


    
 
 

您可能感兴趣的文章:

  • Windows和Linux下C++类成员方法作为线程函数方法介绍
  • C++实现CreatThread函数主线程与工作线程交互的方法
  • c++的boost库多线程(Thread)编程(线程操作,互斥体mutex,条件变量)详解
  • C++多线程同步问题!
  • 请教Linux下多线程C++编程
  • linux下的c++如何判断线程是否结束?
  • C++ 解析器线程的传播库 libpondyparser
  • 项目需要,麻烦有做过Linux平台c++语言多线程开发的同学进来看下
  • C++ 创建线程问题
  • linux c++ 多线程 问题
  • C++线程库 TinyThread++
  • C++类中创建线程
  • C++中,为什么执行pthread_mutex_lock()时执行了两次线程才被阻塞?
  • linux下的c++,线程有类似于进程wait的函数么?
  • eclipse调试c++ 线程,主线程显示状态stepping????
  • linux 线程问题c++,这是那错了啊!大家帮帮忙!!!!
  • pthread线程库和C++一起使用的问题
  • c++程序移植到Linux下线程问题请教
  • 关键词:◆C++类 ◆线程 ◆共享数据 ◆◆◆请有经验的高手帮忙出出主意
  • 请教:关于C++类中方法线程调用的问题。
  • pthread_create错误,怎么在C++类中定义线程函数指针
  • java多线程编程之捕获子线程异常示例
  • 输出java进程的jstack信息示例分享 通过线程堆栈信息分析java线程
  • python创建线程示例
  • python线程锁(thread)学习示例
  • c#后台线程访问前台控件并显示信息示例
  • android使用handler ui线程和子线程通讯更新ui示例
  • c语言多线程编程使用示例
  • java通过共享变量结束run停止线程的方法示例
  • java线程并发semaphore类示例
  • C#实现线程池的简单示例
  •  
    本站(WWW.)旨在分享和传播互联网科技相关的资讯和技术,将尽最大努力为读者提供更好的信息聚合和浏览方式。
    本站(WWW.)站内文章除注明原创外,均为转载、整理或搜集自网络。欢迎任何形式的转载,转载请注明出处。












  • 相关文章推荐
  • linux c 多线程问题任务分配问题
  • 多任务,多进程,多线程的概念
  • socket通讯时如何为线程池分配Recv任务?
  • 在LINUX OS 上是否有类似于Netant 或FlashGet之类多线程多任务的下载软件及其源代码,若有哪里能下载
  • java多线程并发executorservice(任务调度)类
  • 在Android线程池里运行代码任务实例
  • 异步/多线程/任务/并行编程之一:如何选择合适的多线程模型?
  • Linux中有没有什么好的下载工具?像Flashget的可以断点续传多线程下载的,我试过用Download for X,但添加任务时,程序就没反应了!
  • Java中多线程相关类Thread介绍
  • 一个进程创建了两个线程,如何使得当任何一个线程(比如线程a)结束时,同时也结束线程b,也就是使两个线程一起死掉,怎么办呢?
  • c#多线程更新窗口(winform)GUI的数据
  • java 线程,对当前线程(非主线程)调用sleep,为什么主线程(窗口)也没反应了
  • Linux下GCC内置原子操作函数(多线程资源访问)介绍
  • 如何实现一个线程组内多线程的非同不执行,即一个线程执行完毕后再执行下一个线程???
  • 请问:进程创建的线程是怎样运行的啊,线程的处理函数运行完了,线程就退出了吗?
  • 关于线程的问题,什么样的线程不是active线程?
  • 请问Linux核心支持多线程吗?开发库有线程库吗?线程好用吗?(稳定?)
  • 请问,在一个进程中创建多线程时如何能避免不同的线程获得同一个线程标识
  • 我的一个多线程服务里, 总是有一个线程莫名其妙的变成僵尸线程。
  • 能否通过线程id控制线程的状态?或是观察到线程的状态?
  • 如何在一个线程中启动另外一个线程,然后本线程就退出?
  • 我要设置一个线程的优先级, 这个属性结构并没有线程的id,它怎么知道是设置哪个线程呢?
  • 请问在java多线程中,是只有run(){}内的代码运行在一个新线程下呢?还是这个类中的代码都运行在一个新线程下?
  • gcc链接的库,分不分单线程版本的和多线程版本的?
  • 内核栈~ 内核线程 ~用户线程 之间关系 问题
  • 子线程的数据如何返回给主线程?
  • 如果父线程死掉 那么子线程会不会死掉呢
  • 多线程为何比单线程慢许多?
  • 如何设计线程池的监视线程




  • 特别声明:169IT网站部分信息来自互联网,如果侵犯您的权利,请及时告知,本站将立即删除!

    ©2012-2021,,E-mail:www_#163.com(请将#改为@)

    浙ICP备11055608号-3