From 3dfd56d8dac20fd896d0b5d5c4d8132dcd2b031a Mon Sep 17 00:00:00 2001 From: tarunon Date: Mon, 14 Mar 2016 22:51:25 +0900 Subject: [PATCH] patch the implement of convert to serial queue. --- .../Observables/Implementations/Delay.swift | 60 +++++++++++++------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift index 5b725a7a..311d25c1 100644 --- a/RxSwift/Observables/Implementations/Delay.swift +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -10,15 +10,20 @@ import Foundation class DelaySink : Sink -, ObserverType { - typealias Source = Observable + , ObserverType { typealias E = O.E + typealias Source = Observable + typealias DisposeKey = Bag.KeyType // state private let _group = CompositeDisposable() private let _sourceSubscription = SingleAssignmentDisposable() private let _lock = NSRecursiveLock() + + private var _queue = Queue<(onTime: RxTime, event: Event)>(capacity: 0) + private var _running = false + private var _disposed = false private let _dueTime: RxTimeInterval private let _scheduler: SchedulerType @@ -29,33 +34,50 @@ class DelaySink super.init(observer: observer) } + func drainQueue(key: DisposeKey) -> Disposable { + _lock.lock(); defer { _lock.unlock() } // lock { + if !_queue.isEmpty { + let (onTime, event) = _queue.peek() + let timeInterval = _scheduler.now.timeIntervalSinceDate(onTime) + if timeInterval < _dueTime { + return _scheduler.scheduleRelative(key, dueTime: _dueTime - timeInterval, action: drainQueue) + } + _queue.dequeue() + forwardOn(event) + if event.isStopEvent { + dispose() + } else { + return drainQueue(key) + } + } + _running = false + _group.removeDisposable(key) + return NopDisposable.instance + // } + } + func on(event: Event) { - switch event { - case .Error(_): - _lock.lock(); defer { _lock.unlock() } // lock { + _lock.lock(); defer { _lock.unlock() } // lock { + switch event { + case .Error(_): forwardOn(event) dispose() - // } - default: - let delayDisposable = SingleAssignmentDisposable() - if let key = _group.addDisposable(delayDisposable) { - delayDisposable.disposable = _scheduler.scheduleRecursive((self, key), dueTime: _dueTime) { state, _ in - let (sink, key) = state - sink.forwardOn(event) - sink._group.removeDisposable(key) - if event.isStopEvent { - sink.dispose() + default: + _queue.enqueue((_scheduler.now, event)) + if !_running { + _running = true + let delayDisposable = SingleAssignmentDisposable() + if let key = _group.addDisposable(delayDisposable) { + delayDisposable.disposable = _scheduler.scheduleRelative(key, dueTime: _dueTime, action: drainQueue) } } } - } + // } } func run(source: Source) -> Disposable { _group.addDisposable(_sourceSubscription) - - let subscription = source.subscribe(self) - _sourceSubscription.disposable = subscription + _sourceSubscription.disposable = source.subscribe(self) return _group }