首页 资讯 社群 我的社区 搜索

Java 多线程编程核心技术 (三):多线程通信(上篇)

yimo~
2019-09-04 16:54:51

线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一,可以说,使线程间进行通信后,系统之间的交互性会更强大,在大大提高 CPU 利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控与监督。

在本章中需要着重掌握的技术点如下:

使用 wait/notify 实现线程间的通信

生产者 / 消费者模式的实现

1. 等待 / 通知机制

通过本节可以学习到,线程与线程之间不是独立的个体,它们彼此之间可以互相通信和协作。

1.1 不使用等待 / 通知机制实现线程间通信

下面的示例,是 sleep() 结合 while(true) 死循环来实现多个线程间通信。

public class MyService {
    volatile private List<Integer> list = new ArrayList<>();
    public void add(){
        list.add(1);
    }
    public int size(){
        return list.size();
    }

    public static void main(String[] args) {
        MyService myService = new MyService();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i= 0;i<10;i++) {
                    myService.add();
                    System.out.println(" 添加了 "+myService.size()+" 个元素 ");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true){
                        if (myService.size() == 5){
                            System.out.println(" == 5 ,我要退出了 ");
                            throw new InterruptedException();
                        }

                    }
                } catch (InterruptedException e) {
                    System.out.println(myService.size());
                    e.printStackTrace();
                }
            }
        }).start();

    }
}

打印结果:

添加了 1 个元素
添加了 2 个元素
添加了 3 个元素
添加了 4 个元素
添加了 5 个元素
 == 5 ,我要退出了
5
java.lang.InterruptedException
    at cn.zyzpp.thread3_1.MyService$2.run(MyService.java:42)
    at java.lang.Thread.run(Thread.java:745)
添加了 6 个元素
添加了 7 个元素
添加了 8 个元素
添加了 9 个元素
添加了 10 个元素

虽然两个线程间实现了通信,但有一个弊端就是,线程 ThreadB.java 不停地通过 while 语句轮询机制来检测某一个条件,这样会浪费 CPU 资源。如果轮询的时间间隔很小,更浪费 CPU 资源;如果轮询的时间间隔很大,有可能会取不到想要得到的数据。所以就需要一种机制来实现减少 CPU 的资源浪费,而且还可以实现在多个线程间通信,它就是“wait / notify”机制。

1.2 什么是等待 / 通知机制

等待 / 通知机制在生活中比比皆是,比如你去餐厅点餐,服务员去取菜,菜暂时还没有做出来,这时候服务员就进入”等待“的状态,等到厨师把菜放在菜品传递台上,其实就相当于一种”通知“,这时服务员才可以拿到菜并交给就餐者。

需要说明的是,上节多个线程间也可以实现通信,原因是多个线程共同访问同一个变量,但那种通信不是“等待 / 通知”,两个线程完全是主动式地读取一个共享变量,在花费读取时间的基础上,读到的值是不是想要的,并不能完全确定。所以现在迫切需要一种“等待 / 通知”机制来满足上面的要求。

1.3 等待 / 通知机制的实现

方法 wait() 的作用是使当前执行代码的线程进行等待,wait() 方法是 object 类的方法,该方法用来将当前线程置于“预执行队列”中,并且在 wait() 所在的代码行处停止执行,直到接到通知或被中断为止。在调用 wait() 方法之前,线程必须拿到该对象的对象级别锁。在从 wait() 返回前,线程与其他线程竞争重新获得锁。如果调用 wait() 时没有持有适当的锁,则抛出 java.lang.IllegalMonitorStateException 异常,它是 RuntimeException 的一个子类,因此,不需要 try-catch 语句进行捕捉异常。

方法 notify() 也要在同步方法或同步块中调用,即在调用前,线程也必须获得该对象的对象级别锁。如果调用 notify 时没有适当的锁,也会抛出 java.lang.IllegalMonitorStateException 异常。该方法用来通知那些可能等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机挑选出其中一个呈 wait 状态的线程,对其发出通知 notify,并使它等待获取该对象的对象锁。需要说明的是,在执行 notify 方法后,当前线程不会马上释放该对象锁,呈 wait 状态的线程也并不能马上获取该对象锁,要等到执行 notify() 方法的线程将程序执行完,也就是退出 synchronized 代码块后,当前线程才会释放锁,而呈 wait 状态所在的线程才可以获取该对象锁。当第一个获得了该对象锁的 wait 线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用 notify 语句,则该对象以及空闲,其它 wait 状态等待的线程由于没有得到该对象的通知,还会继续阻塞在 wait 状态,知道直到这个对象发出一个 notify 或 notifyAll。

用一句话来总结一下 wait 和 notify :wait 使线程停止运行,而 notify 使停止的线程继续运行。

示例代码:

public class MyServiceTwo extends Thread {
    private Object lock;

    public MyServiceTwo(Object object) {
        this.lock = object;
    }

    @Override
    public void run() {
        try {
            synchronized (lock){
                System.out.println(" 开始等待 "+System.currentTimeMillis());
                lock.wait();
                System.out.println(" 结束等待 "+System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

public class MyServiceThree extends Thread {
    private Object lock;

    public MyServiceThree(Object object) {
        this.lock = object;
    }

    @Override
    public void run() {
        synchronized (lock) {
            System.out.println(" 开始通知 " + System.currentTimeMillis());
            lock.notify();
            System.out.println(" 结束通知 " + System.currentTimeMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        MyServiceTwo serviceTwo = new MyServiceTwo(lock);
        serviceTwo.start();
        Thread.sleep(100);
        MyServiceThree serviceThree = new MyServiceThree(lock);
        serviceThree.start();
    }

}

打印结果:

开始等待 1537185132949
开始通知 1537185133048
结束通知 1537185133048
结束等待 1537185133048

从控制台的打印来看,100ms 后线程被 notify 通知唤醒。

下面我们使用 wait / notify 来实现刚开始的实验:

public class MyService {
    volatile private List<Integer> list = new ArrayList<>();

    public void add() {
        list.add(1);
    }

    public int size() {
        return list.size();
    }

    public static void main(String[] args) {
        MyService myService = new MyService();
        Object lock = new Object();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    synchronized (lock) {
                        if (myService.size() != 5) {
                            System.out.println(" 等待 "+System.currentTimeMillis());
                            lock.wait();
                            System.out.println(" 等待结束 "+System.currentTimeMillis());
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    for (int i = 0; i < 10; i++) {
                        if (myService.size() == 5){
                            lock.notify();
                            System.out.println(" 已发出通知!");
                        }
                        myService.add();
                        System.out.println(" 添加了 " + myService.size() + " 个元素 ");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }).start();

    }
}

打印结果:

等待 1537186277023
添加了 1 个元素
添加了 2 个元素
添加了 3 个元素
添加了 4 个元素
添加了 5 个元素
已发出通知!
添加了 6 个元素
添加了 7 个元素
添加了 8 个元素
添加了 9 个元素
添加了 10 个元素
等待结束 1537186287034

日志信息 wait end 在最后输出,这也说明 notify 方法执行后并不立即释放锁。

关键字 synchronized 可以将任何一个 Object 对象作为同步对象来看待,而 Java 为每个 Object 都实现了 wait 和 notify 方法,它们必须用在被 synchronized 同步的 object 的临界区内。通过调用 wait() 方法可以使处于临界区内的线程进入等待状态,同时释放被同步对象对象的锁。而 notify 操作可以唤醒一个因调用了 wait 操作而处于阻塞状态中的线程,使其进入就绪状态。被重新换醒的线程会试图重新获得临界区的控制权,也就是锁,并继续执行临界区内 wait 之后的代码。如果发出 notify 操作时没有处于阻塞状态中的线程,那么该命令会被忽略。

wait 方法可以使调用该方法的线程释放共享资源的锁,然后从运行状态退出,进入等待队列,直到被再次唤醒。

notify 方法可以随机唤醒等待队列中等待同一共享资源的“一个”线程,并使该线程退出等待队列,进入可运行状态,也就是 notify() 方法仅通知“一个”线程。

notifyAll() 方法可以使所有正在等待队列中等待同一共享资源的“全部”线程从等待状态退出,进入可运行状态。并使该线程退出等待队列,进入可运行状态。此时,优先级最高的那个线程最先执行,但也有可能是随机执行,因为这要取决于 JVM 虚拟机的实现。

在《Java 多线程编程核心技术 (一)Java 多线程技能》中,已经介绍了与 Thread 有关的大部分 API ,这些 API 可以改变线程对象的状态。

  1. 新创建一个新的线程对象后,再调用它的 start() 方法,系统会为此线程分配 CPU 资源,使其处于 Runnable(可运行)状态,这是一个准备运行的阶段。如果线程抢占到 CPU 资源,此线程就处于 Running(运行) 状态。

  2. Runnable 状态和 Running 状态可相互切换,因为有可能线程运行一段时间后,有其他高优先级的线程抢占了 CPU 资源,这时此线程就从 Running 状态变成 Runnable 状态。

线程进入 Runable 状态大体分为如下情况:

  • 调用 sleep 方法后经过的时间超过了指定的休眠时间。

  • 线程调用的阻塞 IO 已经返回,阻塞方法执行完毕。

  • 线程成功地获得了试图同步的监视器。

  • 线程正在等待某个通知,其他线程发出了通知。

  • 处于挂起状态的线程调用了 resurne 恢复方法。

Blocked 是阻寒的意思, 例如遇到了一个 IO 操作, 此时 CPU 处于空闲状态, 可能会转而把 CPU 时间片分配给其他线程, 这时也可以称为“暂停”状态。Blocked 状态结束后,进入 Runnable 状态, 等待系统重新分配资源。

出现阻塞的情况大体分为如下 5 种:

  • 线程调用 sleep 方法, 主动放弃占用的处理器资源。

  • 线程调用了阻塞式 IO 方法,在该方法返回前,该线程被阻塞。

  • 线程试图获得一个同步监视器,但该同步监视器正被其他线程所持有。

  • 线程等待某个通知。

  • 程序调用了 suspend 方法将该线程挂起。此方法容易导致死锁,尽量避免使用该方法。

main() 方法运行结束后进人销毁阶段,整个线程执行完毕。

每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了将要获得锁的线程,阻塞队列存储了被阻塞的线程。一个线程被唤醒后,才会进入就绪队列,等待 CPU 的调度;反之,一个线程被 wait 后,就会进入阻塞队列,等待下一次被唤醒。

1.4 方法 wait() 锁释放与 notify() 锁不释放

当方法 wait() 被执行后,锁自动释放,但执行完 notify() 方法,锁却不自动释放。

1.5 当 interrupt 方法遇到 wait 方法

当线程呈 wait() 方法时,调用线程对象的 interrupt() 方法会出现 InterruptedException 异常。

下面我们做一个实验:

public class MyServiceTwo extends Thread {
    private Object lock;

    public MyServiceTwo(Object object) {
        this.lock = object;
    }


    @Override
    public void run() {
        try {
            synchronized (lock){
                System.out.println(" 开始等待 "+System.currentTimeMillis());
                lock.wait();
                System.out.println(" 结束等待 "+System.currentTimeMillis());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(" 出现异常了 ");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        MyServiceTwo service = new MyServiceTwo(lock);
        service.start();
        Thread.sleep(5000);
        service.interrupt();
    }

}

运行结果:

开始等待 1537194007598
java.lang.InterruptedException
出现异常了
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at cn.zyzpp.thread3_1.MyServiceTwo.run(MyServiceTwo.java:19)

通过上面的实验可以总结如下三点:

  1. 执行完同步代码块就会释放对象的锁。

  2. 在执行同步代码块的过程中,遇到异常而导致线程终止,锁也会被释放。

  3. 在执行同步代码块的过程中,执行了锁所属对象的 wait() 方法,这个线程会释放对象锁,而此线程对象会进入线程等待池中,等待被唤醒。

1.6 notify() 和 notifyAll()

调用方法 notify() 一次只随机通知一个线程进行唤醒。

当多次调用 notify() 方法会随机将等待 wait 状态的线程进行唤醒。

notifyAll() 方法会唤醒全部线程。

1.7 方法 wait(long) 的使用

带一个参数的 wait(long) 方法的功能是等待某一时间内是否有线程对锁进行唤醒,如果超过这个时间则自动唤醒。

1.8 等待 / 通知之交叉备份

假设我们创建了 20 个线程,我们需要这 20 个线程的运行效果变成有序的,我们可以在 等待 / 通知的基础上,利用如下代码作为标记:

volatile private boolean prevIsA = false;

再使用 while() 循环:

while(prevIsA){
    wait();
}

实现交替打印。

2. 生产者 / 消费者模式

等待 / 通知模式最经典的案列就是”生产者 / 消费者“模式。但此模式在使用上有几种”变形“,还有一些小的注意事项,但原理都是基于 wait/notify 的。

2.1 一生产与一消费:操作值

生产者:

public class P {
    private String lock;

    public P(String lock) {
        super();
        this.lock = lock;
    }

    public void setValue(){
        try {
            synchronized (lock){
                if (!ValueObject.value.equals("")){
                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println("set 的值是 "+value);
                ValueObject.value =  value;
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消费者:

public class C {
    private String lock;

    public C(String lock) {
        super();
        this.lock = lock;
    }

    public void getVlue() {
        try {
            synchronized (lock) {
                if (ValueObject.value.equals("")) {
                    lock.wait();
                }
                System.out.println("get 的值是 " + ValueObject.value);
                ValueObject.value = "";
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

操作值:

public class ValueObject {
    public static String value = "";
}

main 方法:

public class Run {
    public static void main(String[] args) {
        String lock = new String();
        P p = new P(lock);
        C c = new C(lock);
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    p.setValue();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    c.getVlue();
                }
            }
        }).start();
    }
}

打印结果:

set 的值是 1537253968947_1379616037064493
get 的值是 1537253968947_1379616037064493
set 的值是 1537253968947_1379616037099625
get 的值是 1537253968947_1379616037099625
set 的值是 1537253968947_1379616037136730
get 的值是 1537253968947_1379616037136730
set 的值是 1537253968947_1379616037173047
.....

本实例是 1 个生产者与消费者进行数据的交互,在控制台中打印的日志 get 和 set 是交替运行的。

但如果在此实验的基础上,设计出多个生产者与消费者,那么在运行的过程中极有可能出现“假死”的情况,也就是所有的线程都呈 WAITING 等待状态。

2.2 多生产与多消费:操作值

生产者:

public class P {
    private String lock;

    public P(String lock) {
        super();
        this.lock = lock;
    }

    public void setValue(){
        try {
            synchronized (lock){
                while (!ValueObject.value.equals("")){
                    System.out.println(" 生产者 "+Thread.currentThread().getName()+"WAITING");
                    lock.wait();
                }
                String value = System.currentTimeMillis() + "_" + System.nanoTime();
                System.out.println(" 生产者 "+Thread.currentThread().getName()+"set 的值是 "+value);
                ValueObject.value =  value;
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

消费者:

public class C {
    private String lock;

    public C(String lock) {
        super();
        this.lock = lock;
    }

    public void getVlue() {
        try {
            synchronized (lock) {
                while (ValueObject.value.equals("")) {
                    System.out.println(" 消费者 "+Thread.currentThread().getName()+"WAITING");
                    lock.wait();
                }
                System.out.println(" 消费者 "+Thread.currentThread().getName()+"get 的值是 " + ValueObject.value);
                ValueObject.value = "";
                lock.notify();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

操作值:

public class ValueObject {
    public static String value = "";
}

main 方法:

public class Run {
    public static void main(String[] args) {
        String lock = new String();
        P p = new P(lock);
        C c = new C(lock);
        for (int i = 0; i < 2; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        p.setValue();
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        c.getVlue();
                    }
                }
            }).start();
        }
    }
}

运行结果:

...
消费者 Thread-1WAITING
消费者 Thread-3WAITING
生产者 Thread-0set 的值是 1537255325047_1380972136738280
生产者 Thread-0WAITING
消费者 Thread-1get 的值是 1537255325047_1380972136738280
消费者 Thread-1WAITING
消费者 Thread-3WAITING
生产者 Thread-2set 的值是 1537255325048_1380972137330390
生产者 Thread-2WAITING
生产者 Thread-0WAITING

运行结果显示,最后所有的线程都呈 WAITING 状态。为什么会出现这样的情况呢?在代码中已经 wait/notify 啊?

在代码中确实已经通过 wait / notify 进行呈通信了,但不保证 notify 唤醒的是异类,也许是同类,比如“生产者”唤醒“生产者”,或“消费者”唤醒“消费者”这样的情况。如果按这样情况运行的比率积少成多,就会导致所有的线程都不能继续运行下去,大家都在等待,都呈 WAITING 状态,程序最后也就呈“假死”的状态,不能继续运行下去了。

解决“假死”的情况很简单,将 P.java 和 C.Java 文件中的 notify() 改成 notifyAll() 方法即可,它的原理就是不光通知同类线程,也包括异类。这样就不至于出现假死的状态了,程序会一直运行下去。

3. 通过管道进行线程间通信

字节流

在 Java 语言中提供了各种各样的输入 / 输出流 Stream,使我们能够很方便地对数据进行操作,其中管道流(pipeStream)是一种特殊的流,用于在不同线程间直接传送数据。一个线程发送数据到输出管道,另一个线程从输入管道中读数据。通过使用管道,实现不同线程间的通信,而无须借助于类似临时文件之类的东西。

在 Java 的 JDK 中的 IO 包提供了 4 个类来使线程间可以进行通信:

PipedInputStream 和 PipedOutputStream

PipedReader 和 PipedWriter

下面来演示字节流的使用。

读线程:

public class ReadThread extends Thread{
    PipedInputStream inputStream;

    public ReadThread(PipedInputStream inputStream) {
        this.inputStream = inputStream;
    }

    @Override
    public void run() {
        readMethod();
    }

    private void readMethod(){
        try {
            System.out.println("Read :");
            byte[] bytes = new byte[20];
            int readLength = inputStream.read(bytes);
            while (readLength != -1){
                String data = new String(bytes,0,readLength);
                System.out.print(data);
                readLength = inputStream.read(bytes);
            }
            System.out.println();
            inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

写线程:

public class WriteThread extends Thread{
    PipedOutputStream outputStream;

    public WriteThread(PipedOutputStream outputStream) {
        this.outputStream = outputStream;
    }

    @Override
    public void run() {
        readMethod();
    }

    private void readMethod(){
        try {
            System.out.println("write :");
            for (int i=0;i<300;i++){
                String data = ""+(i+1);
                outputStream.write(data.getBytes());
                System.out.print(data);
            }
            System.out.println();
            outputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

运行类:

public class Run {

    public static void main(String[] args) throws InterruptedException, IOException {
        PipedOutputStream outputStream = new PipedOutputStream();
        PipedInputStream inputStream = new PipedInputStream();
//        inputStream.connect(outputStream);
        outputStream.connect(inputStream);
        ReadThread readThread =  new ReadThread(inputStream);
        WriteThread writeThread = new WriteThread(outputStream);
        readThread.start();
        Thread.sleep(2000);
        writeThread.start();
    }

}

打印结果:

Read :
write :
123456789101112131415161718192021222324...
123456789101112131415161718192021222324...

使用代码 inputStream.connect(outputStream) 或 outputStream.connect(inputStream) 的作用使两个 Stream 之间产生通信链接,这样才可以将数据进行输入与输出。

但在此实验中,首先是读取线程启动,由于当时没有数据被写入。所以线程阻塞在 int readLength = inputStream.read(bytes) 代码中,直到有数据被写入,才继续向下运行。

字符流

写线程:

public class WriteThread extends Thread{
    PipedWriter outputStream;

    public WriteThread(PipedWriter outputStream) {
        this.outputStream = outputStream;
    }

    @Override
    public void run() {
        readMethod();
    }

    private void readMethod(){
        try {
            System.out.println("write :");
            for (int i=0;i<300;i++){
                String data = ""+(i+1);
                outputStream.write(data);
                System.out.print(data);
            }
            System.out.println();
            outputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

读线程:

public class ReadThread extends Thread{
    PipedReader inputStream;

    public ReadThread(PipedReader inputStream) {
        this.inputStream = inputStream;
    }

    @Override
    public void run() {
        readMethod();
    }

    private void readMethod(){
        try {
            System.out.println("Read :");
            char[] chars = new char[20];
            int readLength = inputStream.read(chars);
            while (readLength != -1){
                String data = new String(chars);
                System.out.print(data);
                readLength = inputStream.read(chars);
            }
            System.out.println();
            inputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

运行类:

public class Run {

    public static void main(String[] args) throws InterruptedException, IOException {
        PipedWriter outputStream = new PipedWriter();
        PipedReader inputStream = new PipedReader();
//        inputStream.connect(outputStream);
        outputStream.connect(inputStream);
        ReadThread readThread =  new ReadThread(inputStream);
        WriteThread writeThread = new WriteThread(outputStream);
        readThread.start();
        Thread.sleep(2000);
        writeThread.start();

    }

}

运行结果:

Read :
write :
123456789101112131415161718...
123456789101112131415161718...

打印的结果基本和前一个基本一样,此实验是在两个线程中通过管道流进行字符数据的传输。

用户评论