`trySend` to `?.on` and cleanup for `catch`

This commit is contained in:
Krunoslav Zaher 2015-08-13 21:24:03 +02:00
parent 66dfc64207
commit fee2b0cf85
4 changed files with 28 additions and 30 deletions

View File

@ -153,7 +153,7 @@
C88BB8E31B07F2BE0064D411 /* UILabel+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = "UILabel+Rx.swift"; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C88BB8E41B07F2BE0064D411 /* UIScrollView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = "UIScrollView+Rx.swift"; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C88BB8E51B07F2BE0064D411 /* UISearchBar+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = "UISearchBar+Rx.swift"; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C88BB8E61B07F2BE0064D411 /* UITableView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = "UITableView+Rx.swift"; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C88BB8E61B07F2BE0064D411 /* UITableView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = "UITableView+Rx.swift"; sourceTree = "<group>"; };
C88BB8E71B07F2BE0064D411 /* UITextField+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = "UITextField+Rx.swift"; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C88BB91B1B07FD830064D411 /* NSButton+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = "NSButton+Rx.swift"; sourceTree = "<group>"; };
C8A56BCD1AD744FD00B4673B /* RxSwift.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };

View File

@ -21,7 +21,7 @@ class CatchSinkProxy<O: ObserverType> : ObserverType {
}
func on(event: Event<Element>) {
trySend(parent.observer, event)
parent.observer?.on(event)
switch event {
case .Next:
@ -56,31 +56,29 @@ class CatchSink<O: ObserverType> : Sink<O>, ObserverType {
func on(event: Event<Element>) {
switch event {
case .Next:
trySend(observer, event)
observer?.on(event)
case .Completed:
trySend(observer, event)
observer?.on(event)
self.dispose()
case .Error(let error):
parent.handler(error).recoverWith { error2 in
trySendError(observer, error2)
self.dispose()
return failure(error2)
}.flatMap { catchObservable -> RxResult<Void> in
let d = SingleAssignmentDisposable()
subscription.disposable = d
do {
let catchSequence = try parent.handler(error)
let observer = CatchSinkProxy(parent: self)
let subscription2 = catchObservable.subscribeSafe(observer)
d.disposable = subscription2
return SuccessResult
let subscription2 = catchSequence.subscribeSafe(observer)
subscription.disposable = subscription2
}
catch let e {
observer?.on(.Error(e))
self.dispose()
}
}
}
}
class Catch<Element> : Producer<Element> {
typealias Handler = (ErrorType) -> RxResult<Observable<Element>>
typealias Handler = (ErrorType) throws -> Observable<Element>
let source: Observable<Element>
let handler: Handler
@ -118,13 +116,13 @@ class CatchToResultSink<ElementType> : Sink<Observer<RxResult<ElementType>>>, Ob
func on(event: Event<Element>) {
switch event {
case .Next(let value):
trySendNext(observer, success(value))
observer?.on(.Next(success(value)))
case .Completed:
trySendCompleted(observer)
observer?.on(.Completed)
self.dispose()
case .Error(let error):
trySendNext(observer, failure(error))
trySendCompleted(observer)
observer?.on(.Next(failure(error)))
observer?.on(.Completed)
self.dispose()
}
}
@ -159,22 +157,22 @@ class CatchSequenceSink<O: ObserverType> : TailRecursiveSink<O> {
override func on(event: Event<Element>) {
switch event {
case .Next:
trySend(observer, event)
observer?.on(event)
case .Error(let error):
self.lastError = error
self.scheduleMoveNext()
case .Completed:
trySend(self.observer, event)
self.observer?.on(event)
self.dispose()
}
}
override func done() {
if let lastError = self.lastError {
trySendError(observer, lastError)
observer?.on(.Error(lastError))
}
else {
trySendCompleted(observer)
observer?.on(.Completed)
}
self.dispose()

View File

@ -46,20 +46,20 @@ extension ObservableType where E : ObservableType {
// catch
extension ObservableType {
public func catchErrorOrDie(handler: (ErrorType) -> RxResult<Observable<E>>)
public func catchErrorOrDie(handler: (ErrorType) throws -> Observable<E>)
-> Observable<E> {
return Catch(source: self.normalize(), handler: handler)
}
public func catchError(handler: (ErrorType) -> Observable<E>)
-> Observable<E> {
return Catch(source: self.normalize(), handler: { success(handler($0)) })
return Catch(source: self.normalize(), handler: handler)
}
// In case of error, terminates sequence with `replaceErrorWith`.
public func catchError(replaceErrorWith: E)
// In case of error sends `errorElementValue` and completes sequence
public func catchError(errorElementValue: E)
-> Observable<E> {
return Catch(source: self.normalize(), handler: { _ in success(just(replaceErrorWith)) })
return Catch(source: self.normalize(), handler: { _ in just(errorElementValue) })
}
// When error happens `error` will be forwarded as a next `Result<E>` value

View File

@ -79,7 +79,7 @@ extension ObservableMultipleTest {
let res = scheduler.start {
o1.catchErrorOrDie { e in
handlerCalled = scheduler.clock
return failure(testError1)
throw testError1
}
}