patch the implement of convert to serial queue.
This commit is contained in:
parent
6e30c7812d
commit
3dfd56d8da
|
|
@ -10,15 +10,20 @@ import Foundation
|
|||
|
||||
class DelaySink<ElementType, O: ObserverType where O.E == ElementType>
|
||||
: Sink<O>
|
||||
, ObserverType {
|
||||
typealias Source = Observable<ElementType>
|
||||
, ObserverType {
|
||||
typealias E = O.E
|
||||
typealias Source = Observable<E>
|
||||
typealias DisposeKey = Bag<Disposable>.KeyType
|
||||
|
||||
// state
|
||||
private let _group = CompositeDisposable()
|
||||
private let _sourceSubscription = SingleAssignmentDisposable()
|
||||
|
||||
private let _lock = NSRecursiveLock()
|
||||
|
||||
private var _queue = Queue<(onTime: RxTime, event: Event<E>)>(capacity: 0)
|
||||
private var _running = false
|
||||
private var _disposed = false
|
||||
|
||||
private let _dueTime: RxTimeInterval
|
||||
private let _scheduler: SchedulerType
|
||||
|
|
@ -29,33 +34,50 @@ class DelaySink<ElementType, O: ObserverType where O.E == ElementType>
|
|||
super.init(observer: observer)
|
||||
}
|
||||
|
||||
func drainQueue(key: DisposeKey) -> Disposable {
|
||||
_lock.lock(); defer { _lock.unlock() } // lock {
|
||||
if !_queue.isEmpty {
|
||||
let (onTime, event) = _queue.peek()
|
||||
let timeInterval = _scheduler.now.timeIntervalSinceDate(onTime)
|
||||
if timeInterval < _dueTime {
|
||||
return _scheduler.scheduleRelative(key, dueTime: _dueTime - timeInterval, action: drainQueue)
|
||||
}
|
||||
_queue.dequeue()
|
||||
forwardOn(event)
|
||||
if event.isStopEvent {
|
||||
dispose()
|
||||
} else {
|
||||
return drainQueue(key)
|
||||
}
|
||||
}
|
||||
_running = false
|
||||
_group.removeDisposable(key)
|
||||
return NopDisposable.instance
|
||||
// }
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
switch event {
|
||||
case .Error(_):
|
||||
_lock.lock(); defer { _lock.unlock() } // lock {
|
||||
_lock.lock(); defer { _lock.unlock() } // lock {
|
||||
switch event {
|
||||
case .Error(_):
|
||||
forwardOn(event)
|
||||
dispose()
|
||||
// }
|
||||
default:
|
||||
let delayDisposable = SingleAssignmentDisposable()
|
||||
if let key = _group.addDisposable(delayDisposable) {
|
||||
delayDisposable.disposable = _scheduler.scheduleRecursive((self, key), dueTime: _dueTime) { state, _ in
|
||||
let (sink, key) = state
|
||||
sink.forwardOn(event)
|
||||
sink._group.removeDisposable(key)
|
||||
if event.isStopEvent {
|
||||
sink.dispose()
|
||||
default:
|
||||
_queue.enqueue((_scheduler.now, event))
|
||||
if !_running {
|
||||
_running = true
|
||||
let delayDisposable = SingleAssignmentDisposable()
|
||||
if let key = _group.addDisposable(delayDisposable) {
|
||||
delayDisposable.disposable = _scheduler.scheduleRelative(key, dueTime: _dueTime, action: drainQueue)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
func run(source: Source) -> Disposable {
|
||||
_group.addDisposable(_sourceSubscription)
|
||||
|
||||
let subscription = source.subscribe(self)
|
||||
_sourceSubscription.disposable = subscription
|
||||
_sourceSubscription.disposable = source.subscribe(self)
|
||||
|
||||
return _group
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue