More work on private vars
This commit is contained in:
parent
ec7ef864c7
commit
01304bcff7
|
|
@ -12,58 +12,58 @@ class SwitchSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
|
|||
typealias E = S
|
||||
typealias Parent = Switch<S>
|
||||
|
||||
let subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
|
||||
let innerSubscription: SerialDisposable = SerialDisposable()
|
||||
let parent: Parent
|
||||
private let _subscriptions: SingleAssignmentDisposable = SingleAssignmentDisposable()
|
||||
private let _innerSubscription: SerialDisposable = SerialDisposable()
|
||||
private let _parent: Parent
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
private let _lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var stopped = false
|
||||
var latest = 0
|
||||
var hasLatest = false
|
||||
private var _stopped = false
|
||||
private var _latest = 0
|
||||
private var _hasLatest = false
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
_parent = parent
|
||||
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
let subscription = self.parent.sources.subscribeSafe(self)
|
||||
subscriptions.disposable = subscription
|
||||
return CompositeDisposable(subscriptions, innerSubscription)
|
||||
let subscription = _parent._sources.subscribeSafe(self)
|
||||
_subscriptions.disposable = subscription
|
||||
return CompositeDisposable(_subscriptions, _innerSubscription)
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
switch event {
|
||||
case .Next(let observable):
|
||||
let latest: Int = self.lock.calculateLocked {
|
||||
hasLatest = true
|
||||
self.latest = self.latest &+ 1
|
||||
return self.latest
|
||||
let latest: Int = _lock.calculateLocked {
|
||||
_hasLatest = true
|
||||
_latest = _latest &+ 1
|
||||
return _latest
|
||||
}
|
||||
|
||||
let d = SingleAssignmentDisposable()
|
||||
innerSubscription.disposable = d
|
||||
_innerSubscription.disposable = d
|
||||
|
||||
let observer = SwitchSinkIter(parent: self, id: latest, _self: d)
|
||||
let disposable = observable.asObservable().subscribeSafe(observer)
|
||||
d.disposable = disposable
|
||||
case .Error(let error):
|
||||
self.lock.performLocked {
|
||||
_lock.performLocked {
|
||||
observer?.on(.Error(error))
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
case .Completed:
|
||||
self.lock.performLocked {
|
||||
self.stopped = true
|
||||
_lock.performLocked {
|
||||
_stopped = true
|
||||
|
||||
self.subscriptions.dispose()
|
||||
_subscriptions.dispose()
|
||||
|
||||
if !self.hasLatest {
|
||||
if !_hasLatest {
|
||||
observer?.on(.Completed)
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -74,42 +74,42 @@ class SwitchSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E ==
|
|||
typealias E = O.E
|
||||
typealias Parent = SwitchSink<S, O>
|
||||
|
||||
let parent: Parent
|
||||
let id: Int
|
||||
let _self: Disposable
|
||||
private let _parent: Parent
|
||||
private let _id: Int
|
||||
private let _self: Disposable
|
||||
|
||||
init(parent: Parent, id: Int, _self: Disposable) {
|
||||
self.parent = parent
|
||||
self.id = id
|
||||
_parent = parent
|
||||
_id = id
|
||||
self._self = _self
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
return parent.lock.calculateLocked {
|
||||
return _parent._lock.calculateLocked {
|
||||
|
||||
switch event {
|
||||
case .Next: break
|
||||
case .Error, .Completed:
|
||||
self._self.dispose()
|
||||
_self.dispose()
|
||||
}
|
||||
|
||||
if parent.latest != self.id {
|
||||
if _parent._latest != _id {
|
||||
return
|
||||
}
|
||||
|
||||
let observer = self.parent.observer
|
||||
let observer = _parent.observer
|
||||
|
||||
switch event {
|
||||
case .Next:
|
||||
observer?.on(event)
|
||||
case .Error:
|
||||
observer?.on(event)
|
||||
self.parent.dispose()
|
||||
_parent.dispose()
|
||||
case .Completed:
|
||||
parent.hasLatest = false
|
||||
if parent.stopped {
|
||||
_parent._hasLatest = false
|
||||
if _parent._stopped {
|
||||
observer?.on(event)
|
||||
self.parent.dispose()
|
||||
_parent.dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -117,10 +117,10 @@ class SwitchSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E ==
|
|||
}
|
||||
|
||||
class Switch<S: ObservableConvertibleType> : Producer<S.E> {
|
||||
let sources: Observable<S>
|
||||
private let _sources: Observable<S>
|
||||
|
||||
init(sources: Observable<S>) {
|
||||
self.sources = sources
|
||||
_sources = sources
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == S.E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
|
|
|
|||
|
|
@ -14,13 +14,13 @@ class TakeCountSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
|
|||
typealias Parent = TakeCount<ElementType>
|
||||
typealias E = ElementType
|
||||
|
||||
let parent: Parent
|
||||
private let _parent: Parent
|
||||
|
||||
var remaining: Int
|
||||
private var _remaining: Int
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
self.remaining = parent.count
|
||||
_parent = parent
|
||||
_remaining = parent._count
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
|
|
@ -28,40 +28,40 @@ class TakeCountSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
|
|||
switch event {
|
||||
case .Next(let value):
|
||||
|
||||
if remaining > 0 {
|
||||
remaining--
|
||||
if _remaining > 0 {
|
||||
_remaining--
|
||||
|
||||
observer?.on(.Next(value))
|
||||
|
||||
if remaining == 0 {
|
||||
if _remaining == 0 {
|
||||
observer?.on(.Completed)
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
case .Error:
|
||||
observer?.on(event)
|
||||
self.dispose()
|
||||
dispose()
|
||||
case .Completed:
|
||||
observer?.on(event)
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class TakeCount<Element>: Producer<Element> {
|
||||
let source: Observable<Element>
|
||||
let count: Int
|
||||
private let _source: Observable<Element>
|
||||
private let _count: Int
|
||||
|
||||
init(source: Observable<Element>, count: Int) {
|
||||
self.source = source
|
||||
self.count = count
|
||||
_source = source
|
||||
_count = count
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
let sink = TakeCountSink(parent: self, observer: observer, cancel: cancel)
|
||||
setSink(sink)
|
||||
return source.subscribeSafe(sink)
|
||||
return _source.subscribeSafe(sink)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -71,44 +71,44 @@ class TakeTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == E
|
|||
typealias Parent = TakeTime<ElementType, S>
|
||||
typealias E = ElementType
|
||||
|
||||
let parent: Parent
|
||||
private let _parent: Parent
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
private let _lock = NSRecursiveLock()
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
_parent = parent
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
lock.performLocked {
|
||||
_lock.performLocked {
|
||||
switch event {
|
||||
case .Next(let value):
|
||||
observer?.on(.Next(value))
|
||||
case .Error:
|
||||
observer?.on(event)
|
||||
self.dispose()
|
||||
dispose()
|
||||
case .Completed:
|
||||
observer?.on(event)
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tick() {
|
||||
lock.performLocked {
|
||||
self.observer?.on(.Completed)
|
||||
self.dispose()
|
||||
_lock.performLocked {
|
||||
observer?.on(.Completed)
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
let disposeTimer = parent.scheduler.scheduleRelative((), dueTime: self.parent.duration) {
|
||||
let disposeTimer = _parent._scheduler.scheduleRelative((), dueTime: _parent._duration) {
|
||||
self.tick()
|
||||
return NopDisposable.instance
|
||||
}
|
||||
|
||||
let disposeSubscription = parent.source.subscribeSafe(self)
|
||||
let disposeSubscription = _parent._source.subscribeSafe(self)
|
||||
|
||||
return BinaryDisposable(disposeTimer, disposeSubscription)
|
||||
}
|
||||
|
|
@ -117,14 +117,14 @@ class TakeTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == E
|
|||
class TakeTime<Element, S: SchedulerType>: Producer<Element> {
|
||||
typealias TimeInterval = S.TimeInterval
|
||||
|
||||
let source: Observable<Element>
|
||||
let duration: TimeInterval
|
||||
let scheduler: S
|
||||
private let _source: Observable<Element>
|
||||
private let _duration: TimeInterval
|
||||
private let _scheduler: S
|
||||
|
||||
init(source: Observable<Element>, duration: TimeInterval, scheduler: S) {
|
||||
self.source = source
|
||||
self.scheduler = scheduler
|
||||
self.duration = duration
|
||||
_source = source
|
||||
_scheduler = scheduler
|
||||
_duration = duration
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
|
|
|
|||
|
|
@ -12,38 +12,38 @@ class TakeUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
|
|||
typealias Parent = TakeUntilSink<ElementType, Other, O>
|
||||
typealias E = Other
|
||||
|
||||
let parent: Parent
|
||||
private let _parent: Parent
|
||||
|
||||
let singleAssignmentDisposable = SingleAssignmentDisposable()
|
||||
private let _singleAssignmentDisposable = SingleAssignmentDisposable()
|
||||
|
||||
var disposable: Disposable {
|
||||
get {
|
||||
abstractMethod()
|
||||
}
|
||||
set(value) {
|
||||
singleAssignmentDisposable.disposable = value
|
||||
_singleAssignmentDisposable.disposable = value
|
||||
}
|
||||
}
|
||||
|
||||
init(parent: Parent) {
|
||||
self.parent = parent
|
||||
_parent = parent
|
||||
#if TRACE_RESOURCES
|
||||
OSAtomicIncrement32(&resourceCount)
|
||||
#endif
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
parent.lock.performLocked {
|
||||
_parent._lock.performLocked {
|
||||
switch event {
|
||||
case .Next:
|
||||
parent.observer?.on(.Completed)
|
||||
parent.dispose()
|
||||
_parent.observer?.on(.Completed)
|
||||
_parent.dispose()
|
||||
case .Error(let e):
|
||||
parent.observer?.on(.Error(e))
|
||||
parent.dispose()
|
||||
_parent.observer?.on(.Error(e))
|
||||
_parent.dispose()
|
||||
case .Completed:
|
||||
parent.open = true
|
||||
singleAssignmentDisposable.dispose()
|
||||
_parent._open = true
|
||||
_singleAssignmentDisposable.dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -59,50 +59,47 @@ class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
|
|||
typealias E = ElementType
|
||||
typealias Parent = TakeUntil<E, Other>
|
||||
|
||||
let parent: Parent
|
||||
private let _parent: Parent
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
private let _lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var open = false
|
||||
private var _open = false
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
_parent = parent
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
switch event {
|
||||
case .Next:
|
||||
if open {
|
||||
if _open {
|
||||
observer?.on(event)
|
||||
}
|
||||
else {
|
||||
lock.performLocked {
|
||||
_lock.performLocked {
|
||||
observer?.on(event)
|
||||
}
|
||||
}
|
||||
break
|
||||
case .Error:
|
||||
lock.performLocked {
|
||||
_lock.performLocked {
|
||||
observer?.on(event)
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
break
|
||||
case .Completed:
|
||||
lock.performLocked {
|
||||
_lock.performLocked {
|
||||
observer?.on(event)
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
let otherObserver = TakeUntilSinkOther(parent: self)
|
||||
let otherSubscription = parent.other.subscribeSafe(otherObserver)
|
||||
let otherSubscription = _parent._other.subscribeSafe(otherObserver)
|
||||
otherObserver.disposable = otherSubscription
|
||||
let sourceSubscription = parent.source.subscribeSafe(self)
|
||||
let sourceSubscription = _parent._source.subscribeSafe(self)
|
||||
|
||||
return CompositeDisposable(sourceSubscription, otherSubscription)
|
||||
}
|
||||
|
|
@ -110,12 +107,12 @@ class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
|
|||
|
||||
class TakeUntil<Element, Other>: Producer<Element> {
|
||||
|
||||
let source: Observable<Element>
|
||||
let other: Observable<Other>
|
||||
private let _source: Observable<Element>
|
||||
private let _other: Observable<Other>
|
||||
|
||||
init(source: Observable<Element>, other: Observable<Other>) {
|
||||
self.source = source
|
||||
self.other = other
|
||||
_source = source
|
||||
_other = other
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
|
|
|
|||
|
|
@ -12,24 +12,24 @@ class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, Observe
|
|||
typealias Element = O.E
|
||||
typealias ParentType = Throttle<Element, Scheduler>
|
||||
|
||||
let parent: ParentType
|
||||
private let _parent: ParentType
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
private let _lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var id = 0 as UInt64
|
||||
var value: Element? = nil
|
||||
private var _id = 0 as UInt64
|
||||
private var _value: Element? = nil
|
||||
|
||||
let cancellable = SerialDisposable()
|
||||
|
||||
init(parent: ParentType, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
_parent = parent
|
||||
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
let subscription = parent.source.subscribeSafe(self)
|
||||
let subscription = _parent._source.subscribeSafe(self)
|
||||
|
||||
return CompositeDisposable(subscription, cancellable)
|
||||
}
|
||||
|
|
@ -42,40 +42,40 @@ class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, Observe
|
|||
cancellable.dispose()
|
||||
}
|
||||
|
||||
let latestId = self.lock.calculateLocked { () -> UInt64 in
|
||||
let latestId = _lock.calculateLocked { () -> UInt64 in
|
||||
let observer = self.observer
|
||||
|
||||
let oldValue = self.value
|
||||
let oldValue = _value
|
||||
|
||||
self.id = self.id &+ 1
|
||||
_id = _id &+ 1
|
||||
|
||||
switch event {
|
||||
case .Next(let element):
|
||||
self.value = element
|
||||
_value = element
|
||||
case .Error:
|
||||
self.value = nil
|
||||
_value = nil
|
||||
observer?.on(event)
|
||||
self.dispose()
|
||||
dispose()
|
||||
case .Completed:
|
||||
self.value = nil
|
||||
_value = nil
|
||||
if let value = oldValue {
|
||||
observer?.on(.Next(value))
|
||||
}
|
||||
observer?.on(.Completed)
|
||||
self.dispose()
|
||||
dispose()
|
||||
}
|
||||
|
||||
return id
|
||||
return _id
|
||||
}
|
||||
|
||||
|
||||
switch event {
|
||||
case .Next(_):
|
||||
case .Next:
|
||||
let d = SingleAssignmentDisposable()
|
||||
self.cancellable.disposable = d
|
||||
|
||||
let scheduler = self.parent.scheduler
|
||||
let dueTime = self.parent.dueTime
|
||||
let scheduler = _parent._scheduler
|
||||
let dueTime = _parent._dueTime
|
||||
|
||||
let disposeTimer = scheduler.scheduleRelative(latestId, dueTime: dueTime) { (id) in
|
||||
self.propagate()
|
||||
|
|
@ -88,9 +88,9 @@ class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, Observe
|
|||
}
|
||||
|
||||
func propagate() {
|
||||
let originalValue: Element? = self.lock.calculateLocked {
|
||||
let originalValue = self.value
|
||||
self.value = nil
|
||||
let originalValue: Element? = _lock.calculateLocked {
|
||||
let originalValue = _value
|
||||
_value = nil
|
||||
return originalValue
|
||||
}
|
||||
|
||||
|
|
@ -102,14 +102,14 @@ class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, Observe
|
|||
|
||||
class Throttle<Element, Scheduler: SchedulerType> : Producer<Element> {
|
||||
|
||||
let source: Observable<Element>
|
||||
let dueTime: Scheduler.TimeInterval
|
||||
let scheduler: Scheduler
|
||||
private let _source: Observable<Element>
|
||||
private let _dueTime: Scheduler.TimeInterval
|
||||
private let _scheduler: Scheduler
|
||||
|
||||
init(source: Observable<Element>, dueTime: Scheduler.TimeInterval, scheduler: Scheduler) {
|
||||
self.source = source
|
||||
self.dueTime = dueTime
|
||||
self.scheduler = scheduler
|
||||
_source = source
|
||||
_dueTime = dueTime
|
||||
_scheduler = scheduler
|
||||
}
|
||||
|
||||
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
|
|
|
|||
|
|
@ -11,15 +11,15 @@ import Foundation
|
|||
class TimerSink<S: SchedulerType, O: ObserverType where O.E == Int64> : Sink<O> {
|
||||
typealias Parent = Timer<S>
|
||||
|
||||
let parent: Parent
|
||||
private let _parent: Parent
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
_parent = parent
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
return self.parent.scheduler.schedulePeriodic(0 as Int64, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
|
||||
return _parent._scheduler.schedulePeriodic(0 as Int64, startAfter: _parent._dueTime, period: _parent._period!) { state in
|
||||
self.observer?.on(.Next(state))
|
||||
return state &+ 1
|
||||
}
|
||||
|
|
@ -29,15 +29,15 @@ class TimerSink<S: SchedulerType, O: ObserverType where O.E == Int64> : Sink<O>
|
|||
class TimerOneOffSink<S: SchedulerType, O: ObserverType where O.E == Int64> : Sink<O> {
|
||||
typealias Parent = Timer<S>
|
||||
|
||||
let parent: Parent
|
||||
private let _parent: Parent
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
_parent = parent
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
return self.parent.scheduler.scheduleRelative((), dueTime: self.parent.dueTime) { (_) -> Disposable in
|
||||
return _parent._scheduler.scheduleRelative((), dueTime: _parent._dueTime) { (_) -> Disposable in
|
||||
self.observer?.on(.Next(0))
|
||||
self.observer?.on(.Completed)
|
||||
|
||||
|
|
@ -49,18 +49,18 @@ class TimerOneOffSink<S: SchedulerType, O: ObserverType where O.E == Int64> : Si
|
|||
class Timer<S: SchedulerType>: Producer<Int64> {
|
||||
typealias TimeInterval = S.TimeInterval
|
||||
|
||||
let scheduler: S
|
||||
let dueTime: TimeInterval
|
||||
let period: TimeInterval?
|
||||
private let _scheduler: S
|
||||
private let _dueTime: TimeInterval
|
||||
private let _period: TimeInterval?
|
||||
|
||||
init(dueTime: TimeInterval, period: TimeInterval?, scheduler: S) {
|
||||
self.scheduler = scheduler
|
||||
self.dueTime = dueTime
|
||||
self.period = period
|
||||
_scheduler = scheduler
|
||||
_dueTime = dueTime
|
||||
_period = period
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == Int64>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
if let _ = period {
|
||||
if let _ = _period {
|
||||
let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
|
||||
setSink(sink)
|
||||
return sink.run()
|
||||
|
|
|
|||
Loading…
Reference in New Issue