笔记015 - HW7: Locking

spin lock

锁的错误实现:

1
2
3
4
5
6
7
8
9
struct lock { int locked; }
acquire(lk) {
while(1){
if(lk->locked == 0){ // A
lk->locked = 1; // B
return;
}
}
}

这种方法存在问题,因为如果lk->locked为0时,第一个线程进入到if内,正打算执行lk->locked = 1,这时发生了一个时钟中断,切换到第二个线程。对于第二个线程,这时lk->lock依旧为0,因此也会进入到if语句内。这是因为A和B不是原子操作造成的。

实现A和B为原子操作:

1
2
3
4
5
6
7
acquire(lk){
while(1){
if(xchg(&lk->locked, 1) == 0){
break
}
}
}

xchg是两个寄存器之间,或寄存器和内存变量之间内容的交换指令(两个操作数不能同时为内存变量。)。xv6的xchg函数是借助xchg指令完成以下工作:
假如lk->locked已经是1,xchg再次设置lk->locked为1并且返回1,循环继续;
假如lk->locked是0,至多只有一次xchg会遇到0它将设置lk->locked为1并且返回0,跳出循环,其他的xchgs将会返回1,继续循环。

这就是xv6的自旋锁。

使用锁的过程中关中断

使用xv6的自旋锁的过程中需要关中断。
以IDE驱动为例。ide.c/iderw维护着一个disk请求的队列,处理器会并发地将新请求加入到队列中。为了维护该驱动的队列和其他变量,iderw在操作过程中会请求idelock锁,并在操作完成时释放锁。另一方面,假如发生了中断,会调用ideintr函数。

acquire请求idelock锁的过程中会屏蔽中断,假如iderw调用了acquire保持了对idelock锁的持有,如果这时候发生了ide中断(比如在acquire之后我们尝试调用sti()开中断),则会调用中断处理函数ideintr,ideintr函数也需要调用acquire请求idelock锁,此时发现锁被其他程序持有,则会一直等待锁的释放。然而,iderw函数不会继续执行,因为中断处理函数ideintr不会返回。这就造成了死锁,所以使用xv6的自旋锁的过程中需要关中断。

另外,需要注意的是必须在执行xchg之前关中断,因为如果是在xchg之后关中断仍有可能出现:iderw在操作过程中会请求idelock锁,执行xchg之后关中断之前,就发生了中断,这样仍会导致死锁。

release将1、先释放锁,然后2、开中断。释放锁的过程就是使lk->locked = 0,但不能直接使用c语言赋值,因为其对应的指令可能不是原子操作,且处理器可能会对内存进行排序,导致开中断先于释放锁操作的执行,倘若此时又发生中断,将可能导致死锁。xv6使用以下指令保证释放锁是原子操作:asm volatile("movl $0, %0" : "+m" (lk->locked) : );

避免使用递归锁

递归锁是指在某个CPU上有了一次acquire锁操作,之后还允许在该CPU上的第二次acquire操作。避免递归锁是必要的,这是因为acquire锁的过程中可能是在维护某些数据结构,如果允许了递归锁,CPU可能暂停当前维护数据结构的程序,并切换执行另一段代码,假如该代码是以“被维护的数据结构是完整的”的前提进行工作,则可能发生错误。因此,一旦发现递归锁,则panic。所以,两次acquire(&lk);会导致panic。

ide.c/iderw开中断

假如在ide.c/iderw中acquire()之后添加sti(),并在release()之前添加cli(),将会出现两种panic情况。panic定义在console.c中,会以输出字符串在栈中的位置为基准,往回输出函数调用的eip值,根据这些eip值在kernel.asm查询,可以查看到函数的调用顺序,有以下两种情况:
情况1:panic->acquire->ideintr->trap->iderw->bread->initlog->trapret(由后往前输出)
该情况是在iderw的执行过程中发生了ide中断,最终导致递归锁的出现而发生panic。
情况2:panic->sched->yield->trap->iderw->bread->readsb->iinit->trapret(由后往前输出)
该情况是在iderw的执行过程中发生了时钟中断,调用了yield,yield请求yieldlock,然后调用sched,在sched中检测了cpu->ncli这个变量,因为这个cpu获得了两把锁,所以这个值是2,于是就panic了。

pcs存储的是eip值,通过getcallerpcs实现,getcallerpcs的参数v代表当前栈内的最后一个参数。我们知道,函数调用的时候栈的布局如下:

因此,ebp的位置是v的位置减去2。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//see in spinlock.c
// Record the current call stack in pcs[] by following the %ebp chain.
void
getcallerpcs(void *v, uint pcs[])
{
uint *ebp;
int i;

ebp = (uint*)v - 2;
for(i = 0; i < 10; i++){
if(ebp == 0 || ebp < (uint*)KERNBASE || ebp == (uint*)0xffffffff)
break;
pcs[i] = ebp[1]; // saved %eip
ebp = (uint*)ebp[0]; // saved %ebp
}
for(; i < 10; i++)
pcs[i] = 0;
}

file.c/filealloc开中断

假如在file.c/filealloc中acquire()之后添加sti(),并在每一个release()之前添加cli(),同时添加#include "x86.h",却不会发生panic现象。原因是file_table_lock保护的临界区足够短,时钟中断还来不及触发。如果在该临界区中加一段长时间的while循环,就会panic。

xv6 lock implementation

release()函数中,lk->pcs[0]和lk->cpu的清除工作必须在lk->locked置零前进行,这是因为如果在清除lk->locked之后清除,那么就会有一小段空隙,在这个空隙里调度到其他等待锁的程序,就会成功获得这个锁,然后设置lk->pcs[]和lk->cpu;此时再调度到原来的进程,原来的进程继续将lk->pcs[]和lk->cpu清除,就会导致出错。

补充

自旋锁与互斥量的区别:互斥量通过休眠使进程阻塞,不阻塞中断;自旋锁在获取锁之前一直处于忙等(自旋)状态,除了提供互斥机制以外,它们会阻塞中断(如果不阻塞中断,如果在获取自旋锁后发生中断,中断处理程序尝试获取同一个锁,并忙等待不返回)。

自旋锁可用于以下情况:锁被持有的时间短,而且线程并不希望在重新调度上花费太多的成本。

自旋锁常用场景:非抢占式内核,中断程序。

非抢占式内核使用自旋锁而不是互斥量的原因:(这里讨论的自旋锁包括xchg互斥和关中断两个部分)中断程序要求快速响应,不能阻塞。如果内核使用互斥量的概念,则可能获取锁之后发生中断,从而导致中断程序阻塞等待锁释放的情况发生。

显示 Gitment 评论