Featured image of post Java 并发编程

Java 并发编程

本文主要讲述了Java 创建多线程以及线程的管理和同步,并发容器的使用,以及多线程之间的通信。

Java 并发编程

Java 是一种多线程编程语言,支持多线程的创建和管理。多线程编程可以提高程序的性能和响应能力,但也带来了线程安全和同步的问题。本文将介绍 Java 中的多线程编程,包括线程的创建、管理、同步和通信等方面。

1. Java 多线程的创建

Java 中创建多线程有两种方式:继承 Thread 类和实现 Runnable 接口。

1.1 继承 Thread 类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread is running");
    }
    
    public static void main(String[] args) {
        MyThread thread = new MyThread();
        thread.start(); // 启动线程
    }
}

1.2 实现 Runnable 接口

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14

public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Thread is running");
    }
    
    public static void main(String[] args) {
        Thread thread = new Thread(new MyRunnable());
        thread.start(); // 启动线程
    }
}

> 如何暂停线程呢可以使用Thread.sleep(int millis)方法来暂停线程参数是暂停的时间单位是毫秒

此模块比较基础,本文将不再赘述。

2. Java 多线程安全

在多线程中,如果两个或多个线程同时访问一个资源(通常是变量),可能会导致数据不一致的情况,这时候就需要我们用一些机制来保证线程安全,来控制变量访问顺序。

2.1 线程安全问题

接下来通过一个案例来演示线程安全问题: 在某些购物平台的双11活动期间,有时候一秒钟可能很多人同时订购一个商品,如果我们不去保证线程安全,可能会导致库存与平台显示的商品数量不一致的问题或者顺序混乱的问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
package cn.programcx.multi.issue;

public class Product {
    private static int count = 300;
    public void sell(){
        if(count>0){
            count--;
            System.out.println(Thread.currentThread().getName() +":卖出一张,还剩  "+count+"张");
        }

    }

    public int getCount(){
        return count;
    }

}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package cn.programcx.multi.issue;

public class SellServices implements Runnable{
    private Product product;

    public SellServices(Product product){
        this.product = product;
    }

    @Override
    public void run(){
        while(product.getCount()>0){
                try {
                    Thread.sleep(100);
                    product.sell();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
package cn.programcx.multi.issue;

public class Test {
    public static void main(String[] args) {

           for (int i = 0; i < 10; i++) {
               SellServices services = new SellServices(new Product());
               Thread thread = new Thread(services);
               thread.start();
           }
    }
}

输出结果会出现数据一样的情况: alt text

为什么会出现这个问题呢? 请看这段代码:

1
2
count--;
System.out.println(Thread.currentThread().getName() +":卖出一张,还剩  "+count+"张");

如果两个同时执行到这段代码,可能会交叉执行count--System.out.println,就可能整体出现这种代码顺序:

1
2
3
4
count--;
count--;
System.out.println(Thread.currentThread().getName() +":卖出一张,还剩  "+count+"张");
System.out.println(Thread.currentThread().getName() +":卖出一张,还剩  "+count+"张");

这就导致了数据不一致的问题。

2.2 解决线程安全问题的几种方法

2.2.1 使用 synchronized 关键字

synchronized 关键字可以用来修饰方法或代码块,表示该方法或代码块是线程安全的。使用 synchronized 关键字可以保证同一时刻只有一个线程可以执行被修饰的方法或代码块。其它的线程会被阻塞等待,直到当前线程执行完毕。

可以使用 synchronized 修饰实例方法: 比如一个实例方法:public void setCount(int count),可以使用 synchronized 修饰这个方法,改为 public synchronized void setCount(int count),这样就可以保证同一时刻只有一个线程可以执行这个方法。

可以用 synchronized 包围代码块。

上一个案例可以改为:

1
2
3
4
5
6
7
 public synchronized void sell(){
        if(count>0){
            count--;
            System.out.println(Thread.currentThread().getName() +":卖出一张,还剩  "+count+"张");
        }

    }

或者改为:

1
2
3
4
5
6
7
8
9
public void sell(){
        synchronized (this){
            if(count>0){
                count--;
                System.out.println(Thread.currentThread().getName() +":卖出一张,还剩  "+count+"张");
            }
        }

    }

这两种方式是等价的,都是对当前对象加锁。 但是如果我们使用了static修饰的方法,那么就会对类加锁,而不是对对象加锁。 这两种方式的区别在于:如果我们使用了static修饰的方法,那么就会对类加锁,而不是对对象加锁。

运行修改后的代码,输出结果如下: alt text

synchronized 体现了两种特点:

  1. 原子性:保证了同一时刻只有一个线程可以执行被修饰的方法或代码块。
  2. 可见性:保证了一个线程对共享变量的修改对其它线程是可见的。

但是 synchronized 也有一些缺点:

  1. 性能开销:synchronized 的性能开销比较大,尤其是在高并发的情况下,可能会导致线程的阻塞和上下文切换。
  2. 死锁:如果多个线程同时持有锁,可能会导致死锁的情况。
  3. 可重入性:synchronized 是可重入的,即一个线程可以多次获得同一把锁,而不会导致死锁。
  4. 不能中断:synchronized 一旦获得锁,就不能被中断,可能会导致线程长时间等待。
  5. 不能指定锁的公平性:synchronized 是非公平锁,可能会导致线程饥饿的情况。

2.2.2 使用 ReentrantLock

ReentrantLock 是 Java 并发包 java.util.concurrent.locks 下的一个可重入锁(Reentrant Lock),相比 synchronized,它提供了更灵活的锁定机制。我们可以通过自己控制加锁和释放锁还保证锁的公平性,也就是先到先得,它会按照线程请求锁的顺序来进行调度,避免线程“插队”。

有以下常用方法:

1
2
3
4
5
public void lock() // 获取锁
public void unlock() // 释放锁
public boolean tryLock() // 尝试获取锁,如果获取不到就返回 false
public boolean tryLock(long timeout, TimeUnit unit) // 尝试获取锁,如果获取不到就等待指定的时间
public void lockInterruptibly() // 获取锁,如果被中断就抛出异常

上述方法中tryLock()加的锁是非阻塞的,也就是乐观锁,lockInterruptibly()lock()tryLock(long timeout, TimeUnit unit)是悲观锁,获取不到锁就会阻塞。

前面的案例可以改为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package cn.programcx.multi.issue;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Product {
    private static int count = 300;
    private static final Lock lock = new ReentrantLock(true); // 创建锁对象,true表示公平锁

    public void sell() {
        lock.lock(); // 获取锁
        try {
            if (count > 0) {
                count--;
                System.out.println(Thread.currentThread().getName() + ": 卖出一张,还剩  " + count + "张");
            }
        } finally {
            lock.unlock(); // 释放锁
        }
    }

    public int getCount() {
        return count;
    }
}

2.2.3 使用读写锁

ReentrantReadWriteLock 是比 ReetrantLock 更细粒度的锁,它允许多个线程同时读共享变量,但在写共享变量时,必须独占锁。它提供了两个锁:读锁和写锁。

区分读锁和写锁的优势: 如果把读操作和写操作放在一个锁里面,那么读操作会被写操作阻塞,这样就会导致性能下降。可以通过区分读锁和写锁来提高性能:读锁:允许多个线程同时读共享变量,但是不允许其它线程进行写的操作;写锁:不允许其它线程进行读和写的操作。这种更细粒度的划分可以大大提高性能。

比如有ReadData()和WriteData()两个方法,ReadData()方法是读操作,WriteData()方法是写操作。我们可以使用ReentrantReadWriteLock来实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package cn.programcx.multi.issue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWrite {
    public static void main(String[] args) {
        ReadWriteThread rwThread = new ReadWriteThread();

        // 创建多个写线程
        for (int i = 0; i < 8; i++) {
            new Thread(rwThread::writeData, "写线程" + i).start();
        }

        // 创建多个读线程
        for (int i = 0; i < 3; i++) {
            new Thread(rwThread::readData, "读线程" + i).start();
        }


    }
}

class ReadWriteThread{
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock lock = rwLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

    static List<Integer> list = new CopyOnWriteArrayList<>(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

    public void writeData(){
        try {
            writeLock.lock();   //获取写锁
            int rd = new Random().nextInt(10);
            list.add(rd);
            Thread.sleep(10);  //模拟写的时候消耗时间长
            System.out.println("已经写入:"+rd);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            writeLock.unlock(); //释放写锁
        }

    }

    public void readData(){
        try {
            lock.lock();    //获取读锁
            list.forEach((i)-> System.out.print(i+" "));
            System.out.println();
        } finally {
            lock.unlock();  //释放读锁
        }
    }

}

输出结果如下: alt text

从上面的输出结果可以看出,读操作在写操作后面,因为读锁会阻塞写锁。

2.2.4 使用 AtomicInteger

AtomicInteger 是 Java 并发包 java.util.concurrent.atomic 下的一个原子类,它提供了一些原子操作的方法,可以用来实现线程安全的计数器。它的性能比 synchronizedReentrantLock 更高,因为它是基于 CAS(Compare And Swap)算法实现的。 CAS 是一种乐观锁的实现方式,它通过比较内存中的值和预期值,如果相等就交换,否则就不交换。这样就可以避免线程的阻塞和上下文切换,提高性能。

AtomicInteger 的常用方法:

1
2
3
4
5
6
7
8
public int get() // 获取当前值
public int incrementAndGet() // 自增 1,并返回自增后的值
public int getAndIncrement() // 自增 1,并返回自增前的值
public int decrementAndGet() // 自减 1,并返回自减后的值
public int getAndDecrement() // 自减 1,并返回自减前的值
public int add(int delta) // 自增 delta,并返回自增后的值
public int getAndAdd(int delta) // 自增 delta,并返回自增前的值
public AtomicInteger atomicInteger = new AtomicInteger(int initialValue) // 创建一个 AtomicInteger 对象

第一个案例我们可以改为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package cn.programcx.multi.issue;

import java.util.concurrent.atomic.AtomicInteger;

public class Product {
    private static final AtomicInteger count = new AtomicInteger(300);

    public boolean sell() {
        int remaining = count.decrementAndGet();
        if (remaining >= 0) {
            System.out.println(Thread.currentThread().getName() + " 卖出一张票,剩余:" + (remaining));
            return true;
        } else {
            return false;
        }
    }


}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package cn.programcx.multi.issue;

public class SellServices implements Runnable{
    private Product product;

    public SellServices(Product product){
        this.product = product;
    }

    @Override
    public void run(){
        while(product.sell()){
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

        }
    }
}

在上述代码中,我们使用了 AtomicInteger 来代替 int 类型的 count 变量。通过调用 decrementAndGet() 方法来实现自减操作,并返回自减后的值。decrementAndGet() 这个方法是原子操作,即递减和获取值操作不会被其他线程打断,因此可以保证线程安全。

3. 并发容器

Java 提供了一些并发容器,用于实现代替Runnable接口和继承Thread类的方式代替多线程编程。并发容器是线程安全的,可以在多线程环境中使用。常用的并发容器为:CompletableFuture,接下来我们会介绍CompletableFuture的使用。

CompletableFuture 是 Java 8 引入的一个类,它实现了 Future 接口,提供了异步编程的能力。它可以用来执行异步任务,并在任务完成后执行回调操作。CompletableFuture 支持链式调用,可以将多个异步任务组合在一起。

javascript中,我们使用Promise来实现异步编程,是这样的用法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const promise =new Promise((resolve,reject)=>{
    // 异步操作
    if(success){
        resolve(result);    //传参到then方法
    }else{
        reject(error);  //传参到catch方法
    }
});

promise.then(result=>{
    // 处理结果
}).catch(error=>{
    // 处理错误
});

在 Java 中,CompletableFuture 提供了类似的功能。它可以用来表示一个异步计算的结果,并提供了一些方法来处理这个结果。CompletableFuture 的使用方式如下:

1
2
3
4
5
6
7
8
9
CompletableFuture.supplyAsync(()=>{
    // 异步任务
    return result;
}).thenApply(result -> {
    // 处理结果
    return processedResult;
}).thenAccept(result -> {
    // 消费结果
});

在上述的代码中,CompletableFuture 执行了一个异步任务,处理完第一个任务后,将返回的结果作为thenApply方法的参数,继续执行下一个thenApply的任务,最后执行thenAccept方法来消费结果。 CompletableFuture 还提供了其他一些方法,比如 thenCombine()thenCompose()allOf()anyOf() 等,可以用来实现更复杂的异步任务组合。 这个就有点像Promise的链式调用了,CompletableFuture 也可以用来实现类似于 Promise 的功能。它可以用来表示一个异步计算的结果,并提供了一些方法来处理这个结果。

我们还可以通过ArrayList来存储多个CompletableFuture对象,然后使用allOf()方法来等待所有的异步任务完成。allOf()方法返回一个新的 CompletableFuture 对象,当所有的异步任务完成时,这个新的 CompletableFuture 对象也会完成。

比如我们要完成多个学生同时注册的操作,先定义一个学生类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.programcx.ch5.p2;

public class Student {
    private String name;
    private int id;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Student(String name, int id) {
        this.name = name;
    }

    public Student(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                ", id=" + id +
                '}';
    }
}

然后定义一个注册类:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package cn.programcx.ch5.p2;

import java.util.concurrent.atomic.AtomicInteger;

public class Register {
    private static AtomicInteger count = new AtomicInteger(0);
    public Student register(Student student) {
       student.setId(count.incrementAndGet());
       return student;
    }
}

在注册类中,我们使用了AtomicInteger来保证线程安全,count变量用来记录注册的学生数量。我们在register()方法中对学生进行注册,并返回注册后的学生对象。 在register()方法中,我们使用了incrementAndGet()方法来实现自增操作,并返回自增后的值。incrementAndGet() 这个方法是原子操作,即递增和获取值操作不会被其他线程打断,因此可以保证线程安全。

之后我们就可以使用CompletableFuture来实现异步注册了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
        List<CompletableFuture> completableFutures = new ArrayList<>();
        List<Student> students = new ArrayList<>();
        for(int i=0;i<100;i++){
            students.add(new Student("student"+i));
        }
        Register register =new Register();
        students.forEach(student -> {
            CompletableFuture cf= CompletableFuture.supplyAsync(()->{
                register.register(student);
                return student;
            }).thenAccept(stu ->
                System.out.println("注册成功,学生信息为"+stu)
            );

            completableFutures.add(cf); //将CompletableFuture实例放到ArrayList容器中
        });

        try{
            CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
        }catch(Exception e){
            e.printStackTrace();
        }

在上述代码中,我们使用了 CompletableFuture.supplyAsync() 方法来执行异步任务,并将返回的结果作为参数传递给 thenAccept() 方法。最后,我们使用 CompletableFuture.allOf() 方法来等待所有的异步任务完成。 在 CompletableFuture.allOf() 方法中,我们将 ArrayList 转换为数组,并传递给 allOf() 方法。allOf() 方法返回一个新的 CompletableFuture 对象,我们在最后调用 join() 方法来等待所有的异步任务完成。、

补充CompletableFuture 还提供了其它一些方法,比如isDone()cancel()complete() 等,可以用来实现其他并发状态获取和控制功能。

4.线程池

线程池是 Java 中用于管理线程的一个重要概念。它可以用来创建和管理多个线程,并提供了一些方法来控制线程的生命周期。线程池可以提高程序的性能和响应能力,避免频繁创建和销毁线程带来的性能开销。 Java 中我们可以使用ThreadPoolExecutor类来创建线程池。ThreadPoolExecutor 是一个可扩展的线程池实现,它提供了多种线程池的实现方式,比如固定大小的线程池、可缓存的线程池、单线程的线程池等。

1
2
3
4
5
6
7
8
9
public void execute(Runnable command) // 执行一个任务
public Future<?> submit(Runnable task) // 提交一个任务,并返回一个 Future 对象
public <T> Future<T> submit(Callable<T> task) // 提交一个任务,并返回一个 Future 对象
public void shutdown() // 关闭线程池
public List<Runnable> shutdownNow() // 立即关闭线程池
public boolean isShutdown() // 判断线程池是否关闭
public boolean isTerminated() // 判断线程池是否终止
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) // 执行一组任务,并返回结果
...

下列实例我们会演示如何创建线程池:

  1. 先重写Register类,使其继承Thread
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package cn.programcx.ch5.pool;

import cn.programcx.ch5.p2.Student;

import java.util.concurrent.atomic.AtomicInteger;

public class Register extends Thread {
    private static final AtomicInteger count = new AtomicInteger(0);
    private final Student student;

    public Register(Student student) {
        this.student = student;
    }

    @Override
    public void run() {
        student.setId(count.incrementAndGet());
        System.out.println("有新的学生注册成功"+student);
    }
}
  1. 然后创建一个线程池,并使用线程池来执行注册操作
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package cn.programcx.ch5.pool;

import cn.programcx.ch5.p2.Student;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.*;

public class Pool {
    //创建线程工厂
    private static final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
            .namingPattern("pool-%d").build();

    //创建阻塞队列
    private static final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(1024);

//使用上面两个实例作为参数创建线程池
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            20, 200, 30, TimeUnit.SECONDS, workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy()
    );

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Student student = new Student(String.valueOf(i));

            Register register = new Register(student);

            //使用线程池来执行注册操作,调用Runnable接口的run方法
            threadPoolExecutor.execute(register);
        }
    }
}

**注意:**execute方法传入的参数的实例必须实现Runnable接口或者Callable接口,可以继承Thread类,或者实现Runnable接口。

其实在 3 的例子中,我们可以通过使用ExecutorService来创建线程池控制并发数量,防止高并发导致服务器崩溃。

3中的代码可以改为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package cn.programcx.ch5.p2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestRegisterStudent {
    public static void main(String[] args) {
        List<CompletableFuture> completableFutures = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(20);

        List<Student> students = new ArrayList<>();
        for(int i=0;i<100;i++){
            students.add(new Student("student"+i));
        }
        Register register =new Register();
        students.forEach(student -> {
            CompletableFuture cf= CompletableFuture.supplyAsync(()->{
                register.register(student);
                return student;
            },executorService).thenAccept(stu ->
                System.out.println("注册成功,学生信息为"+stu)
            );

            completableFutures.add(cf); //将CompletableFuture实例放到ArrayList容器中
        });

        try{
            CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
        }catch(Exception e){
            e.printStackTrace();
        }

        executorService.shutdown(); //关闭线程池
    }
}

这里,我们使用 ExecutorService executorService = Executors.newFixedThreadPool(20);来创建一个固定大小的线程池,大小为 20。然后在CompletableFuture.supplyAsync()方法中传入线程池对象,这样就可以使用线程池来执行异步任务了。 在最后,我们调用executorService.shutdown()方法来关闭线程池,释放资源。

线程池的好处:

  1. 线程池可以重用线程,避免频繁创建和销毁线程带来的性能开销。
  2. 线程池可以控制线程的数量,避免过多的线程导致系统资源耗尽。
  3. 可以通过线程池获取线程池的状态、任务的数量等。
  4. 线程池提供一些任务调度的功能,比如定时任务、周期任务等。
  5. 可以执行一些任务的取消和中断,比如取消正在执行的任务、中断等待的任务等。

5. 线程间通信

5.1 wait() 和 notify()

wait()notify() 是 Java 中用于线程间通信的方法。它们可以用来实现线程间的协作和同步。wait() 方法会使当前线程等待,直到其他线程调用 notify()notifyAll() 方法来唤醒它。notify() 方法会随机唤醒一个等待的线程,而 notifyAll() 方法会唤醒所有等待的线程。 wait()notify() 方法必须在同步代码块或同步方法中调用,否则会抛出 IllegalMonitorStateException 异常。因为不呢能在主线程上等待,否则会导致程序阻塞。

  • wait() 方法会释放锁,并使当前线程进入等待状态,直到其他线程调用 notify()notifyAll() 方法来唤醒它。
  • notify() 方法会随机唤醒一个等待的线程,并使其进入就绪状态,但不会释放锁。
  • notifyAll() 方法会唤醒所有等待的线程,并使它们进入就绪状态,但不会释放锁。

接下来是一个简单的 demo:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package cn.programcx.consumer;

public class Data {
    private int data;
    public synchronized void increase() throws InterruptedException{
        while(data!=0){
            wait();
        }
        data++;
        System.out.println("data:"+data);
        notifyAll();
    }
    public synchronized void decrease() throws InterruptedException{
        while(data==0){
            wait();
        }
        data--;
        System.out.println("data:"+data);
        notifyAll();
    }

    public static void main(String[] args) {
       Data data = new Data();

       new Thread(()->{
           for(int i=0;i<10;i++){
               try{
                   data.increase();
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           }
       }).start();

       new Thread(()->{
           for(int i=0;i<10;i++){
               try {
                   data.decrease();
               }catch (InterruptedException e){
                   throw new RuntimeException(e);
               }
           }
       }).start();
    }

}

在上述代码中,我们创建了一个基本的“生产——消费者”模型,该模型中有两个方法,一个负责在descrease()方法中减少数据,一个负责在increase()方法中增加数据。这两个方法应该是互斥的,所以我们使用了while循环来判断是否互斥方法已经执行了,如果没有执行,就使用wait()方法来等待,直到互斥方法调用notify()notifyAll()方法来唤醒它。注意:这两个方法必须在不同线程中运行,否则程序会阻塞,即如果只有一个线程调用,它一旦wait()自己就再也没机会唤醒自己,程序就永远卡住了。

我们还可以用这个”生产——消费者“模型来写一个厕所使用的例子,即一个公共厕所里面有5个厕所位置,当5个全部被占用时,其他人就要等着,直到有一个人上完厕所,释放一个位置。我们可以使用wait()notify()方法来实现这个模型。

以下是示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package cn.programcx.consumer;

import java.util.HashMap;
import java.util.Map;

public class Toilet {
    private Map<Integer, Boolean> toiletMap = new HashMap<Integer, Boolean>();

    private ThreadLocal<Integer> threadLocal = new InheritableThreadLocal<>();

    public Toilet() {
        for (int i = 1; i <= 5; i++) {
            toiletMap.put(i, false); // 初始化5个厕所位置,声明为未被占用
        }
    }

    private synchronized void acquirePosition() throws InterruptedException {
        int position = 0;
        while (true) {
            for (Integer item : toiletMap.keySet()) {
                if (!toiletMap.get(item)) {
                    position = item; 
                    toiletMap.put(position, true);  //占用厕所位置
                    threadLocal.set(position);  //保存当前线程占用厕所的位置
                    System.out.println(Thread.currentThread().getName() + " 占用了厕所 " + item);
                    return;
                }
            }

            wait();
        }

    }

    private synchronized void releasePosition() {
        Integer position = threadLocal.get();
        if (position != null) {
            toiletMap.put(position, false); //释放厕所位置
            threadLocal.remove();
            System.out.println(Thread.currentThread().getName() + " 上完了厕所 " + position);
            notifyAll();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        Toilet toilet = new Toilet();
        for (int i = 1; i <= 7; i++) {
            new Thread(() -> {
                try {
                    while (true) {
                        toilet.acquirePosition();
                        Thread.sleep(2000);
                        toilet.releasePosition();
                        Thread.sleep(1000);
                    }

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).start();
        }
    }
}

输出结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Thread-0 占用了厕所 1
Thread-6 占用了厕所 2
Thread-2 占用了厕所 3
Thread-4 占用了厕所 4
Thread-3 占用了厕所 5
Thread-3 上完了厕所 5
Thread-5 占用了厕所 5
Thread-4 上完了厕所 4
Thread-1 占用了厕所 4
Thread-2 上完了厕所 3
Thread-6 上完了厕所 2
Thread-0 上完了厕所 1
Thread-3 占用了厕所 1
Thread-6 占用了厕所 2
Thread-4 占用了厕所 3
Thread-1 上完了厕所 4
Thread-2 占用了厕所 4
Thread-5 上完了厕所 5
Thread-0 占用了厕所 5
Thread-3 上完了厕所 1
Thread-6 上完了厕所 2
Thread-1 占用了厕所 1
Thread-4 上完了厕所 3
Thread-5 占用了厕所 2
Thread-2 上完了厕所 4
Thread-6 占用了厕所 3
Thread-0 上完了厕所 5
Thread-4 占用了厕所 4
Thread-3 占用了厕所 5
Thread-1 上完了厕所 1
Thread-2 占用了厕所 1
Thread-5 上完了厕所 2
Thread-0 占用了厕所 2
Thread-6 上完了厕所 3
Thread-4 上完了厕所 4
Thread-1 占用了厕所 3
Thread-3 上完了厕所 5
Thread-5 占用了厕所 4

进程已结束,退出代码为 130

在上面的代码中,我们使用Map<Integer,Boolean> toiletMap来记录厕所占用情况,在acquirePosition()方法中,我们创建了一个while(true)循环,通过遍历toiletMap来判断是否有厕所位置被占用,如果没有,就使用wait()方法来等待,直到有一个人上完厕所,释放一个位置。我们还使用了ThreadLocal<Integer> threadLocal来保存当前线程占用厕所的位置,这样就可以在releasePosition()方法中释放自己占用的厕所位置了。

5.2 BlockingQueue

我们还可以通过BlockingQueue来实现线程间通信,BlockingQueue 是一个线程安全的队列,它提供了一些方法来阻塞线程,直到队列中有可用的元素。BlockingQueue 提供了多种实现方式,比如 ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue 等。

在并发数不是太大的项目中,我们常用ArrayBlockingQueue,它是一个有界的阻塞队列,使用数组来实现。它的构造函数需要指定队列的大小,比如 ArrayBlockingQueue(int capacity)。而在并发数较大的项目中,我们常用LinkedBlockingQueue,它是一个无界的阻塞队列,使用链表来实现。它的构造函数可以指定队列的大小,比如 LinkedBlockingQueue(int capacity),如果不指定大小,则默认为 Integer.MAX_VALUELinkedBlockingQueue 的性能比 ArrayBlockingQueue 更高,因为它是基于链表实现的,避免了数组扩容带来的性能开销,但是它的内存开销也更大,因为它需要维护一个链表结构。

BlockingQueue中,我们常用这些方法:

1
2
3
4
5
6
7
8
9
public void add(E e) // 添加元素,如果队列满了就抛出异常,如果超出队列大小就抛出异常
public void put(E e) // 添加元素,如果队列满了就阻塞
public E take() // 获取元素,如果队列为空就阻塞
public boolean offer(E e, long timeout, TimeUnit unit) // 添加元素,如果队列满了就等待指定的时间,如果时间到了仍然无法添加元素,则返回 false。如果添加成功,则返回 true。
public E poll(long timeout, TimeUnit unit) // 获取元素,如果队列为空就等待指定的时间,如果时间到了仍然无法获取元素,则返回 null。如果成功获取到元素,则返回该元素。
public int remainingCapacity() // 返回队列的剩余容量
public int size() // 返回队列的大小
public boolean isEmpty() // 判断队列是否为空
public boolean isFull() // 判断队列是否满

我们可以用BlockingQueue来改写上一个厕所占用的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package cn.programcx.consumer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueToilet {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5);
        for (int i = 1; i <=5; i++) {
            queue.add(i);
        }

        for(int i = 0; i < 7; i++){
            new Thread(() -> {
                while(true){
                    try {
                        Integer num = queue.take();
                        System.out.println(Thread.currentThread().getName()+"占用了厕所位置:"+num);
                        Thread.sleep(2000);
                        queue.put(num);
                        System.out.println(Thread.currentThread().getName()+"离开了厕所位置:"+num);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                }
            }).start();
        }

    }
}

输出结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Thread-1占用了厕所位置:2
Thread-2占用了厕所位置:3
Thread-4占用了厕所位置:5
Thread-6占用了厕所位置:4
Thread-0占用了厕所位置:1
Thread-5占用了厕所位置:2
Thread-1离开了厕所位置:2
Thread-6离开了厕所位置:4
Thread-3占用了厕所位置:4
Thread-0离开了厕所位置:1
Thread-4离开了厕所位置:5
Thread-2离开了厕所位置:3
Thread-1占用了厕所位置:1
Thread-0占用了厕所位置:5
Thread-6占用了厕所位置:3
Thread-5离开了厕所位置:2
Thread-2占用了厕所位置:2
Thread-3离开了厕所位置:4
Thread-4占用了厕所位置:4
Thread-5占用了厕所位置:5
Thread-3占用了厕所位置:3
Thread-0离开了厕所位置:5
Thread-6离开了厕所位置:3
Thread-1离开了厕所位置:1
Thread-2离开了厕所位置:2
Thread-4离开了厕所位置:4
Thread-1占用了厕所位置:1
Thread-6占用了厕所位置:2
Thread-0占用了厕所位置:4

进程已结束,退出代码为 130

5.3 Semaphore

信号量(Semaphore)是一个用来控制对共享资源的访问数量的同步工具。它广泛应用于并发编程中,通常用于限制能够访问某些资源的线程数量。Semaphore 提供了一个计数器来表示当前可用的资源数,并且可以用来实现访问共享资源的控制。

信号量本身其实是一个计数器,表示当前可用资源的数量,并且可以和其它线程共享。

Semaphore 用两个基本操作,一个是acquire(),一个是release()acquire() 方法会获取一个信号量,如果信号量的计数器大于 0,就将计数器减 1,并返回;如果计数器为 0,就会阻塞当前线程,直到有其他线程调用 release() 方法来释放信号量。release() 方法会将信号量的计数器加 1,并唤醒一个等待的线程。 Smephore提供了以下构造方法:

1
2
public Semaphore(int permits) // 创建一个信号量,初始计数器为 permits
public Semaphore(int permits, boolean fair) // 创建一个信号量,初始计数器为 permits,fair 表示是否公平

Semaphore 的公平性是指在多个线程竞争信号量时,是否按照请求的顺序来分配信号量。如果设置为公平,则会按照请求的顺序来分配信号量;如果设置为非公平,则会随机分配信号量。公平性会影响性能,因为它会导致线程的上下文切换和调度开销。

Semaphore 的常用方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public void acquire() throws InterruptedException // 获取信号量
public void acquire(int permits) throws InterruptedException // 获取指定数量的信号量
public void release() // 释放信号量
public void release(int permits) // 释放指定数量的信号量
public int availablePermits() // 获取当前可用的信号量数量
public int drainPermits() // 获取当前可用的信号量数量,并将计数器清零
public boolean tryAcquire() // 尝试获取信号量,如果获取成功则返回 true,否则返回 false
public boolean tryAcquire(int permits) // 尝试获取指定数量的信号量,如果获取成功则返回 true,否则返回 false
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException // 尝试获取信号量,如果获取成功则返回 true,否则返回 false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException // 尝试获取指定数量的信号量,如果获取成功则返回 true,否则返回 false

我们可以用Semaphore简单地来改写上一个厕所占用的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package cn.programcx.consumer;

import java.util.concurrent.Semaphore;

public class SemaphoreToilet {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(5);

        for (int i = 0; i < 7; i++) {
            new Thread(() -> {
                while (true) {
                    try{
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"线程占用了厕所");
                        Thread.sleep(2000);
                        semaphore.release();
                        System.out.println(Thread.currentThread().getName()+"线程离开了厕所");
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }).start();
        }
    }
}

这里我们使用了Semaphore来控制厕所的数量,Semaphore的构造函数中传入的参数为 5,表示有 5 个厕所位置。然后在每个线程中调用acquire()方法来获取信号量,占用一个厕所位置;调用release()方法来释放信号量,离开一个厕所位置。

但是这样在获取或离开厕所时我们不能知道是哪一个厕所,这时候我们就需要修改一下Semaphore的构造函数,创建一个Semaphore数组,这个数组里面每个对象都代表一个厕所编号(通过数组下标),每个对象的permits值为 1,表示这个厕所可以被占用。然后遍历每一个Semaphore对象,通过tryAcquire()方法来尝试获取数组,直到获取到一个位置为止。

以下是根据这个思路修改完成的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.programcx.consumer;

import java.util.concurrent.Semaphore;

public class SemaphoreToilet {
    public static void main(String[] args) {
        Semaphore[] semaphores =new Semaphore[5];   // 创建一个Semaphore数组,表示5个厕所位置

        for(int i=0;i<semaphores.length;i++){
            semaphores[i]=new Semaphore(1);  //初始化数组,每个只表示是否被占用
        }

        for(int i=0;i<7;i++){
            new Thread(()->{
                while(true) {
                    for (int j = 1; j <=semaphores.length; j++) {
                        if (semaphores[j-1].tryAcquire()) { //尝试获取一个厕所位置,非阻塞,获取失败返回false
                            System.out.println(Thread.currentThread().getName() + "线程占用了一个厕所位置:" + j);
                            try {
                                Thread.sleep(2000);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                            semaphores[j-1].release();
                            System.out.println(Thread.currentThread().getName() + "线程离开了一个厕所位置:" + j);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                            break;
                        }
                    }
                }
            }).start();
        }
    }
}
Licensed under CC BY-NC-SA 4.0