個人感覺 RxJava 很好用,裡面的一些操作符很方便。RxJava 有三個核心概念:被觀察者、觀察者、訂閱。
被觀察者通過訂閱(subscribe)與觀察者建立關聯,從而實現觀察者監聽被觀察者返回的數據。
下面把Rxjava常用的模型代碼列出來,還有一些操作符的運用:
依賴:
1
2
3
4
|
// RxAndroid: Android bindings for RxJava 2 (provides AndroidSchedulers.mainThread()).
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.5'
// NOTE(review): the sample code below also imports okhttp3.*; an OkHttp dependency
// must be declared as well (version not stated in this article).
這個是另一種解析數據的方法(fastjson),阿里巴巴旗下的,聽說是解析最快的解析器……
1
|
// fastjson: JSON parser used by the sample's map1() to turn the response body into a Bean.
compile 'com.alibaba:fastjson:1.2.39' |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.view.View; import android.widget.TextView; import com.alibaba.fastjson.JSONObject; import java.io.IOException; import java.util.concurrent.TimeUnit; import io.reactivex.BackpressureStrategy; import io.reactivex.Flowable; import io.reactivex.FlowableEmitter; import io.reactivex.FlowableOnSubscribe; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.Observer; import io.reactivex.android.schedulers.AndroidSchedulers; import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.functions.BiFunction; import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; import io.reactivex.schedulers.Schedulers; import okhttp3.Call; import okhttp3.Callback; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; public class MainActivity extends AppCompatActivity { private TextView name; @Override protected void onCreate(Bundle savedInstanceState) { super .onCreate(savedInstanceState); setContentView(R.layout.activity_main); name = (TextView) findViewById(R.id.name); //用來調(diào)用下面的方法,監(jiān)聽。 name.setOnClickListener( new View.OnClickListener() { @Override public void onClick(View v) { interval(); } }); } //例1:Observer public void observer() { //觀察者 Observer<string> observer = new Observer<string>() { @Override public void onSubscribe( @NonNull Disposable d) { } @Override public void onNext( @NonNull String s) { //接收從被觀察者中返回的數(shù)據(jù) System.out.println( "onNext :" + s); } @Override public void onError( @NonNull Throwable e) { } @Override public void onComplete() { } }; //被觀察者 Observable<string> observable = new Observable<string>() { @Override protected void subscribeActual(Observer<!--? 
super String--> observer) { observer.onNext( "11111" ); observer.onNext( "22222" ); observer.onComplete(); } }; //產(chǎn)生了訂閱 observable.subscribe(observer); } //例2:Flowable private void flowable(){ //被觀察者 Flowable.create( new FlowableOnSubscribe<string>() { @Override public void subscribe( @NonNull FlowableEmitter<string> e) throws Exception { for ( int i = 0 ; i < 100 ; i++) { e.onNext(i+ "" ); } } //背壓的策略,buffer緩沖區(qū) 觀察者 //背壓一共給了五種策略 // BUFFER、 // DROP、打印前128個,后面的刪除 // ERROR、 // LATEST、打印前128個和最后一個,其余刪除 // MISSING //這里的策略若不是BUFFER 那么,會出現(xiàn)著名的:MissingBackpressureException錯誤 }, BackpressureStrategy.BUFFER).subscribe( new Consumer<string>() { @Override public void accept(String s) throws Exception { System.out.println( "subscribe accept" +s); Thread.sleep( 1000 ); } }); } //例3:線程調(diào)度器 Scheduler public void flowable1(){ Flowable.create( new FlowableOnSubscribe<string>() { @Override public void subscribe( @NonNull FlowableEmitter<string> e) throws Exception { for ( int i = 0 ; i < 100 ; i++) { //輸出在哪個線程 System.out.println( "subscribe Thread.currentThread.getName = " + Thread.currentThread().getName()); e.onNext(i+ "" ); } } },BackpressureStrategy.BUFFER) //被觀察者一般放在子線程 .subscribeOn(Schedulers.io()) //觀察者一般放在主線程 .observeOn(AndroidSchedulers.mainThread()) .subscribe( new Consumer<string>() { @Override public void accept(String s) throws Exception { System.out.println( "s" + s); Thread.sleep( 100 ); //輸出在哪個線程 System.out.println( "subscribe Thread.currentThread.getName = " + Thread.currentThread().getName()); } }); } //例4:http請求網(wǎng)絡(luò),map轉(zhuǎn)化器,fastjson解析器 public void map1(){ Observable.create( new ObservableOnSubscribe<string>() { @Override public void subscribe( @NonNull final ObservableEmitter<string> e) throws Exception { OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder() .url( "https://qhb.2dyt.com/Bwei/login" ) .build(); client.newCall(request).enqueue( new Callback() { @Override public void onFailure(Call call, 
IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { String result = response.body().string(); e.onNext(result); } }); } }) //map轉(zhuǎn)換器 flatmap(無序),concatmap(有序) .map( new Function<string, bean= "" >() { @Override public Bean apply( @NonNull String s) throws Exception { //用fastjson來解析數(shù)據(jù) return JSONObject.parseObject(s,Bean. class ); } }).subscribe( new Consumer<bean>() { @Override public void accept(Bean bean) throws Exception { System.out.println( "bean = " + bean.toString() ); } }); } //常見rxjava操作符 //例 定時發(fā)送消息 public void interval(){ Observable.interval( 2 , 1 , TimeUnit.SECONDS) .take( 10 ) .subscribe( new Consumer< long >() { @Override public void accept(Long aLong) throws Exception { System.out.println( "aLong = " + aLong); } }); } //例 zip字符串合并 public void zip(){ Observable observable1 = Observable.create( new ObservableOnSubscribe<string>() { @Override public void subscribe( @NonNull ObservableEmitter<string> e) throws Exception { e.onNext( "1" ); e.onNext( "2" ); e.onNext( "3" ); e.onNext( "4" ); e.onComplete(); } }); Observable observable2 = Observable.create( new ObservableOnSubscribe<string>() { @Override public void subscribe( @NonNull ObservableEmitter<string> e) throws Exception { e.onNext( "A" ); e.onNext( "B" ); e.onNext( "C" ); e.onNext( "D" ); e.onComplete(); } }); Observable.zip(observable1, observable2, new BiFunction<string,string,string>() { @Override public String apply( @NonNull String o, @NonNull String o2) throws Exception { return o + o2; } }).subscribe( new Consumer<string>() { @Override public void accept(String o) throws Exception { System.out.println( "o" + o); } }); } |
總結
以上就是本文關於 RxJava 功能操作符的使用方法詳解的全部內容,希望對大家有所幫助。有什麼問題可以隨時留言,小編會及時回覆大家的。感謝朋友們對本站的支持!
原文鏈接:https://www.2cto.com/kf/201710/691536.html