RxJava Insight - Sample Operator

For the last 2 years I’ve used RxJava a lot. But even now I still find something new about it or get more understanding how it works. In this blog post I want to share one interesting thing I’ve found in sample operator behaviour. Its documentation states:

The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling.

Sample operator is useful with time related operations. Also it helps to resolve backpressure issues with fast emitting observables. E.g. when you need to synchronise emissions between one slow observable and one fast.

It has two implementations:

  • time-based (accepts period and TimeUnit)

    Observable<T> sample(long period, TimeUnit unit);

  • Observable-based (accepts Observable that serves as a sampler):

    Observable<T> sample(Observable<U> sampler);

Earlier this week I was working with sample operator. And one of my observable’s emissions become lost after the sampling. In particular, the last one. I found that if observable emits item followed by onCompleted event, this item is not sampled by sampler.

After some digging I found that this behaviour is valid (discussions here and here) and even tested:

    private Observer<Long> observer;
    private Observer<Object> observer2;

    @Before
    public void before() {
        observer = mock(Observer.class);
        observer2 = mock(Observer.class);
    }

    @Test
    public void sampleWithSamplerEmitAndTerminate() {
        PublishSubject<Integer> source = PublishSubject.create();
        PublishSubject<Integer> sampler = PublishSubject.create();

        Observable<Integer> m = source.sample(sampler);
        m.subscribe(observer2);

        source.onNext(1);
        source.onNext(2);
        sampler.onNext(1);
        // item 3 is followed by onCompleted
        source.onNext(3);
        source.onCompleted();
        sampler.onNext(2);
        sampler.onCompleted();

        InOrder inOrder = inOrder(observer2);
        inOrder.verify(observer2, never()).onNext(1);
        inOrder.verify(observer2, times(1)).onNext(2);
        // item 3 never emitted
        inOrder.verify(observer2, never()).onNext(3);
        inOrder.verify(observer2, times(1)).onCompleted();
        inOrder.verify(observer2, never()).onNext(any());
        verify(observer, never()).onError(any(Throwable.class));
    }

Somehow, it originates in Rx.NET and still exists in RxJava. Yet, marble diagram for sample operator shows the required behaviour. Item 5 is emitted right before onCompleted and sampler samples both of them after it receives onCompleted event.

Here’s a test of the time-based implementation that fails if emission followed by onCompleted is lost

    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;
    private Observer<Long> observer;

    @Before
    public void before() {
        scheduler = new TestScheduler();
        innerScheduler = scheduler.createWorker();
        observer = mock(Observer.class);
    }

    @Test
    public void sampleEmitAndTerminate() {
        Observable<Long> source = Observable.create(new OnSubscribe<Long>() {
            @Override
            public void call(final Subscriber<? super Long> observer1) {
                innerScheduler.schedule(new Action0() {
                    @Override
                    public void call() {
                        observer1.onNext(1L);
                    }
                }, 1, TimeUnit.SECONDS);
                innerScheduler.schedule(new Action0() {
                    @Override
                    public void call() {
                        // item 2L is followed by onCompleted
                        observer1.onNext(2L);
                        observer1.onCompleted();
                    }
                }, 2, TimeUnit.SECONDS);
            }
        });

        Observable<Long> sampled = source.sample(400L, TimeUnit.MILLISECONDS, scheduler);
        sampled.subscribe(observer);

        InOrder inOrder = inOrder(observer);

        scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS);
        inOrder.verify(observer, times(1)).onNext(1L);
        // item 2L should be emitted
        inOrder.verify(observer, times(1)).onNext(2L);
        verify(observer, times(1)).onCompleted();
        verify(observer, never()).onError(any(Throwable.class));
    }

I created an issue on Github to either fix marble diagram or change the operator to behave as diagram shows. For now I ended up using my local version of sample operator with slight changes I added (can be found in gist), because the last emission is the most important emission in those observables I’m working on.

Thanks for reading. Hope this information will be useful for you as it was for me.

UPDATE 30.03.2016

Fix for sample operator has been merged to RxJava 1.x branch with following changes in behavior:

  • OperatorSampleWithTime emits last stored value if it is set before onCompleted;
  • OperatorSampleWithObservable emits last sampled value if source or sample complete.

It will be available in next release.

UPDATE 09.04.2016

Fix is released in RxJava v1.1.3.

Tags:

Categories:

Updated:

Leave a Comment