From 13a87af009b1ef7841feb5bc3fb634cd94c0347b Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sun, 1 Nov 2015 11:48:10 +0100 Subject: [PATCH] Moves disposables combo in `Producer.subscribe` to `Sink`. `SynchronizedOnType` overhauls. --- RxSwift/Disposables/AnonymousDisposable.swift | 2 + .../SingleAssignmentDisposable.swift | 3 + RxSwift/Observables/Implementations/Amb.swift | 12 +- .../Implementations/AnonymousObservable.swift | 12 +- .../Observables/Implementations/Buffer.swift | 56 +++-- .../Observables/Implementations/Catch.swift | 24 +- .../CombineLatest+CollectionType.swift | 95 ++++---- .../Implementations/CombineLatest+arity.swift | 154 ++++++------- .../Implementations/CombineLatest+arity.tt | 14 +- .../Implementations/CombineLatest.swift | 10 +- .../Observables/Implementations/Concat.swift | 14 +- .../Observables/Implementations/Debug.swift | 12 +- .../Implementations/Deferred.swift | 12 +- .../Implementations/DelaySubscription.swift | 17 +- .../DistinctUntilChanged.swift | 12 +- RxSwift/Observables/Implementations/Do.swift | 14 +- .../Implementations/ElementAt.swift | 12 +- .../Observables/Implementations/Filter.swift | 12 +- .../Observables/Implementations/FlatMap.swift | 28 +-- .../Implementations/Generate.swift | 12 +- RxSwift/Observables/Implementations/Map.swift | 26 +-- .../Observables/Implementations/Merge.swift | 213 ++++++++++-------- .../Implementations/Multicast.swift | 12 +- .../Implementations/ObserveOn.swift | 98 ++++---- .../ObserveOnSerialDispatchQueue.swift | 25 +- .../Implementations/Producer.swift | 21 +- .../Observables/Implementations/Range.swift | 12 +- .../Observables/Implementations/Reduce.swift | 12 +- .../Implementations/RefCount.swift | 26 ++- .../Observables/Implementations/Repeat.swift | 13 +- .../Observables/Implementations/Sample.swift | 105 +++++---- .../Observables/Implementations/Scan.swift | 12 +- .../Observables/Implementations/Sink.swift | 28 +-- .../Observables/Implementations/Skip.swift | 25 +- .../Implementations/SkipUntil.swift | 102 +++++---- .../Implementations/SkipWhile.swift | 22 +- .../Implementations/StartWith.swift | 2 +- .../Implementations/SubscribeOn.swift | 12 +- .../Observables/Implementations/Switch.swift | 118 +++++----- .../Observables/Implementations/Take.swift | 64 +++--- .../Implementations/TakeLast.swift | 12 +- .../Implementations/TakeUntil.swift | 82 +++---- .../Implementations/TakeWhile.swift | 30 +-- .../Implementations/Throttle.swift | 105 ++++----- .../Observables/Implementations/Timer.swift | 22 +- .../Observables/Implementations/ToArray.swift | 14 +- .../Observables/Implementations/Using.swift | 12 +- .../Implementations/WithLatestFrom.swift | 92 ++++---- .../Implementations/Zip+CollectionType.swift | 19 +- .../Implementations/Zip+arity.swift | 154 ++++++------- .../Observables/Implementations/Zip+arity.tt | 14 +- RxSwift/Observables/Implementations/Zip.swift | 48 ++-- RxSwift/Observers/TailRecursiveSink.swift | 4 +- .../Schedulers/CurrentThreadScheduler.swift | 6 +- RxTests/PerformanceTests/main.swift | 10 +- ...rvable+StandardSequenceOperatorsTest.swift | 6 +- RxTests/RxTest.swift | 2 +- .../xcschemes/RxTests-OSX.xcscheme | 2 +- .../xcschemes/RxTests-iOS.xcscheme | 2 +- .../xcschemes/RxTests-tvOS.xcscheme | 2 +- 60 files changed, 1090 insertions(+), 1023 deletions(-) diff --git a/RxSwift/Disposables/AnonymousDisposable.swift b/RxSwift/Disposables/AnonymousDisposable.swift index dffdd30d..b9aef47a 100644 --- a/RxSwift/Disposables/AnonymousDisposable.swift +++ b/RxSwift/Disposables/AnonymousDisposable.swift @@ -45,6 +45,8 @@ public final class AnonymousDisposable : DisposeBase, Cancelable { */ public func dispose() { if OSAtomicCompareAndSwap32(0, 1, &_disposed) { + assert(_disposed == 1) + if let action = _disposeAction { _disposeAction = nil action() diff --git a/RxSwift/Disposables/SingleAssignmentDisposable.swift b/RxSwift/Disposables/SingleAssignmentDisposable.swift index 8b2858bd..f74e22eb 100644 --- a/RxSwift/Disposables/SingleAssignmentDisposable.swift +++ b/RxSwift/Disposables/SingleAssignmentDisposable.swift @@ -72,6 +72,9 @@ public class SingleAssignmentDisposable : DisposeBase, Disposable, Cancelable { Disposes the underlying disposable. */ public func dispose() { + if _disposed { + return + } _dispose()?.dispose() } diff --git a/RxSwift/Observables/Implementations/Amb.swift b/RxSwift/Observables/Implementations/Amb.swift index 9c19d568..439cbd65 100644 --- a/RxSwift/Observables/Implementations/Amb.swift +++ b/RxSwift/Observables/Implementations/Amb.swift @@ -58,9 +58,9 @@ class AmbSink : Sink { // state private var _choice = AmbState.Neither - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -114,9 +114,9 @@ class Amb: Producer { _right = right } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = AmbSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = AmbSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/AnonymousObservable.swift b/RxSwift/Observables/Implementations/AnonymousObservable.swift index 74379c2d..506f8b31 100644 --- a/RxSwift/Observables/Implementations/AnonymousObservable.swift +++ b/RxSwift/Observables/Implementations/AnonymousObservable.swift @@ -16,8 +16,8 @@ class AnonymousObservableSink : Sink, ObserverType { // state private var _isStopped: Int32 = 0 - override init(observer: O, cancel: Disposable) { - super.init(observer: observer, cancel: cancel) + override init(observer: O) { + super.init(observer: observer) } func on(event: Event) { @@ -49,9 +49,9 @@ public class AnonymousObservable : Producer { _subscribeHandler = subscribeHandler } - public override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = AnonymousObservableSink(observer: observer, cancel: cancel) - setSink(sink) - return sink.run(self) + public override func run(observer: O) -> Disposable { + let sink = AnonymousObservableSink(observer: observer) + sink.disposable = sink.run(self) + return sink } } diff --git a/RxSwift/Observables/Implementations/Buffer.swift b/RxSwift/Observables/Implementations/Buffer.swift index eb380461..1c932ed3 100644 --- a/RxSwift/Observables/Implementations/Buffer.swift +++ b/RxSwift/Observables/Implementations/Buffer.swift @@ -22,29 +22,33 @@ class BufferTimeCount : Producer<[Element]> { _scheduler = scheduler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = BufferTimeCountSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = BufferTimeCountSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } -class BufferTimeCountSink : Sink, ObserverType { +class BufferTimeCountSink + : Sink + , LockOwnerType + , ObserverType + , SynchronizedOnType { typealias Parent = BufferTimeCount typealias E = Element private let _parent: Parent - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private let _timerD = SerialDisposable() private var _buffer = [Element]() private var _windowID = 0 - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -64,24 +68,26 @@ class BufferTimeCountSink) { - _lock.performLocked { - switch event { - case .Next(let element): - _buffer.append(element) - - if _buffer.count == _parent._count { - startNewWindowAndSendCurrentOne() - } - - case .Error(let error): - _buffer = [] - observer?.on(.Error(error)) - dispose() - case .Completed: - observer?.on(.Next(_buffer)) - observer?.on(.Completed) - dispose() + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next(let element): + _buffer.append(element) + + if _buffer.count == _parent._count { + startNewWindowAndSendCurrentOne() } + + case .Error(let error): + _buffer = [] + observer?.on(.Error(error)) + dispose() + case .Completed: + observer?.on(.Next(_buffer)) + observer?.on(.Completed) + dispose() } } diff --git a/RxSwift/Observables/Implementations/Catch.swift b/RxSwift/Observables/Implementations/Catch.swift index 3924948c..eaccfc8e 100644 --- a/RxSwift/Observables/Implementations/Catch.swift +++ b/RxSwift/Observables/Implementations/Catch.swift @@ -39,9 +39,9 @@ class CatchSink : Sink, ObserverType { private let _parent: Parent private let _subscription = SerialDisposable() - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -86,10 +86,10 @@ class Catch : Producer { _handler = handler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CatchSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CatchSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -101,8 +101,8 @@ class CatchSequenceSink) { @@ -148,9 +148,9 @@ class CatchSequence(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CatchSequenceSink(observer: observer, cancel: cancel) - setSink(sink) - return sink.run(self.sources.generate()) + override func run(observer: O) -> Disposable { + let sink = CatchSequenceSink(observer: observer) + sink.disposable = sink.run(self.sources.generate()) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/CombineLatest+CollectionType.swift b/RxSwift/Observables/Implementations/CombineLatest+CollectionType.swift index e61c4390..e1e2bb82 100644 --- a/RxSwift/Observables/Implementations/CombineLatest+CollectionType.swift +++ b/RxSwift/Observables/Implementations/CombineLatest+CollectionType.swift @@ -8,48 +8,49 @@ import Foundation -class CombineLatestCollectionTypeSink : Sink { +class CombineLatestCollectionTypeSink + : Sink { typealias Parent = CombineLatestCollectionType typealias SourceElement = C.Generator.Element.E - let parent: Parent - - let lock = NSRecursiveLock() + let _parent: Parent + let _lock = NSRecursiveLock() + // state - var numberOfValues = 0 - var values: [SourceElement?] - var isDone: [Bool] - var numberOfDone = 0 - var subscriptions: [SingleAssignmentDisposable] + var _numberOfValues = 0 + var _values: [SourceElement?] + var _isDone: [Bool] + var _numberOfDone = 0 + var _subscriptions: [SingleAssignmentDisposable] - init(parent: Parent, observer: O, cancel: Disposable) { - self.parent = parent - self.values = [SourceElement?](count: parent.count, repeatedValue: nil) - self.isDone = [Bool](count: parent.count, repeatedValue: false) - self.subscriptions = Array() - self.subscriptions.reserveCapacity(parent.count) + init(parent: Parent, observer: O) { + _parent = parent + _values = [SourceElement?](count: parent._count, repeatedValue: nil) + _isDone = [Bool](count: parent._count, repeatedValue: false) + _subscriptions = Array() + _subscriptions.reserveCapacity(parent._count) - for _ in 0 ..< parent.count { - self.subscriptions.append(SingleAssignmentDisposable()) + for _ in 0 ..< parent._count { + _subscriptions.append(SingleAssignmentDisposable()) } - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event, atIndex: Int) { - lock.performLocked { + _lock.lock(); defer { _lock.unlock() } // { switch event { case .Next(let element): - if values[atIndex] == nil { - numberOfValues++ + if _values[atIndex] == nil { + _numberOfValues++ } - values[atIndex] = element + _values[atIndex] = element - if numberOfValues < parent.count { - let numberOfOthersThatAreDone = self.numberOfDone - (isDone[atIndex] ? 1 : 0) - if numberOfOthersThatAreDone == self.parent.count - 1 { + if _numberOfValues < _parent._count { + let numberOfOthersThatAreDone = self._numberOfDone - (_isDone[atIndex] ? 1 : 0) + if numberOfOthersThatAreDone == self._parent._count - 1 { observer?.on(.Completed) dispose() } @@ -57,7 +58,7 @@ class CombineLatestCollectionTypeSink Disposable { var j = 0 - for i in parent.sources.startIndex ..< parent.sources.endIndex { + for i in _parent._sources.startIndex ..< _parent._sources.endIndex { let index = j - let source = self.parent.sources[i].asObservable() - self.subscriptions[j].disposable = source.subscribe(AnyObserver { event in + let source = _parent._sources[i].asObservable() + _subscriptions[j].disposable = source.subscribe(AnyObserver { event in self.on(event, atIndex: index) }) j++ } - return CompositeDisposable(disposables: self.subscriptions.map { $0 }) + return CompositeDisposable(disposables: _subscriptions.map { $0 }) } } class CombineLatestCollectionType : Producer { typealias ResultSelector = [C.Generator.Element.E] throws -> R - let sources: C - let resultSelector: ResultSelector - let count: Int - + let _sources: C + let _resultSelector: ResultSelector + let _count: Int + init(sources: C, resultSelector: ResultSelector) { - self.sources = sources - self.resultSelector = resultSelector - self.count = Int(self.sources.count.toIntMax()) + _sources = sources + _resultSelector = resultSelector + _count = Int(self._sources.count.toIntMax()) } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestCollectionTypeSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestCollectionTypeSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/CombineLatest+arity.swift b/RxSwift/Observables/Implementations/CombineLatest+arity.swift index da65b503..320e701e 100644 --- a/RxSwift/Observables/Implementations/CombineLatest+arity.swift +++ b/RxSwift/Observables/Implementations/CombineLatest+arity.swift @@ -39,17 +39,17 @@ class CombineLatestSink2_ : CombineLatestSink { var _latestElement1: E1! = nil var _latestElement2: E2! = nil - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 2, observer: observer, cancel: cancel) + super.init(arity: 2, observer: observer) } func run() -> Disposable { let subscription1 = SingleAssignmentDisposable() let subscription2 = SingleAssignmentDisposable() - let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) - let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) + let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) + let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) subscription1.disposable = _parent._source1.subscribe(observer1) subscription2.disposable = _parent._source2.subscribe(observer2) @@ -80,10 +80,10 @@ class CombineLatest2 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink2_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -117,9 +117,9 @@ class CombineLatestSink3_ : CombineLatestSink { var _latestElement2: E2! = nil var _latestElement3: E3! = nil - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 3, observer: observer, cancel: cancel) + super.init(arity: 3, observer: observer) } func run() -> Disposable { @@ -127,9 +127,9 @@ class CombineLatestSink3_ : CombineLatestSink { let subscription2 = SingleAssignmentDisposable() let subscription3 = SingleAssignmentDisposable() - let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) - let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) - let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) + let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) + let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) + let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) subscription1.disposable = _parent._source1.subscribe(observer1) subscription2.disposable = _parent._source2.subscribe(observer2) @@ -164,10 +164,10 @@ class CombineLatest3 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink3_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink3_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -202,9 +202,9 @@ class CombineLatestSink4_ : CombineLatestSink Disposable { @@ -213,10 +213,10 @@ class CombineLatestSink4_ : CombineLatestSink Void in self._latestElement1 = e }, this: subscription1) - let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) - let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) - let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) + let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) + let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) + let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) + let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) subscription1.disposable = _parent._source1.subscribe(observer1) subscription2.disposable = _parent._source2.subscribe(observer2) @@ -255,10 +255,10 @@ class CombineLatest4 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink4_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink4_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -294,9 +294,9 @@ class CombineLatestSink5_ : CombineLatestSi var _latestElement4: E4! = nil var _latestElement5: E5! = nil - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 5, observer: observer, cancel: cancel) + super.init(arity: 5, observer: observer) } func run() -> Disposable { @@ -306,11 +306,11 @@ class CombineLatestSink5_ : CombineLatestSi let subscription4 = SingleAssignmentDisposable() let subscription5 = SingleAssignmentDisposable() - let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) - let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) - let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) - let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) - let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) + let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) + let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) + let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) + let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) + let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) subscription1.disposable = _parent._source1.subscribe(observer1) subscription2.disposable = _parent._source2.subscribe(observer2) @@ -353,10 +353,10 @@ class CombineLatest5 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink5_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink5_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -393,9 +393,9 @@ class CombineLatestSink6_ : CombineLate var _latestElement5: E5! = nil var _latestElement6: E6! = nil - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 6, observer: observer, cancel: cancel) + super.init(arity: 6, observer: observer) } func run() -> Disposable { @@ -406,12 +406,12 @@ class CombineLatestSink6_ : CombineLate let subscription5 = SingleAssignmentDisposable() let subscription6 = SingleAssignmentDisposable() - let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) - let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) - let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) - let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) - let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) - let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6) + let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) + let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) + let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) + let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) + let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) + let observer6 = CombineLatestObserver(lock: _lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6) subscription1.disposable = _parent._source1.subscribe(observer1) subscription2.disposable = _parent._source2.subscribe(observer2) @@ -458,10 +458,10 @@ class CombineLatest6 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink6_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink6_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -499,9 +499,9 @@ class CombineLatestSink7_ : Combine var _latestElement6: E6! = nil var _latestElement7: E7! = nil - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 7, observer: observer, cancel: cancel) + super.init(arity: 7, observer: observer) } func run() -> Disposable { @@ -513,13 +513,13 @@ class CombineLatestSink7_ : Combine let subscription6 = SingleAssignmentDisposable() let subscription7 = SingleAssignmentDisposable() - let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) - let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) - let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) - let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) - let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) - let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6) - let observer7 = CombineLatestObserver(lock: lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7) + let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) + let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) + let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) + let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) + let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) + let observer6 = CombineLatestObserver(lock: _lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6) + let observer7 = CombineLatestObserver(lock: _lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7) subscription1.disposable = _parent._source1.subscribe(observer1) subscription2.disposable = _parent._source2.subscribe(observer2) @@ -570,10 +570,10 @@ class CombineLatest7 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink7_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink7_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -612,9 +612,9 @@ class CombineLatestSink8_ : Com var _latestElement7: E7! = nil var _latestElement8: E8! = nil - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 8, observer: observer, cancel: cancel) + super.init(arity: 8, observer: observer) } func run() -> Disposable { @@ -627,14 +627,14 @@ class CombineLatestSink8_ : Com let subscription7 = SingleAssignmentDisposable() let subscription8 = SingleAssignmentDisposable() - let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) - let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) - let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) - let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) - let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) - let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6) - let observer7 = CombineLatestObserver(lock: lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7) - let observer8 = CombineLatestObserver(lock: lock, parent: self, index: 7, setLatestValue: { (e: E8) -> Void in self._latestElement8 = e }, this: subscription8) + let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1) + let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2) + let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3) + let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4) + let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5) + let observer6 = CombineLatestObserver(lock: _lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6) + let observer7 = CombineLatestObserver(lock: _lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7) + let observer8 = CombineLatestObserver(lock: _lock, parent: self, index: 7, setLatestValue: { (e: E8) -> Void in self._latestElement8 = e }, this: subscription8) subscription1.disposable = _parent._source1.subscribe(observer1) subscription2.disposable = _parent._source2.subscribe(observer2) @@ -689,10 +689,10 @@ class CombineLatest8 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink8_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink8_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } diff --git a/RxSwift/Observables/Implementations/CombineLatest+arity.tt b/RxSwift/Observables/Implementations/CombineLatest+arity.tt index 927f7e0d..89217773 100644 --- a/RxSwift/Observables/Implementations/CombineLatest+arity.tt +++ b/RxSwift/Observables/Implementations/CombineLatest+arity.tt @@ -38,9 +38,9 @@ class CombineLatestSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSep " var _latestElement\($0): E\($0)! = nil" }).joinWithSeparator("\n") %> - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: <%= i %>, observer: observer, cancel: cancel) + super.init(arity: <%= i %>, observer: observer) } func run() -> Disposable { @@ -49,7 +49,7 @@ class CombineLatestSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSep }).joinWithSeparator("\n") %> <%= (Array(1...i).map { -" let observer\($0) = CombineLatestObserver(lock: lock, parent: self, index: \($0 - 1), setLatestValue: { (e: E\($0)) -> Void in self._latestElement\($0) = e }, this: subscription\($0))" +" let observer\($0) = CombineLatestObserver(lock: _lock, parent: self, index: \($0 - 1), setLatestValue: { (e: E\($0)) -> Void in self._latestElement\($0) = e }, this: subscription\($0))" }).joinWithSeparator("\n") %> <%= (Array(1...i).map { @@ -83,10 +83,10 @@ class CombineLatest<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparato _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = CombineLatestSink<%= i %>_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = CombineLatestSink<%= i %>_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } diff --git a/RxSwift/Observables/Implementations/CombineLatest.swift b/RxSwift/Observables/Implementations/CombineLatest.swift index 300cf2db..5807ebdc 100644 --- a/RxSwift/Observables/Implementations/CombineLatest.swift +++ b/RxSwift/Observables/Implementations/CombineLatest.swift @@ -14,10 +14,12 @@ protocol CombineLatestProtocol : class { func done(index: Int) } -class CombineLatestSink : Sink, CombineLatestProtocol { +class CombineLatestSink + : Sink + , CombineLatestProtocol { typealias Element = O.E - let lock = NSRecursiveLock() + let _lock = NSRecursiveLock() private let _arity: Int private var _numberOfValues = 0 @@ -25,12 +27,12 @@ class CombineLatestSink : Sink, CombineLatestProtocol { private var _hasValue: [Bool] private var _isDone: [Bool] - init(arity: Int, observer: O, cancel: Disposable) { + init(arity: Int, observer: O) { _arity = arity _hasValue = [Bool](count: arity, repeatedValue: false) _isDone = [Bool](count: arity, repeatedValue: false) - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func getResult() throws -> Element { diff --git a/RxSwift/Observables/Implementations/Concat.swift b/RxSwift/Observables/Implementations/Concat.swift index 69136938..c9aca4e8 100644 --- a/RxSwift/Observables/Implementations/Concat.swift +++ b/RxSwift/Observables/Implementations/Concat.swift @@ -12,8 +12,8 @@ import Foundation class ConcatSink : TailRecursiveSink { typealias Element = O.E - override init(observer: O, cancel: Disposable) { - super.init(observer: observer, cancel: cancel) + override init(observer: O) { + super.init(observer: observer) } override func on(event: Event){ @@ -47,11 +47,9 @@ class Concat - (observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ConcatSink(observer: observer, cancel: cancel) - setSink(sink) - - return sink.run(_sources.generate()) + override func run(observer: O) -> Disposable { + let sink = ConcatSink(observer: observer) + sink.disposable = sink.run(_sources.generate()) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Debug.swift b/RxSwift/Observables/Implementations/Debug.swift index f527c2f5..090849cd 100644 --- a/RxSwift/Observables/Implementations/Debug.swift +++ b/RxSwift/Observables/Implementations/Debug.swift @@ -14,9 +14,9 @@ class Debug_ : Sink, ObserverType { private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -45,10 +45,10 @@ class Debug : Producer { _source = source } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { print("[\(_identifier)] subscribed") - let sink = Debug_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + let sink = Debug_(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Deferred.swift b/RxSwift/Observables/Implementations/Deferred.swift index affbd3d2..92b71788 100644 --- a/RxSwift/Observables/Implementations/Deferred.swift +++ b/RxSwift/Observables/Implementations/Deferred.swift @@ -14,9 +14,9 @@ class DeferredSink : Sink, ObserverType { private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -54,10 +54,10 @@ class Deferred : Producer { _observableFactory = observableFactory } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = DeferredSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = DeferredSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } func eval() throws -> Observable { diff --git a/RxSwift/Observables/Implementations/DelaySubscription.swift b/RxSwift/Observables/Implementations/DelaySubscription.swift index 0669bc10..147fd032 100644 --- a/RxSwift/Observables/Implementations/DelaySubscription.swift +++ b/RxSwift/Observables/Implementations/DelaySubscription.swift @@ -8,15 +8,17 @@ import Foundation -class DelaySubscriptionSink : Sink, ObserverType { +class DelaySubscriptionSink + : Sink + , ObserverType { typealias Parent = DelaySubscription typealias E = O.E private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -41,11 +43,12 @@ class DelaySubscription: Producer { _scheduler = scheduler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = DelaySubscriptionSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _scheduler.scheduleRelative((), dueTime: _dueTime) { _ in + override func run(observer: O) -> Disposable { + let sink = DelaySubscriptionSink(parent: self, observer: observer) + sink.disposable = _scheduler.scheduleRelative((), dueTime: _dueTime) { _ in return self._source.subscribe(sink) } + + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/DistinctUntilChanged.swift b/RxSwift/Observables/Implementations/DistinctUntilChanged.swift index 56521765..d0ab70cf 100644 --- a/RxSwift/Observables/Implementations/DistinctUntilChanged.swift +++ b/RxSwift/Observables/Implementations/DistinctUntilChanged.swift @@ -14,9 +14,9 @@ class DistinctUntilChangedSink: Sink, ObserverType { private let _parent: DistinctUntilChanged private var _currentKey: Key? = nil - init(parent: DistinctUntilChanged, observer: O, cancel: Disposable) { + init(parent: DistinctUntilChanged, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -64,9 +64,9 @@ class DistinctUntilChanged: Producer { _comparer = comparer } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = DistinctUntilChangedSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = DistinctUntilChangedSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Do.swift b/RxSwift/Observables/Implementations/Do.swift index 1af070c6..5f163cf6 100644 --- a/RxSwift/Observables/Implementations/Do.swift +++ b/RxSwift/Observables/Implementations/Do.swift @@ -14,9 +14,9 @@ class DoSink : Sink, ObserverType { private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -45,11 +45,9 @@ class Do : Producer { _eventHandler = eventHandler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = DoSink(parent: self, observer: observer, cancel: cancel) - - setSink(sink) - - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = DoSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/ElementAt.swift b/RxSwift/Observables/Implementations/ElementAt.swift index 29f635e2..19401b2c 100644 --- a/RxSwift/Observables/Implementations/ElementAt.swift +++ b/RxSwift/Observables/Implementations/ElementAt.swift @@ -15,11 +15,11 @@ class ElementAtSink : Sink< let _parent: Parent var _i: Int - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent _i = parent._index - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -71,9 +71,9 @@ class ElementAt : Producer { self._throwOnEmpty = throwOnEmpty } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ElementAtSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribeSafe(sink) + override func run(observer: O) -> Disposable { + let sink = ElementAtSink(parent: self, observer: observer) + sink.disposable = _source.subscribeSafe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Filter.swift b/RxSwift/Observables/Implementations/Filter.swift index ad47c6fb..8efda19b 100644 --- a/RxSwift/Observables/Implementations/Filter.swift +++ b/RxSwift/Observables/Implementations/Filter.swift @@ -15,9 +15,9 @@ class FilterSink: Sink, ObserverType { private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -51,9 +51,9 @@ class Filter : Producer { _predicate = predicate } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = FilterSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = FilterSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/FlatMap.swift b/RxSwift/Observables/Implementations/FlatMap.swift index 11ebef1f..f9383df3 100644 --- a/RxSwift/Observables/Implementations/FlatMap.swift +++ b/RxSwift/Observables/Implementations/FlatMap.swift @@ -67,9 +67,9 @@ class FlatMapSink S { @@ -128,8 +128,8 @@ class FlatMapSink : FlatMapSink { - override init(parent: Parent, observer: O, cancel: Disposable) { - super.init(parent: parent, observer: observer, cancel: cancel) + override init(parent: Parent, observer: O) { + super.init(parent: parent, observer: observer) } override func performMap(element: SourceType) throws -> S { @@ -140,8 +140,8 @@ class FlatMapSink1 : FlatMapSink { private var _index = 0 - override init(parent: Parent, observer: O, cancel: Disposable) { - super.init(parent: parent, observer: observer, cancel: cancel) + override init(parent: Parent, observer: O) { + super.init(parent: parent, observer: observer) } override func performMap(element: SourceType) throws -> S { @@ -170,16 +170,18 @@ class FlatMap: Producer { _selector1 = nil } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { + let sink: FlatMapSink if let _ = _selector1 { - let sink = FlatMapSink1(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + sink = FlatMapSink1(parent: self, observer: observer) } else { - let sink = FlatMapSink2(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + sink = FlatMapSink2(parent: self, observer: observer) } + + let subscription = sink.run() + sink.disposable = subscription + + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Generate.swift b/RxSwift/Observables/Implementations/Generate.swift index 4e6a51c1..1c2f6ba2 100644 --- a/RxSwift/Observables/Implementations/Generate.swift +++ b/RxSwift/Observables/Implementations/Generate.swift @@ -15,10 +15,10 @@ class GenerateSink : Sink { private var _state: S - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent _state = parent._initialState - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -63,9 +63,9 @@ class Generate : Producer { super.init() } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = GenerateSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = GenerateSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Map.swift b/RxSwift/Observables/Implementations/Map.swift index 5781ed60..e7817cbe 100644 --- a/RxSwift/Observables/Implementations/Map.swift +++ b/RxSwift/Observables/Implementations/Map.swift @@ -15,9 +15,9 @@ class MapSink : Sink, ObserverType { private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func performMap(element: SourceType) throws -> ResultType { @@ -50,8 +50,8 @@ class MapSink : Sink, ObserverType { class MapSink1 : MapSink { typealias ResultType = O.E - override init(parent: Map, observer: O, cancel: Disposable) { - super.init(parent: parent, observer: observer, cancel: cancel) + override init(parent: Map, observer: O) { + super.init(parent: parent, observer: observer) } override func performMap(element: SourceType) throws -> ResultType { @@ -64,8 +64,8 @@ class MapSink2 : MapSink { private var _index = 0 - override init(parent: Map, observer: O, cancel: Disposable) { - super.init(parent: parent, observer: observer, cancel: cancel) + override init(parent: Map, observer: O) { + super.init(parent: parent, observer: observer) } override func performMap(element: SourceType) throws -> ResultType { return try _parent._selector2!(element, try incrementChecked(&_index)) @@ -93,16 +93,16 @@ class Map: Producer { _selector1 = nil } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { if let _ = _selector1 { - let sink = MapSink1(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + let sink = MapSink1(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } else { - let sink = MapSink2(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + let sink = MapSink2(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } diff --git a/RxSwift/Observables/Implementations/Merge.swift b/RxSwift/Observables/Implementations/Merge.swift index 65761898..a0b996ba 100644 --- a/RxSwift/Observables/Implementations/Merge.swift +++ b/RxSwift/Observables/Implementations/Merge.swift @@ -10,46 +10,59 @@ import Foundation // sequential -class MergeSinkIter : ObserverType { +class MergeSinkIter + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = O.E typealias DisposeKey = Bag.KeyType typealias Parent = MergeSink private let _parent: Parent private let _disposeKey: DisposeKey - + + var _lock: NSRecursiveLock { + return _parent._lock + } + init(parent: Parent, disposeKey: DisposeKey) { _parent = parent _disposeKey = disposeKey } func on(event: Event) { - _parent._lock.performLocked { - switch event { - case .Next: - _parent.observer?.on(event) - case .Error: - _parent.observer?.on(event) + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next: + _parent.observer?.on(event) + case .Error: + _parent.observer?.on(event) + _parent.dispose() + case .Completed: + _parent._group.removeDisposable(_disposeKey) + + if _parent._stopped && _parent._group.count == 1 { + _parent.observer?.on(.Completed) _parent.dispose() - case .Completed: - _parent._group.removeDisposable(_disposeKey) - - if _parent._stopped && _parent._group.count == 1 { - _parent.observer?.on(.Completed) - _parent.dispose() - } } } } } -class MergeSink : Sink, ObserverType { +class MergeSink + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = S typealias Parent = Merge private let _parent: Parent - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private var _stopped = false @@ -57,10 +70,10 @@ class MergeSink private let _group = CompositeDisposable() private let _sourceSubscription = SingleAssignmentDisposable() - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -73,8 +86,7 @@ class MergeSink } func on(event: Event) { - switch event { - case .Next(let value): + if case .Next(let value) = event { let innerSubscription = SingleAssignmentDisposable() let maybeKey = _group.addDisposable(innerSubscription) @@ -83,22 +95,29 @@ class MergeSink let disposable = value.asObservable().subscribe(observer) innerSubscription.disposable = disposable } + + return + } + + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next: + rxFatalError("Next should have been handled") case .Error(let error): - _lock.performLocked { - observer?.on(.Error(error)) + observer?.on(.Error(error)) + dispose() + case .Completed: + _stopped = true + + if _group.count == 1 { + observer?.on(.Completed) dispose() } - case .Completed: - _lock.performLocked { - _stopped = true - - if _group.count == 1 { - observer?.on(.Completed) - dispose() - } - else { - _sourceSubscription.dispose() - } + else { + _sourceSubscription.dispose() } } } @@ -106,13 +125,20 @@ class MergeSink // concurrent -class MergeConcurrentSinkIter : ObserverType { +class MergeConcurrentSinkIter + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = O.E typealias DisposeKey = Bag.KeyType typealias Parent = MergeConcurrentSink private let _parent: Parent private let _disposeKey: DisposeKey + + var _lock: NSRecursiveLock { + return _parent._lock + } init(parent: Parent, disposeKey: DisposeKey) { _parent = parent @@ -120,42 +146,48 @@ class MergeConcurrentSinkIter) { - _parent._lock.performLocked { - switch event { - case .Next: - _parent.observer?.on(event) - case .Error: - _parent.observer?.on(event) - _parent.dispose() - case .Completed: - _parent._group.removeDisposable(_disposeKey) - let queue = _parent._queue - if queue.value.count > 0 { - let s = queue.value.dequeue() - _parent.subscribe(s, group: _parent._group) - } - else { - _parent._activeCount = _parent._activeCount - 1 - - if _parent._stopped && _parent._activeCount == 0 { - _parent.observer?.on(.Completed) - _parent.dispose() - } + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next: + _parent.observer?.on(event) + case .Error: + _parent.observer?.on(event) + _parent.dispose() + case .Completed: + _parent._group.removeDisposable(_disposeKey) + let queue = _parent._queue + if queue.value.count > 0 { + let s = queue.value.dequeue() + _parent.subscribe(s, group: _parent._group) + } + else { + _parent._activeCount = _parent._activeCount - 1 + + if _parent._stopped && _parent._activeCount == 0 { + _parent.observer?.on(.Completed) + _parent.dispose() } } } } } -class MergeConcurrentSink : Sink, ObserverType { +class MergeConcurrentSink + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = S typealias Parent = Merge typealias QueueType = Queue private let _parent: Parent - private let _lock = NSRecursiveLock() - + let _lock = NSRecursiveLock() + // state private var _stopped = false private var _activeCount = 0 @@ -164,11 +196,11 @@ class MergeConcurrentSink Disposable { @@ -193,39 +225,38 @@ class MergeConcurrentSink) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { switch event { case .Next(let value): - let subscribe = _lock.calculateLocked { () -> Bool in - if _activeCount < _parent._maxConcurrent { - _activeCount += 1 - return true - } - else { - _queue.value.enqueue(value) - return false - } + let subscribe: Bool + if _activeCount < _parent._maxConcurrent { + _activeCount += 1 + subscribe = true } - + else { + _queue.value.enqueue(value) + subscribe = false + } + if subscribe { self.subscribe(value, group: _group) } case .Error(let error): - _lock.performLocked { - observer?.on(.Error(error)) + observer?.on(.Error(error)) + dispose() + case .Completed: + if _activeCount == 0 { + observer?.on(.Completed) dispose() } - case .Completed: - _lock.performLocked { - if _activeCount == 0 { - observer?.on(.Completed) - dispose() - } - else { - _sourceSubscription.dispose() - } - - _stopped = true + else { + _sourceSubscription.dispose() } + + _stopped = true } } } @@ -239,16 +270,16 @@ class Merge : Producer { _maxConcurrent = maxConcurrent } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { if _maxConcurrent > 0 { - let sink = MergeConcurrentSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + let sink = MergeConcurrentSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } else { - let sink = MergeSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + let sink = MergeSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Multicast.swift b/RxSwift/Observables/Implementations/Multicast.swift index f63d14b5..770d5e8d 100644 --- a/RxSwift/Observables/Implementations/Multicast.swift +++ b/RxSwift/Observables/Implementations/Multicast.swift @@ -15,9 +15,9 @@ class MulticastSink: Sink, ObserverType { private let _parent: MutlicastType - init(parent: MutlicastType, observer: O, cancel: Disposable) { + init(parent: MutlicastType, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -63,9 +63,9 @@ class Multicast: Producer { _selector = selector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = MulticastSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = MulticastSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/ObserveOn.swift b/RxSwift/Observables/Implementations/ObserveOn.swift index ec2d12f1..5eb1298d 100644 --- a/RxSwift/Observables/Implementations/ObserveOn.swift +++ b/RxSwift/Observables/Implementations/ObserveOn.swift @@ -21,10 +21,10 @@ class ObserveOn : Producer { #endif } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ObserveOnSink(scheduler: scheduler, observer: observer, cancel: cancel) - setSink(sink) - return source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = ObserveOnSink(scheduler: scheduler, observer: observer) + sink._subscription.disposable = source.subscribe(sink) + return sink } #if TRACE_RESOURCES @@ -44,31 +44,30 @@ enum ObserveOnState : Int32 { class ObserveOnSink : ObserverBase { typealias E = O.E - var cancel: Disposable - - var lock = SpinLock() - - let scheduler: ImmediateSchedulerType - var observer: O? - - var state = ObserveOnState.Stopped - - var queue = Queue>(capacity: 10) - let scheduleDisposable = SerialDisposable() - - init(scheduler: ImmediateSchedulerType, observer: O, cancel: Disposable) { - self.cancel = cancel - self.scheduler = scheduler - self.observer = observer + let _scheduler: ImmediateSchedulerType + + var _lock = SpinLock() + + // state + var _state = ObserveOnState.Stopped + var _observer: O? + var _queue = Queue>(capacity: 10) + + let _scheduleDisposable = SerialDisposable() + let _subscription = SingleAssignmentDisposable() + + init(scheduler: ImmediateSchedulerType, observer: O) { + _scheduler = scheduler + _observer = observer } override func onCore(event: Event) { - let shouldStart = lock.calculateLocked { () -> Bool in - self.queue.enqueue(event) + let shouldStart = _lock.calculateLocked { () -> Bool in + self._queue.enqueue(event) - switch self.state { + switch self._state { case .Stopped: - self.state = .Running + self._state = .Running return true case .Running: return false @@ -76,18 +75,18 @@ class ObserveOnSink : ObserverBase { } if shouldStart { - scheduleDisposable.disposable = self.scheduler.scheduleRecursive((), action: self.run) + _scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run) } } func run(state: Void, recurse: Void -> Void) { - let (nextEvent, observer) = self.lock.calculateLocked { () -> (Event?, O?) in - if self.queue.count > 0 { - return (self.queue.dequeue(), self.observer) + let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event?, O?) in + if self._queue.count > 0 { + return (self._queue.dequeue(), self._observer) } else { - self.state = .Stopped - return (nil, self.observer) + self._state = .Stopped + return (nil, self._observer) } } @@ -101,32 +100,33 @@ class ObserveOnSink : ObserverBase { return } - let shouldContinue = self.lock.calculateLocked { () -> Bool in - if self.queue.count > 0 { - return true - } - else { - self.state = .Stopped - return false - } - } + let shouldContinue = _shouldContinue_synchronized() if shouldContinue { recurse() } } + + func _shouldContinue_synchronized() -> Bool { + _lock.lock(); defer { _lock.unlock() } // { + if self._queue.count > 0 { + return true + } + else { + self._state = .Stopped + return false + } + // } + } override func dispose() { super.dispose() - - let toDispose = lock.calculateLocked { () -> Disposable in - let originalCancel = self.cancel - self.cancel = NopDisposable.instance - self.scheduleDisposable.dispose() - self.observer = nil - return originalCancel - } - - toDispose.dispose() + + _subscription.dispose() + _scheduleDisposable.dispose() + + _lock.lock(); defer { _lock.unlock() } // { + _observer = nil + // } } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/ObserveOnSerialDispatchQueue.swift b/RxSwift/Observables/Implementations/ObserveOnSerialDispatchQueue.swift index 3fa66efd..7d3f5675 100644 --- a/RxSwift/Observables/Implementations/ObserveOnSerialDispatchQueue.swift +++ b/RxSwift/Observables/Implementations/ObserveOnSerialDispatchQueue.swift @@ -22,12 +22,9 @@ class ObserveOnSerialDispatchQueueSink : ObserverBase { let scheduler: SerialDispatchQueueScheduler let observer: O - var disposeLock = SpinLock() + let subscription = SingleAssignmentDisposable() - var cancel: Disposable - - init(scheduler: SerialDispatchQueueScheduler, observer: O, cancel: Disposable) { - self.cancel = cancel + init(scheduler: SerialDispatchQueueScheduler, observer: O) { self.scheduler = scheduler self.observer = observer super.init() @@ -47,14 +44,8 @@ class ObserveOnSerialDispatchQueueSink : ObserverBase { override func dispose() { super.dispose() - - let toDispose = disposeLock.calculateLocked { () -> Disposable in - let originalCancel = self.cancel - self.cancel = NopDisposable.instance - return originalCancel - } - - toDispose.dispose() + + subscription.dispose() } } @@ -72,10 +63,10 @@ class ObserveOnSerialDispatchQueue : Producer { #endif } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer, cancel: cancel) - setSink(sink) - return source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer) + sink.subscription.disposable = source.subscribe(sink) + return sink } #if TRACE_RESOURCES diff --git a/RxSwift/Observables/Implementations/Producer.swift b/RxSwift/Observables/Implementations/Producer.swift index 63c38c76..8b8098a8 100644 --- a/RxSwift/Observables/Implementations/Producer.swift +++ b/RxSwift/Observables/Implementations/Producer.swift @@ -14,30 +14,17 @@ class Producer : Observable { } override func subscribe(observer: O) -> Disposable { - let sink = SingleAssignmentDisposable() - let subscription = SingleAssignmentDisposable() - - let d = BinaryDisposable(sink, subscription) - - let setSink: (Disposable) -> Void = { d in sink.disposable = d } - if !CurrentThreadScheduler.isScheduleRequired { - let disposable = run(observer, cancel: subscription, setSink: setSink) - - subscription.disposable = disposable + return run(observer) } else { - CurrentThreadScheduler.instance.schedule(sink) { sink in - let disposable = self.run(observer, cancel: subscription, setSink: setSink) - subscription.disposable = disposable - return NopDisposable.instance + return CurrentThreadScheduler.instance.schedule(()) { _ in + return self.run(observer) } } - - return d } - func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + func run(observer: O) -> Disposable { abstractMethod() } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Range.swift b/RxSwift/Observables/Implementations/Range.swift index ded21856..8093416a 100644 --- a/RxSwift/Observables/Implementations/Range.swift +++ b/RxSwift/Observables/Implementations/Range.swift @@ -27,10 +27,10 @@ class RangeProducer<_CompilerWorkaround> : Producer { _scheduler = scheduler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = RangeSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = RangeSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -39,9 +39,9 @@ class RangeSink<_CompilerWorkaround, O: ObserverType where O.E == Int> : Sink private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { diff --git a/RxSwift/Observables/Implementations/Reduce.swift b/RxSwift/Observables/Implementations/Reduce.swift index e8653d85..3276b326 100644 --- a/RxSwift/Observables/Implementations/Reduce.swift +++ b/RxSwift/Observables/Implementations/Reduce.swift @@ -15,11 +15,11 @@ class ReduceSink : Sink, Observe private let _parent: Parent private var _accumulation: AccumulateType - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent _accumulation = parent._seed - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -66,9 +66,9 @@ class Reduce : Producer { _mapResult = mapResult } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ReduceSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = ReduceSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/RefCount.swift b/RxSwift/Observables/Implementations/RefCount.swift index eacd4ac0..4216aefa 100644 --- a/RxSwift/Observables/Implementations/RefCount.swift +++ b/RxSwift/Observables/Implementations/RefCount.swift @@ -8,21 +8,23 @@ import Foundation -class RefCountSink : Sink, ObserverType { +class RefCountSink + : Sink + , ObserverType { typealias Element = O.E typealias Parent = RefCount private let _parent: Parent - - init(parent: Parent, observer: O, cancel: Disposable) { + + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { let subscription = _parent._source.subscribeSafe(self) - _parent._lock.performLocked { + _parent._lock.lock(); defer { _parent._lock.unlock() } // { if _parent._count == 0 { _parent._count = 1 _parent._connectableSubscription = _parent._source.connect() @@ -30,11 +32,11 @@ class RefCountSink: Producer { _source = source } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = RefCountSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = RefCountSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Repeat.swift b/RxSwift/Observables/Implementations/Repeat.swift index 33621d68..8655d929 100644 --- a/RxSwift/Observables/Implementations/Repeat.swift +++ b/RxSwift/Observables/Implementations/Repeat.swift @@ -17,10 +17,11 @@ class RepeatElement : Producer { _scheduler = scheduler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = RepeatElementSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = RepeatElementSink(parent: self, observer: observer) + sink.disposable = sink.run() + + return sink } } @@ -29,9 +30,9 @@ class RepeatElementSink : Sink { private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { diff --git a/RxSwift/Observables/Implementations/Sample.swift b/RxSwift/Observables/Implementations/Sample.swift index 459a3484..703b86e1 100644 --- a/RxSwift/Observables/Implementations/Sample.swift +++ b/RxSwift/Observables/Implementations/Sample.swift @@ -8,57 +8,70 @@ import Foundation -class SamplerSink : ObserverType { +class SamplerSink + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = SampleType typealias Parent = SampleSequenceSink private let _parent: Parent + + var _lock: NSRecursiveLock { + return _parent._lock + } init(parent: Parent) { _parent = parent } func on(event: Event) { - _parent._lock.performLocked { - switch event { - case .Next: - if let element = _parent._element { - if _parent._parent._onlyNew { - _parent._element = nil - } - - _parent.observer?.on(.Next(element)) - } + synchronizedOn(event) + } - if _parent._atEnd { - _parent.observer?.on(.Completed) - _parent.dispose() - } - case .Error(let e): - _parent.observer?.on(.Error(e)) - _parent.dispose() - case .Completed: - if let element = _parent._element { + func _synchronized_on(event: Event) { + switch event { + case .Next: + if let element = _parent._element { + if _parent._parent._onlyNew { _parent._element = nil - _parent.observer?.on(.Next(element)) - } - if _parent._atEnd { - _parent.observer?.on(.Completed) - _parent.dispose() } + + _parent.observer?.on(.Next(element)) + } + + if _parent._atEnd { + _parent.observer?.on(.Completed) + _parent.dispose() + } + case .Error(let e): + _parent.observer?.on(.Error(e)) + _parent.dispose() + case .Completed: + if let element = _parent._element { + _parent._element = nil + _parent.observer?.on(.Next(element)) + } + if _parent._atEnd { + _parent.observer?.on(.Completed) + _parent.dispose() } } } } -class SampleSequenceSink : Sink, ObserverType { +class SampleSequenceSink + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { typealias Element = O.E typealias Parent = Sample private let _parent: Parent - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private var _element = nil as Element? @@ -66,30 +79,32 @@ class SampleSequenceSink : Sink, ObserverType { private let _sourceSubscription = SingleAssignmentDisposable() - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { _sourceSubscription.disposable = _parent._source.subscribe(self) let samplerSubscription = _parent._sampler.subscribe(SamplerSink(parent: self)) - return CompositeDisposable(_sourceSubscription, samplerSubscription) + return StableCompositeDisposable.create(_sourceSubscription, samplerSubscription) } func on(event: Event) { - _lock.performLocked { - switch event { - case .Next(let element): - _element = element - case .Error: - observer?.on(event) - dispose() - case .Completed: - _atEnd = true - _sourceSubscription.dispose() - } + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next(let element): + _element = element + case .Error: + observer?.on(event) + dispose() + case .Completed: + _atEnd = true + _sourceSubscription.dispose() } } @@ -106,9 +121,9 @@ class Sample : Producer { _onlyNew = onlyNew } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = SampleSequenceSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Scan.swift b/RxSwift/Observables/Implementations/Scan.swift index 58a7d720..9f0f5feb 100644 --- a/RxSwift/Observables/Implementations/Scan.swift +++ b/RxSwift/Observables/Implementations/Scan.swift @@ -15,10 +15,10 @@ class ScanSink private let _parent: Parent private var _accumulate: Accumulate - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent _accumulate = parent._seed - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -56,9 +56,9 @@ class Scan: Producer { _accumulator = accumulator } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ScanSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = ScanSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Sink.swift b/RxSwift/Observables/Implementations/Sink.swift index 62851338..e6ce7278 100644 --- a/RxSwift/Observables/Implementations/Sink.swift +++ b/RxSwift/Observables/Implementations/Sink.swift @@ -8,14 +8,12 @@ import Foundation -class Sink : Disposable { +class Sink : SingleAssignmentDisposable { private var _lock = SpinLock() // state private var _observer: O? - private var _cancel: Disposable - private var _disposed: Bool = false - + var observer: O? { get { _lock.lock(); defer { _lock.unlock() } @@ -23,32 +21,24 @@ class Sink : Disposable { } } - init(observer: O, cancel: Disposable) { + init(observer: O) { #if TRACE_RESOURCES OSAtomicIncrement32(&resourceCount) #endif _observer = observer - _cancel = cancel } - private func _disposeInternal() -> Disposable? { + private func _disposeObserver() { _lock.lock(); defer { _lock.unlock() } - if _disposed { - return nil - } - - let cancel = _cancel - - _disposed = true _observer = nil - _cancel = NopDisposable.instance - - return cancel } - func dispose() { - _disposeInternal()?.dispose() + override func dispose() { + if !disposed { + _disposeObserver() + } + super.dispose() } deinit { diff --git a/RxSwift/Observables/Implementations/Skip.swift b/RxSwift/Observables/Implementations/Skip.swift index 65df2212..8d4ddc53 100644 --- a/RxSwift/Observables/Implementations/Skip.swift +++ b/RxSwift/Observables/Implementations/Skip.swift @@ -18,10 +18,10 @@ class SkipCountSink : Sin var remaining: Int - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { self.parent = parent self.remaining = parent.count - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -54,10 +54,11 @@ class SkipCount: Producer { self.count = count } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = SkipCountSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = SkipCountSink(parent: self, observer: observer) + sink.disposable = source.subscribe(sink) + + return sink } } @@ -72,9 +73,9 @@ class SkipTimeSink) { @@ -121,9 +122,9 @@ class SkipTime: Producer { self.duration = duration } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = SkipTimeSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = SkipTimeSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/SkipUntil.swift b/RxSwift/Observables/Implementations/SkipUntil.swift index 05882d29..66448416 100644 --- a/RxSwift/Observables/Implementations/SkipUntil.swift +++ b/RxSwift/Observables/Implementations/SkipUntil.swift @@ -8,20 +8,27 @@ import Foundation -class SkipUntilSinkOther : ObserverType { +class SkipUntilSinkOther + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias Parent = SkipUntilSink typealias E = Other private let _parent: Parent + + var _lock: NSRecursiveLock { + return _parent._lock + } - private let _singleAssignmentDisposable = SingleAssignmentDisposable() - - var disposable: Disposable { + private let _subscription = SingleAssignmentDisposable() + + var subscription: Disposable { get { abstractMethod() } set { - _singleAssignmentDisposable.disposable = newValue + _subscription.disposable = newValue } } @@ -33,19 +40,19 @@ class SkipUntilSinkOther) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { switch event { case .Next: - _parent._lock.performLocked { - _parent._forwardElements = true - _singleAssignmentDisposable.dispose() - } + _parent._forwardElements = true + _subscription.dispose() case .Error(let e): - _parent._lock.performLocked { - _parent.observer?.onError(e) - _parent.dispose() - } + _parent.observer?.onError(e) + _parent.dispose() case .Completed: - _singleAssignmentDisposable.dispose() + _subscription.dispose() } } @@ -58,46 +65,43 @@ class SkipUntilSinkOther : Sink, ObserverType { +class SkipUntilSink + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = ElementType typealias Parent = SkipUntil - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() private let _parent: Parent private var _forwardElements = false - private let _singleAssignmentDisposable = SingleAssignmentDisposable() - - var disposable: Disposable { - get { - abstractMethod() - } - set { - _singleAssignmentDisposable.disposable = newValue - } - } - - init(parent: Parent, observer: O, cancel: Disposable) { + private let _sourceSubscription = SingleAssignmentDisposable() + + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { - _lock.performLocked { - switch event { - case .Next: - if _forwardElements { - observer?.on(event) - } - case .Error: + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next: + if _forwardElements { observer?.on(event) - dispose() - case .Completed: - if _forwardElements { - observer?.on(event) - } - _singleAssignmentDisposable.dispose() } + case .Error: + observer?.on(event) + dispose() + case .Completed: + if _forwardElements { + observer?.on(event) + } + _sourceSubscription.dispose() } } @@ -105,10 +109,10 @@ class SkipUntilSink: Producer { _other = other } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = SkipUntilSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = SkipUntilSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } diff --git a/RxSwift/Observables/Implementations/SkipWhile.swift b/RxSwift/Observables/Implementations/SkipWhile.swift index 8261f0eb..65af9476 100644 --- a/RxSwift/Observables/Implementations/SkipWhile.swift +++ b/RxSwift/Observables/Implementations/SkipWhile.swift @@ -14,9 +14,9 @@ class SkipWhileSink : Sin private let _parent: Parent private var _running = false - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -51,9 +51,9 @@ class SkipWhileSinkWithIndex) { @@ -100,16 +100,16 @@ class SkipWhile: Producer { _predicateWithIndex = predicate } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { if let _ = _predicate { - let sink = SkipWhileSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + let sink = SkipWhileSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } else { - let sink = SkipWhileSinkWithIndex(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + let sink = SkipWhileSinkWithIndex(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } } diff --git a/RxSwift/Observables/Implementations/StartWith.swift b/RxSwift/Observables/Implementations/StartWith.swift index 450c1189..f2848409 100644 --- a/RxSwift/Observables/Implementations/StartWith.swift +++ b/RxSwift/Observables/Implementations/StartWith.swift @@ -18,7 +18,7 @@ class StartWith: Producer { super.init() } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { for e in elements { observer.on(.Next(e)) } diff --git a/RxSwift/Observables/Implementations/SubscribeOn.swift b/RxSwift/Observables/Implementations/SubscribeOn.swift index e1419d40..a919af00 100644 --- a/RxSwift/Observables/Implementations/SubscribeOn.swift +++ b/RxSwift/Observables/Implementations/SubscribeOn.swift @@ -14,9 +14,9 @@ class SubscribeOnSink : S let parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { self.parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -52,9 +52,9 @@ class SubscribeOn : Producer { self.scheduler = scheduler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = SubscribeOnSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Switch.swift b/RxSwift/Observables/Implementations/Switch.swift index 1f48d33c..db5b7662 100644 --- a/RxSwift/Observables/Implementations/Switch.swift +++ b/RxSwift/Observables/Implementations/Switch.swift @@ -8,7 +8,11 @@ import Foundation -class SwitchSink : Sink, ObserverType { +class SwitchSink + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = S typealias Parent = Switch @@ -16,34 +20,36 @@ class SwitchSink private let _innerSubscription: SerialDisposable = SerialDisposable() private let _parent: Parent - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private var _stopped = false private var _latest = 0 private var _hasLatest = false - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { let subscription = _parent._sources.subscribe(self) _subscriptions.disposable = subscription - return CompositeDisposable(_subscriptions, _innerSubscription) + return StableCompositeDisposable.create(_subscriptions, _innerSubscription) } func on(event: Event) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { switch event { case .Next(let observable): - let latest: Int = _lock.calculateLocked { - _hasLatest = true - _latest = _latest &+ 1 - return _latest - } - + _hasLatest = true + _latest = _latest &+ 1 + let latest = _latest + let d = SingleAssignmentDisposable() _innerSubscription.disposable = d @@ -51,33 +57,36 @@ class SwitchSink let disposable = observable.asObservable().subscribe(observer) d.disposable = disposable case .Error(let error): - _lock.performLocked { - observer?.on(.Error(error)) - dispose() - } + observer?.on(.Error(error)) + dispose() case .Completed: - _lock.performLocked { - _stopped = true - - _subscriptions.dispose() - - if !_hasLatest { - observer?.on(.Completed) - dispose() - } + _stopped = true + + _subscriptions.dispose() + + if !_hasLatest { + observer?.on(.Completed) + dispose() } } } } -class SwitchSinkIter : ObserverType { +class SwitchSinkIter + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = O.E typealias Parent = SwitchSink private let _parent: Parent private let _id: Int private let _self: Disposable - + + var _lock: NSRecursiveLock { + return _parent._lock + } + init(parent: Parent, id: Int, _self: Disposable) { _parent = parent _id = id @@ -85,32 +94,33 @@ class SwitchSinkIter) { - return _parent._lock.calculateLocked { - - switch event { - case .Next: break - case .Error, .Completed: - _self.dispose() - } - - if _parent._latest != _id { - return - } - - let observer = _parent.observer - - switch event { - case .Next: - observer?.on(event) - case .Error: + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next: break + case .Error, .Completed: + _self.dispose() + } + + if _parent._latest != _id { + return + } + + let observer = _parent.observer + + switch event { + case .Next: + observer?.on(event) + case .Error: + observer?.on(event) + _parent.dispose() + case .Completed: + _parent._hasLatest = false + if _parent._stopped { observer?.on(event) _parent.dispose() - case .Completed: - _parent._hasLatest = false - if _parent._stopped { - observer?.on(event) - _parent.dispose() - } } } } @@ -123,9 +133,9 @@ class Switch : Producer { _sources = sources } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = SwitchSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = SwitchSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Take.swift b/RxSwift/Observables/Implementations/Take.swift index 9208e618..24ef5231 100644 --- a/RxSwift/Observables/Implementations/Take.swift +++ b/RxSwift/Observables/Implementations/Take.swift @@ -18,10 +18,10 @@ class TakeCountSink : Sin private var _remaining: Int - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent _remaining = parent._count - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -61,48 +61,54 @@ class TakeCount: Producer { _count = count } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = TakeCountSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = TakeCountSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } // time version -class TakeTimeSink : Sink, ObserverType { +class TakeTimeSink + : Sink + , LockOwnerType + , ObserverType + , SynchronizedOnType { typealias Parent = TakeTime typealias E = ElementType private let _parent: Parent - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { - _lock.performLocked { - switch event { - case .Next(let value): - observer?.on(.Next(value)) - case .Error: - observer?.on(event) - dispose() - case .Completed: - observer?.on(event) - dispose() - } + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next(let value): + observer?.on(.Next(value)) + case .Error: + observer?.on(event) + dispose() + case .Completed: + observer?.on(event) + dispose() } } func tick() { - _lock.performLocked { - observer?.on(.Completed) - dispose() - } + _lock.lock(); defer { _lock.unlock() } + + observer?.on(.Completed) + dispose() } func run() -> Disposable { @@ -130,9 +136,9 @@ class TakeTime: Producer { _duration = duration } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = TakeTimeSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = TakeTimeSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/TakeLast.swift b/RxSwift/Observables/Implementations/TakeLast.swift index 34edb8a2..1a14e2e6 100644 --- a/RxSwift/Observables/Implementations/TakeLast.swift +++ b/RxSwift/Observables/Implementations/TakeLast.swift @@ -17,10 +17,10 @@ class TakeLastSink : Sink private var _elements: Queue - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent _elements = Queue(capacity: parent._count + 1) - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -55,9 +55,9 @@ class TakeLast: Producer { _count = count } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = TakeLastSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = TakeLastSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/TakeUntil.swift b/RxSwift/Observables/Implementations/TakeUntil.swift index b15020b9..21f492d1 100644 --- a/RxSwift/Observables/Implementations/TakeUntil.swift +++ b/RxSwift/Observables/Implementations/TakeUntil.swift @@ -8,11 +8,18 @@ import Foundation -class TakeUntilSinkOther : ObserverType { +class TakeUntilSinkOther + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias Parent = TakeUntilSink typealias E = Other private let _parent: Parent + + var _lock: NSRecursiveLock { + return _parent._lock + } private let _singleAssignmentDisposable = SingleAssignmentDisposable() @@ -33,18 +40,20 @@ class TakeUntilSinkOther) { - _parent._lock.performLocked { - switch event { - case .Next: - _parent.observer?.on(.Completed) - _parent.dispose() - case .Error(let e): - _parent.observer?.on(.Error(e)) - _parent.dispose() - case .Completed: - _parent._open = true - _singleAssignmentDisposable.dispose() - } + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next: + _parent.observer?.on(.Completed) + _parent.dispose() + case .Error(let e): + _parent.observer?.on(.Error(e)) + _parent.dispose() + case .Completed: + _parent._open = true + _singleAssignmentDisposable.dispose() } } @@ -55,43 +64,40 @@ class TakeUntilSinkOther : Sink, ObserverType { +class TakeUntilSink + : Sink + , LockOwnerType + , ObserverType + , SynchronizedOnType { typealias E = ElementType typealias Parent = TakeUntil private let _parent: Parent - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private var _open = false - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { switch event { case .Next: - if _open { - observer?.on(event) - } - else { - _lock.performLocked { - observer?.on(event) - } - } + observer?.on(event) case .Error: - _lock.performLocked { - observer?.on(event) - dispose() - } + observer?.on(event) + dispose() case .Completed: - _lock.performLocked { - observer?.on(event) - dispose() - } + observer?.on(event) + dispose() } } @@ -101,7 +107,7 @@ class TakeUntilSink: Producer { _other = other } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = TakeUntilSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = TakeUntilSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/TakeWhile.swift b/RxSwift/Observables/Implementations/TakeWhile.swift index 22c5f99a..80dadf94 100644 --- a/RxSwift/Observables/Implementations/TakeWhile.swift +++ b/RxSwift/Observables/Implementations/TakeWhile.swift @@ -8,7 +8,9 @@ import Foundation -class TakeWhileSink : Sink, ObserverType { +class TakeWhileSink + : Sink + , ObserverType { typealias Parent = TakeWhile typealias Element = ElementType @@ -16,9 +18,9 @@ class TakeWhileSink : Sin private var _running = true - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -50,7 +52,9 @@ class TakeWhileSink : Sin } -class TakeWhileSinkWithIndex : Sink, ObserverType { +class TakeWhileSinkWithIndex + : Sink + , ObserverType { typealias Parent = TakeWhile typealias Element = ElementType @@ -59,9 +63,9 @@ class TakeWhileSinkWithIndex) { @@ -114,15 +118,15 @@ class TakeWhile: Producer { _predicateWithIndex = predicate } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { if let _ = _predicate { - let sink = TakeWhileSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + let sink = TakeWhileSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } else { - let sink = TakeWhileSinkWithIndex(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + let sink = TakeWhileSinkWithIndex(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Throttle.swift b/RxSwift/Observables/Implementations/Throttle.swift index 5ca0642f..e89cf908 100644 --- a/RxSwift/Observables/Implementations/Throttle.swift +++ b/RxSwift/Observables/Implementations/Throttle.swift @@ -8,13 +8,17 @@ import Foundation -class ThrottleSink : Sink, ObserverType { +class ThrottleSink + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { typealias Element = O.E typealias ParentType = Throttle private let _parent: ParentType - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private var _id = 0 as UInt64 @@ -22,81 +26,60 @@ class ThrottleSink : Sink, Observe let cancellable = SerialDisposable() - init(parent: ParentType, observer: O, cancel: Disposable) { + init(parent: ParentType, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { let subscription = _parent._source.subscribe(self) - return CompositeDisposable(subscription, cancellable) + return StableCompositeDisposable.create(subscription, cancellable) } func on(event: Event) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { switch event { - case .Next: - break - case .Error, .Completed: - cancellable.dispose() - } - - let latestId = _lock.calculateLocked { () -> UInt64 in - let observer = self.observer - - let oldValue = _value - + case .Next(let element): _id = _id &+ 1 - - switch event { - case .Next(let element): - _value = element - case .Error: - _value = nil - observer?.on(event) - dispose() - case .Completed: - _value = nil - if let value = oldValue { - observer?.on(.Next(value)) - } - observer?.on(.Completed) - dispose() - } - - return _id - } - - - switch event { - case .Next: - let d = SingleAssignmentDisposable() - self.cancellable.disposable = d + let currentId = _id + _value = element + let scheduler = _parent._scheduler let dueTime = _parent._dueTime - - let disposeTimer = scheduler.scheduleRelative(latestId, dueTime: dueTime) { (id) in - self.propagate() - return NopDisposable.instance + + let d = SingleAssignmentDisposable() + self.cancellable.disposable = d + d.disposable = scheduler.scheduleRelative(currentId, dueTime: dueTime, action: self.propagate) + case .Error: + _value = nil + observer?.on(event) + dispose() + case .Completed: + if let value = _value { + _value = nil + observer?.on(.Next(value)) } - - d.disposable = disposeTimer - default: break + observer?.on(.Completed) + dispose() } } - func propagate() { - let originalValue: Element? = _lock.calculateLocked { + func propagate(currentId: UInt64) -> Disposable { + _lock.lock(); defer { _lock.unlock() } // { let originalValue = _value - _value = nil - return originalValue - } - - if let value = originalValue { - observer?.on(.Next(value)) - } + + if let value = originalValue where _id == currentId { + _value = nil + observer?.on(.Next(value)) + } + // } + return NopDisposable.instance } } @@ -112,10 +95,10 @@ class Throttle : Producer { _scheduler = scheduler } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ThrottleSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ThrottleSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Timer.swift b/RxSwift/Observables/Implementations/Timer.swift index 35f2839f..f6b7d724 100644 --- a/RxSwift/Observables/Implementations/Timer.swift +++ b/RxSwift/Observables/Implementations/Timer.swift @@ -13,9 +13,9 @@ class TimerSink : Sink private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -31,9 +31,9 @@ class TimerOneOffSink : Si private let _parent: Parent - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -59,16 +59,16 @@ class Timer: Producer { _period = period } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + override func run(observer: O) -> Disposable { if let _ = _period { - let sink = TimerSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + let sink = TimerSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } else { - let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + let sink = TimerOneOffSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/ToArray.swift b/RxSwift/Observables/Implementations/ToArray.swift index c52cf710..1b514d26 100644 --- a/RxSwift/Observables/Implementations/ToArray.swift +++ b/RxSwift/Observables/Implementations/ToArray.swift @@ -14,10 +14,10 @@ class ToArraySink : Sink< let _parent: Parent var _list = Array() - init(parent: Parent, observer: O, cancel: Disposable) { - self._parent = parent + init(parent: Parent, observer: O) { + _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func on(event: Event) { @@ -42,9 +42,9 @@ class ToArray : Producer<[SourceType]> { _source = source } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ToArraySink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return _source.subscribe(sink) + override func run(observer: O) -> Disposable { + let sink = ToArraySink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Using.swift b/RxSwift/Observables/Implementations/Using.swift index 6c852634..ed4a891a 100644 --- a/RxSwift/Observables/Implementations/Using.swift +++ b/RxSwift/Observables/Implementations/Using.swift @@ -15,9 +15,9 @@ class UsingSink Disposable { @@ -70,9 +70,9 @@ class Using: Producer { _observableFactory = observableFactory } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = UsingSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = UsingSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/WithLatestFrom.swift b/RxSwift/Observables/Implementations/WithLatestFrom.swift index b2cf3233..685c56d7 100644 --- a/RxSwift/Observables/Implementations/WithLatestFrom.swift +++ b/RxSwift/Observables/Implementations/WithLatestFrom.swift @@ -8,21 +8,24 @@ import Foundation -class WithLatestFromSink : Sink, ObserverType { - +class WithLatestFromSink + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { + typealias Parent = WithLatestFrom typealias E = FirstType private let _parent: Parent - private var _lock = NSRecursiveLock() + var _lock = NSRecursiveLock() private var _latest: SecondType? - - - init(parent: Parent, observer: O, cancel: Disposable) { + + init(parent: Parent, observer: O) { _parent = parent - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func run() -> Disposable { @@ -34,60 +37,70 @@ class WithLatestFromSink) { - _lock.performLocked { - switch event { - case let .Next(value): - guard let latest = _latest else { return } - do { - let res = try _parent._resultSelector(value, latest) - - observer?.onNext(res) - } catch let e { - observer?.onError(e) - dispose() - } - case .Completed: - observer?.onComplete() - dispose() - case let .Error(error): - observer?.onError(error) + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case let .Next(value): + guard let latest = _latest else { return } + do { + let res = try _parent._resultSelector(value, latest) + + observer?.onNext(res) + } catch let e { + observer?.onError(e) dispose() } + case .Completed: + observer?.onComplete() + dispose() + case let .Error(error): + observer?.onError(error) + dispose() } } } -class WithLatestFromSecond: ObserverType { +class WithLatestFromSecond + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias Parent = WithLatestFromSink typealias E = SecondType private let _parent: Parent private let _disposable: Disposable - + + var _lock: NSRecursiveLock { + get { + return _parent._lock + } + } + init(parent: Parent, disposable: Disposable) { _parent = parent _disposable = disposable } func on(event: Event) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { switch event { case let .Next(value): - _parent._lock.performLocked { - _parent._latest = value - } + _parent._latest = value case .Completed: _disposable.dispose() case let .Error(error): - _parent._lock.performLocked { - _parent.observer?.onError(error) - _parent.dispose() - } + _parent.observer?.onError(error) + _parent.dispose() } } - } class WithLatestFrom: Producer { @@ -103,10 +116,9 @@ class WithLatestFrom: Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - - let sink = WithLatestFromSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = WithLatestFromSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Zip+CollectionType.swift b/RxSwift/Observables/Implementations/Zip+CollectionType.swift index a2339599..f2b188a8 100644 --- a/RxSwift/Observables/Implementations/Zip+CollectionType.swift +++ b/RxSwift/Observables/Implementations/Zip+CollectionType.swift @@ -8,7 +8,8 @@ import Foundation -class ZipCollectionTypeSink : Sink { +class ZipCollectionTypeSink + : Sink { typealias Parent = ZipCollectionType typealias SourceElement = C.Generator.Element.E @@ -23,7 +24,7 @@ class ZipCollectionTypeSink](count: parent.count, repeatedValue: Queue(capacity: 4)) _isDone = [Bool](count: parent.count, repeatedValue: false) @@ -34,11 +35,11 @@ class ZipCollectionTypeSink, atIndex: Int) { - _lock.performLocked { + _lock.lock(); defer { _lock.unlock() } // { switch event { case .Next(let element): _values[atIndex].enqueue(element) @@ -97,7 +98,7 @@ class ZipCollectionTypeSink Disposable { @@ -128,9 +129,9 @@ class ZipCollectionType(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipCollectionTypeSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipCollectionTypeSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } diff --git a/RxSwift/Observables/Implementations/Zip+arity.swift b/RxSwift/Observables/Implementations/Zip+arity.swift index 196cd76c..7d70640a 100644 --- a/RxSwift/Observables/Implementations/Zip+arity.swift +++ b/RxSwift/Observables/Implementations/Zip+arity.swift @@ -39,9 +39,9 @@ class ZipSink2_ : ZipSink { var _values1: Queue = Queue(capacity: 2) var _values2: Queue = Queue(capacity: 2) - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 2, observer: observer, cancel: cancel) + super.init(arity: 2, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -60,8 +60,8 @@ class ZipSink2_ : ZipSink { let subscription1 = SingleAssignmentDisposable() let subscription2 = SingleAssignmentDisposable() - let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) - let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) + let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) + let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) subscription1.disposable = _parent.source1.subscribe(observer1) subscription2.disposable = _parent.source2.subscribe(observer2) @@ -92,10 +92,10 @@ class Zip2 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink2_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink2_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -129,9 +129,9 @@ class ZipSink3_ : ZipSink { var _values2: Queue = Queue(capacity: 2) var _values3: Queue = Queue(capacity: 2) - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 3, observer: observer, cancel: cancel) + super.init(arity: 3, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -152,9 +152,9 @@ class ZipSink3_ : ZipSink { let subscription2 = SingleAssignmentDisposable() let subscription3 = SingleAssignmentDisposable() - let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) - let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) - let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) + let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) + let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) + let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) subscription1.disposable = _parent.source1.subscribe(observer1) subscription2.disposable = _parent.source2.subscribe(observer2) @@ -189,10 +189,10 @@ class Zip3 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink3_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink3_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -227,9 +227,9 @@ class ZipSink4_ : ZipSink { var _values3: Queue = Queue(capacity: 2) var _values4: Queue = Queue(capacity: 2) - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 4, observer: observer, cancel: cancel) + super.init(arity: 4, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -252,10 +252,10 @@ class ZipSink4_ : ZipSink { let subscription3 = SingleAssignmentDisposable() let subscription4 = SingleAssignmentDisposable() - let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) - let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) - let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) - let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) + let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) + let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) + let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) + let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) subscription1.disposable = _parent.source1.subscribe(observer1) subscription2.disposable = _parent.source2.subscribe(observer2) @@ -294,10 +294,10 @@ class Zip4 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink4_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink4_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -333,9 +333,9 @@ class ZipSink5_ : ZipSink { var _values4: Queue = Queue(capacity: 2) var _values5: Queue = Queue(capacity: 2) - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 5, observer: observer, cancel: cancel) + super.init(arity: 5, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -360,11 +360,11 @@ class ZipSink5_ : ZipSink { let subscription4 = SingleAssignmentDisposable() let subscription5 = SingleAssignmentDisposable() - let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) - let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) - let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) - let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) - let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) + let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) + let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) + let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) + let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) + let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) subscription1.disposable = _parent.source1.subscribe(observer1) subscription2.disposable = _parent.source2.subscribe(observer2) @@ -407,10 +407,10 @@ class Zip5 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink5_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink5_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -447,9 +447,9 @@ class ZipSink6_ : ZipSink { var _values5: Queue = Queue(capacity: 2) var _values6: Queue = Queue(capacity: 2) - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 6, observer: observer, cancel: cancel) + super.init(arity: 6, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -476,12 +476,12 @@ class ZipSink6_ : ZipSink { let subscription5 = SingleAssignmentDisposable() let subscription6 = SingleAssignmentDisposable() - let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) - let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) - let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) - let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) - let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) - let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) + let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) + let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) + let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) + let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) + let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) + let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) subscription1.disposable = _parent.source1.subscribe(observer1) subscription2.disposable = _parent.source2.subscribe(observer2) @@ -528,10 +528,10 @@ class Zip6 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink6_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink6_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -569,9 +569,9 @@ class ZipSink7_ : ZipSink { var _values6: Queue = Queue(capacity: 2) var _values7: Queue = Queue(capacity: 2) - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 7, observer: observer, cancel: cancel) + super.init(arity: 7, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -600,13 +600,13 @@ class ZipSink7_ : ZipSink { let subscription6 = SingleAssignmentDisposable() let subscription7 = SingleAssignmentDisposable() - let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) - let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) - let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) - let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) - let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) - let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) - let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7) + let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) + let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) + let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) + let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) + let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) + let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) + let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7) subscription1.disposable = _parent.source1.subscribe(observer1) subscription2.disposable = _parent.source2.subscribe(observer2) @@ -657,10 +657,10 @@ class Zip7 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink7_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink7_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } @@ -699,9 +699,9 @@ class ZipSink8_ : ZipSink { var _values7: Queue = Queue(capacity: 2) var _values8: Queue = Queue(capacity: 2) - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: 8, observer: observer, cancel: cancel) + super.init(arity: 8, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -732,14 +732,14 @@ class ZipSink8_ : ZipSink { let subscription7 = SingleAssignmentDisposable() let subscription8 = SingleAssignmentDisposable() - let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) - let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) - let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) - let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) - let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) - let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) - let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7) - let observer8 = ZipObserver(lock: lock, parent: self, index: 7, setNextValue: { self._values8.enqueue($0) }, this: subscription8) + let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1) + let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2) + let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3) + let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4) + let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5) + let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6) + let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7) + let observer8 = ZipObserver(lock: _lock, parent: self, index: 7, setNextValue: { self._values8.enqueue($0) }, this: subscription8) subscription1.disposable = _parent.source1.subscribe(observer1) subscription2.disposable = _parent.source2.subscribe(observer2) @@ -794,10 +794,10 @@ class Zip8 : Producer { _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink8_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink8_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } diff --git a/RxSwift/Observables/Implementations/Zip+arity.tt b/RxSwift/Observables/Implementations/Zip+arity.tt index 6b9a0460..c16b8418 100644 --- a/RxSwift/Observables/Implementations/Zip+arity.tt +++ b/RxSwift/Observables/Implementations/Zip+arity.tt @@ -38,9 +38,9 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", " var _values\($0): Queue = Queue(capacity: 2)" }).joinWithSeparator("\n") %> - init(parent: Parent, observer: O, cancel: Disposable) { + init(parent: Parent, observer: O) { _parent = parent - super.init(arity: <%= i %>, observer: observer, cancel: cancel) + super.init(arity: <%= i %>, observer: observer) } override func hasElements(index: Int) -> Bool { @@ -61,7 +61,7 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", }).joinWithSeparator("\n") %> <%= (Array(1...i).map { -" let observer\($0) = ZipObserver(lock: lock, parent: self, index: \($0 - 1), setNextValue: { self._values\($0).enqueue($0) }, this: subscription\($0))" +" let observer\($0) = ZipObserver(lock: _lock, parent: self, index: \($0 - 1), setNextValue: { self._values\($0).enqueue($0) }, this: subscription\($0))" }).joinWithSeparator("\n") %> <%= (Array(1...i).map { @@ -93,10 +93,10 @@ class Zip<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", ") %> _resultSelector = resultSelector } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = ZipSink<%= i %>_(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run(observer: O) -> Disposable { + let sink = ZipSink<%= i %>_(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } diff --git a/RxSwift/Observables/Implementations/Zip.swift b/RxSwift/Observables/Implementations/Zip.swift index 71927516..273f80ff 100644 --- a/RxSwift/Observables/Implementations/Zip.swift +++ b/RxSwift/Observables/Implementations/Zip.swift @@ -18,18 +18,18 @@ protocol ZipSinkProtocol : class class ZipSink : Sink, ZipSinkProtocol { typealias Element = O.E - let arity: Int - - let lock = NSRecursiveLock() - + let _arity: Int + + let _lock = NSRecursiveLock() + // state private var _isDone: [Bool] - init(arity: Int, observer: O, cancel: Disposable) { + init(arity: Int, observer: O) { _isDone = [Bool](count: arity, repeatedValue: false) - self.arity = arity + _arity = arity - super.init(observer: observer, cancel: cancel) + super.init(observer: observer) } func getResult() throws -> Element { @@ -43,7 +43,7 @@ class ZipSink : Sink, ZipSinkProtocol { func next(index: Int) { var hasValueAll = true - for i in 0 ..< arity { + for i in 0 ..< _arity { if !hasElements(i) { hasValueAll = false; break; @@ -102,13 +102,16 @@ class ZipSink : Sink, ZipSinkProtocol { } } -class ZipObserver : ObserverType { +class ZipObserver + : ObserverType + , LockOwnerType + , SynchronizedOnType { typealias E = ElementType typealias ValueSetter = (ElementType) -> () private var _parent: ZipSinkProtocol? - private let _lock: NSRecursiveLock + let _lock: NSRecursiveLock // state private let _index: Int @@ -124,7 +127,10 @@ class ZipObserver : ObserverType { } func on(event: Event) { - + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { if let _ = _parent { switch event { case .Next(_): @@ -136,17 +142,15 @@ class ZipObserver : ObserverType { } } - _lock.performLocked { - if let parent = _parent { - switch event { - case .Next(let value): - _setNextValue(value) - parent.next(_index) - case .Error(let error): - parent.fail(error) - case .Completed: - parent.done(_index) - } + if let parent = _parent { + switch event { + case .Next(let value): + _setNextValue(value) + parent.next(_index) + case .Error(let error): + parent.fail(error) + case .Completed: + parent.done(_index) } } } diff --git a/RxSwift/Observers/TailRecursiveSink.swift b/RxSwift/Observers/TailRecursiveSink.swift index 93bee506..c4021064 100644 --- a/RxSwift/Observers/TailRecursiveSink.swift +++ b/RxSwift/Observers/TailRecursiveSink.swift @@ -19,8 +19,8 @@ class TailRecursiveSink Disposable { diff --git a/RxSwift/Schedulers/CurrentThreadScheduler.swift b/RxSwift/Schedulers/CurrentThreadScheduler.swift index b839a4a8..ada3b894 100644 --- a/RxSwift/Schedulers/CurrentThreadScheduler.swift +++ b/RxSwift/Schedulers/CurrentThreadScheduler.swift @@ -104,7 +104,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType { if CurrentThreadScheduler.isScheduleRequired { CurrentThreadScheduler.isScheduleRequired = false - action(state) + let disposable = action(state) defer { CurrentThreadScheduler.isScheduleRequired = true @@ -112,7 +112,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType { } guard let queue = CurrentThreadScheduler.queue else { - return NopDisposable.instance + return disposable } while let latest = queue.value.tryDequeue() { @@ -122,7 +122,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType { latest.invoke() } - return NopDisposable.instance + return disposable } let existingQueue = CurrentThreadScheduler.queue diff --git a/RxTests/PerformanceTests/main.swift b/RxTests/PerformanceTests/main.swift index 289abde2..414168da 100644 --- a/RxTests/PerformanceTests/main.swift +++ b/RxTests/PerformanceTests/main.swift @@ -25,17 +25,17 @@ compareTwoImplementations(benchmarkTime: true, first: { //combineLatest(a, publishSubject - //.shareReplay(1) + .shareReplay(1) + .map { $0 } + .filter { _ in true }// ){ x, _ in x } //.map { $0 } - //.filter { _ in true }// ){ x, _ in x } - //.map { $0 } - .flatMap { just($0) } + //.flatMap { just($0) } .subscribeNext { _ in } - for i in 0..<100 { + for i in 0..<1000 { publishSubject.on(.Next(i)) } diff --git a/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift b/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift index 3440ee91..0d77a9e6 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift @@ -3057,12 +3057,16 @@ extension ObservableStandardSequenceOperatorsTest { func testTakeLast_DecrementCountsFirst() { let k = BehaviorSubject(value: false) - + + var elements = [Bool]() _ = k.takeLast(1).subscribeNext { n in + elements.append(n) k.on(.Next(!n)) } k.on(.Completed) + + XCTAssertEqual(elements, [false]) } } diff --git a/RxTests/RxTest.swift b/RxTests/RxTest.swift index 2f661520..a15f9629 100644 --- a/RxTests/RxTest.swift +++ b/RxTests/RxTest.swift @@ -107,7 +107,7 @@ class RxTest: XCTestCase { #if TRACE_RESOURCES // give 5 sec to clean up resources - for var i = 0; i < 100; ++i { + for var i = 0; i < 10; ++i { if self.startResourceCount < resourceCount { // main schedulers need to finish work NSRunLoop.currentRunLoop().runMode(NSDefaultRunLoopMode, beforeDate: NSDate(timeIntervalSinceNow: 0.05)) diff --git a/RxTests/RxTests.xcodeproj/xcshareddata/xcschemes/RxTests-OSX.xcscheme b/RxTests/RxTests.xcodeproj/xcshareddata/xcschemes/RxTests-OSX.xcscheme index 4f6e4922..a9d93f0f 100644 --- a/RxTests/RxTests.xcodeproj/xcshareddata/xcschemes/RxTests-OSX.xcscheme +++ b/RxTests/RxTests.xcodeproj/xcshareddata/xcschemes/RxTests-OSX.xcscheme @@ -52,7 +52,7 @@