写在前面
OpenResty(后面简称:OR)是一个基于Nginx和Lua的高性能Web平台,它内部集成大量的Lua API以及第三方模块,可以利用它快速搭建支持高并发、极具动态性和扩展性的Web应用、Web服务或动态网关。
OR最大的特点就是,将Lua协程与Nginx事件驱动模型及非阻塞I/O结合起来。使用户可以在handler中使用 同步但是依然是非阻塞 的方式编写其应用代码,而无需关心底层的协程调度以及与Nginx事件驱动模型的交互。
本文将先从总体上介绍OR的协程调度机制,然后结合源码以及Lua栈的情况来详细了解各个部分是如何实现的,包括其异常保护、协程初始化、协程的恢复和执行、协程的挂起、协程的执行结束、协程出错的情况。
本文主要关注调度函数内部的逻辑,如果想了解外部的调用流程。可以参看Openresty Lua钩子调用完整流程
注:lua-nginx
模块与stream-lua-nginx
模块的主体部分类似,后者实现相对简单一点。下面的讨论将基于stream-lua
模块。
为了防止歧义,文中用到的一些术语明确一下:
主线程
:表示外层调用run_thread()
的OS线程入口线程
:每个handler被调用时会创建一个入口线程,用于执行lua代码用户线程
:用户在Lua代码中通过ngx.thread.spawn()
创建的线程用户协程
:用户在Lua代码中通过coroutine.create()
创建的协程协程
:泛指所有协程,包括入口线程、用户线程和用户协程vm
:表示Lua虚拟机L
:视出现的上下文,一般表示父协程,在创建入口线程的时候表示Lua VMco
:一般表示新创建的协程L栈: |协程表|新协程|顶|
:表示Lua栈结构,最右边是栈顶
关键数据结构
在深入了解协程调度机制之前,我们先来认识一下主要的数据结构:
- 协程上下文:
ngx_stream_lua_co_ctx_t
- 协程内部栈(
coctx->co
) - 协程状态(
coctx->co_status
) - 维护协程之间关系的数据(父协程
coctx->parent_co_ctx
、僵尸子线程coctx->zombie_child_threads
) - 用户相关数据(
coctx->data
) - 在Lua的registry表中对应该线程指针的引用值(
co_ref
) - 一些状态标记(是否是用户线程
is_uthread
、是否因创建新线程thread_spawn_yielded
被yield)
- 协程内部栈(
- 模块上下文:
ngx_stream_lua_ctx_t
ctx->cur_co_ctx
(当前调度协程上下文)ctx->co_op
(协程是以何种方式YIELD)
- 核心调度函数:
ngx_stream_lua_run_thread()
协程调度
首先你可能很好奇OR为什么要在C引擎层面自己实现协程的调度?或者说这么做的好处是什么?我觉得最主要的原因还是减轻开发者的负担。
原生Lua coroutine接口
我们知道Lua是个非常轻巧的语言,它不像Go有自己的调度器。Lua原生的对协程的操作无非就是coroutine.resume()
和coroutine.yield()
。这两者是成对出现的,协程coroutine.yield()
之后肯定回到父协程coroutine.resume()
的地方,恢复子协程需要显式再次coroutine.resume()
。如果要在Lua代码层面实现非阻塞I/O,那么父协程必须处理子协程I/O等待的情况,并在事件发生时恢复子协程的执行。如果需要同时进行多个任务,那么父协程就需要负责多个协程间的调度。因为协程的拓扑可能是一个复杂的树状结构,所以协程的调度管理将变得异常复杂。
OpenResty实现
OR在C引擎层帮我们把这些事情都做了,你无须再关心所有这些,只需专心写你的业务逻辑。为了支持同步非阻塞的方式编写应用代码,OR重写了coroutine
的接口函数,从而接管了协程的调度,并在coroutine
基础上封装抽象出了thread
的概念。无论是coroutine
还是thread
,I/O等待对于用户都是透明的,用户无需关心。两者的主要区别是,coroutine
父子之间的协作度更高,coroutine.yield()
和coroutine.resume()
成对出现。在子协程执行完成(出错)或者显式coroutine.yield()
之前,父协程一直处于等待状态。而thread
则由调度器进行调度,子thread
一旦开始执行就不再受父协程控制了,在需要并发请求时很有用。thread
提供了spawn()
、wait()
等接口,spawn()
执行参数中指定的函数,直到执行完毕、出错或者I/O等待时返回。wait()
则使父协程可以同步等待子线程执行完毕、获取结果。
OR在对协程调度上,最核心的改动是其创建新协程时的行为(coroutine.resume()
, ngx.thread.spawn()
)。它不会直接调用lua_resume()
,而是先lua_yield()
回到主线程,然后由主线程再根据情况lua_resume()
下一个协程。Lua代码域内从来不会直接调用lua_resume()
,理解了这一点你就理解了OpenResty协程调度的精髓。
所以OR中协程拓扑是一个单层的结构,它只有一个入口点。这样使得协程调度更加灵活,I/O事件的触发时回调函数也更容易实现。
OR调度器根据lua_resume()
的返回值,确定协程是挂起了、结束了还是出错了。因为OR改动了创建新协程时行为,同时又抽象了thread概念,所以如果是协程挂起的情况,还需要知道是什么原因挂起,以便做相应的不同处理。是继续调度?还是返回上层?我们前面提到的ctx->co_op
便是做这个用途。
协程调度逻辑
协程的调度在核心调度函数ngx_stream_lua_run_thread()
中进行,它是创建或恢复协程的唯一入口点。最初是由配置的Lua钩子调用(图中ssl_cert_handler()
),如果碰到了I/O等待的情况,后续则由对应的事件handler(图中的sleep_handler()
和read_handler()
)再次拉起。run_thread()
里面实现了一个调度循环,循环里面先从ctx->cur_co_ctx
获取下一个待resume的协程上下文,然后lua_resume()
执行或恢复该协程,其返回值LUA_YIELD
表示协程挂起,0
表示协程执行结束,其余的表示协程出错了。其中协程挂起又分为四种不同的情况:即等待I/O、新建thread、coroutine.resume()
和coroutine.yield()
。根据不同的情况,决定是跳到循环前面继续恢复下一个协程,还是返回上层函数。
下图是协程调度主要逻辑的示意图,可以看到在Lua代码域中无论是新建、挂起或恢复协程,都是先调用lua_yield()
回到主线程。I/O操作例如ngx.tcp.receive()
如果碰到了I/O等待,会在内部注册epoll事件(对于sleep的情况是定时器),然后自动lua_yield()
,当事件触发时继续未完成的I/O操作,完成之后再调用run_thread()
恢复之前被挂起的协程。
异常保护
作为一个调度器,OpenResty扮演者类似操作系统内核的角色,不过它的调度对象是Lua协程。作为一个“内核”,无论其调度对象出了什么问题,都不应该使这个系统崩溃,而是应该将错误信息打印出来。
Openresty内部就做了一个这样的异常保护,其原理就是用setjmp
、longjmp
包住了run_thread()
里面的整个协程调度逻辑。
1 | /* 首先注册虚拟机的panic回调 */ |
ngx_stream_lua_atpanic()
的实现也非常简单,只是简单地打印崩溃日志,然后调用NGX_LUA_EXCEPTION_THROW(1);
恢复nginx的执行。
1 | int |
这几个宏定义分别如下:
1 |
协程初始化
钩子的入口线程
ngx_stream_lua_new_thread()
用于创建入口线程
OR中需要在Registry表中存储每个创建出来的Lua线程的reference,这个存储协程的表在Registry表中对应的key是全局变量ngx_stream_lua_coroutines_key
的地址,因此下面这段代码就是从Registry表中查询这个储存协程的表,返回到栈顶:
1 | /* 返回栈顶元素的索引,等于栈中元素的个数 */ |
接下来创建一个新的协程,同时初始化其全局表:
1 | /* 创建Lua协程,返回的新lua_State跟原有的lua_State共享所有的全局对象(如表), |
这一块的逻辑有点绕,我们来稍微理一下,其实就是用新建的全局表替换了旧的全局表,其中新的全局表的_G
字段是它自己,新全局表的元表中__index
元方法是旧的全局表。
此时的Lua虚拟机栈顶情况如下图所示:
1 | L->top | 栈顶 | |
下面一步就是在Lua虚拟机中为这个新协程创建一个reference:
1 | /* 为栈顶对象(即新协程),创建并返回一个协程表中的引用 */ |
最后恢复堆栈
1 | /* 设置栈顶索引 */ |
以上步骤还只是创建了一个什么都不能做的Lua协程,回到_by_chunk()
函数之后还需要把入口函数放入协程中。
1 | /* 将lua虚拟机VM栈上的入口函数闭包移到新创建的协程栈上, |
至此,协程入口函数以及环境表已经设置好。接下来就是让它能够运行起来,让调度器能够调度它运行:
1 | /* 将nginx请求保存到协程全局表 */ |
接下来就是注册cleanup钩子,然后ngx_stream_lua_run_thread()
。
用户创建的uthread
用户线程由ngx.thread.spawn()
创建,对应的C实现是ngx_stream_lua_uthread_spawn()
。首先它会调ngx_stream_lua_coroutine_create_helper()
创建一个新的协程。
创建协程
注意协程都是在worker的虚拟机上创建的(不考虑cache off的情况的话)。但是用户协程会继承父协程的全局表,其父子关系由OR进行维护。
1 | /* 获取虚拟机 */ |
此时父协程的栈如下:
1 | /* 当前栈: |entry_func|args|顶| */ |
接下来将父协程的全局表给新创建的协程:
1 | /* make new coroutine share globals of the parent coroutine. |
create_helper
函数返回之后,L的栈顶是新协程,co的栈顶是入口函数。
初始化uthread
ngx_stream_lua_coroutine_create_helper
返回之后,进行uthread的初始化。
此时,父协程L是这样的:
- 栈顶是新创建的协程
- 然后是参数和入口函数
在此之前,先在registry表中保存一个该协程的ref。(到现在还没搞明白这个ref是干嘛用的?除了创建线程和删除线程,貌似只有检查线程是否活着的时候会查一下这个ref,只是检查状态用coctx->co_status
不是也能做到么?8.12更新,之所以要把线程锚定到注册表上,是为了防止被当成垃圾回收。这也解释了为什么只有线程需要锚定到注册表上,而用户协程不需要。因为用户协程肯定由其父协程保留着一个引用。)
1 | /* anchor the newly created coroutine into the Lua registry */ |
接下来是初始化运行环境:
此时的,L的栈情况如下:
1 | |entry_func|参数1|...|参数n|新协程| |
1 | if (n > 1) { |
设置状态,将父协程放入post_thread队列中,设置协程的父子关系,设置新协程为下一个调度的线程
1 | /* 设置状态 */ |
最后,spawn函数的返回值是新创建的协程
1 | /* 将原协程的执行权切换出去,这里的参数1表示栈上留了一个值,这里是指新创建的协程 |
用户创建的coroutine
OR替换了原生的coroutine接口,当存在getfenv(0).__ngx_req
时(全局环境保存了nginx请求),使用重写后的coroutine接口函数。
coroutine.create()
创建新协程部分跟uthread是一样的,都是调用ngx_stream_lua_coroutine_create_helper()
。Lua函数返回新协程。此时新协程栈中是入口函数。
coroutine.resume()
用于开始或恢复新协程,其对应的C函数是ngx_http_lua_coroutine_resume()
。
1 | /* 首先,获取到协程 */ |
接下来,将控制权交还给主协程,并把参数传给主线程。
1 | /* 此时L栈: |co|参数|, co栈: |入口函数| */ |
协程执行和恢复
OR中协程的执行和恢复总是由主线程来进行,不管是coroutine.resume()
还是ngx.thread.spawn()
,都是先lua_yield()
回到主线程之后,在主线程中lua_resume()
。
注意到前面创建阶段,thread是lua_yield(L, 1)
,coroutine是lua_yield(L, lua_gettop(L) - 1)
。yield到主线程之后,我们继续看调度程序的处理。
uthread
先获取参数个数
1 | /* 因为入口函数和参数已经在新线程栈中了,所以从新协程中获取参数个数,-1是除掉入口函数 */ |
然后跳到主循环的前面,执行新线程
1 | /* 保存新协程coctx */ |
在lua_resume
中就会开始新线程的执行。当新线程执行完毕或因I/O中断yield之后,会恢复父协程。在恢复父协程之前,先设置参数个数为1,即之前留在栈上的新协程co。恢复父协程之后,ngx.thread.spawn()
函数就返回了。
1 | if (ctx->cur_co_ctx->thread_spawn_yielded) { |
coroutine
同样是先获取参数个数
1 | /* 获取父协程 */ |
此时子协程栈中是参数和入口函数。
然后跳到主循环的前面,执行新协程,跟前面uthread时一样。
协程挂起
协程的挂起分为两种情况:
- 一种是内部在I/O等待时自动挂起,这种情况用户不用参与,OR会自动将相应的事件及其handler挂到事件驱动上,当事件被唤醒时继续未完成的I/O操作,完成之后由调度器恢复之前挂起的协程。
- 另一种是用户在Lua代码主动调用
coroutine.yield()
挂起。此时由调度器根据情况决定执行下一个执行的协程。
显式主动挂起
我们先来看用户主动挂起的情况,coroutine.yield()
对应的C函数为ngx_stream_lua_coroutine_yield()
。我们先来看看它里面干了些什么。
1 | /* 首先修改当前协程的状态为挂起 */ |
回到主线程之后,根据待挂起协程是thread还是corotine进行不同处理。
thread
1 | if (ngx_stream_lua_is_thread(ctx)) { |
coroutine
1 | /* 获取当前栈的高度,也即coroutine.yield()的参数个数 */ |
I/O等待场景
I/O等待的场景有很多,不过其背后的原理都差不多:
- 定义一个事件,设置恢复时的handler及对应协程上下文,然后
lua_yield()
回到run_thread()
。 - 主线程将
ctx->cur_co_ctx
设为空之后,直接返回NGX_AGAIN
,如果有posted_thread
会继续执行,否则将控制权交还给nginx层 - 后续当事件发生时,继续未完成的操作,完成之后将保存的协程上下文设为
ctx->cur_co_ctx
,然后调用ngx_stream_lua_run_thread()
恢复协程的执行。
这里举两个典型的例子:
ngx.sleep()
它的C函数实现是ngx_stream_lua_ngx_sleep()
,先定义设置好handler和coctx,挂上定时器,然后lua_yield()
1 | ngx_stream_lua_cleanup_pending_operation(coctx); |
在run_thread()
里将当前协程上下文置为NULL
,然后返回NGX_AGAIN
在by_chunk()
里会先检查有没有在post队列里的线程,如果没有则返回
1 | rc = ngx_stream_lua_run_thread(L, r, ctx, 0); |
当定时器超时时,它会执行sleep_handler()
,设置ctx->cur_co_ctx
然后执行run_thread()
恢复协程调度。
ngx.tcp.receive()
其对应的C函数实现是ngx_stream_lua_socket_tcp_receive()
,里面会调ngx_stream_lua_socket_tcp_receive_helper()
。碰到读等待的情况,也是先设置好handler和coctx,然后lua_yield()
。我们来看下里面代码:
1 | /* 这里0表示还未进行协程切换 */ |
回到run_thread()
,同样是将当前协程上下文置为NULL
,然后返回NGX_AGAIN
。
当事件被触发时,执行前面设置的ngx_stream_lua_socket_read_handler()
,里面又会调用读取操作核心函数ngx_stream_lua_socket_tcp_read()
。如果继续碰到等待I/O,handler直接结束,等待下一次事件。如果是完成或出错,会执行如下操作:
1 | /* 恢复该值为0 */ |
r->write_event_handler(r);
是返回Lua层前调用的handler,里面会调用resume_handler
。ngx_stream_lua_socket_tcp_read_resume()
只是封装了一下,最终都是调用的ngx_stream_lua_socket_tcp_resume_helper()
,我们看来下它的代码:
1 | /* 待恢复协程上下文 */ |
至于完成的条件,取决与不同的调用方式。如果是读取固定字节数的话,会维护一个剩余待读取的字节数u->rest
。如果是读取一行,则读取到\n
就结束。如果是readall,则一直读到u->eof
为止。
协程执行完毕
为了不失完整性,再说一下正常结束和出错时的情况。正常执行完毕时,会设置协程状态,然后清理它的僵尸子线程:
1 | /* 将当前协程状态置为DEAD */ |
接下来,根据结束的协程的类型不同执行不同的操作:
入口线程
此时直接删除线程即可,然后根据是否还有用户线程,选择返回NGX_AGAIN
或NGX_OK
1 | if (ngx_stream_lua_is_entry_thread(ctx)) { |
用户线程
此时如果父协程已经死了,处理方式跟入口线程一样,即删除线程,然后根据是否还有任何用户线程或入口线程,选择返回NGX_AGAIN
或NGX_OK
。
如果父协程还活着,并且已经在wait它了,直接恢复父协程。否则,加入到父协程的僵尸线程列表中。
1 | if (ctx->cur_co_ctx->is_uthread) { |
用户协程
剩下的就是用户协程的情况,这个情况跟用户线程被父协程wait的情况是一样的。主要是将返回值移动到父协程栈中,然后跳到主循环前面恢复父协程的执行。
1 | success = 1; |
出错的情况
大致处理步骤是,恢复cur_co_ctx
,获取虚拟机L栈上错误信息,获取当前协程栈中错误信息,后面的操作类似协程执行完毕时,根据不同的情况选择恢复父协程或者返回上层。
1 | /* 恢复cur_co_ctx */ |
用户线程
跟正常结束的处理一样,除了第一个返回值是false。
此时如果父协程已经死了,直接删除线程,然后根据是否还有任何用户线程或入口线程,选择返回NGX_AGAIN
或NGX_OK
。
如果父协程还活着,并且已经在wait它了,直接恢复父协程。否则,加入到父协程的僵尸线程列表中。
入口线程
ngx_stream_lua_request_cleanup()
清理当前请求,里面会清理掉所有的用户创建的协程,然后清理入口协程自己。最后返回错误码。
用户协程
如果是wrap的协程,将错误传递给父协程(就好像是父协程出错了,然后父协程重新走一遍上面的出错处理流程)。
如果是普通协程,则恢复父协程的执行,返回false和错误信息。