Java进阶(二)并行模式算法

java concurrent programming

Posted by Kinsomy on January 24, 2019

这篇文章是对《Java高并发编程》第五章的记录,同时结合自己查找的资料做集中总结。

并发编程里面会有一些常用的设计模式。

单例模式

自不用说,单例是最常见的一种设计模式,是为了避免对象重复创建。 有两大好处:

  • 对于频繁使用的对象,可以省略new操作花费的时间,这对于那些重量级对象而言,是非常可观的一笔系统开销;

  • 由于new操作的次数减少,因而对系统内存的使用频率也会降低,这将减轻GC压力,缩短GC停顿时间。

几种单例写法在这里不做罗列,google一大堆。因为是并发相关, 直接看懒汉式单例

public class Singleton {  
    private static Singleton instance;  
    private Singleton (){}  
    public static synchronized Singleton getInstance() {  
    if (instance == null) {  
        instance = new Singleton();  
    }  
    return instance;  
    }  
}

这种写法为了防止instance在多线程中被多次创建,对getInstance用了synchronized加锁,但是在并发激烈的场合下,会造成锁竞争激烈,影响性能。所以最优写法就是静态内部类:

public class Singleton {  
    private static class SingletonHolder {  
    private static final Singleton INSTANCE = new Singleton();  
    }  
    private Singleton (){}  
    public static final Singleton getInstance() {  
    return SingletonHolder.INSTANCE;  
    }  
}

巧妙利用静态内部类加载时机,在需要初始化的时候主动调用才会构造instance对象实现懒加载,同时没有用锁,性能优越。

不变模式

不变模式在多线程环境下始终保持内部状态的一致性和正确性

不变模式的使用场景满足两个条件:

  • 当对象创建后,其内部状态和数据不再发生任何变化。
  • 对象需要被共享,被多线程频繁访问。

不变对象和只读对象是有区别的,不变对象是从创建之后就不会再改变,内部的属性都不会变化,只读对象是外部不可修改,内部可以自变化。

不变对象的注意点:

  • 去除setter方法以及所有修改自身属性的方法。
  • 将所有属性设置为私有,并用final标记。
  • 确保没有子类可以重载修改它的行为。
  • 有一个可以创建完整对象的构造函数。

典型案例:java.lang.String,Boolean,Byte等lang包下数据类型。

生产者消费者模式

多个生产者和消费者通过共享内存缓冲区进行通信,可以看到消费者并不会直接和生产者接触,而是从缓冲区消费。

有了共享内存缓冲区,可以让数据在多线程间共享,同时缓解生产者和消费者之间的性能差。

生产者-消费者模式里面,用BlockingQueue扮演缓冲区的角色。 至于用BlockingQueue是因为它适合作为数据共享的通道,BlockingQueue的实现类同时还会进行消息等待和线程唤醒等操作。

public class Product {
    private int v;
    public Product(int v) {
        this.v = v;
    }
    public int getV() {
        return v;
    }
}

生产者每次间隔r.nextInt(SLEEPTIME)的随即毫秒数生产出一个产品,然后加入队列。

public class Producer implements Runnable {
    private volatile boolean isRunning = true;
    private BlockingQueue<Product> queue;
    private static AtomicInteger number = new AtomicInteger();
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<Product> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        Product product = null;
        Random r = new Random();

        System.out.println("Producer Thread id:" + Thread.currentThread().getId() + "is running");

        try {
            while (isRunning) {
                //模拟生产耗时
                Thread.sleep(r.nextInt(SLEEPTIME));
                product = new Product(number.incrementAndGet());
                System.out.println("product " + product.getV() + "is produced");
                if (!queue.offer(product, 2, TimeUnit.SECONDS)) {
                    System.out.println("product " + product.getV() + " failed to put into queue");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    public void stop() {
        isRunning = false;
    }
}
public class Consumer implements Runnable {
    private BlockingQueue<Product> queue;
    //消费比生产要慢
    private static final int SLEEPTIME = 2000;

    public Consumer(BlockingQueue<Product> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("Consumer thread id " + Thread.currentThread().getId() + " is running");
        Random r = new Random();

        try {
            //源源不断消费
            while (true) {
                //从队列获取产品
                Product product = queue.take();
                if (null != product) {
                    System.out.println("Consumer thread id " + Thread.currentThread().getId() + " take product " + product.getV());
                }
                Thread.sleep(r.nextInt(SLEEPTIME));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
public class Main {

    public static void main(String[] args) throws InterruptedException {
        //建立缓冲区
        BlockingQueue<Product> queue = new LinkedBlockingQueue<Product>(10);
        Producer producer1 = new Producer(queue);                  //建立生产者
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);                  //建立消费者
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();     //建立线程池
        service.execute(producer1);                                //运行生产者
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);                                //运行消费者
        service.execute(consumer2);
        service.execute(consumer3);
        Thread.sleep(10 * 1000);
        producer1.stop();                                          //停止生产者
        producer2.stop();
        producer3.stop();
        Thread.sleep(3000);
        service.shutdown();
    }
}

部分运行截图

无锁生产者消费者模式

上面讲的生产者消费者模式是通过BlockingQueue的实现来完成的,比如LinkedBlockingQueueArrayBlockingQueue,但是从BlockingQueue的名字就可以看出,这是一个阻塞队列,也就是里面用到了大量的锁和阻塞来实现多线程同步,也就以为了在高并发场景下,性能会受到制约,因此就会想到高性能无锁的CAS实现,但是自己编写CAS实现是非常复杂的。

有一个高性能线程间消息传递框架实现了,就是DisruptorView on Github

关于Disruptor,日后会深入学习源码做一次记录。  Disruptor的关键是一个环形队列RingBuffer,和普通队列需要首和尾两个指针相比,RingBuffer只要一个cursor,同时在写入和读取时都使用CAS算法。 需要注意的是,RingBuffer在初始化的时候需要制定队列大小,且必须是2的幂次方,这样可以做到完全的内存复用,不会动态分配内存空间。

具体的demo代码下次分析源码时一并给出。

Future模式

Future顾名思义是将来的意思,就是提交一个任务,但是不是立马获得结果,而是在将来想要获取结果的时候再去拿。

Future模式和期货类似,提交任务等待结果我们可以理解为现货,期货就是现在提交一个FutureTask,然后就可以去做其他事情,等过段时间凭着Future凭证再来获取需要的信息。

看一下JDK内置的Future的类图:

Callable是一个接口,里面只有一个call()方法,这是一个带返回值的方法,返回的就是真实数据。

FutureTask实现了RunnableFuture接口,而RunnableFuture接口又实现了RunnableFuture接口,因此这是一个runnable任务,可以并发执行。 FutureTask里面有一个run方法,他会调用callable的call()将构造的数据传到result变量里,get()方法可以返回result。还有个限时的get()方法,超过一定时间取不到结果就结束。

public void run() {
    synchronized (this) {
        if (state != READY) return;
        state = RUNNING;
        runner = Thread.currentThread();
    }
    try {
        set(callable.call());
    }
    catch (Throwable ex) {
        setException(ex);
    }
}

public synchronized Object get()
    throws InterruptedException,ExecutionException
{
    waitFor();
    return getResult();

public synchronized Object get(long timeoutTimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException
{
    waitFor(unit.toNanos(timeout));
    return getResult();
}

 看一个Future的demo:定义一个每日签到task,交由线程池执行,然后获得结果。

public class CheckinTask implements Callable<String> {

    private String userName;

    public CheckinTask(String userName) {
        this.userName = userName;
    }

    @Override
    public String call() throws Exception {
        //模拟网络请求耗时
        Thread.sleep(1000);
        return userName + "已经成功签到~";
    }
}
public class Main {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask = new FutureTask<String>(new CheckinTask("kinsomy"));
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(futureTask);
        System.out.println("开始签到");
        try {
            //做其他事去了
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //获得签到返回结果
        System.out.println(futureTask.get());
    }
}

参考资料