RxJava in Android Part-1

Arun Pandian M
9 min readMar 10, 2019

What is ReactiveX:

ReactiveX is a library for composing asynchronous and event-based programs by using Observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds Operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

Why RxJava?

At its core, RxJava simplifies development because it raises the level of abstraction around threading. That is, as a developer you don’t have to worry too much about the details of how to perform operations that should occur on different threads. This is particularly attractive since threading is challenging to get right and, if not correctly implemented, can cause some of the most difficult bugs to debug and fix.

RxJava is all about two key components: Observable and Observer. In addition to these, there are other things like Schedulers, Operators and Subscription

Observable: Observable is a data stream that does some work and emits data.

Observer: Observer is the counterpart of Observable. It receives the data emitted by Observable.

Subscription: The bonding between Observable and Observer is called a Subscription. There can be multiple Observers subscribed to a single Observable.

Operator / Transformation: Operators modifies the data emitted by Observable before an observer receives them.

Schedulers: Schedulers decides the thread on which Observable should emit the data and on which thread should receive the data i.e background thread, main thread etc.,

Types of Schedulers in RxJava:

  1. Schedulers.io() used to perform non-CPU-intensive operations like making network calls, reading disc/files, database operations etc., This maintains the pool of threads.
  2. AndroidSchedulers.mainThread() — This provides access to android Main Thread / UI Thread. Usually operations like updating UI, user interactions happen on this thread. We shouldn’t perform any intensive operations on this thread as it makes the app glitchy or ANR dialogue can be thrown.
  3. Schedulers.newThread() — Using this, a new thread will be created each time a task is scheduled. It’s usually suggested not to use scheduler unless there is a very long running operation. The threads created via newThread() won’t be reused.
  4. Schedulers.computation() — This scheduler can be used to perform CPU-intensive operations like processing huge data, bitmap processing etc., The number of threads created using this scheduler completely depends on number CPU cores available.
  5. Schedulers.immediate() — This scheduler executes the task immediately in a synchronous way by blocking the main thread.
  6. Schedulers.trampoline() — It executes the tasks in First In — First Out manner. All the scheduled tasks will be executed one by one by limiting the number of background threads to one.
  7. Schedulers.from() — This allows us to create a schedule from an executor by limiting the number of threads to be created. When the thread pool is occupied, tasks will be queued.

Dependency

You have to add the following dependency in-app level build.gradleto implement the Rx Java in Your project

/for basic Rx Java supportimplementation 'io.reactivex.rxjava2:rxjava:2.2.0'//it has the specific functionality of that androidimplementation 'io.reactivex.rxjava2:rxandroid:2.1.0'//room with Rx Java supportimplementation 'android.arch.persistence.room:rxjava2:1.1.1'

Below is the flow of steps required and objects needed to create components using RxJava.

  1. Create an Observable.
  2. Create an Observer implementing onNext, onComplete and onError methods.
  3. Subscribe Observer to Observable.
  4. After subscription, Observable will start emitting items.
  5. Observer takes action on the items.

RxJava Operators:

Operators allow you to create Observables by transforming, merging, filtering and grouping items emitted by other Observables. Rxjava2 Operators convert one Observable into another Observable which emits somehow modified items emitted by the source Observable.

Here I will give a basic example. For Operators, I referred android hive and mind works repo only. But I will tell some Operators which I checked. For detail about more Operators visit here

Without executing or implementing about Operators you can’t do anything about Rx Java.

You should first know about few Operators and only then you can use Rx Java efficiently.

Here I mention very few Operators but Rx Java has more than 300 Operators

Basic Example of RxJava Operators:

// Observables for emitting the stream of dataObservable<String> androidObservable = Observable.just("Arun", "Bala", "Chandru", "Jaison", "Ranjith");

Here I have used the just method to create an Observable for an emitting stream of data.

You have to write an Observer which is required to receive the data emitted by Observable.

private Observer<String> getDeveperObserver() {return new Observer<String>() {@Overridepublic void onSubscribe(Disposable d){Log.d(TAG, "onSubscribe");disposable=d;}@Overridepublic void onNext(String s) {Log.d(TAG, "Name: " + s);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError: " + e.getMessage());}@Overridepublic void onComplete() {Log.d(TAG, "All items are emitted!");}};}
  • In Observer onNext(Object s) callback will be called for each item.
  • onError(Throwable e) callback will be called when there is an error
  • onComplete() callback will be called when all items are emitted
Observer<String> observerDeveloperName=getDeveperObserver();// operaters added to filter the dataandroidObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).filter(new Predicate<String>() {@Overridepublic boolean test(String s) throws Exception {return s.toLowerCase().startsWith("b");}}).subscribe(observerDeveloperName);

Here I used the filter operator to filter the data from emitted items by Observable and filter by filter operator and only filtered data reached the observer to observe the data.

RxJava in NetWorking:

The ReactiveX Observable model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. It frees you from tangled webs of callbacks and thereby makes your code more readable and less prone to bugs.

If we are using the Retrofit library to network calls then we have to add the following adapter library

//Rxjava adapter for retrofitimplementation 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'

Retrofit instance:

Retrofit instance:public static Retrofit getRetrofitInstance(String baseUrl){if (retrofit == null) {retrofit = new retrofit2.Retrofit.Builder().baseUrl(baseUrl)// we have to add the RxJava Factory method for the Rxjava.addCallAdapterFactory(RxJava2CallAdapterFactory.create()).addConverterFactory(GsonConverterFactory.create()).build();}return retrofit;}

We have to create an API interface using Observable

@Headers({"Accept: application/json"})@POST("/graphql")Single<ResponseBody> makeGraphQlApiCall(@Header("deviceid")        String deviceid,@Body RequestBody requestBody);

Here single is like an Observable class but Observable emits the list of data here single emits one value

  • There are many types of Observable in RxJava, for example- Observable emits the stream of data but if there is no data it will return an error
  • Single Observable emits single item only so here I used the single Observable type
  • Maybe Observable item returns if data is there otherwise it won’t return any data
CompositeDisposable compositeDisposable=new CompositeDisposable();
  • CompositeDiasposal is a disposable container that can hold onto multiple other disposables and offers O(1) add and removal complexity.

Below things are normal things usually you did in the normal way

InterfaceService interfaceService = retrofitClient.create(InterfaceService.class);Gson gson = new Gson();RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), gson.toJson(query));

Call the network in the following manner:

compositeDisposable.add(interfaceService.makeGraphQlApiCall("123",listbody).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ResponseBody>(){@Overridepublic void onSuccess(ResponseBody response){try{if (response!=null){//                    logd(TAG,response.raw().message());}catch (IOException io) {}}@Overridepublic void onError(Throwable e) {Log.e(TAG, "onError: " + e.getMessage());}}));

RX Java in DB:

Room with Observable makes it possible to display modified data in UI as it changes in the database.

RxJava can be a better option compared to LiveData when the transformation of resulting Observable from Room is required, as RxJava provides several Operators to perform transformations.

We have to add the following dependency for Room DB supports Rx Java

implementation 'android.arch.persistence.room:rxjava2:1.1.1'

Room DB DAO:

Data Access Objects are the main classes where we define our database interactions. we can include a variety of query methods. Here we have to define the Observable type of Rx Java to get the data. Because the main reason to use RxJava in Room DB is we can use several Operators to perform the transformation. As my understanding, we can define the following three Observable types

Here I didn’t detail explained about Room DB, I hope you all know about Room DB. If you not please know about the basic stuff of Room DB

Flowable, Maybe and Single

  • Flowable for emitting data when there is data in the database and every time data is updated in the database.
  • Maybe for emitting the data when there is no data, it won’t return any error if there data is there then it will return the data
  • Single it always return the single item if the database has the value

In my example:

I hope you already did the Room database instance singleton class for accessing the all DAO class from anywhere of the class

We have to do code change in Dao class and calling methods only for using Rx Java

StudentDao.java

@Daopublic interface StudentDao{@Query("Select * FROM Student")Maybe<List<Student>> getAllStudent();}

Here I used the Maybe Observable types. Because In my case the data may not be available or may be available so I am using the Maybe observable type.

From call the getAllStudent() method:

// for when we add multiple Observables, CompositeDisposable will help us to dispose all the Observables at the time of activity destroyed.CompositeDisposable compositeDisposable=new CompositeDisposable();AppDataBase   appDataBase=roomDbApp.getAppDataBase();public void getAllStudends(){compositeDisposable.add( appDataBase.studentDao().getAllStudent().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<List<Student>>() {@Overridepublic void accept(@io.reactivex.annotations.NonNull List<Student> students) throws Exception {studentList=students;}}));}

Uses of RxJava in Db:

  • We can modify or transforms the data of DB Objects
  • We need not to worry about threads
  • We can get the updated list when the DB get an updated list

RxJava in Pagination:

  • Easy to do the pagination
  • It has the onBackpressureDrop method which handles the big amount of list when users scrolls fast and get the list very fastly.
  • Rxjava always returns the response in the order of request queue so you need not to worry about pagination when the user scrolls.
  • To deliver a smooth user experience to our users without freezing the main thread, slowing them down and we don’t want to provide the jerky performance to our user

Subjects in RxJava:

Subject:

A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an Observer and as an Observable.

Use of Subjects:

The best way to use Subjects when you have an unknown number of sources and a single Observable. Subjects can be helpful for decoupling between Observable and Observers

For RXBus you have to get knowledge about the RX subject using the following links

Rx Bus:

When we implement the EventBus pattern with RxJava, we call it as RxBus.

To implement Rx bus we have to RXBus class like below:

public class RxBusUtil{PublishSubject<Object> bus;public RxBusUtil(){bus = PublishSubject.create();}/*** Subscribe to this Observable. On event, do something* e.g. replace a fragment*/public Observable<Object> toObservable(){return bus;}public void send(Object o){bus.onNext(o);}}

For Rx bus, we have to create the Object for an above class in the application class and send the event to any class.

How to send the event:

We can send the event like below here I sent the user object as an event

User user=new User();user.firstname="ARUN";user.lastname="PANDIAN";rxBusUtil.send(user);

We can subscribe the or get the events like below

final RxBusUtil rxBusUtil=new RxBusUtil();disposable.add(rxBusUtil.toObservable().subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception{if (o instanceof User){//       tvEvent.setText("it is a user event");Log.d(TAG, "accept: ");}else{//    tvEvent.setText("it is a non user event");Log.d(TAG, "accept: No");}}}));

Callbacks vs RxBus:

  • As we know both are used for the event triggering or communication purpose.
  • If we use the same event over the application, we can use the Rx bus
  • For small or one-time message passing, we don’t need to use the Rx bus. It makes an unnecessary headache.

RX Java Uses:

Unitive: Queries in Rx are done in the same style as functional programmings, like java streams. In Rx, functional style transformations is used on event streams.

Extensible: RxJava can be extended with custom Operators. and RxJava offers all the extensibility.

Declarative: Functional transformations are read in a declarative way

Composable: RxJava Operators can be combined to produce more complicated operations

Transformative: RxJava Operators can transform one type of data to another, reducing, mapping or expanding streams as needed.

In the next post onwards, We will see all type of RxJava Operators and its use cases with the example.

You can get the sample code from my GitHub Repo.

Thanks for reading! If you enjoyed this story, let’s discuss it and before you go click the clap button and share to help others find it! Also, feel free to leave a comment below. Happy coding and have fun.

--

--

Arun Pandian M

Senior Android developer at FundsIndia, A time investor to learn new things about Programming. Currently in a relationship with Green Bug(Android).