Archive for April, 2015

Android, Rx and Kotlin: part 3

This is part three of my Rx – Android – Kotlin series (part 1, part 2).

This article has three parts, much like a magic trick:

  • The Pledge: we will implement a simple GUI operation with Rx and Kotlin on Android.
  • The Turn: we will create a brand new Rx operator.
  • The Prestige: we will use a standard feature of Kotlin for a final coup de grâce in code elegance.

Let’s get started.

The Pledge

We want to detect triple taps. There are various ways to define what a triple tap is so let’s just settle on a simple one: three taps are considered a triple tap if the first and last tap are separated by less than five hundred milliseconds.

With a traditional (non Rx) implementation, you would probably have two fields maintaining the timestamps of two taps ago [t-2] and that from one tap ago [t-1]. When a new tap arrives, [t], you compare the timestamp of [t] and [t-2] and if it’s less than five hundred milliseconds, we have a triple tap. Then you shift down the timestamps: [t-2] receives [t-1] and [t-1] receives the current timestamp. Repeat.

This approach is pretty standard but it’s also ugly: it bleeds state into your class and it allows for little flexibility in terms of filtering, transforming and composing.

Let’s look at this problem from an Rx perspective.

The Turn

As I explained in the first article of this series, we need to change our perspective to find out how to implement this with Rx. We are now looking for emitters and consumers of events.

The emitter is pretty obvious: it’s a tap. On Android, we can just create an Observable from a View with the convenient RxAndroid project. Once we receive the tap events from the view, we need to use an operator which will pass them in sets of threes. This is sometimes referred to as a “sliding window”. For example, if you have an Observable that emits the numbers from 0 to 5, you would expect a sliding window of size 3 to receive the following parameters:

[0, 1, 2]
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

Any list that is not of size three is ignored. Then, whenever our operator is notified, we simply extract the timestamps of the first and last index of that list and we see if the difference is less than five hundred milliseconds.

For the sake of learning, we’ll assume that such an operator doesn’t already exist and we’ll implement it ourselves (as it turns out, this is not true, jump to the bottom of this article if you want to find out).

Implementing an Rx operator

I don’t want to dive too deep into the internals of how Rx works so I’ll just focus on the important parts here. Fundamentally, an operator is like a Callable: it receives parameters and it returns a value. That value is a Subscriber which needs to implement methods you are probably familiar with now after a few hours spent on Rx: onNext(), onCompleted() and onError().

We’re just going to focus on onNext() for now in order to keep this article short and because this is where our main logic resides. The implementation is pretty simple:

class SwSubscriber<T> (val child: Subscriber<in List<T>>, val size: Int)
        : Subscriber<T>(child)
{
    val buffer = ArrayList<T>(size)
    override fun onNext(t: T) {
        buffer.add(t)
        if (buffer.size() == size) {
            child.onNext(ArrayList(buffer))
            buffer.remove(0)
        }
    }
    override fun onCompleted() { /* ... */ }
    override fun onError(e: Throwable?) { /* ... */ }
}

We maintain a buffer of size three and we accumulate the elements from the Observable in it. When that buffer reaches the size three, we emit it and we shift the buffer. Note that when I call onNext() on the buffer, I pass a copy of the list and not the list itself. This is just a precaution against the fact that the subscriber might decide to alter the list I pass them, which would break our operator.

Now that we have a subscriber, we can create our operator:

class SwOperator<T>(val n: Int) : Observable.Operator<List<T>, T> {
    override fun call(child: Subscriber<in List<T>>): Subscriber<in T>? =
            SwSubscriber(child, n)
}

And this is how we use it:

Observable.just(1, 2, 3, 4, 5)
    .lift(SwOperator(3))
    .subscribe { println("Received ${it}") }

… which prints:

Received [1, 2, 3]
Received [2, 3, 4]
Received [3, 4, 5]

The Prestige

We’re not quite done yet: you might have noticed that because our new function is, obviously, not part of the Observable class, we need to use a helper method called lift() in order to insert our code in a composition chain. Of course, we can always send a pull request to the project to have our new operator added, but we don’t have to wait: we can define an extension method in Kotlin to do just that:

fun <T> Observable<T>.slidingWindow(n: Int) =
    lift(SwOperator(n))

And now we can write:

Observable.just(1, 2, 3, 4, 5)
    .slidingWindow(3))
    .subscribe { println("Received ${it}")

Our new operator looks just as if it were part of the Rx library.

Android

Let’s close the loop on the initial problem so we can move on to more interesting things: now we want to use our sliding window to report triple taps.

Using RxAndroid, we have the convenient clicks() method that turns taps from a View into a stream of events. Unfortunately, these events are simple wrappers that don’t carry with them the timestamp of the tap, but that’s not really a problem: we just need to convert our stream of events into a stream of timestamps and we can add our logic then:

    ViewObservable.clicks(view)
	    .map { System.currentTimeMillis() }
	    .slidingWindow(3)
        .filter { it.get(2) - it.get(0) < 500 }
        .subscribe { println("Triple tap!") }
)

This is another example of the power of composition: no need to create our own events or our own wrappers. When you look at problems in the form of streams of data that you can transform at will, a lot of problems become trivial.

Rx and the sliding window

Note that Rx already has the functionality we implemented in this article: it’s called buffer() and you pass it not just the window size but also by how many elements you want to skip. buffer() behaves a bit differently from our implementation, though, by notifying subscribers of windows that are smaller than the desired size:

        Observable.just(1, 2, 3, 4, 5)
            .buffer(3, 1)
            .subscribe { p("Received ${it}")

prints:

[main] Received [1, 2, 3]
[main] Received [2, 3, 4]
[main] Received [3, 4, 5]
[main] Received [4, 5]
[main] Received [5]

Wrapping up

The concept of a sliding window is very common in all domains. Detecting multiple taps in a user interface is an example but you could also conceive using this to determine the bounding box of multiple taps. Stepping out from the graphical domain, your stream could contain geo locations and you could use the sliding window to determine instant or average speed. A few years ago, I actually posed a coding challenge to implement a sliding window in order to throttle calls to a server. Needless to say, I would use Rx for this today.

Hopefully, this article gave you some insight on the benefits that Rx can bring to your application, regardless of the domain. Revisiting problems with the Rx philosophy is always an interesting exercise which often results in simpler, more elegant solutions.

Read part 1, part 2.

Android, Rx and Kotlin: part 2

I haven’t been quite honest with you my previous post: the code I showed in the article doesn’t exactly result in the short video of the application at the top of the article.

If you run the code as is, you will notice something very irritating (and unacceptable in any application): whenever the app is pretending to make a network call, the entire user interface freezes for a second. You can’t type anything and the loading icon stops spinning. This is the classic symptom of blocking the main thread. You will remember that I am simulating network calls by simply sleeping for a little while, and obviously, if you do this on the main thread, you will freeze your UI.

By default, Rx runs everything on your current thread, which is the main thread in Android: the thread that is in charge of updating your user interface. Android is exactly like most graphical toolkits: you should only use the main thread to update your UI but anything else you do (network or file system access, computations, database updates, etc…) needs to be done on a background thread. Rx has a very good solution to this problem.

Threading

Until recently, AsyncTask was the recommended way of performing this kind of task: by creating and executing an AsyncTask, you can run your code in two locations, one that will be run in a background thread (doInBackground()) and once that task completes, code that will run on the main thread (onPostExecute()).

AsyncTask has a troubled past and it has evolved quite a bit over the many revisions of the Android API: first it was single threaded, then it became multithreaded and more recently, it’s running in the background on one thread in an attempt to provide both parallelism and sequencing at the same time. If you need more information about AsyncTask, this article explains how it evolved.

This is not the only issue with AsyncTask: it’s also fairly challenging to get its behavior right while going through configuration changes or the possibility of your activity being paused or destroyed while the task is still running.

Rx offers a few solutions to some of these problems, but not all.

Threading and Rx

Rx offers two methods to control your threading model: subscribeOn() and observeOn().

In a nutshell, observeOn() defines what thread your observer will run on (this is where you usually do the work) and subscribeOn() defines the thread where your operators will run (map(), filter(), etc…).

The parameter you give to these methods is a Scheduler, an Rx abstraction that encapsulates a thread. Rx defines a few standard ones:

  • Schedulers.computation(): When you are calculating something.
  • Schedulers.io(): When you are doing I/O (network, file system, database access, …).
  • And a few others I won’t get into here.

Additionally, RxAndroid defines the more Android-specific AndroidSchedulers.mainThread(), which is self explanatory.

A typical piece of code on Android is to run a few tasks in the background (network access, expensive computation, database update, etc…) and based on the result of that action, you update your UI. The way to implement this with Rx is straightforward: you subscribe on whichever background thread is more appropriate for your actions and you observe on the main thread:

trait Server {
    fun findUser(name: String) : Observable<JsonObject>
}
data class User(val id: String, val name: String)
fun p(s: String) {
    println("[${Thread.currentThread().getName()}] ${s}")
}
Observable.just("cedric")
    .subscribeOn(Schedulers.io())
    .flatMap {
    	p("Calling server.findUser");
    	server.findUser("cedric")
    }
    .map{jo ->
        p("Mapping to a User object")
    	User(jo.get("id").getAsString(),
             jo.get("name").getAsString())
     }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe{ u -> p("User: ${u}a") }

We start with a string (which could come from an EditText and we specify that we’ll be subscribing from the I/O thread. Then we call the server with that name (on the I/O thread), turn the JSON response into a User object and we print that object:

[IoThreadScheduler-1] Calling server.findUser
[IoThreadScheduler-1] Mapping to a User object
[Main] User: User(id=123, name=cedric)

Note that even though you can specify multiple subscribeOn, all the subscriptions will happen on the first scheduler (subsequent subscribeOn will be ignored). I’m not sure if this is by design or just an oversight, but it’s not really a problem in practice. If you ever want to subscribe on multiple schedulers, you can always make this happen in the body of your subscription itself (for example, in the example above, if the server call was actually using Retrofit, you would see it’s using its own thread pool to make that call).

And that’s about all there is to get started with thread management with Rx on Android. As you can see, structuring your code this way makes the intent and thread handling extremely clear and easy to trace through, much more so than with AsyncTask.

With the growing number of Android libraries adding support for Rx, it’s becoming even more trivial to use these libraries within this framework and combine them in straightforward yet powerful ways. You can see in the examples I used in this post and the previous one how Rx makes it trivial to combine network calls and GUI updates simply by the fact that Retrofit returns Observables. You should also take a look at SQLBrite, which wraps SQLiteOpenHelper in Observables to offer you similar flexibility but for database access.

Read part 1, part 3.