1634 lines
48 KiB
Swift
1634 lines
48 KiB
Swift
//
|
|
// Observable+BindingTest.swift
|
|
// RxSwift
|
|
//
|
|
// Created by Krunoslav Zaher on 4/15/15.
|
|
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
|
|
//
|
|
|
|
import Foundation
|
|
import XCTest
|
|
import RxSwift
|
|
import RxTests
|
|
|
|
/// Test case hosting the binding-operator tests (multicast, refCount, replay,
/// shareReplay). The class body is intentionally empty; the tests themselves
/// are declared in extensions of this class.
class ObservableBindingTest: RxTest {
}
|
|
|
|
// multicast
|
|
extension ObservableBindingTest {
|
|
/// Multicasts a hot source through a `PublishSubject` with an identity selector
/// and expects the elements from 210 onwards followed by completion at 390,
/// with a single source subscription spanning 200...390.
func testMulticast_Cold_Completed() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(40, 0), next(90, 1), next(150, 2), next(210, 3),
        next(240, 4), next(270, 5), next(330, 6), next(340, 7),
        completed(390)
    ])

    let observer = testScheduler.start {
        source.multicast({ PublishSubject<Int>() }) { $0 }
    }

    XCTAssertEqual(observer.events, [
        next(210, 3), next(240, 4), next(270, 5), next(330, 6), next(340, 7),
        completed(390)
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(200, 390)
    ])
}
|
|
|
|
/// Multicasts a hot source that fails at 390 and expects the error to be
/// forwarded after the elements from 210 onwards, with the source subscription
/// ending at the time of the error.
func testMulticast_Cold_Error() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(40, 0), next(90, 1), next(150, 2), next(210, 3),
        next(240, 4), next(270, 5), next(330, 6), next(340, 7),
        error(390, testError)
    ])

    let observer = testScheduler.start {
        source.multicast({ PublishSubject<Int>() }) { $0 }
    }

    XCTAssertEqual(observer.events, [
        next(210, 3), next(240, 4), next(270, 5), next(330, 6), next(340, 7),
        error(390, testError)
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(200, 390)
    ])
}
|
|
|
|
/// Multicasts a hot source that never terminates and expects the elements from
/// 210 onwards, with the source subscription lasting from 200 until 1000.
func testMulticast_Cold_Dispose() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(40, 0), next(90, 1), next(150, 2), next(210, 3),
        next(240, 4), next(270, 5), next(330, 6), next(340, 7),
    ])

    let observer = testScheduler.start {
        source.multicast({ PublishSubject<Int>() }) { $0 }
    }

    XCTAssertEqual(observer.events, [
        next(210, 3), next(240, 4), next(270, 5), next(330, 6), next(340, 7),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(200, 1000)
    ])
}
|
|
|
|
/// Multicasts a hot source and zips the shared sequence with itself, so every
/// expected element is the source element added to itself; the source is
/// expected to be subscribed exactly once (200...390).
func testMulticast_Cold_Zip() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(40, 0), next(90, 1), next(150, 2), next(210, 3),
        next(240, 4), next(270, 5), next(330, 6), next(340, 7),
        completed(390)
    ])

    let observer = testScheduler.start {
        source.multicast({ PublishSubject<Int>() }) { shared in
            Observable.zip(shared, shared) { lhs, rhs in lhs + rhs }
        }
    }

    XCTAssertEqual(observer.events, [
        next(210, 6), next(240, 8), next(270, 10), next(330, 12), next(340, 14),
        completed(390)
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(200, 390)
    ])
}
|
|
|
|
/// When the subject factory throws, the test expects the error to be delivered
/// at 200 and the source never to be subscribed.
func testMulticast_SubjectSelectorThrows() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(210, 1), next(240, 2),
        completed(300)
    ])

    let observer = testScheduler.start {
        source.multicast({ () throws -> PublishSubject<Int> in throw testError }) { $0 }
    }

    XCTAssertEqual(observer.events, [
        error(200, testError)
    ])

    XCTAssertEqual(source.subscriptions, [])
}
|
|
|
|
/// When the multicast selector throws, the test expects the error to be
/// delivered at 200 and the source never to be subscribed.
func testMulticast_SelectorThrows() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(210, 1), next(240, 2),
        completed(300)
    ])

    let observer = testScheduler.start {
        source.multicast({ PublishSubject<Int>() }) { _ -> Observable<Int> in throw testError }
    }

    XCTAssertEqual(observer.events, [
        error(200, testError)
    ])

    XCTAssertEqual(source.subscriptions, [])
}
|
|
}
|
|
|
|
// refCount
|
|
extension ObservableBindingTest {
|
|
/// Connects a `TestConnectableObservable` over a three-element source and
/// expects the subscriber to have received all three elements by the time
/// `connect()` returns.
func testRefCount_DeadlockSimple() {
    let relay = MySubject<Int>()
    var received = 0

    let connectable = TestConnectableObservable(o: Observable.of(0, 1, 2), s: relay)
    let subscription = connectable.subscribe(onNext: { _ in received += 1 })
    // Released on scope exit, i.e. after the assertion below has run.
    defer { subscription.dispose() }

    connectable.connect().dispose()

    XCTAssertEqual(received, 3)
}
|
|
|
|
/// Connects a `TestConnectableObservable` over a source that emits three
/// elements and then fails; expects the error handler to fire exactly once.
func testRefCount_DeadlockErrorAfterN() {
    let relay = MySubject<Int>()
    var errorCount = 0

    let connectable = TestConnectableObservable(o: [Observable.of(0, 1, 2), Observable.error(testError)].concat(), s: relay)
    let subscription = connectable.subscribe(onError: { _ in errorCount += 1 })
    // Released on scope exit, i.e. after the assertion below has run.
    defer { subscription.dispose() }

    connectable.connect().dispose()

    XCTAssertEqual(errorCount, 1)
}
|
|
|
|
/// Connects a `TestConnectableObservable` over a source that fails right away;
/// expects the error handler to fire exactly once.
func testRefCount_DeadlockErrorImmediatelly() {
    let relay = MySubject<Int>()
    var errorCount = 0

    let connectable = TestConnectableObservable(o: Observable.error(testError), s: relay)
    let subscription = connectable.subscribe(onError: { _ in errorCount += 1 })
    // Released on scope exit, i.e. after the assertion below has run.
    defer { subscription.dispose() }

    connectable.connect().dispose()

    XCTAssertEqual(errorCount, 1)
}
|
|
|
|
/// Connects a `TestConnectableObservable` over an empty source; expects the
/// completion handler to fire exactly once.
func testRefCount_DeadlockEmpty() {
    let relay = MySubject<Int>()
    var completionCount = 0

    let connectable = TestConnectableObservable(o: Observable.empty(), s: relay)
    let subscription = connectable.subscribe(onCompleted: { completionCount += 1 })
    // Released on scope exit, i.e. after the assertion below has run.
    defer { subscription.dispose() }

    connectable.connect().dispose()

    XCTAssertEqual(completionCount, 1)
}
|
|
|
|
/// Subscribes to `refCount()` of a test connectable observable and expects all
/// source events to be forwarded and the backing subject to end up disposed.
func testRefCount_ConnectsOnFirst() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(210, 1), next(220, 2), next(230, 3), next(240, 4),
        completed(250)
    ])

    let relay = MySubject<Int>()
    let connectable = TestConnectableObservable(o: source.asObservable(), s: relay)

    let observer = testScheduler.start { connectable.refCount() }

    XCTAssertEqual(observer.events, [
        next(210, 1), next(220, 2), next(230, 3), next(240, 4),
        completed(250)
    ])

    XCTAssertTrue(relay.isDisposed)
}
|
|
|
|
/// Verifies the connection bookkeeping of `refCount()`:
/// the source is subscribed on the first subscriber only (`count`), every
/// subscriber is registered on the subject (`subscribeCount`), the source is
/// released only once the last subscriber is gone (`disconnected`), and a
/// later subscriber triggers a fresh source subscription.
func testRefCount_NotConnected() {
    var disconnected = false
    var count = 0

    // `count` tracks how often the source is subscribed; `disconnected` is set
    // when that source subscription is disposed.
    let xs: Observable<Int> = Observable.deferred {
        count += 1
        return Observable.create { _ in
            return Disposables.create {
                disconnected = true
            }
        }
    }

    let subject = MySubject<Int>()

    let conn = TestConnectableObservable(o: xs, s: subject)
    let refd = conn.refCount()

    // First subscriber: source gets connected.
    let dis1 = refd.subscribe { _ -> Void in () }
    XCTAssertEqual(1, count)
    XCTAssertEqual(1, subject.subscribeCount)
    XCTAssertFalse(disconnected)

    // Second subscriber: shares the existing connection.
    let dis2 = refd.subscribe { _ -> Void in () }
    XCTAssertEqual(1, count)
    XCTAssertEqual(2, subject.subscribeCount)
    XCTAssertFalse(disconnected)

    // The connection is expected to survive until the last subscriber leaves.
    dis1.dispose()
    XCTAssertFalse(disconnected)
    dis2.dispose()
    XCTAssertTrue(disconnected)
    disconnected = false

    // A new subscriber after full disconnection reconnects to the source.
    let dis3 = refd.subscribe { _ -> Void in () }
    XCTAssertEqual(2, count)
    XCTAssertEqual(3, subject.subscribeCount)
    XCTAssertFalse(disconnected)

    dis3.dispose()
    XCTAssertTrue(disconnected)
}
|
|
|
|
/// Subscribes twice to `publish().refCount()` of a failing source and checks
/// that neither subscriber ever sees an element or a completion, and that any
/// error delivered equals `testError`.
func testRefCount_Error() {
    let xs: Observable<Int> = Observable.error(testError)

    let res = xs.publish().refCount()

    // Two consecutive subscribers with identical expectations.
    for _ in 1...2 {
        _ = res.subscribe { event in
            switch event {
            case .next:
                XCTFail("Unexpected next event from a failing source")
            case .error(let error):
                XCTAssertErrorEqual(error, testError)
            case .completed:
                XCTFail("Unexpected completed event from a failing source")
            }
        }
    }
}
|
|
|
|
/// Runs four overlapping subscribers against `publish().refCount()` and checks
/// which elements each one records, and that the source is subscribed only
/// while at least one subscriber is attached (215...275 and 285...300).
func testRefCount_Publish() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(210, 1), next(220, 2), next(230, 3), next(240, 4), next(250, 5),
        next(260, 6), next(270, 7), next(280, 8), next(290, 9),
        completed(300)
    ])

    let shared = source.publish().refCount()

    // Subscriber 1: 215...235
    var firstSubscription: Disposable!
    let firstObserver = testScheduler.createObserver(Int.self)
    testScheduler.scheduleAt(215) { firstSubscription = shared.subscribe(firstObserver) }
    testScheduler.scheduleAt(235) { firstSubscription.dispose() }

    // Subscriber 2: 225...275
    var secondSubscription: Disposable!
    let secondObserver = testScheduler.createObserver(Int.self)
    testScheduler.scheduleAt(225) { secondSubscription = shared.subscribe(secondObserver) }
    testScheduler.scheduleAt(275) { secondSubscription.dispose() }

    // Subscriber 3: 255...265
    var thirdSubscription: Disposable!
    let thirdObserver = testScheduler.createObserver(Int.self)
    testScheduler.scheduleAt(255) { thirdSubscription = shared.subscribe(thirdObserver) }
    testScheduler.scheduleAt(265) { thirdSubscription.dispose() }

    // Subscriber 4: 285...320
    var fourthSubscription: Disposable!
    let fourthObserver = testScheduler.createObserver(Int.self)
    testScheduler.scheduleAt(285) { fourthSubscription = shared.subscribe(fourthObserver) }
    testScheduler.scheduleAt(320) { fourthSubscription.dispose() }

    testScheduler.start()

    XCTAssertEqual(firstObserver.events, [
        next(220, 2), next(230, 3)
    ])

    XCTAssertEqual(secondObserver.events, [
        next(230, 3), next(240, 4), next(250, 5), next(260, 6), next(270, 7)
    ])

    XCTAssertEqual(thirdObserver.events, [
        next(260, 6)
    ])

    XCTAssertEqual(fourthObserver.events, [
        next(290, 9),
        completed(300)
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(215, 275),
        Subscription(285, 300)
    ])
}
|
|
}
|
|
|
|
// replay
|
|
extension ObservableBindingTest {
|
|
/// `replay(3)` with three connection windows: a subscriber attaching at 450 is
/// expected to receive the three buffered elements (5, 6, 7) immediately and
/// then only the element emitted while connected (11 at 520).
func testReplayCount_Basic() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        error(600, testError)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(3) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400, 500...550, 650...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(550) { connection.dispose() }

    testScheduler.scheduleAt(650) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 5), next(450, 6), next(450, 7),
        next(520, 11),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 550),
        Subscription(650, 800)
    ])
}
|
|
|
|
/// `replay(3)` where the second connection stays open past the source error:
/// the subscriber is expected to receive the buffered elements, the live
/// elements and finally the error at 600, which also ends the subscription.
func testReplayCount_Error() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        error(600, testError)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(3) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400 and 500...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 5), next(450, 6), next(450, 7),
        next(520, 11), next(560, 20),
        error(600, testError),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 600),
    ])
}
|
|
|
|
/// `replay(3)` where the second connection stays open past source completion:
/// the subscriber is expected to receive the buffered elements, the live
/// elements and finally completion at 600.
func testReplayCount_Complete() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        completed(600)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(3) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400 and 500...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 5), next(450, 6), next(450, 7),
        next(520, 11), next(560, 20),
        completed(600)
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 600),
    ])
}
|
|
|
|
/// `replay(3)` where the subscriber detaches at 475, before the next
/// connection: only the three buffered elements are expected, while the
/// source is still connected in all three windows.
func testReplayCount_Dispose() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        completed(600)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime: 450...475.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(3) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(475) { subscription.dispose() }

    // Connection windows: 300...400, 500...550, 650...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(550) { connection.dispose() }

    testScheduler.scheduleAt(650) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 5), next(450, 6), next(450, 7),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 550),
        Subscription(650, 800),
    ])
}
|
|
|
|
/// `replay(1)` with three connection windows: a subscriber attaching at 450 is
/// expected to receive the single buffered element (7) immediately and then
/// only the element emitted while connected (11 at 520).
func testReplayOneCount_Basic() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        error(600, testError)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(1) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400, 500...550, 650...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(550) { connection.dispose() }

    testScheduler.scheduleAt(650) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 7),
        next(520, 11),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 550),
        Subscription(650, 800)
    ])
}
|
|
|
|
/// `replay(1)` where the second connection stays open past the source error:
/// the subscriber is expected to receive the buffered element, the live
/// elements and finally the error at 600.
func testReplayOneCount_Error() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        error(600, testError)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(1) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400 and 500...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 7),
        next(520, 11), next(560, 20),
        error(600, testError),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 600),
    ])
}
|
|
|
|
/// `replay(1)` where the second connection stays open past source completion:
/// the subscriber is expected to receive the buffered element, the live
/// elements and finally completion at 600.
func testReplayOneCount_Complete() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        completed(600)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(1) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400 and 500...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 7),
        next(520, 11), next(560, 20),
        completed(600)
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 600),
    ])
}
|
|
|
|
/// `replay(1)` where the subscriber detaches at 475, before the next
/// connection: only the single buffered element is expected, while the source
/// is still connected in all three windows.
func testReplayOneCount_Dispose() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        completed(600)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime: 450...475.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replay(1) }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(475) { subscription.dispose() }

    // Connection windows: 300...400, 500...550, 650...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(550) { connection.dispose() }

    testScheduler.scheduleAt(650) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 7),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 550),
        Subscription(650, 800),
    ])
}
|
|
|
|
/// `replayAll()` with the first connection opened at 200: a subscriber
/// attaching at 450 is expected to receive every element seen during the
/// 200...400 window, then the element emitted while connected (11 at 520).
func testReplayAll_Basic() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        error(600, testError)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replayAll() }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 200...400, 500...550, 650...800.
    testScheduler.scheduleAt(200) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(550) { connection.dispose() }

    testScheduler.scheduleAt(650) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 3), next(450, 4), next(450, 1), next(450, 8),
        next(450, 5), next(450, 6), next(450, 7),
        next(520, 11),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(200, 400),
        Subscription(500, 550),
        Subscription(650, 800)
    ])
}
|
|
|
|
|
|
/// `replayAll()` where the second connection stays open past the source error:
/// the subscriber is expected to receive everything buffered during 300...400,
/// the live elements and finally the error at 600.
func testReplayAll_Error() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        error(600, testError)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replayAll() }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400 and 500...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 8), next(450, 5), next(450, 6), next(450, 7),
        next(520, 11), next(560, 20),
        error(600, testError),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 600),
    ])
}
|
|
|
|
/// `replayAll()` where the second connection stays open past source
/// completion: the subscriber is expected to receive everything buffered
/// during 300...400, the live elements and finally completion at 600.
func testReplayAll_Complete() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        completed(600)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replayAll() }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(Defaults.disposed) { subscription.dispose() }

    // Connection windows: 300...400 and 500...800.
    testScheduler.scheduleAt(300) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 8), next(450, 5), next(450, 6), next(450, 7),
        next(520, 11), next(560, 20),
        completed(600)
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(300, 400),
        Subscription(500, 600),
    ])
}
|
|
|
|
/// `replayAll()` with the first connection opened at 250 and a subscriber that
/// detaches at 475: only the elements buffered during 250...400 are expected,
/// while the source is still connected in all three windows.
func testReplayAll_Dispose() {
    let testScheduler = TestScheduler(initialClock: 0)

    let source = testScheduler.createHotObservable([
        next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
        next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
        next(450, 9), next(520, 11), next(560, 20),
        completed(600)
    ])

    var replayed: ConnectableObservable<Int>!
    var subscription: Disposable!
    var connection: Disposable!
    let observer = testScheduler.createObserver(Int.self)

    // Subscriber lifetime: 450...475.
    testScheduler.scheduleAt(Defaults.created) { replayed = source.replayAll() }
    testScheduler.scheduleAt(450) { subscription = replayed.subscribe(observer) }
    testScheduler.scheduleAt(475) { subscription.dispose() }

    // Connection windows: 250...400, 500...550, 650...800.
    testScheduler.scheduleAt(250) { connection = replayed.connect() }
    testScheduler.scheduleAt(400) { connection.dispose() }

    testScheduler.scheduleAt(500) { connection = replayed.connect() }
    testScheduler.scheduleAt(550) { connection.dispose() }

    testScheduler.scheduleAt(650) { connection = replayed.connect() }
    testScheduler.scheduleAt(800) { connection.dispose() }

    testScheduler.start()

    XCTAssertEqual(observer.events, [
        next(450, 4), next(450, 1), next(450, 8),
        next(450, 5), next(450, 6), next(450, 7),
    ])

    XCTAssertEqual(source.subscriptions, [
        Subscription(250, 400),
        Subscription(500, 550),
        Subscription(650, 800),
    ])
}
|
|
}
|
|
|
|
|
|
// shareReplay(1)
|
|
extension ObservableBindingTest {
|
|
/// Runs `action` twice, first with `shareReplay(1)` and then with the composed
/// `replay(1).refCount()`, so a single test body checks both forms.
///
/// - Parameter action: Test body; receives the transform to apply to the source.
func _testIdenticalBehaviorOfShareReplayOptimizedAndComposed(_ action: (_ transform: ((Observable<Int>) -> Observable<Int>)) -> Void) {
    let transforms: [(Observable<Int>) -> Observable<Int>] = [
        { $0.shareReplay(1) },
        { $0.replay(1).refCount() }
    ]

    for transform in transforms {
        action(transform)
    }
}
|
|
|
|
/// Subscribes to a shared three-element source and expects all three elements
/// to have been delivered by the time `subscribe` returns.
func testShareReplay_DeadlockImmediatelly() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        var received = 0

        let shared = transform(Observable.of(0, 1, 2))
        _ = shared.subscribe(onNext: { _ in received += 1 })

        XCTAssertEqual(received, 3)
    }
}
|
|
|
|
/// Subscribes to a shared empty source and expects the completion handler to
/// have fired exactly once by the time `subscribe` returns.
func testShareReplay_DeadlockEmpty() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        var nEvents = 0

        let observable = transform(Observable.empty())
        // The completion handler takes no arguments (same form as in
        // testRefCount_DeadlockEmpty), so no closure parameter is declared.
        _ = observable.subscribe(onCompleted: {
            nEvents += 1
        })

        XCTAssertEqual(nEvents, 1)
    }
}
|
|
|
|
/// Subscribes to a shared failing source and expects the error handler to have
/// fired exactly once by the time `subscribe` returns.
func testShareReplay_DeadlockError() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        var errorCount = 0

        let shared = transform(Observable.error(testError))
        _ = shared.subscribe(onError: { _ in errorCount += 1 })

        XCTAssertEqual(errorCount, 1)
    }
}
|
|
|
|
/// Subscribes to a shared source that emits three elements and then fails;
/// expects the error handler to have fired exactly once.
func testShareReplay1_DeadlockErrorAfterN() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        var errorCount = 0

        let shared = transform([Observable.of(0, 1, 2), Observable.error(testError)].concat())
        _ = shared.subscribe(onError: { _ in errorCount += 1 })

        XCTAssertEqual(errorCount, 1)
    }
}
|
|
|
|
/// Two overlapping subscribers plus a later re-subscription against a shared
/// replay-1 sequence: a late subscriber is expected to first receive the most
/// recent element, and the source to be subscribed only while at least one
/// subscriber is attached (335...415 and 440...455).
func testShareReplay1_Basic() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        let testScheduler = TestScheduler(initialClock: 0)

        let source = testScheduler.createHotObservable([
            next(110, 7), next(220, 3), next(280, 4), next(290, 1), next(340, 8),
            next(360, 5), next(370, 6), next(390, 7), next(410, 13), next(430, 2),
            next(450, 9), next(520, 11), next(560, 20),
            error(600, testError)
        ])

        var shared: Observable<Int>!

        var firstSubscription: Disposable!
        var secondSubscription: Disposable!

        let firstObserver = testScheduler.createObserver(Int.self)
        let secondObserver = testScheduler.createObserver(Int.self)

        testScheduler.scheduleAt(Defaults.created) { shared = transform(source.asObservable()) }

        // First observer, first batch: 335...400.
        testScheduler.scheduleAt(335) { firstSubscription = shared.subscribe(firstObserver) }
        testScheduler.scheduleAt(400) { firstSubscription.dispose() }

        // Second observer: 355...415.
        testScheduler.scheduleAt(355) { secondSubscription = shared.subscribe(secondObserver) }
        testScheduler.scheduleAt(415) { secondSubscription.dispose() }

        // First observer, second batch: 440...455.
        testScheduler.scheduleAt(440) { firstSubscription = shared.subscribe(firstObserver) }
        testScheduler.scheduleAt(455) { firstSubscription.dispose() }

        testScheduler.start()

        XCTAssertEqual(firstObserver.events, [
            // 1st batch
            next(340, 8), next(360, 5), next(370, 6), next(390, 7),

            // 2nd batch
            next(440, 13), next(450, 9)
        ])

        XCTAssertEqual(secondObserver.events, [
            next(355, 8), next(360, 5), next(370, 6), next(390, 7), next(410, 13)
        ])

        XCTAssertEqual(source.subscriptions, [
            Subscription(335, 415),
            Subscription(440, 455)
        ])
    }
}
|
|
|
|
/// Shared replay-1 sequence whose source fails at 365: both active observers
/// are expected to receive the error, and a re-subscription at 440 is expected
/// to get the last element and the error replayed immediately.
///
/// The source-subscription checks accept either one subscription (335...365)
/// or an additional zero-length one at 440, because the two variants under
/// test differ in that respect.
func testShareReplay1_Error() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        let scheduler = TestScheduler(initialClock: 0)

        let xs = scheduler.createHotObservable([
            next(110, 7), next(220, 3), next(280, 4), next(290, 1),
            next(340, 8), next(360, 5),
            error(365, testError),
            next(370, 6), next(390, 7), next(410, 13), next(430, 2),
            next(450, 9), next(520, 11), next(560, 20),
        ])

        var ys: Observable<Int>! = nil

        var subscription1: Disposable! = nil
        var subscription2: Disposable! = nil

        let res1 = scheduler.createObserver(Int.self)
        let res2 = scheduler.createObserver(Int.self)

        scheduler.scheduleAt(Defaults.created) { ys = transform(xs.asObservable()) }

        scheduler.scheduleAt(335) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(400) { subscription1.dispose() }

        scheduler.scheduleAt(355) { subscription2 = ys.subscribe(res2) }
        scheduler.scheduleAt(415) { subscription2.dispose() }

        scheduler.scheduleAt(440) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(455) { subscription1.dispose() }

        scheduler.start()

        XCTAssertEqual(res1.events, [
            // 1st batch
            next(340, 8),
            next(360, 5),
            error(365, testError),

            // 2nd batch
            next(440, 5),
            error(440, testError),
        ])

        XCTAssertEqual(res2.events, [
            next(355, 8),
            next(360, 5),
            error(365, testError),
        ])

        // unoptimized version of replay subject will make a subscription and kill it immediately
        let subscriptions = xs.subscriptions
        // Fail (rather than trap on an out-of-range index) when nothing was recorded.
        guard let firstSubscription = subscriptions.first else {
            XCTFail("Expected at least one subscription to the source")
            return
        }
        XCTAssertEqual(firstSubscription, Subscription(335, 365))
        XCTAssertLessThanOrEqual(subscriptions.count, 2)
        if subscriptions.count > 1 {
            XCTAssertEqual(subscriptions[1], Subscription(440, 440))
        }
    }
}
|
|
|
|
/// Shared replay-1 sequence whose source completes at 365: both active
/// observers are expected to receive the completion, and a re-subscription at
/// 440 is expected to get the last element and completion replayed immediately.
///
/// The source-subscription checks accept either one subscription (335...365)
/// or an additional zero-length one at 440, because the two variants under
/// test differ in that respect.
func testShareReplay1_Completed() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        let scheduler = TestScheduler(initialClock: 0)

        let xs = scheduler.createHotObservable([
            next(110, 7), next(220, 3), next(280, 4), next(290, 1),
            next(340, 8), next(360, 5),
            completed(365),
            next(370, 6), next(390, 7), next(410, 13), next(430, 2),
            next(450, 9), next(520, 11), next(560, 20),
        ])

        var ys: Observable<Int>! = nil

        var subscription1: Disposable! = nil
        var subscription2: Disposable! = nil

        let res1 = scheduler.createObserver(Int.self)
        let res2 = scheduler.createObserver(Int.self)

        scheduler.scheduleAt(Defaults.created) { ys = transform(xs.asObservable()) }

        scheduler.scheduleAt(335) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(400) { subscription1.dispose() }

        scheduler.scheduleAt(355) { subscription2 = ys.subscribe(res2) }
        scheduler.scheduleAt(415) { subscription2.dispose() }

        scheduler.scheduleAt(440) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(455) { subscription1.dispose() }

        scheduler.start()

        XCTAssertEqual(res1.events, [
            // 1st batch
            next(340, 8),
            next(360, 5),
            completed(365),

            // 2nd batch
            next(440, 5),
            completed(440)
        ])

        XCTAssertEqual(res2.events, [
            next(355, 8),
            next(360, 5),
            completed(365)
        ])

        // unoptimized version of replay subject will make a subscription and kill it immediately
        let subscriptions = xs.subscriptions
        // Fail (rather than trap on an out-of-range index) when nothing was recorded.
        guard let firstSubscription = subscriptions.first else {
            XCTFail("Expected at least one subscription to the source")
            return
        }
        XCTAssertEqual(firstSubscription, Subscription(335, 365))
        XCTAssertLessThanOrEqual(subscriptions.count, 2)
        if subscriptions.count > 1 {
            XCTAssertEqual(subscriptions[1], Subscription(440, 440))
        }
    }
}
|
|
|
|
/// Checks replay-of-one sharing when the hot source completes (at 365) without having
/// emitted any element to the observers: both the live and the late subscription are
/// expected to receive only `completed`.
///
/// The scenario closure is passed to `_testIdenticalBehaviorOfShareReplayOptimizedAndComposed`
/// (defined outside this block), which supplies the `transform` applied to the source.
func testShareReplay1_Canceled() {
    _testIdenticalBehaviorOfShareReplayOptimizedAndComposed { transform in
        let scheduler = TestScheduler(initialClock: 0)

        // Events listed after `completed(365)` must never reach any observer.
        let xs = scheduler.createHotObservable([
            completed(365),
            next(370, 6),
            next(390, 7),
            next(410, 13),
            next(430, 2),
            next(450, 9),
            next(520, 11),
            next(560, 20),
        ])

        // Assigned inside scheduled actions, hence the implicitly unwrapped optionals.
        var ys: Observable<Int>! = nil

        var subscription1: Disposable! = nil
        var subscription2: Disposable! = nil

        let res1 = scheduler.createObserver(Int.self)
        let res2 = scheduler.createObserver(Int.self)

        scheduler.scheduleAt(Defaults.created) { ys = transform(xs.asObservable()) }

        // First observer: 335 ... 400.
        scheduler.scheduleAt(335) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(400) { subscription1.dispose() }

        // Second observer: 355 ... 415.
        scheduler.scheduleAt(355) { subscription2 = ys.subscribe(res2) }
        scheduler.scheduleAt(415) { subscription2.dispose() }

        // First observer again, after the source has already completed: 440 ... 455.
        scheduler.scheduleAt(440) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(455) { subscription1.dispose() }

        scheduler.start()

        XCTAssertEqual(res1.events, [
            // 1st batch
            completed(365),

            // 2nd batch
            completed(440)
        ])

        XCTAssertEqual(res2.events, [
            completed(365)
        ])

        // `first` instead of `[0]`: an empty subscription list fails the assertion
        // instead of trapping with an out-of-range index.
        XCTAssertEqual(xs.subscriptions.first, Subscription(335, 365))
        XCTAssertTrue(xs.subscriptions.count <= 2)

        // The unoptimized version of the replay subject will make a subscription and kill it
        // immediately, so a second (zero-length) subscription is tolerated but not required.
        if xs.subscriptions.count > 1 {
            XCTAssertEqual(xs.subscriptions[1], Subscription(440, 440))
        }
    }
}
|
|
}
|
|
|
|
// shareReplayLatestWhileConnected
|
|
extension ObservableBindingTest {
    /// A synchronous three-element source shared with `shareReplayLatestWhileConnected`
    /// must deliver all three `onNext` callbacks during `subscribe`.
    func testShareReplayLatestWhileConnected_DeadlockImmediatelly() {
        var nEvents = 0

        let observable = Observable.of(0, 1, 2).shareReplayLatestWhileConnected()
        _ = observable.subscribe(onNext: { _ in
            nEvents += 1
        })

        XCTAssertEqual(nEvents, 3)
    }

    /// An empty source shared with `shareReplayLatestWhileConnected` must invoke
    /// `onCompleted` exactly once.
    func testShareReplayLatestWhileConnected_DeadlockEmpty() {
        var nEvents = 0

        let observable = Observable<Int>.empty().shareReplayLatestWhileConnected()
        // `onCompleted` carries no payload, so the closure takes no parameter.
        _ = observable.subscribe(onCompleted: {
            nEvents += 1
        })

        XCTAssertEqual(nEvents, 1)
    }

    /// A source that fails immediately must invoke `onError` exactly once.
    func testShareReplayLatestWhileConnected_DeadlockError() {
        var nEvents = 0

        let observable = Observable<Int>.error(testError).shareReplayLatestWhileConnected()
        _ = observable.subscribe(onError: { _ in
            nEvents += 1
        })

        XCTAssertEqual(nEvents, 1)
    }

    /// A source that emits three elements and then fails must invoke `onError` exactly once.
    func testShareReplayLatestWhileConnected_DeadlockErrorAfterN() {
        var nEvents = 0

        let observable = [Observable.of(0, 1, 2), Observable.error(testError)].concat().shareReplayLatestWhileConnected()
        _ = observable.subscribe(onError: { _ in
            nEvents += 1
        })

        XCTAssertEqual(nEvents, 1)
    }

    /// Two overlapping observers followed by a later re-subscription on a source that
    /// does not terminate inside the observed window.
    ///
    /// Expected: the second observer receives the latest element (8) on subscribing at 355;
    /// the source subscription lasts 335...415; the re-subscription at 440 receives nothing
    /// replayed and starts a fresh source subscription (440...455).
    func testShareReplayLatestWhileConnected_Basic() {
        let scheduler = TestScheduler(initialClock: 0)

        let xs = scheduler.createHotObservable([
            next(110, 7),
            next(220, 3),
            next(280, 4),
            next(290, 1),
            next(340, 8),
            next(360, 5),
            next(370, 6),
            next(390, 7),
            next(410, 13),
            next(430, 2),
            next(450, 9),
            next(520, 11),
            next(560, 20),
            error(600, testError)
        ])

        // Assigned inside scheduled actions, hence the implicitly unwrapped optionals.
        var ys: Observable<Int>! = nil

        var subscription1: Disposable! = nil
        var subscription2: Disposable! = nil

        let res1 = scheduler.createObserver(Int.self)
        let res2 = scheduler.createObserver(Int.self)

        scheduler.scheduleAt(Defaults.created) { ys = xs.shareReplayLatestWhileConnected() }

        // First observer: 335 ... 400.
        scheduler.scheduleAt(335) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(400) { subscription1.dispose() }

        // Second observer: 355 ... 415.
        scheduler.scheduleAt(355) { subscription2 = ys.subscribe(res2) }
        scheduler.scheduleAt(415) { subscription2.dispose() }

        // First observer again, after every earlier subscription was disposed: 440 ... 455.
        scheduler.scheduleAt(440) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(455) { subscription1.dispose() }

        scheduler.start()

        XCTAssertEqual(res1.events, [
            // 1st batch
            next(340, 8),
            next(360, 5),
            next(370, 6),
            next(390, 7),

            // 2nd batch
            next(450, 9)
        ])

        XCTAssertEqual(res2.events, [
            next(355, 8),
            next(360, 5),
            next(370, 6),
            next(390, 7),
            next(410, 13)
        ])

        XCTAssertEqual(xs.subscriptions, [
            Subscription(335, 415),
            Subscription(440, 455)
        ])
    }

    /// Same timeline as the basic scenario, but the source fails at 365 while both
    /// observers are attached.
    ///
    /// Expected: both observers receive the error; the re-subscription at 440 gets no
    /// replayed element or error and starts a fresh source subscription (440...455).
    func testShareReplayLatestWhileConnected_Error() {
        let scheduler = TestScheduler(initialClock: 0)

        let xs = scheduler.createHotObservable([
            next(110, 7),
            next(220, 3),
            next(280, 4),
            next(290, 1),
            next(340, 8),
            next(360, 5),
            error(365, testError),
            next(370, 6),
            next(390, 7),
            next(410, 13),
            next(430, 2),
            next(450, 9),
            next(520, 11),
            next(560, 20),
        ])

        // Assigned inside scheduled actions, hence the implicitly unwrapped optionals.
        var ys: Observable<Int>! = nil

        var subscription1: Disposable! = nil
        var subscription2: Disposable! = nil

        let res1 = scheduler.createObserver(Int.self)
        let res2 = scheduler.createObserver(Int.self)

        scheduler.scheduleAt(Defaults.created) { ys = xs.shareReplayLatestWhileConnected() }

        // First observer: 335 ... 400.
        scheduler.scheduleAt(335) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(400) { subscription1.dispose() }

        // Second observer: 355 ... 415.
        scheduler.scheduleAt(355) { subscription2 = ys.subscribe(res2) }
        scheduler.scheduleAt(415) { subscription2.dispose() }

        // First observer again, after the source has already failed: 440 ... 455.
        scheduler.scheduleAt(440) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(455) { subscription1.dispose() }

        scheduler.start()

        XCTAssertEqual(res1.events, [
            // 1st batch
            next(340, 8),
            next(360, 5),
            error(365, testError),

            // 2nd batch
            next(450, 9),
        ])

        XCTAssertEqual(res2.events, [
            next(355, 8),
            next(360, 5),
            error(365, testError),
        ])

        XCTAssertEqual(xs.subscriptions, [
            Subscription(335, 365),
            Subscription(440, 455)
        ])
    }

    /// Same timeline as the basic scenario, but the source completes at 365 while both
    /// observers are attached.
    ///
    /// The scenario exercises `shareReplayLatestWhileConnected` directly, so it runs on
    /// its own rather than inside `_testIdenticalBehaviorOfShareReplayOptimizedAndComposed`,
    /// whose `transform` argument was never used here.
    func testShareReplayLatestWhileConnected_Completed() {
        let scheduler = TestScheduler(initialClock: 0)

        let xs = scheduler.createHotObservable([
            next(110, 7),
            next(220, 3),
            next(280, 4),
            next(290, 1),
            next(340, 8),
            next(360, 5),
            completed(365),
            next(370, 6),
            next(390, 7),
            next(410, 13),
            next(430, 2),
            next(450, 9),
            next(520, 11),
            next(560, 20),
        ])

        // Assigned inside scheduled actions, hence the implicitly unwrapped optionals.
        var ys: Observable<Int>! = nil

        var subscription1: Disposable! = nil
        var subscription2: Disposable! = nil

        let res1 = scheduler.createObserver(Int.self)
        let res2 = scheduler.createObserver(Int.self)

        scheduler.scheduleAt(Defaults.created) { ys = xs.shareReplayLatestWhileConnected() }

        // First observer: 335 ... 400.
        scheduler.scheduleAt(335) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(400) { subscription1.dispose() }

        // Second observer: 355 ... 415.
        scheduler.scheduleAt(355) { subscription2 = ys.subscribe(res2) }
        scheduler.scheduleAt(415) { subscription2.dispose() }

        // First observer again, after the source has already completed: 440 ... 455.
        scheduler.scheduleAt(440) { subscription1 = ys.subscribe(res1) }
        scheduler.scheduleAt(455) { subscription1.dispose() }

        scheduler.start()

        XCTAssertEqual(res1.events, [
            // 1st batch
            next(340, 8),
            next(360, 5),
            completed(365),

            // 2nd batch
            next(450, 9),
        ])

        XCTAssertEqual(res2.events, [
            next(355, 8),
            next(360, 5),
            completed(365)
        ])

        XCTAssertEqual(xs.subscriptions, [
            Subscription(335, 365),
            Subscription(440, 455)
        ])
    }
}
|