当前位置: 技术问答>linux和unix
关于线程的使用,程序架构的问题
来源: 互联网 发布时间:2016-11-20
本文导语: 功能需求:socket方式收发消息,然后解析接受消息操作对应的设备IO,(属于工业控制类的,不需要UI界面的程序)。 要求就是一个线程以select接收socket套接字发来的消息,另一个线程最大限度的循环查询设备状态,...
功能需求:socket方式收发消息,然后解析接受消息操作对应的设备IO,(属于工业控制类的,不需要UI界面的程序)。
要求就是一个线程以select接收socket套接字发来的消息,另一个线程最大限度的循环查询设备状态,
目前的方案,
(1)线程A接收socket套接字的消息(使用select),收到完整的消息后,把消息放入队列,然后pthread_cond_signal()发送信号;
另外一个线程B是消息处理线程,使用pthread_cond_signal()函数阻塞,收到线程A的信号后,开始处理socket发来的命令,根据发送的命令设置全局变量的值;
主线程C一直循环read设备状态,然后根据线程B设置的变量值,write到设备;
这个方案的缺点是,需要两个互斥体,一个用于线程A,B之间的队列操作,一个用户线程B,主线程之间的全局变量操作, 并且主线程里面的轮询是while循环的,请问这样的while循环能不能让线程A/B有机会执行呢?(主线程的while中没有sleep,为了保证最大限度的循环查询到设备状态)
因为以前看过的源码大都是基于节目的(类似MFC的处理方式),这种工业控制类的没读过别人的源码,也无从参考别人的程序架构,
(1)请有相关项目经验的师兄点评一下这样设计程序的可行性,主要是线程功能和同步设计的部分,硬件部分不需要关心,以及是否有更好的设计方案;
(2)关于基于linux的工业控制,有什么好的源码网站可以提供参考?主要想参考一下别人的思路;
要求就是一个线程以select接收socket套接字发来的消息,另一个线程最大限度的循环查询设备状态,
目前的方案,
(1)线程A接收socket套接字的消息(使用select),收到完整的消息后,把消息放入队列,然后pthread_cond_signal()发送信号;
另外一个线程B是消息处理线程,使用pthread_cond_signal()函数阻塞,收到线程A的信号后,开始处理socket发来的命令,根据发送的命令设置全局变量的值;
主线程C一直循环read设备状态,然后根据线程B设置的变量值,write到设备;
这个方案的缺点是,需要两个互斥体,一个用于线程A,B之间的队列操作,一个用户线程B,主线程之间的全局变量操作, 并且主线程里面的轮询是while循环的,请问这样的while循环能不能让线程A/B有机会执行呢?(主线程的while中没有sleep,为了保证最大限度的循环查询到设备状态)
因为以前看过的源码大都是基于节目的(类似MFC的处理方式),这种工业控制类的没读过别人的源码,也无从参考别人的程序架构,
(1)请有相关项目经验的师兄点评一下这样设计程序的可行性,主要是线程功能和同步设计的部分,硬件部分不需要关心,以及是否有更好的设计方案;
(2)关于基于linux的工业控制,有什么好的源码网站可以提供参考?主要想参考一下别人的思路;
|
队列的存在是出于缓冲的考虑吗?
如果交易量不大,是不是可以考虑让A接管B的工作..
如果交易量不大,是不是可以考虑让A接管B的工作..
|
试验一下吧
或者让一组线程负责socket通信,另外一组线程负责读写IO设备,可否?
或者让一组线程负责socket通信,另外一组线程负责读写IO设备,可否?
|
我不知道说的对不对,仅供参考
实现已经不容易了,“优化”,记得有位老师说过“这是个永恒的话题”
1楼的方案已经在楼主的方案上优化了,不用缓冲队列、减少了互斥量、减少了线程...
不知道具体应用场景,提些建议
1 考察网络通信数据量与设备IO数据量
2 考察网络通信速度与设备IO速度
3 考察网络通信对实时性的要求与设备IO对实时性的要求
4 考察楼主应用所在节点在整个系统中的作用(存储并转发、仅负责传递消息...)
5 考察操作系统线程优先级、任务调度等系统功能
6 只能想到这么多了
根据考察结果,减少临界资源的使用,更有效的分配任务,提高网络消息处理速度,及时与设备交互
再完善点的,考虑并发、多用户、大数据量、多连接数、失效数据等极端情况
实现已经不容易了,“优化”,记得有位老师说过“这是个永恒的话题”
1楼的方案已经在楼主的方案上优化了,不用缓冲队列、减少了互斥量、减少了线程...
不知道具体应用场景,提些建议
1 考察网络通信数据量与设备IO数据量
2 考察网络通信速度与设备IO速度
3 考察网络通信对实时性的要求与设备IO对实时性的要求
4 考察楼主应用所在节点在整个系统中的作用(存储并转发、仅负责传递消息...)
5 考察操作系统线程优先级、任务调度等系统功能
6 只能想到这么多了
根据考察结果,减少临界资源的使用,更有效的分配任务,提高网络消息处理速度,及时与设备交互
再完善点的,考虑并发、多用户、大数据量、多连接数、失效数据等极端情况
|
就楼主给出的这些信息,1楼的方案已经很优了,并且楼主理解错了,是A线程接管B线程的工作,不是C线程接管B线程的工作。也就是说A已经把全局变量设置好了,C线程看到了就直接执行了。A要有缓冲。
两个线程最好以时间片平等调度方式,这样连给A线程匀点时间运行的事都由操作系统任务调度本身来做了。
A线程初始开始select并接受报文,弄到一个完整命令就写全局变量以便控制B线程写;一下子来n个完整命令,或者队列不空,则放到缓冲队列尾,拿出队列头命令写全局变量,同时置select超时为0,开始下一个select循环。
注意设置select为0的地方,还应该记录一下当前时间;拿队列头命令写全局变量时,判断一下B线程是否已经写操作完毕了。写操作用时应该是一个经验值或者给定值。如果写操作还没完,就先别拿队列头的命令。其他的流程,还是该怎样就怎样。希望我讲清楚了……
这样虾米互斥都不需要。B线程还能一个劲儿的跑。但是如果写操作用时是不确定的,嗷卖糕,那AB线程之间的同步互斥就少不了了
两个线程最好以时间片平等调度方式,这样连给A线程匀点时间运行的事都由操作系统任务调度本身来做了。
A线程初始开始select并接受报文,弄到一个完整命令就写全局变量以便控制B线程写;一下子来n个完整命令,或者队列不空,则放到缓冲队列尾,拿出队列头命令写全局变量,同时置select超时为0,开始下一个select循环。
注意设置select为0的地方,还应该记录一下当前时间;拿队列头命令写全局变量时,判断一下B线程是否已经写操作完毕了。写操作用时应该是一个经验值或者给定值。如果写操作还没完,就先别拿队列头的命令。其他的流程,还是该怎样就怎样。希望我讲清楚了……
这样虾米互斥都不需要。B线程还能一个劲儿的跑。但是如果写操作用时是不确定的,嗷卖糕,那AB线程之间的同步互斥就少不了了
|
小心不要掉在TCP Socket
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面!
send(人多)send(病少)send(财富)
recv(人多病)recv(少财富)
陷阱里面!
|
线程A B问题不大
使用条件变量无可避免要用到互斥锁,这个锁仅仅是需要更新条件变量以及操作队列的时候锁定,线程从队列中取出数据后锁就释放掉。这个应该没有好的办法继续优化。除非不要使用条件变量。
线程C中你描述:主线程C一直循环read设备状态
这个设备可以通过事件通知机制么?死循环必然要消耗CPU。
还有线程B和线程C操作的全局变量时什么类型的?
使用条件变量无可避免要用到互斥锁,这个锁仅仅是需要更新条件变量以及操作队列的时候锁定,线程从队列中取出数据后锁就释放掉。这个应该没有好的办法继续优化。除非不要使用条件变量。
线程C中你描述:主线程C一直循环read设备状态
这个设备可以通过事件通知机制么?死循环必然要消耗CPU。
还有线程B和线程C操作的全局变量时什么类型的?
|
为啥不用多进程呢,
A进程读socket 写--> 消息队列 --> B进程读消息队列 写命令-->共享内存 -->C进程读设备状态处理命令;用不到什么锁;
没有必要while循环设备状态,采用usleep也能有比较好的效果,而且会显著降低CPU的消耗;
个人意见 供参考~
A进程读socket 写--> 消息队列 --> B进程读消息队列 写命令-->共享内存 -->C进程读设备状态处理命令;用不到什么锁;
没有必要while循环设备状态,采用usleep也能有比较好的效果,而且会显著降低CPU的消耗;
个人意见 供参考~
|
我现在就在做着这个事情,用使用的方法就是与53楼的思路差不多,我用的的是两个进程,父进程管发送,并且还产生几个线程用来作发送用,然后子进程就管接收,也并发出几个线程来接收与处理数据,可是还是有个问题,最后一个数据不能接收到,总会在最后,不知道是为什么?
下面是一些代码,大家交流下~~~~
下面是一些代码,大家交流下~~~~
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define int8 char
#define uint8 unsigned char
#define uint32 unsigned int
#define ulong32 unsigned long
#define long32 long
#define int32 int
#define long64 long long
#define NTPSVR "192.168.2.8" //MY server
#define LOCAL "192.168.2.109"
#define LOCALPORT 8000
#define NTPPORT 123
typedef struct NTPPACKET
{
uint8 li_vn_mode;
uint8 stratum;
uint8 poll;
uint8 precision; //有符号整数表示本地时钟精确度
ulong32 root_delay; //到达服务器的一次往返的总延时,是15到16位有符号的定点小数
ulong32 root_dispersion; // 到达服务器的一次标准误差,是15-16位的无符号的定点小数
int8 ref_id[4];
ulong32 reftimestamphigh; //本地时钟最后被设定或校正的时间T4
ulong32 reftimestamplow;
ulong32 oritimestamphigh; //向服务器请求分离客户机的时间戳,采用64位时标格式T1
ulong32 oritimestamplow;
ulong32 recvtimestamphigh; //向服务器请求到客户机的时间戳,采用64位时标格式T2
ulong32 recvtimestamplow;
ulong32 trantimestamphigh; //向客户机答复分离服务器的时间戳,采用64位时标格式T3,用T3来校正本地时间
ulong32 trantimestamplow;
}NTPPacket;
NTPPacket ntppack,newpack; fd_set inset1;
int32 sockfd;
struct timeval tv,tv1;
struct timezone tz;
struct sockaddr_in addr,local_addr;
int counter=0;
pthread_t tidA,tidB;
int capacity=2 ; //定义线程的数量,int tid_capacity=0; //第几个线程
pthread_mutex_t ntppack_mutex = PTHREAD_MUTEX_INITIALIZER;//init pthread
pthread_mutex_t newpack_mutex = PTHREAD_MUTEX_INITIALIZER;//init pthread
void *recv_pkt(void *);
void *recv_pkt(void *vptr)
{
int rec_pkt=0;
int num=0;
while(recv(sockfd,&newpack,sizeof(newpack),0) >0 )
{
//printf("In revc okn");
pthread_mutex_trylock(&newpack_mutex);
rec_pkt++;
printf("n Packets have been receive %d n",rec_pkt);
pthread_mutex_unlock(&newpack_mutex);
//printf("recv end!n");
if(rec_pkt==500)
{
printf ("total have been receive %d packetsn",rec_pkt);
exit(1);
}
}
return NULL;
}
void *send_pkt(void *);
void *send_pkt (void *vptr)
{
//every pthread need send 20 packets
int num=0; // packets which send by once int maxpkt =250; //一次发包量
int val;
for(num;num