并发编程基础-信号量机制

信号量(Semaphore)是一种控制多线程(进程)访问共享资源的同步机制,是由荷兰的Dijkstra大佬在1962年前后提出来的。

信号量的原理

信号量机制包含以下几个核心概念:

  1. 信号量S,整型变量,需要初始化值大于0
  2. P原语,荷兰语Prolaag(probeer te verlagen),表示减少信号量,该操作必须是原子的
  3. V原语,荷兰语Verhogen,表示增加信号量,该操作必须是原子的

从上图不难看出信号量的两个核心操作,P和V:

  1. P操作,原子减少S,然后如果S < 0,则阻塞当前线程
  2. V操作,原子增加S,然后如果S <= 0,则唤醒一个阻塞的线程

信号量一般被用来控制多线程对共享资源的访问,允许最多S个线程同时访问临界区,多于S个的线程会被P操作阻塞,直到有线程执行完临界区代码后,调用V操作唤醒。所以PV操作必须是成对出现的。

那么信号量可以用来干什么呢?

  1. 信号量似乎天生就是为限流而生的,我们可以很容易用信号量实现一个限流器
  2. 信号量可以用来实现互斥锁,初始化信号量S = 1,这样就只能有一个线程能访问临界区。很明显这是一个不可重入的锁。
  3. 信号量甚至能够实现条件变量,比如阻塞队列

动手实现一个信号量

学习这些经典理论的时候,最好的办法还是用自己熟悉的编程语言实现一遍。Java并发包提供了一个信号量的java.util.concurrent.Semaphore,是用AbstractQueuedSynchronizer的共享模式实现的,以后会单独分析关于AQS相关的原理,这里不再展开描述,其核心思想是CAS。
下面是我用Java实现的一个简单的信号量,这里使用synchronized来替代互斥锁

信号量实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Semaphore {
/**
* 信号量S
*/
private int s;

public Semaphore(int s) {
this.s = s;
}

/**
* P原语,原子操作
* <p>
* S减decr,如果S小于0,阻塞当前线程
*/
public synchronized void p(int decr) {
s -= decr;
if (s < 0) {
try {
wait();
} catch (InterruptedException e) {
// ...
}
}
}

/**
* V原语,原子操作
* <p>
* S加incr,如果S小于等于0,唤醒一个等待中的线程
*/
public synchronized void v(int incr) {
s += incr;
if (s <= 0) {
notify();
}
}
}

用信号量限流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Limiter implements Executor {

private Semaphore semaphore;

public Limiter(int limit) {
semaphore = new Semaphore(limit);
}

public void execute(Runnable runnable) {
if (runnable != null) {
new Thread(() -> {
semaphore.p(1);
runnable.run();
semaphore.v(1);
}).start();
}
}
}

用信号量实现互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
public class SemaphoreLock {

private Semaphore semaphore = new Semaphore(1);

public void lock() {
semaphore.p(1);
}

public void unlock() {
semaphore.v(1);
}
}

用信号量实现阻塞队列

实现阻塞队列需要两个信号量和一个锁(锁也可以用信号量代替)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class SemaphoreBlockingQueue<T> {

private Semaphore notFull;
private Semaphore notEmpty;
private SemaphoreLock lock = new SemaphoreLock();

private Object[] table;
private int size;

public SemaphoreBlockingQueue(int cap) {
if (cap < 1) {
throw new IllegalArgumentException("capacity must be > 0");
}
notEmpty = new Semaphore(0);
notFull = new Semaphore(cap);
table = new Object[cap];
}

public void add(T t) {
// 如果队列是满的就会阻塞
notFull.p(1);
// lock保证队列的原子添加
lock.lock();
table[size++] = t;
lock.unlock();
// 唤醒一个阻塞在notEmpty的线程
notEmpty.v(1);
}

@SuppressWarnings("unchecked")
public T poll() {
T element;
// 如果队列是空就会阻塞
notEmpty.p(1);
// lock保证队列的原子删除
lock.lock();
element = (T) table[--size];
lock.unlock();
// 唤醒一个阻塞在notFull的线程
notFull.v(1);
return element;
}
}

本文标题:并发编程基础-信号量机制

文章作者:山坡杨

发布时间:2019年04月15日 - 09:33:33

最后更新:2019年04月16日 - 14:30:45

原始链接:http://www.yangxf.top/11/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

感觉本站内容不错,读后有收获?
0%