What ObserveOn and SubscribeOn do in Reactive Programming

Both ObserveOn and SubscribeOn are about instruct the system to do something in some threads. But there are a little differences between them and it's vague one. This post will try to clarify them for you and let you use them correctly and understand the code when you read code written by someone else.

What's the difference between observe and subscribe

In Rx, observe and subscribe are two verbs, and obviously there are some difference here. The key to understand ObserveOn and SubscribeOn is to know the differences between the two verbs.

The first difference, you can find observeOn and subscribeOn method in the Observable instance, but you can only find subscribe method, there is no observe method.

When you do subscribe, you are actively triggering the emitting of the observable you are subscribe to. All the operations before the calling of subscribe just set up the system, put the callbacks to the right places waiting to be called but not called yet. By executing subscribe you pull the trigger and the system start running. Items are emitted one by one and go through different onNext method in different operators in the chain.

Here is a simple example, everything happens in current thread

 
    public static void syncProcess(){
 
        Integer[] numbers = {1,2 ,3,4};
        Observable<Integer> source = Observable.from(numbers);
 
        System.out.println("start operator chain");
 
 
        source = source.map(new Func1<Integer, Integer>(){
            @Override
            public Integer call(Integer i){
                System.out.println("Received " + i + " in first map operator call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                return i + 3;
            }
        })
        .map(new Func1<Integer, Integer>(){
            @Override
            public Integer call(Integer i){
                System.out.println("Received " + i + " in second map operator call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                return i + 4;
            }
        });
 
        System.out.println("system set up");
        source.subscribe(new Action1<Integer>(){
            public void call(Integer i){
                System.out.println("Received " + i + " in subscribe call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                System.out.println(i);
            }
        });
        System.out.println("after subscribe");
    }
 
 

Output

 
 
start operator chain
system set up
Received 1 in first map operator call on thread main id: 1
Received 4 in second map operator call on thread main id: 1
Received 8 in subscribe call on thread main id: 1
8
Received 2 in first map operator call on thread main id: 1
Received 5 in second map operator call on thread main id: 1
Received 9 in subscribe call on thread main id: 1
9
Received 3 in first map operator call on thread main id: 1
Received 6 in second map operator call on thread main id: 1
Received 10 in subscribe call on thread main id: 1
10
Received 4 in first map operator call on thread main id: 1
Received 7 in second map operator call on thread main id: 1
Received 11 in subscribe call on thread main id: 1
11
after subscribe
 
 

Before subscribe returns, everything finished, and before the invocation of subscribe, nothing is done.

Now we can understand what subscribeOn means, it means do the subscribe or pull the trigger on another thread. Another thread will visit the observable instance which maybe referenced by the this keyword inside subscribe invocation.

It will affect up stream operators of the subscribeOn invocation because the subscribe happens before all of them.

Change the thread to do subscribe, that is subscribeOn another thread

 
    public static void changeSubscribeThread(){
 
        Integer[] numbers = {1,2 ,3,4};
        Observable<Integer> source = Observable.from(numbers);
 
        System.out.println("start operator chain");
 
 
        source = source.map(new Func1<Integer, Integer>(){
            @Override
            public Integer call(Integer i){
                System.out.println("Received " + i + " in first map operator call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                return i + 3;
            }
        })
        .map(new Func1<Integer, Integer>(){
            @Override
            public Integer call(Integer i){
                System.out.println("Received " + i + " in second map operator call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                return i + 4;
            }
        });
 
        System.out.println("system set up");
 
        source.subscribeOn(Schedulers.io())
        .subscribe(new Action1<Integer>(){
            public void call(Integer i){
                System.out.println("Received " + i + " in subscribe call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                System.out.println(i);
            }
        });
        System.out.println("after subscribe " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
    }
 
 

The output

 
start operator chain
system set up
Received 1 in first map operator call on thread RxCachedThreadScheduler-1 id: 9
Received 4 in second map operator call on thread RxCachedThreadScheduler-1 id: 9
after subscribe main id: 1
Received 8 in subscribe call on thread RxCachedThreadScheduler-1 id: 9
8
Received 2 in first map operator call on thread RxCachedThreadScheduler-1 id: 9
Received 5 in second map operator call on thread RxCachedThreadScheduler-1 id: 9
Received 9 in subscribe call on thread RxCachedThreadScheduler-1 id: 9
9
Received 3 in first map operator call on thread RxCachedThreadScheduler-1 id: 9
Received 6 in second map operator call on thread RxCachedThreadScheduler-1 id: 9
Received 10 in subscribe call on thread RxCachedThreadScheduler-1 id: 9
10
Received 4 in first map operator call on thread RxCachedThreadScheduler-1 id: 9
Received 7 in second map operator call on thread RxCachedThreadScheduler-1 id: 9
Received 11 in subscribe call on thread RxCachedThreadScheduler-1 id: 9
11
 

The subscribe returns immediately in main thread, and at the same time, another thread start the emitting.

Now the observe

The point of observe is it's passive, it just accept notifications on the chain, of course, those notifications are emitted by the source observable. As the name indicated, an observer just sit there and observe things, it won't trigger anything else or make something happen. Obviously you can only observe after the subscribe.

So the observeOn means you want to do the observe on another thread. It affects the operations after the invocation of observeOn, from that point, the item passing happens on another thread.

Here is an example combines observeOn and subscribeOn

 
    public static void switchThread(){
 
        Integer[] numbers = {1,2 ,3,4};
        Observable<Integer> source = Observable.from(numbers);
 
        System.out.println("start operator chain");
 
        source.map(new Func1<Integer, Integer>(){
            @Override
            public Integer call(Integer i){
                System.out.println("Received " + i + " in first map operator call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                return i + 3;
            }
        })
        .subscribeOn(Schedulers.immediate())
        .map(new Func1<Integer, Integer>(){
            @Override
            public Integer call(Integer i){
                System.out.println("Received " + i + " in second map operator call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                return i + 4;
            }
        })
        .observeOn(Schedulers.computation())
         .map(new Func1<Integer, Integer>(){
            @Override
            public Integer call(Integer i){
                System.out.println("Received " + i + " in third map operator call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                return i + 4;
            }
        })
        .subscribe(new Action1<Integer>(){
            public void call(Integer i){
                System.out.println("Received " + i + " in subscribe call on thread " + Thread.currentThread().getName() + " id: " + Thread.currentThread().getId());
                System.out.println(i);
            }
        });
    }
 
 

The output

 
 
start operator chain
Received 1 in first map operator call on thread main id: 1
Received 4 in second map operator call on thread main id: 1
Received 2 in first map operator call on thread main id: 1
Received 5 in second map operator call on thread main id: 1
Received 3 in first map operator call on thread main id: 1
Received 6 in second map operator call on thread main id: 1
Received 4 in first map operator call on thread main id: 1
Received 8 in third map operator call on thread RxComputationThreadPool-3 id: 10
Received 7 in second map operator call on thread main id: 1
Received 12 in subscribe call on thread RxComputationThreadPool-3 id: 10
12
Received 9 in third map operator call on thread RxComputationThreadPool-3 id: 10
Received 13 in subscribe call on thread RxComputationThreadPool-3 id: 10
13
Received 10 in third map operator call on thread RxComputationThreadPool-3 id: 10
Received 14 in subscribe call on thread RxComputationThreadPool-3 id: 10
14
Received 11 in third map operator call on thread RxComputationThreadPool-3 id: 10
Received 15 in subscribe call on thread RxComputationThreadPool-3 id: 10
15
 
 

Notice the third operator start accepting items after the first two finished consume the sequence. Makes it looks like there is a new observable start emitting on another thread. The pattern is not

 
op1 op2 op3
op1 op2 op3
...
 

but

 
op1 op2
op1 op2
op3
op3
 

Here is how to setup Gradle project in Eclipse and explore the RxJava library: Gradle Hello World project with RxJava