一、示例
package main;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue;
public class FunctionTest { private static final Logger log = LoggerFactory.getLogger(FunctionTest.class);
public static void main(String[] args) throws Exception { FunctionTest main = new FunctionTest(); System.out.println("\n------------------------------------ 函数式编程方式 ------------------------------------"); main.run(); System.out.println("\n------------------------------------ 经典的异步实现方式 ------------------------------------"); main.runOld(); }
long value = 101;
void run() throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { value += 101; log.info("supplyAsync, value=" + value); return String.format("%d", value); });
Thread.sleep(1000); log.info("sleep done");
CompletableFuture<Void> next = future.thenAccept(v -> { log.info("thenAccept, value=" + v); });
String result = future.get(); log.info("thenAccept, get=" + result); }
void runOld() throws Exception { value = 101; LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); task(queue);
String result = queue.take(); log.info("thenAccept, get=" + result); }
void task(LinkedBlockingQueue<String> queue) { new Thread(() -> { String result = "failed"; try { value += 101; log.info("supplyAsync, value=" + value); result = String.format("%d", value); } catch (Exception e) { log.error("", e); }
try { queue.put(result); } catch (InterruptedException e) { log.error("", e); } }).start(); } }
|
二、输出
------------------------------------ 函数式编程方式 ------------------------------------ 08:47:51.571 [ForkJoinPool.commonPool-worker-1] INFO main.FunctionTest - supplyAsync, value=202 08:47:52.575 [main] INFO main.FunctionTest - sleep done 08:47:52.575 [main] INFO main.FunctionTest - thenAccept, value=202 08:47:52.575 [main] INFO main.FunctionTest - thenAccept, get=202
------------------------------------ 经典的异步实现方式 ------------------------------------ 08:47:52.576 [Thread-1] INFO main.FunctionTest - supplyAsync, value=202 08:47:52.576 [main] INFO main.FunctionTest - thenAccept, get=202
Process finished with exit code 0
|
三、解释
解决的是执行完一个异步函数后,得到执行结果或者根据执行结果继续执行后续的处理逻辑。
之前的方法是把异步执行的任务放到线程中执行,然后另外一个线程监听通知队列或者阻塞等待任务执行完毕,任务执行完后往队列放一个通知,通知等待线程。
现在可以采用更为简洁的方式去处理这种场景:
- 定义一个异步函数
- 使异步函数执行
- 获取执行结果
当调用get、thenAccept等方法后,java本身会帮助我们执行异步逻辑,并以阻塞的方式等待结果。
一个函数对象只会执行一次,执行完之后会保存这个结果,下次调用get等计算方法的时候直接使用上次的计算结果。
四、查看代码
通过get方法查看具体的执行方式:
CompletableFuture.get()
public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r); }
|
如果函数之前执行过,则会把结果保存到result变量中,调用get方法执行返回result结果;
如果函数没有执行过,则调用waitingGet方法执行。
下面看waitingGet方法:
private Object waitingGet(boolean interruptible) { Signaller q = null; boolean queued = false; int spins = -1; Object r; while ((r = result) == null) { if (spins < 0) spins = SPINS; else if (spins > 0) { if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } else if (q == null) q = new Signaller(interruptible, 0L, 0L); else if (!queued) queued = tryPushStack(q); else if (interruptible && q.interruptControl < 0) { q.thread = null; cleanStack(); return null; } else if (q.thread != null && result == null) { try { ForkJoinPool.managedBlock(q); } catch (InterruptedException ie) { q.interruptControl = -1; } } } if (q != null) { q.thread = null; if (q.interruptControl < 0) { if (interruptible) r = null; // report interruption else Thread.currentThread().interrupt(); } } postComplete(); return r; }
|
此方法的目的是阻塞的方式检测定义的function是否执行完毕,执行完毕或者出现异常后返回结果。
五、总结
这种编程方式是一种思维的转变,让异步结构变得更清晰,前后关系也很清楚(都是A->B->C),可以把所有的异步代码放在一起写,不需要添加额外的异步执行结果判断变量。
但是需要一段时间去习惯这种写法,否则也会被其中各种匿名函数和api绕晕。