Observable.create(new Observable.OnSubscribe<Integer>() { | |
@Override | |
public void call(Subscriber<? super Integer> observer) { | |
try { | |
if (!observer.isUnsubscribed()) { | |
for (int i = 1; i < 5; i++) { | |
observer.onNext(i); | |
} | |
observer.onCompleted(); | |
} | |
} catch (Exception e) { | |
observer.onError(e); | |
} | |
} | |
}).subscribe(new Subscriber<Integer>() { | |
@Override | |
public void onNext(Integer item) { | |
System.out.println("Next: " + item); | |
} | |
@Override | |
public void onError(Throwable error) { | |
System.err.println("Error: " + error.getMessage()); | |
} | |
@Override | |
public void onCompleted() { | |
System.out.println("Sequence complete."); | |
} | |
}); |
Running result:
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
一个 Observable 需要一个 OnSubscribe & Subscriber 配套成组。 每组 Observable 构成上下游关系,operator 调用时,会创建该层级的 Observable 并持有上游的 Observable 对象。 当该层级 Observable 被 subscribe 时, onSubscribe 会实例化上游的 Observer,它持有下游的 Observer。 subscribe 上游的 Observable。 由此出发链式 subscribe 调用,直到最上游的 Observable,其 onSubscribe 发射数据
上游的 Observer 会调用下游的 Observer ,如此链式逐级调用 Observer 的 接口方法,如 onNext ,直到最下游 。
/**
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with an unbounded buffer of configurable "island size" and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
* on the other side of the asynchronous boundary. Values below 16 are not recommended in performance sensitive scenarios.
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param delayError
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
* from upstream
* @param bufferSize the size of the buffer.
* @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, boolean)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
/**
* Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source ObservableSource modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
Observable.create(OnSubscribe)
.observeOn()
.subscribeOn()
// 1. subscribe
.subscribe(Subscriber);
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 2. 实例话上游 Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
// 3. 使用上游 Observer 订阅上游 Observable ,具体是在 SubscribeTask 内执行。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 4. 订阅上游 Observable ObservableObserveOn
source.subscribe(parent);
}
}
}