xv6-riscv-源代码阅读.同步机制(锁)

XV6 源代码阅读——同步机制(锁)

说明

  • 阅读的代码是 xv6-riscv 版本的
  • 涉及到的文件如下
    • kernel
      • kalloc.cspinlock.hspinlock.csleeplock.hsleeplock.c

问题

  • 什么是临界区?什么是同步和互斥?什么是竞争状态?临界区操作时中断是否应该开启?中断会有什么影响?XV6 的锁是如何实现的,有什么操作?xchg 是什么指令,该指令有何特性?
  1. 基于 XV6 的 spinlock,请给出实现信号量、读写锁、信号机制的设计方案(三选二,请写出相应的伪代码)

阅读报告

race

(1) race 发生条件

  • 同一块内存区域被并发的访问
  • 至少有一个操作是写操作

(2) xv6 中的例子

  • kalloc/kfree
  • 不同 CPU 同时对链表进行操作
    • xv6 中的实现是头部的 push/pop
    • 需要使用锁保护起来,否咋会引发 race
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// kernel/kalloc.c
void kfree(void *pa) {
// ...
r = (struct run*)pa;
acquire(&kmem.lock);
r->next = kmem.freelist;
kmem.freelist = r;
release(&kmem.lock);
// ...
}

void *kalloc(void) {
// ...
acquire(&kmem.lock);
r = kmem.freelist;
if(r)
kmem.freelist = r->next;
release(&kmem.lock);
//...
}
  • Locks Protect Invariants
  • 我们指的使用锁保护数据,我们实际上指的是用锁保护了一组这些数据上的不变量
  • 这些不变量可能在某些短暂的时间段是会变的,但是很快就会被恢复
    • 例如上面的 L6-7,在 push 一个元素的时候,有一段时间 kmem.freelist 并没有指向链表的开头,但是快我们在插入之后便将其重新指向了链表的开头
  • race 的发生就是在这些不变量发生改变的时候,有另外的程序使用了这个不变量
  • 锁的作用
    • 可以理解为我们将一组在临界区上操作序列化了,需要一次一个的执行,从而将不变量很好的保护起来了
    • 也可以理解为我们使用锁将对临界区的操作原子化了
  • 加锁和解锁之间的区域应该尽量小,否则会影响并行性

xv6 中的锁

  • xv6 中有两种锁:spinlocks、sleep-locks

spinlock

  • spinlock 的数据结构如下
    • 是一种互斥锁
1
2
3
4
5
6
7
// Mutual exclusion lock.
struct spinlock {
uint locked; // 1 表示该锁被持有, 0 表示该锁没有被持有
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
};

acquire(lk)

  • 我们需要如下的语句(L3-4)原子化,否则可能出现多个 CPU 持有这个锁的情况
1
2
3
4
5
6
7
8
void acquire(struct spinlock *lk) {
for(;;) {
if(lk->locked == 0) {
lk->locked = 1;
break;
}
}
}
amoswap

RISCV 中提供了这样的指令(是一个原子操作)

  • 这个指令将地址 a 处的值(旧值)读取出来,然后把寄存器 r 里的值(新值)写入地址 a 处,然后再把旧值写入寄存器 r 中
1
amoswap r, a
  • 于是 acquire() 的实现如下
    • __sync_lock_test_and_set 底层实现就是利用了 amoswap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Acquire the lock.
// Loops (spins) until the lock is acquired.
void acquire(struct spinlock *lk) {
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");

// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0) {
// 空循环
}
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Record info about lock acquisition for holding() and debugging.
lk->cpu = mycpu();
}

release(lk)

  • release 的实现类似,只是把锁释放
    • __sync_lock_release 底层实现就是利用了 amoswap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Release the lock.
void release(struct spinlock *lk) {
if(!holding(lk))
panic("release");
lk->cpu = 0;
// Tell the C compiler and the CPU to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other CPUs before the lock is released,
// and that loads in the critical section occur strictly before
// the lock is released.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code doesn't use a C assignment, since the C standard
// implies that an assignment might be implemented with
// multiple store instructions.
// On RISC-V, sync_lock_release turns into an atomic swap:
// s1 = &lk->locked
// amoswap.w zero, zero, (s1)
__sync_lock_release(&lk->locked);
pop_off();
}

xv6 中对 spinlock 的使用

  • 之前提到的 kernel/kalloc.c 中的 kalloc()free()
  • 使用锁的难点
    • 哪里该使用锁?
    • 该使用多少个锁?
  • 当一个 CPU 操作一个变量,同时另一个 CPU 可以写它时,需要使用 lock
  • 如果一个不变量涉及到多个内存位置的时候,需要加锁
  • 为了效率而言,能不加锁的地方尽量不加锁
    • 锁减少了并行性
  • single kernel lock(一个大内核锁)
    • 并行性很差
    • 有些系统调用可能会出问题
      • system calls such as pipe reads or wait would pose a problem
  • 如果内核的工作很复杂,更好的设计是设计更多、更细粒度的锁,从而保持较好的并行性
  • 一个粗粒度设计的例子:xv6 中的 kalloc/free
    • 只有一个锁
    • 如果有多个 CPU 想要调用 kalloc 申请内存,那么结果将会变得很慢
    • 大家都在 spinlock 上做着无意义的空循环
  • 一个细粒度设计的例子:xv6 中的文件
    • 每一个文件都有一个锁
  • xv6 中的锁

image-20210417171819325

xv6 中关于死锁的讨论

  • 如果通过内核的代码路径必须同时持有多个锁,很重要的一点是,所有代码路径需要以相同的顺序获取这些锁
    • 否则可能引发死锁
    • 一个例子
      • A 申请锁的顺序 L1,L2,B 申请锁的顺序 L2,L1
      • 死锁:A(L1),B(L2),A(L2),B(L1)
  • 因此在函数的调用路径上需要好好设计
  • 有的时候可能出现获取锁的顺序和调用顺序不一致的情况,这些都要好好考虑进去
  • 锁的设计越细粒度,死锁的可能性越大

锁和中断处理程序

  • 一个例子就是 sys_sleep() 需要读取 ticks,而 clockintr() 会修改 ticks
    • 想像这样一个场景,sys_sleep() 获得了 ticks 的锁,此时程序收到了时钟中断的信号,需要将 ticks++。此时便引发了死锁。
  • 一般的设计方法是,如果某一个锁会被中断处理程序获取,那么 CPU 在持有这个锁的时候不能开启中断
  • xv6 的设计更加保守,当 CPU 申请一个锁的时候,我们就关当前 CPU 的中断
    • 在释放锁的时候,打开中断
1
2
push_off();
pop_off();

指令与内存模型

  • instructions and memory model
    • A memory consistency model (which we often just call a “memory model”)
  • 编译器或者 CPU 在运行程序的时候可能试图去优化代码而 “乱序执行”
  • 但是需要遵守一些规则,这个规则被称为内存模型
    • 运行结果不能变

sleep lock

  • 有的时候可能需要获取一个锁很久(tens of milliseconds)
    • 例如往磁盘上写文件
  • 如果利用 spinlock 的话很浪费 CPU,需要等很久(空循环)
  • 进程在持有 spinlock 的时候不能 yield
    • 但是也不能设计成能够 yield,可能引发死锁
  • 专门设计一种 sleeplock,允许在等待的时候 yield
    • 允许中断,sleeplock 不能够被用于中断处理程序
    • 允许 yield,因此在 spinlock 保护的临界区内不能使用 sleeplock
  • sleeplock 的数据结构如下
1
2
3
4
5
6
7
8
// Long-term locks for processes
struct sleeplock {
uint locked; // Is the lock held?
struct spinlock lk; // spinlock protecting this sleep lock
// For debugging:
char *name; // Name of lock.
int pid; // Process holding lock
};
  • acquiresleep() / releasesleep()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void acquiresleep(struct sleeplock *lk) {
acquire(&lk->lk);
while (lk->locked) {
sleep(lk, &lk->lk);
}
lk->locked = 1;
lk->pid = myproc()->pid;
release(&lk->lk);
}

void releasesleep(struct sleeplock *lk) {
acquire(&lk->lk);
lk->locked = 0;
lk->pid = 0;
wakeup(lk);
release(&lk->lk);
}

问题 1

  • 临界资源:系统中某些资源只允许一个进程访问或使用,这些资源成为临界资源
  • 临界区:指需要被互斥访问的区域,可能是对临界资源的操作
  • 同步与互斥
    • 互斥:指某一资源在某一个时刻只允许一个进程对其进行访问,但不限制顺序
    • 同步:通过某些机制实现进程对资源的有序访问
    • 互斥是一种特殊的同步
  • 竞争状态:两个进程同时对某个资源进行修改,导致最终的结果是不正确的
    • 具体见 race 部分
  • 临界区的操作中断应该关闭,否则可能引起死锁
  • xv6 的锁的实现:上面的 spinlocksleeplock 部分
  • RISCV 中的类似指令amoswap,是一个原子操作,分析见 amoswap 部分

问题 2

信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
struct semaphore {
int count;
struct spinlock plock; // 保证 P/V 操作是原子的
struct spinlock vlock;
struct spinlock queue;
}

// 只会被调用一次
void sem_init(struct semaphore* se, unsigned int val) {
se->count = val;
initlock(&(se->plock), "semaphore_plock");
initlock(&(se->vlock), "semaphore_vlock");
initlock(&(se->queue), "semaphore_queue");
}

void P(struct samephore* se) {
acquire(se->plock);
--se->count;
if(se->count < 0) {
acquire(se->queue);
}
release(se->plock);
}

void V(struct samephore* se) {
acquire(se->vlock);
++se->count;
if(se->count <= 0) {
release(se->queue);
}
release(se->vlock);
}

读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
struct rw_lock {
int rdcnt;
struct spinlock rlock; // 保证操作是原子的
struct spinlock wlock;
struct spinlock rulock;
struct spinlock wulock;
struct spinlock queue; // 读写队列
}

void init_lock(struct rw_lock* se) {
se->rdcnt = 0;
initlock(&(se->rlock), "rlock");
initlock(&(se->wlock), "wlock");
initlock(&(se->ulock), "ulock");
initlock(&(se->queue), "queue");
}

void rd_lock(struct rw_lock* se) {
acquire(se->rlock);
se->rdcnt ++;
if(se->rdcnt == 1) {
acquire(se->queue);
}
release(se->rlock);
}

void rd_unlock(struct rw_lock* se) {
acquire(se->ulock);
se->rdcnt--;
if(se->dcnt == 0) {
release(se->queue);
}
release(se->ulock);
}

void wr_lock(struct rw_lock* se) {
acquire(se->wlock);
acquire(se->queue);
release(se->wlock);
}

void wr_unlock(struct rw_lock* se) {
acquire(se->wulock);
release(se->queue);
release(se->wulock);
}

参考资料

  • https://pdos.csail.mit.edu/6.828/2019/xv6/book-riscv-rev0.pdf
  • https://github.com/mit-pdos/xv6-riscv