英文原文地址:How to use wait, notify and notifyAll in Java - Producer Consumer Example
以下是我翻译的中文,在保留原意的基础上对原文的代码稍有调整。
在Java中,你可以使用wait
,notify
和notifyAll
方法来实现线程间的通信。举个栗子,你的程序中运行着两个线程,分别是生产者线程和消费者线程。假设有一个固定容量的消息队列:当队列中有可消费的消息时,生产者线程会通知消费者线程进行消息的消费;同样地,当队列中有额外的空间时,消费者线程会通知生产者线程进行消息的生产。即:当消息队列满了,生产者线程应该停止生产并进入等待状态;当消息队列为空的时候,消费者线程应该停止消费并进入等待状态。
如果某些线程正在等待某些条件变为true
,你可以在条件改变的时候使用notify
或者notifyAll
方法来通知这些线程并唤醒它们。notify
方法和notifyAll
方法都会向等待的线程发送通知,区别在于:如果有多个线程处于等待状态,notify
发送的通知只会被其中一个线程收到,且不能保证是哪个线程收到;而notifyAll
发送的通知会被所有线程收到。如果只有一个线程在等待对象锁,那么notify
和notifyAll
的效果是一样的,发出的通知都会被该线程接收到。
在这篇Java多线程教程中,你将会学习到如何使用wait
,notify
和notifyAll
方法实现线程间通信,并解决生产者-消费者的问题。另外,如果你真正想掌握并发和多线程,我强烈建议你读一读Java Concurrency In Practice这本书,作者是Brian Goetz。没读过此书,你对Java多线程的理解是不完整的。
1 如何在代码中使用wait和notify方法
wait
和notify
都是定义在java.lang.Object
类中的方法。虽然它们都是很基础的概念,但是想在实际代码中使用它们却不是那么容易呢。不信你可以在面试中让面试者使用wait
和notify
徒手撸出代码来解决生产者-消费者问题?我相信很多人会一脸的疑惑:
很多人都会对这个问题不知所措或者错误地使用wait
和notify
,例如代码块使用同步的地方错了、没有用正确的对象来调用wait
方法。老实说,这些困扰着很多程序员。
对于上述问题,第一个困惑:如何使用wait
方法?wait
方法不是java.lang.Thread
类中定义的,所以你不能直接向调用Thread.sleep()
那样来调用wait
方法。正确调用wait
方法的姿势应该是这样子的:你有一个被多个线程共享的对象实例,你需要使用该对象实例来调用wait
方法! 在生产者-消费者问题里面,这个被共享的对象实例就是指被生产者和消费者共享的队列实例。第二个困惑:应该在同步代码块(synchronized block)中调用wait
方法还是在同步的方法(synchronized method)中调用wait
方法?如果使用同步代码块(synchronized block),哪个对象应该被放在同步块中?答案是:加锁的对象和你要获取锁的对象应该是同一个! 在我们这个例子中,就是那个队列的对象实例。
2 在循环体中使用wait和notify方法,而不是if代码块中
你现在已经知道需要使用一个同步的、共享的对象来调用wait
方法,接下来要做的就是在while
循环中调用wait
方法,而不是在if
代码块中调用。
我们需要在某些条件成立的情况下调用wait
方法,例如生产者线程应该在队列满的时候调用wait
。这时候我们首先会想到使用if
来判断条件是否成立。但是在if
代码块中调用wait
方法可能会产生BUG,因为线程有可能会在等待条件未改变的情况下被虚假唤醒(spurious wakeup)。如果没有使用循环来在线程唤醒后检查等待条件,就很可能会造成错误。例如会造成往满队列中写数据或者从空队列中取数据。这就是我们应该在循环体中调用wait
,而不是在if
块中调用wait
的原因。
另外,我也推荐阅读《Effective Java》这本书里面关于这部分内容的描述,也许是wait
和notify
使用的最佳实践。
基于上述知识,这里给出在Java中调用wait
和notify
的标准方式:1
2
3
4
5
6
7// 在Java中调用wait方法的正确姿势
synchronized (sharedObject) {
while (condition) {
sharedObject.wait(); // 在循环体中调用wait方法:线程会释放对象锁,等待被唤醒
}
// 在这里执行一些操作:例如将消息写入队列或者从队列中获取消息
}
正如我所说的那样,始终应该在循环体中调用wait
。这个循环体是用来对线程进入等待和被唤醒后的条件进行检测。如果条件成立,并且notify
或notifyAll
方法在线程执行wait
方法之前被调用了,线程就有可能一直wait
,导致死锁。
3 Java wait(),notify()和notifyAll()的例子
这个例子将演示如何使用我们上面讨论的标准方法来使用wait()
,notify()
和notifyAll()
。
前言: 在这个例子中,我们有两个线程:生产者线程和消费者线程,分别由Producer和Consumer两个类来表示(两个类均继承自Thread类)。我们使用LinkedList
对象实例作为共享的消息队列。
任务: 生产者运行在一个死循环中,并不断生产消息、将消息写入队列。通过while(queue.size == maxSize)
条件来判断队列是否已满。记住:在执行while(queue.size == maxSize)
条件检查之前,先给队列的对象实例加锁,保证我们执行检查时不会有其他线程修改队列。 如果队列满了,生产者线程会调用wait
挂起,直到消费者消费了消息并调用notify()
通知生产者继续消费。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98package com.biteenew.java.base;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Queue;
/**
* 生产者线程
* @author luzhanghong
* @date 2018-06-09 14:23
*/
public class Producer extends Thread {
private final static Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private final Queue<Integer> queue;
private final int maxSize;
private int messageCount = 1;
public Producer(Queue<Integer> queue, int maxSize, String threadName) {
super(threadName);
this.queue = queue;
this.maxSize = maxSize;
}
public void run() {
while (true) {
synchronized (queue) { // 在条件判断之前给共享资源加锁
while (queue.size() == maxSize) {
try {
LOGGER.info("消息队列已满: 生产者线程调用wait方法进入等待状态 ...");
queue.wait(); // 在循环体中:使用共享对象来调用wait方法,释放共享资源的锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
sleep(1000); // 让生产者每1秒钟生产一条消息
} catch (InterruptedException e) {
e.printStackTrace();
}
int messageId = messageCount++;
LOGGER.info("生产消息: {}", messageId);
queue.add(messageId); // 将消息写入队列
queue.notifyAll(); // 通知消费者线程,对消息进行消费
}
}
}
}
/**
* 消费者线程
* @author luzhanghong
* @date 2018-06-09 14:37
*/
public class Consumer extends Thread {
private final static Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
private final Queue<Integer> queue;
public Consumer(Queue<Integer> queue, String threadName) {
super(threadName);
this.queue = queue;
}
public void run() {
while (true) {
synchronized (queue) { // 在条件判断之前给共享资源加锁
while (queue.isEmpty()) {
try {
LOGGER.info("消息队列为空: 消费者线程调用wait方法进入等待状态 ...");
queue.wait(); // 在循环体中:使用共享对象来调用wait方法,释放共享资源的锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.info("消费消息: {}", queue.remove());
queue.notifyAll(); // 通知生产者线程,可以继续生产消息了
}
}
}
}
/**
* Put together
*/
public class Launcher {
public static void main(String[] args) throws Exception {
Queue<Integer> queue = new LinkedList<>();
int maxSize = 5;
new Producer(queue, maxSize, "producer-thread").start();
new Consumer(queue, "consumer-thread").start();
}
}
执行的结果如下日志所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
402018-06-09 14:59:46.149 INFO [producer-thread] 生产消息: 1
2018-06-09 14:59:47.166 INFO [producer-thread] 生产消息: 2
2018-06-09 14:59:48.167 INFO [producer-thread] 生产消息: 3
2018-06-09 14:59:48.167 INFO [consumer-thread] 消费消息: 1
2018-06-09 14:59:48.167 INFO [consumer-thread] 消费消息: 2
2018-06-09 14:59:48.167 INFO [consumer-thread] 消费消息: 3
2018-06-09 14:59:48.167 INFO [consumer-thread] 消息队列为空: 消费者线程调用wait方法进入等待状态 ...
2018-06-09 14:59:49.167 INFO [producer-thread] 生产消息: 4
2018-06-09 14:59:50.168 INFO [producer-thread] 生产消息: 5
2018-06-09 14:59:51.168 INFO [producer-thread] 生产消息: 6
2018-06-09 14:59:52.169 INFO [producer-thread] 生产消息: 7
2018-06-09 14:59:53.170 INFO [producer-thread] 生产消息: 8
2018-06-09 14:59:53.170 INFO [producer-thread] 消息队列已满: 生产者线程调用wait方法进入等待状态 ...
2018-06-09 14:59:53.170 INFO [consumer-thread] 消费消息: 4
2018-06-09 14:59:53.170 INFO [consumer-thread] 消费消息: 5
2018-06-09 14:59:53.170 INFO [consumer-thread] 消费消息: 6
2018-06-09 14:59:53.170 INFO [consumer-thread] 消费消息: 7
2018-06-09 14:59:53.170 INFO [consumer-thread] 消费消息: 8
2018-06-09 14:59:53.170 INFO [consumer-thread] 消息队列为空: 消费者线程调用wait方法进入等待状态 ...
2018-06-09 14:59:54.170 INFO [producer-thread] 生产消息: 9
2018-06-09 14:59:54.170 INFO [consumer-thread] 消费消息: 9
2018-06-09 14:59:54.170 INFO [consumer-thread] 消息队列为空: 消费者线程调用wait方法进入等待状态 ...
2018-06-09 14:59:55.171 INFO [producer-thread] 生产消息: 10
2018-06-09 14:59:56.171 INFO [producer-thread] 生产消息: 11
2018-06-09 14:59:57.172 INFO [producer-thread] 生产消息: 12
2018-06-09 14:59:58.172 INFO [producer-thread] 生产消息: 13
2018-06-09 14:59:59.173 INFO [producer-thread] 生产消息: 14
2018-06-09 14:59:59.173 INFO [producer-thread] 消息队列已满: 生产者线程调用wait方法进入等待状态 ...
2018-06-09 14:59:59.173 INFO [consumer-thread] 消费消息: 10
2018-06-09 14:59:59.173 INFO [consumer-thread] 消费消息: 11
2018-06-09 14:59:59.173 INFO [consumer-thread] 消费消息: 12
2018-06-09 14:59:59.173 INFO [consumer-thread] 消费消息: 13
2018-06-09 14:59:59.173 INFO [consumer-thread] 消费消息: 14
2018-06-09 14:59:59.173 INFO [consumer-thread] 消息队列为空: 消费者线程调用wait方法进入等待状态 ...
2018-06-09 15:00:00.174 INFO [producer-thread] 生产消息: 15
2018-06-09 15:00:01.174 INFO [producer-thread] 生产消息: 16
2018-06-09 15:00:02.175 INFO [producer-thread] 生产消息: 17
2018-06-09 15:00:03.175 INFO [producer-thread] 生产消息: 18
2018-06-09 15:00:04.175 INFO [producer-thread] 生产消息: 19
2018-06-09 15:00:04.175 INFO [producer-thread] 消息队列已满: 生产者线程调用wait方法进入等待状态 ...
4 使用wait(),notify()和notifyAll()需要注意的地方
- 你可以使用
wait()
和notify()
方法来实现Java的线程间通信。不仅一个或两个线程可以这样做,多线程之间同样可以使用这种方法达到线程间通信的目的。 - 要在同步方法或者同步块中调用
wait()
,notify()
和notifyAll()
方法,否则JVM会抛异常:IllegalMonitorStateException
。 - 要在循环体中调用
wait()
和notify()
方法,不要在if
块中调用。因为循环可以做到在wait()
前后对条件进行检测。 - 使用共享对象来调用
wait()
方法。 - 最好使用
notifyAll()
而不是notify()
,原因在这里。
以上就是Java的wait()
,notify()
和notifyAll()
的正确使用姿势。