差異處
這裏顯示兩個版本的差異處。
Both sides previous revision 前次修改 下次修改 | 前次修改 | ||
java:java:java8:concurrent:blockingoperationwithcompletablefuture:carefully_avoid_transition_task_of_callback [2019/01/13 18:35] tony [How to resolve?] |
java:java:java8:concurrent:blockingoperationwithcompletablefuture:carefully_avoid_transition_task_of_callback [2023/06/25 09:48] (目前版本) |
||
---|---|---|---|
行 31: | 行 31: | ||
} | } | ||
</code> | </code> | ||
+ | 但假如你的使用情況像[[java:java:java8:concurrent:blockingoperationwithcompletablefuture:blocking_inside_cf|之前文章]]中,會將Blocking工作送到另外一個Thread,接著透過compose串接,就要考慮使用Async方式去延續工作: | ||
+ | <code java> | ||
+ | @Test | ||
+ | public void testSendAsync(){ | ||
+ | CompletableFuture<Response> sendAsync = CompletableFuture.supplyAsync(()->{ | ||
+ | dumpCurrentThreadName("supplyAsync"); | ||
+ | return launchTaskWithAuxThread(()->new BlockingJob().invoke()); | ||
+ | }, es) | ||
+ | .thenCompose((CompletableFuture<Response> responseReceived)->{ | ||
+ | dumpCurrentThreadName("thenCompose"); | ||
+ | return responseReceived; | ||
+ | }) | ||
+ | .thenApplyAsync((Response x)->{ | ||
+ | dumpCurrentThreadName("thenApply"); | ||
+ | return x; | ||
+ | }, es); | ||
+ | |||
+ | sendAsync.join(); | ||
+ | } | ||
+ | </code> | ||
+ | 上面的是我在Java8上實做的範例。如果以作者的範例來說,可以參考我在Java11做的範例程式碼。它使用了一個共享的變數responseReceived;當Blocking工作結束後,使用responseReceived.completeAsync去串接thenApply,這樣可以讓工作的執行回到共用的ExecutorService: | ||
+ | <code java> | ||
+ | public static class BlockingJob2 { | ||
+ | private CompletableFuture<Response> responseReceived; | ||
+ | public BlockingJob2(CompletableFuture<Response> responseReceived) { | ||
+ | this.responseReceived = responseReceived; | ||
+ | } | ||
+ | public void invoke() { | ||
+ | try { | ||
+ | dumpCurrentThreadName("before blocking job"); | ||
+ | Thread.sleep(2*1000); | ||
+ | // io blocking job and get response | ||
+ | responseReceived.completeAsync(()->new Response(), es); | ||
+ | } catch( Exception e ) { | ||
+ | // log | ||
+ | } finally { | ||
+ | dumpCurrentThreadName("after blocking job"); | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | public static CompletableFuture<Response> send2(){ | ||
+ | return CompletableFuture.supplyAsync(()->{ | ||
+ | dumpCurrentThreadName("supplyAsync"); | ||
+ | |||
+ | CompletableFuture<Response> responseReceived = new CompletableFuture<>(); | ||
+ | launchTaskWithAuxThread(()->{ | ||
+ | new BlockingJob2(responseReceived).invoke(); | ||
+ | }); | ||
+ | return responseReceived; | ||
+ | }, es) | ||
+ | .thenCompose((CompletableFuture<Response> responseReceived)->{ | ||
+ | dumpCurrentThreadName("thenCompose"); | ||
+ | return responseReceived; | ||
+ | }) | ||
+ | .thenApply((Response x)->{ | ||
+ | dumpCurrentThreadName("whenComplete: " + x.getClass().getName()); | ||
+ | return x; | ||
+ | }); | ||
+ | } | ||
+ | </code> | ||
+ | async寫法帶來的是thread的控制性;sync則是因為減少context switch而帶來的則是較好的效能。要選擇哪一種方式,一定要先確定好你要的是什麼。 | ||
===== Reference ===== | ===== Reference ===== | ||
* [[https://qconsf.com/sf2017/system/files/presentation-slides/cf.pdf|Asynchronous API with | * [[https://qconsf.com/sf2017/system/files/presentation-slides/cf.pdf|Asynchronous API with |