diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 5065dec5..e7ede3e6 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -523,6 +523,10 @@ C8F0C0441BBBFBB9001B112F /* _RXSwizzling.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E881B8A732E0088E94D /* _RXSwizzling.h */; settings = {ATTRIBUTES = (Public, ); }; }; C8F0C0451BBBFBB9001B112F /* _RXKVOObserver.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E861B8A732E0088E94D /* _RXKVOObserver.h */; settings = {ATTRIBUTES = (Public, ); }; }; C8F0C04F1BBBFBCE001B112F /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; }; + CB255BD71BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; }; + CB255BD81BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; }; + CB255BD91BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; }; + CB255BDA1BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; }; D203C4F31BB9C4CA00D02D00 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; }; D203C4F41BB9C52400D02D00 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; }; D203C4F51BB9C52900D02D00 /* ItemEvents.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F41B8A752B00B02D69 /* ItemEvents.swift */; }; @@ -964,6 +968,7 @@ C8F0C0021BBBFB8B001B112F /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + CB255BD61BC46A9C00798A4C /* RetryWhen.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RetryWhen.swift; sourceTree = ""; }; D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = ""; }; D235B23D1BD003DD007E84DA /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; @@ -1184,6 +1189,7 @@ C8093C841B8A72BE0088E94D /* Reduce.swift */, C8093C851B8A72BE0088E94D /* RefCount.swift */, C8640A021BA5B12A00D3C4E8 /* Repeat.swift */, + CB255BD61BC46A9C00798A4C /* RetryWhen.swift */, C8093C861B8A72BE0088E94D /* Sample.swift */, C8093C871B8A72BE0088E94D /* Scan.swift */, C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */, @@ -2087,6 +2093,7 @@ C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */, C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA071B9393AC004D233E /* Empty.swift in Sources */, + CB255BD81BC46A9C00798A4C /* RetryWhen.swift in Sources */, C8093D881B8A72BE0088E94D /* RxBox.swift in Sources */, C8093D3A1B8A72BE0088E94D /* Sink.swift in Sources */, C8093D461B8A72BE0088E94D /* TakeUntil.swift in Sources */, @@ -2206,6 +2213,7 @@ C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */, C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA061B9393AC004D233E /* Empty.swift in Sources */, + CB255BD71BC46A9C00798A4C /* RetryWhen.swift in Sources */, C8093D871B8A72BE0088E94D /* RxBox.swift in Sources */, C8093D391B8A72BE0088E94D /* Sink.swift in Sources */, C8093D451B8A72BE0088E94D /* TakeUntil.swift in Sources */, @@ -2325,6 +2333,7 @@ C8F0BF9D1BBBFB8B001B112F /* Zip+arity.swift in Sources */, C8F0BF9E1BBBFB8B001B112F /* Timer.swift in Sources */, C8F0BF9F1BBBFB8B001B112F /* Empty.swift in Sources */, + CB255BDA1BC46A9C00798A4C /* RetryWhen.swift in Sources */, C8F0BFA01BBBFB8B001B112F /* RxBox.swift in Sources */, C8F0BFA11BBBFB8B001B112F /* Sink.swift in Sources */, C8F0BFA21BBBFB8B001B112F /* TakeUntil.swift in Sources */, @@ -2592,6 +2601,7 @@ D2EBEAF81BB9B6B2003A27DC /* ScopedDisposable.swift in Sources */, D2EBEAEA1BB9B697003A27DC /* SchedulerType.swift in Sources */, D2EBEB031BB9B6C1003A27DC /* CombineLatest+CollectionType.swift in Sources */, + CB255BD91BC46A9C00798A4C /* RetryWhen.swift in Sources */, D2EBEADC1BB9B697003A27DC /* Cancelable.swift in Sources */, D2EBEAE41BB9B697003A27DC /* ObservableType.swift in Sources */, D2EBEB331BB9B6CA003A27DC /* Observable+Time.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/RetryWhen.swift b/RxSwift/Observables/Implementations/RetryWhen.swift new file mode 100644 index 00000000..5ee185b8 --- /dev/null +++ b/RxSwift/Observables/Implementations/RetryWhen.swift @@ -0,0 +1,122 @@ +// +// RetryWhen.swift +// Rx +// +// Created by Junior B. on 06/10/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class RetryTriggerSink : ObserverType { + typealias E = RetryTriggerType + + typealias Parent = RetryWhenSequenceSink + + let parent: Parent + + init(parent: Parent) { + self.parent = parent + } + + func on(event: Event) { + switch event { + case .Next: + parent.lock.performLocked() { + parent.scheduleMoveNext() + } + case .Error(_): + parent.lock.performLocked() { + parent.done() + } + case .Completed: + parent.lock.performLocked() { + parent.lastError = nil + parent.done() + } + } + } +} + +class RetryWhenSequenceSink : TailRecursiveSink { + typealias Element = O.E + typealias Parent = RetryWhenSequence + + let lock = NSRecursiveLock() + + let parent: Parent + + var lastError: ErrorType? + var errorSubject: BehaviorSubject? + + let handlerSubscription = SingleAssignmentDisposable() + + init(parent: Parent, observer: O, cancel: Disposable) { + self.parent = parent + super.init(observer: observer, cancel: cancel) + } + + override func on(event: Event) { + switch event { + case .Next: + observer?.on(event) + case .Error(let error): + lock.performLocked() { + if errorSubject == nil { + errorSubject = BehaviorSubject(value: error as! Error) + let notifier = parent.notificationHandler(errorSubject!.asObservable()) + handlerSubscription.disposable = notifier.subscribeSafe(RetryTriggerSink(parent: self)) + } else { + errorSubject?.on(.Next(error as! Error)) + } + + self.lastError = error + } + case .Completed: + observer?.on(event) + dispose() + } + } + + override func done() { + if let lastError = self.lastError { + observer?.on(.Error(lastError)) + } + else { + observer?.on(.Completed) + } + self.dispose() + } + + override func dispose() { + handlerSubscription.dispose() + super.dispose() + } + + override func extract(observable: Observable) -> S.Generator? { + if let onError = observable as? RetryWhenSequence { + return onError.sources.generate() + } + else { + return nil + } + } +} + +class RetryWhenSequence : Producer { + typealias Element = S.Generator.Element.E + + let sources: S + let notificationHandler: Observable -> Observable + + init(sources: S, notificationHandler: Observable -> Observable) { + self.sources = sources + self.notificationHandler = notificationHandler + } + + override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + let sink = RetryWhenSequenceSink(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return sink.run(self.sources.generate()) + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Single.swift b/RxSwift/Observables/Observable+Single.swift index cfe98678..f7c88fc5 100644 --- a/RxSwift/Observables/Observable+Single.swift +++ b/RxSwift/Observables/Observable+Single.swift @@ -140,6 +140,18 @@ extension ObservableType { -> Observable { return CatchSequence(sources: Repeat(count: maxAttemptCount, repeatedValue: self.asObservable())) } + + /** + Repeats the source observable sequence on error when the notifier emits a next value. + If the source observable errors and the notifier completes, it will complete the source sequence. + + - parameter notificationHandler: A handler that is passed an observable sequence of errors raised by the source observable and returns and observable that either continues, completes or errors. This behavior is then applied to the source observable. + - returns: An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully or is notified to error or complete. + */ + public func retryWhen(notificationHandler: Observable -> Observable) + -> Observable { + return RetryWhenSequence(sources: InfiniteSequence(repeatedValue: self.asObservable()), notificationHandler: notificationHandler) + } } // scan diff --git a/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift b/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift index ba7d391a..f532cc13 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+SingleTest.swift @@ -766,8 +766,85 @@ extension ObservableSingleTest { Subscription(200, 450), ]) } + + func testRetryWhen_Basic() { + + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(5, 1), + next(10, 2), + error(20, testError) + ]) + + let res = scheduler.start(300) { + xs.retryWhen({ (errors: Observable) in + return errors.delaySubscription(30, scheduler) + }).take(6) + } + + let correct: [Recorded] = [ + next(205, 1), + next(210, 2), + next(255, 1), + next(260, 2), + next(275, 1), + next(280, 2), + completed(280) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 250), + Subscription(250, 270), + Subscription(270, 280) + ]) + } + + func testRetryWhen_Incremental_BackOff() { + + let scheduler = TestScheduler(initialClock: 0) + + // just fails + let xs = scheduler.createColdObservable([ + next(5, 1), + error(10, testError) + ]) + + let res = scheduler.start(800) { + xs.retryWhen({ (errors: Observable) in + return scheduler.createColdObservable([ + next(50, 1), // delay 50 + next(150, 2), // delay 100 + next(300, 3), // delay 150 + completed(500) // delay 200 + ]) + }) + } + + let correct: [Recorded] = [ + next(205, 1), + next(265, 1), + next(365, 1), + next(515, 1), + completed(710) + ] + + XCTAssertEqual(res.messages, correct) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 260), + Subscription(260, 360), + Subscription(360, 510), + Subscription(510, 710) + ]) + } + } + + // scan extension ObservableSingleTest {