Nginx的共享内存管理--slab算法

初次了解到slab算法,是在学习Linux内核时。内核中采用了伙伴系统(Buddy System)算法对内存页面进行管理。但是对于小对象,还用页面分配器就显得有些浪费了,于是slab就应运而生了,内核的kmalloc()就是使用slab进行管理的。nginx中的共享内存管理使用了相同的思想,当然它没有Linux内核中的slab那么复杂。

slab原理

它的基本工作原理如下:

  1. 首先对共享内存进行初始化,最前面一部分是用于管理的元数据,剩余部分按页进行划分。每个页在前面元数据中都有一个对应的页头,用于管理该页
  2. 元数据中有一个空闲页块链表,管理着所有空闲的页块(初始是一个完整的连续页块)
  3. 分配对象时将长度按2的次幂进行对齐,nginx中称其为不同的slot
  4. 一个页面被分配给某个slot,就只用于这种长度对象的分配,例如一个4KB页面,可以切分成64个64B的对象,或者32个128B的对象。
  5. 每种slot的对象都有一个未满页链表,管理着该slot所有未满且未空的页。
  6. slot的对象是通过bitmap进行管理的,一个对象被分配了就将对应位置1,释放了置0,由此判断页面中哪些内存块是空闲的
  7. 如果某个页满了,那么就从未满页链表中移除;如果一个页从满页变为未满(即满页释放了一个对象),则重新挂到未满页链表中
  8. 如果分配某slot对象时,发现未满页链表空了,那么就从空闲页块链表中分配一个空闲页,分配对象之后插入该slot的未满页链表中;如果释放对象之后,这个页空了,则还给空闲页块链表
  9. 分配和释放页面时,空闲页块链表有切分和合并操作
  10. 大对象(超过半页)直接从空闲页块链表中分配,不走slot

nginx-slab-mechanism

值得一提的是,slot根据对象的尺寸大小分成3类,对于大尺寸对象因为一个页可以分成的内存块数较少,直接用页面头中的slab字段作为bitmap就够了。而对于小尺寸对象,因为切分的块数很多,所以需要使用实际的内存页面中前面的几个块作为bitmap。

主要数据结构

注:后面的代码都是基于Nginx-1.19.3

下图给出了几个主要结构体之间的关系,以及几个主要的指针指向的共享内存的位置。从addr到last是用于管理的元数据,从start到end则实际用于分配的内存。slots和stats分别是各个slot的半满链表头和统计信息。pages是页面头数组,跟后面实际分配的内存页面一一对应。通过页面头可以找到对应页,通过页面地址也可以找到它的页面头,连续的页块其页面头也是连续的。

nginx-slab-structures

ngx_shm_zone_t是用于描述共享内存的结构体,data是私有数据指针,init是私有的初始化函数,tag用于区别不同的共享内存。

1
2
3
4
5
6
7
8
9
// ngx_shm_zone_t是共享内存的结构体,里面包含了shm,data是私有数据
struct ngx_shm_zone_s {
void *data;
ngx_shm_t shm;
ngx_shm_zone_init_pt init;
void *tag;
void *sync;
ngx_uint_t noreuse; /* unsigned noreuse:1; */
};

其中的shm记录了共享内存的具体信息,addr是共享内存起始地址,size是大小,name名字。

1
2
3
4
5
6
7
8
// shm里的addr指向实际mmap的共享内存
typedef struct {
u_char *addr;
size_t size;
ngx_str_t name;
ngx_log_t *log;
ngx_uint_t exists; /* unsigned exists:1; */
} ngx_shm_t;

ngx_slab_pool_t用于管理slab,它位于共享内存的最开头。min_size/min_shift记录了最小的slot的尺寸及对应位偏移,free是前面已经提到的空闲页块链表头。pages/last/stats/start/end/data/addr都是指针,指向共享内存的不同位置,具体参看下图。lock和mutex用于保护共享内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 共享内存最开头是ngx_slab_pool_t,它用于管理slab,后面会详细介绍它的字段
typedef struct {
ngx_shmtx_sh_t lock;

size_t min_size; // 分配内存最小尺寸,8
size_t min_shift;

ngx_slab_page_t *pages; // 指向页面头数组
ngx_slab_page_t *last; // 指向页面头数组末尾
ngx_slab_page_t free; // 空闲页面链表头

ngx_slab_stat_t *stats; // 指向统计数组
ngx_uint_t pfree; // 总空闲页面数

u_char *start; // 实际数据页面起始位置
u_char *end; // 共享内存结束位置

ngx_shmtx_t mutex;

u_char *log_ctx;
u_char zero;

unsigned log_nomem:1;

void *data; // 私有数据
void *addr;
} ngx_slab_pool_t;

ngx_slab_page_s用于管理页面,就是我们前面所说的页面头,它既作为链表节点,又记录一些页面的信息。slab字段在不同情况下用途也不同,prev除了作为链表指针,低位还用于表示页面类型。

1
2
3
4
5
6
// 页面头,用于管理页面
struct ngx_slab_page_s {
uintptr_t slab; // 对于空闲页面头节点,表示当前节点页块大小;空闲页面非头节点,为FREE;对于已分配页块,第一个页是pages | NGX_SLAB_PAGE_START,其余是NGX_SLAB_PAGE_BUSY;对于slab页面,小对象slab记录shift值,中对象slab用作bitmap,大对象slab高位用作bitmap,低位记录shift值
ngx_slab_page_t *next; // 不在链表中时,为NULL
uintptr_t prev; // 对于已经分配的页,低位还兼职表示页类型
};

ngx_slab_stat_t用于统计slot的信息

1
2
3
4
5
6
7
8
// 用于统计slot信息
typedef struct {
ngx_uint_t total; // 总共有多少个对象
ngx_uint_t used; // 目前使用了多少个

ngx_uint_t reqs; // 请求该slot次数,累加计数器
ngx_uint_t fails; // 请求失败的次数,累加计数器
} ngx_slab_stat_t;

初始化流程

首次启动

共享初始化的主要函数调用关系如下:

1
2
3
4
5
6
ngx_shared_memory_add()		// 初始化ngx_shm_zone_t,并放到全局链表cf->cycle->shared_memory中
ngx_init_cycle()
ngx_shm_alloc(&shm_zone[i].shm) // 分配内存
ngx_init_zone_pool(cycle, &shm_zone[i]) // slab初始化
ngx_slab_init()
shm_zone[i].init(&shm_zone[i], oshm_zone[n].data) // 各共享内存私有的初始化函数,例如ngx_ssl_session_cache_init,主要负责初始化ngx_ssl_session_cache_t
  • 在配置处理阶段,将shm_zone加到链表中
  • 然后在init_cycle中分配共享内存,初始化shm_zone
  • ngx_slab_init()主要初始化ngx_slab_pool_t结构体
  • 完成通用的初始化之后,进行各共享内存私有的初始化操作

ngx_slab_init()是所有共享内存共同的初始化操作,主要是初始化ngx_slab_pool_t结构体中的各个字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
void
ngx_slab_init(ngx_slab_pool_t *pool)
{
u_char *p;
size_t size;
ngx_int_t m;
ngx_uint_t i, n, pages;
ngx_slab_page_t *slots, *page;

pool->min_size = (size_t) 1 << pool->min_shift;

slots = ngx_slab_slots(pool);

p = (u_char *) slots;
size = pool->end - p;

ngx_slab_junk(p, size);

n = ngx_pagesize_shift - pool->min_shift; // slot的种类

for (i = 0; i < n; i++) {
/* only "next" is used in list head */
slots[i].slab = 0;
slots[i].next = &slots[i];
slots[i].prev = 0;
}

p += n * sizeof(ngx_slab_page_t);

pool->stats = (ngx_slab_stat_t *) p;
ngx_memzero(pool->stats, n * sizeof(ngx_slab_stat_t));

p += n * sizeof(ngx_slab_stat_t);

size -= n * (sizeof(ngx_slab_page_t) + sizeof(ngx_slab_stat_t));

pages = (ngx_uint_t) (size / (ngx_pagesize + sizeof(ngx_slab_page_t))); // 计算页数

pool->pages = (ngx_slab_page_t *) p; // 指向页面头数组
ngx_memzero(pool->pages, pages * sizeof(ngx_slab_page_t));

page = pool->pages;

/* only "next" is used in list head */
pool->free.slab = 0;
pool->free.next = page;
pool->free.prev = 0;
// 初始只有一个大的块
page->slab = pages; // 页块的页数
page->next = &pool->free; // 指向空闲链表的哨兵
page->prev = (uintptr_t) &pool->free;// 指向空闲链表的哨兵

pool->start = ngx_align_ptr(p + pages * sizeof(ngx_slab_page_t),
ngx_pagesize); // 实际数据页面开始位置

m = pages - (pool->end - pool->start) / ngx_pagesize;
if (m > 0) {
pages -= m;
page->slab = pages;
}

pool->last = pool->pages + pages; // 页面头数组的末尾
pool->pfree = pages; // 总空闲页面数

pool->log_nomem = 1;
pool->log_ctx = &pool->zero;
pool->zero = '\0';
}

接下来shm_zone[i].init()是各共享内存私有的初始化,一般会初始化私有数据,然后将指针赋值到ngx_shm_zone_tngx_slab_pool_t的私有数据data字段。

reload

reload的时候默认是重用之前的共享内存的,除非ngx_shm_zone_s中的noreuse置1了,但是这个字段目前是代码写死的,并没有作为配置开放。所以reload时一般只是执行如下函数:

1
2
ngx_shared_memory_add()
shm_zone[i].init(&shm_zone[i], oshm_zone[n].data)

不过有一种情况下,reload也会重新生成共享内存,那就是大小改变了的时候。

分配逻辑

分配逻辑和释放逻辑是slab的核心,想完全理解需要仔细阅读接下来4个函数。

分配页面

页面分配逻辑比较简单,剩余的空闲页块是一个链表,pool->free是头节点,用作哨兵。遍历链表寻找足够大的空闲页块,如果页块正好等于请求的大小,那么将其从链表中移除,如果页块还有剩余,将剩余部分的第一个页面头插入链表中,如果剩余块大于1页,那么将最后一页的页面头的prev指向其第一个页面头的位置(合并页块时需要用到)。

对于分配的页块,第一个页面头记录页块头标志,并记录了页块大小,后面的页面头记录BUSY标志。prev的最后两位表示页类型,NGX_SLAB_PAGE表示是整页分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
static ngx_slab_page_t *
ngx_slab_alloc_pages(ngx_slab_pool_t *pool, ngx_uint_t pages)
{
ngx_slab_page_t *page, *p;

for (page = pool->free.next; page != &pool->free; page = page->next) {

if (page->slab >= pages) {

if (page->slab > pages) { // 有剩余的情况
page[page->slab - 1].prev = (uintptr_t) &page[pages]; // 这一步是合并时候用的,找到它的块头
// 剩余部分仍然插入链表中
page[pages].slab = page->slab - pages;
page[pages].next = page->next;
page[pages].prev = page->prev;

p = (ngx_slab_page_t *) page->prev;
p->next = &page[pages];
page->next->prev = (uintptr_t) &page[pages];

} else { // 正好的情况
p = (ngx_slab_page_t *) page->prev; // 将节点从链表移除
p->next = page->next;
page->next->prev = page->prev;
}
// 第一个页记录页数
page->slab = pages | NGX_SLAB_PAGE_START;
page->next = NULL;
page->prev = NGX_SLAB_PAGE;

pool->pfree -= pages; // 更新剩余空闲页数

if (--pages == 0) {
return page;
}
// 其余页设为BUSY
for (p = page + 1; pages; pages--) {
p->slab = NGX_SLAB_PAGE_BUSY;
p->next = NULL;
p->prev = NGX_SLAB_PAGE;
p++;
}

return page;
}
}

if (pool->log_nomem) {
ngx_slab_error(pool, NGX_LOG_CRIT,
"ngx_slab_alloc() failed: no memory");
}

return NULL;
}

分配对象

小对象根据尺寸大小,分成了3类,即代码中的NGX_SLAB_BIGNGX_SLAB_EXACTNGX_SLAB_SMALL。之所以要分成三类,是为了节省管理的内存开销。其中exact的表示页面头中的slab字段正好可以用作一个页面的bitmap。small的就需要使用页面最前面的内存块来作为bitamp了,此时slab用于记录该页的shift值(即它属于哪个slot)。而对于big的,因为内存块数较少,slab字段高位用作bitmap,低位则记录shift值。

对于超大对象直接分配页,否则遍历对应的slot半满链表中获取一个空闲内存块,找到后将相应的bit位置位,如果链表为空则先分配一个新页,然后插入slot半满链表中。如果分配一个对象后页面满了,则将其从半满链表中移除。

ngx_slab_alloc_locked是最核心的一个函数了,有兴趣的可以看看它是怎么操作bitmap和链表的。代码中基本都做了注释,这里就不再赘述了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
void *
ngx_slab_alloc_locked(ngx_slab_pool_t *pool, size_t size)
{
size_t s;
uintptr_t p, m, mask, *bitmap;
ngx_uint_t i, n, slot, shift, map;
ngx_slab_page_t *page, *prev, *slots;
// 超大对象直接分配页
if (size > ngx_slab_max_size) {

ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, ngx_cycle->log, 0,
"slab alloc: %uz", size);

page = ngx_slab_alloc_pages(pool, (size >> ngx_pagesize_shift)
+ ((size % ngx_pagesize) ? 1 : 0));
if (page) {
p = ngx_slab_page_addr(pool, page);

} else {
p = 0;
}

goto done;
}
// 小对象选择对应slot
if (size > pool->min_size) {
shift = 1;
for (s = size - 1; s >>= 1; shift++) { /* void */ }
slot = shift - pool->min_shift;

} else {
shift = pool->min_shift;
slot = 0;
}
// 对应slot请求+1
pool->stats[slot].reqs++;

ngx_log_debug2(NGX_LOG_DEBUG_ALLOC, ngx_cycle->log, 0,
"slab alloc: %uz slot: %ui", size, slot);

slots = ngx_slab_slots(pool);
page = slots[slot].next; // slot的链表

if (page->next != page) { // 链表为空,跳到后面分配新的1页

if (shift < ngx_slab_exact_shift) { // 需要使用页面开头部分空间作为bitmap

bitmap = (uintptr_t *) ngx_slab_page_addr(pool, page); // 页面开头是bitmap

map = (ngx_pagesize >> shift) / (8 * sizeof(uintptr_t)); // 几个uintptr_t大小的map

for (n = 0; n < map; n++) { // 遍历寻找一个空闲的对象

if (bitmap[n] != NGX_SLAB_BUSY) { // 当前bitmap块没有全置位

for (m = 1, i = 0; m; m <<= 1, i++) {
if (bitmap[n] & m) {
continue;
}
// 第n个map的第i位(从右往左数)
bitmap[n] |= m;
// 总共第几位 * 对象大小 = 对象地址关于页地址的偏移
i = (n * 8 * sizeof(uintptr_t) + i) << shift;

p = (uintptr_t) bitmap + i; // p为对象地址

pool->stats[slot].used++; // 正在使用的+1
// 如果当前bitmap满了,检查后面的是否都满了
if (bitmap[n] == NGX_SLAB_BUSY) {
for (n = n + 1; n < map; n++) {
if (bitmap[n] != NGX_SLAB_BUSY) {
goto done;
}
}
// 如果整个页都用完了,将其从slot链表中移除
prev = ngx_slab_page_prev(page);
prev->next = page->next;
page->next->prev = page->prev;

page->next = NULL;
page->prev = NGX_SLAB_SMALL;
}

goto done;
}
}
}

} else if (shift == ngx_slab_exact_shift) { // 页面头中的slab正好作为bimap

for (m = 1, i = 0; m; m <<= 1, i++) {
if (page->slab & m) {
continue;
}

page->slab |= m; // 置位
// 该页如果满了,移出slot链表
if (page->slab == NGX_SLAB_BUSY) {
prev = ngx_slab_page_prev(page);
prev->next = page->next;
page->next->prev = page->prev;

page->next = NULL;
page->prev = NGX_SLAB_EXACT;
}
// 获取对象地址
p = ngx_slab_page_addr(pool, page) + (i << shift);

pool->stats[slot].used++;

goto done;
}

} else { /* shift > ngx_slab_exact_shift */
// 111...11,1页对象个数个1
mask = ((uintptr_t) 1 << (ngx_pagesize >> shift)) - 1;
mask <<= NGX_SLAB_MAP_SHIFT; // 移到高位上,为啥? 因为低位还要记录shift。为啥需要记录shift,因为在释放对象的时候需要知道大小

for (m = (uintptr_t) 1 << NGX_SLAB_MAP_SHIFT, i = 0;
m & mask;
m <<= 1, i++)
{
if (page->slab & m) {
continue;
}

page->slab |= m; // 置位
// 该页如果满了,移出slot链表
if ((page->slab & NGX_SLAB_MAP_MASK) == mask) {
prev = ngx_slab_page_prev(page);
prev->next = page->next;
page->next->prev = page->prev;

page->next = NULL;
page->prev = NGX_SLAB_BIG;
}

p = ngx_slab_page_addr(pool, page) + (i << shift);

pool->stats[slot].used++;

goto done;
}
}

ngx_slab_error(pool, NGX_LOG_ALERT, "ngx_slab_alloc(): page is busy");
ngx_debug_point();
}
// 分配新页
page = ngx_slab_alloc_pages(pool, 1);

if (page) {
if (shift < ngx_slab_exact_shift) {
bitmap = (uintptr_t *) ngx_slab_page_addr(pool, page);
// 计算需要开头几个对象作为bitmap: n = 1页对象个数 / 1个对象位数
n = (ngx_pagesize >> shift) / ((1 << shift) * 8);
// 不满1个对象,使用1对象
if (n == 0) {
n = 1;
}
// 接下来初始化bitmap,将bitmap本身占用的对象置1,将申请的目标对象置1
/* "n" elements for bitmap, plus one requested */

for (i = 0; i < (n + 1) / (8 * sizeof(uintptr_t)); i++) {
bitmap[i] = NGX_SLAB_BUSY;
}
// 不满一个bitmap的部分置1
m = ((uintptr_t) 1 << ((n + 1) % (8 * sizeof(uintptr_t)))) - 1;
bitmap[i] = m;
// 剩余bitmap置0
map = (ngx_pagesize >> shift) / (8 * sizeof(uintptr_t));

for (i = i + 1; i < map; i++) {
bitmap[i] = 0;
}

page->slab = shift; // slab记录shift
page->next = &slots[slot]; // 插入slot链表头
page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_SMALL;

slots[slot].next = page;
// 增加该slot总对象数(不包括bitmap本身占用的对象)
pool->stats[slot].total += (ngx_pagesize >> shift) - n;

p = ngx_slab_page_addr(pool, page) + (n << shift);

pool->stats[slot].used++;

goto done;

} else if (shift == ngx_slab_exact_shift) {

page->slab = 1; // 申请的对象置位
// 插入slot链表头
page->next = &slots[slot];
page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_EXACT;

slots[slot].next = page;
// 增加该slot总对象数
pool->stats[slot].total += 8 * sizeof(uintptr_t);

p = ngx_slab_page_addr(pool, page);

pool->stats[slot].used++;

goto done;

} else { /* shift > ngx_slab_exact_shift */
// 高位申请的对象置位 + 低位记录shift
page->slab = ((uintptr_t) 1 << NGX_SLAB_MAP_SHIFT) | shift;
page->next = &slots[slot];
page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_BIG;

slots[slot].next = page;

pool->stats[slot].total += ngx_pagesize >> shift;

p = ngx_slab_page_addr(pool, page);

pool->stats[slot].used++;

goto done;
}
}

p = 0;

pool->stats[slot].fails++; // 分配对象失败次数

done:

ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, ngx_cycle->log, 0,
"slab alloc: %p", (void *) p);

return (void *) p;
}

释放逻辑

释放页面

如果是释放多个页面,将后面的页面头都清零。如果是slot页,将其从slot链表中移除。然后尝试合并前后的页面,从而组成更大的连续页。合并完成后,插入到空闲页块链表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
static void
ngx_slab_free_pages(ngx_slab_pool_t *pool, ngx_slab_page_t *page,
ngx_uint_t pages)
{
ngx_slab_page_t *prev, *join;

pool->pfree += pages;

page->slab = pages--;
// 第二页开始的页面头都清零
if (pages) {
ngx_memzero(&page[1], pages * sizeof(ngx_slab_page_t));
}
// next不都等于NULL么? 不等于NULL表示是slot页,将其从slot链表中移除
if (page->next) {
prev = ngx_slab_page_prev(page);
prev->next = page->next;
page->next->prev = page->prev;
}
// 尝试合并后面的页面
join = page + page->slab;

if (join < pool->last) { // 检查后面是否还有页

if (ngx_slab_page_type(join) == NGX_SLAB_PAGE) { // 是整页

if (join->next != NULL) { // 在空闲页链表中
pages += join->slab; // 加上合并块的页数
page->slab += join->slab; // 增加页块大小
// 将join块从空闲链表中移除
prev = ngx_slab_page_prev(join);
prev->next = join->next;
join->next->prev = join->prev;
// 清理页块头,恢复指针和标志
join->slab = NGX_SLAB_PAGE_FREE;
join->next = NULL;
join->prev = NGX_SLAB_PAGE;
}
}
}
// 尝试合并前面的页
if (page > pool->pages) { // 前面还有页
join = page - 1; // join指向被释放页的前一页

if (ngx_slab_page_type(join) == NGX_SLAB_PAGE) { // 是整页

if (join->slab == NGX_SLAB_PAGE_FREE) { // 不是页块头
join = ngx_slab_page_prev(join); // join指向页块头
}

if (join->next != NULL) { // 在空闲链表中
pages += join->slab; // 加上合并块的页数
join->slab += page->slab; // 增加页块大小
// 将join块从空闲链表中移除
prev = ngx_slab_page_prev(join);
prev->next = join->next;
join->next->prev = join->prev;
// 清理页块头,恢复指针和标志
page->slab = NGX_SLAB_PAGE_FREE;
page->next = NULL;
page->prev = NGX_SLAB_PAGE;

page = join; // 更新页块头
}
}
}
// 如果页块大于1页,将最后一页的prev指向页块头,合并时用
if (pages) {
page[pages].prev = (uintptr_t) page;
}
// 将页块插入空闲页链表头
page->prev = (uintptr_t) &pool->free;
page->next = pool->free.next;

page->next->prev = (uintptr_t) page;

pool->free.next = page;
}

释放对象

首先根据对象指针获取到对应的页面头以及相应的页面信息,然后根据页面类型做不同的处理。对于slot对象,将对应的bit位复位。如果释放该对象前页面是满的,则将该页面重新插入slot半满链表。如果释放该对象后页面空了,则释放该页。

如果你已经了解了分配逻辑,那么释放逻辑应该很好理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
void
ngx_slab_free_locked(ngx_slab_pool_t *pool, void *p)
{
size_t size;
uintptr_t slab, m, *bitmap;
ngx_uint_t i, n, type, slot, shift, map;
ngx_slab_page_t *slots, *page;

ngx_log_debug1(NGX_LOG_DEBUG_ALLOC, ngx_cycle->log, 0, "slab free: %p", p);

if ((u_char *) p < pool->start || (u_char *) p > pool->end) {
ngx_slab_error(pool, NGX_LOG_ALERT, "ngx_slab_free(): outside of pool");
goto fail;
}
// 计算释放的指针在第几页
n = ((u_char *) p - pool->start) >> ngx_pagesize_shift;
page = &pool->pages[n]; // 获取对应页面头
slab = page->slab;
type = ngx_slab_page_type(page);

switch (type) {

case NGX_SLAB_SMALL:

shift = slab & NGX_SLAB_SHIFT_MASK;
size = (size_t) 1 << shift;
// 检查指针是否对象对齐
if ((uintptr_t) p & (size - 1)) {
goto wrong_chunk;
}
// 第几个对象: n = 页中的偏移 / 对象大小
n = ((uintptr_t) p & (ngx_pagesize - 1)) >> shift;
m = (uintptr_t) 1 << (n % (8 * sizeof(uintptr_t)));
n /= 8 * sizeof(uintptr_t);
bitmap = (uintptr_t *)
((uintptr_t) p & ~((uintptr_t) ngx_pagesize - 1)); // 页起始地址,获取bitmap
// 第n个bitmap的m位
if (bitmap[n] & m) {
slot = shift - pool->min_shift;
// 如果当前页不在slot链表中(即释放前是满的),重新加回链表
if (page->next == NULL) {
slots = ngx_slab_slots(pool);

page->next = slots[slot].next;
slots[slot].next = page;

page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_SMALL;
page->next->prev = (uintptr_t) page | NGX_SLAB_SMALL;
}
// 清bit位
bitmap[n] &= ~m;
// 计算需要开头几个对象作为bitmap: n = 1页对象个数 / 1个对象位数
n = (ngx_pagesize >> shift) / ((1 << shift) * 8);

if (n == 0) {
n = 1;
}
// 接下来检查除了bitmap本身占用的对象外,是否还有其他对象
// 已经被bitmap占满的uintptr_t不用检查,对于bitmap对象和实际数据对象混用的uintptr_t需要unmask掉bitmap的部分
i = n / (8 * sizeof(uintptr_t));
m = ((uintptr_t) 1 << (n % (8 * sizeof(uintptr_t)))) - 1;

if (bitmap[i] & ~m) {
goto done;
}
// map = 多少个uintptr_t大小的map
map = (ngx_pagesize >> shift) / (8 * sizeof(uintptr_t));

for (i = i + 1; i < map; i++) {
if (bitmap[i]) {
goto done;
}
}
// 如果空了,就释放该页
ngx_slab_free_pages(pool, page, 1);
// 减去slot的对象个数
pool->stats[slot].total -= (ngx_pagesize >> shift) - n;

goto done;
}

goto chunk_already_free;

case NGX_SLAB_EXACT:

m = (uintptr_t) 1 <<
(((uintptr_t) p & (ngx_pagesize - 1)) >> ngx_slab_exact_shift); // 目标对象对应的位
size = ngx_slab_exact_size;
// 检查对齐
if ((uintptr_t) p & (size - 1)) {
goto wrong_chunk;
}

if (slab & m) {
slot = ngx_slab_exact_shift - pool->min_shift;

if (slab == NGX_SLAB_BUSY) { // bitmap是满的,重新加入链表
slots = ngx_slab_slots(pool);

page->next = slots[slot].next;
slots[slot].next = page;

page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_EXACT;
page->next->prev = (uintptr_t) page | NGX_SLAB_EXACT;
}

page->slab &= ~m; // 清bit位
// 如果空了,就释放该页
if (page->slab) {
goto done;
}

ngx_slab_free_pages(pool, page, 1);

pool->stats[slot].total -= 8 * sizeof(uintptr_t);

goto done;
}

goto chunk_already_free;

case NGX_SLAB_BIG:

shift = slab & NGX_SLAB_SHIFT_MASK;
size = (size_t) 1 << shift;

if ((uintptr_t) p & (size - 1)) {
goto wrong_chunk;
}
// 目标对象对应的位
m = (uintptr_t) 1 << ((((uintptr_t) p & (ngx_pagesize - 1)) >> shift)
+ NGX_SLAB_MAP_SHIFT);

if (slab & m) {
slot = shift - pool->min_shift;
// 如果当前页不在slot链表中(即释放前是满的),重新加回链表
if (page->next == NULL) {
slots = ngx_slab_slots(pool);

page->next = slots[slot].next;
slots[slot].next = page;

page->prev = (uintptr_t) &slots[slot] | NGX_SLAB_BIG;
page->next->prev = (uintptr_t) page | NGX_SLAB_BIG;
}

page->slab &= ~m;
// 如果空了,就释放该页
if (page->slab & NGX_SLAB_MAP_MASK) {
goto done;
}

ngx_slab_free_pages(pool, page, 1);

pool->stats[slot].total -= ngx_pagesize >> shift;

goto done;
}

goto chunk_already_free;

case NGX_SLAB_PAGE:

if ((uintptr_t) p & (ngx_pagesize - 1)) {
goto wrong_chunk;
}

if (!(slab & NGX_SLAB_PAGE_START)) {
ngx_slab_error(pool, NGX_LOG_ALERT,
"ngx_slab_free(): page is already free");
goto fail;
}

if (slab == NGX_SLAB_PAGE_BUSY) {
ngx_slab_error(pool, NGX_LOG_ALERT,
"ngx_slab_free(): pointer to wrong page");
goto fail;
}

size = slab & ~NGX_SLAB_PAGE_START;

ngx_slab_free_pages(pool, page, size);

ngx_slab_junk(p, size << ngx_pagesize_shift);

return;
}

/* not reached */

return;

done:

pool->stats[slot].used--; // 对象使用数减1

ngx_slab_junk(p, size);

return;

wrong_chunk:

ngx_slab_error(pool, NGX_LOG_ALERT,
"ngx_slab_free(): pointer to wrong chunk");

goto fail;

chunk_already_free:

ngx_slab_error(pool, NGX_LOG_ALERT,
"ngx_slab_free(): chunk is already free");

fail:

return;
}

共享内存销毁

前面讲了初始化、分配逻辑、释放逻辑,为了不失完整性,这里再讲下销毁逻辑。

搜索了整个代码并没有发现释放共享内存的地方,只有在cycle_init()失败时会munmap()释放新创建的共享内存、或者reload的时候释放掉被替换的共享内存。

所以猜测是在进程退出的时候自动进行了释放。那么如何进行验证呢?我们编写一个简单的测试程序:

1
2
3
4
5
6
7
8
9
10
11
12
#include <stdio.h>
#include <sys/mman.h>

int main()
{
char *addr = (char *)0x7f1000000000;
size_t size = 8192;
char *p = mmap(addr, size, PROT_READ|PROT_WRITE,
MAP_SHARED|MAP_ANONYMOUS, -1, 0);
printf("addr p = %p, size = %zu\n", p, size);
return 0;
}

为了测试方便,参数里指定了一个建议的地址。运行程序输出如下:

1
2
$ ./a.out
addr p = 0x7f1000000000, size = 8192

然后,我们编写一个systemtap脚本,打印参数信息和调用栈信息

1
2
3
4
5
6
7
8
probe kernel.function("unmap_page_range") {
if ($addr != 0x7f1000000000)
next;
printf("--------\n");
printf("addr: 0x%lx, size: %lu\n", $addr, $end-$addr);
print_backtrace();
printf("--------\n");
}

我们运行stap脚本,然后执行./a.out结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
--------
addr: 0x7f1000000000, size: 8192
0xffffffff8bc168e0 : unmap_page_range+0x0/0xd00 [kernel]
0xffffffff8bc1765d : unmap_single_vma+0x7d/0xf0 [kernel]
0xffffffff8bc179a1 : unmap_vmas+0x51/0xb0 [kernel]
0xffffffff8bc21535 : exit_mmap+0xb5/0x1d0 [kernel]
0xffffffff8ba8b5e7 : mmput+0x57/0x140 [kernel]
0xffffffff8ba93b22 : do_exit+0x352/0xb90 [kernel]
0xffffffff8ba943e3 : do_group_exit+0x43/0xb0 [kernel]
0xffffffff8ba94464 : SyS_exit_group+0x14/0x20 [kernel]
0xffffffff8ba03a43 : do_syscall_64+0x73/0x130 [kernel]
0xffffffff8c400085 : entry_SYSCALL_64_after_hwframe+0x41/0xa6 [kernel]
0xffffffff8c400085 : entry_SYSCALL_64_after_hwframe+0x41/0xa6 [kernel]
--------

可以看到在进程退出的时候,自动unmap了我们用mmap分配的内存。

统计与监控

ngx_slab_stat模块可以用于查看slab的统计信息,它的效果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
^_^$ curl http://127.0.0.1:50090/slab
* shared memory: SSL
total: 64(KB) free: 20(KB) size: 4(KB)
pages: 20(KB) start:00007FA1F9D8C000 end:00007FA1F9D9B000
slot: 8(Bytes) total: 504 used: 1 reqs: 1 fails: 0
slot: 16(Bytes) total: 254 used: 1 reqs: 1 fails: 0
slot: 32(Bytes) total: 127 used: 1 reqs: 1 fails: 0
slot: 64(Bytes) total: 64 used: 2 reqs: 2 fails: 0
slot: 128(Bytes) total: 32 used: 12 reqs: 15 fails: 0
slot: 256(Bytes) total: 16 used: 2 reqs: 2 fails: 0
slot: 512(Bytes) total: 8 used: 1 reqs: 1 fails: 0
slot: 1024(Bytes) total: 4 used: 1 reqs: 1 fails: 0
slot: 2048(Bytes) total: 2 used: 2 reqs: 5 fails: 0

主要就是读取ngx_slab_stat_t结构体中的那些统计信息。当然它需要编译时加入该模块,然后加入相应配置项,访问接口才能获取。如果这些不可接受的话,也可以使用systemtap非侵入式地实现类似功能。

ssl session cache

最后提一下ssl session cache,它是一个实际使用共享内存的例子,也是我为什么会写这篇博客的原因。ngx_ssl_session_cache_init是其私有的初始化函数,主要是初始化ngx_ssl_session_cache_t结构,包括红黑树和队列的初始化。红黑树用于session cache的查找,队列用于过期淘汰。然后将指针赋值到ngx_shm_zone_tngx_slab_pool_t的私有数据data字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
ngx_int_t
ngx_ssl_session_cache_init(ngx_shm_zone_t *shm_zone, void *data)
{
size_t len;
ngx_slab_pool_t *shpool;
ngx_ssl_session_cache_t *cache;

if (data) {
shm_zone->data = data;
return NGX_OK;
}

shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;

if (shm_zone->shm.exists) {
shm_zone->data = shpool->data;
return NGX_OK;
}

cache = ngx_slab_alloc(shpool, sizeof(ngx_ssl_session_cache_t));
if (cache == NULL) {
return NGX_ERROR;
}

shpool->data = cache;
shm_zone->data = cache;

ngx_rbtree_init(&cache->session_rbtree, &cache->sentinel,
ngx_ssl_session_rbtree_insert_value);

ngx_queue_init(&cache->expire_queue);

len = sizeof(" in SSL session shared cache \"\"") + shm_zone->shm.name.len;

shpool->log_ctx = ngx_slab_alloc(shpool, len);
if (shpool->log_ctx == NULL) {
return NGX_ERROR;
}

ngx_sprintf(shpool->log_ctx, " in SSL session shared cache \"%V\"%Z",
&shm_zone->shm.name);

shpool->log_nomem = 0;

return NGX_OK;
}

ngx_ssl_new_session()用于创建新session cache,每次创建都会先尝试淘汰最多2个过期session,这样既保证了cache不会堆积、又是惰性淘汰不会让事件占用太长时间。每次申请失败,都有一次强制淘汰的机会。因为底层的slab机制,所以不管淘汰几个过期的session都是无法保证新session一定申请成功的。除非淘汰的对象跟新分配的属于同一个slot,或者淘汰了对象之后正好空出了一个页。

因为ssl session cache本来就是属于一种优化,所以优化失败也是正常的。如果比较在意这个,那么当出现强制淘汰时就已经说明共享内存空间不够了,需要相应地增加缓存容量或者减小超时时间。缓存容量和超时时间应满足下面的关系:

缓存容量 > ∑ [( TPS / 超时时间 ) * 单个大小]

-------------本文结束感谢您的阅读-------------

欢迎关注我的其它发布渠道