Phaser的使用

Phaser提供动态增减parties(屏障点)计数,这点币CyclicBarrier类操作parties更加方便,通过若干个方法来控制多个线程之间同步运行的效果,还可以实现针对某一个线程取消同步运行的效果,而且支持在指定屏障处等待,在等待时还支持中断或非中断等功能。对线程并发进行分组同步控制时,它比CyclicBarrier类功能更加强大,更建议使用。

简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class App {
public static void main(String[] args) {
final Phaser phaser = new Phaser(3);
Runnable r1 = () -> {
System.out.println(Thread.currentThread().getName() + " A1 begin=" + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " A1 end=" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + " A2 begin=" + System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " A2 end=" + System.currentTimeMillis());
};
Runnable r2 = () -> {
try {
System.out.println(Thread.currentThread().getName() + " A1 begin=" + System.currentTimeMillis());
Thread.sleep(2000);
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " A1 end=" + System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + " A2 begin=" + System.currentTimeMillis());
Thread.sleep(2000);
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName() + " A2 end=" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
};
new Thread(r1, "A").start();
new Thread(r1, "B").start();
new Thread(r2, "C").start();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
A A1 begin=1525358412447
B A1 begin=1525358412447
C A1 begin=1525358412447
C A1 end=1525358414450
C A2 begin=1525358414451
B A1 end=1525358414451
A A1 end=1525358414451
B A2 begin=1525358414451
A A2 begin=1525358414451
C A2 end=1525358416456
B A2 end=1525358416456
A A2 end=1525358416456

移除代码中的18-21行的内容

1
2
3
4
// System.out.println(Thread.currentThread().getName() + " A2 begin=" + System.currentTimeMillis());
// Thread.sleep(2000);
// phaser.arriveAndAwaitAdvance();
// System.out.println(Thread.currentThread().getName() + " A2 end=" + System.currentTimeMillis());

运行结果:
[image]
运行结果
从运行结果看,因为某一个线程达到屏障点,导致另外两个线程一直等待。

为了解决上面的问题,不再继续向下一个屏障点执行的线程调用
arriveAndDeregister(),可以实现将当前线程退出该屏障,并且将屏障点(parties)值减1.
将上述Runnable r2代码改为:

1
2
3
4
5
6
7
8
9
10
11
12
Runnable r2 = () -> {
try {
System.out.println(Thread.currentThread().getName() + " A1 begin=" + System.currentTimeMillis());
Thread.sleep(2000);
System.out.println("A:" + phaser.getRegisteredParties());
phaser.arriveAndDeregister();
System.out.println("B:" + phaser.getRegisteredParties());
System.out.println(Thread.currentThread().getName() + " A1 end=" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
};

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
A A1 begin=1525359548165
C A1 begin=1525359548165
B A1 begin=1525359548165
A:3
B:2
C A1 end=1525359550168
A A1 end=1525359550168
B A1 end=1525359550168
A A2 begin=1525359550169
B A2 begin=1525359550169
B A2 end=1525359550169
A A2 end=1525359550169

方法介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
arriveAndAwaitAdvance()当前线程已经到达屏障,在此等待条件满足后继续向后执行
arriveAndDeregister()当前线程退出该屏障,并且将屏障点(parties)值减1
getRegisteredParties()当前phaser注册的屏障点数
register()没执行一次该方法就动态添加一个parties值
getPhase()获取已经到达第几个屏障
onAdvance()通过新的屏障时调用
// 该方法返回true表示不等待类,Phaser呈无效/销毁状态
// 该方法返回false表示Phaser继续工作
bulkRegister()批量增加parties值
getArrivedParties()获取已经被使用的parties个数
getUnarrivedParties()获取未被使用的parties个数
arrive()使到达屏障的线程数加1,且当前线程不在屏障处等待,直接向下面的代码继续运行,并且Phaser到达的线程达到parties时会重置计数
awaitAdvance(int phase)如果传入的phase值和当前getPhase()方法返回值一样,则在屏障处等待,否则继续向下运行。类似于旁观者的作用,当观察的条件满足了就等待(旁观),如果条件不满足,则程序向下继续运行
awaitAdvance(int phase)不可中断
awaitAdvanceInterruptibly(int phase)可中断的
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)指定最大的等待时间
forceTermination()是Phaser对象的屏障功能失效,并且处于屏障处等待的线程继续执行,并不抛出一场
isTerminated()判断Phaser对象是否已经呈销毁状态