实现协程的另一种思路——关于协程的思考
偶然看到protothread,用另一种方式实现了协程的效果。说是协程库,其实它并不是以任何库的形式存在的。而仅仅是头文件,在使用时仅仅需要包含头文件即可。基于大神实现了协程效果的思路,我也简单尝试了一下。
本文会首先介绍协程库代码和它的测试例程,然后分析其实现原理。如果之前并未接触过这一思想,相信你会和我一样惊呼,cool,天才是这样创造世界的!在此之后,会对进程、线程、协程谈谈自己的理解,并且聊聊各种实现协程的方式。
一、如何用头文件实现协程库?
在给出库代码之前,我们先来看看测试例程:
- 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
#include
#include
#include
#include
#include
#include
#include
#include
#include"xx_thread.h"
static
int
flag=1;
XX_THREAD_BEGIN(thread1)
//flag=0才运行
while
(1){
xx_wait_while(flag);
flag=1;
printf(
"thread1:1n"
);
}
XX_THREAD_END(thread1)
XX_THREAD_BEGIN(thread2)
while
(1){
printf(
"thread2:1n"
);
flag = 0;
sleep(1);
//为了防止输出太快,它当然会阻塞所有协程
xx_yield();
printf(
"thread2:2n"
);
flag = 1;
xx_yield();
}
XX_THREAD_END(thread2)
XX_THREAD_BEGIN(thread4)
static
int
listen_fd, work_fd;
char
buf[1024];
int
flags, addr_len, recv_len;
struct
sockaddr_in svr_addr;
if
((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
exit(1);
svr_addr.sin_family = AF_INET;
svr_addr.sin_port = htons(9999);
svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr_len =
sizeof
(
struct
sockaddr_in);
flags = fcntl(listen_fd, F_GETFL, 0);
fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK);
if
(bind(listen_fd, (
struct
sockaddr *)&svr_addr, addr_len)<0)
exit(1);
if
(listen(listen_fd, 4) < 0)
exit(1);
xx_accept(listen_fd, NULL, 0, work_fd);
if
(work_fd < 0)
exit(1);
while
(1){
xx_recv(work_fd, buf, 1024, recv_len);
if
(recv_len < 0)
exit(1);
else
if
(recv_len == 0)
break
;
buf[recv_len] =
' '
;
printf(
"我收到了%d字节:%sn"
, recv_len, buf);
}
XX_THREAD_END(thread4)
int
main()
{
xx_add_threads(thread1, thread2, thread4);
xx_schdule();
return
0;
}
例子中共有三个协程,thread2使用yield的方式放弃cpu,thread1在flag上做等待,也就是简单的协程间同步功能,thread4则是监听一个tcp端口,当一个连接到来时,打印传来的内容(测试时,可以使用nc localhost port的方式)。三个协程都是while(1)循环,所以如果是单进程单线程单协程方式下,是无法做到三个循环“宏观同时”运行的。协程库实现了这一点。
核心思想(重点,敲黑板):在单进程单线程下,要想做到用户态的协程切换,本质是需要对现场保存&恢复。往简单了想,协程要有入口函数,我把这个函数当成这个协程,那么问题就是怎么保存现场,怎么恢复现场。有经验的人当然想ucontext就是做这个事的,不过我们再往简单想,这个“现场”主要是什么,主要是函数运行到哪一条指令。如果我让一个函数中途能退出,下次进入时在退出的地方继续运行,是不是就是保存并恢复了简单现场呢。我在外层用一个while(1)循环作为“操作系统的进程调度器”,简单点使用轮询调度,那不是多个函数依次运行,又中断,又在中断的地方接着运行。就是这么简单,那么核心问题就转化为,如何让一个函数能中途退出,下次进入时在退出处接着执行。
答案是goto,每次中途退出前把当前位置做一个lable,并保存起来,为了保存这个lable以便下次进入时可以使用,局部static吧,下次进入判断这个变量直接跳到lable,不就是可以了。你简直是天才,不过有更天才也更有意思的做法:switch case
好了,我们按照这个思路试一下,它看起来应该是这样子:
1 2 3 4 5 6 7 8 9 10 11 | int function( void ) { static int i, state = 0; switch (state) { case 0: /* start of function */ for (i = 0; i < 10; i++) { state = 1; /* so we will come back to "case 1" */ return i; case 1:; /* resume control straight after the return */ } } } |
为了下次调用的时候从这次返回处开始,我们使用一个static变量保存lable,并在return前设置lable,在return后一行case lable,并把整个函数用switch包起来。你可能会有疑问,我们不可避免的把case落在不同的块中,这是合法的么?幸运的是,答案是合法。C语言中著名的"达夫设备"就是利用case语句在其匹配switch语句子块中也是合法的这一事实。
1 2 3 4 5 6 7 8 9 10 11 | switch (count % 8) { case 0: do { *to = *from++; case 7: *to = *from++; case 6: *to = *from++; case 5: *to = *from++; case 4: *to = *from++; case 3: *to = *from++; case 2: *to = *from++; case 1: *to = *from++; } while ((count -= 8) > 0); } |
我们现在需要做的只是构造一些精确宏,并且可以把细节隐藏到这些似是而非的定义里。你可能觉得作为库,无法预期使用者在函数中可能的yield次数,那么我们可以使用__LINE__宏解决,通过使用__LINE__行号作为lable,我们可以允许使用者使用任意多lable。
好了,是时候给出代码了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | #ifndef _XX_THREAD_H_ #define _XX_THREAD_H_ #define XX_THREAD_BEGIN(name) void name(){ static int _xx_s; switch (_xx_s){ case 0:{ #define XX_THREAD_END(name) xx_yield();}}} #define xx_add_threads(...) void (*_xx_thread_array[])()={__VA_ARGS__} #define xx_schdule() for ( int _xx_i=0; ; ++_xx_i){ if (_xx_i >= sizeof (_xx_thread_array)/ sizeof (_xx_thread_array[0])) _xx_i=0; _xx_thread_array[_xx_i](); } #define xx_yield() do {_xx_s=__LINE__; return ; case __LINE__:; } while (0) #define xx_wait_while(condition) do {_xx_s=__LINE__; case __LINE__: if (condition) return ; } while (0) #define xx_wait_until(condition) xx_wait_while(!(condition)) #define xx_accept(fd, addr, alen, new_fd) xx_wait_while((new_fd=accept(fd, addr, alen))<0 && errno==EAGAIN) #define xx_recv(fd, buf, buflen, recv_len) xx_wait_while((recv_len=recv(fd, buf, buflen, MSG_DONTWAIT))<0 && errno==EAGAIN) #endif // _XX_THREAD_H_ |
这就是让开头的测试代码正常运转的协程库的所有代码,是不是比你预期的简短呢?
这只是一个简单版本,它还有诸多限制,其中一些可以进行改进,另外一些则是先天不足
* 协程函数指针由数组改为指针等动态结构,可以做到协程的动态增删
* 将lable和函数指针struct在一起,可以增加一个函数作为两个协程实体的可能性,是的,只是增加可能性,因为还有局部变量
* 如果想要在协程yield前后保留局部变量,就必须使用static,我们也可以像lable一样将这些值返回并struct在协程指针那里,不过这样就会使这个库“变重”,违背了轻量级的原则。这一点要辩证的看
* 例子中的网络调用使用的是非阻塞io,这会导致整个进程即使在空闲时也会跑满cpu,应该可以使用epoll解决这一点。我之所以这样做只是为了尽量简单的做出demo
* 协程中当然不能调用阻塞函数,其中网络IO给出了例子,另外还有定时器常用到。 如果使用epoll,那么timer的问题也解决了(利用epoll 的timeout参数),而如果对库的改造不是用于网络io而没有使用epoll,也可以使用其它方式封装timer。啰嗦两句,做法就是:①由库(使用可以快速选出最早超时的数据结构)维护所有timer,②并选出最早超时的交给操作系统做定时。其中①可以使用红黑树、小顶堆、时间轮等数据结构;第②步可以使用传统unix定时器setitimer()、POSIX定时器timer_create()、linux特有的timerfd timerfd_create()、epoll类超时机制、甚至如果进程等待超时时不需要做其它事情的话直接用sleep()。关于linux环境下定时器的实现方式可以参考拙作http://km.oa.com/group/29048/articles/show/279519(这个广告好硬啊)
* 因为这一库使用switch case,所以恐怕使用者要被要求小心使用switch语法,最好彻底放弃使用switch,当然可以在库中使用goto来避免冲突
* 你应该已经注意到,这一协程库要求所有yield必须在顶层线程函数。这是一个很强的限制。至于如何解决嘛,你当然可以想到使用longjump,不过以它的复杂度来说,已经非常不值得了,不如直接用ucontext。至于这些实现协程的方式,我会在下文中讨论。
尽管有如此诸多限制,但这一技巧仍然足够有吸引力。另外它的宏定义中括号不匹配等问题是相当不符合规范的,因此请小心在正式场合下使用。事实上,原作protothread的针对场景是单片机等嵌入式环境,在那里甚至可能连一个操作系统都没有,程序只能串行执行,在这样的环境下能用如此简单的代码做出一个多线程的效果,想必是一个巨大的进步。
二、关于进程、线程、协程的思考
好了,关于protothread以及switch case的奇技淫巧的介绍到此结束。你可能在怀疑,这一小伎俩怎么能算“协程”?这一技巧说白了只是轮询执行几个函数而已,那么,怎么去理解协程呢,什么又是进程和线程。在这里我谈一谈我的理解,有不对的地方请批评指正。
我们为什么需要多进程/多线程/多协程呢?因为我们希望能在同一时间做多件事情,而cpu的处理速度又足够快,处理一件事绰绰有余,甚至需要cpu停下来等一些事。如《UNIX网络编程》中,前几章描述了使用系统API(不包括select/epoll)单进程单线程能做到的事情是多么有限。因为单进程单线程情况下,只能顺序的执行一条执行流(即使if else等跳转,也是顺序的执行同一个执行流),除了一个例外:信号,信号处理程序是独立于主执行流的,因此信号中要和多线程情况一样小心处理不可重入函数。那么如何实现多条执行流呢?
执行流是有状态的,要想切换它,就要保存它的“状态”,或者叫“上下文”,或者叫“现场”。上下文主要指的是栈和寄存器,而切换时机主要有被动抢占&主动放弃。内核级进程线程当然很容易实现这些,有主动放弃的API,如sleep(),也有被动抢占的时机,如定时tick,又如中断或系统调用造成的陷入内核,在从内核返回用户态时也会顺便检查是否可以抢占进程,因为进入内核时已经做了现场保存,这里即将恢复,如果这个进程不久将被强制切走,不如现在直接去恢复别的进程运行。更详细和权威的内容可以参考《深入理解Linux内核》。操作系统内核之所以能做到这些,依赖于一些条件:操作系统可以提供API、操作系统以下的硬件定时器中断让操作系统可以定时检查一些事。那么要在用户态实现协程,思路无非是主动放弃&被动抢占了。主动放弃可以通过提供yield API实现,而理论上使用内核提供的定时器API可以实现被动抢占。然而被动抢占的复杂度要远远高于主动放弃了,并且在使用上也更复杂。举个例子:STL的map是线程不安全的,单线程多协程下,如果协程只会主动放弃运行,则我们清楚的知道一个协程在操作map的时候不会被中断,它只有显式放弃cpu时才会中断,这时多个协程共用一个map是不用加锁的,而如果协程切换有主动抢占功能,这个主动抢占基本必然依赖于操作系统定时器,这个定时器随时可能到来,完全有可能在map操作一半的时候被中断,这种情况下多协程共享map就要像多线程一样加锁。我想这就是目前的协程实现均未引入抢占式的原因,失远远大于得。
上面一段说的有些混乱了,简单总结一下。裸机编程依赖于硬件,多核CPU可以执行多个执行流,硬件中断可以中断执行流。操作系统对裸机进行了抽象,利用硬件中断等手段做到应用层执行流切换与恢复,抽象出应用层可以并行多个执行流,并且有操作系统定时器可以中断执行流。在应用层,可以再做一层抽象,利用操作系统定时器等手段再做一层“子操作系统”。子操作系统的价值在于:陷入内核的进程/线程切换代价太大,并且多线程的线程不安全很不方便。因此“子操作系统”的定位应该是,切换快速轻量,并发安全。因为后者,所以协程库放弃了利用定时抢占协程。
三、协程的实现方式
最后聊聊协程的多种实现方式,我能想到的三个,在前文都有提到过
第一就是最普遍最强大的,利用汇编代码保存&恢复栈和寄存器,这种做法最靠谱,spp、libco、ucontext都采用这种做法。(ucontext我猜是这样,如果不对请指正)
第二种做法就是本文提到的,使用c语言的奇技淫巧,实现无栈(stack-less)的协程,典型实现是protothread。优点是轻量,缺点是诸多限制,前文都有提到
第三种做法是委托setjump&longjump来做到现场保存&恢复,感觉实用性不高。
参考资料
http://blog.csdn.net/qq910894904/article/details/41911175
http://www.oschina.net/translate/coroutines-in-c