Port of retryWhen tests from RxJS

This commit is contained in:
Junior B 2015-10-25 10:45:08 +01:00
parent 4e5f86f00c
commit 662d4dc549
2 changed files with 297 additions and 31 deletions

View File

@ -23,6 +23,8 @@ class RetryTriggerSink<S: SequenceType, O: ObserverType, RetryTriggerType, Error
switch event {
case .Next:
parent.lock.performLocked() {
parent.handlerSubscription.dispose()
parent.lastError = nil
parent.scheduleMoveNext()
}
case .Error(_):
@ -47,33 +49,33 @@ class RetryWhenSequenceSink<S: SequenceType, O: ObserverType, RetryTriggerType,
let parent: Parent
var lastError: ErrorType?
var errorSubject: BehaviorSubject<Error>?
let handlerSubscription = SingleAssignmentDisposable()
let errorSubject = PublishSubject<Error>()
let handler: Observable<RetryTriggerType>
let handlerSubscription = SerialDisposable()
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
self.handler = parent.notificationHandler(errorSubject.asObservable())
super.init(observer: observer, cancel: cancel)
}
override func on(event: Event<Element>) {
override func on(event: Event<E>) {
guard lastError == nil else {
return
}
switch event {
case .Next:
observer?.on(event)
case .Error(let error):
lock.performLocked() {
if errorSubject == nil {
errorSubject = BehaviorSubject(value: error as! Error)
let notifier = parent.notificationHandler(errorSubject!.asObservable())
handlerSubscription.disposable = notifier.subscribeSafe(RetryTriggerSink(parent: self))
} else {
errorSubject?.on(.Next(error as! Error))
}
self.lastError = error
handlerSubscription.disposable = handler.subscribeSafe(RetryTriggerSink(parent: self))
errorSubject.on(.Next(error as! Error))
}
case .Completed:
observer?.on(event)
handlerSubscription.dispose()
dispose()
}
}
@ -81,6 +83,7 @@ class RetryWhenSequenceSink<S: SequenceType, O: ObserverType, RetryTriggerType,
override func done() {
if let lastError = self.lastError {
observer?.on(.Error(lastError))
self.lastError = nil
}
else {
observer?.on(.Completed)
@ -89,11 +92,11 @@ class RetryWhenSequenceSink<S: SequenceType, O: ObserverType, RetryTriggerType,
}
override func dispose() {
handlerSubscription.dispose()
super.dispose()
}
override func extract(observable: Observable<Element>) -> S.Generator? {
override func extract(observable: Observable<E>) -> S.Generator? {
if let onError = observable as? RetryWhenSequence<S, RetryTriggerType, Error> {
return onError.sources.generate()
}
@ -101,6 +104,14 @@ class RetryWhenSequenceSink<S: SequenceType, O: ObserverType, RetryTriggerType,
return nil
}
}
override func run(sources: S.Generator) -> Disposable {
_generators.append(sources)
scheduleMoveNext()
return handlerSubscription
}
}
class RetryWhenSequence<S: SequenceType, RetryTriggerType, Error: ErrorType where S.Generator.Element : ObservableType> : Producer<S.Generator.Element.E> {

View File

@ -767,41 +767,295 @@ extension ObservableSingleTest {
])
}
func testRetryWhen_Basic() {
func testRetryWhen_Never() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(5, 1),
next(10, 2),
error(20, testError)
let xs = scheduler.createHotObservable([
next(150, 1),
completed(250)
])
let empty = scheduler.createHotObservable([
next(150, 1),
completed(210)
])
let res = scheduler.start(300) {
xs.retryWhen({ (errors: Observable<NSError>) in
return errors.delaySubscription(30, scheduler)
}).take(6)
return empty.asObservable()
})
}
let correct: [Recorded<Int>] = [
next(205, 1),
next(210, 2),
next(255, 1),
next(260, 2),
next(275, 1),
next(280, 2),
completed(280)
completed(250)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250),
Subscription(250, 270),
Subscription(270, 280)
Subscription(200, 250)
])
}
func testRetryWhen_ObservableNever() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(210, 2),
next(220, 3),
next(230, 4),
next(240, 5),
error(250, testError)
])
let never = scheduler.createHotObservable([
next(150, 1)
])
let res = scheduler.start() {
xs.retryWhen({ (errors: Observable<NSError>) in
return never.asObservable()
})
}
let correct: [Recorded<Int>] = [
next(210, 2),
next(220, 3),
next(230, 4),
next(240, 5)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 1000)
])
}
func testRetryWhen_ObservableNeverComplete() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(210, 2),
next(220, 3),
next(230, 4),
next(240, 5),
completed(250)
])
let never = scheduler.createHotObservable([
next(150, 1)
])
let res = scheduler.start() {
xs.retryWhen({ (errors: Observable<NSError>) in
return never.asObservable()
})
}
let correct: [Recorded<Int>] = [
next(210, 2),
next(220, 3),
next(230, 4),
next(240, 5),
completed(250)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250)
])
}
func testRetryWhen_ObservableEmpty() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(100, 1),
next(150, 2),
next(200, 3),
completed(250)
])
let empty = scheduler.createHotObservable([
next(150, 0),
completed(0)
])
let res = scheduler.start() {
xs.retryWhen({ (errors: Observable<NSError>) in
return empty.asObservable()
})
}
let correct: [Recorded<Int>] = [
next(300, 1),
next(350, 2),
next(400, 3),
completed(450)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 450)
])
}
func testRetryWhen_ObservableNextError() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(10, 1),
next(20, 2),
error(30, testError),
completed(40)
])
let res = scheduler.start(300) {
xs.retryWhen({ (errors: Observable<NSError>) in
return errors.scan(0) { (var a, e) in
if ++a == 2 {
throw testError
}
return a
}
})
}
let correct: [Recorded<Int>] = [
next(210, 1),
next(220, 2),
next(240, 1),
next(250, 2),
error(260, testError)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 230),
Subscription(230, 260)
])
}
func testRetryWhen_ObservableComplete() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(10, 1),
next(20, 2),
error(30, testError),
completed(40)
])
let empty = scheduler.createHotObservable([
next(150, 1),
completed(230)
])
let res = scheduler.start() {
xs.retryWhen({ (errors: Observable<NSError>) in
return empty.asObservable()
})
}
let correct: [Recorded<Int>] = [
next(210, 1),
next(220, 2),
completed(230)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 230)
])
}
func testRetryWhen_ObservableNextComplete() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(10, 1),
next(20, 2),
error(30, testError),
completed(40)
])
let res = scheduler.start(300) {
xs.retryWhen({ (errors: Observable<NSError>) in
return errors.scan(0) { (a, e) in
return a + 1
}.takeWhile { (num: Int) -> Bool in
return num < 2
}
})
}
let correct: [Recorded<Int>] = [
next(210, 1),
next(220, 2),
next(240, 1),
next(250, 2),
completed(260)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 230),
Subscription(230, 260)
])
}
func testRetryWhen_ObservableInfinite() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(10, 1),
next(20, 2),
error(30, testError),
completed(40)
])
let never = scheduler.createHotObservable([
next(150, 1)
])
let res = scheduler.start() {
xs.retryWhen({ (errors: Observable<NSError>) in
return never.asObservable()
})
}
let correct: [Recorded<Int>] = [
next(210, 1),
next(220, 2)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 1000)
])
}
/*
func testRetryWhen_Incremental_BackOff() {
let scheduler = TestScheduler(initialClock: 0)
@ -840,6 +1094,7 @@ extension ObservableSingleTest {
Subscription(510, 710)
])
}
*/
}