From 662d4dc549429a496ec3b2e4e685bd2b73be1de4 Mon Sep 17 00:00:00 2001 From: Junior B Date: Sun, 25 Oct 2015 10:45:08 +0100 Subject: [PATCH] Port of retryWhen tests from RxJS --- .../Implementations/RetryWhen.swift | 39 ++- .../Tests/Observable+SingleTest.swift | 289 ++++++++++++++++-- 2 files changed, 297 insertions(+), 31 deletions(-) diff --git a/RxSwift/Observables/Implementations/RetryWhen.swift b/RxSwift/Observables/Implementations/RetryWhen.swift index 5ee185b8..c32f1a93 100644 --- a/RxSwift/Observables/Implementations/RetryWhen.swift +++ b/RxSwift/Observables/Implementations/RetryWhen.swift @@ -23,6 +23,8 @@ class RetryTriggerSink? - - let handlerSubscription = SingleAssignmentDisposable() + let errorSubject = PublishSubject() + let handler: Observable + let handlerSubscription = SerialDisposable() init(parent: Parent, observer: O, cancel: Disposable) { self.parent = parent + self.handler = parent.notificationHandler(errorSubject.asObservable()) super.init(observer: observer, cancel: cancel) } - override func on(event: Event) { + override func on(event: Event) { + guard lastError == nil else { + return + } + switch event { case .Next: observer?.on(event) case .Error(let error): lock.performLocked() { - if errorSubject == nil { - errorSubject = BehaviorSubject(value: error as! Error) - let notifier = parent.notificationHandler(errorSubject!.asObservable()) - handlerSubscription.disposable = notifier.subscribeSafe(RetryTriggerSink(parent: self)) - } else { - errorSubject?.on(.Next(error as! Error)) - } - self.lastError = error + handlerSubscription.disposable = handler.subscribeSafe(RetryTriggerSink(parent: self)) + errorSubject.on(.Next(error as! Error)) } case .Completed: observer?.on(event) + handlerSubscription.dispose() dispose() } } @@ -81,6 +83,7 @@ class RetryWhenSequenceSink) -> S.Generator? { + override func extract(observable: Observable) -> S.Generator? { if let onError = observable as? RetryWhenSequence { return onError.sources.generate() } @@ -101,6 +104,14 @@ class RetryWhenSequenceSink Disposable { + _generators.append(sources) + + scheduleMoveNext() + + return handlerSubscription + } } class RetryWhenSequence : Producer { diff --git a/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift b/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift index f532cc13..a9c73511 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift @@ -767,41 +767,295 @@ extension ObservableSingleTest { ]) } - func testRetryWhen_Basic() { + + func testRetryWhen_Never() { let scheduler = TestScheduler(initialClock: 0) - let xs = scheduler.createColdObservable([ - next(5, 1), - next(10, 2), - error(20, testError) + let xs = scheduler.createHotObservable([ + next(150, 1), + completed(250) + ]) + + let empty = scheduler.createHotObservable([ + next(150, 1), + completed(210) ]) let res = scheduler.start(300) { xs.retryWhen({ (errors: Observable) in - return errors.delaySubscription(30, scheduler) - }).take(6) + return empty.asObservable() + }) } let correct: [Recorded] = [ - next(205, 1), - next(210, 2), - next(255, 1), - next(260, 2), - next(275, 1), - next(280, 2), - completed(280) + completed(250) ] XCTAssertEqual(res.messages, correct) XCTAssertEqual(xs.subscriptions, [ - Subscription(200, 250), - Subscription(250, 270), - Subscription(270, 280) + Subscription(200, 250) ]) } + func testRetryWhen_ObservableNever() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(210, 2), + next(220, 3), + next(230, 4), + next(240, 5), + error(250, testError) + ]) + + let never = scheduler.createHotObservable([ + next(150, 1) + ]) + + let res = scheduler.start() { + xs.retryWhen({ (errors: Observable) in + return never.asObservable() + }) + } + + let correct: [Recorded] = [ + next(210, 2), + next(220, 3), + next(230, 4), + next(240, 5) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 1000) + ]) + } + + func testRetryWhen_ObservableNeverComplete() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(210, 2), + next(220, 3), + next(230, 4), + next(240, 5), + completed(250) + ]) + + let never = scheduler.createHotObservable([ + next(150, 1) + ]) + + let res = scheduler.start() { + xs.retryWhen({ (errors: Observable) in + return never.asObservable() + }) + } + + let correct: [Recorded] = [ + next(210, 2), + next(220, 3), + next(230, 4), + next(240, 5), + completed(250) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 250) + ]) + } + + func testRetryWhen_ObservableEmpty() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(100, 1), + next(150, 2), + next(200, 3), + completed(250) + ]) + + let empty = scheduler.createHotObservable([ + next(150, 0), + completed(0) + ]) + + let res = scheduler.start() { + xs.retryWhen({ (errors: Observable) in + return empty.asObservable() + }) + } + + let correct: [Recorded] = [ + next(300, 1), + next(350, 2), + next(400, 3), + completed(450) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 450) + ]) + } + + + func testRetryWhen_ObservableNextError() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(10, 1), + next(20, 2), + error(30, testError), + completed(40) + ]) + + let res = scheduler.start(300) { + xs.retryWhen({ (errors: Observable) in + return errors.scan(0) { (var a, e) in + if ++a == 2 { + throw testError + } + return a + } + }) + } + + let correct: [Recorded] = [ + next(210, 1), + next(220, 2), + next(240, 1), + next(250, 2), + error(260, testError) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 230), + Subscription(230, 260) + ]) + } + + + func testRetryWhen_ObservableComplete() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(10, 1), + next(20, 2), + error(30, testError), + completed(40) + ]) + + let empty = scheduler.createHotObservable([ + next(150, 1), + completed(230) + ]) + + let res = scheduler.start() { + xs.retryWhen({ (errors: Observable) in + return empty.asObservable() + }) + } + + let correct: [Recorded] = [ + next(210, 1), + next(220, 2), + completed(230) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 230) + ]) + } + + func testRetryWhen_ObservableNextComplete() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(10, 1), + next(20, 2), + error(30, testError), + completed(40) + ]) + + let res = scheduler.start(300) { + xs.retryWhen({ (errors: Observable) in + return errors.scan(0) { (a, e) in + return a + 1 + }.takeWhile { (num: Int) -> Bool in + return num < 2 + } + }) + } + + let correct: [Recorded] = [ + next(210, 1), + next(220, 2), + next(240, 1), + next(250, 2), + completed(260) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 230), + Subscription(230, 260) + ]) + } + + func testRetryWhen_ObservableInfinite() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(10, 1), + next(20, 2), + error(30, testError), + completed(40) + ]) + + let never = scheduler.createHotObservable([ + next(150, 1) + ]) + + let res = scheduler.start() { + xs.retryWhen({ (errors: Observable) in + return never.asObservable() + }) + } + + let correct: [Recorded] = [ + next(210, 1), + next(220, 2) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 1000) + ]) + } + + /* func testRetryWhen_Incremental_BackOff() { let scheduler = TestScheduler(initialClock: 0) @@ -840,6 +1094,7 @@ extension ObservableSingleTest { Subscription(510, 710) ]) } + */ }