蓝色步行者

每个人都有自己的梦想


  • 首页

  • 归档

  • 标签

  • 分类

  • 搜索

笔记02 - HW01: x86 shell

发表于 2017-08-20 | 分类于 MIT6.828 Operating System Engineering

MIT 6.828 LEC1作业部分,实现shell重定向<>和管道|。
代码详见:github: jianzzz

使用方式1

$ gcc sh.c 编译以后产生 a.out 文件
执行$ ./a.out
分别输入以下命令:
ls > y
cat < y | sort | uniq | wc > y1
cat y1
rm y1
ls | sort | uniq | wc
rm y

使用方式2

将上述命令复制到 t.sh 中,执行$ ./a.out < t.sh

实现方式

解析命令过程

main函数按行读取命令,对于每一条命令,先创建子进程,然后调用parsecmd函数解析命令,并对parsecmd函数解析结果调用runcmd执行命令。

parsecmd函数处理命令字符串首尾指针后调用parseline函数,parseline函数调用parsepipe解析命令。

parsepipe函数首先调用parseexec函数解析第一个子命令(如果存在管道符号,则是第一个管道符号前的命令),返回解析的cmd结构体;如果存在管道符号,则递归调用parsepipe自身解析后面的命令,返回解析的cmd结构体;然后调用pipecmd命令将前后的两个结构体整合为管道cmd结构体。

parseexec函数解析子命令的退出条件是遇到管道符号。如果没有碰到管道符号,则解析紧接着的参数,如上述第二条命令的cat,接着调用parseredirs函数判断当前是否是I/O重定向(是否遇到了<>符号),是的话则读取重定向符号右端的参数(重定向文件),然后将重定向符号左端的cmd对象、重定向符号、重定向符号右端的重定向文件整合为重定向cmd结构体。

仔细阅读parseexec函数,cmd结构体强制转换为execcmd结构体,存储到execcmd结构体指针所指地址的数据实际上都存到了cmd结构体指针所指地址。在while循环中,如果碰到了重定向符<>,cmd结构体指针将被包含于redircmd结构体中(redircmd函数),redircmd结构体强制转换为cmd结构体后返回,此刻parseexec函数中cmd结构体指针真正指向的数据其实是redircmd结构体的数据,并且该redircmd结构体中的cmd指针真正指向的数据其实是execcmd结构体的数据;如果parseexec函数的while循环执行到此还没遇到到管道符|,则表示重定向符<>右端的参数个数大于1(如:cat < file -n),新读入的参数继续存储到execcmd结构体中,即cmd结构体指针所指的内存地址,即redircmd结构体中的cmd指针所指的内存地址。注意到:redircmd结构体只是存储了cmd结构体指针,因此新读入的参数继续存储到execcmd结构体中(即redircmd结构体中的cmd指针所指)并不会影响到redircmd结构体其他值的存储。

另一方面,可以观察出,pipecmd、redircmd等复杂结构体包含的命令对象指针是最简单的cmd结构体类型,而在传参和返回值方面,都会转换成cmd类型。这是可以学习的编程设计技巧。

执行命令过程

以cat < y | sort | uniq | wc > y1简要说明:
判断命令的类型,
1、如果是管道命令,则开启两个子线程分别负责管道左端和右端命令,父进程负责wait。fork之后,父进程和子进程都有指向管道的文件描述符。子线程c1关闭管道读端,将标准输出指向管道写端,然后执行左端命令如cat < y,执行结果将缓冲到标准输出即管道写端。子线程c2关闭管道写端,将标准输入指向管道读端,然后c2调用runcmd执行右端命令,如sort | uniq | wc > y1。c2子线程将作为父进程创建新的管道和新的两个子线程,c21将标准输出指向管道写端后执行sort,c22将标准输入指向管道读端后执行uniq | wc > y1。由于c1、c2,c21、c22执行顺序是不一定的,有可能出现c1进程还没执行完cat < y(还没写入到标准输出),c21已经重定向了标准输出,则c21执行sort时无法从标准输入读取到cat的结果;又或者是,sort命令还没从标准输入读取数据,c22线程已经重定向了标准输入,导致sort无法读取结果。个人解决方案是先调用wait让第一个线程完成工作。
2、如果是重定向命令,则根据命令的模式打开文件,注意如果是写入到文件,需要保证所有者可以读写文件;根据命令fd重定向标准输入/输出符后执行命令。
3、如果是普通执行命令,则调用execv,注意到exec函数执行成功后不返回到调用程序,所以可在exec函数调用后面编写执行失败检查代码。execv命令对象需要完整执行路径。也可以使用execvp函数,则无需完整执行路径。

笔记01 - x86 v6 book | Chapter 0

发表于 2017-08-20 | 分类于 MIT6.828 Operating System Engineering

什么是xv6?

xv6是Dennis Ritchie 和 Ken Thompson的Unix Version 6再实现版本。xv6大致延续v6的结构和风格,但使用ANSI C,并基于x86−多处理器被重新设计。

什么是OS?

An operating system is a program that manages a computer’s hardware. 一个操作系统是用来管理计算机硬件的一种程序。 The job of an operating system is to share a computer among multiple programs and to provide a more useful set of services than the hardware alone supports. 操作系统的工作是保证在多个程序间共享一台计算机并提供一个比单独硬件支持更有用的服务。It also multiplexes the hardware, allowing many programs to share the computer and run (or appear to run) at the same time. 它还多路传输硬件资源,允许许多程序在同一时间共享电脑和运行(或出现运行)。Finally, operating systems provide controlled ways for programs to interact, so that they can share data or work together. 最后,操作系统提供程序交互的控制方法,确保程序可以共享数据或一起工作。
看待OS的两种视图:
The small view: a h/w management library. 小视图:OS是一个硬件管理库。
The big view: physical machine -> abstract one w/ better properties. 大视图:对物理机的抽象,使其具备更好写性能。

system call 系统调用是什么?

When a process needs to invoke a kernel service, it invokes a procedure call in the operating system interface. Such a procedure is called a system call. The system call enters the kernel; the kernel performs the service and returns. Thus a process alternates between executing in user space and kernel space. 当一个进程需要调用内核服务时,它会在操作系统接口调用一个过程调用。这样的过程称为系统调用。该系统调用进入内核,内核执行服务和返回。这样一个过程在用户空间和内核空间之间的交替执行。The kernel uses the CPU’s hardware protection mechanisms to ensure that each process executing in user space can access only its own memory. The kernel executes with the hardware privileges required to implement these protections; user programs execute without those privileges. When a user program invokes a system call, the hardware raises the privilege level and starts executing a pre-arranged function in the kernel. 内核使用CPU的硬件保护机制,确保每个在用户空间执行的过程只能访问自己的内存。内核执行有硬件特权,以实现这些保护;用户程序执行没有这些特权。当一个用户程序调用系统调用,硬件提高特权水平并开始在内核中执行一个预先安排的函数。

xv6 process 进程

An xv6 process consists of user-space memory (instructions, data, and stack) and per-process state private to the kernel. 一个xv6进程由用户空间内存(指令、数据和堆栈)和进程状态组成,进程状态相对于内核来说是私有的。The instructions implement the program’s computation. The data are the variables on which the computation acts. The stack organizes the program’s procedure calls.指令实现了程序计算。数据是计算行为过程中的变量。堆栈组织了程序的过程调用。 Xv6 can time-share processes: it transparently switches the available CPUs among the set of processes waiting to execute. xv6进程是分时共享的,可以在等待执行的进程集合中透明地转变可用的CPU资源。When a process is not executing, xv6 saves its CPU registers, restoring them when it next runs the process. 当一个进程没被执行,xv6保存它的CPU寄存器并在下次执行的时候进行重建。The kernel associates a process identifier, or pid, with each process. 内核通过进程标识符或pid来关联一个进程。A process may create a new process using the fork system call.一个进程可通过fork系统调用来创建一个新进程。

fork()函数做了什么?

复制用户内存
       复制进程内核状态(e.g. user id)
子进程得到不同的PID
子进程状态包含父进程PID
以不同的值返回两次(在父进程中,fork返回新创建子进程的进程ID;在子进程中,fork返回0;如果出现错误,fork返回一个负值)

exec()函数做了什么?

用新内存镜像(从特定格式文件加载而来)替代当前正执行的进程内存
       xv6使用ELF格式的文件
执行成功后不返回到调用程序,相反,由文件加载而来的指令开始在ELF头声明的入口点处开始执行
包含两个参数:可执行文件的名称和字符串数组参数

fork allocates the memory required for the child’s copy of the parent’s memory, and exec allocates enough memory to hold the executable file. fork从父进程内存中复制,分配了子进程需要的内存。exec分配了足够的内存以控制可执行文件。

file descriptor 文件描述符是什么?

A file descriptor is a small integer representing a kernel-managed object that a process may read from or write to.一个文件描述符是一个非负小整数,代表了一个内核管理对象。进程可通过文件描述符进行读写。Internally, the xv6 kernel uses the file descriptor as an index into a per-process table, so that every process has a private space of file descriptors starting at zero. xv6内核内部使用文件描述符作为每个进程表的索引,每个进程都拥有文件描述符的私有空间(由0开始),(实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。)。By convention, a process reads from file descriptor 0 (standard input), writes output to file descriptor 1 (standard output), and writes error messages to file descriptor 2 (standard error). 习惯上,标准输入的文件描述符是 0,标准输出是 1,标准错误是 2。The shell ensures that it always has three file descriptors open, which are by default file descriptors for the console. Shell确保总是有三个文件描述符打开,这也是控制台的默认文件描述符。The close system call releases a file descriptor, making it free for reuse by a future open, pipe, or dup system call. close系统调用会释放文件描述符,使其可被open, pipe或者dup系统调用所重用。 A newly allocated file descriptor is always the lowest-numbered unused descriptor of the current process. 一个新分配的文件描述符总是当前进程序号最小的未使用的描述符。

以下内容来自wiki

上图是一个进程的文件描述符表、file表和inode表。注意到不同的文件描述符可以指向相同的file表项(比如dup系统调用的结果),以及多个不同的file表项可以指向相同的inode(比如文件被多次打开,inode表仍然保持精简,因为inode表通过文件名来标识inodes–即使inode可以有多个名字)。File descriptor 3不指向任何file,表明它被关闭了。

对于virtual file system (VFS),以上结构则有所变化。VFS是具体文件系统之上的一个抽象层,其目的是允许客户机应用程序以统一的方式访问不同类型的具体文件系统。参考Linux kernel map in printable PDF,可知VFS将目录当作files。在路径/bin/vi下,bin和vi都是files。bin是特殊的directory file,vi是regular file,存在一个inode同时代表这两个components。尽管存在这种统一,VFS经常需要执行一些目录操作,比如路径名查询。路径名查询涉及转换每个component的路径,确保它是有效的,然后继续查询下一个component。因此,VFS提出了目录条目(dentry)的概念,一个dentry是一个路径下的一个特定组件。比如说,/、bin、vi都是dentry对象。/和bin是directory,vi是regular file。这存在一个很重要的观点:dentry objects are all components in a path,including files。解析一个路径并遍历它的组件耗时且充斥着字符串比较,dentry对象使得整个过程变得更加容易。dentry还可能包括挂载点。路径/mnt/cdrom/foo下,组件/、mnt、cdrom\foo都是dentry对象。当执行目录操作时,VFS根据需要构造出dentry对象。

以下是某篇博客对上述内容的简介
       内核中,对应于每个进程都有一个文件描述符表,表示这个进程打开的所有文件。文件描述表中每一项都是一个指针,指向一个用于描述打开的文件的数据块———file对象,file对象中描述了文件的打开模式,读写位置等重要信息,当进程打开一个文件时,内核就会创建一个新的file对象。需要注意的是,file对象不是专属于某个进程的,不同进程的文件描述符表中的指针可以指向相同的file对象,从而共享这个打开的文件。file对象有引用计数,记录了引用这个对象的文件描述符个数,只有当引用计数为0时,内核才销毁file对象,因此某个进程关闭文件,不影响与之共享同一个file对象的进程.
       file对象中包含一个指针,指向dentry对象。dentry对象代表一个独立的文件路径,如果一个文件路径被打开多次,那么会建立多个file对象,但它们都指向同一个dentry对象。
       dentry对象中又包含一个指向inode对象的指针。inode对象代表一个独立文件。因为存在硬链接与符号链接,因此不同的dentry对象可以指向相同的inode对象。inode 对象包含了最终对文件进行操作所需的所有信息,如文件系统类型、文件的操作方法、文件的权限、访问日期等。
       打开文件后,进程得到的文件描述符实质上就是文件描述符表的下标,内核根据这个下标值去访问相应的文件对象,从而实现对文件的操作。

       注意,同一个进程多次打开同一个文件时,内核会创建多个file对象。
       当进程使用fork系统调用创建一个子进程后,子进程将继承父进程的文件描述符表,因此在父进程中打开的文件可以在子进程中用同一个描述符访问。

Why there are actually one page table per process?为什么通常一个进程一个页表,而不是整个系统一张页表?

Page tables are used to translate the virtual addresses seen by the application into physical addresses used by the hardware to process instructions; 页表被用来将应用程序看到的线性地址转换为硬件处理指令所用的物理地址。such hardware that handles this specific translation is often known as the memory management unit. 负责这种特定转换的硬件是内存管理单元。Each entry in the page table holds a flag indicating whether the corresponding page is in real memory or not. If it is in real memory, the page table entry will contain the real memory address at which the page is stored.页表中的每个条目使用标志指示是否对应于实际内存中的页面。如果是,页表条目将包含页面存储的真正的内存地址。If the page table entry for the page indicates that it is not currently in real memory, the hardware raises a page fault exception, invoking the paging supervisor component of the operating system. 如果页表条目表明页面目前不在实际内存中,硬件产生一个页错误异常,请求操作系统的分页管理组件。
Systems can have one page table for the whole system, separate page tables for each application and segment, a tree of page tables for large segments or some combination of these. 系统可以为整个系统设计一个页表,或每个应用程序和段拥有单独的页表,或为大段或它们的一些组合设计页表树。If there is only one page table, different applications running at the same time use different parts of a single range of virtual addresses. 如果只有一个页表,同时运行的不同应用程序使用一个范围内的线性地址的不同部分。If there are multiple page or segment tables, there are multiple virtual address spaces and concurrent applications with separate page tables redirect to different real addresses. 如果有多个页面或分段表,将存在多个线性地址空间,拥有单独页表的并发的应用程序将被重定向到不同的真实地址。
A page table usually has a fixed number of entries and therefore describes only a portion of the entire virtual address space. This is why you need multiple of them to cover the entire address space. 页表通常有固定数量的条目,因此只描述了整个线性地址空间的一部分。这就是为什么需要多个页表来覆盖整个地址空间。Now, in many OSes processes have individual (in other words, not shared with others) virtual address spaces, which helps to protect processes from one another. This is another reason for having multiple page tables.在许多操作系统过程中存在个人线性地址空间(换句话说,不与他人分享),这有助于保护进程,这是拥有多个页表的另一个原因。

Why fork and exec are not combined in a single call (separate calls for creating a process and loading a program) ? 为什么fork和exec不整合为一个简单的系统调用(为什么对创建进程和加载程序分别操作)?

File descriptors and fork interact to make I/O redirection easy to implement. 文件描述符和fork相互作用,容易实现I/O重定向。Fork copies the parent’s file descriptor table along with its memory, so that the child starts with exactly the same open files as the parent.fork复制了父进程的文件描述符表和内存,子进程开始执行时拥有相同的被打开的文件。 The system call exec replaces the calling process’s memory but preserves its file table. exec替代了当前调用进程的内存,但保留了文件描述符表。 This behavior allows the shell to implement I/O redirection by forking, reopening chosen file descriptors, and then execing the new program. 这个行为允许了Shell实现I/O重定向:fork创建子进程,重打开被关闭的文件描述符,最后exec执行新程序。
e.g. cat < input.txt 实现cat重定向
子进程关闭了文件描述符0后,由于0是当前最小的可用文件描述符,确保了open可以使用它。cat开始执行,并以文件描述符0(标准输入)为索引指向了input.txt。

1
2
3
4
5
6
7
8
char *argv[2];
argv[0] = "cat";
argv[1] = 0;
if(fork() == 0) { //创建子进程
close(0); //子进程释放文件描述符0(标准输入)
open("input.txt", O_RDONLY); //文件描述符0(标准输入)指向了input.txt
exec("cat", argv);//执行cat
}

The code for I/O redirection in the xv6 shell works in exactly this way. xv6 shell也是以这种方式进行I/O重定向的。Recall that at this point in the code the shell has already forked the child shell and that runcmd will call exec to load the new program. 回想一下,此时在代码中shell已经通过fork创建子进程shell,runcmd将调用exec加载新程序。Now it should be clear why it is a good idea that fork and exec are separate calls. 现在应该清楚为什么fork和exec单独调用是一个好主意。This separation allows the shell to fix up the child process before the child runs the intended program.这种分离允许shell在子进程运行目标程序前对子线程进程修正。

How two file descriptors share the same file offset? 两个文件描述符如何共享相同的文件偏移?

文件描述符会伴随着文件偏移地址,read和write系统调用会更新文件读写指针的偏移地址,共享了文件描述符则表示可以分别操作不同的文件描述符,使其作用于同一个文件。
Two file descriptors share an offset if they were derived from the same original file descriptor by a sequence of fork and dup calls. 如果两个文件描述符来自于同一个原始的文件描述符(通过fork和dup系统调用),则它们使用相同的文件偏移。Otherwise file descriptors do not share offsets, even if they resulted from open calls for the same file. 否则文件描述符不会共享文件偏移,即使是由open系统调用打开相同文件所产生的文件描述符。
e.g. write hello world into a file by fork

1
2
3
4
5
6
7
if(fork() == 0) { 
write(1, "hello ", 6);
exit();
} else {
wait();
write(1, "world\n", 6);
}

e.g. write hello world into a file by dup

1
2
3
fd = dup(1); 
write(1, "hello ", 6);
write(fd, "world\n", 6);

The dup system call duplicates an existing file descriptor, returning a new one that refers to the same underlying I/O object. dup复制现有的文件描述符,返回一个指向相同底层I/O对象的新的文件描述符。Dup allows shells to implement commands like this: ls existing-file non-existing-file > tmp1 2>&1. The 2>&1 tells the shell to give the command a file descriptor 2 that is a duplicate of descriptor 1. dup告诉shell文件描述符2复制于文件描述符1。 Both the name of the existing file and the error message for the non-existing file will show up in the file tmp1. 结果是现有的文件名称和不存在的文件错误消息将出现在tmp1文件。 The xv6 shell doesn’t support I/O redirection for the error file descriptor, but now you know how to implement it. xv6 shell不支持错误文件描述符的I/O重定向,我们可以通过上述方式进行实现。

What is pipe? 什么是管道?

A pipe is a small kernel buffer exposed to processes as a pair of file descriptors, one for reading and one for writing. 管道是一个小的内核缓冲区,以一对文件描述符的形式暴露给进程,一个用于读,一个用于写。Writing data to one end of the pipe makes that data available for reading from the other end of the pipe. 向管道的一端写入数据,使数据可用于管道另一端读取。Pipes provide a way for processes to communicate.管道提供了一种进程交互的方式。
e.g. wc 标准输入连向管道的读端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int p[2]; 
char *argv[2];
argv[0] = "wc";
argv[1] = 0;
pipe(p); //创建管道,记录读、写文件描述符到数组p中
//fork之后,父进程和子进程都有指向管道的文件描述符
if(fork() == 0) {
close(0); //释放标准输入文件描述符0
dup(p[0]); //复制管道读端到标准输入文件描述符0
close(p[0]); //关闭管道读端
close(p[1]); //关闭管道写端
exec("/bin/wc", argv); //执行wc,wc会从标准输入中读取,即从管道读端读取
} else {
write(p[1], "hello world\n", 12); //向管道写端写入
close(p[0]); //关闭管道读端
close(p[1]); //关闭管道写端
}

If no data is available, a read on a pipe waits for either data to be written or all file descriptors referring to the write end to be closed; 如果没有可用的数据,管道读端会阻塞等待,直到数据写入或者所有指向写端的文件描述符被关闭。in the latter case, read will return 0, just as if the end of a data file had been reached. 在后一种情况下,read会返回0,如同达到了文件末尾。The fact that read blocks until it is impossible for new data to arrive is one reason that it’s important for the child to close the write end of the pipe before executing wc above: if one of wc’s file descriptors referred to the write end of the pipe, wc would never see end-of-file. 子进程执行wc之前将写端关闭的一个重要原因是:读端会一直阻塞直到不可能出现新数据(关闭了所有写端)。如果存在指向写端的文件描述符没被关闭(如子线程未关闭),wc将不会看到文件的末尾,因为读端一直阻塞。
The xv6 shell implements pipelines such as grep fork sh.c | wc -l in a manner similar to the above code (8450).xv6 shell以类似的方式实现管道,例如grep fork sh.c | wc -l。 The child process creates a pipe to connect the left end of the pipeline with the right end. 子进程创建管道来连接管道的左端和右端。Then it calls runcmd for the left end of the pipeline and runcmd for the right end, and waits for the left and the right ends to finish, by calling wait twice. 然后子进程在左端和右端调用runcmd,并且两次调用wait等待左端和右端结束。The right end of the pipeline may be a command that itself includes a pipe (e.g., a | b | c), which itself forks two new child processes (one for b and one for c). Thus, the shell may create a tree of processes.管道的右端可能是包含管道的命令(可以察觉到,管道右端的命令将在子线程中实现)。因此,shell可能创建一棵进程树。 The leaves of this tree are commands and the interior nodes are processes that wait until the left and right children complete. 进程树的叶子节点是命令,内部节点是进程,进程将wait直至左右孩子节点完成工作。

What’s the differences between pipes and temporary files? 管道和临时文件之间的差别是什么?

Pipes may seem no more powerful than temporary files: the pipeline

1
echo hello world | wc

could be implemented without pipes as

1
2
echo hello world >/tmp/xyz;   
wc < /tmp/xyz

There are at least three key differences between pipes and temporary files. First, pipes automatically clean themselves up; with the file redirection, a shell would have to be careful to remove /tmp/xyz when done. 差别1,管道自动清理(缓冲区),文件重定向时shell必须在工作结束后小心移除临时文件。Second, pipes can pass arbitrarily long streams of data, while file redirection requires enough free space on disk to store all the data. 差别2,管道能传递任意长度的数据流,文件重定向要求足够大的硬盘空间来存储所有数据。Third, pipes allow for synchronization: two processes can use a pair of pipes to send messages back and forth to each other, with each read blocking its calling process until the other process has sent data with write.差别3:管道允许同步:两个进程可以使用一对管道来回发送消息,每个read阻塞调用进程直到其他线程使用write发送数据。

xv6 file system 文件系统

The xv6 file system provides data files, which are uninterpreted byte arrays, and directories, which contain named references to data files and other directories.xv6文件系统提供数据文件,包括未解释字节数组和目录,目录包含了被命名的数据文件的引用和其他目录。 Xv6 implements directories as a special kind of file. xv6将目录实现为一种特殊的文件。The directories form a tree, starting at a special directory called the root.目录形成了一棵树,从一个特殊的目录root开始。
create a new device file: mknod(“/console”, 1, 1);
Mknod creates a file in the file system, but the file has no contents. Mknod创建文件系统中的一个文件,但这个文件没有内容。Instead, the file’s metadata marks it as a device file and records the major and minor device numbers (the two arguments to mknod), which uniquely identify a kernel device. 相反,文件的元数据将其标记为一个设备文件,并且记录了主次设备号,设备号唯一标识了一个内核设备。(主设备号用来区分不同种类的设备,而次设备号用来区分同一类型的多个设备。对于常用设备,Linux有约定俗成的编号,如硬盘的主设备号是3。)When a process later opens the file, the kernel diverts read and write system calls to the kernel device implementation instead of passing them to the file system. 当进程后面打开文件时,内核将read和write系统调用转移到内核设备实现,而不是将他们传递到文件系统。
The file’s inode and the disk space holding its content are only freed when the file’s link count is zero and no file descriptors refer to it. 只有当文件链接数为0而且没有文件描述符指向它时,文件的inode和磁盘空间时才释放其内容文件。
Furthermore, an idiomatic way to create a temporary inode that will be cleaned up when the process closes fd or exits is: 此外,可按照以下惯用方式创建一个临时inode,当进程关闭fd或者离开的时候,该inode将被清理:

1
2
fd = open("/tmp/xyz", O_CREATE|O_RDWR); 
unlink("/tmp/xyz");

Xv6 commands for file system operations are implemented as user-level programs such as mkdir, ln, rm, etc. This design allows anyone to extend the shell with new user commands. xv6文件系统操作命令实现为用户级程序。这种设计运行任何人以新用户命令扩展shell(其他系统一般内置到shell里)。 One exception is cd, which is built into the shell (8516).cd命令除外,它是内置到shell的。 cd must change the current working directory of the shell itself. If cd were run as a regular command, then the shell would fork a child process, the child process would run cd, and cd would change the child’s working directory. The parent’s (i.e., the shell’s) working directory would not change. cd必须改变shell本身的当前工作目录。如果cd作为常规命令执行,shell会创建子进程,由子进程执行cd,cd将改变子进程的工作目录,父目录(shell的目录)不会被改变。
注:用户在命令行输入命令后,一般情况下shell会fork并exec该命令,但是shell的内建命令例外,执行内建命令相当于调用shell进程中的一个函数,并不创建新的进程.

笔记03 - Lab 3: Fault-tolerant Key/Value Service

发表于 2017-08-16 | 分类于 MIT6.824 Distributed Systems

综述

raft snapshot

随着客户端请求的不断增加,raft的日志项不断增加。为了压缩日志,raft采取快照snapshotting的方式,以某个日志项为基准,将之前的整个系统状态写入到快照文件中,然后删除该日志项之前的所有日志。快照内容通常是所有命令操作过的对象的最新值,作为快照基准的日志项必须是已提交的数据项。

raft的服务器中,无论是leader还是follower,一旦触发快照条件,服务器都会独立采取快照措施以压缩日志。此外,如果leader采取快照措施压缩了自身的日志,但是被丢弃的日志还未发送给follower;这种情况下,leader还需要发送快照到follower以更新follower的日志和快照。幸运的是,这种情况在正常操作中是不可能发生的,因为能与leader通信的follower一般已经保存了这些被丢弃的日志项。但是,网络原因或者follower响应慢或者新服务器加入了raft集群的话,都可能导致被丢弃的日志还未发送给follower。此时,为了保证这些follower的日志up-to-date,必须由leader在网络上向他们发送快照。

快照文件可以分块发送,follower接收到完整的快照文件之后,将丢弃快照所对应的日志部分,但保留快照文件之后的日志。快照文件也可能包含了follower日志不曾有过的内容,这时follower将丢弃整个日志。若follower有未提交的日志项(如被隔离的旧leader接收了请求但无法提交,最后恢复通信变成follower并接收了快照),也将被快照文件所替代。

快照策略偏离了raft的strong leader原则,因为follower本身也可以执行该快照策略,但这并没有破坏raft的状态一致性。follower仍然只能从leader接收日志数据,只不过follower现在可以重新组织他们的数据,但数据来源依然是leader。当然,follower可能先后接收到日志项和快照,因此可能出现先接收了更新的快照,而后才接收到旧的日志项,这种情况下follower应当忽略这些旧日志项。倘若只由leader来创建快照,然后一并发送给follower,将会产生两个问题:1、浪费网络带宽,降低快照处理速度,follower在满足日志压缩的条件下自行压缩日志将大大高效于从网络中接收快照文件并进行存储;2、leader的实现将更复杂,需要并行处理新日志发送和快照文件发送,以不阻塞接收新的客户端请求。

影响快照策略的性能问题有:1、服务器决定进行快照处理的时机。快照太过频繁将浪费磁盘带宽和资源,太过稀少则可能耗尽存储容量,并且重启的时候将消耗大量的时间来恢复日志。一个简单的解决设置阈值,如果该阈值比预期的快照大小大得多,则快照带来的磁盘带宽开销将会很小。2、写快照文件将会花费大量的时间,但我们不希望因此延迟了正常的操作。解决这个问题的方法是使用copy-on-write策略,可以利用操作系统的copy-on-write支持为整个状态机创建一个内存快照。

实验内容

Part 3A: Key/value service without log compaction

本实验部分将基于raft实现一个可容错的kv存储服务。kv存储服务将被构建为复制状态机,包含了多个kv服务器,每个kv服务器都基于raft日志来协调本身的活动。尽管可能发生网络分区或者其他错误,但只要大多数raft服务器是活跃的并且能够互相通信,kv存储服务就必须能够持续处理客户端的请求。

客户端-服务器的布局

kv存储服务体系包括了客户端和kv服务器,每个kv服务器关联到一个raft服务器。客户端发送PutAppend()、Get()的RPC请求到kv服务器上,kv服务器将客户端请求按次序追加到raft日志上。一个客户端可以向任何kv服务器发送rpc,如果kv服务器关联的raft服务器不是leader或者出现错误,客户端需要重新向其他服务器发送请求;各个kv服务器可面向多个客户端,并维护一致的key-value存储。一旦raft提交了对应的请求操作(并因此应用到kv状态机上),kv服务器需要向客户端发送回复。如果kv服务器发现请求操作无法被提交(例如更换了leader),kv服务器需要向客户端发送报告,客户端再重新向其他服务器发送请求。
相关的数据结构是:

1
2
3
4
5
6
7
type Clerk struct {
servers []*labrpc.ClientEnd //客户端可联系的kv服务器
}
type RaftKV struct {
me int
rf *raft.Raft //kv服务器关联的raft服务器
}

kv服务器职责

kv服务器底层借助raft进行实现,因此,kv服务器本身需要长时间等待raft的日志提交通知。针对已提交的操作,kv服务器需要存储key-value对的内容。客户端提供Put()、Append()、Get()三种接口,实际上转换为调用kv服务器可接受的PutAppend()、Get()两种RPC请求。kv服务器需要处理客户端的重复请求。kv服务器之间不进行通信,由其关联的raft服务器之间会保持通信。
相关的数据结构是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 客户端发送Put、Append请求
type PutAppendArgs struct {
Key string
Value string
Op string // "Put" or "Append"
ClerkId int64
ReqId int64
}
// 客户端发送Put、Append请求的回复
type PutAppendReply struct {
WrongLeader bool
Err Err
}
// 客户端发送Get请求
type GetArgs struct {
Key string
ClerkId int64
ReqId int64
}
// 客户端发送Get请求的回复
type GetReply struct {
WrongLeader bool
Err Err
Value string
}
// 客户端参数
type Clerk struct {
lastLeader int //存储已知关联到leader的kv服务器索引
reqId int64 //客户端发送请求序号
clerkId int64 //客户端标志序列
}
// kv服务器参数
type RaftKV struct {
applyCh chan raft.ApplyMsg //用于等待raft的日志提交通知
maxraftstate int //用于压缩raft日志的快照阈值
rpcResult map[int]chan Op //用于等待新追加的日志项的提交
kvMap map[string]string //用于存储kv键值对
ackMap map[int64]int64 //用于识别客户端的重复请求
}
// 日志项参数,kv服务器将客户端的请求转换为该结构,并追加到raft日志中
type Op struct {
Key string
Value string
Op string
ClerkId int64
ReqId int64
}

raft的日志项

raft的日志项内容是一个Op结构体,该结构体内容来自于客户端的请求参数,包括键、值、操作类型、客户端标识符、客户端请求序号。

kv服务器监听日志项提交

kv服务器接到客户端的请求后,将请求参数内容保存到Op结构体中,调用raft.Start()方法尝试将Op追加到raft日志中。如果raft.Start()返回参数中指明该kv服务器的raft服务器不是leader,则kv服务器直接回复客户端,回复参数WrongLeader为true,客户端再次向其他服务器发送请求。如果该kv服务器的raft服务器是leader,则根据返回的日志index,尝试使用select阻塞读取日志项提交结果。kv服务器使用了rpcResult map[int]chan Op数据结构来通知日志项已被raft提交的结果:首先,kv服务器启动时使用go routine监听raft的日志提交通知,即阻塞读取applyCh,一旦有读出,将读出结果Op写入到rpcResult[index]中,键是该日志项的索引index;其次,kv服务器每次成功调用raft.Start()之后(成功指的是服务器为leader),根据raft.Start()返回的日志项索引阻塞读取rpcResult[index],一旦读出成功,表明完成了客户端的请求操作(该表述不一定正确,因为网络分区可能造成在index索引处读取到的日志项并不是所等待的预期日志项,详见后面分析)。这样就完成了kv服务器监听raft日志项提交和通知日志项提交结果的工作。

kv服务器追加Get请求到日志

kv存储服务的一个设计原则是:当客户端发送请求到kv服务器时,如果kv服务器关联的raft服务器不是majority的一部分,则不应该完成该请求。当客户端发送Get请求时,kv服务器没有办法直接判断其关联的raft服务器是不是majority的一部分,因此,一个简单的解决方法是调用raft.Start()将Get请求操作追加到raft日志中,如果能够收到raft的日志项提交通知,说明其关联的raft服务器是majority的一部分,并且该Get请求可完成。

kv服务器对重复请求的处理

kv服务器如何标识客户端请求:
由于一个客户端可以向任何kv服务器发送rpc请求,因此kv服务器识别客户端请求时,不仅需要识别该请求是来自于哪个客户端,还需要识别某个客户端的不同请求。因此,本实验使用了两个字段来识别客户端请求,分别是客户端id和请求id。客户端id通过随机数产生,每个客户端在初始化的使用将请求id置为0,之后每次发送请求时,都将请求id加1。
kv服务器处理客户端重复请求的必要性:
客户端的重复请求应当被忽略。由于网络分区、网络丢包等原因,客户端可能多次发送同一请求,如果kv服务器不忽略这些重复请求,则写请求会破坏kv键值对的正确存储,造成数据破坏。
kv服务器如何识别重复的客户端请求:
kv服务器使用ackMap map[int64]int64结构来确认客户端请求,该map的key值是客户端id,value值是已确认的最大请求id。本实验使用了累加确认的方式,即一旦发现客户端请求id小于或等于已确认的最大请求id,则判断该请求是重复的,这要求客户端有序发送请求,而不能一次性发送多个请求。
kv服务器何时确认客户端请求:
原则:当客户端发送请求到kv服务器时,如果kv服务器关联的raft服务器不是majority的一部分,则不应该完成该请求。因此,kv服务器本可以在Get()和PutAppend()两个函数的入口处判断该请求是否是重复请求,以减少服务器的不必要开销,但这违反了上述原则。因此,本实验的方案是尝试将请求操作追加至raft日志中,一旦kv服务器收到raft的日志提交通知,则判断该请求是否是重复请求,如果是则不进行kv存储(对于重复请求的处理:仍需要通过channel通知日志项提交结果,且kv服务器按照请求操作成功进行回复即可,这样客户端收到预期回复后停止重发),否则根据已提交日志项进行kv存储。
kv存储的具体细节是:使用kvMap map[string]string数据结构进行存储,如果操作是Put,则kv.kvMap[args.Key] = args.Value;如果操作是Append,则kv.kvMap[args.Key] += args.Value;如果操作是Get,无需存储;最后,将该请求的请求id登记到对应的客户端id上(kv.ackMap),表明当前对该客户端id已确认的最大请求id。注意到,不需要对Get请求进行存储,但需要对Get请求进行确认。 另一个需要注意的问题是:判断是否是重复请求以及进行kv存储这两个操作需要在同一个锁范围内执行,否则,对于两个相同的请求,可能导致在进行kv存储前执行了两次判断操作,并一致认为该请求不重复,最终对同一请求追加了两次值。

补充:客户端如何发送请求

kv服务器处理请求但回复丢失(Call的调用结果被置为false),或者客户端超时重传(如果有),都可能造成客户端重复发送请求的情况。
设计1:客户端只有在本次发送请求并成功收到预期回复时才可以进行下次发送(本实验采取的设计)。对于这种情况,kv服务器接收到的每个客户端的请求将是有序的。那么,由于网络原因导致客户端重发请求时,kv服务器可以采用累加确认结果来判别是不是重复请求(当前请求id小于等于已确认的最大请求id则表明是重复请求)。累加确认方法只记录每个客户端的最高确认项。
设计2:客户端可并发发送多个请求。对于这种情况,请求id小的请求不一定先被kv服务器接收到,这意味着先被提交的请求操作,其请求id不见得更小。因此,为了能识别重复请求,应当对每个客户端的每个请求进行单独记录,即不再采用累加确认方法,该方案存在的缺点是kv服务器需要维护的数据量大大增加。另一种解决方案是仍旧采用累加确认方法,并且需要在确认更大的请求id之前提示客户端重传之前尚未确认的数据包,该方案需要重新设计客户端-kv服务器的通信协议,并且可能加大网络负载。

kv服务器等待日志项提交结果的通知时设置超时

kv服务器接收客户端请求并调用raft.Start()方法尝试将请求操作追加至raft日志中,如果调用成功且kv服务器所关联的raft服务器是leader,则kv服务器会阻塞读取rpcResult[index],以等待日志项提交结果的通知。在此,是否需要设置超时等待呢?
kv服务器等待的是索引index所对应日志项的提交结果通知,而没有针对具体的leader。raft可能在kv服务器等待期间更换了leader,考虑以下场景:
raft集群启动后,客户端向kv服务器发送rpc请求,kv服务器将请求操作发送到所关联的raft服务器leader A,然后等待提交结果通知。leader A将请求操作追加到日志,并且复制到其他服务器。其他服务器都成功接收到了日志,但此时leader A还未commit该日志项。这时leader A宕机,B成为新的leader。kv服务器若没有设置等待超时,则客户端会一直等待回复,不会发送新的请求;然而,leader B在当前新任期内倘若没有追加新的日志项,是不会提交旧日志项的(未提交的部分),导致kv服务器一直收不到日志项提交结果的通知,造成死锁。
因此,kv服务器需要在等待日志项提交结果的通知设置超时等待。这里有两种设计方案:1、一旦超时则直接告知客户端是wrong leader,客户端尝试从下一个服务器开始重发请求。2、使用for无限循环和select机制,阻塞等待日志项提交结果的通知和进行超时处理,一旦超时就检查当前所关联raft服务器的状态,一旦发现不是leader,则跳出循环,告知客户端是wrong leader,否则继续下一次select。

kv服务器收到日志项提交结果的通知后判断是否是预期结果/网络分区对kv服务器的影响

kv服务器接收客户端请求并调用raft.Start()方法尝试将请求操作追加至raft日志中,如果调用成功且kv服务器所关联的raft服务器是leader,则kv服务器会阻塞读取rpcResult[index],以等待日志项提交结果的通知。接收到通知后,是否可以直接断定该请求已完成了呢?
考虑以下场景:
一开始,kv服务器A所关联的raft服务器是leader,之后发生了网络分区,kv服务器A所关联的raft服务器处于少数机子可连通的分区,此时kv服务器A所关联的raft服务器仍然以为自己是leader。kv服务器A接收了客户端的请求,raft.Start()仍表明是leader,日志项的索引index为1,然后kv服务器A等待raft的日志项提交。大多数机子可连通的分区选举了新的leader,新leader接收了新日志项并成功提交(index为1)后,网络分区恢复了。这时候,kv服务器A所关联的raft服务器接收了来自于新leader的日志项并提交,kv服务器A收到了index为1的日志项提交通知,但此时该日志项并不是预期的日志项。这种情况下,应当告知客户端是wrong leader,让客户端重新发送请求。因此,kv服务器接收到通知后,应当判断接受结果是否等同于预期的请求操作。

客户端解决非leader问题

kv服务器发现是非leader的时机:
1、kv服务器调用raft.Start()追加请求操作到raft日志时,返回参数指明raft服务器是否是leader。
2、kv服务器使用for无限循环和select机制来阻塞等待日志项提交结果的通知和进行超时处理时,通过raft.GetState()查看当前raft服务器是不是leader。
客户端对非leader的处理:
1、客户端使用了lastLeader来记录已知的kv服务器索引。每次调用Call向服务器发送请求后,调用成功并且回复的wrongLeader为false时,更新lastLeader。
2、在网络可靠的情况下,这种做法很大几率地避免了轮询服务器发送请求的情况。但在网络不可靠的情况下,很有可能出现以下情况:调用Call向关联leader的kv服务器发送请求,由于Call失败转而向非leader重新发送请求;如果客户端支持并发发送请求,某个程序对于同个客户端的lastLeader的修改可能会影响其他程序对于lastLeader的读取,进而影响其他程序轮询服务器发送请求。

kv服务器重启的影响及数据持久化

kv服务器重启意味着之前的kv键值对数据丢失、确认数据丢失,后续请求可能获取不到正确的数据。在本实验中,kv服务器重启意味着使用相同的配置条件重新Make一个kv服务器,所以对应的raft服务器也是重新Make出来的。raft的启动将使用其持久化数据,对于kv服务器而言,有没有必要考虑数据持久化呢?
对于这个问题有两个解决方案,其一是仿照raft为kv服务器进行持久化数据存储,一旦更新kv服务器的数据则进行持久化存储,kv服务器启动时使用其持久化数据;其二是借助raft的底层实现,kv服务器本身不采取数据持久化的做法,具体方案是:
raft持久化数据不能包括已提交的最高索引和已应用的最高索引!当kv服务器重启时,其数据为空,如果raft持久化了已提交的最高索引和已应用的最高索引,则已提交过的日志项数据无法重构到kv服务器中。相反,若raft服务器重新启动时读取了持久化数据,并设置已提交的最高索引和已应用的最高索引均为0,则其所有日志项都会按索引顺序通知到kv服务器,kv服务器按顺序读出日志项内容后,对kv键值对和确认数据进行重构即可。当kv服务器重启后,对于新接收的请求操作,假如其日志项index为i,根据日志项提交通知的实现,只有当kv服务器收到index小于等于i的所有日志项的提交通知后才会对该请求操作进行回复,因此保证了数据的完整性。

Part 3B: Key/value service with log compaction

直至目前为止,kv服务器重启时将借助整个raft日志来恢复状态。但是,对于长时间运行的raft服务器来说,不可能一直保持完整的日志。本实验将借助raft和kv服务器之间的协作来节省空间:kv服务器时不时将当前状态持久化存储到快照中,raft则根据快照覆盖范围来丢弃日志。当kv服务器重启或者远远慢于leader的进度且必须跟进时,服务器首先安装快照,然后从创建快照的基准点之后开始恢复日志(实际上本实验没有进行硬盘操作,因此是先恢复了持久化的日志数据,然后根据快照对日志进行截断)。

参考In Search of an Understandable Consensus Algorithm一文,可大致了解到,raft的快照形式为:

结合到本实验中,实际上snapshot是由kv服务器提供的,而不是raft服务器自身产生,raft服务器的日志实现为业务无关型,当然也没有办法收集到所有命令操作过的对象的最新值。kv服务器接收到raft的日志项提交通知后,判断是否达到快照阈值,如果达到,则将本身的kv数据和请求确认数据写入到snapshot后,发送给对应的raft服务器,之后raft服务器再获取快照文件对应的最后一个日志项的index和term,然后和kv服务器传来的snapshot一起持久化到自身的snapshot中,并且压缩自身的日志。

倘若leader丢弃的日志还未复制给follower,则leader会通过rpc将最新的snapshot发送给follower,具体格式为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
快照请求数据:
term: leader的任期号
leaderId: leaderId用于选民对客户机请求重定向
lastIncludedIndex: 快照文件所囊括的日志项上限的索引(包括lastIncludedIndex)
lastIncludedTerm: lastIncludedIndex对应的日志项任期号
offest: 数据块在整个快照文件中的偏移(byte)
data[]: 快照文件的某个字节码数据块,从offest开始
done: 是否是最后一个数据块

快照回复数据:
term: 当前任期,用于leader更新自己的任期

快照的接收实现:
1、如果leader的任期号term < 当前任期currentTerm,直接回复
2、如果是第一个数据块(offest为0),创建snapshot
3、将data写入到snapshot的offest处
4、如果不是最后一个数据块,回复并等待其他数据块
5、保存snapshot文件,丢弃任何已存在的或局部的index更小的snapshot
6、如果已存在的日志项拥有snapshot囊括的最后一个日志项,丢弃该日志项之前的所有日志,维护之后的日志,回复
7、丢弃整个日志
8、使用snapshot的数据重设状态机(加载snapshot的集群配置)。

本实验没有将snapshot分块传输,而是直接一整块发送。raft的follower接收到leader的snapshot之后,将该snapshot数据回传给上一层的kv服务器,而kv服务器使用该snapshot更新自身的kv数据和请求确认数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
快照请求数据:
term: leader的任期号
lastIncludedIndex: 快照文件所囊括的日志项上限的索引(包括lastIncludedIndex)
lastIncludedTerm: lastIncludedIndex对应的日志项任期号
data[]: 快照文件的某个字节码数据块,从offest开始

快照回复数据:
term: 当前任期,用于leader更新自己的任期
nextIndex: 回复者告知leader下次发送的日志项索引

快照的接收实现:
1、如果leader的任期号term小于当前任期currentTerm,直接回复
2、如果当前任期currentTerm小于请求参数中leader的任期号,更新当前任期号,立即将状态转换为选民。
3、返回参数中附上当前任期号。
4、往chanHeartBeat写入true。
5、判断当前commitIndex是否大于lastIncludedIndex,是则将返回参数的nextIndex赋值commitIndex+1,返回。
6、保存snapshot文件。
7、如果已存在的日志项拥有snapshot囊括的最后一个日志项,丢弃该日志项之前的所有日志,维护之后的日志,否则丢弃整个日志。
8、更新commitIndex和lastApplied为lastIncludedIndex。
9、使用snapshot的数据重设状态机(加载snapshot的集群配置)。
10、将返回参数的nextIndex赋值lastIncludedIndex+1,返回。

快照策略执行的流程

本实验的快照策略执行流程为:1、tester在创建kv服务器时绑定了快照阈值maxraftstate;2、kv服务器接收客户端请求并发送给对应的raft服务器,然后等待日志提交结果的通知,每接收到一个通知之后,判断maxraftstate与关联的raft服务器的持久化数据的大小(包括日志),一旦超出maxraftstate,kv服务器将自身的kv数据和请求确认数据打包到snapshot中,连同刚通知的日志项index发送给raft服务器;3、raft服务器根据index获取已提交日志项的任期号,压缩自身日志,然后将index、term、kv服务器的snapshot打包存储为自身新的snapshot中;4、任何raft服务器均可以响应对应kv服务器的快照并压缩自身的日志和进行snapshot存储。5、leader心跳包发送日志项期间,假如发现有未发送的日志项被丢弃,则采取将持久化的snapshot发送到follower的措施;6、follower接收到snapshot后,以该snapshot为准进行日志压缩和snapshot存储,并将snapshot发给kv服务器以更新kv服务器自身的kv数据和请求确认数据。7、raft服务器重启时,首先读取状态持久化数据,恢复日志,然后读取快照数据,进行日志截断,最后将snapshot发给kv服务器以更新kv服务器自身的kv数据和请求确认数据。

raft的日志压缩

raft的日志没有使用磁盘存储,而是采用了slice实现,因此为了能够保证日志截断后的正常索引,raft的日志项需要存储index。每次截取日志后,raft将lastIncludedIndex和lastIncludedTerm对应的日志项作为log[0],该日志项已经被丢弃,将其作为当前第一个日志项在某些场景下可以带来计算方便,比如InstallSnapshot RPC参数中的lastIncludedIndex和lastIncludedTerm的获取。

快照内容的构成

kv服务器在监听到raft的日志项提交通知后,判断是否需要执行快照策略。对于kv服务器来说,其需要识别重复的客户端请求,也需要存储kv键值对内容,因此由kv服务器发给raft服务器的快照内容包括了kv数据和请求确认数据,同时还包括此时被通知的日志项的index。raft服务器则是根据index获取到对应的日志项的term,然后将这两者作为lastIncludedIndex和lastIncludedTerm,连同kv服务器发来的快照内容一起打包进了snapshot中。
由此也可看出,快照的内容针对的是已提交的日志项及其导向的kv数据,未提交的日志项数据不会存储在快照中。

快照策略的时机

三个时机:
1、kv服务器根据快照阈值maxraftstate和raft服务器的持久化数据的大小通知其所关联的raft服务器执行快照策略;
2、leader在发送日志项的过程中,根据nextIndex[i]和log[0].Index判断是否有未发送的日志项被丢弃。如上所述,log[0]对应的是最新的快照文件所囊括的最大的日志项;而nextIndex[i]的更新时机是在leader成功向follower追加日志并接收到回复;因此,如果nextIndex[i] <= log[0].Index,说明对于该follower有未发送的日志项被丢弃的现象,leader应该将其snapshot发送给follower,而不是发送日志。由此也可看出,leader发送snapshot是在广播发送心跳包/日志项的过程中额外添加的处理,如果发送失败,则在后面的心跳包期间会继续尝试发送。另一方面,由于网络原因,可能出现leader发送追加日志后又发送更新的snapshot,而后follower先接收处理snapshot,再处理过时的追加日志,这种时候应该将过时的追加日志丢弃(判断条件是参数中PrevLogIndex小于follower快照中的最后一个日志项的index)。
leader成功发送snapshot之后,应当及时更新nextIndex[i]和matchIndex[i];follower成功接收到snapshot后,应当及时更新commitIndex和lastApplied为lastIncludedIndex。
注意:考虑以下情景:1、leader发送日志和commitIndex N到follower A,A追加日志,并将日志提交到索引N;2、A回复leader,回复丢失,leader无法更新nextIndex[];3、leader压缩日志,根据nextIndex[]和快照内容,判断出该follower有未发送的日志项被丢弃的现象,leader发送snapshot和lastIncludedIndex M;4、A接收snapshot,压缩日志,更新commitIndex和lastApplied为M;5、由于M < N,(M,N]范围内的日志项再次被提交。上述场景导致日志项被重复提交,在我们的实验中,raft通知kv服务器(写入channel ch1)的时候是需要加锁的,而kv服务器读取通知后,会将通知结果写入到channel ch2中,kv服务器会有另外的程序等待读取该channel ch2以判断请求是否完成;因此,如果日志项被重复提交,kv服务器读取通知后会再次将结果写入到channel ch2中,但可能没有程序读出该channel ch2(因为这不是一次新请求),从而导致kv服务器读取通知程序被阻塞,进而导致raft通知阻塞,没办法释放锁。因此,follower在接收到leader的快照时,应当判断commitIndex是否大于lastIncludedIndex,如果是,则直接忽略该snapshot,并将返回参数的nextIndex赋值为commitIndex+1。
3、raft重启后,恢复持久化数据,然后读取快照数据,进行日志截断,最后将snapshot发给kv服务器以更新kv服务器自身的kv数据和请求确认数据。raft读取快照后,应当及时更新commitIndex和lastApplied。

快照处理请求过时

raft服务器每次更新commitIndex之后,都会将(lastApplied,commitIndex]区间内的日志项逐一通知到kv服务器,这期间需要加锁;而kv服务器在监听到每一个日志项提交通知后都会判断是否需要发送快照数据,一旦需要则会反过来通知raft服务器进行快照处理,raft服务器进行快照处理也需要进行加锁。因此,kv服务器通知raft服务器进行快照处理需要通过go runtine实现,而不能阻塞等待,否则会造成raft服务器死锁。
采取go runtine通知raft服务器进行快照处理后,则可能出现多个快照处理的通知都被阻塞,直到(lastApplied,commitIndex]区间内的日志项逐一通知完毕后raft释放了锁,然后随机竞争。因此,raft执行快照处理前需要判断kv服务器传递的index是否已经小于log[0].Index(lastIncludedIndex),一旦是,则表明已经有更新的快照被保存了,当前的快照处理是过时请求,应当忽略。

测试结果

测试:

1
2
$ cd "$GOPATH/src/kvraft"
$ go test

结果:

笔记02 - Lab 2: Raft

发表于 2017-08-16 | 分类于 MIT6.824 Distributed Systems

综述

什么是Raft

Raft是实现分布式一致性的协议。在一个分布式系统中,因为各种意外可能,有的服务器可能会崩溃或变得不可靠,它就不能和其他服务器达成一致状态。这样就需要一种Consensus协议,一致性协议是为了确保容错性,也就是即使系统中有一两个服务器当机,也不会影响其处理过程。为了以容错方式达成一致,我们不可能要求所有服务器100%都达成一致状态,只要超过半数的大多数服务器达成一致就可以了,假设有N台服务器,N/2 +1 就超过半数,代表大多数了。Raft为了实现Consensus一致性这个目标,如同选举一样,参选者需要说服大多数选民(服务器)投票给他,一旦选定后就跟随其操作。

Raft的服务器状态

Follower:类似选民,完全被动。
Candidate:候选人,可以被选为一个新的领导人。
Leader:处理所有客户端交互,日志复制等,一般一次只有一个Leader。

Leader Election:领导人选举

服务器以Follower的状态启动,如果没有收到Leader信息,那么Follower可以成为Candidate。Candidate向其他节点发起投票,其他节点回复投票,如果Candidate获得大多数投票,则Candidate成为Leader,对系统的任何更改将首先通过Leader。

控制Election的两种timeout设置

1、election timeout:
election timeout表示Follower等待变成Candidate的时间。election timeout是150ms到300ms之间的一个随机数。Follower变成Candidate之后,开始一个新的election term选举任期(对term进行编号),并投票给自己。之后,Candidate发送一个投票请求Request Vote到其他节点;如果节点在该election term尚未进行过投票,则投票给该Candidate,记录当前选举任期,并重新设置自身的election timeout。一旦Candidate获得大多数投票,Candidate成为Leader。
2、heartbeat timeout:
每过一个心跳包时间,Leader会发送心跳包给它的Follower。此外,对于客户机发来的新请求,Leader在下一个心跳包时刻将Append Entries新增条目发送给Follower。Follower对Append Entries新增条目进行响应,并重新设置自身的election timeout。该选举任期election term将会持续,直至Follower在election timeout时间内没有接收到心跳包,则Follower成为新的Candidate。注意:Follower的响应并不会更改Leader或Candidate的election timeout。

如何确保每届选举任期内只有一个Leader

Candidate需要获得大多数投票才能开始新的选举任期election term,这确保了每届选举任期内最多只有一个Leader。如果一个Leader宕机了,新的Leader将开始新的任期,旧的Leader保持旧的任期状态。

split vote:分裂投票

如果两个Follower同时变为Candidate,并发出投票请求Request Vote,各有一半的Follower收到请求并进行投票;由于这两个Candidate处于同一届新任期,它两不会互相投票,Follower对某个任期的Candidate投票后也不会再对其他Candidate投票,因此两个Candidate都不能获得到大多数投票,这种现象称为split vote。处于split vote阶段的Candidate将各自等待自己的election timeout过期,在此期间可能会有其他Follower成为新的Candidate并请求投票,又或者与旧的Candidate同时成为新的Candidate,再次引发分裂投票。

Log Replication:日志复制

对系统的每一次更改都会在Leader上添加一个日志项,该日志项还未committed,所以Leader的值还没更新。为了commit该日志项,Leader首先将日志项副本发送给它的Follower,然后等待,直至大多数Follower写入该日志项并返回响应;接着Leader commit该日志项,更新系统的值,向客户机返回响应,然后在下一个心跳包时刻通知它的Follower该日志项已经被commit,接到通知的Follower也commit该日志项,保持集群的一致性。

网络原因可能导致某些节点无法接收到Leader的心跳包,并选举出新的Leader,这时候集群内会存在不同选举任期的Leader和Follower。客户机发送给旧Leader的新请求会记录在旧Leader的日志上,旧Leader的Follow日志也会记录该新请求,但旧Leader以及它的Follow都无法commit该日志项,因为旧Leader无法获得大多数节点的响应。客户机发送给新Leader的新请求会记录在新Leader的日志上,并成功commit,因为新Leader可以获得大多数节点的响应。网络分区恢复后,处于新选举任期的Leader和Follow将忽略处于旧选举任期的Leader发出的心跳包,而处于旧选举任期的Leader和Follower将响应新选举任期的Leader发出的心跳包。处于旧选举任期的Leader和Follower将回滚未提交的日志项,并匹配新Leader的日志,保持集群的一致性。

注意:网络原因导致网络分区时,如果新分区的节点数少于等于总数的一半,则该分区无法产生新Leader;如果新旧分区节点数各占一半,则Raft停止服务。

实验内容

Introduction

本章节将构建一个可容错的kv存储系统。在这个实验中,将实现Raft复制状态机协议。下一个实验将在Raft基础上构建一个kv服务,然后在多个复制状态机上共享该服务,获得更高的性能。
复制服务(如kv数据库)是通过将数据副本存储在多个副本服务器来实现容错的,副本服务器确保了主服务器宕机的时候可以继续提供服务。实现副本服务器的挑战是宕机可能导致副本服务器存储了不同的备份数据。

Raft管理着一个服务的状态副本,并在服务失败后找出正确的状态。Raft实现了一个复制状态机,它将客户机请求按序列组织成日志log,并确保所有副本服务器的日志内容是一致的。每一个副本服务器按照日志记录的请求顺序执行客户机请求,并将这些请求应用到副本服务器的本地服务状态副本上。由于所有活跃副本服务器的日志内容是一致的,所以他们按照相同的顺序执行相同的请求,并继续保持着相同的服务状态。假如一个服务器宕机并随后恢复,Raft负责将其日志状态进行更新。只有在大多数机子是活跃的并且可以互相通信的情况下,Raft才会继续工作;否则Raft将停止工作,并将在大多数机子活跃的情况下重新从离开的地方恢复运行。

本实验将借助相关方法将Raft实现为go的一个对象类型,进而可以作为一个调用模块在更大的服务中被调用。Raft实例之间将使用rpc进行通信,以维护日志副本。Raft接口将支持已编号命令的一个不定序列,称为日志条目/日志项。日志项对应一个索引号,并最终被commit。日志项最终将有Raft发送至更大的服务中执行。

本实验不同Raft实例只能通过rpc进行通信,不能共享go变量,也不能实现文件。本实验将实现Raft大多数设计思想,包括状态持久化存储、节点宕机并重启之后的状态读取,但不会实现集群成员变化和日志压缩/快照。

参考资料:
illustrated guide to Raft
raft-extended
Students’ Guide to Raft

The code

src/raft提供了框架代码和测试案例,src/labrpc提供一个简单的类rpc系统。扩展raft/raft.go以实现Raft。需要实现以下调用接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// create a new Raft server instance:
rf := Make(peers, me, persister, applyCh)
创建一个Raft peer,peers参数是已建立的rpc连接数组,每一个连接对应一个peer,包括将创建的peer本身。me参数是该peer在peers数组的下标。

// start agreement on a new log entry:
rf.Start(command interface{}) (index, term, isleader)
请求Raft将命令追加到日志副本中,该函数不阻塞等待执行过程,而是立即返回。

// ask a Raft for its current term, and whether it thinks it is leader
rf.GetState() (term, isLeader)

// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester).
type ApplyMsg

src/labrpc

src/labrpc提供一个简单的类rpc系统,用于模拟网络的客户机和服务器情况。labrpc使用Network结构体对客户机ClientEnd和服务器Server进行组合,该结构体使用map保存客户机和服务器结构,Network的connections是客户机名到服务器名的映射,表示客户机是否连通服务器。Network拥有一个endCh,该数据是一个chan reqMsg,用于阻塞读取发送到网络的数据。ClientEnd在创建之时会复制该endCh,通过对其channel副本写入数据,模拟发送到网络中。网络一旦从endCh中读取到请求信息reqMsg,则根据请求信息找到对应的服务器Server,服务器对请求进行处理,然后将处理结果写入到reqMsg中的replyCh,该响应信息也是一个channel。网络调用Server的服务时,同时开启了超时机制,一旦超时,判断服务器是否宕机,是则返回服务出错响应,否则继续等待服务调用结果。
注意:ClientEnd发送请求信息前就创建了reqMsg->replyCh,然后往endCh副本写入reqMsg(阻塞),直至网络读取reqMsg之后,ClientEnd转而阻塞读取reqMsg->replyCh;而网络从endCh读出reqMsg后,交由对应服务器进行处理,处理结果只需写回reqMsg->replyCh即可。由此可以看出go channel的神奇之处:将包含channel的结构体A写入channel中,另一方从该channel副本中读出到结构体B中,A和B仍共享同一个channel。

大致流程:
1、调用MakeNetwork创建网络,阻塞读取endCh,即客户机请求。
2、调用Network.MakeEnd创建客户机,将其加入到网络中,状态未激活。
3、调用MakeService将用户定义的结构体及其接口封装为服务Service,使用的是reflect技术。
4、调用MakeServer创建服务器。
5、调用Server.AddService将服务添加到服务器中,表示该服务器提供该服务。
6、调用Network.AddServer将服务器加入到网络中。
7、调用Network.Connect将客户机映射到服务器,表示该客户机连通该服务器。
8、调用Network.Enable激活客户机。
9、调用ClientEnd.Call方法,调用服务器服务,等待响应结果。

本实验raft之间的通信是利用了ClientEnd.Call方法,具体实现是在每个raft服务器上建立ClientEnd数组,每个ClientEnd负责与对应的raft服务器进行通信(调用服务器服务,等待响应结果)。

Part 2A

此部分主要实现raft领导选举以及心跳包,实现的目标是选举一位leader,如果没有出错则维持该leader,如果旧leader出错或者数据包丢失了(发给leader的或者是leader发出的),则选举新的领导。
参考In Search of an Understandable Consensus Algorithm一文,可大致了解到,raft的状态数据包括:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
所有服务器上的持久化状态:在响应RPCs前更新到稳态存储中
currentTerm raft服务器已知的最新任期号(启动时初始化为0,持续递增)
voteFor 当前任期内收到选票的候选者Id(表明投票给了谁)
log[] 日志项集,每一个日志项包含了任务状态机的命令,以及收到该日志
项时的领导任期,日志项的初始索引是1

所有服务器上的易变状态:
commitIndex raft服务器已知的已提交的日志项最高索引(初始化为0,持续递增)
lastApplied raft服务器已知的已应用的日志项最高索引(初始化为0,持续递增)

leader服务器上的易变状态:选举之后重新初始化
nextIndex[] 对于每一个服务器,leader需要发送给该服务器的下一个日志项索引
(初始化为leader的最新日志项索引+1)
matchIndex[] 对于每一个服务器,leader已知的复制到该服务器的日志项最高索引
(初始化为0, 持续递增)

发送和处理投票请求的数据和逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
投票请求数据:  
term 候选人启动的任期号
candidateId 候选人Id
lastLogIndex 候选人最新的日志项索引
lastLogTerm 候选人最新的日志项对应的任期号

投票回复数据:
term 当前任期,用于候选人更新自己的任期
voteGranted true表示候选人获得选票

投票请求的接收实现:
1、如果候选人启动的任期号term < 当前任期currentTerm,回复false
2、如果voteFor为空或是候选人Id,并且候选人的日志至少与选民的日志一样新,选民投票给该候选人

发送和处理心跳包/日志项的数据和逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
心跳包/日志项请求数据:  
term leader的任期号
leaderId leaderId用于选民对客户机请求重定向
prevLogIndex 紧接着新条目的先前的日志项索引
prevLogTerm 索引为prevLogIndex的日志项的任期号
entries[] 待存储的日志项(心跳包为空)
leaderCommit leader已知的已提交的日志项最高索引

心跳包/日志项回复数据:
term 当前任期,用于leader更新自己的任期
success 当选民的日志项匹配prevLogIndex和prevLogTerm时,回复true

心跳包/日志项请求的接收实现:
1、如果leader的任期号term < 当前任期currentTerm,回复false
2、如果选民的日志项不匹配prevLogIndex和prevLogTerm时,回复false
3、如果已有的日志项与新日志项冲突(索引相同,任期不同),删除该日志项和后续所有日志项
4、添加所有不存在的日志项
5、如果leader已知的已提交的日志项最高索引 > 选民已知的已提交的日志项最高索引,将选民的commitIndex更新为min(leader已知的已提交的日志项最高索引,新日志项的最高索引)

选民、候选者、领导状态转换逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
对于所有服务器:
1、如果raft服务器已知的已提交的日志项最高索引commitIndex > raft服务器已知的已应用的日志项最高索引lastApplied,增大lastApplied,应用log[lastApplied]到状态机上。
2、如果RPC请求或响应中term T > 当前任期号currentTerm,设置currentTerm=T,身份转换为选民。

对于选民:
1、对候选人和leader的RPC请求进行响应。
2、如果在选举超时计时内没有收到leader的AppendEntries RPC或候选人的投票请求,身份转换为候选人。

对于候选人:
1、转换为候选人时,开始选举:
递增currentTerm
选举自己
重设选举超时计时
发送投票请求到其他所有服务器
2、如果收到了大多数选民的投票,身份转换为leader
3、如果收到了新leader的AppendEntries RPC,身份转换为选民。
4、如果选举计时超时,启动新一轮选举。

对于leader:
1、向每一个server发送初始的空AppendEntries RPC(心跳包),在每个空闲时期重复该操作以避免选举超时。
2、如果接收到客户机的命令:在本地追加日志项,在日志项应用到状态机之后回复客户机。
3、如果最后的日志索引大于等于nextIndex[](leader需要发送给每一个服务器的下一个日志项索引):从nextIndex开始发送携带日志项的AppendEntries RPC,如果成功则更新nextIndex和matchIndex,如果由于日志不一致导致失败,递减nextIndex并重试。
4、如果存在N > leader已提交的日志项最高索引commitIndex,且大多数matchIndex[i] >= N,且log[N].term == 当前任期号,设置commitIndex为N。

Part 2A解决思路

根据以上描述,在领导选举阶段,可使用3种channel来控制raft的状态变化:chanLeader表示已成为leader、chanHeartBeat表示收到心跳包/日志项、chanSuccVote表示成功进行投票。
另外,在收到投票请求时、收到投票请求回复时、收到心跳包/日志项时、收到心跳包/日志项回复时,如果收到更新的任期号,则应该立即将服务器任期号更新,并将状态更新为选民;注意不要采用写入channel进行通知,并在channel读出端统一再转换为选民身份的做法。这是因为:接收请求的处理或接收回复的处理过程中需要加锁来修改服务器数据(调用rf.mu.Lock()),如果在处理过程中收到更新的任期号并更新了服务器任期号,但却没有及时将身份转换为选民身份,而是写入channel进行通知,那么意味着在channel读出端的go routine中需要竞争加锁(调用rf.mu.Lock())来转换身份;这种情况下,很有可能导致旧leader任期号更新了,身份却还没转变,则会导致一个任期内存在两个leader的现象。同时,外部调用rf.GetState()获取服务器状态应该时,rf.GetState()也应该加锁进行读取。

此部分需要扩展的数据结构包括:
用于维持服务器状态:Raft

1
2
3
4
5
6
7
state:状态
voteCount:获得选票数量
chanSuccVote:成功投票的通知
chanHeartBeat:收到心跳包或日志项的通知
chanLeader:成为leader的通知
currentTerm:当前任期号
votedFor:投票给谁

用于发送投票请求:RequestVoteArgs

1
2
Term:候选人的任期号
CandidateId:候选人的id,用于投票者记录投票给了谁

用于接收投票请求回复:RequestVoteReply

1
2
Term:回复者的任期号
VoteGranted:是否获得选票

用于发送rpc心跳包或日志项:AppendEntries

1
Term:leader的任期号

用于接收rpc心跳包或日志项回复:AppendEntriesReply

1
2
Term:回复者的任期号
Success:如果回复者的日志匹配了prevLogIndex和prevLogTerm,返回true,在leader选举阶段直接返回true即可

通知状态变化具体表现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
当raft服务器状态为选民时,使用select进行多路复用:
1、<- chanSuccVote,已成功投票,进入下一次超时计算。
2、<- chanHeartBeat,已成功收到心跳包/日志项,进入下一次超时计算。
3、<- time.After(time.Duration(rand.Int63()%150+150)*time.Millisecond),超时,状态转换为候选人。

当raft服务器状态为候选人时,递增选举号,投票给自己,然后广播请求选民进行投票,之后,使用select进行多路复用:
1、<- chanLeader,已成为leader(状态已转换为leader)。
2、<- chanHeartBeat,已成功收到心跳包/日志项,如果在等待期间收到更大任期号,那么在接收心跳包/日志项的处理过程中已将候选人的状态转换为选民,因此这里无需做任何处理。
3、<- chanSuccVote,已成功投票(状态已转换为选民)。注意到候选人首先会投票给自己,在相同的任期号内不会再投票给其他人;因此如果收到chanSuccVote通知,说明在等待期间收到更大任期号,那么在接收投票请求的处理过程中已将候选人的状态转换为选民,因此这里无需做任何处理。
4、<- time.After(time.Duration(rand.Int63()%150+150)*time.Millisecond),超时,状态不变,进入下一轮选举。

当raft服务器状态为leader时,使用select进行多路复用:
1、<- chanSuccVote,已成功投票(状态已转换为选民)。旧leader如果收到候选人的更大任期号,则在接收投票请求的处理过程中已将leader的状态转换为选民,因此这里无需做任何处理。
2、<- time.After(HEARTBEATINTERVAL),广播发送心跳包/日志项。
3、注意leader无需对心跳包/日志项进行多余处理,如果该leader收到的心跳包/日志项携带了更大任期号,则旧leader会在接收心跳包/日志项的处理过程中将状态转换为选民,因此我们只需要在上述步骤2广播发送心跳包/日志项的过程中附加判断当前服务器状态是不是leader就可以了,一旦不是leader则不进行发送。另一方面,如果leader收到一心跳包就跳出select,则可以进行攻击,从而影响到正常的心跳包/日志项的发送。

注意,Part 2A不需要考虑任何关于日志项的字段。
候选人广播发送投票请求,当接收者接收到请求后,作出以下判断:

1
2
3
4
1、如果当前任期号大于请求参数中候选人启动的任期号,则不投票,返回参数中附上当前任期号并直接返回。
2、如果当前任期号小于请求参数中候选人启动的任期号,更新当前任期号,voteFor置为-1,立即将状态转换为选民。
3、返回参数中附上当前任期号。
4、如果voteFor值为-1,要么是当前任期内还没投过票,要么是接收了新的任期号,此时往chanSuccVote写入true,返回参数中VoteGranted设为true,voteFor置为候选人Id。

候选人广播发送投票请求后,对于每一个收到的请求结果,作出以下判断:

1
2
3
4
1、如果服务器状态已经不是候选人,直接返回。
2、如果发送参数中的任期号已经不等于当前任期号,直接返回。
3、如果返回参数中的任期号大于当前任期号,更新当前任期号(导致步骤2的出现),候选人状态立即转换为选民(导致步骤1的出现),返回。
4、如果返回参数中VoteGranted为true,表明收到选票,如果当前服务器状态仍是候选人状态,计算投票人数是否超出一半,是则往chanLeader写入true,候选人状态立即转换为leader。

leader广播发送心跳包/日志项请求,当接收者接收到请求后,作出以下判断:

1
2
3
4
1、如果当前任期号大于请求参数中leader的任期号,返回参数附上当前任期号并直接返回。
2、如果当前任期号小于请求参数中leader的任期号,更新当前任期号,立即将状态转换为选民。
3、返回参数中附上当前任期号。
4、往chanHeartBeat写入true,返回参数中Success设为true。

leader广播发送心跳包/日志项请求后,对于每一个收到的请求结果,作出以下判断:

1
2
3
1、如果服务器状态已经不是leader,直接返回。
2、如果发送参数中的任期号已经不等于当前任期号,直接返回。
3、如果返回参数中的任期号大于当前任期号,更新当前任期号(导致步骤2的出现),leader状态立即转换为选民(导致步骤1的出现),返回。

测试:

1
2
$ cd "$GOPATH/src/raft"
$ go test -run 2A

结果:

Part 2B

此部分主要实现raft日志复制以保证数据一致性。客户端调用Start()时,如果服务器是leader,则将会将命令添加到日志中。leader通过发送AppendEntries RPC将日志追加到其他服务器。(按照In Search of an Understandable Consensus Algorithm一文的说法,非leader服务器接收到客户端发送的请求时,将转发请求到leader,本实验的做法是直接忽略)

此部分需要扩展的数据结构包括:
用于维持服务器状态:Raft

1
2
3
4
5
6
7
chanCommit:服务器更新commitIndex时通知service或tester
chanApplyMsg:服务器更新commitIndex时通知service或tester写入数据的通道
log[]:服务器保存的日志数组
commitIndex:服务器提交的最高日志索引
lastApplied:服务器应用的最高日志索引
nextIndex[]:leader保存的需要发送给其他服务器的日志索引
matchIndex[]:leader保存的已复制到其他服务器的日志索引

用于保存日志数据:LogEntry

1
2
3
Index:日志项对应的索引
Term:日志项对应的任期号
Command:命令

用于发送投票请求:RequestVoteArgs

1
2
LastLogIndex:候选人最新日志项的索引
LastLogTerm:候选人最新日志项的任期号

用于发送rpc心跳包或日志项:AppendEntries

1
2
3
4
5
LeaderId:leader的Id,用于重定向客户端请求到leader,本实验未实现
PrevLogIndex:leader所发送日志项的前一个日志项索引
PrevLogTerm:leader所发送日志项的前一个日志项任期号
Entries[]:leader本次发送的日志项数组
LeaderCommit:leader的commitIndex

用于接收rpc心跳包或日志项回复:AppendEntriesReply

1
2
Success:如果回复者的日志匹配了prevLogIndex和prevLogTerm,返回true
NextIndex:回复者告知leader下次发送的日志项索引,用于提高leader重新发送日志的效率

Raft的实现保证了以下五个特性:
1、选举安全性:
一个特定任期内只有最多只有一个leader。
如何保证:候选人的状态变化只有三种情况,变为leader、变为follower、重新新一轮投票请求。raft采用随机时间来减少分裂投票的情况。只有当候选人获得大多数选票时(本实验实现为超过一半)才能成为leader,这确保了一个特定任期内只有最多只有一个leader(假如leader无法与其他服务器通信了,但没有宕机,则系统可能存在两个leader,原来的leader也可以接收到客户端请求,但是新日志项无法提交)。
2、leader只追加:
leader不会重写或者删除日志项。
3、日志匹配:
如果不同服务器的日志中的两个日志项的index和term一样,那么它们的命令相同。
如何保证:一个leader在一个特定的term和特定的index上只会创建一个日志项,且日志项从来不改变位置。
如果不同服务器的日志中的两个日志项的index和term一样,那么在它们之前的所有日志都是相同的。
如何保证:追加日志项时,附带发送前一个日志项的index和term,接收者只有在日志保持一致时才会追加新日志。
4、leader完整性:
如果一个日志项在某个选举期内被提交,那么后续更高任期的leader日志中将会出现该日志项。
如何确保:对投票添加约束。只有当候选人的日志比选民的日志更加新(判断逻辑见后)时,候选人才有可能获得选票。另外,对于leader而言,只有在其新任期内复制了新日志到其他服务器并收到大多数回复,才有可能更新commitIndex,倘若只复制旧任期的日志项,是无法更新commitIndex的。
5、状态机安全性:
一旦某个服务器将某个索引的日志项应用到状态机上,则不会出现其他服务器在相同索引处应用其他不同日志项的情况。日志项应用之时,该服务器在该日志项之前的日志与leader相同,且该日志项已被commit。
如何确保:leader完整性、日志匹配、raft要求按照日志索引顺序将日志项应用到状态机上。

如何判断日志更加新,即more up-to-date:
比较两个日志的最后一项的term,越大的越新。倘若一样大,则日志项越多的越新。

以下是对日志项提交的一个补充说明:

第一行表示日志项索引,S1~S5表示服务器,方格内数字表示任期。(a)表示S1为leader,term为2,接收了客户端的命令并在index=2上添加新日志项,成功将日志项复制到S2中。(b)表示S1宕机,S5为leader,由于日志比S2旧,只获取到S3~S5的选票,term为3,接收了客户端的命令并在index=2上添加新日志项。(c)表示S5宕机,S1恢复,并重新被选为leader,term=4,接收了客户端的命令并在index=3上添加新日志项,成功将index=2的日志项复制到S3中。此时,index=2的日志项已经复制到大多数服务器上,但该日志项尚未被提交,因为term=4的日志项还未被复制到大多数服务器上。(d)如果S1宕机,S5恢复,由于日志比S1旧,只可能获取到S2~S5的选票,term为5,成功将term为3的日志项复制到所有服务器并重写了选民的日志,index=2的日志项被提交。(e)如果c中S1将term=4的日志项复制到S2和S3中,term=4的日志项将被提交,之后S1宕机,则S5无法成为新leader,因为其日志比大多数服务器的日志旧。

Part 2B解决思路

1、raft的状态通知变化与Part 2A一样,但是在由候选人转换为leader时,需要初始化nextIndex数组和matchIndex数组,其中nextIndex[i]初始化为leader的最后一个日志项index+1,matchIndex[i]初始化为0。同时,在启动raft的时候追加一个index=0且term=0的日志项到log[]中,以确保日志项的index和log的下标一致(有效日志项的初始index是1)。
2、raft启动时使用go routine等待服务器的日志提交通知,一旦获取到通知,将服务器最后应用的日志项的下一项开始(lastApplied+1),到已提交的最高日志项(commitIndex)范围内的日志项数据通知到service或tester,并更新应用日志索引lastApplied。
3、raft接收到客户端的命令时,首先判断是不是leader,不是则忽略,否则将命令追加到本地日志上,初始index=1。
4、候选人广播发送投票请求前,将候选人的最新日志项的索引和任期号附加到投票参数LastLogIndex和LastLogTerm上。对于每次投票请求的发送,附加判断当前服务器状态是否仍旧是候选人,一旦不是则无需发送。
5、候选人广播发送投票请求,当接收者接收到请求后,作出以下判断:

1
2
3
4
5
1、如果当前任期号大于请求参数中候选人启动的任期号,则不投票,返回参数中附上当前任期号并直接返回。
2、如果当前任期号小于请求参数中候选人启动的任期号,更新当前任期号,voteFor置为-1,立即将状态转换为选民。
3、返回参数中附上当前任期号。
4、判断候选人的日志是否更加新,即比较当前最后一个日志项的term和参数中的LastLogTerm,越大的越新。倘若一样大,则比较当前最后一个日志项的index和参数中的LastLogIndex,越大的越新。
5、如果voteFor值为-1,要么是当前任期内还没投过票,要么是接收了新的任期号,且候选人的日志更加新,此时往chanSuccVote写入true,返回参数中VoteGranted设为true,voteFor置为候选人Id。

6、候选人广播发送投票请求后,对于每一个收到的请求结果,作出以下判断:

1
2
3
4
1、如果服务器状态已经不是候选人,直接返回。
2、如果发送参数中的任期号已经不等于当前任期号,直接返回。
3、如果返回参数中的任期号大于当前任期号,更新当前任期号(导致步骤2的出现),候选人状态立即转换为选民(导致步骤1的出现),返回。
4、如果返回参数中VoteGranted为true,表明收到选票,如果当前服务器状态仍是候选人状态,计算投票人数是否超出一半,是则往chanLeader写入true,候选人状态立即转换为leader。

7、leader广播发送心跳包/日志项请求前,更新commitIndex,更新规则是:如果存在N > leader已提交的日志项最高索引commitIndex,且大多数matchIndex[i] >= N,且log[N].term == 当前任期号,设置commitIndex为N(本实验实现为从后往前查找N,一旦找到满足的N即退出查找)。leader将commitIndex附加到参数中,然后广播发送日志项。对于每个服务器,将nextIndex[i]开始的所有log复制到参数中,并将索引为nextIndex[i]-1的日志项的index和term附加到参数中,即PrevLogIndex和PrevLogTerm,以便接收者检查日志的一致性。对于每次心跳包/日志项的发送,附加判断当前服务器状态是否仍旧是leader,一旦不是则无需发送。

1
注意:一旦当前服务器状态不是leader,则不能发送心跳包/日志项,否则会导致出错。例如:1、三台服务器都在index=1上提交了日志项(term=1),之后leader S1与其他服务器失去通信,但接收了term=1的其他日志项。2、S0成为新leader,并接收了term=2的新日志项(index=2),S0和S2都提交了该日志项,之后leader S0与其他服务器失去通信。3、S1恢复通信,并且认为自己还是leader,此时S1的日志是旧于S2的,S1开始第一次尝试广播发送日志给S0(无法通信)和S2,在处理回复之前由于过了心跳包间隔又启动第二次尝试广播发送日志(还没真正发送),此时收到了第一次发送日志的S2回复,并更新了任期号和状态;这时候,第二次广播发送日志就将使用最新的任期号;由于没有在实际发送数据的时候判断当前最新服务器状态,最终导致服务器在非leader情况下使用最新的任期号发送了旧日志,且由于index=1的日志项已提交(导致PrevLogIndex和PrevLogTerm匹配),最终导致S2删掉了term=2的日志项而追加了term=1的日志项。

8、leader广播发送心跳包/日志项请求,当接收者接收到请求后,作出以下判断:

1
2
3
4
5
6
7
8
9
1、如果当前任期号大于请求参数中leader的任期号,返回参数附上当前任期号并直接返回,回复参数的Success为false。
2、如果当前任期号小于请求参数中leader的任期号,更新当前任期号,立即将状态转换为选民。
3、返回参数中附上当前任期号。
4、往chanHeartBeat写入true。
5、如果参数中的PrevLogIndex大于当前服务器最高日志项索引,将回复参数的NextIndex更新为最高日志项索引+1,返回,回复参数的Success为false。
6、检查日志一致性,如果当前服务器在PrevLogIndex上的日志项任期号term1不等于参数中的PrevLogTerm,则将回复参数的NextIndex更新后返回,回复参数的Success为false,NextIndex为:任期号为term1的第一个日志项索引(由后往前判断),这可以加快leader重发日志项的效率。
7、如果参数中的日志项不为空,尝试追加日志项到本地。如果已有的日志项与新日志项冲突(索引相同,任期不同),删除该日志项和后续所有日志项;添加所有不存在的日志项。为方便,可以将PrevLogIndex之后的日志项截取掉,并直接追加所有到来的日志项,将回复参数的NextIndex更新为最高日志项索引+1。
8、如果leader已提交日志项的最高索引 > 当前服务器已提交日志项的最高索引,将当前服务器的commitIndex更新为min(leader已提交日志项的最高索引,当前服务器日志项的最高索引)。如果当前服务器的commitIndex被修改了(没被改动无需进行通知),则往chanCommit写入true(促使日志项的应用,以及通知service或tester)。注意参数中的日志项为空时,也是有可能更新commitIndex的,即commitIndex的更新时机与参数中的日志项是否为空没有关系。
9、回复参数的Success为true。

9、leader广播发送心跳包/日志项请求后,对于每一个收到的请求结果,作出以下判断:

1
2
3
4
5
1、如果服务器状态已经不是leader,直接返回。
2、如果发送参数中的任期号已经不等于当前任期号,直接返回。
3、如果返回参数中的任期号大于当前任期号,更新当前任期号(导致步骤2的出现),leader状态立即转换为选民(导致步骤1的出现),返回。
4、如果回复参数的Success为true,说明接收者的已有日志与leader保持一致,且接收者接收了新的日志项(如果有)。如果leader发送的日志项不为空,则更新对应的nextIndex[i]为已发送日志的最后一项的index+1,matchIndex[i]为已发送日志的最后一项的index。
5、如果回复参数的Success为false,说明接收者的已有日志与leader不一致,根据回复参数的NextIndex更新对应的nextIndex[i]。

1
2
3
4
5
注意:
当leader发送的日志项为空时,是否需要更新matchIndex[i]?
观点1,需要更新,考虑以下场景:raft集群启动后,leader A接收了5个客户端命令,追加到日志,并且复制到其他服务器。其他服务器都成功接收到了日志,但此时leader A的commitIndex[i]还只是0。这时leader A宕机,B成为新的leader,B的matchIndex[i]初始化为0。然而,leader B没有发送日志到其他服务器,因为所有的服务器日志都是一致的。这种情况下,倘若不更新matchIndex[i],则matchIndex[i]可能会一直保持为0,导致旧日志无法被提交。
观点1的说法其实是不完整的,因为它忽略了一个事实:只有在当前任期号currentTerm内接收了新日志并成功追加到其他服务器时,才有可能更新commitIndex;倘若在当前任期号currentTerm内没有接收新日志,而只是将旧日志的matchIndex[i]更新,也不会更新leader B的commitIndex。而一旦接收了新日志并成功追加到其他服务器时,则总有一个时刻leader发送的日志项不为空;因此,在这种情况下,观点2“如果leader发送的日志项为空则不需要更新matchIndex[i]”,这种做法是正确的。
raft重启之后需要可能需要通过快照来恢复数据(后续实验会实现),如果没使用快照的话,则会将日志项一个一个重新应用并通知service或tester;这种情况下,倘若raft重启后尝试发送数据到follower,但由于服务器之间日志一致,则会导致matchIndex[i]无法更新,最终在没有接收到新请求的情况下,leader是无法更新commitIndex的,最终没办法通知service或tester,除非上层应用重启后有尝试发送请求到raft。

测试:

1
2
$ cd "$GOPATH/src/raft"
$ go test -run 2B

结果:

Part 2C

raft服务器在重启时应该从它的离开点恢复服务,这就要求raft服务器必须在一些时间点保存持久状态。真实情况中,raft服务器应该在数据变化时将服务器数据持久化到磁盘中,并且在重启之后从磁盘中读取最新的持久化数据。在我们的实验中,并没有使用到磁盘,而是通过序列化和反序列化将数据存储到Persister object中。当调用Raft.Make()函数启动raft服务器时,将从Persister object中读取最后保存的持久化状态数据。

raft重启时都是选民身份,因此无需持久化leader或候选人相关的数据。raft需要持久化的数据如下:

1
2
3
1、currentTerm:当前服务器已知的任期号。
2、votedFor:当前服务器给谁投过票。
3、log[]:当前服务器的日志数组。

注意:不能对commitIndex(当前服务器已提交的最高日志项索引)和lastApplied(当前服务器已应用的最高日志项索引)进行持久化。当raft重启之后,应该根据所有服务器的日志情况重新计算matchIndex[],并且从头开始按顺序将已提交的日志项通知到service或tester。

需要持久化raft服务器数据的时机:

1
2
3
4
5
6
1、leader接收了客户端的命令并追加到日志中。
2、接收到投票请求,导致更新已知的任期号或进行投票。
3、接收到投票请求的回复,导致更新已知的任期号。
4、leader广播发送心跳包/追加日志项请求前更新commitIndex。
5、接收到心跳包/追加日志项请求,导致更新已知的任期号,或追加日志,或更新接收者的commitIndex,或更新接收者的lastApplied。
6、接收到心跳包/追加日志项请求的回复,导致更新已知的任期号。

测试:

1
2
$ cd "$GOPATH/src/raft"
$ go test -run 2C

结果:

本实验没有实现的部分

1、本实验是基于单机模拟的,没有实现真正的分布式。
2、日志没有存储到磁盘上。
3、非leader收到客户端请求时,没有转发到leader而只是简单地忽略。
4、持久化服务器状态数据时没有写入到磁盘上。
5、没有涉及集群成员变化的情况。

笔记01 - Lab 1: MapReduce

发表于 2017-08-16 | 分类于 MIT6.824 Distributed Systems

综述

什么是MapReduce:MapReduce是一种编程模型,用于大规模数据集的并行运算。软件实现指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对;指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
MapReduce的应用情况:master的schedule调度函数负责根据任务数分配任务给worker,并同步worker的工作。worker启动时指定Map函数和Reduce函数,接收master调遣时根据任务标志来决定执行Map函数还是Reduce函数。在不同的应用场景下,用户对Map函数和Reduce函数进行自定义,并在启动worker时进行注册即可。

例子1:单词统计程序中:Map任务将读取文件内容,调用Map函数将文件内容映射成一组新的键值对,然后将映射结果输出到对应的Reduce中间文件中。其中,Map(映射)函数将文件内容映射成一组新的键值对,键为单词,值为出现次数。
Reduce任务读取对应的Reduce中间文件中,保证所有相同“单词:次数”键值对共享相同的键组,调用Reduce(归约)函数统计该单词出现的总次数,最后将结果写入到中间文件。

例子2:反向索引生成程序中:Map任务将读取文件内容,调用Map函数将文件内容映射成一组新的键值对,然后将映射结果输出到对应的Reduce中间文件中。其中,Map(映射)函数将文件内容映射成一组新的键值对,键为单词,值为出现文件名。
Reduce任务读取对应的Reduce中间文件中,保证所有相同“单词:文件名”键值对共享相同的键组,调用Reduce(归约)函数统计该单词出现的次数和所有文件名,最后将结果写入到中间文件。

实验内容

准备

1、Go version 1.7
2、参考资料:
Pro Git book,git user’s manual,Git for Computer Scientists
3、项目地址:git://g.csail.mit.edu/6.824-golabs-2017

1
2
3
4
$ git clone git://g.csail.mit.edu/6.824-golabs-2017 6.824
$ cd 6.824
$ ls
Makefile src

实验准备了顺序和分布式两种Map/Reduce模式。顺序模式启动第一个map任务,然后是第二个、第三个…所有map任务完成后启动reduce任务,该方法速度慢,但有利于调试。分布式模式在启动第一个map任务时就并发启动多个worker线程,然后reduce任务,该方法速度快,但不利于实现和调试。

Preamble: Getting familiar with the source

mapreduce提供了简单的Map/Reduce库。master.go的Distributed()函数和Sequential()函数分别提供了分布式模式和顺序模式用于启动job。job包括了以下内容:
1、提供许多输入文件、一个map函数、一个reduce函数、reduce任务的数目nReduce。
2、master将启动rpc服务(master_rpc.go),等待worker的注册(Register() in master.go)。当map task和reduce task可分配时,master分配这些task到对应的worker(schedule() in schedule.go)。
3、master将每个输入文件当成一个map task,启动nMap个map task/worker,每个worker调用doMap()将输入文件的key/value pairs映射到nReduce个中间文件中,映射方法是对每一个pair的key进行哈希。map阶段将产生nMap x nReduce个中间文件,每个worker机器在本地存储nReduce个中间文件。在本实验中,文件存于本机,不同worker都是运行在本机上;但在实际中,每个worker应当被允许读取其他worker写入的文件,可以通过GFS在不同机器上运行worker,并使用分布式存储系统进行存储。
4、master启动reduce阶段(doReduce() in common_reduce.go)。每个map task/worker有nReduce个中间文件,对于reduce task R,R将会收集每个map task/worker的第R个中间文件。reduse阶段将生成nReduce个文件。
5、master启动merge阶段(mr.merge() in master_splitmerge.go),将nReduce个文件合并为一个文件。
6、master为每一个worker发送Shutdown RPC,然后关闭rpc服务。

Part I: Map/Reduce input and output

完成以下内容:
1、为每一个map task切分输出文件,实现doMap() in common_map.go。
2、为每一个reduce task收集输入文件,实现doReduce() in common_reduce.go。

思路:
1、doMap函数负责一个map task,这里对应读入一个文件内容。然后,调用map function将文件内容处理为KeyValue数组,遍历每一个KeyValue,对Key进行哈希,并输出到对应的reduce中间文件。doMap产生nReduce个中间文件。
2、doReduce函数负责处理所有map task产生的第r个reduce中间文件。读取每一个中间文件,将KeyValue映射到结果集中,相同key的value存于同一个数组中。接着,对key进行排序。最后,针对结果集的每一项key->values[],调用reduce function将values[]连接为字符串new value,并将key->new value写入到reduce task的目标文件中,该文件按key字段排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func doMap(
jobName string, // the name of the MapReduce job
mapTaskNumber int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(file string, contents string) []KeyValue,
) {
//now we create file
files := make([]*os.File,nReduce)
encs := make([]*json.Encoder,nReduce)
var err error
for i:=0;i<nReduce;i++{
files[i], err = os.Create((reduceName(jobName,mapTaskNumber,i)))
defer files[i].Close()
if err != nil {
panic(err.Error())
}
encs[i] = json.NewEncoder(files[i])
}

//now we read file
data, err := ioutil.ReadFile(inFile)
if err != nil {
panic(err.Error());
return
}
//now we write file
kvs := mapF(inFile,string(data))
for _,kv:=range kvs{
err = encs[ihash(kv.Key)%nReduce].Encode(&kv)
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTaskNumber int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
//now we create file
outF,err := os.Create(outFile)
defer outF.Close()
if err != nil {
panic(err.Error())
}
enc := json.NewEncoder(outF)

//now we read file
kvMap := make(map[string][]string)
for i:=0;i<nMap;i++{
file, err := os.Open((reduceName(jobName,i,reduceTaskNumber)))
defer file.Close()
if err != nil {
panic(err.Error())
}

dec := json.NewDecoder(file)
//now we read file
var kv KeyValue
for{
err = dec.Decode(&kv)
if err != nil {
break;// done with this file
}
kvMap[kv.Key] = append(kvMap[kv.Key],kv.Value)
}
}

//now we sort the immediate result
var keys []string
for k,_:=range kvMap{
keys = append(keys,k)
}
sort.Strings(keys)

//now we write file
for _,key:=range keys{
enc.Encode(KeyValue{key, reduceF(jobName,kvMap[key])})
}
}

测试:

1
2
3
4
$ export "GOPATH=$PWD" 
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
ok mapreduce 2.694s

输出调试结果:

1
2
common.go加入debugEnabled = true
使用$ go test -v -run Sequential

Part II: Single-worker word count

任务:
完成单词计数功能,为main/wc.go提供map function和reduce function。map function主要根据输入的文件内容对单词进行提取和计数,reduce function则是对相同key的value次数进行统计。

思路:
1、单词的提取使用strings.FieldsFunc函数,单词的定义是连续的letter字符串,提取如下:

1
2
3
ss := strings.FieldsFunc(value, func(c rune) bool {
return !unicode.IsLetter(c)
})

2、注意,如果参数输入为go run wc.go master sequential pg-*.txt,golang接收为os.Args[3:],可以自动识别匹配的文件个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func mapF(filename string, contents string) []mapreduce.KeyValue {
// TODO: you have to write this function
kvs := make([]mapreduce.KeyValue,0)
values := strings.FieldsFunc(contents,func(c rune)bool{
return !unicode.IsLetter(c)
})
for _,v:=range values{
kvs = append(kvs,mapreduce.KeyValue{Key:v,Value:"1"})
}
return kvs
}

func reduceF(key string, values []string) string {
// TODO: you also have to write this function
return strconv.Itoa(len(values))
}

测试:

1
2
3
4
$ cd "$GOPATH/src/main"
$ time go run wc.go master sequential pg-*.txt
$ sort -n -k2 mrtmp.wcseq | tail -10
$ rm mrtmp.*

或

1
$ bash ./test-wc.sh

Part III: Distributing MapReduce tasks

mapreduce的一大卖点正是它可以将顺序模式代码并行化,而且开发者无需做任何改变。此部分内容将使用RPC来模拟分布式计算(没能够真正地在多台机器上机器上进行Map/Reduce部署)。

master设计思路:
1、暴露mapreduce.Distributed接口。根据参数构造master数据结构,包括rpc地址、done channel、互斥锁、条件变量、文件名数组、net.Listener、rpc shutdown channel等。
2、启动rpc服务。先创建rpc服务,然后注册master结构。接着,master使用net.Listen进行监听,并使用net.Listener.Accept()接收连接,最后使用Server.ServeConn(conn)处理连接。当worker调用master暴露的rpc接口时将产生连接请求。

1
2
3
4
5
rpcs := rpc.NewServer()
rpcs.Register(mr)

conn, err := mr.l.Accept()
rpcs.ServeConn(conn)

其中 Server.Register 用于注册RPC服务,默认的名字是对象的类型名字(这里是Echo)。如果需要指定特殊的名字,可以用 Server.RegisterName 进行注册。被注册对象的类型所有满足以下规则的方法会被导出到RPC服务接口:
func (t *T) MethodName(argType T1, replyType *T2) error。被注册对应至少要有一个方法满足这个特征,否则可能会注册失败。
master导出的服务接口包括func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error(用于worker注册)、func (mr *Master) Shutdown(_, _ *struct{}) error(用于停止自身rpc服务)。
3、调用schedule启动map阶段。在此之前,使用string channel等待worker的注册(sync.Cond.Wait()),一旦注册了worker(sync.Cond.Broadcast()),将其rpc地址写入到string channel中。schedule函数参数包括该string channel,意味着只会使用已注册的worker。当然,如果在启动rpc服务之后、调用schedule之前已经有worker注册了,master将会依次将已注册的worker的rpc地址写入到string channel中,而schedule函数将从该channel中读取。
需要注意的是条件变量的用法,不同于pthread条件变量的初始化(条件变量和搭配使用的互斥锁),golang的初始化只使用了一个参数,sync.NewCond(master)用于初始化master的cond,参数是master结构体,master结构体需包含一个互斥锁和一个条件变量。
map阶段将根据输入文件数目确定map task数,然后分配给worker。每个worker负责一个或多个map task,将对应的文件内容哈希到nReduce个文件中。
4、调用schedule启动reduce阶段。该阶段继续使用map阶段的worker和新注册的worker。
reduce阶段将根据nReduce确定reduce task数,然后分配给worker。每个worker读取负责map的worker所保存的第r个文件,将其按照key有序规律输出到统计文件中。
5、关闭与workers的连接,关闭rpc服务。
6、启动merge阶段。按照key有序规律将nReduce个统计文件输出到最终文件中。
7、结束计算。

worker设计思路:
1、暴露mapreduce.RunWorker接口。根据参数构造worker数据结构,包括map function、reduce function、可被rpc调用的次数、rpc地址等。
2、worker向master注册。
3、启动rpc服务。先创建rpc服务,然后注册worker结构。接着,worker使用net.Listen进行监听,并使用net.Listener.Accept()接收连接,最后使用Server.ServeConn(conn)处理连接。当master调用worker暴露的rpc接口时将产生连接请求。RunWorker的最后一个参数表示该worker可被调用的rpc次数,一旦超过次数,worker将关闭监听。
worker暴露的rpc接口有func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error(用于master分配工作)、func (wk *Worker) Shutdown(_ *struct{}, res *ShutdownReply) error(用于工作完成时被master调用)。
4、worker结构体需包含一个互斥锁。
5、master可以分配一个或多个task给一个work,但必须等待上一个task完成才能分配下一个task。

schedule设计思路:
1、计算任务数。
2、对每一个任务创建一个goroutine。在每一个goroutine中,通过registerChan阻塞等待可用worker,master将会依次将已注册的worker的rpc地址写入到该channel中,完成任务之后对应的worker也会尝试写入到该channel中。channel的设计满足了“必须等待上一个task完成才能分配下一个task”的需求。
3、使用sync.WaitGroup进行任务同步。master必须等待所有worker完成map阶段之后才进行reduce阶段。
4、对registerChan的写入必须使用goroutine完成,否则map阶段当最后一个worker完成任务时,将会阻塞写入registerChan,而此时没有对registerChan进行读出,map阶段将无法正常完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

var wg sync.WaitGroup
for task:=0;task<ntasks;task++{
wg.Add(1)

go func(taskNum int){
//wait until we have available worker
worker := <- registerChan

var arg DoTaskArgs
arg.JobName = jobName
arg.Phase = phase
arg.File = mapFiles[taskNum]
arg.TaskNumber = taskNum
arg.NumOtherPhase = n_other
ok := call(worker, "Worker.DoTask", &arg, nil)
if ok == false {
fmt.Printf("Cleanup: RPC %s error\n", worker)
}
//remember to use goruntine to write data into registerChan
//because last time we write data into it, it will block until someone read it
//however, no one read from it, so the last wg.Done() will not been executed
//and map-phase never finish
go func(){
//again, worker is free
registerChan <- worker
}()
wg.Done()
}(task)
}
wg.Wait()

fmt.Printf("Schedule: %v phase done\n", phase)
}

测试:

1
2
$ cd "$GOPATH/src/mapreduce"
$ go test -run TestBasic

Part IV: Handling worker failures

本部分是需要处理worker出错的现象。MapReduce的worker没有持久状态,一旦worker出错,master发给该worker的rpc包将失败,因此,一旦发现这种现象,master需要将分配给该worker的任务重新分配给其他worker。
RPC失败不一定表示worker不在执行任务,有可能worker已经执行完任务了但是回复丢失了,或者worker正在执行任务但是master的RPC超时。因此,两个worker可能接收相同的task,并进行计算和生成输出。对一个给定的输入,对map和reduce function的两次调用将生成相同的输出,因此不会出现不一致性,后续程序不会读取到不同的输出。另外,MapReduce框架确保map和reduce function的输出是原子的:要么没有,要么是map和reduce function一次调用的完全输出(本实验没有实现这个功能,而是简单地停止worker,所以不会出现任务的并发执行问题)。
思路:
1、worker的fail实现是通过设置可被调用的rpc次数来实现的。
2、一旦worker关闭监听,master调用call将失败,尝试选取其他worker。因此,对于每一个任务的goroutine,借助for无限循环模拟worker失败并重新选取worker执行任务的操作,一旦调用成功则跳出循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

var wg sync.WaitGroup
for task:=0;task<ntasks;task++{
wg.Add(1)

go func(taskNum int){
for{ //we use this, cause worker may fail
//wait until we have available worker
worker := <- registerChan

var arg DoTaskArgs
arg.JobName = jobName
arg.Phase = phase
arg.File = mapFiles[taskNum]
arg.TaskNumber = taskNum
arg.NumOtherPhase = n_other
ok := call(worker, "Worker.DoTask", &arg, nil)
if ok == false {
fmt.Printf("Cleanup: RPC %s error\n", worker)
}else{
//remember to use goruntine to write data into registerChan
//because last time we write data into it, it will block until someone read it
//however, no one read from it, so the last wg.Done() will not been executed
//and map-phase never finish
go func(){
//again, worker is free
registerChan <- worker
}()
wg.Done()
break
}
}
}(task)
}
wg.Wait()

fmt.Printf("Schedule: %v phase done\n", phase)
}

测试:

1
2
$ cd "$GOPATH/src/mapreduce"
$ go test -run Failure

Part V: Inverted index generation

本部分将借助map和reduce function来生成反向索引。反向索引广泛应用于文档搜索。一般来讲,反向索引是底层数据到其原始位置的一个映射,比如文档搜索的反向索引是由关键字映射到包含这些关键字的文档。
修改main/ii.go的mapF和reduceF以生成反向索引。

思路:
mapF函数:使用strings.FieldsFunc获取文本的所有单词,然后为每个单词创建“单词:文件名”键值对,返回所有键值对。
reduceF函数:对同一个单词的所有文件名进行归纳,剔除重复的文件名,然后对文件名进行排序,按照“次数 文件名1,文件名2”的格式返回。

1
2
3
4
5
6
7
8
9
10
11
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// TODO: you should complete this to do the inverted index challenge
kvs := make([]mapreduce.KeyValue,0)
values := strings.FieldsFunc(value,func(c rune)bool{
return !unicode.IsLetter(c)
})
for _,v:=range values{
kvs = append(kvs,mapreduce.KeyValue{Key:v,Value:document})
}
return kvs
}

1
2
3
4
5
6
7
8
9
10
11
12
13
func reduceF(key string, values []string) string {
// TODO: you should complete this to do the inverted index challenge
m := make(map[string]string,0)
for _,v:=range values{
m[v] = ""
}
var keys []string
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return strconv.Itoa(len(keys)) + " " + strings.Join(keys,",")
}

测试:

1
2
3
$ cd "$GOPATH/src/main"
$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq

或

1
2
$ bash ./test-ii.sh
$ head -n5 mrtmp.iiseq

最终测试结果

测试:

1
2
$ cd "$GOPATH/src/main"
$ bash ./test-mr.sh

1…67
zoro

zoro

如果我后退的话,我曾重视的誓言和约定就会全部消失,然后,再也不会回来这个地方了

65 日志
12 分类
18 标签
© 2020 zoro
由 Hexo 强力驱动
|
主题 — NexT.Gemini