目录 start

  1. 保证同步的方式
  2. 关键字 synchronized
    1. 使用wait() notify() 和 notifyAll()
  3. 使用Lock
  4. 4、使用读写锁实现同步数据访问
  5. 5、使用Condition
  6. 1、信号量:Semaphore
  7. 2、使用CountDownLatch等待并发事件完成
  8. 3、使用CyclicBarrier让多个线程同步
  9. 4、使用Phaser控制并发阶段任务的运行
  10. 5、使用Exchanger控制并发任务间的数据交换

目录 end|2019-10-19 17:04|


保证同步的方式

关键字 synchronized

  • 一个对象使用synchronized关键字声明,则只有一个执行线程可访问它,如果其他线程试图访问,这些线程将会被挂起,直到第一个拥有的的线程执行完

  • 当使用synchronized修饰一个对象的非静态方法时,当一个线程访问该方法时,其他线程不能访问该对象的其他被synchronized修饰的方法,但可以访问未被synchronized修饰的方法

  • 当使用synchronized修饰静态方法时,该方法同时只能被同一线程访问,但其他线程可访问该对象的其他非静态方法

使用wait() notify() 和 notifyAll()

  • wait()方法可让线程挂起
  • notify()notifyAll()用于唤醒线程
  • 使用队列实现生产-消费者模型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    public class WaitAndNotify {

    /**
    * 模拟存储的队列.
    */
    @Data
    private class Storage {
    private int maxSize;
    private LinkedList<Integer> data;

    public Storage(int maxSize) {
    this.maxSize = maxSize;
    this.data = new LinkedList<>();
    }

    /**
    * 添加数据.
    */
    public synchronized void add(Integer data) {
    while (this.data.size() == maxSize) {
    try {
    System.out.println("队列已满");
    wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    System.out.println("添加数据");
    this.data.add(data);
    notifyAll();
    }

    /**
    * 获取数据
    */
    public synchronized Integer get() {
    while (this.data.size() == 0) {
    try {
    System.out.println("队列已空");
    wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    Integer result = this.data.poll();
    System.out.println("获取数据: " + result);
    notifyAll();
    return result;
    }
    }

    public void run() {
    Random random = new Random();
    Storage storage = new Storage(20);
    Thread producer = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
    int n = random.nextInt(100);
    storage.add(n);
    }
    });
    Thread consumer = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
    storage.get();
    }
    });
    producer.start();
    consumer.start();
    }

    public static void main(String[] args) {
    WaitAndNotify test = new WaitAndNotify();
    test.run();
    }

    }

使用Lock

Java除了使用synchronized实现同步代码块外,还提供了另一种同步代码块机制,这种机制基于Lock及其实现类(如:ReentrantLock)

  • Locksynchronized的区别

    • Lock更加灵活:使用synchronized关键字,只能在同一块synchronized块结构中获取和释放。而Lock可实现更复杂的临界结构,如获取和释放不再同一块结构中

    • Lock提供了更多的功能:如tryLock()

    • Lock接口允许读写分离操作,允许多个读线程和一个写线程

    • Lock的性能更好

4、使用读写锁实现同步数据访问

接口ReadWriteLock和其唯一实现类ReentrantReadWriteLock(该类有两个锁,分别为读操作锁和写操作锁)可实现读写锁

  • 读写锁示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    public class ReadAndWriter {

    private int price1;
    private int price2;

    ReadWriteLock readWriteLock;

    public ReadAndWriter(int price1, int price2) {
    this.price1 = price1;
    this.price2 = price2;
    this.readWriteLock = new ReentrantReadWriteLock();
    }

    /**
    * 获取数据(使用读操作锁)
    * @return
    */
    public int getPrice1() {
    readWriteLock.readLock().lock();
    int price = this.price1;
    readWriteLock.readLock().unlock();
    return price;
    }

    /**
    * 获取数据(使用读操作锁)
    * @return
    */
    public int getPrice2() {
    readWriteLock.readLock().lock();
    int price = this.price2;
    readWriteLock.readLock().unlock();
    return price;
    }

    /**
    * 设置数据(使用写操作锁)
    * @param price1
    * @param price2
    */
    public void setPrices(int price1, int price2) {
    readWriteLock.writeLock().lock();
    this.price1 = price1;
    this.price2 = price2;
    readWriteLock.writeLock().unlock();
    }

    public static void main(String[] args) {
    ReadAndWriter readAndWriter = new ReadAndWriter(0, 1);
    Random random = new Random();
    new Thread(() -> {
    for (int i = 0; i < 3; i++) {
    System.out.println("writer");
    readAndWriter.setPrices(random.nextInt(10000), random.nextInt(10000));
    System.out.println("modified");
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }).start();

    for (int i = 0; i < 5; i++) {
    new Thread(() -> {
    for (int j = 0; j < 5; j++) {
    System.out.println(Thread.currentThread().getName() + " : " + readAndWriter.getPrice1());
    System.out.println(Thread.currentThread().getName() + " : " + readAndWriter.getPrice2());
    }
    }).start();
    }
    }
    }

5、使用Condition

一个锁可能关联一个或多个条件,这些条件通过Condition接口声明,Condition接口提供了线程的挂起和唤醒机制

  • 与锁绑定的条件对象都是通过Lock接口声明的newCondition()方法创建的。

  • 在使用条件时,必须拥有绑定的锁,即,所有带条件的代码必须在调用Lock对象的lock()unlock()方法之间

  • 当线程调用条件await()时,会自动释放绑定的锁

  • 如果调用了条件await(),但没有调用他的signal(),这个线程将会永远休眠

  • 生产-消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class ConditionTest {

private static class Storage {
private LinkedList<Double> storage = new LinkedList<>();
private int maxSize;
// 创建Lock
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public Storage(int maxSize) {
this.maxSize = maxSize;
}

public Double get() {
Double n = 0.0;
lock.lock();
try {
while (this.storage.size() < 1) {
System.out.println("等待生产");
consumer.await();
}
System.out.println(Thread.currentThread().getName() + " consumer" );
n = this.storage.poll();
producer.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return n;
}

public void add(Double n) {
lock.lock();
try {
while (this.storage.size() >= this.maxSize) {
System.out.println("等待消费");
producer.await();
}
System.out.println(Thread.currentThread().getName() + " producer" );
this.storage.add(n);
consumer.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}


public static void main(String[] args) {
Storage storage = new Storage(20);
new Thread(() -> {
while (true) {
storage.add(Math.random());
}
}).start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
System.out.println(storage.get());
}
}).start();
}
}

}

1、信号量:Semaphore

信号量是一个计数器,用来保护一个或多个共享资源的访问。当线程访问一个一个共享资源时,它必须得先获取信号量,如果信号量大于0,则信号量减一,该线程允许访问共享资源。当信号量等于0,则线程将会被置于休眠,直到信号量大于0

  • 注意:当线程用完某个共享资源后,信号量必须释放,释放操作将会是信号量的内部计数器加1

  • 使用二进制信号量控制队列中数据的添加和获取的同步(此处使用公平模式,在非公平模式下,多个死循环线程中出现信号量一直被一个线程占用的情况)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class SemaphoreTest {

/** 声明一个信号量对象. */
private Semaphore semaphore;
/** 存储数据. */
private LinkedList<Double> storage = new LinkedList<>();
/** 存储的最大数量. */
private int maxSize = 20;

public SemaphoreTest() {
// 此处传入的参数为1,则该信号量为二进制信号量,即信号量的计数器的值只有0和1
// 第二个参数表示是否公平
this.semaphore = new Semaphore(1, true);
}

/**
* 添加
* @param d
*/
public void add(Double d) {
try {
// 获取信号量
semaphore.acquire();
System.out.println(Thread.currentThread().getName() +": add start");
if (this.storage.size() < this.maxSize) {
this.storage.add(d);
}
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +": add end");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放信号量
semaphore.release();
}
}

/**
* 获取
*/
public Double get() {
try {
// 获取信号量
semaphore.acquire();
double d = 0.0;
System.out.println(Thread.currentThread().getName() +": get start");
if (this.storage.size() > 0 ) {
d = this.storage.poll();
}
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() +": get end");
return d;
} catch (InterruptedException e) {
e.printStackTrace();
return 0.0;
} finally {
// 释放信号量
semaphore.release();
}
}

public static void main(String[] args) {
SemaphoreTest test = new SemaphoreTest();
for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
test.add(Math.random());
}
}).start();
}

for (int i = 0; i < 3; i++) {
new Thread(() -> {
while (true) {
System.out.println(test.get());
}
}).start();
}
}

}
  • 除二进制信号量外,信号量还可以让实现被多个线程同时访问的临界区

2、使用CountDownLatch等待并发事件完成

  • CountDownLatch可以完成一组正在其他线程中执行的操作前,允许他一直等待

  • 实例:在20个线程完全准备好前,让线程等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CountDownLatchTest {

public static void main(String[] args) {
int size = 20;
// 创建CountDownLatch对象
CountDownLatch latch = new CountDownLatch(size);
Lock lock = new ReentrantLock();
for (int i = 0; i < 20; i++) {
new Thread(() -> {
System.out.println("等待线程全部准备...");
try {
try {
// 为了 latch.getCount() 顺序所以加锁控制
lock.lock();
// 减一操作,表示该线程以准备完成
latch.countDown();
System.out.println("还有 " + latch.getCount() + " 个线程需准备");
} finally {
lock.unlock();
}
// 等待其他线程准备
latch.await();
System.out.println(Thread.currentThread().getName() + ": 准备完成, 开始执行任务...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

}

3、使用CyclicBarrier让多个线程同步

  • CyclicBarrierCountDownLatch很相似,但CyclicBarrier的功能要更强大些。CyclicBarrier除了可以让线程在某个集合点同步外,还能在所有线程都达到集合点后再运行一个新的线程。这可以很好的实现分治思想

  • CyclicBarrier可使用reset()方法,将内部计数器重置为初始化的值。重置后,正在await()方法中等待的线程将会收到BrokenBarrierException异常,这是可处理异常或将操作重新执行或恢复到被中断时的状态

  • 损坏的CyclicBarrier:Cyclicbarrier对象有一种特殊的状态即损坏状态(Broken)。当很多线程在await()方法上等待的时候,如果其中一个线程被中断,这个线程将抛出Interruptedexception异常,其他的等待线程将抛出Brokenbarrierexception异常,于是Cyclicbarrier对象就处于损坏状态了。Cyclicbarrier类提供了isBroken()方法,如果处于损坏状态就返回true,否则返回false。

  • 使用CyclicBarrier,计算某个数值再二维数组中出现的次数(先将二维数组分为若干个一维数组,每个线程计算各自分配的一维数组中数值出现的次数。当所有线程计算完各自一维数组后,再使用另一个线程计算前面每个线程所计算出的数量的总和)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class CyclicBarrierStudy {

private int[][] storage;

public CyclicBarrierStudy(int size, int length, int number) {
// 初始化二维数组数据,并记录所要查找的数字出现的次数
this.storage = new int[size][length];
Random random = new Random();
int count = 0;
for (int i = 0; i < size; i++) {
for (int j = 0; j < length; j++) {
storage[i][j] = random.nextInt(150);
if (storage[i][j] == number) {
count++;
}
}
}
// 打印正确结果信息,用于后面的校验
System.out.printf("需查找的数值 %d 共出现 %d 次发\n", number, count);
}

public int[] getData(int index) {
return this.storage[index];
}

public static void main(String[] args) {
final int size = 10, length = 50, number = 100;
int[] countData = new int[size];
CyclicBarrierStudy study = new CyclicBarrierStudy(size, length, number);

// 创建CyclicBarrier对象,并指定等待 size个线程结束,结束后运行另一个线程计算总数
CyclicBarrier cyclicBarrier = new CyclicBarrier(size, () -> {
// 计算总数
int count = 0;
for (int n : countData) {
count += n;
}
System.out.printf("查找结束,数值 %d 共出现 %d 次\n", number, count);
});

// 用线程计算每组的出现的数量
for (int i = 0; i < size; i++) {
final int index = i;
new Thread(() -> {
// 将二维数组分组进行查询
int[] data = study.getData(index);
int count = 0;
for (int n : data) {
if (n == number) {
count++;
}
}
// 保存该组数组中出现指定数据的次数
countData[index] = count;
System.out.println(Thread.currentThread().getName() + " search end");
try {
// 等待其他线程完成,当其他线程都完成后,await()方法后面的代码才会执行
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

}

4、使用Phaser控制并发阶段任务的运行

  • Phaser在每一步结束的位置对线程进行同步,当所有线程都完成到该处后,才允许执行下一步

  • 必须对Phaser中参与同步操作的任务数进行初始化,我们可以动态的添加和减少任务数

  • 几个重要方法:

    • arriveAndAwaitAdvance(): 当一个线程调用该方法后,Phaser对象减一,并且把该线程置为休眠状态,直到其他线程完成该阶段

    • arriveAndDeregister(): 该方法主要用于通知Phaser参与同步的线程数减一,表示不参与下一阶段的任务,因此Phaser在开始下一阶段时不会等待该线程

    • onAdvance: Phaser提供的一个方法,可覆盖该方法。onAdvance()会在阶段改变的时候被调用。方法的返回值,true表示phaser已经执行完成并进入了终止态,false表示phaser在继续执行。我们可通过覆盖onAdvance方法,在每个阶段改变的时候执行某些操作

  • Phaser示例,控制各阶段的同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class PhaserTest {

public static void main(String[] args) {
Random random = new Random();
int len = 20, size = 20;
// 创建Phaser,有20个参与线程
Phaser phaser = new Phaser(len);
int[][] data = new int[len][size];
for (int i = 0; i < len; i++) {
for (int j = 0; j < size; j++) {
data[i][j] = random.nextInt(2);
}
}

for (int i = 0; i < len; i++) {
final int index = i;
new Thread(() -> {
// 让所有线程在都创建完成后运行
System.out.println(Thread.currentThread().getName() + ": 准备第一阶段");
phaser.arriveAndAwaitAdvance();
if (data[index][0] == 0) {
System.out.println(Thread.currentThread().getName() + " : 当前线程已结束");
phaser.arriveAndDeregister();
return;
} else {
System.out.println(Thread.currentThread().getName() + " : 已完成第二阶段");
phaser.arriveAndAwaitAdvance();
}
System.out.println(Thread.currentThread().getName() + ": 开始第三阶段");
System.out.println(Thread.currentThread().getName() + ": " + Arrays.toString(data[index]));
}).start();
}
}
}

5、使用Exchanger控制并发任务间的数据交换

  • Exchanger允许在两个线程之间定义同步点,当两个线程都到达同步点时,他们交换数据

  • 消费者与生产者交换数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class ExchangerTest {

public static void main(String[] args) {
LinkedList<Integer> producerData = new LinkedList<>();
LinkedList<Integer> consumerData = new LinkedList<>();
Exchanger<LinkedList<Integer>> exchanger = new Exchanger<>();
new Thread(new Producer(producerData, exchanger)).start();
new Thread(new Consumer(consumerData, exchanger)).start();
}

static class Producer implements Runnable{
private LinkedList<Integer> data;
private final Exchanger<LinkedList<Integer>> exchanger;

public Producer(LinkedList<Integer> data, Exchanger<LinkedList<Integer>> exchangerData) {
this.data = data;
this.exchanger = exchangerData;
}

@Override
public void run() {
int index = 1;
int size = 10;
Random random = new Random();
for (int i =0;i < size; i++) {
System.out.printf("生产者第 %d 次交换\n", index);
int n = random.nextInt();
System.out.println("生产的数据: " + n);
data.add(n);

try {
// 与消费者交换数据
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者现有数据数量: " + data.size());
index++;
}
}
}

static class Consumer implements Runnable {

private LinkedList<Integer> data;
private final Exchanger<LinkedList<Integer>> exchanger;

public Consumer(LinkedList<Integer> data, Exchanger<LinkedList<Integer>> exchanger) {
this.data = data;
this.exchanger = exchanger;
}

@Override
public void run() {
int index = 1;
int size = 10;
for (int i =0;i < size; i++) {
System.out.printf("消费者第 %d 次交换\n", index);
try {
// 与生产者交换数据
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费的数据: " + data.poll());
index++;
}
}
}
}