一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 簡單談談RxJava和多線程并發

簡單談談RxJava和多線程并發

2020-08-23 15:00zjutkz Java教程

認識RxJava已經有一段時間了,但是一直沒有機會在項目中嘗試,最近在新的項目里引進了RxJava寫一些事件處理,在review代碼的時候發現了一些和多線程并發相關的問題,所以寫了這篇文章,需要的朋友可以參考借鑒,下面來一起看

前言

相信對于RxJava,大家應該都很熟悉,他最核心的兩個字就是異步,誠然,它對異步的處理非常的出色,但是異步絕對不等于并發,更不等于線程安全,如果把這幾個概念搞混了,錯誤的使用RxJava,是會來帶非常多的問題的。

RxJava與并發

首先讓我們來看一段RxJava協議的原文:

?
1
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

如上所述,RxJava對多線程并發其實并沒有做非常的多保護,這段話中說,如果多個Observables從多個線程中發射數據,必須要滿足happens-before原則。

下面來看一個簡單的例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
final PublishSubject<Integer> subject = PublishSubject.create();
 
subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {
 
 }
 
 @Override
 public void onError(Throwable e) {
 
 }
 
 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  Log.d("TAG", "onNext: " + unSafeCount);
 }
});
 
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;
  for(int i = 0;i < 10;i++) {
   new Thread(new Runnable() {
    @Override
    public void run() {
     for (int j = 0; j < 1000; j++) {
      subject.onNext(unit);
     }
    }
   }).start();
  }
 }
});

這是一個最典型的多線程問題,從10個線程中發射數據并相加,這樣最終得到的答案是小于10000的。雖然使用了RxJava,但是這樣的使用對于并發是沒有意義的,因為RxJava并沒有去處理并發帶來的問題。我們可以看下subject的onNext方法的源碼,里面很簡單,就是調用了對應observer的onNext方法而已。不止是這樣,絕大多數的Subject都是線程不安全的,所以當你在使用這樣的類的時候(典型場景就是自制的RxBus),如果從多個線程中發射數據,那你就要小心了。

對于這樣的問題,有兩種解決方案:

第一種就是簡單的使用傳統的解決方法,比如用AtomicInteger代替int。

第二種則是使用RxJava的解決方案,在這里就是用SerializedSubject去代替Subject:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final PublishSubject<Integer> subject = PublishSubject.create();
 
subject.subscribe(new Subscriber<Integer>() {
 @Override
 public void onCompleted() {
 
 }
 
 @Override
 public void onError(Throwable e) {
 
 }
 
 @Override
 public void onNext(Integer integer) {
  unSafeCount = unSafeCount + integer;
  count.addAndGet(integer);
 
  Log.d("TAG", "onNext: " + count);
 }
});
 
final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);
 
findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
 @Override
 public void onClick(View v) {
  final int unit = 1;
 
  for(int i = 0;i < 10;i++){
   new Thread(new Runnable() {
    @Override
    public void run() {
     for(int j = 0;j < 1000;j++){
      ser.onNext(unit);
     }
    }
   }).start();
  }
 }
});

可以看一下SerializedSubject的onNext方法做了什么:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Override
public void onNext(T t) {
 if (terminated) {
  return;
 }
 synchronized (this) {
  if (terminated) {
   return;
  }
  if (emitting) {
   FastList list = queue;
   if (list == null) {
    list = new FastList();
    queue = list;
   }
   list.add(nl.next(t));
   return;
  }
  emitting = true;
 }
 try {
  actual.onNext(t);
 } catch (Throwable e) {
  terminated = true;
  Exceptions.throwOrReport(e, actual, t);
  return;
 }
 for (;;) {
  for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
   FastList list;
   synchronized (this) {
    list = queue;
    if (list == null) {
     emitting = false;
     return;
    }
    queue = null;
   }
   for (Object o : list.array) {
    if (o == null) {
     break;
    }
    try {
     if (nl.accept(actual, o)) {
      terminated = true;
      return;
     }
    } catch (Throwable e) {
     terminated = true;
     Exceptions.throwIfFatal(e);
     actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
     return;
    }
   }
  }
 }
}

處理方式很簡單,如果有其他線程在發射數據,那就將數據放置到隊列中,等待下次發射。這保證了同一時間只會有一個線程調用onNext,onComplete和onError這些方法。

但是這樣操作顯然是會造成性能的影響的,所以RxJava并不會把所有的操作都打上線程安全的標簽。

在這里就要引申出一個問題,那就是使用者對create方法的濫用,其實這個方法不應該被使用者頻繁的調用的,因為你必須要小心的處理所有的數據發射,接收的邏輯。相反的,使用已有的操作符能很好的解決這個問題,所以下次大家在遇到問題的時候不要簡單的使用create去自己寫,而是應該想想有沒有現成的操作符可以完成相應的需求。

RxJava中的一些操作符

RxJava中有一些操作符也和多線程并發有關,下面讓我來講一講merge和concat,以及他們的一些變種操作符。

對于多線程發射數據,有時候我們需要得到的結果也保持和發射時候一樣的順序,這個時候如果我們使用merge這個操作符去結合多個發射源,那么就會產生一定的問題了(例子中做了非常不好的示范——使用了create操作符,請大家不要學習這樣的寫法,這里單純是為了求證結果)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(final Subscriber<? super Integer> subscriber) {
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     Thread.sleep(1000);
     subscriber.onNext(1);
     subscriber.onCompleted();
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
   }
  }).start();
 }
});
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
 @Override
 public void call(Subscriber<? super Integer> subscriber) {
  subscriber.onNext(2);
  subscriber.onCompleted();
 }
});
 
Observable.merge(o1,o2)
  .subscribe(new Subscriber<Integer>() {
   @Override
   public void onCompleted() {
 
   }
 
   @Override
   public void onError(Throwable e) {
 
   }
 
   @Override
   public void onNext(Integer i) {
    Log.d("TAG", "onNext: " + i);
   }
  });

對于這樣的場景,我們得到的答案將是2,1而不是先得到o1發射的數據,再獲取o2的數據。

究其原因,就是因為merge其實就是給什么傳什么,也不會去管數據發射的順序:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void onNext(Observable<? extends T> t) {
  if (t == null) {
    return;
  }
  if (t == Observable.empty()) {
    emitEmpty();
  } else
  if (t instanceof ScalarSynchronousObservable) {
    tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
  } else {
    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
    addInner(inner);
    t.unsafeSubscribe(inner);
    emit();
  }
}

可以看到在經過lift操作之后,對應的中間人MergeSubscriber的onNext,沒有什么多余的代碼,所以在多個Observable從多線程中發射數據的時候,順序當然不能得到保證。

一個單詞說明這個問題:interleaving——交錯。merge后的數據源可能是交錯的。由于merge有這樣數據交錯的問題,所以它的變種—flatMap也會有同樣的問題。

對于這樣的場景,我們可以使用concat操作符來完成:

?
1
Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

根據文檔,我們知道concat操作符是一個接一個的處理數據源的數據的。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
if (wip.getAndIncrement() != 0) {
  return;
}
 
final int delayErrorMode = this.delayErrorMode;
 
for (;;) {
  if (actual.isUnsubscribed()) {
    return;
  }
 
  if (!active) {
    if (delayErrorMode == BOUNDARY) {
      if (error.get() != null) {
        Throwable ex = ExceptionsUtils.terminate(error);
        if (!ExceptionsUtils.isTerminated(ex)) {
          actual.onError(ex);
        }
        return;
      }
    }
 
    boolean mainDone = done;
    Object v = queue.poll();
    boolean empty = v == null;
 
    if (mainDone && empty) {
      Throwable ex = ExceptionsUtils.terminate(error);
      if (ex == null) {
        actual.onCompleted();
      } else
      if (!ExceptionsUtils.isTerminated(ex)) {
        actual.onError(ex);
      }
      return;
    }
 
    if (!empty) {
 
      Observable<? extends R> source;
 
      try {
        source = mapper.call(NotificationLite.<T>instance().getValue(v));
      } catch (Throwable mapperError) {
        Exceptions.throwIfFatal(mapperError);
        drainError(mapperError);
        return;
      }
 
      if (source == null) {
        drainError(new NullPointerException("The source returned by the mapper was null"));
        return;
      }
 
      if (source != Observable.empty()) {
 
        if (source instanceof ScalarSynchronousObservable) {
          ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;
 
          active = true;
 
          arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
        } else {
          ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
          inner.set(innerSubscriber);
 
          if (!innerSubscriber.isUnsubscribed()) {
            active = true;
 
            source.unsafeSubscribe(innerSubscriber);
          } else {
            return;
          }
        }
        request(1);
      } else {
        request(1);
        continue;
      }
    }
  }
  if (wip.decrementAndGet() == 0) {
    break;
  }
}

通過源碼我們可以知道,active字段就保證了如果上一個數據源還沒有發射完數據,就會一直在for循環中等待,直到上一個數據源發射完了數據重置了active字段。

對于concat,其實還存在一個問題,那就是多個Observable變成了串行,會大大的增加整個RxJava事件流的處理時間,對于這個場景,我們可以使用concatEager來解決。concatEager的源碼就不帶大家分析了,有興趣的同學可以自行查看。

總結

這篇文章比較短,講的東西也比較淺顯,其實就是討論了一下RxJava中多線程并發的幾個問題。最后我想說,RxJava并不是什么高大上的東西,在你的項目引入之前,要考慮一下是否真的有必要這么做。就算真的有場景需要RxJava,也請不要一口氣把項目中所有的操作都換成RxJava,一些簡單的操作不一定需要使用RxJava的操作符的實現,用了反而降低了代碼的可讀性,切勿為了使用Rx而使用Rx。

好了,以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流。

原文鏈接:http://zjutkz.net/2017/02/09/淺談RxJava與多線程并發/

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 亚洲26uuuu最新地址 | bt天堂在线观看国产 | 免费一级欧美大片在线观看 | 亚洲精品免费在线观看 | 3x免费高清视频 | 大陆国产精品视频 | 国产欧美精品一区二区三区四区 | 亚洲精品视频在线 | 户外露出野战hd | 暖暖 免费 高清 中文 日本 | 国产美女做爰免费视频软件 | 好爽轻点太大了太深了 | 欧美 亚洲 综合 卡通 另类 区 | 67194最新网址 | 国产福利片在线 | 国产一区视频在线免费观看 | yellow视频在线观看 | 成人亚洲欧美综合 | 成人福利免费在线观看 | 日本欧美大码a在线视频播放 | 成人福利免费视频 | 美女被无套进入 | 毛片网站观看 | 国产馆 | 日韩无砖专区体验区 | 欧美一区二区三区视视频 | 国产一级特黄aa大片在线 | 99热在线免费观看 | 俄罗斯女人与公拘i交酡 | 美女脱了内裤打开腿让人羞羞软件 | 亚洲福利视频在线观看 | 34g污奶跳舞| 国内自拍成人网在线视频 | 隔壁老王国产精品福利 | 色一情一区二区三区四区 | 韩国久播影院理论片不卡影院 | 色漫在线观看 | 护士们的母狗 | 肉文高h调教| 日本中文字幕高清 | 护士被多人调教到失禁h |