在編程語言的這個圈子里,各種語言之間的對比似乎就一直就沒有停過,像什么古早時期的"PHP是世界上最好的語言"就不提了,最近我在摸魚的時候,看到不少文章都在說"Golang性能吊打Java"。作為一個寫了好幾年java的javaer,這我怎么能忍?于是在網上看了一些對比golang和java的文章,其中戳中java痛點、也是golang被吹上天的一條,就是對多線程并發的支持了。先看一段描述:
Go從語言層面原生支持并發,并且使用簡單,Go語言中的并發基于輕量級線程Goroutine,創建成本很低,單個Go應用也可以充分利用CPU多核,編寫高并發服務端軟件簡單,執行性能好,很多情況下完全不需要考慮鎖機制以及由此帶來的各種問題。
看到這,我的心瞬間涼了大半截,真的是字字扎心。雖然說java里的JUC包已經幫我們封裝好了很多并發工具,但實際高并發的環境中我們還要考慮到各種鎖的使用,以及服務器性能瓶頸、限流熔斷等非常多方面的問題。
再說回go,前面提到的這個goroutine究竟是什么東西?其實,輕量級線程goroutine也可以被稱為協程,得益于go中的調度器以及GMP模型,go程序會智能地將goroutine中的任務合理地分配給每個 CPU。
好了,其實上面說的這一大段我也不懂,都是向寫go的哥們兒請教來的,總之就是go的并發性能非常優秀就是了。不過這都不是我們要說的重點,今天我們要討論的是如何在Java中使用協程。
協程是什么?
我們知道,線程在阻塞狀態和可運行狀態的切換,以及線程間的上下文切換都會造成性能的損耗。為了解決這些問題,引入協程coroutine這一概念,就像在一個進程中允許存在多個線程,在一個線程中,也可以存在多個協程。
那么,使用協程究竟有什么好處呢?
首先,執行效率高。線程的切換由操作系統內核執行,消耗資源較多。而協程由程序控制,在用戶態執行,不需要從用戶態切換到內核態,我們也可以理解為,協程是一種進程自身來調度任務的調度模式,因此協程間的切換開銷遠小于線程切換。
其次,節省資源。因為協程在本質上是通過分時復用了一個單線程,因此能夠節省一定的資源。
類似于線程的五種狀態切換,協程間也存在狀態的切換,下面這張圖展示了協程調度器內部任務的流轉。
綜合上面這些角度來看,和原生支持協程的go比起來,java在多線程并發上還真的是不堪一擊。但是,雖然在Java官方的jdk中不能直接使用協程,但是,有其他的開源框架借助動態修改字節碼的方式實現了協程,就比如我們接下來要學習的Quasar。
Quasar使用
Quasar是一個開源的Java協程框架,通過利用Java instrument技術對字節碼進行修改,使方法掛起前后可以保存和恢復jvm棧幀,方法內部已執行到的字節碼位置也通過增加狀態機的方式記錄,在下次恢復執行可直接跳轉至最新位置。
Quasar項目最后更新時間為2018年,版本停留在0.8.0,但是我在直接使用這個版本時報了一個錯誤:
這個錯誤的大意就是這個class文件是使用的高版本jdk編譯的,所以你在低版本的jdk上當然無法運行了。這里major版本號54對應的是jdk10,而我使用的是jdk8,無奈降級試了一下低版本,果然0.7.10可以使用:
<dependency>
<groupId>co.paralleluniverse</groupId>
<artifactId>quasar-core</artifactId>
<version>0.7.10</version>
</dependency>
在我們做好準備工作后,下面就寫幾個例子來感受一下協程的魅力吧。
1、運行時間
下面我們模擬一個簡單的場景,假設我們有一個任務,平均執行時間為1秒,分別測試一下使用線程和協程并發執行10000次需要消耗多少時間。
先通過線程進行調用,直接使用Executors線程池:
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch(10000);
long start = System.currentTimeMillis();
ExecutorService executor= Executors.newCachedThreadPool();
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("Thread use:"+(end-start)+" ms");
}
查看運行時間:
好了,下面我們再用Quasar中的協程跑一下和上面相同的流程。這里我們要使用的是Quasar中的Fiber,它可以被翻譯為協程或纖程,創建Fiber的類型主要可分為下面兩類:
public Fiber(String name, FiberScheduler scheduler, int stackSize, SuspendableRunnable target);
public Fiber(String name, FiberScheduler scheduler, int stackSize, SuspendableCallable<V> target);
在Fiber中可以運行無返回值的SuspendableRunnable或有返回值的SuspendableCallable,看這個名字也知道區別就是java中的Runnable和Callable的區別了。其余參數都可以省略,name為協程的名稱,scheduler是調度器,默認使用FiberForkJoinScheduler,stackSize指定用于保存fiber調用棧信息的stack大小。
在下面的代碼中,使用了Fiber.sleep()方法進行協程的休眠,和Thread.sleep()非常類似。
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch(10000);
long start = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
new Fiber<>(new SuspendableRunnable(){
public Integer run() throws SuspendExecution, InterruptedException {
Fiber.sleep(1000);
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("Fiber use:"+(end-start)+" ms");
}
直接運行,報了一個警告:
QUASAR WARNING: Quasar Java Agent isn't running. If you're using another instrumentation method you can ignore this message; otherwise, please refer to the Getting Started section in the Quasar documentation.
還記得我們前面說過的Quasar生效的原理是基于Java instrument技術嗎,所以這里需要給它添加一個代理Agent。找到本地maven倉庫中已經下好的jar包,在VM options中添加參數:
-javaagent:E:\Apache\maven-repository\co\paralleluniverse\quasar-core\0.7.10\quasar-core-0.7.10.jar
這次運行時就沒有提示警告了,查看一下運行時間:
運行時間只有使用線程池時的一半多一點,確實能大大縮短程序的效率。
2、內存占用
在測試完運行時間后,我們再來測試一下運行內存占用的對比。通過下面代碼嘗試在本地啟動100萬個線程:
public static void main(String[] args) {
for (int i = 0; i < 1000000; i++) {
new Thread(() -> {
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
本來以為會報OutOfMemoryError,但是沒想到的是我的電腦直接直接卡死了…而且不是一次,試了幾次都是以卡死只能重啟電腦而結束。好吧,我選擇放棄,那么下面再試試啟動100萬個Fiber協程。
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch=new CountDownLatch(10000);
for (int i = 0; i < 1000000; i++) {
int finalI = i;
new Fiber<>((SuspendableCallable<Integer>)()->{
Fiber.sleep(100000);
countDownLatch.countDown();
return finalI;
}).start();
}
countDownLatch.await();
System.out.println("end");
}
程序能夠正常執行結束,看樣子使用的內存真的比線程少很多。上面我故意使每個協程結束的時間拖得很長,這樣我們就可以在運行過程中使用Java VisualVM查看內存的占用情況了:
可以看到在使用Fiber的情況下只使用了1G多一點的內存,平均到100萬個協程上也就是說每個Fiber只占用了1Kb左右的內存空間,和Thread線程比起來真的是非常的輕量級。
從上面這張圖中我們也可以看到,運行了非常多的ForkJoinPool,它們又起到了什么作用呢?我們在前面說過,協程是由程序控制在用戶態進行切換,而Quasar中的調度器就使用了一個或多個ForkJoinPool來完成對Fiber的調度。
3、原理與應用
這里簡單介紹一下Quasar的原理,在編譯時框架會對代碼進行掃描,如果方法帶有@Suspendable注解,或拋出了SuspendExecution,或在配置文件META-INF/suspendables中指定該方法,那么Quasar就會修改生成的字節碼,在park掛起方法的前后,插入一些字節碼。
這些字節碼會記錄此時協程的執行狀態,例如相關的局部變量與操作數棧,然后通過拋出異常的方式將cpu的控制權從當前協程交回到控制器,此時控制器可以再調度另外一個協程運行,并通過之前插入的那些字節碼恢復當前協程的執行狀態,使程序能繼續正常執行。
回頭看一下前面例子中的SuspendableRunnable和SuspendableCallable,它們的run方法上都拋出了SuspendExecution,其實這并不是一個真正的異常,僅作為識別掛起方法的聲明,在實際運行中不會拋出。當我們創建了一個Fiber,并在其中調用了其他方法時,如果想要Quasar的調度器能夠介入,那么必須在使用時層層拋出這個異常或添加注解。
看一下簡單的代碼書寫的示例:
public void request(){
new Fiber<>(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
String content = sendRequest();
System.out.println(content);
}
}).start();
}
private String sendRequest() throws SuspendExecution {
return realSendRequest();
}
private String realSendRequest() throws SuspendExecution{
HttpResponse response = HttpRequest.get("http://127.0.0.1:6879/name").execute();
String content = response.body();
return content;
}
需要注意的是,如果在方法內部已經通過try/catch的方式捕獲了Exception,也應該再次手動拋出這個SuspendExecution異常。
總結
本文介紹了Quasar框架的簡單使用,其具體的實現原理比較復雜,暫時就不在這里進行討論,后面打算單獨拎出來進行分析。另外,目前已經有不少其他的框架中已經集成了Quasar,例如同樣是Parallel Universe下的Comsat項目,能夠提供了HTTP和DB訪問等功能。
雖然現在想要在Java中使用協程還只能使用這樣的第三方的框架,但是也不必灰心,在OpenJDK 16中已經加入了一個名為Project Loom的項目, 在OpenJDK Wiki上可以看到對它的介紹,它將使用Fiber輕量級用戶模式線程,從jvm層面對多線程技術進行徹底的改變,使用新的編程模型,使輕量級線程的并發也能夠適用于高吞吐量的業務場景。
原文地址:https://mp.weixin.qq.com/s/9iSHXvsf0R9bZaumpk5jkQ