
Getting Started

This project tries to be consistent with ReactiveX.io. The general cross-platform documentation and tutorials should also be valid for RxSwift.

  1. Observables aka Sequences
  2. Disposing
  3. Pipe operator
  4. Implicit Observable guarantees
  5. Creating your own Observable (aka sequence producers)
  6. Creating an Observable that performs work
  7. Sharing subscription, refCount and variable operator
  8. Operators
  9. Custom operators
  10. Error handling
  11. Debugging
  12. Debugging memory leaks
  13. Variables
  14. UI layer tips
  15. Making HTTP requests
  16. Examples

Observables aka Sequences

Basics

The equivalence of the observer pattern (Observable<Element>) and sequences (Generators) is one of the most important things to understand about Rx.

The observer pattern is needed to model asynchronous behavior, and this equivalence makes it possible to implement high-level sequence operations as operators on Observables.

Sequences are a simple, familiar concept that is easy to visualize.

People are creatures with huge visual cortexes. When you can visualize something easily, it's a lot easier to reason about.

That way, you can shift much of the cognitive load away from simulating event state machines inside every Rx operator and toward high-level operations over sequences.

If you model async systems without Rx, your code is probably full of these state machines and transient states. You have to simulate them instead of abstracting them away.

Lists/sequences are probably one of the first concepts mathematicians/programmers learn.

Here is a sequence of numbers

--1--2--3--4--5--6--| // it terminates normally

Here is another one with characters

--a--b--a--a--a---d---X // it terminates with error

Some sequences are finite, and some are infinite, like a sequence of button taps:

---tap-tap-------tap--->

These diagrams are called marble diagrams (see http://rxmarbles.com/).

If we were to specify the sequence grammar as a regular expression, it would look something like this:

Next* (Error | Completed)?

This describes the following:

  • sequences can have 0 or more elements
  • once an Error or Completed event is received, the sequence can't produce any other element
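To make the grammar concrete, here is a small sketch in modern Swift. It does not use RxSwift's API, and `Event` and `isValidSequence` are hypothetical names. The function checks a recorded list of events against Next* (Error | Completed)?. To keep it short, a String stands in for a real error value.

```swift
// Checks a recorded list of events against Next* (Error | Completed)?
// `Event` and `isValidSequence` are hypothetical names for this example only.
enum Event<Element> {
    case next(Element)
    case error(String)   // a String stands in for a real error value
    case completed
}

func isValidSequence<Element>(_ events: [Event<Element>]) -> Bool {
    var terminated = false
    for event in events {
        if terminated { return false }   // nothing may follow Error or Completed
        switch event {
        case .next:
            continue                     // any number of elements is fine
        case .error, .completed:
            terminated = true            // at most one terminal event
        }
    }
    return true
}

let finished: [Event<Int>] = [.next(1), .next(2), .completed]
let empty: [Event<Int>] = []
let broken: [Event<Int>] = [.next(1), .completed, .next(2)]

print(isValidSequence(finished)) // true
print(isValidSequence(empty))    // true
print(isValidSequence(broken))   // false
```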

Sequences in Rx are described by a push interface (aka callback).

enum Event<Element>  {
    case Next(Element)      // next element of a sequence
    case Error(ErrorType)   // sequence failed with error
    case Completed          // sequence terminated successfully
}

class Observable<Element> {
    func subscribe<O: ObserverType where O.Element == Element>(observer: O) -> Disposable
}

protocol ObserverType {
    typealias Element
    func on(event: Event<Element>)
}

If you are curious why ErrorType isn't generic, you can find an explanation here.

So the only thing left on the table is Disposable.

protocol Disposable
{
    func dispose()
}
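The following is a minimal, self-contained model of these types in modern Swift that uses only the standard library. It is a sketch for intuition, not RxSwift's source. `ClosureObserver`, `subscribeHandler` and the lowercase case names are choices made for this example.

```swift
// Illustrative model of the push interface above; not RxSwift's source.
enum Event<Element> {
    case next(Element)      // next element of a sequence
    case error(Error)       // sequence failed with error
    case completed          // sequence terminated successfully
}

protocol Disposable {
    func dispose()
}

protocol ObserverType {
    associatedtype Element
    func on(_ event: Event<Element>)
}

// An observer that forwards every event to a closure.
final class ClosureObserver<Element>: ObserverType {
    private let handler: (Event<Element>) -> Void
    init(_ handler: @escaping (Event<Element>) -> Void) { self.handler = handler }
    func on(_ event: Event<Element>) { handler(event) }
}

// Nothing to release: suitable for synchronous sequences.
struct NopDisposable: Disposable {
    func dispose() {}
}

// An Observable is little more than a stored subscribe function.
struct Observable<Element> {
    let subscribeHandler: (ClosureObserver<Element>) -> Disposable

    func subscribe(_ handler: @escaping (Event<Element>) -> Void) -> Disposable {
        subscribeHandler(ClosureObserver(handler))
    }
}

// A synchronous sequence: 1, 2, then Completed.
let oneTwo = Observable<Int>(subscribeHandler: { observer in
    observer.on(.next(1))
    observer.on(.next(2))
    observer.on(.completed)
    return NopDisposable()
})

var received: [String] = []
let subscription = oneTwo.subscribe { event in
    switch event {
    case .next(let value): received.append("next(\(value))")
    case .error(let error): received.append("error(\(error))")
    case .completed: received.append("completed")
    }
}
subscription.dispose()
print(received) // ["next(1)", "next(2)", "completed"]
```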

Disposing

There is one additional way an observed sequence can terminate. When you are done with a sequence and want to release all of the resources that were allocated to compute upcoming elements, you can call dispose on the subscription, and those resources will be cleaned up for you.

Here is an example with the interval operator. The definition of the >- operator is here.

let subscription = interval(0.3, scheduler)
            >- subscribe { (e: Event<Int64>) in
                println(e)
            }

NSThread.sleepForTimeInterval(2)

subscription.dispose()

This will print:

0
1
2
3
4
5

Note that you usually don't want to call dispose manually; this is only an educational example. Calling dispose manually is usually a code smell, and there are better ways to dispose subscriptions: you can use DisposeBag, ScopedDispose, the takeUntil operator, or some other mechanism.

So can this code print something after the dispose call has executed? The answer is: it depends.

  • If the scheduler is a serial scheduler (MainScheduler is a serial scheduler) and dispose is called on the same serial scheduler, the answer is no.

  • Otherwise, the answer is yes.

You can find out more about schedulers here.

You simply have two processes happening in parallel.

  • one is producing elements
  • the other is disposing the subscription

When you think about it, the question "can something be printed after dispose?" doesn't even make sense when those processes run on different schedulers.

A few more examples just to be sure (observeOn is explained here).

If you have something like this:

let subscription = interval(0.3, scheduler)
            >- observeOn(MainScheduler.sharedInstance)
            >- subscribe { (e: Event<Int64>) in
                println(e)
            }

// ....

subscription.dispose() // called from main thread

After the dispose call returns, nothing will be printed. That is guaranteed.

Also in this case:

let subscription = interval(0.3, scheduler)
            >- observeOn(serialScheduler)
            >- subscribe { (e: Event<Int64>) in
                println(e)
            }

// ...

subscription.dispose() // executing on same `serialScheduler`

After the dispose call returns, nothing will be printed. That is guaranteed.
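The DisposeBag mentioned earlier in this section can be sketched as an object that disposes everything it holds when it is deallocated. This is a hypothetical re-implementation in modern Swift, not RxSwift's code. The AnonymousDisposable here simply runs a closure once.

```swift
// Sketch of a DisposeBag: disposes everything it holds when it is deallocated.
// Illustrative re-implementation; not RxSwift's source.
protocol Disposable {
    func dispose()
}

final class AnonymousDisposable: Disposable {
    private var action: (() -> Void)?
    init(_ action: @escaping () -> Void) { self.action = action }

    func dispose() {
        action?()          // run the cleanup at most once
        action = nil
    }
}

final class DisposeBag {
    private var disposables: [Disposable] = []

    func insert(_ disposable: Disposable) {
        disposables.append(disposable)
    }

    deinit {
        disposables.forEach { $0.dispose() }
    }
}

var disposedCount = 0

do {
    let bag = DisposeBag()
    bag.insert(AnonymousDisposable { disposedCount += 1 })
    bag.insert(AnonymousDisposable { disposedCount += 1 })
    withExtendedLifetime(bag) {
        print("while the bag is alive:", disposedCount)   // while the bag is alive: 0
    }
}   // the bag is released here, so both disposables run

print("after the bag is gone:", disposedCount)            // after the bag is gone: 2
```

Tying subscriptions to the lifetime of an owner (for example, a view controller that holds the bag) is what removes the need for manual dispose calls.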

Pipe operator

To understand the following examples, you need to understand what the pipe operator (>-) is.
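Assume the pipe operator is plain left-to-right function application, so `x >- f` means `f(x)`. Under that assumption, it can be sketched in modern Swift as below. The precedence group name and the curried `filter`/`map` helpers over arrays are made up for this example.

```swift
// Sketch: `x >- f` is plain function application, f(x), evaluated left to right.
// The precedence group name is invented for this example.
precedencegroup PipeForwardPrecedence {
    associativity: left
    higherThan: AssignmentPrecedence
}

infix operator >- : PipeForwardPrecedence

func >- <In, Out>(lhs: In, rhs: (In) -> Out) -> Out {
    rhs(lhs)
}

// Curried helpers: each returns a function that is still waiting for its input.
func filter<T>(_ predicate: @escaping (T) -> Bool) -> ([T]) -> [T] {
    { $0.filter(predicate) }
}

func map<T, U>(_ transform: @escaping (T) -> U) -> ([T]) -> [U] {
    { $0.map(transform) }
}

let numbers: [Int] = [1, 2, 3, 4, 5]
let result: [Int] = numbers
    >- filter { $0 % 2 == 1 }   // [1, 3, 5]
    >- map { $0 * 10 }          // [10, 30, 50]

print(result) // [10, 30, 50]
```

Each operator returns a function that is still waiting for its input, so a chain reads top to bottom in the order the steps are applied.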

Implicit Observable guarantees

There are also a couple of additional guarantees that all sequence producers (Observables) must honor.

It doesn't matter on which thread they produce elements. However, once they generate an element and send it to the observer (observer.on(.Next(nextElement))), they can't send the next element until the observer.on method has finished executing.

Producers also cannot send a terminating .Completed or .Error while a .Next event is still being processed.

In short, consider this example:

someObservable
  >- subscribe { (e: Event<Element>) in
      println("Event processing started")
      // processing
      println("Event processing ended")
  }

This will always print:

Event processing started
Event processing ended
Event processing started
Event processing ended
Event processing started
Event processing ended

It can never print:

Event processing started
Event processing started
Event processing ended
Event processing ended
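Elements may arrive from several threads. One way to uphold this guarantee in that case is to serialize delivery with a lock. The sketch below assumes Foundation's NSLock and Dispatch, and `SerializedObserver` is a hypothetical name. This is one possible technique, not necessarily how RxSwift enforces the rule internally.

```swift
import Foundation

// Sketch: a wrapper that serializes `onNext` calls with a lock, so even a producer
// that emits from several threads can never have two calls running at once.
// `SerializedObserver` is a hypothetical name for this example.
final class SerializedObserver<Element> {
    private let lock = NSLock()
    private let handler: (Element) -> Void

    init(_ handler: @escaping (Element) -> Void) { self.handler = handler }

    func onNext(_ element: Element) {
        lock.lock()
        defer { lock.unlock() }
        handler(element)              // at most one thread is ever in here
    }
}

var lines: [String] = []              // only touched while the lock is held
let observer = SerializedObserver<Int> { n in
    lines.append("Event processing started \(n)")
    lines.append("Event processing ended \(n)")
}

// Emit 100 elements from concurrent threads.
DispatchQueue.concurrentPerform(iterations: 100) { i in
    observer.onNext(i)
}

// Every "started" line must be immediately followed by its own "ended" line.
var interleaved = false
for index in stride(from: 0, to: lines.count, by: 2) {
    let n = lines[index].split(separator: " ").last!
    if lines[index + 1] != "Event processing ended \(n)" { interleaved = true }
}
print(lines.count, interleaved) // 200 false
```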

Creating your own Observable (aka sequence producers)

There is one crucial thing to understand about observables.

When an observable is created, it doesn't perform any work simply because it has been created.

It is true that an Observable can generate elements in many ways. Some of them cause side effects, and some tap into existing running processes, such as mouse events.

But if you just call a method that returns an Observable, no sequence generation is performed and there are no side effects. An Observable is just a definition of how the sequence is generated and what parameters are used for element generation. Sequence generation starts when the subscribe method is called.

For example, let's say you have a method with a prototype like this:

func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me")

// no requests are performed, no work is being done, no URL requests were fired

let cancel = searchForMe
  // sequence generation starts now, URL requests are fired
  >- subscribeNext { results in
      println(results)
  }
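This laziness can be seen in a few lines of modern Swift. In this sketch, `Deferred` and `fakeSearch` are hypothetical, and a counter stands in for the URL request a real searchWikipedia would fire.

```swift
// Sketch: an "observable" here is only a stored closure, so creating one does
// no work. `Deferred` and `fakeSearch` are hypothetical; the counter stands in
// for the URL request a real search would fire.
struct Deferred<Element> {
    let subscribe: ((Element) -> Void) -> Void
}

var requestsFired = 0

func fakeSearch(_ term: String) -> Deferred<String> {
    Deferred { onNext in
        requestsFired += 1                      // the side effect happens only here
        onNext("results for \(term)")
    }
}

let searchForMe = fakeSearch("me")
print("after creation:", requestsFired)         // after creation: 0

searchForMe.subscribe { print($0) }             // results for me
print("after first subscribe:", requestsFired)  // after first subscribe: 1

searchForMe.subscribe { _ in }                  // every subscription does the work again
print("after second subscribe:", requestsFired) // after second subscribe: 2
```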

There are many ways to create your own Observable sequence. Probably the easiest is the create function.

Let's create a function which creates a sequence that returns one element upon subscription. That function is called 'just'.

This is the actual implementation

func myJust<E>(element: E) -> Observable<E> {
    return create { observer in
        sendNext(observer, element)
        sendCompleted(observer)
        return NopDisposable.instance
    }
}

myJust(0)
    >- subscribeNext { n in
        println(n)
    }

This will print:

0

Not bad. So what is the create function?

It's just a convenience method that lets you easily implement the subscribe method using a Swift closure. Like the subscribe method, it takes one argument, observer, and returns a disposable.

So what is the sendNext function?

It's just a convenient way of calling observer.on(.Next(RxBox(element))). The same applies to sendCompleted(observer), which calls observer.on(.Completed).

A sequence implemented this way is actually synchronous. It generates its elements and terminates before the subscribe call returns the disposable representing the subscription. Because of that, it doesn't really matter which disposable it returns: the process of generating elements can't be interrupted.

When generating synchronous sequences, the usual disposable to return is the singleton instance of NopDisposable.

Let's now create an observable that returns elements from an array.

This is the actual implementation

func myFrom<E>(sequence: [E]) -> Observable<E> {
    return create { observer in
        for element in sequence {
            sendNext(observer, element)
        }

        sendCompleted(observer)
        return NopDisposable.instance
    }
}

let stringCounter = myFrom(["first", "second"])

println("Started ----")

// first time
stringCounter
    >- subscribeNext { n in
        println(n)
    }

println("----")

// again
stringCounter
    >- subscribeNext { n in
        println(n)
    }

println("Ended ----")

This will print:

Started ----
first
second
----
first
second
Ended ----

Creating an Observable that performs work

OK, now something more interesting. Let's create the interval operator that was used in the previous examples.

This is equivalent to the actual implementation for dispatch queue schedulers:

func myInterval(interval: NSTimeInterval) -> Observable<Int> {
    return create { observer in
        println("Subscribed")
        let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue)

        var next = 0

        dispatch_source_set_timer(timer, 0, UInt64(interval * Double(NSEC_PER_SEC)), 0)
        let cancel = AnonymousDisposable {
            println("Disposed")
            dispatch_source_cancel(timer)
        }
        dispatch_source_set_event_handler(timer, {
            if cancel.disposed {
                return
            }
            sendNext(observer, next++)
        })
        dispatch_resume(timer)

        return cancel
    }
}
let counter = myInterval(0.1)

println("Started ----")

let subscription = counter
    >- subscribeNext { n in
       println(n)
    }

NSThread.sleepForTimeInterval(0.5)

subscription.dispose()

println("Ended ----")

This will print

Started ----
Subscribed
0
1
2
3
4
Disposed
Ended ----
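For comparison, here is a sketch of the myInterval example in Swift 5, using DispatchSourceTimer instead of the C-style dispatch_source_* functions. `Subscription` and `interval(_:onNext:)` are hypothetical names, and the observer is reduced to an onNext closure.

```swift
import Foundation

// Sketch of myInterval in Swift 5 using DispatchSourceTimer. `Subscription`
// and `interval(_:onNext:)` are hypothetical names, not RxSwift API.
final class Subscription {
    private let lock = NSLock()
    private var timer: DispatchSourceTimer?

    init(timer: DispatchSourceTimer) { self.timer = timer }

    var isDisposed: Bool {
        lock.lock()
        defer { lock.unlock() }
        return timer == nil
    }

    func dispose() {
        lock.lock()
        let running = timer
        timer = nil
        lock.unlock()
        running?.cancel()                            // stop the timer
    }
}

func interval(_ seconds: Double, onNext: @escaping (Int) -> Void) -> Subscription {
    let queue = DispatchQueue(label: "interval")     // a serial queue
    let timer = DispatchSource.makeTimerSource(queue: queue)
    let subscription = Subscription(timer: timer)
    var next = 0
    timer.schedule(deadline: .now(), repeating: seconds)
    timer.setEventHandler {
        if subscription.isDisposed { return }        // like `cancel.disposed` above
        onNext(next)
        next += 1
    }
    timer.resume()
    return subscription
}

let lock = NSLock()
var values: [Int] = []
let subscription = interval(0.05) { n in
    lock.lock(); values.append(n); lock.unlock()
}

Thread.sleep(forTimeInterval: 0.3)
subscription.dispose()
lock.lock(); let countAtDispose = values.count; lock.unlock()

Thread.sleep(forTimeInterval: 0.2)
lock.lock(); let countLater = values.count; lock.unlock()

print(countAtDispose, countLater)
```

The timer fires on its own queue rather than on the thread that calls dispose. As a result, one element that was already in flight may still be delivered after dispose returns, which matches the "it depends" answer in the Disposing section.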

What if you wrote this:

let counter = myInterval(0.1)

println("Started ----")

let subscription1 = counter
    >- subscribeNext { n in
       println("First \(n)")
    }
let subscription2 = counter
    >- subscribeNext { n in
       println("Second \(n)")
    }

NSThread.sleepForTimeInterval(0.5)

subscription1.dispose()

NSThread.sleepForTimeInterval(0.5)

subscription2.dispose()

println("Ended ----")

This would print:

Started ----
Subscribed
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Disposed
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

Usually, every subscriber generates its own separate sequence of elements upon subscription. Operators are stateless by default, and there are vastly more stateless operators than stateful ones.

Sharing subscription, refCount and variable operator

But what if you want multiple observers to share one subscription?

There are two things that need to be defined.

  • How to handle historical elements (replay latest only, replay all, replay last n)
  • How to control when to subscribe to shared sequence (refCount, manual or some other algorithm)

The usual choice is a combination of replay(1) >- refCount.

let counter = myInterval(0.1)
    >- replay(1)
    >- refCount

println("Started ----")

let subscription1 = counter
    >- subscribeNext { n in
       println("First \(n)")
    }
let subscription2 = counter
    >- subscribeNext { n in
       println("Second \(n)")
    }

NSThread.sleepForTimeInterval(0.5)

subscription1.dispose()

NSThread.sleepForTimeInterval(0.5)

subscription2.dispose()

println("Ended ----")

This will print:

Started ----
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
First 5
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

Notice that there is now only one Subscribed and one Disposed event.
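What replay(1) >- refCount does can be sketched without RxSwift as a wrapper with three behaviors. It connects to the source on the first subscription, replays the latest element to later subscribers, and disconnects when the last subscriber leaves. `ManualSource` and `SharedReplay` are hypothetical names, and the source is driven by hand so the example stays synchronous.

```swift
// Sketch of `replay(1) >- refCount` without RxSwift. `ManualSource` and
// `SharedReplay` are hypothetical names; the source is driven by hand.
final class ManualSource {
    private(set) var subscriptionCount = 0
    private var observers: [Int: (Int) -> Void] = [:]
    private var nextID = 0

    // Each call is one upstream subscription; the returned closure disposes it.
    func subscribe(_ observer: @escaping (Int) -> Void) -> () -> Void {
        subscriptionCount += 1
        let id = nextID
        nextID += 1
        observers[id] = observer
        return { [weak self] in self?.observers[id] = nil }
    }

    func emit(_ value: Int) {
        observers.values.forEach { $0(value) }
    }
}

final class SharedReplay {
    private let source: ManualSource
    private var observers: [Int: (Int) -> Void] = [:]
    private var nextID = 0
    private var latest: Int?
    private var upstream: (() -> Void)?   // disposes the single shared subscription

    init(_ source: ManualSource) { self.source = source }

    func subscribe(_ observer: @escaping (Int) -> Void) -> () -> Void {
        let id = nextID
        nextID += 1
        observers[id] = observer
        if let latest = latest { observer(latest) }      // replay(1): send the latest element
        if upstream == nil {                             // refCount: first subscriber connects
            upstream = source.subscribe { [weak self] value in
                self?.latest = value
                self?.observers.values.forEach { $0(value) }
            }
        }
        return { [weak self] in
            guard let self = self else { return }
            self.observers[id] = nil
            if self.observers.isEmpty {                  // refCount: last subscriber disconnects
                self.upstream?()
                self.upstream = nil
            }
        }
    }
}

let source = ManualSource()
let shared = SharedReplay(source)
var first: [Int] = []
var second: [Int] = []

let disposeFirst = shared.subscribe { first.append($0) }
source.emit(0)
let disposeSecond = shared.subscribe { second.append($0) }   // receives the replayed 0
source.emit(1)
disposeFirst()
source.emit(2)
disposeSecond()
source.emit(3)                                               // nobody is connected now

print(source.subscriptionCount) // 1
print(first)                    // [0, 1]
print(second)                   // [0, 1, 2]
```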

This pattern of sharing subscriptions is so common in the UI layer that it has its own operator. Instead of writing >- replay(1) >- refCount, you can just write >- variable.

Behavior for URL observables is equivalent.

This is how HTTP requests are wrapped in Rx. It's pretty much the same pattern as the interval operator.

extension NSURLSession {
    public func rx_response(request: NSURLRequest) -> Observable<(NSData!, NSURLResponse!)> {
        return create { observer in
            let task = self.dataTaskWithRequest(request) { (data, response, error) in
                if data == nil || response == nil {
                    sendError(observer, error ?? UnknownError)
                }
                else {
                    sendNext(observer, (data, response))
                    sendCompleted(observer)
                }
            }

            task.resume()

            return AnonymousDisposable {
                task.cancel()
            }
        }
    }
}

Operators

There are numerous operators implemented in RxSwift. The complete list can be found here.

Marble diagrams for all operators can be found on ReactiveX.io

Almost all operators are demonstrated in Playgrounds.

To use the playgrounds, open Rx.xcworkspace, build the RxSwift-OSX scheme, and then open the playgrounds in the Rx.xcworkspace tree view.

If you need an operator and don't know how to find it, there is a decision tree of operators (http://reactivex.io/documentation/operators.html#tree).

Supported RxSwift operators are also grouped by the function they perform, which can also help.

Custom operators

There are two ways to create custom operators.

Easy way

All of the internal code uses highly optimized versions of operators, so they aren't the best tutorial material. That's why you are strongly encouraged to use the standard operators.

Fortunately, there is an easier way to create operators. Creating new operators is really all about creating observables, and the previous chapter already describes how to do that.

Let's see how an unoptimized map operator can be implemented.

func myMap<E, R>(transform: E -> R)(source: Observable<E>) -> Observable<R> {
    return create { observer in

        let subscription = source >- subscribe { e in
                switch e {
                case .Next(let boxedValue):
                    sendNext(observer, transform(boxedValue.value))
                case .Error(let error):
                    sendError(observer, error)
                case .Completed:
                    sendCompleted(observer)
                }
            }

        return subscription
    }
}

So now you can use your own map:

let subscription = myInterval(0.1)
    >- myMap { e in
        return "This is simply \(e)"
    }
    >- subscribeNext { n in
        println(n)
    }

This will print:

Subscribed
This is simply 0
This is simply 1
This is simply 2
This is simply 3
This is simply 4
This is simply 5
This is simply 6
This is simply 7
This is simply 8
...

Harder, more performant way

You can perform the same optimizations we have made and create more performant operators. That usually isn't necessary, but it can of course be done.

Disclaimer: when taking this approach, you also take on a lot more responsibility. You will need to make sure that the sequence grammar is correct, and you will be responsible for disposing subscriptions.

There are plenty of examples of how to do this in the RxSwift project. I would suggest taking a look at map or filter first.

Creating your own custom operators is tricky because you have to manually handle all of the chaos of error handling, asynchronous execution and disposal, but it's not rocket science either.

Every operator in Rx is just a factory for an observable. The returned observable usually contains information about the source Observable and the parameters needed to transform it.

In RxSwift code, almost all optimized Observables have a common parent called Producer. The returned observable serves as a proxy between subscribers and the source observable. It usually does the following:

  • on new subscription creates a sink that performs transformations
  • registers that sink as observer to source observable
  • on received events, forwards the transformed events to the original observer
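Those three steps can be sketched with a synchronous filter operator in modern Swift. `FilterSink` and the closure-based `Observable` are illustrative, not RxSwift's Producer/Sink classes. The sink also drops anything sent after a terminal event, which is part of the responsibility for keeping the grammar correct.

```swift
// Sketch of the Producer/sink structure, reduced to a synchronous filter.
// `Event`, `Observable`, `FilterSink` and `filter` are illustrative names.
enum Event<Element> {
    case next(Element)
    case error(Error)
    case completed
}

struct Observable<Element> {
    let subscribe: (@escaping (Event<Element>) -> Void) -> Void
}

// The sink holds per-subscription state and forwards transformed events.
final class FilterSink<Element> {
    private let predicate: (Element) -> Bool
    private let downstream: (Event<Element>) -> Void
    private var stopped = false     // enforce the grammar: nothing after a terminal event

    init(predicate: @escaping (Element) -> Bool,
         downstream: @escaping (Event<Element>) -> Void) {
        self.predicate = predicate
        self.downstream = downstream
    }

    func on(_ event: Event<Element>) {
        guard !stopped else { return }
        switch event {
        case .next(let value):
            if predicate(value) { downstream(.next(value)) }
        case .error, .completed:
            stopped = true
            downstream(event)
        }
    }
}

extension Observable {
    // The operator is only a factory: it captures the source and the parameter.
    func filter(_ predicate: @escaping (Element) -> Bool) -> Observable<Element> {
        Observable { observer in
            let sink = FilterSink(predicate: predicate, downstream: observer) // new sink per subscription
            self.subscribe { sink.on($0) }                                    // register sink with the source
        }
    }
}

let numbers = Observable<Int> { observer in
    for n in 1...6 { observer(.next(n)) }
    observer(.completed)
    observer(.next(100))            // misbehaving source: the sink drops this
}

var output: [String] = []
numbers.filter { $0 % 2 == 0 }.subscribe { event in
    switch event {
    case .next(let n): output.append("\(n)")
    case .error: output.append("error")
    case .completed: output.append("completed")
    }
}
print(output) // ["2", "4", "6", "completed"]
```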

Life happens

So what if some cases are just too hard to solve with custom operators? You can exit the Rx monad, perform actions in the imperative world, and then tunnel the results back into Rx using Subjects.

This isn't something you should do often, and it is a code smell, but you can do it.

  let magicBeings: Observable<MagicBeing> = summonFromMiddleEarth()
  let kittens = Variable<Kitten>(firstKitten) // Subject used to tunnel results back into Rx

  magicBeings
    >- subscribeNext { being in     // exit the Rx monad
        self.doSomeStateMagic(being)
    }
    >- disposeBag.addDisposable

  //
  //  Mess
  //
  let kitten = globalParty(   // calculate something in messy world
    being,
    UIApplication.delegate.dataSomething.attendees
  )
  sendNext(kittens, kitten)   // send result back to rx
  //
  // Another mess
  //

  kittens                     // again back in Rx monad
    >- map { kitten in
      return kitten.purr()
    }
    // ....

Every time you do this, somebody will probably write this code somewhere

  kittens
    >- subscribeNext { kitten in
      // do something with kitten
    }
    >- disposeBag.addDisposable

so please try not to do this.

Error handling

There are two error handling mechanisms.

Asynchronous error handling mechanism in observables

Error handling is pretty straightforward. If one sequence terminates with an error, all of the dependent sequences will terminate with an error. It's the usual short-circuit logic.

You can recover from a failed observable by using the catch operator. There are various overloads that let you specify recovery in great detail.

There is also the retry operator, which resubscribes to the source sequence when it terminates with an error.
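The semantics of retry and catch can be sketched over a minimal closure-based Observable. `retry(_:)` and `catchError(_:)` here are hypothetical helpers written for this example; `catch` is a Swift keyword, hence the name. RxSwift's own overloads offer more options.

```swift
// Sketch of retry and catch over a closure-based Observable.
// `retry(_:)` and `catchError(_:)` are hypothetical helpers, not RxSwift API.
enum Event<Element> {
    case next(Element)
    case error(Error)
    case completed
}

struct Observable<Element> {
    let subscribe: (@escaping (Event<Element>) -> Void) -> Void

    // On error, resubscribe to the source; at most `maxAttempts` subscriptions in total.
    func retry(_ maxAttempts: Int) -> Observable<Element> {
        Observable { observer in
            func attempt(_ remaining: Int) {
                self.subscribe { event in
                    if case .error = event, remaining > 1 {
                        attempt(remaining - 1)      // swallow the error and try again
                    } else {
                        observer(event)
                    }
                }
            }
            attempt(maxAttempts)
        }
    }

    // On error, continue with a fallback sequence instead.
    func catchError(_ fallback: Observable<Element>) -> Observable<Element> {
        Observable { observer in
            self.subscribe { event in
                if case .error = event {
                    fallback.subscribe(observer)
                } else {
                    observer(event)
                }
            }
        }
    }
}

struct RequestFailed: Error {}

// Emits the attempt number, then fails on the first two subscriptions.
var attempts = 0
let flaky = Observable<Int> { observer in
    attempts += 1
    observer(.next(attempts))
    if attempts < 3 {
        observer(.error(RequestFailed()))
    } else {
        observer(.completed)
    }
}

func describe(_ event: Event<Int>) -> String {
    switch event {
    case .next(let n): return "next(\(n))"
    case .error: return "error"
    case .completed: return "completed"
    }
}

var retried: [String] = []
flaky.retry(3).subscribe { retried.append(describe($0)) }
print(retried) // ["next(1)", "next(2)", "next(3)", "completed"]

attempts = 0
var caught: [String] = []
let fallback = Observable<Int> { observer in
    observer(.next(0))
    observer(.completed)
}
flaky.catchError(fallback).subscribe { caught.append(describe($0)) }
print(caught) // ["next(1)", "next(0)", "completed"]
```

Elements emitted before an error are still delivered, so a retried sequence can repeat work.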

Synchronous error handling

Unfortunately, Swift doesn't have a concept of exceptions or a built-in error monad, so this project introduces the RxResult enum. It is a Swift port of Scala's Try type and is also similar to Haskell's Either monad.

This will be replaced in Swift 2.0 with try/throws.

public enum RxResult<ResultType> {
    case Success(ResultType)
    case Error(ErrorType)
}

To enable more readable code, a few Result operators are introduced:

result1.flatMap { okValue in        // success handling block
    // executed on success
    return ?
}.recoverWith { error in            // error handling block
    //  executed on error
    return ?
}
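Here is a minimal sketch of how such an enum and its two operators could look in modern Swift. The method signatures are assumptions for illustration and may differ from RxSwift 1.x, and `parseInt` is a hypothetical helper.

```swift
// Sketch of RxResult with flatMap / recoverWith; signatures are assumptions.
enum RxResult<ResultType> {
    case success(ResultType)
    case error(Error)

    // Runs `transform` only on success; errors pass through untouched.
    func flatMap<U>(_ transform: (ResultType) -> RxResult<U>) -> RxResult<U> {
        switch self {
        case .success(let value): return transform(value)
        case .error(let error): return .error(error)
        }
    }

    // Runs `recovery` only on error; successes pass through untouched.
    func recoverWith(_ recovery: (Error) -> RxResult<ResultType>) -> RxResult<ResultType> {
        switch self {
        case .success: return self
        case .error(let error): return recovery(error)
        }
    }
}

struct ParseError: Error {}

func parseInt(_ text: String) -> RxResult<Int> {
    if let value = Int(text) { return .success(value) }
    return .error(ParseError())
}

func describe(_ result: RxResult<Int>) -> String {
    switch result {
    case .success(let value): return "success(\(value))"
    case .error: return "error"
    }
}

let doubled = parseInt("21")
    .flatMap { value -> RxResult<Int> in .success(value * 2) }  // success handling block
    .recoverWith { _ in .success(0) }                           // error handling block, not run here

let recovered = parseInt("oops")
    .flatMap { value -> RxResult<Int> in .success(value * 2) }  // skipped: parsing failed
    .recoverWith { _ in .success(0) }                           // fallback value

print(describe(doubled), describe(recovered)) // success(42) success(0)
```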

Error handling and function names

For every group of transforming functions, there are versions with and without the "OrDie" suffix.

This will change in version 2.0, where map will have two overloads, one with and one without throws.

For example:

public func mapOrDie<E, R>
    (selector: E -> RxResult<R>)
    -> (Observable<E> -> Observable<R>) {
    return { source in
        return selectOrDie(selector)(source)
    }
}

public func map<E, R>
    (selector: E -> R)
        -> (Observable<E> -> Observable<R>) {
    return { source in
        return select(selector)(source)
    }
}

Returning an error from a selector will cause the entire graph of dependent sequence transformers to "die" and fail with an error. Dying means that it will release all of its resources and never produce another sequence value. This effect is usually not obvious.

If a UITextField is bound to an observable sequence that fails with an error or completes, the screen will never be updated again.

To make those situations more obvious, the RxCocoa debug build will throw an exception if a sequence bound to a UI control terminates with an error.

Using functions without the "OrDie" suffix is usually the safer option.

There is also the catch operator for easier error handling.

Debugging

Using the debugger alone is useful, but you can also use >- debug. The debug operator prints all events to standard output, and you can also label those events.

debug acts like a probe. Here is an example of using it:

let subscription = myInterval(0.1)
    >- debug("my probe")
    >- map { e in
        return "This is simply \(e)"
    }
    >- subscribeNext { n in
        println(n)
    }

NSThread.sleepForTimeInterval(0.5)

subscription.dispose()

This will print:

[my probe] subscribed
Subscribed
[my probe] -> Event Next(Box(0))
This is simply 0
[my probe] -> Event Next(Box(1))
This is simply 1
[my probe] -> Event Next(Box(2))
This is simply 2
[my probe] -> Event Next(Box(3))
This is simply 3
[my probe] -> Event Next(Box(4))
This is simply 4
[my probe] dispose
Disposed

You can also use subscribe instead of subscribeNext:

NSURLSession.sharedSession().rx_JSON(request)
   >- map { json in
       return parse()
   }
   >- subscribe { n in      // this subscribes on all events including error and completed
       println(n)
   }
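Conceptually, debug is an operator that logs each event with a label and forwards it unchanged. The sketch below is a hypothetical version over a closure-based Observable. It writes to an array instead of standard output so the log can be inspected, and its log format only loosely follows the output shown above.

```swift
// Sketch of a debug-style probe: log each event with a label, forward it unchanged.
// The names and log format are illustrative, not RxSwift's implementation.
enum Event<Element> {
    case next(Element)
    case error(Error)
    case completed
}

struct Observable<Element> {
    let subscribe: (@escaping (Event<Element>) -> Void) -> Void

    func debug(_ label: String, log: @escaping (String) -> Void) -> Observable<Element> {
        Observable { observer in
            log("[\(label)] subscribed")
            self.subscribe { event in
                switch event {
                case .next(let value): log("[\(label)] -> next(\(value))")
                case .error(let error): log("[\(label)] -> error(\(error))")
                case .completed: log("[\(label)] -> completed")
                }
                observer(event)                 // forward unchanged
            }
        }
    }
}

var lines: [String] = []
let numbers = Observable<Int> { observer in
    for n in 0..<3 { observer(.next(n)) }
    observer(.completed)
}

numbers
    .debug("my probe") { lines.append($0) }
    .subscribe { event in
        if case .next(let n) = event { lines.append("This is simply \(n)") }
    }

lines.forEach { print($0) }
```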

Debugging memory leaks

All of the unit tests run with leak profiling, so if you find a leak, there is a fair chance that it's not caused by Rx.

In debug mode, Rx tracks all allocated resources in a global variable, resourceCount.

As a sanity check, you can just do a println in your view controller's deinit method.

The code would look something like this.

class ViewController: UIViewController {
#if TRACE_RESOURCES
    private let startResourceCount = RxSwift.resourceCount
#endif

    override func viewDidLoad() {
      super.viewDidLoad()
#if TRACE_RESOURCES
        println("Number of start resources = \(resourceCount)")
#endif
    }

    deinit {
#if TRACE_RESOURCES
        println("View controller disposed with \(resourceCount) resources")

        let numberOfResourcesThatShouldRemain = startResourceCount
        let time = dispatch_time(DISPATCH_TIME_NOW, Int64(0.1 * Double(NSEC_PER_SEC)))
        dispatch_after(time, dispatch_get_main_queue(), { () -> Void in
            println("Resource count after dealloc \(RxSwift.resourceCount), difference \(RxSwift.resourceCount - numberOfResourcesThatShouldRemain)")
        })
#endif
    }
}

You should use a small delay because scheduled entities sometimes take a little time to release their memory.
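The counting idea itself is simple. In this sketch, `resourceCount` is a plain global (not RxSwift's) that tracked objects increment in init and decrement in deinit. `Screen` is a hypothetical stand-in for a view controller. Everything here is synchronous, so no delay is needed.

```swift
// Sketch of resource counting: a leak shows up as a non-zero difference.
// `resourceCount` here is a plain global, not RxSwift's counter.
var resourceCount = 0

final class TrackedResource {
    init() { resourceCount += 1 }
    deinit { resourceCount -= 1 }
}

final class Screen {                      // stands in for a view controller
    let resource = TrackedResource()
    var onTap: (() -> Void)?
}

let start = resourceCount

do {
    let screen = Screen()
    screen.onTap = { [weak screen] in _ = screen?.resource }   // no retain cycle
}
let differenceWithoutCycle = resourceCount - start

do {
    let screen = Screen()
    screen.onTap = { _ = screen.resource }                     // screen -> closure -> screen
}
let differenceWithCycle = resourceCount - start

print(differenceWithoutCycle) // 0
print(differenceWithCycle)    // 1
```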

Variables

Variables represent some observable state. A Variable can't exist without a value, because its initializer requires an initial value.

Variable is a Subject. More specifically, it is a BehaviorSubject: BehaviorSubject is aliased as Variable for convenience.

Variable is both an ObserverType and Observable.

That means you can send values to a variable using sendNext, and it will broadcast each element to all subscribers.

It will also broadcast its current value immediately on subscription.

let variable = Variable(0)

println("Before first subscription ---")

variable
    >- subscribeNext { n in
        println("First \(n)")
    }

println("Before send 1")

sendNext(variable, 1)

println("Before second subscription ---")

variable
    >- subscribeNext { n in
        println("Second \(n)")
    }

sendNext(variable, 2)

println("End ---")

This will print:

Before first subscription ---
First 0
Before send 1
First 1
Before second subscription ---
Second 1
First 2
Second 2
End ---
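The behavior above can be reproduced with a small BehaviorSubject sketch in modern Swift. This is an illustrative re-implementation, not RxSwift's Variable, and `sendNext` is a method here rather than a free function.

```swift
// Sketch of a BehaviorSubject: holds a current value, replays it to each new
// subscriber, and broadcasts new values. Not RxSwift's implementation.
final class BehaviorSubject<Element> {
    private(set) var value: Element
    private var observers: [(Element) -> Void] = []

    init(_ initialValue: Element) { value = initialValue }   // never without a value

    func subscribe(_ observer: @escaping (Element) -> Void) {
        observers.append(observer)
        observer(value)                      // current value immediately
    }

    func sendNext(_ newValue: Element) {
        value = newValue
        observers.forEach { $0(newValue) }   // broadcast to all subscribers
    }
}

var output: [String] = []
let variable = BehaviorSubject(0)

output.append("Before first subscription ---")
variable.subscribe { output.append("First \($0)") }
output.append("Before send 1")
variable.sendNext(1)
output.append("Before second subscription ---")
variable.subscribe { output.append("Second \($0)") }
variable.sendNext(2)
output.append("End ---")

output.forEach { print($0) }
```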

There is also the >- variable operator, which is already described here.

So why are they both called variable?

For one, they both have internal state that all subscribers share. When they contain a value (and Variable always does), they broadcast it immediately to subscribers.

The difference is that Variable lets you manually choose the elements of a sequence by using sendNext, while you can think of >- variable as a kind of calculated "variable".

UI layer tips

There are certain things that your Observables need to satisfy in the UI layer when binding to UIKit controls.

  • They need to observe values on MainScheduler (the UI thread). That's just a normal UIKit/Cocoa requirement. It is usually a good idea for your APIs to return results on MainScheduler. If that doesn't happen, RxCocoa will throw an exception to inform you, which crashes the app.

To fix this you need to add >- observeOn(MainScheduler.sharedInstance).

NSURLSession extensions don't return results on MainScheduler by default.

  • You can't bind errors to UIKit controls because that makes no sense.

If you don't know whether an Observable can fail, you can ensure it can't by using >- catch(valueThatIsReturnedWhenErrorHappens).

  • You usually want to share the subscription

Let's say you have something like this:

let searchResults = searchText
    >- throttle(0.3, $.mainScheduler)
    >- distinctUntilChanged
    >- map { query in
        API.getSearchResults(query)
            >- retry(3)
            >- startWith([]) // clears results on new search term
            >- catch([])
    }
    >- switchLatest
    >- variable              // <- notice the variable

What you usually want is to share search results once they are calculated. That is what >- variable means.

A good rule of thumb in the UI layer is to add >- variable at the end of the transformation chain. You want to share the calculated results rather than fire separate HTTP requests when binding searchResults to multiple UI elements.

Additional information about >- variable can be found here.

Making HTTP requests

Making HTTP requests is one of the first things people try to do with Rx.

You first need to build an NSURLRequest object that represents the work to be done: whether it is a GET or a POST request, what the request body and query parameters are, and so on.

This is how you can create a simple GET request

let request = NSURLRequest(URL: NSURL(string: "http://en.wikipedia.org/w/api.php?action=parse&page=Pizza&format=json")!)
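If the query parameters come from variables, building the URL with Foundation's URLComponents avoids manual percent-encoding. Here is a sketch in Swift 5, where URLRequest is the modern name for NSURLRequest. On Linux, URLRequest lives in FoundationNetworking.

```swift
import Foundation
#if canImport(FoundationNetworking)
import FoundationNetworking   // URLRequest lives here on Linux
#endif

// Build the same GET request from its parts; URLComponents handles encoding.
var components = URLComponents()
components.scheme = "http"
components.host = "en.wikipedia.org"
components.path = "/w/api.php"
components.queryItems = [
    URLQueryItem(name: "action", value: "parse"),
    URLQueryItem(name: "page", value: "Pizza"),
    URLQueryItem(name: "format", value: "json"),
]

let url = components.url!
var request = URLRequest(url: url)
request.httpMethod = "GET"        // GET is also the default

print(url.absoluteString)
// http://en.wikipedia.org/w/api.php?action=parse&page=Pizza&format=json
```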

If you just want to execute that request outside of composition with other observables, this is what needs to be done:

let responseJSON = NSURLSession.sharedSession().rx_JSON(request)

// no requests will be performed up to this point
// `responseJSON` is just a description how to fetch the response

let cancelRequest = responseJSON
    // this will fire the request
    >- subscribeNext { json in
        println(json)
    }

NSThread.sleepForTimeInterval(3)

// if you want to cancel request after 3 seconds have passed just call
cancelRequest.dispose()

NSURLSession extensions don't return results on MainScheduler by default.

If you want lower-level access to the response, you can use:

NSURLSession.sharedSession().rx_response(myNSURLRequest)
    >- debug("my request") // this will print out information to console
    >- flatMap { (data: NSData!, response: NSURLResponse!) -> Observable<String> in
        if let response = response as? NSHTTPURLResponse {
            if 200 ..< 300 ~= response.statusCode {
                return just(transform(data))
            }
            else {
                return failWith(yourNSError)
            }
        }
        else {
            rxFatalError("response = nil")
            return failWith(yourNSError)
        }
    }
    >- subscribe { event in
        println(event) // if error happened, this will also print out error to console
    }