Reactive Programming How to use Subject

Put it simply, a subject is both an observer and an observable. This sound confusing, how to understand it? This post will shows you the usage of subject.

Being an observer means it can receive and push notifications. You can call the onNext method which pushes the notifications to next operator.

Subject being observer by implementing the Observer interface.

 
 
public interface Observer<T> {
 
    void onCompleted();
 
    void onError(Throwable e);
 
    void onNext(T t);
 
}
 
 

You are explicitly telling it to emit items. This is not the same as the normal observer, in a normal observer, it only accept notifications implicitly, you are not the one to call its onNext method, you don't write the code to call it, it's called implicitly after the chain was setup properly.

In a subject, you call the onNext method explicitly, you write the code.

 
subject.onNext(message);
 

It pushes the notifications to the next operator. Semantically, calling the onNext method means receive messages, that's what an observer do. But in this case, you are sending messages, or formally, publish. This is because the onNext method plays double roles: the first is receive messages, the messages is passed in through the parameter, but it will also forward the messages out to the next or subscribers.

So who's going to receive those forwarded items? It solves the problem by being an observable at the same time.

Being an observable means you can subscribe to it, register listeners. They will receive the items.

This makes it an idea tool to implement an event bus. You can register several handlers to it, and you post messages to it, the bus will forward the messages to each one of listeners. Each listener receives all messages. The can filter the message they want.

Here is an example

 
package com.makble.rxexamples;
 
import rx.Subscription;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
 
public class EventBus {
    private static final Subject<Object, Object> bus = new SerializedSubject<>(PublishSubject.create());
 
    private static CompositeSubscription subscriptions = new CompositeSubscription();
 
    public static void main (String [] args){
 
        setup();
 
        post(1);
        post(2);
        post(3);
        post(4);
 
        clearSubscriptions();
 
    }
 
    public static void post(Object o) {
        bus.onNext(o);
    }
 
    public static void setup() {
        Subscription sub1 = bus.subscribe(new Action1<Object>(){
                @Override
                public void call(Object obj) {
                    System.out.println("listener a received: " + obj);
                }
            });
 
         Subscription sub2 = bus.subscribe(new Action1<Object>(){
                @Override
                public void call(Object obj) {
                    if( (int)obj % 2 == 0){
                        System.out.println("listener b received: " + obj);
                    }
                }
             });
 
         subscriptions.add(sub1);
         subscriptions.add(sub2);
 
 
    }
 
    public static void clearSubscriptions() {
        System.out.println("clearing");
        subscriptions.clear();
    }
 
}
 
 

Output

 
listener a received: 1
listener a received: 2
listener b received: 2
listener a received: 3
listener a received: 4
listener b received: 4
clearing
 

Here is how to setup Gradle project in Eclipse and explore the RxJava library: Gradle Hello World project with RxJava

The pattern illustrated here is Pub/Sub pattern. Look at this typical observable pattern example in What ObserveOn and SubscribeOn do in Reactive Programming. You can see the differences between them. Many people using the two names interchangeably but there are not the same thing.