前言
相信對于RxJava,大家應該都很熟悉,他最核心的兩個字就是異步,誠然,它對異步的處理非常的出色,但是異步絕對不等于并發,更不等于線程安全,如果把這幾個概念搞混了,錯誤的使用RxJava,是會來帶非常多的問題的。
RxJava與并發
首先讓我們來看一段RxJava協議的原文:
1
|
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. |
如上所述,RxJava對多線程并發其實并沒有做非常的多保護,這段話中說,如果多個Observables從多個線程中發射數據,必須要滿足happens-before原則。
下面來看一個簡單的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
final PublishSubject<Integer> subject = PublishSubject.create(); subject.subscribe( new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { unSafeCount = unSafeCount + integer; Log.d( "TAG" , "onNext: " + unSafeCount); } }); findViewById(R.id.send).setOnClickListener( new View.OnClickListener() { @Override public void onClick(View v) { final int unit = 1 ; for ( int i = 0 ;i < 10 ;i++) { new Thread( new Runnable() { @Override public void run() { for ( int j = 0 ; j < 1000 ; j++) { subject.onNext(unit); } } }).start(); } } }); |
這是一個最典型的多線程問題,從10個線程中發射數據并相加,這樣最終得到的答案是小于10000的。雖然使用了RxJava,但是這樣的使用對于并發是沒有意義的,因為RxJava并沒有去處理并發帶來的問題。我們可以看下subject的onNext方法的源碼,里面很簡單,就是調用了對應observer的onNext方法而已。不止是這樣,絕大多數的Subject都是線程不安全的,所以當你在使用這樣的類的時候(典型場景就是自制的RxBus),如果從多個線程中發射數據,那你就要小心了。
對于這樣的問題,有兩種解決方案:
第一種就是簡單的使用傳統的解決方法,比如用AtomicInteger代替int。
第二種則是使用RxJava的解決方案,在這里就是用SerializedSubject去代替Subject:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
final PublishSubject<Integer> subject = PublishSubject.create(); subject.subscribe( new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { unSafeCount = unSafeCount + integer; count.addAndGet(integer); Log.d( "TAG" , "onNext: " + count); } }); final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject); findViewById(R.id.send).setOnClickListener( new View.OnClickListener() { @Override public void onClick(View v) { final int unit = 1 ; for ( int i = 0 ;i < 10 ;i++){ new Thread( new Runnable() { @Override public void run() { for ( int j = 0 ;j < 1000 ;j++){ ser.onNext(unit); } } }).start(); } } }); |
可以看一下SerializedSubject的onNext方法做了什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
@Override public void onNext(T t) { if (terminated) { return ; } synchronized ( this ) { if (terminated) { return ; } if (emitting) { FastList list = queue; if (list == null ) { list = new FastList(); queue = list; } list.add(nl.next(t)); return ; } emitting = true ; } try { actual.onNext(t); } catch (Throwable e) { terminated = true ; Exceptions.throwOrReport(e, actual, t); return ; } for (;;) { for ( int i = 0 ; i < MAX_DRAIN_ITERATION; i++) { FastList list; synchronized ( this ) { list = queue; if (list == null ) { emitting = false ; return ; } queue = null ; } for (Object o : list.array) { if (o == null ) { break ; } try { if (nl.accept(actual, o)) { terminated = true ; return ; } } catch (Throwable e) { terminated = true ; Exceptions.throwIfFatal(e); actual.onError(OnErrorThrowable.addValueAsLastCause(e, t)); return ; } } } } } |
處理方式很簡單,如果有其他線程在發射數據,那就將數據放置到隊列中,等待下次發射。這保證了同一時間只會有一個線程調用onNext,onComplete和onError這些方法。
但是這樣操作顯然是會造成性能的影響的,所以RxJava并不會把所有的操作都打上線程安全的標簽。
在這里就要引申出一個問題,那就是使用者對create方法的濫用,其實這個方法不應該被使用者頻繁的調用的,因為你必須要小心的處理所有的數據發射,接收的邏輯。相反的,使用已有的操作符能很好的解決這個問題,所以下次大家在遇到問題的時候不要簡單的使用create去自己寫,而是應該想想有沒有現成的操作符可以完成相應的需求。
RxJava中的一些操作符
RxJava中有一些操作符也和多線程并發有關,下面讓我來講一講merge和concat,以及他們的一些變種操作符。
對于多線程發射數據,有時候我們需要得到的結果也保持和發射時候一樣的順序,這個時候如果我們使用merge這個操作符去結合多個發射源,那么就會產生一定的問題了(例子中做了非常不好的示范——使用了create操作符,請大家不要學習這樣的寫法,這里單純是為了求證結果)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
Observable o1 = Observable.create( new Observable.OnSubscribe<Integer>() { @Override public void call( final Subscriber<? super Integer> subscriber) { new Thread( new Runnable() { @Override public void run() { try { Thread.sleep( 1000 ); subscriber.onNext( 1 ); subscriber.onCompleted(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }); Observable o2 = Observable.create( new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext( 2 ); subscriber.onCompleted(); } }); Observable.merge(o1,o2) .subscribe( new Subscriber<Integer>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Integer i) { Log.d( "TAG" , "onNext: " + i); } }); |
對于這樣的場景,我們得到的答案將是2,1而不是先得到o1發射的數據,再獲取o2的數據。
究其原因,就是因為merge其實就是給什么傳什么,也不會去管數據發射的順序:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@Override public void onNext(Observable<? extends T> t) { if (t == null ) { return ; } if (t == Observable.empty()) { emitEmpty(); } else if (t instanceof ScalarSynchronousObservable) { tryEmit(((ScalarSynchronousObservable<? extends T>)t).get()); } else { InnerSubscriber<T> inner = new InnerSubscriber<T>( this , uniqueId++); addInner(inner); t.unsafeSubscribe(inner); emit(); } } |
可以看到在經過lift操作之后,對應的中間人MergeSubscriber的onNext,沒有什么多余的代碼,所以在多個Observable從多線程中發射數據的時候,順序當然不能得到保證。
一個單詞說明這個問題:interleaving——交錯。merge后的數據源可能是交錯的。由于merge有這樣數據交錯的問題,所以它的變種—flatMap也會有同樣的問題。
對于這樣的場景,我們可以使用concat操作符來完成:
1
|
Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes. |
根據文檔,我們知道concat操作符是一個接一個的處理數據源的數據的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
if (wip.getAndIncrement() != 0 ) { return ; } final int delayErrorMode = this .delayErrorMode; for (;;) { if (actual.isUnsubscribed()) { return ; } if (!active) { if (delayErrorMode == BOUNDARY) { if (error.get() != null ) { Throwable ex = ExceptionsUtils.terminate(error); if (!ExceptionsUtils.isTerminated(ex)) { actual.onError(ex); } return ; } } boolean mainDone = done; Object v = queue.poll(); boolean empty = v == null ; if (mainDone && empty) { Throwable ex = ExceptionsUtils.terminate(error); if (ex == null ) { actual.onCompleted(); } else if (!ExceptionsUtils.isTerminated(ex)) { actual.onError(ex); } return ; } if (!empty) { Observable<? extends R> source; try { source = mapper.call(NotificationLite.<T>instance().getValue(v)); } catch (Throwable mapperError) { Exceptions.throwIfFatal(mapperError); drainError(mapperError); return ; } if (source == null ) { drainError( new NullPointerException( "The source returned by the mapper was null" )); return ; } if (source != Observable.empty()) { if (source instanceof ScalarSynchronousObservable) { ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source; active = true ; arbiter.setProducer( new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this )); } else { ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>( this ); inner.set(innerSubscriber); if (!innerSubscriber.isUnsubscribed()) { active = true ; source.unsafeSubscribe(innerSubscriber); } else { return ; } } request( 1 ); } else { request( 1 ); continue ; } } } if (wip.decrementAndGet() == 0 ) { break ; } } |
通過源碼我們可以知道,active字段就保證了如果上一個數據源還沒有發射完數據,就會一直在for循環中等待,直到上一個數據源發射完了數據重置了active字段。
對于concat,其實還存在一個問題,那就是多個Observable變成了串行,會大大的增加整個RxJava事件流的處理時間,對于這個場景,我們可以使用concatEager來解決。concatEager的源碼就不帶大家分析了,有興趣的同學可以自行查看。
總結
這篇文章比較短,講的東西也比較淺顯,其實就是討論了一下RxJava中多線程并發的幾個問題。最后我想說,RxJava并不是什么高大上的東西,在你的項目引入之前,要考慮一下是否真的有必要這么做。就算真的有場景需要RxJava,也請不要一口氣把項目中所有的操作都換成RxJava,一些簡單的操作不一定需要使用RxJava的操作符的實現,用了反而降低了代碼的可讀性,切勿為了使用Rx而使用Rx。
好了,以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流。
原文鏈接:http://zjutkz.net/2017/02/09/淺談RxJava與多線程并發/