Wrapping Realm with RxJava

In this blog entry I want to show how to use Realm and RxJava together. Realm is a new mobile-first NoSQL database for Android. RxJava is a library for composing asynchronous and event-based programs using observable sequences. I’ve used it as part of my core library stack in Android development for more than a year. If you’re not familiar with it, check out the Grokking RxJava series.

Defining Realm models

In general, working with Realm means operating on instances of Realm and RealmObject. A Realm is the equivalent of a database and maps to a single file on disk. Realm objects are retrieved from a Realm and map to data stored in that file. Every RealmObject is backed by a generated proxy class that overrides all of its getters and setters.

For example, here is a simplified model of a GitHub issue:

    public class RealmIssue extends RealmObject {
        private String title;
        private String body;
        private RealmUser user;
        private RealmList<RealmLabel> labels;

        // standard getters and setters..
    }

    public class RealmUser extends RealmObject {
        private String login;

        // standard getters and setters..
    }

    public class RealmLabel extends RealmObject {
        private String name;
        private String color;

        // standard getters and setters..
    }
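
To picture what "backed by a generated proxy class" means, here is a minimal plain-Java sketch. It is not Realm's generated code: NoteModel, NoteModelProxy and the Map used as a backing row are hypothetical names I made up for illustration. The only point is that the overridden accessors go to the store instead of the Java field.

```java
import java.util.HashMap;
import java.util.Map;

// Hand-written model class (stands in for a RealmObject subclass).
class NoteModel {
    private String title;

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }
}

// Hypothetical stand-in for a generated proxy: every accessor is overridden
// to read from and write to the backing row instead of the Java field.
class NoteModelProxy extends NoteModel {
    private final Map<String, Object> row;

    NoteModelProxy(Map<String, Object> row) { this.row = row; }

    @Override
    public String getTitle() { return (String) row.get("title"); }

    @Override
    public void setTitle(String title) { row.put("title", title); }
}

class ProxySketch {
    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        NoteModel note = new NoteModelProxy(row);
        note.setTitle("Crash on startup");
        // The value lives in the backing row, not in the object's field.
        System.out.println(row.get("title"));
        System.out.println(note.getTitle());
    }
}
```

This is also why such objects are tied to the store they came from: the object itself holds no data of its own.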

Asynchronous access

RxJava has a great feature: the ability to pass objects between the thread that produces them and the thread that observes them. With subscribeOn and observeOn, an asynchronous database query becomes very simple. However, Realm has a strong restriction: Realm, RealmObject, and RealmResults instances cannot be passed across threads. This leads us to the following rules for using Realm with RxJava:

  • RealmObjects must be used only on a non-UI thread;
  • RealmObjects must be mapped to immutable UI objects with Observable.map(...) before observeOn switches threads.

Until Realm ships with built-in RxJava support, we should follow these rules. UI objects are immutable POJOs with the same set of fields as the Realm objects.

    public class Issue {
        private final String title;
        private final String body;
        private final User user;
        private final List<Label> labels;

        public Issue(String title, String body, User user, List<Label> labels) {
            this.title = title;
            this.body = body;
            this.user = user;
            this.labels = labels;
        }

        // getters..
    }

    public class User {
        private final String login;

        public User(String login) {
            this.login = login;
        }

        // getters..
    }

    public class Label {
        private final String name;
        private final String color;

        public Label(String name, String color) {
            this.name = name;
            this.color = color;
        }
        
        // getters..
    }
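
The reason for mapping before switching threads can be shown without Realm or RxJava at all. The sketch below is plain Java under my own assumptions: ConfinedRecord is a hypothetical object that, like a RealmObject, refuses access from any thread other than the one that created it, and TitleSnapshot plays the role of the immutable UI object. Only the snapshot ever leaves the worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical thread-confined object: it remembers its creating thread
// and rejects reads from any other thread.
class ConfinedRecord {
    private final Thread owner = Thread.currentThread();
    private final String title;

    ConfinedRecord(String title) { this.title = title; }

    String getTitle() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("Accessed from the wrong thread");
        }
        return title;
    }
}

// Immutable copy that is safe to hand to any thread.
final class TitleSnapshot {
    private final String title;

    TitleSnapshot(String title) { this.title = title; }

    String getTitle() { return title; }
}

class ThreadConfinementSketch {
    // Runs a task on a fresh single worker thread and waits for its result.
    static <T> T runOnWorker(Callable<T> task) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            worker.shutdown();
        }
    }

    // Creates the record and maps it to a snapshot on the same worker thread.
    static TitleSnapshot loadSnapshot(final String title) {
        return runOnWorker(() -> {
            ConfinedRecord record = new ConfinedRecord(title);
            return new TitleSnapshot(record.getTitle());
        });
    }

    // Returns true if reading a worker-created record on the calling thread fails.
    static boolean leakedRecordFails() {
        ConfinedRecord record = runOnWorker(() -> new ConfinedRecord("leaked"));
        try {
            record.getTitle();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }
}
```

In RxJava terms, loadSnapshot corresponds to calling map(...) upstream of observeOn(...), while leakedRecordFails is what happens when the raw object is delivered to the other thread.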

Observing Realm

Let’s imagine what a RealmObservable should do:

  • execute atomic operations: read, write, delete, or a combination;
  • manage a single Realm instance, including its creation and closing;
  • be configurable through a user-defined function that takes the Realm instance as its argument.

Realm supports transactions with the usual begin, commit, and cancel flow, which fits the atomicity we need.
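
As a rough illustration of that begin, commit, cancel flow, here is a sketch against a toy in-memory store. TinyStore and inTransaction are hypothetical names of mine, not Realm API; they only mirror the shape of the three calls so the commit-or-cancel logic can be seen in isolation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory store with a begin/commit/cancel transaction shape.
class TinyStore {
    private final List<String> committed = new ArrayList<>();
    private List<String> pending;  // non-null only while a transaction is open

    void beginTransaction() { pending = new ArrayList<>(committed); }

    void add(String value) {
        if (pending == null) {
            throw new IllegalStateException("No open transaction");
        }
        pending.add(value);
    }

    void commitTransaction() {
        committed.clear();
        committed.addAll(pending);
        pending = null;
    }

    // Drops every change made since beginTransaction().
    void cancelTransaction() { pending = null; }

    List<String> values() { return new ArrayList<>(committed); }
}

class TransactionSketch {
    // Runs the work inside a transaction: commit on success, cancel on failure.
    static void inTransaction(TinyStore store, Consumer<TinyStore> work) {
        store.beginTransaction();
        try {
            work.accept(store);
            store.commitTransaction();
        } catch (RuntimeException e) {
            store.cancelTransaction();
            throw e;
        }
    }
}
```

If the work throws halfway through, nothing it added becomes visible, which is the all-or-nothing behaviour the observable relies on.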

Here is an implementation of Observable.OnSubscribe for a RealmObject subclass that follows the Observable contract:

    public abstract class OnSubscribeRealm<T extends RealmObject> implements Observable.OnSubscribe<T> {
        private final Context context;
        private final String fileName;

        public OnSubscribeRealm(Context context) {
            this(context, null);
        }

        public OnSubscribeRealm(Context context, String fileName) {
            this.context = context.getApplicationContext();
            this.fileName = fileName;
        }

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final Realm realm = fileName != null ? Realm.getInstance(context, fileName) : Realm.getInstance(context);
            subscriber.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    try {
                        realm.close();
                    } catch (RealmException ex) {
                        subscriber.onError(ex);
                    }
                }
            }));

            T object;
            realm.beginTransaction();
            try {
                object = get(realm);
                realm.commitTransaction();
            } catch (RuntimeException e) {
                realm.cancelTransaction();
                subscriber.onError(new RealmException("Error during transaction.", e));
                return;
            } catch (Error e) {
                realm.cancelTransaction();
                subscriber.onError(e);
                return;
            }
            if (object != null) {
                subscriber.onNext(object);
            }
            subscriber.onCompleted();
        }

        public abstract T get(Realm realm);
    }

The implementations for RealmResults and RealmList are analogous. Here’s a helper class that simplifies using them by providing static methods that take a Func1<Realm, T> as an argument:

    public final class RealmObservable {
        private RealmObservable() {
        }

        public static <T extends RealmObject> Observable<T> object(Context context, final Func1<Realm, T> function) {
            return Observable.create(new OnSubscribeRealm<T>(context) {
                @Override
                public T get(Realm realm) {
                    return function.call(realm);
                }
            });
        }

        public static <T extends RealmObject> Observable<T> object(Context context, String fileName, final Func1<Realm, T> function) {
            return Observable.create(new OnSubscribeRealm<T>(context, fileName) {
                @Override
                public T get(Realm realm) {
                    return function.call(realm);
                }
            });
        }
    }

Data Service

Let’s define a simple DataService interface for working with issues in the database.

    public interface DataService {
        public Observable<List<Issue>> issues();
        public Observable<Issue> newIssue(String title, String body, User user, List<Label> labels);
    }

And implement it using RealmObservable:

    public class RealmDataService implements DataService {
        private final Context context;

        public RealmDataService(Context context) {
            this.context = context.getApplicationContext();
        }

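        // NOTE: issues() is not shown in this listing. Presumably it follows the same
        // pattern with a RealmResults-based observable, mapping each RealmIssue
        // through issueFromRealm.

        @Override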
        public Observable<Issue> newIssue(final String title, final String body, final User user, List<Label> labels) {
            // map internal UI objects to Realm objects
            final RealmUser realmUser = new RealmUser();
            realmUser.setLogin(user.getLogin());
            final RealmList<RealmLabel> realmLabels = new RealmList<RealmLabel>();
            for (Label label : labels) {
                RealmLabel realmLabel = new RealmLabel();
                realmLabel.setName(label.getName());
                realmLabel.setColor(label.getColor());
                realmLabels.add(realmLabel);
            }
            return RealmObservable.object(context, new Func1<Realm, RealmIssue>() {
                @Override
                public RealmIssue call(Realm realm) {
                    // internal object instances are not created by realm
                    // saving them using copyToRealm returning instance associated with realm
                    RealmUser user = realm.copyToRealm(realmUser);
                    RealmList<RealmLabel> labels = new RealmList<RealmLabel>();
                    for (RealmLabel realmLabel : realmLabels) {
                        labels.add(realm.copyToRealm(realmLabel));
                    }
                    // create RealmIssue instance and save it
                    RealmIssue issue = new RealmIssue();
                    issue.setTitle(title);
                    issue.setBody(body);
                    issue.setUser(user);
                    issue.setLabels(labels);
                    return realm.copyToRealm(issue);
                }
            }).map(new Func1<RealmIssue, Issue>() {
                @Override
                public Issue call(RealmIssue realmIssue) {
                    // map to UI object
                    return issueFromRealm(realmIssue);
                }
            });
        }

        private static Issue issueFromRealm(RealmIssue realmIssue) {
            final String title = realmIssue.getTitle();
            final String body = realmIssue.getBody();
            final User user = userFromRealm(realmIssue.getUser());
            final RealmList<RealmLabel> realmLabels = realmIssue.getLabels();
            final List<Label> labels = new ArrayList<>(realmLabels.size());
            for (RealmLabel realmLabel : realmLabels) {
                labels.add(labelFromRealm(realmLabel));
            }
            return new Issue(title, body, user, labels);
        }

        private static User userFromRealm(RealmUser realmUser) {
            return new User(realmUser.getLogin());
        }

        private static Label labelFromRealm(RealmLabel realmLabel) {
            return new Label(realmLabel.getName(), realmLabel.getColor());
        }
    }

Usage

RealmDataService can be used in an Activity or Fragment:

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        dataService = new RealmDataService(this);
    }

    private void requestAllIssues() {
        Subscription subscription = dataService.issues().
            subscribeOn(Schedulers.io()).
            observeOn(AndroidSchedulers.mainThread()).
            subscribe(
                new Action1<List<Issue>>() {
                    @Override
                    public void call(List<Issue> issues) {
                        Log.d(TAG, "Issues received with size " + issues.size());
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG, "Request all issues error", throwable);
                    }
                }
            );
        if (compositeSubscription != null) {
            compositeSubscription.add(subscription);
        }
    }

    private void addNewIssue() {
        String title = "Feature request: removing issues";
        String body = "Add function to remove issues";
        User user = new User("kboyarshinov");
        List<Label> labels = new ArrayList<>();
        labels.add(new Label("feature", "FF5722"));
        Subscription subscription = dataService.newIssue(title, body, user, labels).
            subscribeOn(Schedulers.io()).
            observeOn(AndroidSchedulers.mainThread()).
            subscribe(
                new Action1<Issue>() {
                    @Override
                    public void call(Issue issue) {
                        Log.d(TAG, "Issue with title " + issue.getTitle() + " successfully saved");
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG, "Add new issue error", throwable);
                    }
                }
            );
        if (compositeSubscription != null) {
            compositeSubscription.add(subscription);
        }
    }

The full source code is available on GitHub.

Final thoughts

Realm is a good mobile database. It provides a fast and simple way of storing data without SQL. However, right now using it with RxJava requires some boilerplate: explicitly creating UI objects and mapping them to and from Realm objects. This can be simplified using AutoValue. I hope that built-in support will eventually remove the threading issues. Check out the full Realm documentation if you want to start using it. In my next blog post about Realm I intend to write about best practices for managing complex Realm objects: how to achieve data integrity and avoid duplicates and redundancy.

UPDATE 17.12.2015

Realm now has native RxJava support, so this example is deprecated.
