OpenSSL的async异步框架/OpenSSL协程

OpenSSL异步框架是在OpenSSL-1.1.0上引入的一个新特性。它理论上可以应用于任何的异步操作,但当前主要是用于在引擎框架中执行的加密学操作。本文首先介绍其底层原理,接着介绍OpenSSL的异步基础设施,最后着重分析ASYNC_JOB(OpenSSL中的协程)的处理流程(dispatcher和job之间是如何切换的)。

引入异步框架主要是出于性能的考虑,对于普通的同步操作模式,API调用会一直阻塞直到请求完成。在API之下,要么执行一个忙循环等待响应,要么进行进程上下文切换。忙循环浪费CPU周期且无法并行运行多个操作,而进行上下文切换虽然允许并行,但是上下文切换的开销仍然是相当大的。

而异步模型通过充分利用这些间隙来提升性能。提交请求之后并不等待任务完成,而是直接返回,当任务完成时再通知用户代码获取响应。

下文中job和协程基本可以互换。

底层核心函数

OpenSSL的异步框架,底层是基于以下几个核心函数。getcontex/makecontext/setcontext用于生成用户态的协程,setjmp/longjmp用于在dispatcher/job之间进行跳转切换。

下面分别介绍下这几个核心函数:

ucontext_t

首先ucontext_t结构体顾名思义是用户态的上下文

1
2
3
4
5
6
7
8
9
typedef struct ucontext_t
{
unsigned long int __ctx(uc_flags);
struct ucontext_t *uc_link;
stack_t uc_stack;
mcontext_t uc_mcontext;
sigset_t uc_sigmask;
struct _libc_fpstate __fpregs_mem;
} ucontext_t;
  • uc_link:指向当前context执行结束之后下一个执行的用户上下文。
  • uc_stack:是该上下文的堆栈
  • uc_mcontext:是硬件相关的,保存具体的上下文,如各寄存器的值
  • uc_sigmask:执行上下文过程中,要屏蔽的信号集合

getcontext

1
int getcontext(ucontext_t *ucp);

用于将ucp所指ucontext初始化为当前活跃的上下文。

setcontext

1
int setcontext(const ucontext_t *ucp);

恢复ucp所指定的上下文,该上下文是通过getcontext、makecontext、或信号处理函数第三个参数中获取到的。

  • 如果上下文是从getcontext调用中获取到的,那么程序将从getcontext位置继续执行,就好像是getcontext调用刚刚返回一样。
  • 如果上下文是从makecontext调用中获取到的,程序执行参数中指定的函数,当函数返回时继续执行makecontext时ucp参数中的uc_link,如果为NULL则OS线程中止。成功的调用不返回。
  • 如果context是从信号处理函数中获取的,结果未定义。

makecontext

1
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...);

修改ucp指定的上下文,在调用makecontext之前调用者必须给这个上下文分配一个新的栈ucp->uc_stack,并定义一个后继的上下文ucp->uc_link。然后在调用makecontext之后通过setcontext或swapcontext激活这个上下文,传递argc后面的一系列int参数。当函数返回时,后继上下文被激活,如果后继上下文为NULL则线程结束。

swapcontext

1
int swapcontext(ucontext_t *oucp, ucontext_t *ucp);

将当前上下文保存到oucp,然后激活ucp。类似于getcontext和setcontext的结合体。swapcontext成功时同样不返回,但是后续可以通过激活ocup返回,此时就好像swapcontext返回了0。

关于这几个函数的用法示例,可以参考makecontext(3)/swapcontext(3)

setjmp/longjmp

1
2
3
int setjmp(jmp_buf env);

void longjmp(jmp_buf env, int val);

setjmp将栈上下文保存到参数env中,首次调用返回0。后续使用longjmp跳转到之前setjmp的位置,就像是刚从setjmp函数返回一样,此时的返回值是longjmp参数的非零值val。

一些疑问

  1. 为什么OpenSSL选择使用setjmp/longjmp来进行协程切换,而不用swapcontext呢?

    因为swapcontext的开销比setjmp/longjmp大?swapcontext提供了更多的控制,不过我并不是太确定。

    通过查找资料,找到了如下commit提交记录,发现确实是出于如上考虑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    commit 7070e5ca2fa41940d56599bf016a45cb1c0e03f0
    Author: Matt Caswell <matt at openssl.org>
    Date: Tue May 5 15:08:39 2015 +0100

    Use longjmp at setjmp where possible

    Where we can we should use longjmp and setjmp in preference to swapcontext/
    setcontext as they seem to be more performant.

    Reviewed-by: Rich Salz <rsalz at openssl.org>
  2. 你可能已经注意到了,OpenSSL中其实是使用了_setjmp/_longjmp,而不是setjmp/longjmp,两者有啥区别?

    区别是前者不操作信号掩码,而后者会。这会在上下文切换中带来速度的显著提升。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    commit 06b9ff06cc7fdd8f51abb92aaac39d3988a7090e
    Author: Matt Caswell <matt at openssl.org>
    Date: Fri Oct 9 15:55:01 2015 +0100

    Swap to using _longjmp/_setjmp instead of longjmp/setjmp

    _longjmp/_setjmp do not manipulate the signal mask whilst
    longjmp/setjmp may do. Online sources suggest this could result
    in a significant speed up in the context switching.

    Reviewed-by: Rich Salz <rsalz at openssl.org>

异步框架基础设施

OpenSSL通过ASYNC_JOB实现异步能力。一个job表示一个协程,它执行一段指定的代码,直到发生一些事件,通常是I/O操作或者是调用硬件加速卡等。此时协程可以被暂停,将控制权交还给用户代码。当后续事件指示可以恢复job时,用户代码再恢复该job。

async框架外部是用户代码,例如libssl,负责开始job、恢复job;async框架内部则封装了异步的API,通常是通过引擎来实现,负责暂停job以及恢复的通知机制。

libssl - async - libcrypto(engine) - nonBlockingAPI

外层指定一个job的入口函数并启动job,内层碰到异步API接口未完成就暂停该job,在暂停之前设置通知机制。这样当接口操作完成时,外层可以获取到通知,再一次继续之前暂停的job,最终job执行完毕,外层释放这个job。

数据结构

1
2
3
4
5
typedef struct async_fibre_st {
ucontext_t fibre;
jmp_buf env;
int env_init;
} async_fibre;

async_fibre表示上下文,包含了协程生成和切换时需要的信息。它是系统环境相关的,上面展示的是posix环境的版本。其中ucontext_t类型就是xxxcontext函数使用的上下文,jmp_buf类型则是setjmp保存的环境。

1
2
3
4
5
struct async_ctx_st {
async_fibre dispatcher;
ASYNC_JOB *currjob;
unsigned int blocked;
};

async_ctx_st是线程的异步上下文结构,每个线程有一个。其中的dispatcher表示主上下文,也可以认为是调度器,currjob则表示当前执行的job。

1
2
3
4
5
6
7
8
9
10
struct async_job_st {
async_fibre fibrectx;
int (*func) (void *);
void *funcargs;
int ret;
int status;
ASYNC_WAIT_CTX *waitctx;
};

typedef struct async_job_st ASYNC_JOB;

ASYNC_JOB表示一个job,其中fibrectx是上下文,func和funcargs是实际要执行的工作函数及其参数,ret用于保存工作函数的返回值,status记录job的当前状态。waitctx是通知机制需要使用的上下文。

主要接口

OpenSSL提供了如下原语供使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <openssl/async.h>

int ASYNC_init_thread(size_t max_size, size_t init_size);
void ASYNC_cleanup_thread(void);

int ASYNC_start_job(ASYNC_JOB **job, ASYNC_WAIT_CTX *ctx, int *ret,
int (*func)(void *), void *args, size_t size);
int ASYNC_pause_job(void);

ASYNC_JOB *ASYNC_get_current_job(void);
ASYNC_WAIT_CTX *ASYNC_get_wait_ctx(ASYNC_JOB *job);
void ASYNC_block_pause(void);
void ASYNC_unblock_pause(void);

int ASYNC_is_capable(void);

其中ASYNC_init_thread和ASYNC_cleanup_thread用于初始化和清理异步job。如果是多线程的程序,每个线程都需要进行相应操作。其内部使用了thread-local数据。

ASYNC_start_job和ASYNC_pause_job是核心的两个函数,其相当于Lua协程中的resume和yield。前者用于开始一个job,或者恢复一个job;后者用于暂停job。

ASYNC_get_current_job返回当前执行的job的指针。如果不在job上下文中则返回NULL。

关于接口的更多说明,见文末参考资料

初始化

通常不需要显式进行初始化,在第一次调用接口时候会自动进行。

其中会初始化thread-local变量,ctxkey和poolkey分别是async上下文和job池的键。

1
2
3
4
5
6
7
8
9
10
11
12
13
// crypto/async/async.c
int async_init(void)
{
if (!CRYPTO_THREAD_init_local(&ctxkey, NULL))
return 0;

if (!CRYPTO_THREAD_init_local(&poolkey, NULL)) {
CRYPTO_THREAD_cleanup_local(&ctxkey);
return 0;
}

return 1;
}

用户代码ASYNC_start_job

这里以libssl为例来看,SSL的很多接口如SSL_do_handshake/SSL_read/SSL_write等都支持了异步job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ssl/ssl_lib.c
int SSL_do_handshake(SSL *s)
// 第一次进来会调ssl_start_async_job,将job的处理函数和参数传进去了
if (SSL_in_init(s) || SSL_in_before(s)) {
if ((s->mode & SSL_MODE_ASYNC) && ASYNC_get_current_job() == NULL) {
struct ssl_async_args args;

args.s = s;

ret = ssl_start_async_job(s, &args, ssl_do_handshake_intern);
} else {
ret = s->handshake_func(s);
}
}

这里只是一个简单的包装,将具体的工作函数及其参数传入。其中ssl_async_args结构如下:

1
2
3
4
5
6
7
8
9
10
11
struct ssl_async_args {
SSL *s;
void *buf;
size_t num;
enum { READFUNC, WRITEFUNC, OTHERFUNC } type;
union {
int (*func_read) (SSL *, void *, size_t, size_t *);
int (*func_write) (SSL *, const void *, size_t, size_t *);
int (*func_other) (SSL *);
} f;
};

ssl_start_async_job是ASYNC_start_job的一层包装,首先创建waitctx,然后调用ASYNC_start_job并处理返回值,将结果反馈给上层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ssl/ssl_lib.c
static int ssl_start_async_job(SSL *s, struct ssl_async_args *args,
int (*func) (void *))
{
int ret;
if (s->waitctx == NULL) {
s->waitctx = ASYNC_WAIT_CTX_new();
if (s->waitctx == NULL)
return -1;
}
switch (ASYNC_start_job(&s->job, s->waitctx, &ret, func, args,
sizeof(struct ssl_async_args))) {
case ASYNC_ERR:
s->rwstate = SSL_NOTHING;
SSLerr(SSL_F_SSL_START_ASYNC_JOB, SSL_R_FAILED_TO_INIT_ASYNC);
return -1;
case ASYNC_PAUSE:
s->rwstate = SSL_ASYNC_PAUSED;
return -1;
case ASYNC_NO_JOBS:
s->rwstate = SSL_ASYNC_NO_JOBS;
return -1;
case ASYNC_FINISH:
s->job = NULL;
return ret;
default:
s->rwstate = SSL_NOTHING;
SSLerr(SSL_F_SSL_START_ASYNC_JOB, ERR_R_INTERNAL_ERROR);
/* Shouldn't happen */
return -1;
}
}

其中返回值为ASYNC_PAUSED表示job暂停,对应SSL_get_error就返回SSL_ERROR_WANT_ASYNCASYNC_FINISH则表示job结束了。

内部代码ASYNC_pause_job

暂停job是在协程内部调用,基本的使用方式如下:

1
2
3
4
5
6
7
do {
sts = nonBlockingOperation();
if (sts == STATUS_RETRY) {
ASYNC_pause_job();
}
}
while (sts == STATUS_RETRY)

在调用非阻塞接口之后,如果是类似retry等结果表示需要等待,那么就调用ASYNC_pause_job暂停job,控制权归还给主控dispatcher,此时ASYNC_start_job返回ASYNC_PAUSE。

当用户代码决定恢复job时,会再次调用ssl_start_async_job继而调用ASYNC_start_job,控制权又回到了协程中。

此时ASYNC_pause_job返回,会再次调用之前的接口,如果还是需要等待则重复之前的操作,如果已经结束那么继续执行后面的代码。

最终job执行结束之后,ASYNC_start_job会返回ASYNC_FINISH。

通知机制

那么下次怎么恢复刚才暂停的那个job呢?这个就涉及其通知机制了。ASYNC_WAIT_CTX支持两种通知机制。

wait fd机制

一种是通过一个wait fd进行通知,主要接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <openssl/async.h>

ASYNC_WAIT_CTX *ASYNC_WAIT_CTX_new(void);
void ASYNC_WAIT_CTX_free(ASYNC_WAIT_CTX *ctx);
int ASYNC_WAIT_CTX_set_wait_fd(ASYNC_WAIT_CTX *ctx, const void *key,
OSSL_ASYNC_FD fd,
void *custom_data,
void (*cleanup)(ASYNC_WAIT_CTX *, const void *,
OSSL_ASYNC_FD, void *));
int ASYNC_WAIT_CTX_get_fd(ASYNC_WAIT_CTX *ctx, const void *key,
OSSL_ASYNC_FD *fd, void **custom_data);
int ASYNC_WAIT_CTX_get_all_fds(ASYNC_WAIT_CTX *ctx, OSSL_ASYNC_FD *fd,
size_t *numfds);
int ASYNC_WAIT_CTX_get_changed_fds(ASYNC_WAIT_CTX *ctx, OSSL_ASYNC_FD *addfd,
size_t *numaddfds, OSSL_ASYNC_FD *delfd,
size_t *numdelfds);
int ASYNC_WAIT_CTX_clear_fd(ASYNC_WAIT_CTX *ctx, const void *key);

内部代码中创建一个fd,然后调用ASYNC_WAIT_CTX_set_wait_fd()将fd加入到ASYNC_WAIT_CTX的fds中,该函数一般在ASYNC_pause_job()前调用。外部用户代码通过对应的get函数获取到这个fd,并将其加入epoll中。当这个fd上有读事件的时候就再次恢复之前的暂停的job。

内部的异步操作接口并不总是支持epoll fd的,如果支持那就最好不过,直接ASYNC_WAIT_CTX_set_wait_fd()设置这个fd就可以。如果内部接口不支持,那么就只能进行轮询了,也就是接口外部不断地检查操作是否已经完成了。Intel的QAT Engine的旧版本就是采用的这种做法,它在ASYNC_pause_job()暂停job之前,会先往这个fd中写入数据,于是外部用户代码将该fd加入到epoll之后,读事件已经是处于激活状态。在下一次事件循环时就会恢复暂停的job,ASYNC_pause_job()中恢复job之后从fd中读取数据。下面是QAT Engine中的代码片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
do {
// 省略...
sts = cpaCyRsaDecrypt(qat_instance_handles[inst_num], qat_rsaCallbackFn, &op_done,
dec_op_data, output_buf);
if (sts == CPA_STATUS_RETRY) {
if ((qat_wake_job(op_done.job, ASYNC_STATUS_EAGAIN) == 0) ||
(qat_pause_job(op_done.job, ASYNC_STATUS_EAGAIN) == 0)) {
WARN("qat_wake_job or qat_pause_job failed\n");
break;
}
}
}
while (sts == CPA_STATUS_RETRY);

其中qat_wake_job中会往fd中写入数据,qat_pause_job中会调用ASYNC_pause_job,在ASYNC_pause_job返回之后读取fd中的数据。

callback机制

另一种是通过callback(OpenSSL 3.0上新增),也需要异步接口的支持。当引擎完成异步操作之后调用回调函数去通知用户代码,ASYNC_WAIT_CTX_set_callback()用于设置callback及其参数。回调函数需要是小的且非阻塞的。

callback相关的接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <openssl/async.h>

#define ASYNC_STATUS_UNSUPPORTED 0
#define ASYNC_STATUS_ERR 1
#define ASYNC_STATUS_OK 2
#define ASYNC_STATUS_EAGAIN 3
typedef int (*ASYNC_callback_fn)(void *arg);
int ASYNC_WAIT_CTX_set_callback(ASYNC_WAIT_CTX *ctx,
ASYNC_callback_fn callback,
void *callback_arg);
int ASYNC_WAIT_CTX_get_callback(ASYNC_WAIT_CTX *ctx,
ASYNC_callback_fn *callback,
void **callback_arg);
int ASYNC_WAIT_CTX_set_status(ASYNC_WAIT_CTX *ctx, int status);
int ASYNC_WAIT_CTX_get_status(ASYNC_WAIT_CTX *ctx);

关于接口的更多说明,见文末参考资料

ASYNC_job处理流程

总体的处理流程如下图所示:

async_job_processing_flow

job状态管理ASYNC_start_job

如前所述,ASYNC_start_job开始一个新的job或者恢复之前的job,是job管理的核心函数。如果是开始一个新job就创建并执行新job,否则根据job的状态决定下一步的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// crypto/async/async.c
int ASYNC_start_job(ASYNC_JOB **job, ASYNC_WAIT_CTX *wctx, int *ret,
int (*func)(void *), void *args, size_t size)
{
async_ctx *ctx;

if (!OPENSSL_init_crypto(OPENSSL_INIT_ASYNC, NULL))
return ASYNC_ERR;

ctx = async_get_ctx();
if (ctx == NULL)
ctx = async_ctx_new();
if (ctx == NULL)
return ASYNC_ERR;

if (*job)
ctx->currjob = *job;

for (;;) {
if (ctx->currjob != NULL) {
if (ctx->currjob->status == ASYNC_JOB_STOPPING) {
*ret = ctx->currjob->ret;
ctx->currjob->waitctx = NULL;
async_release_job(ctx->currjob);
ctx->currjob = NULL;
*job = NULL;
return ASYNC_FINISH;
}

if (ctx->currjob->status == ASYNC_JOB_PAUSING) {
*job = ctx->currjob;
ctx->currjob->status = ASYNC_JOB_PAUSED;
ctx->currjob = NULL;
return ASYNC_PAUSE;
}

if (ctx->currjob->status == ASYNC_JOB_PAUSED) {
ctx->currjob = *job;
/* Resume previous job */
if (!async_fibre_swapcontext(&ctx->dispatcher,
&ctx->currjob->fibrectx, 1)) {
ASYNCerr(ASYNC_F_ASYNC_START_JOB,
ASYNC_R_FAILED_TO_SWAP_CONTEXT);
goto err;
}
continue;
}

/* Should not happen */
ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_INTERNAL_ERROR);
async_release_job(ctx->currjob);
ctx->currjob = NULL;
*job = NULL;
return ASYNC_ERR;
}

/* Start a new job */
if ((ctx->currjob = async_get_pool_job()) == NULL)
return ASYNC_NO_JOBS;

if (args != NULL) {
ctx->currjob->funcargs = OPENSSL_malloc(size);
if (ctx->currjob->funcargs == NULL) {
ASYNCerr(ASYNC_F_ASYNC_START_JOB, ERR_R_MALLOC_FAILURE);
async_release_job(ctx->currjob);
ctx->currjob = NULL;
return ASYNC_ERR;
}
memcpy(ctx->currjob->funcargs, args, size);
} else {
ctx->currjob->funcargs = NULL;
}

ctx->currjob->func = func;
ctx->currjob->waitctx = wctx;
if (!async_fibre_swapcontext(&ctx->dispatcher,
&ctx->currjob->fibrectx, 1)) {
ASYNCerr(ASYNC_F_ASYNC_START_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
goto err;
}
}

err:
async_release_job(ctx->currjob);
ctx->currjob = NULL;
*job = NULL;
return ASYNC_ERR;
}

创建job

如果是开始一个新的job,从pool获取一个,如果池中没有则创建。job刚创建时的状态为ASYNC_JOB_RUNNING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// crypto/async/async.c
static ASYNC_JOB *async_get_pool_job(void) {
ASYNC_JOB *job;
async_pool *pool;

pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
if (pool == NULL) {
/*
* Pool has not been initialised, so init with the defaults, i.e.
* no max size and no pre-created jobs
*/
if (ASYNC_init_thread(0, 0) == 0)
return NULL;
pool = (async_pool *)CRYPTO_THREAD_get_local(&poolkey);
}

job = sk_ASYNC_JOB_pop(pool->jobs);
if (job == NULL) {
/* Pool is empty */
if ((pool->max_size != 0) && (pool->curr_size >= pool->max_size))
return NULL;

job = async_job_new();
if (job != NULL) {
if (! async_fibre_makecontext(&job->fibrectx)) {
async_job_free(job);
return NULL;
}
pool->curr_size++;
}
}
return job;
}

创建上下文通过async_fibre_makecontext完成,这里的入口函数是async_start_func,它对实际的工作函数进行了一层包装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// crypto/async/arch/async_posix.c
int async_fibre_makecontext(async_fibre *fibre)
{
fibre->env_init = 0;
if (getcontext(&fibre->fibre) == 0) {
fibre->fibre.uc_stack.ss_sp = OPENSSL_malloc(STACKSIZE);
if (fibre->fibre.uc_stack.ss_sp != NULL) {
fibre->fibre.uc_stack.ss_size = STACKSIZE;
fibre->fibre.uc_link = NULL;
makecontext(&fibre->fibre, async_start_func, 0);
return 1;
}
} else {
fibre->fibre.uc_stack.ss_sp = NULL;
}
return 0;
}

包装入口函数

async_start_func中执行job的工作函数,返回之后再切换回dispatcher,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// crypto/async/async.c
void async_start_func(void)
{
ASYNC_JOB *job;
async_ctx *ctx = async_get_ctx();

while (1) {
/* Run the job */
job = ctx->currjob;
job->ret = job->func(job->funcargs);

/* Stop the job */
job->status = ASYNC_JOB_STOPPING;
if (!async_fibre_swapcontext(&job->fibrectx,
&ctx->dispatcher, 1)) {
/*
* Should not happen. Getting here will close the thread...can't do
* much about it
*/
ASYNCerr(ASYNC_F_ASYNC_START_FUNC, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
}
}
}

协程切换核心函数

async_fibre_swapcontext是切换的核心函数,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// crypto/async/arch/async_posix.h
static ossl_inline int async_fibre_swapcontext(async_fibre *o, async_fibre *n, int r)
{
o->env_init = 1;

if (!r || !_setjmp(o->env)) {
if (n->env_init)
_longjmp(n->env, 1);
else
setcontext(&n->fibre);
}

return 1;
}

其中env_init用于表示对应的协程上下文是否已经开始运行,如果还没有则调用setcontext开始,否则直接调用_longjmp进行跳转,这里主要是为了性能考虑。

接下来的讲解将围绕这个切换函数和ASYNC_start_job。

开始job: dispatcher->job

前面提到了,刚创建的job是ASYNC_JOB_RUNNING状态,第一次调用上下文切换函数async_fibre_swapcontext,从dispatcher切到job。

首先将dispatcher的env_init置1,setjmp保存环境到dispatcher的env中,由于此时n->env_init为0,所以调用setcontext开始执行async_start_func,在其中执行job的工作函数。

暂停job: job->dispatcher

如果在job处理过程中发生阻塞,会调用ASYNC_pause_job切换回dispatcher,此时job状态变成ASYCNC_JOB_PAUSING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int ASYNC_pause_job(void)
{
ASYNC_JOB *job;
async_ctx *ctx = async_get_ctx();

if (ctx == NULL
|| ctx->currjob == NULL
|| ctx->blocked) {
/*
* Could be we've deliberately not been started within a job so this is
* counted as success.
*/
return 1;
}

job = ctx->currjob;
job->status = ASYNC_JOB_PAUSING;

if (!async_fibre_swapcontext(&job->fibrectx,
&ctx->dispatcher, 1)) {
ASYNCerr(ASYNC_F_ASYNC_PAUSE_JOB, ASYNC_R_FAILED_TO_SWAP_CONTEXT);
return 0;
}
/* Reset counts of added and deleted fds */
async_wait_ctx_reset_counts(job->waitctx);

return 1;
}

ASYNC_pause_job第二次调用上下文切换函数,这次从job切换回dispatcher。

将job的env_init置1,setjmp保存job的环境(恢复时用到),因为此时n->env_init为1,所以longjmp回第一次调用async_fibre_swapcontext时的setjmp位置返回。

于是就回到了ASYNC_start_job函数中,dispatcher在ASYNC_start_job中继续下一次for循环,因为此时job状态为ASYCNC_JOB_PAUSING,所以执行如下操作:

1
2
3
4
5
6
if (ctx->currjob->status == ASYNC_JOB_PAUSING) {
*job = ctx->currjob;
ctx->currjob->status = ASYNC_JOB_PAUSED;
ctx->currjob = NULL;
return ASYNC_PAUSE;
}

将SSL中的job指针设为当前job,更改状态为ASYCNC_JOB_PAUSED,将ctx中currjob置空。这几个设置操作是为了下次可以正常恢复job,设置完之后返回ASYNC_PAUSE。上层的ssl_start_async_job则将ssl的rwstate置为SSL_ASYNC_PAUSED,再外层的SSL接口如SSL_do_handshake返回-1。

然后外层检测到SSL_ASYNC_PAUSED,可以通过调用SSL_get_changed_async_fds,它封装了ASYNC_WAIT_CTX_get_changed_fds,获取fd加入到epoll中(asynch_mode_nginx中的ngx_ssl_async_process_fds)。

恢复job: dispatcher->job

当fd的读事件激活时,就会执行对应的事件处理函数再次恢复job。以asynch_mode_nginx为例,可能是如下的handler函数,ngx_ssl_handshake_async_handlerngx_ssl_read_async_handlerngx_ssl_write_async_handlerngx_ssl_shutdown_async_handler。在handler中再次进入到ssl_start_async_job,并调用ASYNC_start_job

此时job的状态为SSL_ASYNC_PAUSED,于是走到下面这里再次进入async_fibre_swapcontext

1
2
3
4
5
6
7
8
9
10
11
if (ctx->currjob->status == ASYNC_JOB_PAUSED) {
ctx->currjob = *job;
/* Resume previous job */
if (!async_fibre_swapcontext(&ctx->dispatcher,
&ctx->currjob->fibrectx, 1)) {
ASYNCerr(ASYNC_F_ASYNC_START_JOB,
ASYNC_R_FAILED_TO_SWAP_CONTEXT);
goto err;
}
continue;
}

第三次进入async_fibre_swapcontext,从dispatcher切换到job。

同样将dispatcher的env_init置1,setjmp保存环境到dispatcher的env中,此时job的env_init已经是1了,所以这次是通过longjmp回到第二次切换setjmp的地方,继续执行ASYNC_pause_job后续的代码。

结束job: job->dispatcher

假设这次job正常执行结束了,即async_start_func中的job->func返回,此时job的状态变成ASYNC_JOB_STOPPING,然后再次从job切换回dispatcher。

第四次进入async_fibre_swapcontext,跟第二次进入时一样,将job的env_init置1,setjmp保存job的环境,因为此时n->env_init为1,所以longjmp回第三次调用async_fibre_swapcontext时的setjmp位置返回。dispatcher在ASYNC_start_job中继续下一次for循环,因为此时job状态为STOPING,所以执行如下清理操作,将返回值传递出去了:

1
2
3
4
5
6
7
8
if (ctx->currjob->status == ASYNC_JOB_STOPPING) {
*ret = ctx->currjob->ret;
ctx->currjob->waitctx = NULL;
async_release_job(ctx->currjob);
ctx->currjob = NULL;
*job = NULL;
return ASYNC_FINISH;
}

回到外层ssl_start_async_job返回ret,这样这个job就结束了。不过ASYNC_WAIT_CTX并没有清理掉,ASYNC_WAIT_CTX_free在SSL_free里面调用的,它里面会清理fd。

以上便是job的完整生命周期。

参考资料

-------------本文结束感谢您的阅读-------------

欢迎关注我的其它发布渠道