RxJava
心无旁骛-我只关心持有的这只股票,第一时间作出准确判断
张三最近有了一笔闲钱,为了能有一个比较好的收益,他经过研究,最终选择ABC这只股票。股市瞬息万变,张三需要时刻关注ABC股票的价格走势力,以便第一时间作出决策(加仓,减持保收益)。上市公司股票是透明的投资,李四也拥有ABC这只股票。他们都关注着,当前每股24元,经过分析,张三判断,目标价格30为顶,需要减持,而李四则判断目标价为40元。我们来模拟这个例子。
这里涉及几关键对象:股票,关注股票的人(张三、李四)//股票public interface Gupiao { int getSum(); int add(int add); void addPeople(PeopleGupiao peopleGupiao);}public class AbcGupiao implements Gupiao { Listlist = new ArrayList<>(); int currentPrice = 24; @Override public int getSum() { return currentPrice; } @Override public int add(int add) { this.currentPrice+=add; for(PeopleGupiao p : list){ p.myPrice(this); } return add; } @Override public void addPeople(PeopleGupiao peopleGupiao) { list.add(peopleGupiao); }}
//peoplepublic interface PeopleGupiao { void myPrice(Gupiao gupiao);}public class Zhangsan implements PeopleGupiao { private boolean isHave = true; private int expect = 30; private int price = 24; @Override public void myPrice(Gupiao gupiao) { if(isHave){ this.price = gupiao.getSum(); if(this.price < expect){ isHave = true; } else { isHave = false; } } System.out.println("zhansan price :"+price); }}public class Lisi implements PeopleGupiao { private boolean isHave = true; private int expect = 40; private int price = 24; @Override public void myPrice(Gupiao gupiao) { if(isHave){ this.price = gupiao.getSum(); if(this.price < expect){ isHave = true; } else { isHave = false; } } System.out.println("lisi price :"+price); }}
这里就是一个典型的观察者模式的实现,Gupiao类即为主题对象-subject,People即为观察者对像--observer,而Gupiao的add方法为状态变化时的notify方法,将该主题传递给每一个观察者对象,而观察者在继续做相应处理,此处处理方法为People的myPrice方法。
追本溯源-RxJava起源
RxJava时ReactiveX的java实现,支持事件相应式编程。毫无疑问,事件响应式编程都是基于观察者模式,RxJava也一样,顶层设计思想就是一个庞大的观察者模式(发布/订阅模式)。
进一步讨论
工厂方法:创建Observable
在RxJava中,Observable持有一个主题对象,OnSubscrible
public staticObservable create(OnSubscribe f) { return new Observable (RxJavaHooks.onCreate(f));}
protected Observable(OnSubscribef) { this.onSubscribe = f; }
即主题对象为ObSubscribe,产生主题或者主题状态发生变化会通知所以有的观察者,所以主题的通知方法会以观察者(订阅者)为参数。
public interface OnSubscribeextends Action1 > { // cover for generics insanity}
在通知方法调用时,会依次调用所有的观察者Observer的回调方法。
public interface Observer{ /** * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. * * The {@link Observable} will not call this method if it calls {@link #onError}. */ void onCompleted(); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. *
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or * {@link #onCompleted}. * * @param e * the exception encountered by the Observable */ void onError(Throwable e); /** * Provides the Observer with a new item to observe. *
* The {@link Observable} may call this method 0 or more times. *
* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or * {@link #onError}. * * @param t * the item emitted by the Observable */ void onNext(T t);}
所以观察者接口,主题接口就是ReactiveX的核心接口。
主题接口--OnSubscribe
1. 空实现,通知方法直接调用,订阅者接口的onCompleted方法。
@Overridepublic void call(Subscriber child) { child.onCompleted();}
1. OnSubscribeThrow,通知方法直接调用,订阅者接口的onError方法。
@Overridepublic void call(Subscriber observer) { observer.onError(exception);}
2. JustOnSubscribe,只生产一次主题,from和just方法主传入一个值的默认实现。
public void call(Subscriber s) { s.setProducer(createProducer(s, value));}
关键方法为生产者对象。
staticProducer createProducer(Subscriber s, T v) { if (STRONG_MODE) { return new SingleProducer (s, v); } return new WeakSingleProducer (s, v);}
static final class WeakSingleProducerimplements Producer { final Subscriber actual; final T value; boolean once; public WeakSingleProducer(Subscriber actual, T value) { this.actual = actual; this.value = value; } @Override public void request(long n) { if (once) { return; } if (n < 0L) { throw new IllegalStateException("n >= required but it was " + n); } if (n == 0L) { return; } once = true; Subscriber a = actual; if (a.isUnsubscribed()) { return; } T v = value; try { a.onNext(v); } catch (Throwable e) { Exceptions.throwOrReport(e, a, v); return; } if (a.isUnsubscribed()) { return; } a.onCompleted(); }}
一言以蔽之只会会调用观察者的onNext方法一次,如果异常则调用onError,没有异常则调用观察者的onCompleted方法。
3. OnSubscribeFromArray,数组中的每一个值当成生产对象,from和just方法主传入多值的默认实现。
默认的生产者为FromArrayProducer,其默认实现时遍历数组,一次调用onNext,onCompleted方法,当然,异常出现也会调用OnError。
4. OnSubscribeFromIterable,跟OnSubscribeFromArray类似,只是,传入的是Iterable对象,使用的生产者为OnSubscribeFromIterable。
5. 周期性主题对象,其关键对象是scheduler。通过Scheduler的createWork方法创建Work对象。
6.OnSubscribeTimerPeriodically,interval方法的默认实现,counter初始值为0,每次迭代周期,counter+1。
public final class OnSubscribeTimerPeriodically implements OnSubscribe{ final long initialDelay; final long period; final TimeUnit unit; final Scheduler scheduler; public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { this.initialDelay = initialDelay; this.period = period; this.unit = unit; this.scheduler = scheduler; } @Override public void call(final Subscriber child) { final Worker worker = scheduler.createWorker(); child.add(worker); worker.schedulePeriodically(new Action0() { long counter; @Override public void call() { try { child.onNext(counter++); } catch (Throwable e) { try { worker.unsubscribe(); } finally { Exceptions.throwOrReport(e, child); } } } }, initialDelay, period, unit); }}
几种典型的scheduler
1. computation
@Experimentalpublic static Scheduler createComputationScheduler() { return createComputationScheduler(new RxThreadFactory("RxComputationScheduler-"));}
public final class RxThreadFactory extends AtomicLong implements ThreadFactory { /** */ private static final long serialVersionUID = -8841098858898482335L; public static final ThreadFactory NONE = new ThreadFactory() { @Override public Thread newThread(Runnable r) { throw new AssertionError("No threads allowed."); } }; final String prefix; public RxThreadFactory(String prefix) { this.prefix = prefix; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, prefix + incrementAndGet()); t.setDaemon(true); return t; }}
创建的是守护线程。继承子AtomicLong,有效的避免同步问题,线程的创建为线程安全的。默认会创建与cpu数量相同的线程。选取线池中的线程时,简单轮询选取线程执行。
public PoolWorker getEventLoop() { int c = cores; if (c == 0) { return SHUTDOWN_WORKER; } // simple round robin, improvements to come return eventLoops[(int)(n++ % c)];}
2. immediate
内部并没有创建新的线程,仅仅是使用当前线程执行,每一次迭代都是简单使用sleep操作。
public final class ImmediateScheduler extends Scheduler { .... @Override public Worker createWorker() { return new InnerImmediateScheduler(); } private class InnerImmediateScheduler extends Scheduler.Worker implements Subscription { final BooleanSubscription innerSubscription = new BooleanSubscription(); @Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { // since we are executing immediately on this thread we must cause this thread to sleep long execTime = ImmediateScheduler.this.now() + unit.toMillis(delayTime); return schedule(new SleepingAction(action, this, execTime)); } .... }}
/* package */class SleepingAction implements Action0 {.... @Override public void call() { if (innerScheduler.isUnsubscribed()) { return; } long delay = execTime - innerScheduler.now(); if (delay > 0) { try { Thread.sleep(delay); } catch (InterruptedException e) { Thread.currentThread().interrupt(); Exceptions.propagate(e); } } // after waking up check the subscription if (innerScheduler.isUnsubscribed()) { return; } underlying.call(); }}
3. trampoline
跟immediate类似,只是,trampoline并非立即执行,而是加入到队列中。
@Overridepublic Subscription schedule(Action0 action, long delayTime, TimeUnit unit) { long execTime = now() + unit.toMillis(delayTime); return enqueue(new SleepingAction(action, this, execTime), execTime);}
4. newThread
每次都是创建一个但线程池来执行任务。
/* package */public NewThreadWorker(ThreadFactory threadFactory) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak boolean cancelSupported = tryEnableCancelPolicy(exec); if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { registerExecutor((ScheduledThreadPoolExecutor)exec); } executor = exec;}
5. IO
每次都新创建一个线程或者利用已经回收的线程。适合执行io等耗时的操作。
6. Executor接口
public static Scheduler from(Executor executor) { return new ExecutorScheduler(executor);}
利用Executor接口的线程池子实现创建新的线程。
几种典型的方法
1. amb
选择最先到达的主题执行,其他的主题,会直接调用unsubscribe方法。
public staticOnSubscribe amb(final Iterable > sources) { return new OnSubscribeAmb (sources); }
再看下OnSubscribeAmb的call方法。
public void call(final Subscriber subscriber) { final Selectionselection = new Selection (); subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { AmbSubscriber c; if ((c = selection.get()) != null) { c.unsubscribe(); } unsubscribeAmbSubscribers(selection.ambSubscribers); } })); for (Observable source : sources) { if (subscriber.isUnsubscribed()) { break; } AmbSubscriber ambSubscriber = new AmbSubscriber (0, subscriber, selection); selection.ambSubscribers.add(ambSubscriber); AmbSubscriber c; if ((c = selection.get()) != null) { // Already chose one, the rest can be skipped and we can clean up selection.unsubscribeOthers(c); return; } source.unsafeSubscribe(ambSubscriber); } if (subscriber.isUnsubscribed()) { unsubscribeAmbSubscribers(selection.ambSubscribers); } }
在迭代sources时,会添加一个观察者,所以关键方法就为AmbSuber对象。AmbSubscriber的onNext方法
@Overridepublic void onNext(T t) { if (!isSelected()) { return; } subscriber.onNext(t);}
private boolean isSelected() { if (chosen) { return true; } if (selection.get() == this) { // fast-path chosen = true; return true; } else { if (selection.compareAndSet(null, this)) { selection.unsubscribeOthers(this); chosen = true; return true; } else { // we lost so unsubscribe ... and force cleanup again due to possible race conditions selection.unsubscribeLosers(); return false; } }}
所以最先到达的Observable主题,会被选择为当前执行的主题,缓存在selection中。
2. combineLatest
public staticObservable combineLatest(Iterable > sources, FuncN combineFunction) { return create(new OnSubscribeCombineLatest (sources, combineFunction));}
主题对象为OnSubscribleCombineLatest,在call方法中最为关键的是LatestCoordinator的构造及调用
LatestCoordinatorlc = new LatestCoordinator (s, combiner, count, bufferSize, delayError); lc.subscribe(sources);
public void subscribe(Observable [] sources) { Subscriber[] as = subscribers; int len = as.length; for (int i = 0; i < len; i++) { as[i] = new CombinerSubscriber (this, i); } lazySet(0); // release array contents actual.add(this); actual.setProducer(this); for (int i = 0; i < len; i++) { if (cancelled) { return; } ((Observable )sources[i]).subscribe(as[i]); } }
对传入的每一个主题都注册了一个观察这ComminerSubscriber。其关键方法为onNext方法。
static final class CombinerSubscriberextends Subscriber { final LatestCoordinator parent; final int index; final NotificationLite nl; boolean done; public CombinerSubscriber(LatestCoordinator parent, int index) { this.parent = parent; this.index = index; this.nl = NotificationLite.instance(); request(parent.bufferSize); } @Override public void onNext(T t) { if (done) { return; } parent.combine(nl.next(t), index); } @Override public void onError(Throwable t) { if (done) { RxJavaHooks.onError(t); return; } parent.onError(t); done = true; parent.combine(null, index); } @Override public void onCompleted() { if (done) { return; } done = true; parent.combine(null, index); } public void requestMore(long n) { request(n); } }
所以,在onNext会调用parent.combine方法。即LatestCoordinator的compine方法。combie方法会在队列中入队last数组信息。每次调用combine都会修改last[i]的值。
if (value == null) { complete = ++completedCount;} else { latest[index] = combinerSubscriber.nl.getValue(value);}
只有在每一个主题对象都产生主题后,才能执行回调方法,FuncN的call方法。
第一次执行call方法是在last数组中都有值后。if (!allSourcesFinished && value != null) { combinerSubscriber.requestMore(1); return;}drain();
3.concat
在第一个主题调用complete后执行第二个主题,依次类推。4.deffer
主题对象为动态生成的,由Func0的call方法动态生成。public staticObservable defer(Func0 > observableFactory) { return create(new OnSubscribeDefer (observableFactory));}
public final class OnSubscribeDeferimplements OnSubscribe { final Func0 > observableFactory; public OnSubscribeDefer(Func0 > observableFactory) { this.observableFactory = observableFactory; } @Override public void call(final Subscriber s) { Observable o; try { o = observableFactory.call(); } catch (Throwable t) { Exceptions.throwOrReport(t, s); return; } o.unsafeSubscribe(Subscribers.wrap(s)); }}
5.from(future)
public staticObservable from(Future future) {return (Observable )create(OnSubscribeToObservableFuture.toObservableFuture(future)); }
public staticOnSubscribe toObservableFuture(final Future that)return new ToObservableFuture (that);}
主题对象由future.get()方法生成。
6.lift(Operator)
可以理解为给主题对象动态生成代理观察者。
public finalObservable lift(final Operator operator) {return create(new OnSubscribeLift (onSubscribe,operator));}
public final class OnSubscribeLiftimplements OnSubscribe {... @Override public void call(Subscriber o) { try { Subscriber st = RxJavaHooks.onObservableLift(operator).call(o); try {.... st.onStart(); parent.call(st); } catch (Throwable e) {.... } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } }}
7. merge
8. range
public static Observablerange(int start, int count) { if (count < 0) { throw new IllegalArgumentException("Count can not be negative"); } if (count == 0) { return Observable.empty(); } if (start > Integer.MAX_VALUE - count + 1) { throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE"); } if(count == 1) { return Observable.just(start); } return Observable.create(new OnSubscribeRange(start, start + (count - 1))); }
主题对象为OnSubscribeRange,产生小心从start,直到end,每次+1
void fastpath() { final long endIndex = this.endOfRange + 1L; final Subscriber childSubscriber = this.childSubscriber; for (long index = currentIndex; index != endIndex; index++) { if (childSubscriber.isUnsubscribed()) { return; } childSubscriber.onNext((int) index); } if (!childSubscriber.isUnsubscribed()) { childSubscriber.onCompleted(); } } }
9.switchOnNext
用后来产生的主题代替前一个主题。void emit(T value, InnerSubscriberinner) { synchronized (this) { if (index.get() != inner.id) { return; } queue.offer(inner, nl.next(value)); } drain();}
每次来一个主题事件,index会+1,使用index.incrementAndGet(),
在并发操作的时候,index.get()有可能不等于inner.id,那么这个index肯定不是最新的,switchOnNext永远会使用最新的通知。在drain()中会调用真实的通知方法。10.using
public staticObservable using( final Func0 resourceFactory, final Func1 > observableFactory, final Action1 disposeAction, boolean disposeEagerly) { return create(new OnSubscribeUsing (resourceFactory, observableFactory, disposeAction, disposeEagerly)); }
关键主题对象为OnSubscribeUsing,根据observableFactory的call方法创建Observable,可创建基于外部资源文件关联的主题对象。如网络请求资源等。
11.zip
public staticObservable zip(Iterable > ws, FuncN zipFunction) { List > os = new ArrayList >(); for (Observable o : ws) { os.add(o); } return Observable.just(os.toArray(new Observable [os.size()])).lift(new OperatorZip (zipFunction)); }
在每一个主题对象都产生主题后,会调用zipFunction,关键为OperatorZip,其关键对象为zip的start方法。
public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) { final Object[] subscribers = new Object[os.length]; for (int i = 0; i < os.length; i++) { InnerSubscriber io = new InnerSubscriber(); subscribers[i] = io; childSubscription.add(io); } this.requested = requested; this.subscribers = subscribers; // full memory barrier: release all above for (int i = 0; i < os.length; i++) { os[i].unsafeSubscribe((InnerSubscriber) subscribers[i]); }}
12. all
所有主题都是否都满足条件,如果一个主题不满足,则立马发送一个false的消息,如果都满足条件,则发送一个true的消息。Subscribers = new Subscriber () { boolean done; @Override public void onNext(T t) { Boolean result; try { result = predicate.call(t); } catch (Throwable e) { Exceptions.throwOrReport(e, this, t); return; } if (!result && !done) { done = true; producer.setValue(false); unsubscribe(); } } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onCompleted() { if (!done) { done = true; producer.setValue(true); }}
13. buffer(Func0<? extends Observable<? extends TClosing>> bufferClosingSelector)
自定义调用close的方式,将主题元素缓存在list中,在调用发送closing主题的时候,将list的元素一起响应。public BufferingSubscriber(Subscriber > child) { this.child = child; this.chunk = new ArrayList(initialCapacity);}
chunk为缓存的list。
14. buffer()->withSize
public Subscriber call(final Subscriber > child) { if (skip == count) { BufferExactparent = new BufferExact (child, count); child.add(parent); child.setProducer(parent.createProducer()); return parent; } if (skip > count) { BufferSkip parent = new BufferSkip (child, count, skip); child.add(parent); child.setProducer(parent.createProducer()); return parent; } BufferOverlap parent = new BufferOverlap (child, count, skip); child.add(parent); child.setProducer(parent.createProducer()); return parent;}
count=skip时,使用BufferExact,当缓存的数据到达count时,调用真实调用next方法,将缓存的内容同时通知给观察者。
count<skip时,使用BufferSkip,当index=skip时重新创建buffer,老的buffer会被jvm回收。当buffer的大小跟count相等时,真实调用onnext方法。响应缓存中的所有通知。 当skip<count时。使用BufferOverlap,实现使用队列插入缓存因子。当达到缓存count时会出队,真实执行通知缓存中的所有值。一言以蔽之,skip时跳到某个值,所以,会出现重复的因子。15. buffer()->withTimer
内部使用OperatorBufferWithTime,作为定义的操作。public final Observable
> buffer(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) { return lift(new OperatorBufferWithTime (timespan, timeshift, unit, Integer.MAX_VALUE, scheduler));}
间隔timespan会触发真实的通知操作,间隔timeshift会创建新的缓存。
当timespan=timeshift时,使用ExactSubscriber。if (timespan == timeshift) { ExactSubscriber bsub = new ExactSubscriber(serialized, inner); bsub.add(inner); child.add(bsub); bsub.scheduleExact(); return bsub; }
void scheduleExact() { inner.schedulePeriodically(new Action0() { @Override public void call() { emit(); } }, timespan, timespan, unit);}
在固定的间隔周期会发布通知。
当timespan!=timeshift,使用InexactSubscriber,在间隔timespan会创发布通知,间隔timeshift会创建新的缓存空间。16. buffer()->open-close,自定义打开缓存和触发通知操作
public finalObservable
> buffer(Observable bufferOpenings, Func1 > bufferClosingSelector) { return lift(new OperatorBufferWithStartEndObservable (bufferOpenings, bufferClosingSelector)); }
17. buffer()->withboundray,自定义发送通知操作
public final Observable
> buffer(Observable boundary, int initialCapacity) { return lift(new OperatorBufferWithSingleObservable (boundary, initialCapacity)); }
public OperatorBufferWithSingleObservable(final Observable bufferClosing, int initialCapacity) { this.bufferClosingSelector = new Func0>() { @Override public Observable call() { return bufferClosing; } }; this.initialCapacity = initialCapacity; }
18. cache,缓存发送过的消息
产生消息:只有一个入口产生消息。public staticCachedObservable from(Observable source, int capacityHint) { if (capacityHint < 1) { throw new IllegalArgumentException("capacityHint > 0 required"); } CacheState state = new CacheState (source, capacityHint); CachedSubscribe onSubscribe = new CachedSubscribe (state); return new CachedObservable (onSubscribe, state); }
关键产生主题对象为CachedSubscrible,和CacheState。产生主题后,会将结果缓存在CacheState中,包括onNext,onError,onComplete。
而每新增加一个观察者,会响应缓存在CacheState中的所有对象。19. cast,类型转换
20. collect,值相加
public finalObservable collect(Func0 stateFactory, final Action2 collector) { Func2 accumulator = InternalObservableUtils.createCollectorCaller(collector); return lift(new OperatorScan (stateFactory, accumulator)).last(); }
其中,stateFactory为初始值生成器,而collector操作因子。
21. concatMap
22. debounce
23. groupby
key = keySelector.call(t);v = valueSelector.call(t);group.onNext(v);
24. join
引入了笛卡尔积的概念,left按照既定平率产生元素,right每产生一个元素,就与left中产生的所有值做join操作,及resultSelector产生的值。25. single,获取单值
@Override public void onNext(T value) { if (hasTooManyElements) { return; } else if (isNonEmpty) { hasTooManyElements = true; child.onError(new IllegalArgumentException("Sequence contains too many elements")); unsubscribe(); } else { this.value = value; isNonEmpty = true; } }
26.几个错误处理的方式,发生错误时,执行新的主题对象
public staticOperatorOnErrorResumeNextViaFunction withSingle(final Func1 resumeFunction) { return new OperatorOnErrorResumeNextViaFunction (new Func1 >() { @Override public Observable call(Throwable t) { return Observable.just(resumeFunction.call(t)); } }); } public static OperatorOnErrorResumeNextViaFunction withOther(final Observable other) { return new OperatorOnErrorResumeNextViaFunction (new Func1 >() { @Override public Observable call(Throwable t) { return other; } }); } public static OperatorOnErrorResumeNextViaFunction withException(final Observable other) { return new OperatorOnErrorResumeNextViaFunction (new Func1 >() { @Override public Observable call(Throwable t) { if (t instanceof Exception) { return other; } return Observable.error(t); } }); }