- Add retryWhen operator

- Add tests for retryWhen operator
This commit is contained in:
Junior B 2015-10-19 15:42:26 +02:00
parent 9a182a8f50
commit 93e3f5facb
4 changed files with 221 additions and 0 deletions

View File

@ -523,6 +523,10 @@
C8F0C0441BBBFBB9001B112F /* _RXSwizzling.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E881B8A732E0088E94D /* _RXSwizzling.h */; settings = {ATTRIBUTES = (Public, ); }; };
C8F0C0451BBBFBB9001B112F /* _RXKVOObserver.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E861B8A732E0088E94D /* _RXKVOObserver.h */; settings = {ATTRIBUTES = (Public, ); }; };
C8F0C04F1BBBFBCE001B112F /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; };
CB255BD71BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; };
CB255BD81BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; };
CB255BD91BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; };
CB255BDA1BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; settings = {ASSET_TAGS = (); }; };
D203C4F31BB9C4CA00D02D00 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; };
D203C4F41BB9C52400D02D00 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; };
D203C4F51BB9C52900D02D00 /* ItemEvents.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F41B8A752B00B02D69 /* ItemEvents.swift */; };
@ -964,6 +968,7 @@
C8F0C0021BBBFB8B001B112F /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
CB255BD61BC46A9C00798A4C /* RetryWhen.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RetryWhen.swift; sourceTree = "<group>"; };
D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = "<group>"; };
D235B23D1BD003DD007E84DA /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = "<group>"; };
@ -1184,6 +1189,7 @@
C8093C841B8A72BE0088E94D /* Reduce.swift */,
C8093C851B8A72BE0088E94D /* RefCount.swift */,
C8640A021BA5B12A00D3C4E8 /* Repeat.swift */,
CB255BD61BC46A9C00798A4C /* RetryWhen.swift */,
C8093C861B8A72BE0088E94D /* Sample.swift */,
C8093C871B8A72BE0088E94D /* Scan.swift */,
C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */,
@ -2087,6 +2093,7 @@
C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */,
C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */,
C8C3DA071B9393AC004D233E /* Empty.swift in Sources */,
CB255BD81BC46A9C00798A4C /* RetryWhen.swift in Sources */,
C8093D881B8A72BE0088E94D /* RxBox.swift in Sources */,
C8093D3A1B8A72BE0088E94D /* Sink.swift in Sources */,
C8093D461B8A72BE0088E94D /* TakeUntil.swift in Sources */,
@ -2206,6 +2213,7 @@
C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */,
C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */,
C8C3DA061B9393AC004D233E /* Empty.swift in Sources */,
CB255BD71BC46A9C00798A4C /* RetryWhen.swift in Sources */,
C8093D871B8A72BE0088E94D /* RxBox.swift in Sources */,
C8093D391B8A72BE0088E94D /* Sink.swift in Sources */,
C8093D451B8A72BE0088E94D /* TakeUntil.swift in Sources */,
@ -2325,6 +2333,7 @@
C8F0BF9D1BBBFB8B001B112F /* Zip+arity.swift in Sources */,
C8F0BF9E1BBBFB8B001B112F /* Timer.swift in Sources */,
C8F0BF9F1BBBFB8B001B112F /* Empty.swift in Sources */,
CB255BDA1BC46A9C00798A4C /* RetryWhen.swift in Sources */,
C8F0BFA01BBBFB8B001B112F /* RxBox.swift in Sources */,
C8F0BFA11BBBFB8B001B112F /* Sink.swift in Sources */,
C8F0BFA21BBBFB8B001B112F /* TakeUntil.swift in Sources */,
@ -2592,6 +2601,7 @@
D2EBEAF81BB9B6B2003A27DC /* ScopedDisposable.swift in Sources */,
D2EBEAEA1BB9B697003A27DC /* SchedulerType.swift in Sources */,
D2EBEB031BB9B6C1003A27DC /* CombineLatest+CollectionType.swift in Sources */,
CB255BD91BC46A9C00798A4C /* RetryWhen.swift in Sources */,
D2EBEADC1BB9B697003A27DC /* Cancelable.swift in Sources */,
D2EBEAE41BB9B697003A27DC /* ObservableType.swift in Sources */,
D2EBEB331BB9B6CA003A27DC /* Observable+Time.swift in Sources */,

View File

@ -0,0 +1,122 @@
//
// RetryWhen.swift
// Rx
//
// Created by Junior B. on 06/10/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class RetryTriggerSink<S: SequenceType, O: ObserverType, RetryTriggerType, Error: ErrorType where S.Generator.Element : ObservableType, S.Generator.Element.E == O.E> : ObserverType {
typealias E = RetryTriggerType
typealias Parent = RetryWhenSequenceSink<S, O, RetryTriggerType, Error>
let parent: Parent
init(parent: Parent) {
self.parent = parent
}
func on(event: Event<E>) {
switch event {
case .Next:
parent.lock.performLocked() {
parent.scheduleMoveNext()
}
case .Error(_):
parent.lock.performLocked() {
parent.done()
}
case .Completed:
parent.lock.performLocked() {
parent.lastError = nil
parent.done()
}
}
}
}
class RetryWhenSequenceSink<S: SequenceType, O: ObserverType, RetryTriggerType, Error: ErrorType where S.Generator.Element : ObservableType, S.Generator.Element.E == O.E> : TailRecursiveSink<S, O> {
typealias Element = O.E
typealias Parent = RetryWhenSequence<S, RetryTriggerType, Error>
let lock = NSRecursiveLock()
let parent: Parent
var lastError: ErrorType?
var errorSubject: BehaviorSubject<Error>?
let handlerSubscription = SingleAssignmentDisposable()
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
override func on(event: Event<Element>) {
switch event {
case .Next:
observer?.on(event)
case .Error(let error):
lock.performLocked() {
if errorSubject == nil {
errorSubject = BehaviorSubject(value: error as! Error)
let notifier = parent.notificationHandler(errorSubject!.asObservable())
handlerSubscription.disposable = notifier.subscribeSafe(RetryTriggerSink(parent: self))
} else {
errorSubject?.on(.Next(error as! Error))
}
self.lastError = error
}
case .Completed:
observer?.on(event)
dispose()
}
}
override func done() {
if let lastError = self.lastError {
observer?.on(.Error(lastError))
}
else {
observer?.on(.Completed)
}
self.dispose()
}
override func dispose() {
handlerSubscription.dispose()
super.dispose()
}
override func extract(observable: Observable<Element>) -> S.Generator? {
if let onError = observable as? RetryWhenSequence<S, RetryTriggerType, Error> {
return onError.sources.generate()
}
else {
return nil
}
}
}
class RetryWhenSequence<S: SequenceType, RetryTriggerType, Error: ErrorType where S.Generator.Element : ObservableType> : Producer<S.Generator.Element.E> {
typealias Element = S.Generator.Element.E
let sources: S
let notificationHandler: Observable<Error> -> Observable<RetryTriggerType>
init(sources: S, notificationHandler: Observable<Error> -> Observable<RetryTriggerType>) {
self.sources = sources
self.notificationHandler = notificationHandler
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = RetryWhenSequenceSink<S, O, RetryTriggerType, Error>(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run(self.sources.generate())
}
}

View File

@ -140,6 +140,18 @@ extension ObservableType {
-> Observable<E> {
return CatchSequence(sources: Repeat(count: maxAttemptCount, repeatedValue: self.asObservable()))
}
/**
Repeats the source observable sequence on error when the notifier emits a next value.
If the source observable errors and the notifier completes, it will complete the source sequence.
- parameter notificationHandler: A handler that is passed an observable sequence of errors raised by the source observable and returns and observable that either continues, completes or errors. This behavior is then applied to the source observable.
- returns: An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully or is notified to error or complete.
*/
public func retryWhen<U, Error: ErrorType>(notificationHandler: Observable<Error> -> Observable<U>)
-> Observable<E> {
return RetryWhenSequence(sources: InfiniteSequence(repeatedValue: self.asObservable()), notificationHandler: notificationHandler)
}
}
// scan

View File

@ -766,8 +766,85 @@ extension ObservableSingleTest {
Subscription(200, 450),
])
}
func testRetryWhen_Basic() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(5, 1),
next(10, 2),
error(20, testError)
])
let res = scheduler.start(300) {
xs.retryWhen({ (errors: Observable<NSError>) in
return errors.delaySubscription(30, scheduler)
}).take(6)
}
let correct: [Recorded<Int>] = [
next(205, 1),
next(210, 2),
next(255, 1),
next(260, 2),
next(275, 1),
next(280, 2),
completed(280)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250),
Subscription(250, 270),
Subscription(270, 280)
])
}
func testRetryWhen_Incremental_BackOff() {
let scheduler = TestScheduler(initialClock: 0)
// just fails
let xs = scheduler.createColdObservable([
next(5, 1),
error(10, testError)
])
let res = scheduler.start(800) {
xs.retryWhen({ (errors: Observable<NSError>) in
return scheduler.createColdObservable([
next(50, 1), // delay 50
next(150, 2), // delay 100
next(300, 3), // delay 150
completed(500) // delay 200
])
})
}
let correct: [Recorded<Int>] = [
next(205, 1),
next(265, 1),
next(365, 1),
next(515, 1),
completed(710)
]
XCTAssertEqual(res.messages, correct)
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 260),
Subscription(260, 360),
Subscription(360, 510),
Subscription(510, 710)
])
}
}
// scan
extension ObservableSingleTest {