原创

java高并发系列 - 第34篇:google提供的一些好用的并发工具类

java高并发系列第34篇。

环境:jdk1.8。

关于并发方面的,juc已帮我们提供了很多好用的工具,而谷歌在此基础上做了扩展,使并发编程更容易,这些工具放在guava.jar包中。

本文演示几个简单的案例,见一下guava的效果。

需要先了解的一些技术:juc中的线程池Excecutors、ExecutorService、Callable、Future

guava maven配置

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>27.0-jre</version>
</dependency>

guava中常用几个类

MoreExecutors:提供了一些静态方法,是对juc中的Executors类的一个扩展。
Futures:也提供了很多静态方法,是对juc中Future的一个扩展。

案例1:异步执行任务完毕之后回调

package com.itsoku.chat34;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
@Slf4j
public class Demo1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建一个线程池
        ExecutorService delegate = Executors.newFixedThreadPool(5);
        try {
            ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
            //异步执行一个任务
            ListenableFuture<Integer> submit = executorService.submit(() -> {
                log.info("{}", System.currentTimeMillis());
                //休眠2秒,默认耗时
                TimeUnit.SECONDS.sleep(2);
                log.info("{}", System.currentTimeMillis());
                return 10;
            });
            //当任务执行完毕之后回调对应的方法
            submit.addListener(() -> {
                log.info("任务执行完毕了,我被回调了");
            }, MoreExecutors.directExecutor());
            log.info("{}", submit.get());
        } finally {
            delegate.shutdown();
        }
    }
}

输出:

14:25:50.055 [pool-1-thread-1] INFO com.itsoku.chat34.Demo1 - 1567491950047
14:25:52.063 [pool-1-thread-1] INFO com.itsoku.chat34.Demo1 - 1567491952063
14:25:52.064 [pool-1-thread-1] INFO com.itsoku.chat34.Demo1 - 任务执行完毕了,我被回调了
14:25:52.064 [main] INFO com.itsoku.chat34.Demo1 - 10

说明:

ListeningExecutorService接口继承于juc中的ExecutorService接口,对ExecutorService做了一些扩展,看其名字中带有Listening,说明这个接口自带监听的功能,可以监听异步执行任务的结果。通过MoreExecutors.listeningDecorator创建一个ListeningExecutorService对象,需传递一个ExecutorService参数,传递的ExecutorService负责异步执行任务。

ListeningExecutorServicesubmit方法用来异步执行一个任务,返回ListenableFutureListenableFuture接口继承于juc中的Future接口,对Future做了扩展,使其带有监听的功能。调用submit.addListener可以在执行的任务上添加监听器,当任务执行完毕之后会回调这个监听器中的方法。

ListenableFutureget方法会阻塞当前线程直到任务执行完毕。

上面的还有一种写法,如下:

package com.itsoku.chat34;

import com.google.common.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
@Slf4j
public class Demo2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService delegate = Executors.newFixedThreadPool(5);
        try {
            ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
            ListenableFuture<Integer> submit = executorService.submit(() -> {
                log.info("{}", System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(4);
                //int i = 10 / 0;
                log.info("{}", System.currentTimeMillis());
                return 10;
            });
            Futures.addCallback(submit, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(@Nullable Integer result) {
                    log.info("执行成功:{}", result);
                }

                @Override
                public void onFailure(Throwable t) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.error("执行任务发生异常:" + t.getMessage(), t);
                }
            }, MoreExecutors.directExecutor());
            log.info("{}", submit.get());
        } finally {
            delegate.shutdown();
        }
    }
}

输出:

14:26:07.938 [pool-1-thread-1] INFO com.itsoku.chat34.Demo2 - 1567491967936
14:26:11.944 [pool-1-thread-1] INFO com.itsoku.chat34.Demo2 - 1567491971944
14:26:11.945 [main] INFO com.itsoku.chat34.Demo2 - 10
14:26:11.945 [pool-1-thread-1] INFO com.itsoku.chat34.Demo2 - 执行成功:10

上面通过调用Futures的静态方法addCallback在异步执行的任务中添加回调,回调的对象是一个FutureCallback,此对象有2个方法,任务执行成功调用onSuccess,执行失败调用onFailure

失败的情况可以将代码中int i = 10 / 0;注释去掉,执行一下可以看看效果。

示例2:获取一批异步任务的执行结果

package com.itsoku.chat34;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
@Slf4j
public class Demo3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("star");
        ExecutorService delegate = Executors.newFixedThreadPool(5);
        try {
            ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
            List<ListenableFuture<Integer>> futureList = new ArrayList<>();
            for (int i = 5; i >= 0; i--) {
                int j = i;
                futureList.add(executorService.submit(() -> {
                    TimeUnit.SECONDS.sleep(j);
                    return j;
                }));
            }
            //获取一批任务的执行结果
            List<Integer> resultList = Futures.allAsList(futureList).get();
            //输出
            resultList.forEach(item -> {
                log.info("{}", item);
            });
        } finally {
            delegate.shutdown();
        }
    }
}

输出:

14:26:35.970 [main] INFO com.itsoku.chat34.Demo3 - star
14:26:41.137 [main] INFO com.itsoku.chat34.Demo3 - 5
14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 4
14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 3
14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 2
14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 1
14:26:41.138 [main] INFO com.itsoku.chat34.Demo3 - 0

结果中按顺序输出了6个异步任务的结果,此处用到了Futures.allAsList方法,看一下此方法的声明:

public static <V> ListenableFuture<List<V>> allAsList(
      Iterable<? extends ListenableFuture<? extends V>> futures)

传递一批ListenableFuture,返回一个ListenableFuture<List<V>>,内部将一批结果转换为了一个ListenableFuture对象。

示例3:一批任务异步执行完毕之后回调

异步执行一批任务,最后技术其和

package com.itsoku.chat34;

import com.google.common.util.concurrent.*;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 跟着阿里p7学并发,微信公众号:javacode2018
 */
@Slf4j
public class Demo4 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        log.info("star");
        ExecutorService delegate = Executors.newFixedThreadPool(5);
        try {
            ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
            List<ListenableFuture<Integer>> futureList = new ArrayList<>();
            for (int i = 5; i >= 0; i--) {
                int j = i;
                futureList.add(executorService.submit(() -> {
                    TimeUnit.SECONDS.sleep(j);
                    return j;
                }));
            }
            ListenableFuture<List<Integer>> listListenableFuture = Futures.allAsList(futureList);
            Futures.addCallback(listListenableFuture, new FutureCallback<List<Integer>>() {
                @Override
                public void onSuccess(@Nullable List<Integer> result) {
                    log.info("result中所有结果之和:" + result.stream().reduce(Integer::sum).get());
                }

                @Override
                public void onFailure(Throwable t) {
                    log.error("执行任务发生异常:" + t.getMessage(), t);
                }
            }, MoreExecutors.directExecutor());
        } finally {
            delegate.shutdown();
        }
    }
}

输出:

14:47:04.819 [main] INFO com.itsoku.chat34.Demo4 - star
14:47:09.933 [pool-1-thread-1] INFO com.itsoku.chat34.Demo4 - result中所有结果之和:15

代码中异步执行了一批任务,所有任务完成之后,回调了上面的onSuccess方法,内部对所有的结果进行sum操作。

总结

  • 通过guava提供的一些工具类,方便异步执行任务并进行回调

  • guava内部还有很多好用的工具类,有兴趣的可以去研究一下

java高并发系列目录

  1. 第1天:必须知道的几个概念
  2. 第2天:并发级别
  3. 第3天:有关并行的两个重要定律
  4. 第4天:JMM相关的一些概念
  5. 第5天:深入理解进程和线程
  6. 第6天:线程的基本操作
  7. 第7天:volatile与Java内存模型
  8. 第8天:线程组
  9. 第9天:用户线程和守护线程
  10. 第10天:线程安全和synchronized关键字
  11. 第11天:线程中断的几种方式
  12. 第12天JUC:ReentrantLock重入锁
  13. 第13天:JUC中的Condition对象
  14. 第14天:JUC中的LockSupport工具类,必备技能
  15. 第15天:JUC中的Semaphore(信号量)
  16. 第16天:JUC中等待多线程完成的工具类CountDownLatch,必备技能
  17. 第17天:JUC中的循环栅栏CyclicBarrier的6种使用场景
  18. 第18天:JAVA线程池,这一篇就够了
  19. 第19天:JUC中的Executor框架详解1
  20. 第20天:JUC中的Executor框架详解2
  21. 第21天:java中的CAS,你需要知道的东西
  22. 第22天:JUC底层工具类Unsafe,高手必须要了解
  23. 第23天:JUC中原子类,一篇就够了
  24. 第24天:ThreadLocal、InheritableThreadLocal(通俗易懂)
  25. 第25天:掌握JUC中的阻塞队列
  26. 第26篇:学会使用JUC中常见的集合,常看看!
  27. 第27天:实战篇,接口性能提升几倍原来这么简单
  28. 第28天:实战篇,微服务日志的伤痛,一并帮你解决掉
  29. 第29天:高并发中常见的限流方式
  30. 第30天:JUC中工具类CompletableFuture,必备技能
  31. 第31天:获取线程执行结果,这6种方法你都知道?
  32. 第32天:高并发中计数器的实现方式有哪些?
  33. 第33篇:怎么演示公平锁和非公平锁?

阿里p7一起学并发,公众号:路人甲java,每天获取最新文章!
file

正文到此结束
本文目录