xv6-labs-2020.lab7.lock

lab7 lock

1. 作业链接

  • https://pdos.csail.mit.edu/6.828/2020/labs/lock.html

2. 实习内容

2.1 Memory allocator

(1) 实验目标

  • 在内存管理上,由于只有一个 freelist,在多 CPU 的情况下,如果多个 CPU 同时申请内存,则可能出现剧烈的 race 现象
  • 一个解决的方案就是为每一个 CPU 维护一个 kmem 的锁,从而减少 race
  • 在这一部分,就是实现这样的策略

(2) 提示

  • kernel/param.h 中定义了常数 NCPU (CPU 个数)
  • freerange 中将所有的可分配物理内存分配给每个 CPU
  • cpuid() 返回当前 CPU 的 id,但是这只有在关闭中断的时候才是安全的
    • 因此需要在调用前后关开中断(push_off()pop_off()
  • 可以简单地将锁命名为 kmem,或者使用 snprintf() 函数进行命名(这个没什么用,DEBUG)

(3) 实现

[1] 锁的声明与初始化
  • 找到原来的定义修改为数组即可
1
2
3
4
5
// kernel/kalloc.c
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];
  • 初始化的时候初始化整个数组
1
2
3
4
5
6
7
// kernel/kalloc.c
void kinit() {
// 为了方便, 我们直接把名字命名为 `kmem`
for(int i = 0;i < NCPU; ++i)
initlock(&kmem[i].lock, "kmem");
freerange(end, (void*)PHYSTOP);
}
[2] 初始化可分配的物理内存
  • freerange() 中实现
  • 依次给每个 CPU 分配物理内存即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// kernel/kalloc.c
void freerange(void *pa_start, void *pa_end) {
char *p;
p = (char*)PGROUNDUP((uint64)pa_start);
int id = 0;
for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE) {
// kfree(p);
// 模仿 kfree 实现
struct run *r;
memset(p, 1, PGSIZE);
r = (struct run*)p;
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
++id;
if(id == NCPU) {
id = 0;
}
}
}
[3] kalloc / kfree
  • 在分配物理内存的时候,需要在当前 CPU 的 freelist 中分配
  • 当不够的时候,可以取其他 CPU 的 freelist 中的物理内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// kernel/kalloc.c
void * kalloc(void) {
struct run *r;
push_off();
int id = cpuid();
pop_off();
// 尝试分配内存
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(r)
kmem[id].freelist = r->next;
release(&kmem[id].lock);

// 从其他 CPU 那里获取
if(!r) {
for(int i = 0;i < NCPU; ++i) {
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if(r)
kmem[i].freelist = r->next;
release(&kmem[i].lock);
if(r)
break;
}
}

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
  • 在释放物理内存的时候,将其释放到当前 CPU 的 freelist 中去
1
2
3
4
5
6
7
8
9
10
11
12
// kernel/kalloc.c
void kfree(void *pa) {
// ...
push_off();
int id = cpuid();
pop_off();

acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
}

2.2 Buffer cache

  • 由于 buffer 只有一个锁,因此存在大量的冲突,修改锁实现更少的冲突

(1) 提示

  • 修改 bgetbrelse,以便 bcache 中不同块的并发查找和释放不太可能在锁上发生冲突(例如,不必都等待 bcache.lock
    • 每个块最多缓存一个副本
  • 新加的锁命名为 bcache
  • 建议使用哈希表在查找每个块号,然后每个哈希表项
    • 可以使用固定大小的哈希表(例如一个质数:13)
  • 允许存在以下的冲突(conflict)
    • 两个进程并发的使用相同块号的块(bcachetest test0 不测试这个)
    • 两个进程同时 miss,需要找一个新的块(bcachetest test0 不测试这个)
    • 通过调整块和锁不可避免的冲突(例如两个块的哈希值相同,但是可以通过调整哈希表的大小减小冲突)
  • 在哈希表中找 buffer 但是没找到,需要重新分配一个 buffer 的入口的时候得是原子操作
  • 移除 buffer 链表(bcache.head 等),使用带最后一次使用时间戳的 buffer 代替(使用 kernel/trap.c 中的 ticks
    • 这样修改以后,brelse 就不需要申请 bcache 的锁了,而且 bget 能够通过时间戳选出 LRU 的 buffer
  • bget 可以中可以是序列化驱逐(满了)
  • 注意避免死锁,当驱逐的时候你需要同时持有 bcache 和哈希表中某一项的锁
  • 当替换块的时候,如果没有映射到同一个哈希表项中,需要移动 struct buf 结构
    • 注意死锁(哈希值相同时)

(2) 实现

  • 只需要把链表修改为哈希桶即可
    • 对于每一个哈希桶内部,我们使用带虚拟头结点的单向链表实现
  • 只需要修改文件 kernel/bio.ckernel/buf.h
[1] 修改链表为开链哈希桶
  • 一些宏定义
    • 哈希桶个数为 13
    • buf 的个数就按照原来给的,30 个
1
2
#define BUCKET_NUM 13
#define BUF_NUM NBUF
  • 修改 struct buf,我们不需要 prev 指针了,通过 timestamp 时间戳去实现 LRU 算法
1
2
3
4
5
6
7
8
9
10
11
12
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
// struct buf *prev; // 删除字段
struct buf *next;
uchar data[BSIZE];
uint timestamp; // 添加字段
};
  • 修改数据结构 bcache
1
2
3
4
5
6
7
// kernel/bio.c
struct {
struct buf buf[NBUF];
// 修改为哈希桶(开链法)
struct buf* buckets[BUCKET_NUM];
struct spinlock bucketslock[BUCKET_NUM];
} bcache;
  • 初始化中修改初始化方式
    • 初始化锁
    • 初始化 buffer,主要是将 buf 均匀分配到哈希桶中
      • 每个哈希桶中内容为带虚拟头结点的单向链表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void binit(void) {
// 初始化锁
for(int i = 0;i < BUCKET_NUM; ++i) {
initlock(&bcache.lock[i], "bcache");
}
initlock(&bcache.lock_global, "bcache_global");

// 初始化初始引用(初始化虚拟头结点)
for(int i = 0; i < BUCKET_NUM; ++i) {
bcache.buckets[i].next = &bcache.buckets[i];
}

// 初始化 buffer
for(int i = 0; i < BUF_NUM; ++i) {
int id = hash(i);
struct buf* now = &bcache.buf[i];
now->blockno = id;
now->refcnt = 0;
// now->timestamp = ticks;
initsleeplock(&now->lock, "buffer");
// 插入
now->next = bcache.buckets[id].next;
bcache.buckets[id].next = now;
}
}
[2] 修改 bget 函数
  • 按照原来链表的寻找方式,我们在每个哈希桶的链表中查找
    • 如果已经缓存的话,直接返回即可
    • 如果没有缓存的话,先在对应的哈希桶中找看看有没有空 buffer
      • 对应的哈希桶内 LRU 查找
    • 如果没有的话再到其他哈希桶中找空的 buffer
      • 其他的所有哈希桶 LRU 查找
  • 这样子的实现并不是标准的 LRU
  • 具体实现如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
static struct buf* bget(uint dev, uint blockno) {
struct buf *b;

int id = hash(blockno);
acquire(&bcache.lock[id]);

// 1. 查找这个块是否被缓存, 如果被缓存直接返回即可
for(b = bcache.buckets[id].next; b != &bcache.buckets[id]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.lock[id]);
acquiresleep(&b->lock);
return b;
}
}

// 2 先在自己的桶里寻找
uint timestamp_min = 0x8fffffff;
struct buf* res = 0;
for(b = bcache.buckets[id].next; b != &bcache.buckets[id]; b = b->next) {
if(b->refcnt == 0 && b->timestamp < timestamp_min) {
timestamp_min = b->timestamp;
res = b;
}
}
if(res) {
res->dev = dev;
res->blockno = blockno;
res->valid = 0;
res->refcnt = 1;
release(&bcache.lock[id]);
acquiresleep(&res->lock);
return res;
}

// 3. 没有被缓存, 需要进行驱逐(其他桶中找)
// 注意这里需要加全局锁
// 不然可能出现两个进程同时找到同一个空块的情况
acquire(&bcache.lock_global);

// (1) 找到最早的时间戳
// 可以直接对 buf 循环
timestamp_min = 0x8fffffff;
res = 0;
for(int i = 0; i < BUF_NUM; ++i) {
b = &bcache.buf[i];
if(b->refcnt == 0 && b->timestamp < timestamp_min) {
timestamp_min = b->timestamp;
res = b;
}
}

// 如果没找到直接 panic
if(!res) {
release(&bcache.lock[id]);
release(&bcache.lock_global);
panic("bget: no buffers");
}

int id_new = hash(res->blockno);

// 需要移动块到目标哈希桶中
// 从原桶中删除
acquire(&bcache.lock[id_new]);
for(b = &bcache.buckets[id_new]; b->next != &bcache.buckets[id_new]; b = b->next) {
if(b->next == res) {
b->next = res->next;
break;
}
}
// 插入新桶
res->next = bcache.buckets[id].next;
bcache.buckets[id].next = res;
res->dev = dev;
res->blockno = blockno;
res->valid = 0;
res->refcnt = 1;
release(&bcache.lock[id_new]); // 具体释放位置应该在哪里?
release(&bcache.lock[id]);
release(&bcache.lock_global);
acquiresleep(&res->lock);
return res;
}

3. 实验结果

  • 在我的电脑上第一个测试 kalloctest ,给的时间是 200s,但是我的电脑跑起来要 210s 左右,修改了一下时间限制才能过
1
2
# grade-lab-lock
# line 12: timeout=300
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
== Test running kalloctest ==
$ make qemu-gdb
(211.1s)
== Test kalloctest: test1 ==
kalloctest: test1: OK
== Test kalloctest: test2 ==
kalloctest: test2: OK
== Test kalloctest: sbrkmuch ==
$ make qemu-gdb
kalloctest: sbrkmuch: OK (19.2s)
== Test running bcachetest ==
$ make qemu-gdb
(14.5s)
== Test bcachetest: test0 ==
bcachetest: test0: OK
== Test bcachetest: test1 ==
bcachetest: test1: OK
== Test usertests ==
$ make qemu-gdb
usertests: OK (222.9s)
== Test time ==
time: OK
Score: 70/70

4. 遇到的困难以及收获

  • 一开始利用双向链表、单向链表实现,不知道为什么都会报错 panic,现在也没找到问题,后来改用带虚拟头结点的单向链表之后通过了,感觉很奇怪,有时间再回来看看具体的问题
  • 做完 lab 之后了解到了一种在高并发场景下常用的优化手段
    • 将锁进行细分,从而能够减小冲突,最终实现性能的提升
  • 感觉在实际的操作系统设计的过程中,锁的控制确实是一个很精细的过程,一不小心就会写成死锁,而且这些问题一般都很难发现
  • 设计过程中很多时候应该使用保守的设计,比如在这个例子中,我们在找不到缓存,同时在哈希桶中找不到空闲 buffer 的时候,加上全局锁,此时虽然降低了性能,但是最终能够保证结果的正确

5. 对课程或 lab 的意见和建议

  • 建议提供一些关于 lab 的 debug 功能的指导

6. 参考文献

  • https://pdos.csail.mit.edu/6.828/2019/xv6/book-riscv-rev1.pdf