1149 lines
38 KiB
Swift
1149 lines
38 KiB
Swift
//
|
|
// Driver+Test.swift
|
|
// RxTests
|
|
//
|
|
// Created by Krunoslav Zaher on 10/14/15.
|
|
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
|
|
//
|
|
|
|
import Foundation
|
|
import RxSwift
|
|
import RxCocoa
|
|
import XCTest
|
|
import RxTests
|
|
|
|
/// Test case covering `Driver` and the operators available on it.
class DriverTest: RxTest {
    /// Scheduler the shared helper uses to perform its subscriptions; created on a
    /// global concurrent queue with default quality of service.
    var backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueueQOS: .default)

    /// Adds no cleanup of its own; simply forwards to the superclass implementation.
    override func tearDown() {
        super.tearDown()
    }
}
|
|
|
|
// test helpers that make sure that resulting driver operator honors definition
|
|
// * only one subscription is made and shared - shareReplay(1)
|
|
// * subscription is made on main thread - subscribeOn(ConcurrentMainScheduler.instance)
|
|
// * events are observed on main thread - observeOn(MainScheduler.instance)
|
|
// * it can't error out - it needs to have catch somewhere
|
|
extension DriverTest {

    /// Subscribes to `driver` twice from `backgroundScheduler`, lets the caller feed the
    /// underlying sources, and returns what the first subscription received.
    ///
    /// While doing so it asserts that:
    /// * events delivered after the respective `subscribe` call has returned arrive on the main thread,
    /// * no error event reaches either subscription,
    /// * both subscriptions complete within the timeout, and
    /// * both subscriptions observed exactly the same elements.
    ///
    /// - Parameters:
    ///   - driver: The driver under test.
    ///   - subscribedOnBackground: Called once both subscriptions have been made; it is
    ///     expected to emit the events that eventually complete `driver`.
    /// - Returns: The elements received by the first subscription.
    func subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription<R: Equatable>(_ driver: Driver<R>, subscribedOnBackground: () -> ()) -> [R] {
        var elementsFromFirst: [R] = []
        var elementsFromSecond: [R] = []

        let subscriptionsMade = self.expectation(description: "subscribeFinished")

        // These are created only after the first wait below, so that the first wait
        // covers nothing but "subscribeFinished".
        var firstCompleted: XCTestExpectation!
        var secondCompleted: XCTestExpectation!

        _ = backgroundScheduler.schedule(()) { _ in
            var insideFirstSubscribe = true
            _ = driver.asObservable().subscribe { event in
                // Anything emitted while `subscribe` is still executing is exempt from the thread check.
                if !insideFirstSubscribe {
                    XCTAssertTrue(isMainThread())
                }
                switch event {
                case let .next(element):
                    elementsFromFirst.append(element)
                case let .error(error):
                    XCTFail("Error passed \(error)")
                case .completed:
                    firstCompleted.fulfill()
                }
            }
            insideFirstSubscribe = false

            var insideSecondSubscribe = true
            _ = driver.asDriver().asObservable().subscribe { event in
                if !insideSecondSubscribe {
                    XCTAssertTrue(isMainThread())
                }
                switch event {
                case let .next(element):
                    elementsFromSecond.append(element)
                case let .error(error):
                    XCTFail("Error passed \(error)")
                case .completed:
                    secondCompleted.fulfill()
                }
            }

            insideSecondSubscribe = false

            // The driver is expected to subscribe on the main scheduler. Because that
            // scheduler is serial, work queued here runs after the subscription work,
            // so fulfilling at this point signals that subscribing has finished.
            _ = MainScheduler.instance.schedule(()) { _ in
                subscriptionsMade.fulfill()
                return Disposables.create()
            }

            return Disposables.create()
        }

        waitForExpectations(timeout: 1.0) { error in
            XCTAssertTrue(error == nil)
        }

        firstCompleted = self.expectation(description: "finished1")
        secondCompleted = self.expectation(description: "finished2")

        subscribedOnBackground()

        waitForExpectations(timeout: 1.0) { error in
            XCTAssertTrue(error == nil)
        }

        XCTAssertTrue(elementsFromFirst == elementsFromSecond)

        return elementsFromFirst
    }
}
|
|
|
|
// MARK: properties
|
|
extension DriverTest {
    /// Checks how a driver built from a cold observable shares its subscription when the
    /// source terminates with an error.
    ///
    /// Three observers subscribe at virtual times 200, 225 and 260. The expectations pin that:
    /// * the late second observer first receives the most recent element (replay),
    /// * the source error is converted to `-1` followed by completion, and
    /// * the source is subscribed once for the overlapping observers and once more for
    ///   the observer that arrives after termination.
    func testDriverSharing_WhenErroring() {
        let scheduler = TestScheduler(initialClock: 0)

        let observer1 = scheduler.createObserver(Int.self)
        let observer2 = scheduler.createObserver(Int.self)
        let observer3 = scheduler.createObserver(Int.self)
        var disposable1: Disposable!
        var disposable2: Disposable!
        var disposable3: Disposable!

        let coldObservable = scheduler.createColdObservable([
            next(10, 0),
            next(20, 1),
            next(30, 2),
            next(40, 3),
            error(50, testError)
            ])
        let driver = coldObservable.asDriver(onErrorJustReturn: -1)

        scheduler.scheduleAt(200) {
            disposable1 = driver.asObservable().subscribe(observer1)
        }

        scheduler.scheduleAt(225) {
            disposable2 = driver.asObservable().subscribe(observer2)
        }

        scheduler.scheduleAt(235) {
            disposable1.dispose()
        }

        scheduler.scheduleAt(260) {
            disposable2.dispose()
        }

        // resubscription

        scheduler.scheduleAt(260) {
            disposable3 = driver.asObservable().subscribe(observer3)
        }

        scheduler.scheduleAt(285) {
            disposable3.dispose()
        }

        scheduler.start()

        XCTAssertEqual(observer1.events, [
            next(210, 0),
            next(220, 1),
            next(230, 2)
        ])

        XCTAssertEqual(observer2.events, [
            next(225, 1),
            next(230, 2),
            next(240, 3),
            next(250, -1),
            completed(250)
        ])

        XCTAssertEqual(observer3.events, [
            next(270, 0),
            next(280, 1),
        ])

        XCTAssertEqual(coldObservable.subscriptions, [
            Subscription(200, 250),
            Subscription(260, 285),
        ])
    }

    /// Checks how a driver built from a cold observable shares its subscription when the
    /// source completes normally.
    ///
    /// Same timeline as `testDriverSharing_WhenErroring`, but the source ends with
    /// `completed(50)`. No fallback element is therefore expected: the second observer
    /// sees the completion right after the last source element.
    func testDriverSharing_WhenCompleted() {
        let scheduler = TestScheduler(initialClock: 0)

        let observer1 = scheduler.createObserver(Int.self)
        let observer2 = scheduler.createObserver(Int.self)
        let observer3 = scheduler.createObserver(Int.self)
        var disposable1: Disposable!
        var disposable2: Disposable!
        var disposable3: Disposable!

        // The source must complete (not error); otherwise this test merely duplicates
        // the erroring scenario above.
        let coldObservable = scheduler.createColdObservable([
            next(10, 0),
            next(20, 1),
            next(30, 2),
            next(40, 3),
            completed(50)
            ])
        let driver = coldObservable.asDriver(onErrorJustReturn: -1)

        scheduler.scheduleAt(200) {
            disposable1 = driver.asObservable().subscribe(observer1)
        }

        scheduler.scheduleAt(225) {
            disposable2 = driver.asObservable().subscribe(observer2)
        }

        scheduler.scheduleAt(235) {
            disposable1.dispose()
        }

        scheduler.scheduleAt(260) {
            disposable2.dispose()
        }

        // resubscription

        scheduler.scheduleAt(260) {
            disposable3 = driver.asObservable().subscribe(observer3)
        }

        scheduler.scheduleAt(285) {
            disposable3.dispose()
        }

        scheduler.start()

        XCTAssertEqual(observer1.events, [
            next(210, 0),
            next(220, 1),
            next(230, 2)
        ])

        // Plain completion: no `-1` fallback element is emitted.
        XCTAssertEqual(observer2.events, [
            next(225, 1),
            next(230, 2),
            next(240, 3),
            completed(250)
        ])

        XCTAssertEqual(observer3.events, [
            next(270, 0),
            next(280, 1),
        ])

        XCTAssertEqual(coldObservable.subscriptions, [
            Subscription(200, 250),
            Subscription(260, 285),
        ])
    }
}
|
|
|
|
// MARK: conversions
|
|
extension DriverTest {
    /// A `Variable` exposed as a driver and zipped with a two-element driver is expected
    /// to produce `[1, 1]` after the variable is assigned `1` and then `2`.
    func testVariableAsDriver() {
        let variable = Variable(1)
        let zipped = Driver.zip(variable.asDriver(), Driver.of(0, 0)) { value, _ in
            value
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(zipped) {
            variable.value = 1
            variable.value = 2
        }

        let expected = [1, 1]
        XCTAssertEqual(observed, expected)
    }

    /// `asDriver(onErrorJustReturn:)`: elements pass through and the source error is
    /// expected to surface as the fallback value `-1`.
    func testAsDriver_onErrorJustReturn() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1)

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1]
        XCTAssertEqual(observed, expected)
    }

    /// `asDriver(onErrorDriveWith:)`: after the source error the elements of the
    /// replacement driver (`-1`) are expected to follow.
    func testAsDriver_onErrorDriveWith() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorDriveWith: Driver.just(-1))

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1]
        XCTAssertEqual(observed, expected)
    }

    /// `asDriver` with a recovery closure returning an empty driver: only the elements
    /// emitted before the error are expected.
    func testAsDriver_onErrorRecover() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver { _ in
            Driver.empty()
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: deferred
|
|
extension DriverTest {
    /// `Driver.deferred` wrapping a driver with an error fallback is expected to behave
    /// like the wrapped driver: elements pass through and the error becomes `-1`.
    func testAsDriver_deferred() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = Driver.deferred {
            source.asDriver(onErrorJustReturn: -1)
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: map
|
|
extension DriverTest {
    /// `map` on a driver: the transform is asserted to run on the main thread and is
    /// applied to every element, including the `-1` error fallback.
    func testAsDriver_map() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).map { (value: Int) -> Int in
            XCTAssertTrue(isMainThread())
            return value + 1
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [2, 3, 0]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: filter
|
|
extension DriverTest {
    /// `filter` on a driver: the predicate is asserted to run on the main thread and
    /// only even elements are expected to pass.
    func testAsDriver_filter() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).filter { value in
            XCTAssertTrue(isMainThread())
            return value % 2 == 0
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [2]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
|
|
// MARK: switch latest
|
|
extension DriverTest {
    /// `switchLatest` on a driver of drivers: elements of each inner driver are expected
    /// in turn, with every inner error replaced by that inner driver's fallback value.
    func testAsDriver_switchLatest() {
        let outer = BackgroundThreadPrimitiveHotObservable<Driver<Int>>()
        let inner1 = MainThreadPrimitiveHotObservable<Int>()
        let inner2 = MainThreadPrimitiveHotObservable<Int>()

        let fallbackInner = inner1.asDriver(onErrorJustReturn: -1)
        let driverUnderTest = outer.asDriver(onErrorJustReturn: fallbackInner).switchLatest()

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(outer.subscriptions == [SubscribedToHotObservable])

            outer.on(.next(inner1.asDriver(onErrorJustReturn: -2)))

            inner1.on(.next(1))
            inner1.on(.next(2))
            inner1.on(.error(testError))

            outer.on(.next(inner2.asDriver(onErrorJustReturn: -3)))

            inner2.on(.next(10))
            inner2.on(.next(11))
            inner2.on(.error(testError))

            outer.on(.error(testError))

            inner1.on(.completed)
            outer.on(.completed)

            XCTAssertTrue(outer.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [
            1, 2, -2,
            10, 11, -3
        ]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: flatMapLatest
|
|
extension DriverTest {
    /// `flatMapLatest`: each outer element selects an inner driver by index; the outer
    /// error is replaced by index `2`, which selects the driver that is then completed.
    func testAsDriver_flatMapLatest() {
        let outer = BackgroundThreadPrimitiveHotObservable<Int>()
        let inner1 = MainThreadPrimitiveHotObservable<Int>()
        let inner2 = MainThreadPrimitiveHotObservable<Int>()
        let innerAfterError = MainThreadPrimitiveHotObservable<Int>()

        let innerDrivers: [Driver<Int>] = [
            inner1.asDriver(onErrorJustReturn: -2),
            inner2.asDriver(onErrorJustReturn: -3),
            innerAfterError.asDriver(onErrorJustReturn: -4),
        ]

        let driverUnderTest = outer.asDriver(onErrorJustReturn: 2).flatMapLatest { index in
            innerDrivers[index]
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(outer.subscriptions == [SubscribedToHotObservable])

            outer.on(.next(0))

            inner1.on(.next(1))
            inner1.on(.next(2))
            inner1.on(.error(testError))

            outer.on(.next(1))

            inner2.on(.next(10))
            inner2.on(.next(11))
            inner2.on(.error(testError))

            outer.on(.error(testError))

            innerAfterError.on(.completed)
            outer.on(.completed)

            XCTAssertTrue(outer.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [
            1, 2, -2,
            10, 11, -3
        ]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: flatMapFirst
|
|
extension DriverTest {
    /// `flatMapFirst`: the second outer element arrives while the first inner driver is
    /// still active, and only the first inner driver's output is expected.
    func testAsDriver_flatMapFirst() {
        let outer = BackgroundThreadPrimitiveHotObservable<Int>()
        let inner1 = MainThreadPrimitiveHotObservable<Int>()
        let inner2 = MainThreadPrimitiveHotObservable<Int>()
        let innerAfterError = MainThreadPrimitiveHotObservable<Int>()

        let innerDrivers: [Driver<Int>] = [
            inner1.asDriver(onErrorJustReturn: -2),
            inner2.asDriver(onErrorJustReturn: -3),
            innerAfterError.asDriver(onErrorJustReturn: -4),
        ]

        let driverUnderTest = outer.asDriver(onErrorJustReturn: 2).flatMapFirst { index in
            innerDrivers[index]
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(outer.subscriptions == [SubscribedToHotObservable])

            outer.on(.next(0))
            outer.on(.next(1))

            inner1.on(.next(1))
            inner1.on(.next(2))
            inner1.on(.error(testError))

            inner2.on(.next(10))
            inner2.on(.next(11))
            inner2.on(.error(testError))

            outer.on(.error(testError))

            innerAfterError.on(.completed)
            outer.on(.completed)

            XCTAssertTrue(outer.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [
            1, 2, -2,
        ]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: doOn
|
|
extension DriverTest {
    /// `doOn`: the side-effect closure is asserted to run on the main thread and is
    /// expected to see every event once, including the fallback element and completion.
    func testAsDriver_doOn() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        var recordedEvents = [Event<Int>]()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).doOn { event in
            XCTAssertTrue(isMainThread())

            recordedEvents.append(event)
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        XCTAssertEqual(observed, [1, 2, -1])
        let expectedEvents: [Event<Int>] = [.next(1), .next(2), .next(-1), .completed]
        XCTAssertEqual(recordedEvents, expectedEvents)
    }


    /// `do(onNext:)`: the closure is asserted to run on the main thread and is expected
    /// to see each element once.
    func testAsDriver_doOnNext() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        var recordedElements = [Int]()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).do(onNext: { element in
            XCTAssertTrue(isMainThread())
            recordedElements.append(element)
        })

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        XCTAssertEqual(observed, [1, 2, -1])
        let expectedElements = [1, 2, -1]
        XCTAssertEqual(recordedElements, expectedElements)
    }

    /// `do(onCompleted:)`: the closure is asserted to run on the main thread and is
    /// expected to have been invoked by the time the driver has completed.
    func testAsDriver_doOnCompleted() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        var didComplete = false
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).do(onCompleted: {
            XCTAssertTrue(isMainThread())
            didComplete = true
        })

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        XCTAssertEqual(observed, [1, 2, -1])
        XCTAssertEqual(didComplete, true)
    }
}
|
|
|
|
// MARK: distinct until changed
|
|
extension DriverTest {
    /// `distinctUntilChanged()` with no arguments: the consecutive duplicate `2` is
    /// expected to be dropped.
    func testAsDriver_distinctUntilChanged1() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).distinctUntilChanged()

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1]
        XCTAssertEqual(observed, expected)
    }

    /// `distinctUntilChanged` with a key selector: the consecutive duplicate `2` is
    /// expected to be dropped.
    func testAsDriver_distinctUntilChanged2() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).distinctUntilChanged({ $0 })

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1]
        XCTAssertEqual(observed, expected)
    }

    /// `distinctUntilChanged` with a comparer: the consecutive duplicate `2` is
    /// expected to be dropped.
    func testAsDriver_distinctUntilChanged3() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).distinctUntilChanged({ $0 == $1 })

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1]
        XCTAssertEqual(observed, expected)
    }


    /// `distinctUntilChanged` with both a key selector and a comparer: the consecutive
    /// duplicate `2` is expected to be dropped.
    func testAsDriver_distinctUntilChanged4() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).distinctUntilChanged({ $0 }) { $0 == $1 }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1]
        XCTAssertEqual(observed, expected)
    }

}
|
|
|
|
// MARK: flat map
|
|
extension DriverTest {
    /// `flatMap`: the selector is asserted to run on the main thread; each element
    /// (including the `-1` fallback) maps to a single-element driver of `element + 1`.
    func testAsDriver_flatMap() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).flatMap { (value: Int) -> Driver<Int> in
            XCTAssertTrue(isMainThread())
            return Driver.just(value + 1)
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [2, 3, 0]
        XCTAssertEqual(observed, expected)
    }

}
|
|
|
|
// MARK: merge
|
|
extension DriverTest {
    /// `merge()` on a driver of drivers: every inner single-element driver's value is
    /// expected in the output; the mapping closure is asserted to run on the main thread.
    func testAsDriver_merge() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let nested = source.asDriver(onErrorJustReturn: -1).map { (value: Int) -> Driver<Int> in
            XCTAssertTrue(isMainThread())
            return Driver.just(value + 1)
        }
        let driverUnderTest = nested.merge()

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [2, 3, 0]
        XCTAssertEqual(observed, expected)
    }

    /// `merge(maxConcurrent: 1)` on a driver of drivers: the same output as the
    /// unrestricted merge is expected for this input.
    func testAsDriver_merge2() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let nested = source.asDriver(onErrorJustReturn: -1).map { (value: Int) -> Driver<Int> in
            XCTAssertTrue(isMainThread())
            return Driver.just(value + 1)
        }
        let driverUnderTest = nested.merge(maxConcurrent: 1)

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [2, 3, 0]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: debounce / throttle
|
|
extension DriverTest {
    /// `debounce(0.0)`: with only an error fed into the source, the fallback `-1` is
    /// the single expected element.
    func testAsDriver_debounce() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).debounce(0.0)

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [-1]
        XCTAssertEqual(observed, expected)
    }

    /// `throttle(0.0)`: two elements followed immediately by an error are fed in, and
    /// only the fallback `-1` is expected in the output.
    func testAsDriver_throttle() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).throttle(0.0)

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [-1]
        XCTAssertEqual(observed, expected)
    }

}
|
|
|
|
// MARK: scan
|
|
extension DriverTest {
    /// `scan` with seed `0` and addition: running totals are expected, with the `-1`
    /// fallback folded in last; the accumulator is asserted to run on the main thread.
    func testAsDriver_scan() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()
        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).scan(0) { (total: Int, value: Int) -> Int in
            XCTAssertTrue(isMainThread())
            return total + value
        }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))
            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 3, 2]
        XCTAssertEqual(observed, expected)
    }

}
|
|
|
|
// MARK: concat
|
|
extension DriverTest {
    /// `concat()` on an `AnySequence` of drivers: the second source is asserted to be
    /// subscribed once the first has terminated, and both outputs are expected in order.
    func testAsDriver_concat_sequenceType() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = MainThreadPrimitiveHotObservable<Int>()

        let parts = AnySequence([firstSource.asDriver(onErrorJustReturn: -1), secondSource.asDriver(onErrorJustReturn: -2)])
        let driverUnderTest = parts.concat()

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            firstSource.on(.next(2))
            firstSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            secondSource.on(.next(4))
            secondSource.on(.next(5))
            secondSource.on(.error(testError))

            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1, 4, 5, -2]
        XCTAssertEqual(observed, expected)
    }

    /// `concat()` on an array of drivers: same scenario and expectations as the
    /// sequence-based variant.
    func testAsDriver_concat() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = MainThreadPrimitiveHotObservable<Int>()

        let parts = [firstSource.asDriver(onErrorJustReturn: -1), secondSource.asDriver(onErrorJustReturn: -2)]
        let driverUnderTest = parts.concat()

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            firstSource.on(.next(2))
            firstSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            secondSource.on(.next(4))
            secondSource.on(.next(5))
            secondSource.on(.error(testError))

            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [1, 2, -1, 4, 5, -2]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: combine latest
|
|
extension DriverTest {
    /// `combineLatest` on an array of drivers, summing the latest values: a sum is
    /// expected for every update once both sources have emitted.
    func testAsDriver_combineLatest_array() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = BackgroundThreadPrimitiveHotObservable<Int>()

        let inputs = [firstSource.asDriver(onErrorJustReturn: -1), secondSource.asDriver(onErrorJustReturn: -2)]
        let driverUnderTest = inputs.combineLatest { values in values.reduce(0, +) }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            secondSource.on(.next(4))

            firstSource.on(.next(2))
            secondSource.on(.next(5))

            firstSource.on(.error(testError))
            secondSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [5, 6, 7, 4, -3]
        XCTAssertEqual(observed, expected)
    }

    /// `Driver.combineLatest` with two drivers and `+` as result selector: same
    /// scenario and expectations as the array-based variant.
    func testAsDriver_combineLatest() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = Driver.combineLatest(
            firstSource.asDriver(onErrorJustReturn: -1),
            secondSource.asDriver(onErrorJustReturn: -2),
            resultSelector: +
        )

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            secondSource.on(.next(4))

            firstSource.on(.next(2))
            secondSource.on(.next(5))

            firstSource.on(.error(testError))
            secondSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [5, 6, 7, 4, -3]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: zip
|
|
extension DriverTest {
    /// `zip` on an array of drivers, summing each pair: one sum per pair of elements is
    /// expected, the last pair being the two error fallbacks.
    func testAsDriver_zip_array() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = BackgroundThreadPrimitiveHotObservable<Int>()

        let inputs = [firstSource.asDriver(onErrorJustReturn: -1), secondSource.asDriver(onErrorJustReturn: -2)]
        let driverUnderTest = inputs.zip { values in values.reduce(0, +) }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            secondSource.on(.next(4))

            firstSource.on(.next(2))
            secondSource.on(.next(5))

            firstSource.on(.error(testError))
            secondSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [5, 7, -3]
        XCTAssertEqual(observed, expected)
    }

    /// `Driver.zip` with two drivers and `+` as result selector: same scenario and
    /// expectations as the array-based variant.
    func testAsDriver_zip() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = Driver.zip(
            firstSource.asDriver(onErrorJustReturn: -1),
            secondSource.asDriver(onErrorJustReturn: -2),
            resultSelector: +
        )

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            secondSource.on(.next(4))

            firstSource.on(.next(2))
            secondSource.on(.next(5))

            firstSource.on(.error(testError))
            secondSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [5, 7, -3]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: withLatestFrom
|
|
extension DriverTest {
    /// `withLatestFrom` with a result selector: each element of the first driver is
    /// combined with the latest element of the second, once the second has emitted.
    func testAsDriver_withLatestFrom() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = BackgroundThreadPrimitiveHotObservable<Int>()

        let sampled = secondSource.asDriver(onErrorJustReturn: -2)
        let driverUnderTest = firstSource.asDriver(onErrorJustReturn: -1).withLatestFrom(sampled) { f, s in "\(f)\(s)" }

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            secondSource.on(.next(4))

            firstSource.on(.next(2))
            secondSource.on(.next(5))

            firstSource.on(.error(testError))
            secondSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = ["24", "-15"]
        XCTAssertEqual(observed, expected)
    }

    /// `withLatestFrom` without a result selector: the latest element of the second
    /// driver is expected for each element of the first.
    func testAsDriver_withLatestFromDefaultOverload() {
        let firstSource = BackgroundThreadPrimitiveHotObservable<Int>()
        let secondSource = BackgroundThreadPrimitiveHotObservable<Int>()

        let sampled = secondSource.asDriver(onErrorJustReturn: -2)
        let driverUnderTest = firstSource.asDriver(onErrorJustReturn: -1).withLatestFrom(sampled)

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(firstSource.subscriptions == [SubscribedToHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [SubscribedToHotObservable])

            firstSource.on(.next(1))
            secondSource.on(.next(4))

            firstSource.on(.next(2))
            secondSource.on(.next(5))

            firstSource.on(.error(testError))
            secondSource.on(.error(testError))

            XCTAssertTrue(firstSource.subscriptions == [UnsunscribedFromHotObservable])
            XCTAssertTrue(secondSource.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [4, 5]
        XCTAssertEqual(observed, expected)

    }
}
|
|
|
|
// MARK: skip
|
|
extension DriverTest {
    /// `skip(1)`: the first element is expected to be dropped; the remaining element
    /// and the error fallback follow.
    func testAsDriver_skip() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).skip(1)

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))

            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [2, -1]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: startWith
|
|
extension DriverTest {
    /// `startWith(0)`: `0` is expected ahead of the source elements and the error fallback.
    func testAsDriver_startWith() {
        let source = BackgroundThreadPrimitiveHotObservable<Int>()

        let driverUnderTest = source.asDriver(onErrorJustReturn: -1).startWith(0)

        let observed = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driverUnderTest) {
            XCTAssertTrue(source.subscriptions == [SubscribedToHotObservable])

            source.on(.next(1))
            source.on(.next(2))

            source.on(.error(testError))

            XCTAssertTrue(source.subscriptions == [UnsunscribedFromHotObservable])
        }

        let expected = [0, 1, 2, -1]
        XCTAssertEqual(observed, expected)
    }
}
|
|
|
|
// MARK: interval
|
|
extension DriverTest {
    /// `Driver.interval(100)` driven by a test scheduler: a second observer that
    /// subscribes mid-period is expected to receive the latest tick immediately and
    /// then share subsequent ticks with the first observer.
    func testAsDriver_interval() {
        let virtualScheduler = TestScheduler(initialClock: 0)

        let earlyObserver = virtualScheduler.createObserver(Int.self)
        let lateObserver = virtualScheduler.createObserver(Int.self)

        var earlySubscription: Disposable!
        var lateSubscription: Disposable!

        driveOnScheduler(virtualScheduler) {
            let ticks = Driver<Int>.interval(100)

            virtualScheduler.scheduleAt(20) {
                earlySubscription = ticks.asObservable().subscribe(earlyObserver)
            }

            virtualScheduler.scheduleAt(170) {
                lateSubscription = ticks.asObservable().subscribe(lateObserver)
            }

            virtualScheduler.scheduleAt(230) {
                earlySubscription.dispose()
                lateSubscription.dispose()
            }

            virtualScheduler.start()
        }

        XCTAssertEqual(earlyObserver.events, [
            next(120, 0),
            next(220, 1)
        ])
        XCTAssertEqual(lateObserver.events, [
            next(170, 0),
            next(220, 1)
        ])
    }
}
|
|
|
|
// MARK: timer
|
|
extension DriverTest {
    /// `Driver.timer(100, period: 105)` driven by a test scheduler: the first tick is
    /// expected 100 after subscription and the next one 105 later; a second observer
    /// joining in between is expected to receive the latest tick immediately.
    func testAsDriver_timer() {
        let virtualScheduler = TestScheduler(initialClock: 0)

        let earlyObserver = virtualScheduler.createObserver(Int.self)
        let lateObserver = virtualScheduler.createObserver(Int.self)

        var earlySubscription: Disposable!
        var lateSubscription: Disposable!

        driveOnScheduler(virtualScheduler) {
            let ticks = Driver<Int>.timer(100, period: 105)

            virtualScheduler.scheduleAt(20) {
                earlySubscription = ticks.asObservable().subscribe(earlyObserver)
            }

            virtualScheduler.scheduleAt(170) {
                lateSubscription = ticks.asObservable().subscribe(lateObserver)
            }

            virtualScheduler.scheduleAt(230) {
                earlySubscription.dispose()
                lateSubscription.dispose()
            }

            virtualScheduler.start()
        }

        XCTAssertEqual(earlyObserver.events, [
            next(120, 0),
            next(225, 1)
        ])
        XCTAssertEqual(lateObserver.events, [
            next(170, 0),
            next(225, 1)
        ])
    }
}
|