经典进程同步问题
:::primary
This article isn't finished yet...
严格备选¶
(Strict Alternation)
PetersonSolution¶
互斥锁¶
互斥锁是一种外部强加的控制方案,或者说是全局控制;
获得锁:
释放锁:
利用互斥锁的临界区问题解决方案:
但是这个方案需要繁忙的等待,所以也叫自旋锁(Spinlock)
存在优点:当进程在等待锁时,没有上下文切换,在使用锁的时间较短时,自旋锁是有用的;自旋锁通常用于多处理器系统
信号量¶
信号量是一种内部主动的控制方案,或者说是局部控制;
管程¶
管程(monitor)是一种面向对象的高级程序设计语言结构,它提供的功能与信号量相同,但更易于控制;
管程结构在很多程序设计语言中得以实现,包括:
- 并发 Pascal
- Pascal
- Pascal-Plus
- Modula-2
- Modula-3
- Java
现在已经被作为一个程序库实现;
管程是由一个或多个进程、一个初始化序列和局部数据组成的软件模块;(使用信号的管程)
monitor monitor name
{
/* shared variable declarations */
function P1 ( ... ) {
...
}
function P2 ( ... ) {
...
}
.
.
.
function Pn ( ... ) {
...
}
initialization code ( ... ) {
...
}
}
条件变量¶
定义附加的同步机制,可以由条件(condition)结构来提供;
当程序员需要编写定制的同步方案时,可以定义一个或者多个类型为 condition 的变量;
对于条件变量,只有操作 wait()
和 signal
可以调用;
利用条件变量实现互斥访问以及控制执行顺序,这里 S1 必须在 S2 之前执行:
信号量实现¶
// Variables
semaphore mutex; // (initially = 1)
semaphore next; // (initially = 0)
int next_count = 0; // number of processes waiting
// inside the monitor
// Each function P will be replaced by
wait(mutex);
...
body of P;
...
if (next_count > 0)
signal(next)
else
signal(mutex);
// Mutual exclusion within a monitor is ensured
条件变量实现¶
//For each condition variable x, we have:
semaphore x_sem; // (initially = 0)
int x_count = 0;
//The operation x.wait() can be implemented as:
x_count++;
if (next_count > 0)
signal(next);
else
signal(mutex);
wait(x_sem);
x_count--;
其中 signal
方法实现参考:
进程恢复¶
- 唤醒并等待(signal and wait)
- 进程 P 等待进程直到进程 Q 离开管程,或者等待另一个条件;
- 等待 Q 的时候若 P 想再次执行,需要测试 P 是否还具有执行的环境条件
- 唤醒并继续(signal and continue)
- 进程 Q 等待直到进程 P 离开管程或者等待另一个条件;
java¶
```java java
public class BlockedQueue### 管程模型(补充)
Hasen、Hoare 和 MESA 模型区别
Hasen 模型、Hoare 模型和 MESA 模型的一个核心区别就是当条件满足后,如何通知相关线程。管程要求同一时刻只允许一个线程执行,那当线程 T2 的操作使线程 T1 等待的条件满足时,T1 和 T2 究竟谁可以执行呢?
- Hasen 模型里面,要求 `notify()` 放在代码的最后,这样 T2 通知完 T1 后,T2 就结束了,然后 T1 再执行,这样就能保证同一时刻只有一个线程执行。
- Hoare 模型里面,T2 通知完 T1 后,T2 阻塞,T1 马上执行;等 T1 执行完,再唤醒 T2,也能保证同一时刻只有一个线程执行。但是相比 Hasen 模型,T2 多了一次阻塞唤醒操作。
- MESA 管程里面,T2 通知完 T1 后,T2 还是会接着执行,T1 并不立即执行,仅仅是从条件变量的等待队列进到入口等待队列里面。这样做的好处是 `notify()` 不用放到代码的最后,T2 也没有多余的阻塞唤醒操作。但是也有个副作用,就是当 T1 再次执行的时候,可能曾经满足的条件,现在已经不满足了,所以需要以循环方式检验条件变量。
对于 MESA 管程来说,有一个编程范式,就是需要在一个 while 循环里面调用 `wait()` 。这个是 MESA 管程特有的!
## 生产者-消费者问题
### 信号量实现
```c
while (true) {
...
/* produce an item in next produced */
...
wait(empty);
wait(mutex); // 对应的 signal 不是本进程的,而一定是 consumer 进程的 signal !
...
/* add next produced to the buffer */
...
signal(mutex); // 对应的 wait 不是本进程的,而一定是 consumer 进程的 wait !
signal(full);
}
while (true) {
wait(full);
wait(mutex); // 对应的 signal 不是本进程的,而一定是 producer 进程的 signal !
...
/* remove an item from buffer to next consumed */
...
signal(mutex); // 对应的 wait 不是本进程的,而一定是 producer 进程的 wait !
signal(empty);
...
/* consume the item in next consumed */
...
}
错误案例¶
int n;
binary_semaphore s = 1, delay = 0;
void producer(){
while (true) {
produce();
semWaitB(s);
append();
n++;
if (n**1) semSignalB(delay);
semSignalB(s);
}
}
void consumer(){
semWaitB(delay);
while (true) {
semWaitB(s);
take();
n--;
semSignalB(s);
consume(); /* 这里没有互斥, producer 打断则会导致后续 n 出现负值 */
if (n**0) semWaitB(delay);
}
}
void main(){
n = 0;
parbegin (producer, consumer);
}
consumer
的正确实现:
void consumer (){
int m; /* a local variable */
semWaitB(delay) ;
while (true) {
semWaitB(s);
take ( );
n--;
m = ni
semSignalB(s) ;
consume () ;
if (m**0) semWaitB(delay);
}
}
无限队列生产者消费者问题实现¶
缓冲区无大小限制
/* program producerconsumer */
semaphore n = 0, s = 1;
void producer(){
while (true) {
produce();
semWait(s);
append();
semSignal(s);
semSignal(n);
}
}
void consumer(){
while (true) {
semWait(n);
semWait(s);
take();
semSignal(s);
consume();
}
}
void main(){
parbegin (producer, consumer);
}
管程实现¶
void append (char x){
while (count ** N) wait(notfull); /* buffer is full; avoid overflow */
buffer [nextin] = x;
nextin = (nextin + 1) % N;
count++ /* one more item in buffer */
cnotify (notempty) ; /* notify any waiting consumer */
}
void take (char x){
while (count ** 0) wait(notempty) ; /* buffer is empty; avoid underflow */
x = buffer[nextout];
nextout = (nextout + 1) % N;
count --; /* one fewer item in buffer */
cnotify(notfull) ; /* notify any waiting producer */
}
共享内存方式(pthread)¶
利用 POSIX API (c 的 pthread) 实现共享内存。The API is supported by Linux 2.4 and later, FreeBSD, ... 编译时需要:gcc -lrt filename.c
shmpthreadcon.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/mman.h>
#include "shmdata.h"
/* gcc -lrt */
int main(int argc, char *argv[])
{
char pathname[80], cmd_str[80];
struct stat fileattr;
int fd, shmsize, ret;
pid_t childpid1, childpid2;
if(argc < 2) {
printf("Usage: ./a.out filename\n");
return EXIT_FAILURE;
}
fd = shm_open(argv[1], O_CREAT|O_RDWR, 0666);
/* /dev/shm/filename as the shared object, creating if not exist */
if(fd ** -1) {
ERR_EXIT("con: shm_open()");
}
system("ls -l /dev/shm/");
shmsize = TEXT_NUM*sizeof(struct shared_struct);
ret = ftruncate(fd, shmsize);
if(ret ** -1) {
ERR_EXIT"con: ftruncate()");
char *argv1[] = {" ", argv[1], 0};
childpid1 = vfork();
if(childpid1 < 0) {
ERR_EXIT("shmpthreadcon: 1st vfork()");
}
else if(childpid1 ** 0) {
execv("./shmproducer", argv1); /* call producer with filename */
}
else {
childpid2 = vfork();
if(childpid2 < 0)
ERR_EXIT("shmpthreadcon: 2nd vfork()");
else if (childpid2 ** 0)
execv("shmconsumer.o", argv1); /* call consumer with filename */
else {
wait(&childpid1);
wait(&childpid2);
ret = shm_unlink(argv[1]);
if(ret ** -1) {
ERR_EXIT("con: shm_unlink()");
} /* shared object can be removed by any process knew the filename */
system("ls -l /dev/shm/");
}
}
exit(EXIT_SUCCESS);
}
读者-写者问题¶
一个数据集在多个并发进程之间共享
- Readers——仅读取数据集;它们不执行任何更新
- Writer——既能读又能写
问题: 允许多个读者同时读取
- 只有一个写者程序可以同时访问共享数据;
- 写者在访问数据时,不允许读者访问数据;
读者和作者的考虑方式有几种不同——都涉及某种形式的
优先权
- 第一读者写者问题:读者优先;会导致写者饥饿
- 第二读者写者问题:写者优先;会导致读者饥饿(相对复杂一些)
信号量实现读者优先¶
int readcount;
semaphore x = 1, wsem = 1;
void reader (){
while (true) {
semWait (x);
readcount++;
if (readcount ** 1) semWait(wsem);
semSignal(x);
READUNIT();
semWait(x);
readcount--;
if (readcount ** 0) semSignal(wsem);
semSignal (x);
}
}
void writer (){
while (true) {
semWait (wsem) ;
WRITEUNIT () ;
semSignal (wsem) ;
}
}
void main (){
readcount = 0;
parbegin (reader, writer);
}
单读者-单写者¶
Single-writer-single-reader problem
共享内存方式¶
利用 Linux 的接口实现
shmdata.h
#define TEXT_SIZE 4*1024 /* = PAGE_SIZE, size of each message */
#define TEXT_NUM 1 /* maximal number of messages */
/* total size can not exceed current shmmax, or an 'invalid argument' error occurs when shmget */
#define PERM S_IRUSR|S_IWUSR|IPC_CREAT
#define ERR_EXIT(m) \
do { \
perror(m); \
exit(EXIT_FAILURE); \
} while(0)
/* a demo structure, modified as needed */
struct shared_struct {
int written; /* flag = 0: buffer writable; others: readable */
char mtext[TEXT_SIZE]; /* buffer for message reading and writing */
};
shmcon.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <fcntl.h>
#include "alg.8-0-shmdata.h"
int main(int argc, char *argv[]){
struct stat fileattr;
key_t key; /* of type int */
int shmid; /* shared memory ID */
void *shmptr;
struct shared_struct *shared; /* structured shm */
pid_t childpid1, childpid2;
char pathname[80], key_str[10], cmd_str[80];
int shmsize, ret;
shmsize = TEXT_NUM*sizeof(struct shared_struct);
printf("max record number = %d, shm size = %d\n", TEXT_NUM, shmsize);
if(argc <2) {
printf("Usage: ./a.out pathname\n");
return EXIT_FAILURE;
}
strcpy(pathname, argv[1]);
if(stat(pathname, &fileattr) ** -1) {
ret = creat(pathname, O_RDWR);
if (ret ** -1) {
ERR_EXIT("creat()");
}
printf("shared file object created\n");
}
key = ftok(pathname, 0x27); /* 0x27 a pro_id 0x0001 - 0xffff, 8 least bits used */
if(key ** -1) {
ERR_EXIT("shmcon: ftok()");
}
printf("key generated: IPC key = 0x%x\n", key); /* set any key>0 without ftok() */
shmid = shmget((key_t)key, shmsize, 0666|PERM);
if(shmid ** -1) {
ERR_EXIT("shmcon: shmget()");
}
printf("shmcon: shmid = %d\n", shmid);
shmptr = shmat(shmid, 0, 0); /* returns the virtual base address mapping to the
shared memory, *shmaddr=0 decided by kernel */
if(shmptr ** (void *)-1) {
ERR_EXIT("shmcon: shmat()");
}
printf("shmcon: shared Memory attached at %p\n", shmptr);
shared = (struct shared_struct *)shmptr;
shared->written = 0;
sprintf(cmd_str, "ipcs -m | grep '%d'\n", shmid);
printf("\n------ Shared Memory Segments ------\n");
system(cmd_str);
if(shmdt(shmptr) ** -1) {
ERR_EXIT("shmcon: shmdt()");
}
printf("\n------ Shared Memory Segments ------\n");
system(cmd_str);
sprintf(key_str, "%x", key);
char *argv1[] = {" ", key_str, 0};
childpid1 = vfork();
if(childpid1 < 0) {
ERR_EXIT("shmcon: 1st vfork()");
}
else if(childpid1 ** 0) {
execv("./alg.8-2-shmread.o", argv1); /* call shm_read with IPC key */
}
else {
childpid2 = vfork();
if(childpid2 < 0) {
ERR_EXIT("shmcon: 2nd vfork()");
}
else if (childpid2 ** 0) {
execv("./alg.8-3-shmwrite.o", argv1); /* call shmwrite with IPC key */
} else {
wait(&childpid1);
wait(&childpid2);
/* shmid can be removed by any process known the
IPC key */
if (shmctl(shmid, IPC_RMID, 0) ** -1) {
ERR_EXIT("shmcon: shmctl(IPC_RMID)");
} else {
printf("shmcon: shmid = %d removed \n", shmid);
printf("\n------ Shared Memory Segments ------\n");
system(cmd_str);
printf("nothing found ...\n");
return EXIT_SUCCESS;
}
}
}
}
shmread.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>
#include <sys/shm.h>
#include "alg.8-0-shmdata.h"
int main(int argc, char *argv[])
{
void *shmptr = NULL;
struct shared_struct *shared;
int shmid;
key_t key;
sscanf(argv[1], "%x", &key);
printf("%*sshmread: IPC key = 0x%x\n", 30, " ", key);
shmid = shmget((key_t)key, TEXT_NUM*sizeof(struct shared_struct), 0666|PERM);
if (shmid ** -1) {
ERR_EXIT("shread: shmget()");
}
shmptr = shmat(shmid, 0, 0);
if(shmptr ** (void *)-1) {
ERR_EXIT("shread: shmat()");
}
printf("%*sshmread: shmid = %d\n", 30, " ", shmid);
printf("%*sshmread: shared memory attached at %p\n", 30, " ", shmptr);
printf("%*sshmread process ready ...\n", 30, " ");
shared = (struct shared_struct *)shmptr;
while (1) {
while (shared->written ** 0) {
sleep(1); /* message not ready, waiting ... */
}
printf("%*sYou wrote: %s\n", 30, " ", shared->mtext);
shared->written = 0;
if (strncmp(shared->mtext, "end", 3) ** 0) {
break;
}
} /* it is not reliable to use shared->written for process synchronization */
if (shmdt(shmptr) ** -1) {
ERR_EXIT("shmread: shmdt()");
}
sleep(1);
exit(EXIT_SUCCESS);
}
shmwrite.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
#include <string.h>
#include <sys/shm.h>
#include "alg.8-0-shmdata.h"
int main(int argc, char *argv[])
{
void *shmptr = NULL;
struct shared_struct *shared = NULL;
int shmid;
key_t key;
char buffer[BUFSIZ + 1]; /* 8192bytes, saved from stdin */
sscanf(argv[1], "%x", &key);
printf("shmwrite: IPC key = 0x%x\n", key);
shmid = shmget((key_t)key, TEXT_NUM*sizeof(struct shared_struct), 0666|PERM);
if (shmid ** -1) {
ERR_EXIT("shmwite: shmget()");
}
shmptr = shmat(shmid, 0, 0);
if(shmptr ** (void *)-1) {
ERR_EXIT("shmwrite: shmat()");
}
printf("shmwrite: shmid = %d\n", shmid);
printf("shmwrite: shared memory attached at %p\n", shmptr);
printf("shmwrite precess ready ...\n");
shared = (struct shared_struct *)shmptr;
while (1) {
while (shared->written ** 1) {
sleep(1); /* message not read yet, waiting ... */
}
printf("Enter some text: ");
fgets(buffer, BUFSIZ, stdin);
strncpy(shared->mtext, buffer, TEXT_SIZE);
printf("shared buffer: %s\n",shared->mtext);
shared->written = 1; /* message prepared */
if(strncmp(buffer, "end", 3) ** 0) {
break;
}
}
/* detach the shared memory */
if(shmdt(shmptr) ** -1) {
ERR_EXIT("shmwrite: shmdt()");
}
sleep(1);
exit(EXIT_SUCCESS);
}
ipcs
查看当前操作系统里有多少消息队列、共享内存块、信号量。
分别编译三个 .c
文件得到三个可执行文件:shmcon,shmread,shmwrite
,然后运行 ./shmcon myshm
哲学家问题¶
对称解决方案会导致死锁问题,可以考虑非对称解决方案
经典的非对称方案:
- 单号哲学家先拿起左边的筷子,再拿起右边的筷子;而双号的哲学家先拿起右边的筷子,再拿起左边的筷子;
限制哲学家拿起筷子:
- 只有哲学家的两根筷子都可用时,才能拿起筷子(哲学家可能饿死)
补充¶
被历史抛弃的事务型内存