当前位置: 技术问答>linux和unix
线程之间如何实现缓冲机制
来源: 互联网 发布时间:2016-03-28
本文导语: 其实我发现进程间的消息队列挺好的,实现了消息缓冲机制! 现在我有个项目是在线程之间进行处理的,其中一个线程从网络接收数据,再转给另一个线程去处理 如果同时接收了很多数据时,那效率会变低,甚至会...
其实我发现进程间的消息队列挺好的,实现了消息缓冲机制!
现在我有个项目是在线程之间进行处理的,其中一个线程从网络接收数据,再转给另一个线程去处理
如果同时接收了很多数据时,那效率会变低,甚至会丢失数据
请问如何在线程之间实现消息缓冲的机制呢?
现在我有个项目是在线程之间进行处理的,其中一个线程从网络接收数据,再转给另一个线程去处理
如果同时接收了很多数据时,那效率会变低,甚至会丢失数据
请问如何在线程之间实现消息缓冲的机制呢?
|
//cthread.h
#ifndef __CTHREAD_H__
#define __CTHREAD_H__
#include
#include "base.h"
class CThread
{
public:
CThread();
virtual ~CThread();
virtual int Init(); //需重载
virtual void* Run(); //需重载
int Start();
int Stop();
int InsertMsg(void* msg);
int GetMsg(void*& msg);
int IsStop();
private:
public:
protected:
pthread_t m_iThreadID; //线程ID
int m_iState; //线程状态,为0退出,为1运行
int m_iStopFlag; //是否已经停止
pthread_mutex_t m_mutexMsg;
pthread_cond_t m_condMsg;
private:
list m_lstMsg;
};
#endif
//cthread.cpp
#include "cthread.h"
void* ThreadRun(void* arg)
{
// pthread_cleanup_push();
CThread* pBase = (CThread*)arg;
pBase->Run();
// pthread_cleanup_pop;
return NULL;
}
CThread::CThread()
{
m_iThreadID = (pthread_t)-1;
m_iState = 0;
m_iStopFlag = 0;
m_lstMsg.resize(0);
pthread_mutex_init(&m_mutexMsg, NULL);
pthread_cond_init(&m_condMsg, NULL);
}
CThread::~CThread()
{
pthread_mutex_destroy(&m_mutexMsg);
pthread_cond_destroy(&m_condMsg);
}
int CThread::Init()
{
return 0;
}
int CThread::Start()
{
int ret = -1;
m_iState = 1;
ret = pthread_create(&m_iThreadID, NULL, ThreadRun, this);
if (ret != 0)
{
m_iState = 0;
return -1;
}
else
{
return 0;
}
}
void* CThread::Run()
{
while (m_iState != 0)
{
usleep(1000);
}
m_iStopFlag = 1;
return NULL;
}
int CThread::Stop()
{
m_iState = 0;
return 0;
}
int CThread::InsertMsg(void* msg)
{
if (NULL == msg)
{
return -1;
}
pthread_mutex_lock(&m_mutexMsg);
m_lstMsg.push_back(msg);
pthread_cond_signal(&m_condMsg);
pthread_mutex_unlock(&m_mutexMsg);
return 0;
}
int CThread::GetMsg(void*& msg)
{
pthread_mutex_lock(&m_mutexMsg);
int ret = -1;
get:
if (m_lstMsg.size() > 0)
{
msg = m_lstMsg.front();
m_lstMsg.pop_front();
ret = m_lstMsg.size();
}
else
{
struct timeval now;
struct timespec timeout;
gettimeofday(&now, NULL);
timeout.tv_sec = now.tv_sec + 3;
timeout.tv_nsec = 0;
ret = pthread_cond_timedwait(&m_condMsg, &m_mutexMsg, &timeout);
if(ret != ETIMEDOUT)
{
goto get;
}
else
{
ret = -1;
}
}
pthread_mutex_unlock(&m_mutexMsg);
return ret;
}
int CThread::IsStop()
{
return m_iStopFlag;
}