这篇文章是对《Java高并发编程》第五章的记录,同时结合自己查找的资料做集中总结。
并发编程里面会有一些常用的设计模式。
单例模式
自不用说,单例是最常见的一种设计模式,是为了避免对象重复创建。 有两大好处:
-
对于频繁使用的对象,可以省略new操作花费的时间,这对于那些重量级对象而言,是非常可观的一笔系统开销;
-
由于new操作的次数减少,因而对系统内存的使用频率也会降低,这将减轻GC压力,缩短GC停顿时间。
几种单例写法在这里不做罗列,google一大堆。因为是并发相关, 直接看懒汉式单例
public class Singleton {
private static Singleton instance;
private Singleton (){}
public static synchronized Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}
这种写法为了防止instance在多线程中被多次创建,对getInstance用了synchronized加锁,但是在并发激烈的场合下,会造成锁竞争激烈,影响性能。所以最优写法就是静态内部类:
public class Singleton {
private static class SingletonHolder {
private static final Singleton INSTANCE = new Singleton();
}
private Singleton (){}
public static final Singleton getInstance() {
return SingletonHolder.INSTANCE;
}
}
巧妙利用静态内部类加载时机,在需要初始化的时候主动调用才会构造instance对象实现懒加载,同时没有用锁,性能优越。
不变模式
不变模式在多线程环境下始终保持内部状态的一致性和正确性
。
不变模式的使用场景满足两个条件:
- 当对象创建后,其内部状态和数据不再发生任何变化。
- 对象需要被共享,被多线程频繁访问。
不变对象和只读对象是有区别的,不变对象是从创建之后就不会再改变,内部的属性都不会变化,只读对象是外部不可修改,内部可以自变化。
不变对象的注意点:
- 去除setter方法以及所有修改自身属性的方法。
- 将所有属性设置为私有,并用final标记。
- 确保没有子类可以重载修改它的行为。
- 有一个可以创建完整对象的构造函数。
典型案例:java.lang.String,Boolean,Byte等lang包下数据类型。
生产者消费者模式
多个生产者和消费者通过共享内存缓冲区进行通信,可以看到消费者并不会直接和生产者接触,而是从缓冲区消费。
有了共享内存缓冲区,可以让数据在多线程间共享,同时缓解生产者和消费者之间的性能差。
生产者-消费者模式里面,用BlockingQueue扮演缓冲区的角色。 至于用BlockingQueue是因为它适合作为数据共享的通道,BlockingQueue的实现类同时还会进行消息等待和线程唤醒等操作。
public class Product {
private int v;
public Product(int v) {
this.v = v;
}
public int getV() {
return v;
}
}
生产者每次间隔r.nextInt(SLEEPTIME)的随即毫秒数生产出一个产品,然后加入队列。
public class Producer implements Runnable {
private volatile boolean isRunning = true;
private BlockingQueue<Product> queue;
private static AtomicInteger number = new AtomicInteger();
private static final int SLEEPTIME = 1000;
public Producer(BlockingQueue<Product> queue) {
this.queue = queue;
}
@Override
public void run() {
Product product = null;
Random r = new Random();
System.out.println("Producer Thread id:" + Thread.currentThread().getId() + "is running");
try {
while (isRunning) {
//模拟生产耗时
Thread.sleep(r.nextInt(SLEEPTIME));
product = new Product(number.incrementAndGet());
System.out.println("product " + product.getV() + "is produced");
if (!queue.offer(product, 2, TimeUnit.SECONDS)) {
System.out.println("product " + product.getV() + " failed to put into queue");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop() {
isRunning = false;
}
}
public class Consumer implements Runnable {
private BlockingQueue<Product> queue;
//消费比生产要慢
private static final int SLEEPTIME = 2000;
public Consumer(BlockingQueue<Product> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("Consumer thread id " + Thread.currentThread().getId() + " is running");
Random r = new Random();
try {
//源源不断消费
while (true) {
//从队列获取产品
Product product = queue.take();
if (null != product) {
System.out.println("Consumer thread id " + Thread.currentThread().getId() + " take product " + product.getV());
}
Thread.sleep(r.nextInt(SLEEPTIME));
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
//建立缓冲区
BlockingQueue<Product> queue = new LinkedBlockingQueue<Product>(10);
Producer producer1 = new Producer(queue); //建立生产者
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer1 = new Consumer(queue); //建立消费者
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool(); //建立线程池
service.execute(producer1); //运行生产者
service.execute(producer2);
service.execute(producer3);
service.execute(consumer1); //运行消费者
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(10 * 1000);
producer1.stop(); //停止生产者
producer2.stop();
producer3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
部分运行截图
无锁生产者消费者模式
上面讲的生产者消费者模式是通过BlockingQueue的实现来完成的,比如LinkedBlockingQueue
和ArrayBlockingQueue
,但是从BlockingQueue的名字就可以看出,这是一个阻塞队列,也就是里面用到了大量的锁和阻塞来实现多线程同步,也就以为了在高并发场景下,性能会受到制约,因此就会想到高性能无锁的CAS实现,但是自己编写CAS实现是非常复杂的。
有一个高性能线程间消息传递框架实现了,就是Disruptor
。View on Github
关于Disruptor,日后会深入学习源码做一次记录。 Disruptor的关键是一个环形队列RingBuffer,和普通队列需要首和尾两个指针相比,RingBuffer只要一个cursor,同时在写入和读取时都使用CAS算法。 需要注意的是,RingBuffer在初始化的时候需要制定队列大小,且必须是2的幂次方,这样可以做到完全的内存复用,不会动态分配内存空间。
具体的demo代码下次分析源码时一并给出。
Future模式
Future顾名思义是将来的意思,就是提交一个任务,但是不是立马获得结果,而是在将来想要获取结果的时候再去拿。
Future模式和期货类似,提交任务等待结果我们可以理解为现货,期货就是现在提交一个FutureTask,然后就可以去做其他事情,等过段时间凭着Future凭证再来获取需要的信息。
看一下JDK内置的Future的类图:
Callable是一个接口,里面只有一个call()方法,这是一个带返回值的方法,返回的就是真实数据。
FutureTask
实现了RunnableFuture
接口,而RunnableFuture
接口又实现了Runnable
和Future
接口,因此这是一个runnable任务,可以并发执行。
FutureTask里面有一个run方法,他会调用callable的call()将构造的数据传到result变量里,get()方法可以返回result。还有个限时的get()方法,超过一定时间取不到结果就结束。
public void run() {
synchronized (this) {
if (state != READY) return;
state = RUNNING;
runner = Thread.currentThread();
}
try {
set(callable.call());
}
catch (Throwable ex) {
setException(ex);
}
}
public synchronized Object get()
throws InterruptedException,ExecutionException
{
waitFor();
return getResult();
public synchronized Object get(long timeoutTimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
{
waitFor(unit.toNanos(timeout));
return getResult();
}
看一个Future的demo:定义一个每日签到task,交由线程池执行,然后获得结果。
public class CheckinTask implements Callable<String> {
private String userName;
public CheckinTask(String userName) {
this.userName = userName;
}
@Override
public String call() throws Exception {
//模拟网络请求耗时
Thread.sleep(1000);
return userName + "已经成功签到~";
}
}
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> futureTask = new FutureTask<String>(new CheckinTask("kinsomy"));
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(futureTask);
System.out.println("开始签到");
try {
//做其他事去了
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//获得签到返回结果
System.out.println(futureTask.get());
}
}
参考资料
- 《实战Java高并发程序设计》第二版
- Disruptor Introduction