函数式编程 - 异步处理(CompletableFuture)

一、示例

package main;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/**
* @function
* @date 2021/8/2 17:18
*/
public class FunctionTest {
private static final Logger log = LoggerFactory.getLogger(FunctionTest.class);

public static void main(String[] args) throws Exception {
FunctionTest main = new FunctionTest();
System.out.println("\n------------------------------------ 函数式编程方式 ------------------------------------");
main.run();
System.out.println("\n------------------------------------ 经典的异步实现方式 ------------------------------------");
main.runOld();
}

long value = 101;

void run() throws Exception {
// 定义任务内容
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
value += 101;
log.info("supplyAsync, value=" + value);
return String.format("%d", value);
});

Thread.sleep(1000);
log.info("sleep done");

// 执行完任务后执行匿名函数
CompletableFuture<Void> next = future.thenAccept(v -> {
log.info("thenAccept, value=" + v);
});

// 获取任务的执行结果
String result = future.get();
log.info("thenAccept, get=" + result);
}

void runOld() throws Exception {
value = 101;
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 启动线程执行任务
task(queue);

// 阻塞方式等待任务执行结束
String result = queue.take();
log.info("thenAccept, get=" + result);
}

void task(LinkedBlockingQueue<String> queue) {
new Thread(() -> {
String result = "failed";
try {
value += 101;
log.info("supplyAsync, value=" + value);
result = String.format("%d", value);
} catch (Exception e) {
log.error("", e);
}

try {
queue.put(result);
} catch (InterruptedException e) {
log.error("", e);
}
}).start();
}
}

二、输出

------------------------------------   函数式编程方式  ------------------------------------
08:47:51.571 [ForkJoinPool.commonPool-worker-1] INFO main.FunctionTest - supplyAsync, value=202
08:47:52.575 [main] INFO main.FunctionTest - sleep done
08:47:52.575 [main] INFO main.FunctionTest - thenAccept, value=202
08:47:52.575 [main] INFO main.FunctionTest - thenAccept, get=202

------------------------------------ 经典的异步实现方式 ------------------------------------
08:47:52.576 [Thread-1] INFO main.FunctionTest - supplyAsync, value=202
08:47:52.576 [main] INFO main.FunctionTest - thenAccept, get=202

Process finished with exit code 0

三、解释

解决的是执行完一个异步函数后,得到执行结果或者根据执行结果继续执行后续的处理逻辑。

之前的方法是把异步执行的任务放到线程中执行,然后另外一个线程监听通知队列或者阻塞等待任务执行完毕,任务执行完后往队列放一个通知,通知等待线程。

现在可以采用更为简洁的方式去处理这种场景:

  1. 定义一个异步函数
  2. 使异步函数执行
  3. 获取执行结果

当调用get、thenAccept等方法后,java本身会帮助我们执行异步逻辑,并以阻塞的方式等待结果。

一个函数对象只会执行一次,执行完之后会保存这个结果,下次调用get等计算方法的时候直接使用上次的计算结果。

四、查看代码

通过get方法查看具体的执行方式:

CompletableFuture.get()

public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}

如果函数之前执行过,则会把结果保存到result变量中,调用get方法执行返回result结果;

如果函数没有执行过,则调用waitingGet方法执行。

下面看waitingGet方法:

private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0) spins = SPINS;
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins;
} else if (q == null) q = new Signaller(interruptible, 0L, 0L);
else if (!queued) queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
} else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null;
// report interruption
else Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}

此方法的目的是阻塞的方式检测定义的function是否执行完毕,执行完毕或者出现异常后返回结果。

五、总结

这种编程方式是一种思维的转变,让异步结构变得更清晰,前后关系也很清楚(都是A->B->C),可以把所有的异步代码放在一起写,不需要添加额外的异步执行结果判断变量。

但是需要一段时间去习惯这种写法,否则也会被其中各种匿名函数和api绕晕。

Author: iMine
Link: https://imine141.github.io/2021/08/03/Java%E5%9F%BA%E7%A1%80/java%E9%AB%98%E7%BA%A7/%E5%87%BD%E6%95%B0%E5%BC%8F%E7%BC%96%E7%A8%8B%20-%20%E5%BC%82%E6%AD%A5%E5%A4%84%E7%90%86%EF%BC%88CompletableFuture%EF%BC%89/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.