Merge Sink classes
This commit is contained in:
parent
b95ebfb364
commit
3d72cb2a7c
|
|
@ -8,10 +8,11 @@
|
|||
|
||||
import Foundation
|
||||
|
||||
class WithLatestFromSink<FirstO: ObservableType, SecondO: ObservableType, ResultType, O: ObserverType where O.E == ResultType > : Sink<O> {
|
||||
class WithLatestFromSink<FirstO: ObservableType, SecondO: ObservableType, ResultType, O: ObserverType where O.E == ResultType > : Sink<O>, ObserverType {
|
||||
|
||||
typealias Parent = WithLatestFrom<FirstO, SecondO, ResultType>
|
||||
typealias SecondType = SecondO.E
|
||||
typealias E = FirstO.E
|
||||
|
||||
private let _parent: Parent
|
||||
|
||||
|
|
@ -27,45 +28,33 @@ class WithLatestFromSink<FirstO: ObservableType, SecondO: ObservableType, Result
|
|||
|
||||
func run() -> Disposable {
|
||||
let sndSubscription = SingleAssignmentDisposable()
|
||||
let fstO = WithLatestFromFirst(parent: self)
|
||||
let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription)
|
||||
|
||||
let fstSubscription = _parent._first.subscribeSafe(fstO)
|
||||
let fstSubscription = _parent._first.subscribeSafe(self)
|
||||
sndSubscription.disposable = _parent._second.subscribeSafe(sndO)
|
||||
|
||||
return StableCompositeDisposable.create(fstSubscription, sndSubscription)
|
||||
}
|
||||
}
|
||||
|
||||
class WithLatestFromFirst<FirstO: ObservableType, SecondO: ObservableType, ResultType, O: ObserverType where O.E == ResultType>: ObserverType {
|
||||
|
||||
typealias Parent = WithLatestFromSink<FirstO, SecondO, ResultType, O>
|
||||
typealias E = FirstO.E
|
||||
private let _parent: Parent
|
||||
|
||||
init(parent: Parent) {
|
||||
_parent = parent
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
_parent._lock.performLocked {
|
||||
_lock.performLocked {
|
||||
switch event {
|
||||
case let .Next(value):
|
||||
guard let latest = _parent._latest else { return }
|
||||
guard let latest = _latest else { return }
|
||||
do {
|
||||
let res = try _parent._parent._resultSelector(value, latest)
|
||||
let res = try _parent._resultSelector(value, latest)
|
||||
|
||||
_parent.observer?.onNext(res)
|
||||
observer?.onNext(res)
|
||||
} catch let e {
|
||||
_parent.observer?.onError(e)
|
||||
_parent.dispose()
|
||||
observer?.onError(e)
|
||||
dispose()
|
||||
}
|
||||
case .Completed:
|
||||
_parent.observer?.onComplete()
|
||||
_parent.dispose()
|
||||
observer?.onComplete()
|
||||
dispose()
|
||||
case let .Error(error):
|
||||
_parent.observer?.onError(error)
|
||||
_parent.dispose()
|
||||
observer?.onError(error)
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -87,7 +76,9 @@ class WithLatestFromSecond<FirstO: ObservableType, SecondO: ObservableType, Resu
|
|||
func on(event: Event<E>) {
|
||||
switch event {
|
||||
case let .Next(value):
|
||||
_parent._latest = value
|
||||
_parent._lock.performLocked {
|
||||
_parent._latest = value
|
||||
}
|
||||
case .Completed:
|
||||
_disposable.dispose()
|
||||
case let .Error(error):
|
||||
|
|
|
|||
Loading…
Reference in New Issue