`trySend` to `?.on` and cleanup for `switchLatest`

This commit is contained in:
Krunoslav Zaher 2015-08-11 10:10:55 +02:00
parent 83fa0854a2
commit d4aa127e63
1 changed files with 26 additions and 36 deletions

View File

@ -11,67 +11,58 @@ import Foundation
class SwitchSink<S: ObservableType, O: ObserverType where S.E == O.Element> : Sink<O>, ObserverType {
typealias Element = S
typealias Parent = Switch<S>
typealias SwitchState = (
subscription: SingleAssignmentDisposable,
innerSubscription: SerialDisposable,
stopped: Bool,
latest: Int,
hasLatest: Bool
)
let subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
let innerSubscription: SerialDisposable = SerialDisposable()
let parent: Parent
var lock = NSRecursiveLock()
var switchState: SwitchState
// state
var stopped = false
var latest = 0
var hasLatest = false
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
self.switchState = (
subscription: SingleAssignmentDisposable(),
innerSubscription: SerialDisposable(),
stopped: false,
latest: 0,
hasLatest: false
)
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
let subscription = self.parent.sources.subscribeSafe(self)
let switchState = self.switchState
switchState.subscription.disposable = subscription
return CompositeDisposable(switchState.subscription, switchState.innerSubscription)
subscriptions.disposable = subscription
return CompositeDisposable(subscriptions, innerSubscription)
}
func on(event: Event<Element>) {
switch event {
case .Next(let observable):
let latest: Int = self.lock.calculateLocked {
self.switchState.hasLatest = true
self.switchState.latest = self.switchState.latest &+ 1
return self.switchState.latest
hasLatest = true
self.latest = self.latest &+ 1
return self.latest
}
let d = SingleAssignmentDisposable()
self.switchState.innerSubscription.disposable = d
innerSubscription.disposable = d
let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
let disposable = observable.subscribeSafe(observer)
d.disposable = disposable
case .Error(let error):
self.lock.performLocked {
trySendError(observer, error)
observer?.on(.Error(error))
self.dispose()
}
case .Completed:
self.lock.performLocked {
self.switchState.stopped = true
self.stopped = true
self.switchState.subscription.dispose()
self.subscriptions.dispose()
if !self.switchState.hasLatest {
trySendCompleted(observer)
if !self.hasLatest {
observer?.on(.Completed)
self.dispose()
}
}
@ -95,7 +86,6 @@ class SwitchSinkIter<S: ObservableType, O: ObserverType where S.E == O.Element>
func on(event: Event<Element>) {
return parent.lock.calculateLocked {
let switchState = self.parent.switchState
switch event {
case .Next: break
@ -103,7 +93,7 @@ class SwitchSinkIter<S: ObservableType, O: ObserverType where S.E == O.Element>
case .Completed: self._self.dispose()
}
if switchState.latest != self.id {
if parent.latest != self.id {
return
}
@ -111,14 +101,14 @@ class SwitchSinkIter<S: ObservableType, O: ObserverType where S.E == O.Element>
switch event {
case .Next:
trySend(observer, event)
observer?.on(event)
case .Error:
trySend(observer, event)
observer?.on(event)
self.parent.dispose()
case .Completed:
parent.switchState.hasLatest = false
if switchState.stopped {
trySend(observer, event)
parent.hasLatest = false
if parent.stopped {
observer?.on(event)
self.parent.dispose()
}
}