diff --git a/RxSwift/RxSwift/Observables/Implementations/AsObservable.swift b/RxSwift/RxSwift/Observables/Implementations/AsObservable.swift index 7a383c3b..8f2d100b 100644 --- a/RxSwift/RxSwift/Observables/Implementations/AsObservable.swift +++ b/RxSwift/RxSwift/Observables/Implementations/AsObservable.swift @@ -8,17 +8,16 @@ import Foundation -class AsObservableSink_ : ObserverType, Disposable { +class AsObservableSink_ : Sink, ObserverType { typealias Element = ElementType - let sink: Sink - - func dispose() { - sink.dispose() + override init(observer: ObserverOf, cancel: Disposable) { + super.init(observer: observer, cancel: cancel) } func on(event: Event) { - self.sink.state.observer.on(event) + self.observer.on(event) + switch event { case .Error: fallthrough case .Completed: @@ -27,9 +26,6 @@ class AsObservableSink_ : ObserverType, Disposable { } } - init(observer: ObserverOf, cancel: Disposable) { - self.sink = Sink(observer: observer, cancel: cancel) - } } class AsObservable : Producer { diff --git a/RxSwift/RxSwift/Observables/Implementations/DistinctUntilChanged.swift b/RxSwift/RxSwift/Observables/Implementations/DistinctUntilChanged.swift index 746ca437..decdc031 100644 --- a/RxSwift/RxSwift/Observables/Implementations/DistinctUntilChanged.swift +++ b/RxSwift/RxSwift/Observables/Implementations/DistinctUntilChanged.swift @@ -24,24 +24,25 @@ class DistinctUntilChanged_: Sink, ObserverType { switch event { case .Next(let value): - let keyResult = self.parent.selector(value.value) - - let areEqualResult = keyResult >== { key -> Result in + self.parent.selector(value.value) >== { key in + var areEqual: Result if let currentKey = self.currentKey { - return self.parent.comparer(currentKey, key) + areEqual = self.parent.comparer(currentKey, key) } else { - return success(false) + areEqual = success(false) } - } >== { areEqual in - if areEqual { + + return areEqual >== { areEqual in + if areEqual { + return SuccessResult + } + + self.currentKey = key + + observer.on(event) return SuccessResult } - - self.currentKey = *keyResult - - observer.on(event) - return SuccessResult } >>! { error -> Result in observer.on(.Error(error)) self.dispose() diff --git a/RxSwift/RxSwift/Observables/Implementations/Do.swift b/RxSwift/RxSwift/Observables/Implementations/Do.swift index db2a67c4..2fd557e9 100644 --- a/RxSwift/RxSwift/Observables/Implementations/Do.swift +++ b/RxSwift/RxSwift/Observables/Implementations/Do.swift @@ -22,11 +22,11 @@ class Do_ : Sink, ObserverType, Disposable { func on(event: Event) { parent.eventHandler(event) >>! { error in // catch clause - self.state.observer.on(Event.Error(error)) + self.observer.on(.Error(error)) self.dispose() return SuccessResult } >== { _ -> Result in - self.state.observer.on(event) + self.observer.on(event) if event.isStopEvent { self.dispose() } diff --git a/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift b/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift index 883b201a..6bf7ed80 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift @@ -36,7 +36,8 @@ func isPrime(i: Int) -> Bool { return true } -// Where Tests +// where + extension ObservableStandardSequenceOperators { func test_whereComplete() { let scheduler = TestScheduler(initialClock: 0)