差異處

這裏顯示兩個版本的差異處。

連向這個比對檢視

Both sides previous revision 前次修改
下次修改
前次修改
java:java:java8:concurrent:blockingoperationwithcompletablefuture:carefully_avoid_transition_task_of_callback [2019/01/13 18:35]
tony [How to resolve?]
java:java:java8:concurrent:blockingoperationwithcompletablefuture:carefully_avoid_transition_task_of_callback [2023/06/25 09:48] (目前版本)
行 31: 行 31:
 } }
 </​code>​ </​code>​
 +但假如你的使用情況像[[java:​java:​java8:​concurrent:​blockingoperationwithcompletablefuture:​blocking_inside_cf|之前文章]]中,會將Blocking工作送到另外一個Thread,接著透過compose串接,就要考慮使用Async方式去延續工作:​
 +<code java>
 +@Test
 +public void testSendAsync(){
 + CompletableFuture<​Response>​ sendAsync = CompletableFuture.supplyAsync(()->​{
 + dumpCurrentThreadName("​supplyAsync"​);​
 + return launchTaskWithAuxThread(()->​new BlockingJob().invoke());​
 + }, es)
 + .thenCompose((CompletableFuture<​Response>​ responseReceived)->​{
 + dumpCurrentThreadName("​thenCompose"​);​
 + return responseReceived;​
 + })
 + .thenApplyAsync((Response x)->{
 + dumpCurrentThreadName("​thenApply"​);​
 + return x;
 + }, es);
 +
 + sendAsync.join();​
 +}
 +</​code>​
 +上面的是我在Java8上實做的範例。如果以作者的範例來說,可以參考我在Java11做的範例程式碼。它使用了一個共享的變數responseReceived;當Blocking工作結束後,使用responseReceived.completeAsync去串接thenApply,這樣可以讓工作的執行回到共用的ExecutorService:​
 +<code java>
 +public static class BlockingJob2 {
 + private CompletableFuture<​Response>​ responseReceived;​
 + public BlockingJob2(CompletableFuture<​Response>​ responseReceived) {
 + this.responseReceived = responseReceived;​
 + }
 + public void invoke() {
 + try {
 + dumpCurrentThreadName("​before blocking job");
 + Thread.sleep(2*1000);​
 + // io blocking job and get response
 + responseReceived.completeAsync(()->​new Response(), es);
 + } catch( Exception e ) {
 + // log
 + } finally {
 + dumpCurrentThreadName("​after blocking job");
 + }
 + }
 +}
 +
 +public static CompletableFuture<​Response>​ send2(){
 + return CompletableFuture.supplyAsync(()->​{
 + dumpCurrentThreadName("​supplyAsync"​);​
 +
 + CompletableFuture<​Response>​ responseReceived = new CompletableFuture<>​();​
 + launchTaskWithAuxThread(()->​{
 + new BlockingJob2(responseReceived).invoke();​
 + });
 + return responseReceived;​
 + }, es)
 + .thenCompose((CompletableFuture<​Response>​ responseReceived)->​{
 + dumpCurrentThreadName("​thenCompose"​);​
 + return responseReceived;​
 + })
 + .thenApply((Response x)->{
 + dumpCurrentThreadName("​whenComplete:​ " + x.getClass().getName());​
 + return x;
 + });
 +}
 +</​code>​
 +async寫法帶來的是thread的控制性;sync則是因為減少context switch而帶來的則是較好的效能。要選擇哪一種方式,一定要先確定好你要的是什麼。
 ===== Reference ===== ===== Reference =====
   * [[https://​qconsf.com/​sf2017/​system/​files/​presentation-slides/​cf.pdf|Asynchronous API with   * [[https://​qconsf.com/​sf2017/​system/​files/​presentation-slides/​cf.pdf|Asynchronous API with