Moves disposables combo in `Producer.subscribe` to `Sink`. `SynchronizedOnType` overhauls.

This commit is contained in:
Krunoslav Zaher 2015-11-01 11:48:10 +01:00
parent 22de82ba2b
commit 13a87af009
60 changed files with 1090 additions and 1023 deletions

View File

@ -45,6 +45,8 @@ public final class AnonymousDisposable : DisposeBase, Cancelable {
*/
public func dispose() {
if OSAtomicCompareAndSwap32(0, 1, &_disposed) {
assert(_disposed == 1)
if let action = _disposeAction {
_disposeAction = nil
action()

View File

@ -72,6 +72,9 @@ public class SingleAssignmentDisposable : DisposeBase, Disposable, Cancelable {
Disposes the underlying disposable.
*/
public func dispose() {
if _disposed {
return
}
_dispose()?.dispose()
}

View File

@ -58,9 +58,9 @@ class AmbSink<ElementType, O: ObserverType where O.E == ElementType> : Sink<O> {
// state
private var _choice = AmbState.Neither
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -114,9 +114,9 @@ class Amb<Element>: Producer<Element> {
_right = right
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = AmbSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = AmbSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -16,8 +16,8 @@ class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {
// state
private var _isStopped: Int32 = 0
override init(observer: O, cancel: Disposable) {
super.init(observer: observer, cancel: cancel)
override init(observer: O) {
super.init(observer: observer)
}
func on(event: Event<E>) {
@ -49,9 +49,9 @@ public class AnonymousObservable<Element> : Producer<Element> {
_subscribeHandler = subscribeHandler
}
public override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
setSink(sink)
return sink.run(self)
public override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = AnonymousObservableSink(observer: observer)
sink.disposable = sink.run(self)
return sink
}
}

View File

@ -22,29 +22,33 @@ class BufferTimeCount<Element, S: SchedulerType> : Producer<[Element]> {
_scheduler = scheduler
}
override func run<O : ObserverType where O.E == [Element]>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = BufferTimeCountSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == [Element]>(observer: O) -> Disposable {
let sink = BufferTimeCountSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
class BufferTimeCountSink<S: SchedulerType, Element, O: ObserverType where O.E == [Element]> : Sink<O>, ObserverType {
class BufferTimeCountSink<S: SchedulerType, Element, O: ObserverType where O.E == [Element]>
: Sink<O>
, LockOwnerType
, ObserverType
, SynchronizedOnType {
typealias Parent = BufferTimeCount<Element, S>
typealias E = Element
private let _parent: Parent
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private let _timerD = SerialDisposable()
private var _buffer = [Element]()
private var _windowID = 0
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -64,24 +68,26 @@ class BufferTimeCountSink<S: SchedulerType, Element, O: ObserverType where O.E =
}
func on(event: Event<E>) {
_lock.performLocked {
switch event {
case .Next(let element):
_buffer.append(element)
if _buffer.count == _parent._count {
startNewWindowAndSendCurrentOne()
}
case .Error(let error):
_buffer = []
observer?.on(.Error(error))
dispose()
case .Completed:
observer?.on(.Next(_buffer))
observer?.on(.Completed)
dispose()
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next(let element):
_buffer.append(element)
if _buffer.count == _parent._count {
startNewWindowAndSendCurrentOne()
}
case .Error(let error):
_buffer = []
observer?.on(.Error(error))
dispose()
case .Completed:
observer?.on(.Next(_buffer))
observer?.on(.Completed)
dispose()
}
}

View File

@ -39,9 +39,9 @@ class CatchSink<O: ObserverType> : Sink<O>, ObserverType {
private let _parent: Parent
private let _subscription = SerialDisposable()
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -86,10 +86,10 @@ class Catch<Element> : Producer<Element> {
_handler = handler
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CatchSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = CatchSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -101,8 +101,8 @@ class CatchSequenceSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
private var _lastError: ErrorType?
override init(observer: O, cancel: Disposable) {
super.init(observer: observer, cancel: cancel)
override init(observer: O) {
super.init(observer: observer)
}
override func on(event: Event<Element>) {
@ -148,9 +148,9 @@ class CatchSequence<S: SequenceType where S.Generator.Element : ObservableConver
self.sources = sources
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CatchSequenceSink<S, O>(observer: observer, cancel: cancel)
setSink(sink)
return sink.run(self.sources.generate())
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = CatchSequenceSink<S, O>(observer: observer)
sink.disposable = sink.run(self.sources.generate())
return sink
}
}

View File

@ -8,48 +8,49 @@
import Foundation
class CombineLatestCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Generator.Element : ObservableConvertibleType, O.E == R> : Sink<O> {
class CombineLatestCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Generator.Element : ObservableConvertibleType, O.E == R>
: Sink<O> {
typealias Parent = CombineLatestCollectionType<C, R>
typealias SourceElement = C.Generator.Element.E
let parent: Parent
let lock = NSRecursiveLock()
let _parent: Parent
let _lock = NSRecursiveLock()
// state
var numberOfValues = 0
var values: [SourceElement?]
var isDone: [Bool]
var numberOfDone = 0
var subscriptions: [SingleAssignmentDisposable]
var _numberOfValues = 0
var _values: [SourceElement?]
var _isDone: [Bool]
var _numberOfDone = 0
var _subscriptions: [SingleAssignmentDisposable]
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
self.values = [SourceElement?](count: parent.count, repeatedValue: nil)
self.isDone = [Bool](count: parent.count, repeatedValue: false)
self.subscriptions = Array<SingleAssignmentDisposable>()
self.subscriptions.reserveCapacity(parent.count)
init(parent: Parent, observer: O) {
_parent = parent
_values = [SourceElement?](count: parent._count, repeatedValue: nil)
_isDone = [Bool](count: parent._count, repeatedValue: false)
_subscriptions = Array<SingleAssignmentDisposable>()
_subscriptions.reserveCapacity(parent._count)
for _ in 0 ..< parent.count {
self.subscriptions.append(SingleAssignmentDisposable())
for _ in 0 ..< parent._count {
_subscriptions.append(SingleAssignmentDisposable())
}
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<SourceElement>, atIndex: Int) {
lock.performLocked {
_lock.lock(); defer { _lock.unlock() } // {
switch event {
case .Next(let element):
if values[atIndex] == nil {
numberOfValues++
if _values[atIndex] == nil {
_numberOfValues++
}
values[atIndex] = element
_values[atIndex] = element
if numberOfValues < parent.count {
let numberOfOthersThatAreDone = self.numberOfDone - (isDone[atIndex] ? 1 : 0)
if numberOfOthersThatAreDone == self.parent.count - 1 {
if _numberOfValues < _parent._count {
let numberOfOthersThatAreDone = self._numberOfDone - (_isDone[atIndex] ? 1 : 0)
if numberOfOthersThatAreDone == self._parent._count - 1 {
observer?.on(.Completed)
dispose()
}
@ -57,7 +58,7 @@ class CombineLatestCollectionTypeSink<C: CollectionType, R, O: ObserverType wher
}
do {
let result = try parent.resultSelector(values.map { $0! })
let result = try _parent._resultSelector(_values.map { $0! })
observer?.on(.Next(result))
}
catch let error {
@ -69,56 +70,56 @@ class CombineLatestCollectionTypeSink<C: CollectionType, R, O: ObserverType wher
observer?.on(.Error(error))
dispose()
case .Completed:
if isDone[atIndex] {
if _isDone[atIndex] {
return
}
isDone[atIndex] = true
numberOfDone++
_isDone[atIndex] = true
_numberOfDone++
if numberOfDone == self.parent.count {
if _numberOfDone == self._parent._count {
observer?.on(.Completed)
dispose()
}
else {
subscriptions[atIndex].dispose()
_subscriptions[atIndex].dispose()
}
}
}
// }
}
func run() -> Disposable {
var j = 0
for i in parent.sources.startIndex ..< parent.sources.endIndex {
for i in _parent._sources.startIndex ..< _parent._sources.endIndex {
let index = j
let source = self.parent.sources[i].asObservable()
self.subscriptions[j].disposable = source.subscribe(AnyObserver { event in
let source = _parent._sources[i].asObservable()
_subscriptions[j].disposable = source.subscribe(AnyObserver { event in
self.on(event, atIndex: index)
})
j++
}
return CompositeDisposable(disposables: self.subscriptions.map { $0 })
return CompositeDisposable(disposables: _subscriptions.map { $0 })
}
}
class CombineLatestCollectionType<C: CollectionType, R where C.Generator.Element : ObservableConvertibleType> : Producer<R> {
typealias ResultSelector = [C.Generator.Element.E] throws -> R
let sources: C
let resultSelector: ResultSelector
let count: Int
let _sources: C
let _resultSelector: ResultSelector
let _count: Int
init(sources: C, resultSelector: ResultSelector) {
self.sources = sources
self.resultSelector = resultSelector
self.count = Int(self.sources.count.toIntMax())
_sources = sources
_resultSelector = resultSelector
_count = Int(self._sources.count.toIntMax())
}
override func run<O : ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestCollectionTypeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestCollectionTypeSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -39,17 +39,17 @@ class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
var _latestElement1: E1! = nil
var _latestElement2: E2! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 2, observer: observer, cancel: cancel)
super.init(arity: 2, observer: observer)
}
func run() -> Disposable {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
@ -80,10 +80,10 @@ class CombineLatest2<E1, E2, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink2_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -117,9 +117,9 @@ class CombineLatestSink3_<E1, E2, E3, O: ObserverType> : CombineLatestSink<O> {
var _latestElement2: E2! = nil
var _latestElement3: E3! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 3, observer: observer, cancel: cancel)
super.init(arity: 3, observer: observer)
}
func run() -> Disposable {
@ -127,9 +127,9 @@ class CombineLatestSink3_<E1, E2, E3, O: ObserverType> : CombineLatestSink<O> {
let subscription2 = SingleAssignmentDisposable()
let subscription3 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
@ -164,10 +164,10 @@ class CombineLatest3<E1, E2, E3, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink3_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink3_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -202,9 +202,9 @@ class CombineLatestSink4_<E1, E2, E3, E4, O: ObserverType> : CombineLatestSink<O
var _latestElement3: E3! = nil
var _latestElement4: E4! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 4, observer: observer, cancel: cancel)
super.init(arity: 4, observer: observer)
}
func run() -> Disposable {
@ -213,10 +213,10 @@ class CombineLatestSink4_<E1, E2, E3, E4, O: ObserverType> : CombineLatestSink<O
let subscription3 = SingleAssignmentDisposable()
let subscription4 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
@ -255,10 +255,10 @@ class CombineLatest4<E1, E2, E3, E4, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink4_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink4_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -294,9 +294,9 @@ class CombineLatestSink5_<E1, E2, E3, E4, E5, O: ObserverType> : CombineLatestSi
var _latestElement4: E4! = nil
var _latestElement5: E5! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 5, observer: observer, cancel: cancel)
super.init(arity: 5, observer: observer)
}
func run() -> Disposable {
@ -306,11 +306,11 @@ class CombineLatestSink5_<E1, E2, E3, E4, E5, O: ObserverType> : CombineLatestSi
let subscription4 = SingleAssignmentDisposable()
let subscription5 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
@ -353,10 +353,10 @@ class CombineLatest5<E1, E2, E3, E4, E5, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink5_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink5_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -393,9 +393,9 @@ class CombineLatestSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : CombineLate
var _latestElement5: E5! = nil
var _latestElement6: E6! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 6, observer: observer, cancel: cancel)
super.init(arity: 6, observer: observer)
}
func run() -> Disposable {
@ -406,12 +406,12 @@ class CombineLatestSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : CombineLate
let subscription5 = SingleAssignmentDisposable()
let subscription6 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer6 = CombineLatestObserver(lock: _lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
@ -458,10 +458,10 @@ class CombineLatest6<E1, E2, E3, E4, E5, E6, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink6_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink6_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -499,9 +499,9 @@ class CombineLatestSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : Combine
var _latestElement6: E6! = nil
var _latestElement7: E7! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 7, observer: observer, cancel: cancel)
super.init(arity: 7, observer: observer)
}
func run() -> Disposable {
@ -513,13 +513,13 @@ class CombineLatestSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : Combine
let subscription6 = SingleAssignmentDisposable()
let subscription7 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
let observer7 = CombineLatestObserver(lock: lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7)
let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer6 = CombineLatestObserver(lock: _lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
let observer7 = CombineLatestObserver(lock: _lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
@ -570,10 +570,10 @@ class CombineLatest7<E1, E2, E3, E4, E5, E6, E7, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink7_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink7_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -612,9 +612,9 @@ class CombineLatestSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : Com
var _latestElement7: E7! = nil
var _latestElement8: E8! = nil
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 8, observer: observer, cancel: cancel)
super.init(arity: 8, observer: observer)
}
func run() -> Disposable {
@ -627,14 +627,14 @@ class CombineLatestSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : Com
let subscription7 = SingleAssignmentDisposable()
let subscription8 = SingleAssignmentDisposable()
let observer1 = CombineLatestObserver(lock: lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer6 = CombineLatestObserver(lock: lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
let observer7 = CombineLatestObserver(lock: lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7)
let observer8 = CombineLatestObserver(lock: lock, parent: self, index: 7, setLatestValue: { (e: E8) -> Void in self._latestElement8 = e }, this: subscription8)
let observer1 = CombineLatestObserver(lock: _lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: _lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
let observer3 = CombineLatestObserver(lock: _lock, parent: self, index: 2, setLatestValue: { (e: E3) -> Void in self._latestElement3 = e }, this: subscription3)
let observer4 = CombineLatestObserver(lock: _lock, parent: self, index: 3, setLatestValue: { (e: E4) -> Void in self._latestElement4 = e }, this: subscription4)
let observer5 = CombineLatestObserver(lock: _lock, parent: self, index: 4, setLatestValue: { (e: E5) -> Void in self._latestElement5 = e }, this: subscription5)
let observer6 = CombineLatestObserver(lock: _lock, parent: self, index: 5, setLatestValue: { (e: E6) -> Void in self._latestElement6 = e }, this: subscription6)
let observer7 = CombineLatestObserver(lock: _lock, parent: self, index: 6, setLatestValue: { (e: E7) -> Void in self._latestElement7 = e }, this: subscription7)
let observer8 = CombineLatestObserver(lock: _lock, parent: self, index: 7, setLatestValue: { (e: E8) -> Void in self._latestElement8 = e }, this: subscription8)
subscription1.disposable = _parent._source1.subscribe(observer1)
subscription2.disposable = _parent._source2.subscribe(observer2)
@ -689,10 +689,10 @@ class CombineLatest8<E1, E2, E3, E4, E5, E6, E7, E8, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink8_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink8_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -38,9 +38,9 @@ class CombineLatestSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSep
" var _latestElement\($0): E\($0)! = nil"
}).joinWithSeparator("\n") %>
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: <%= i %>, observer: observer, cancel: cancel)
super.init(arity: <%= i %>, observer: observer)
}
func run() -> Disposable {
@ -49,7 +49,7 @@ class CombineLatestSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSep
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
" let observer\($0) = CombineLatestObserver(lock: lock, parent: self, index: \($0 - 1), setLatestValue: { (e: E\($0)) -> Void in self._latestElement\($0) = e }, this: subscription\($0))"
" let observer\($0) = CombineLatestObserver(lock: _lock, parent: self, index: \($0 - 1), setLatestValue: { (e: E\($0)) -> Void in self._latestElement\($0) = e }, this: subscription\($0))"
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
@ -83,10 +83,10 @@ class CombineLatest<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparato
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestSink<%= i %>_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = CombineLatestSink<%= i %>_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -14,10 +14,12 @@ protocol CombineLatestProtocol : class {
func done(index: Int)
}
class CombineLatestSink<O: ObserverType> : Sink<O>, CombineLatestProtocol {
class CombineLatestSink<O: ObserverType>
: Sink<O>
, CombineLatestProtocol {
typealias Element = O.E
let lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
private let _arity: Int
private var _numberOfValues = 0
@ -25,12 +27,12 @@ class CombineLatestSink<O: ObserverType> : Sink<O>, CombineLatestProtocol {
private var _hasValue: [Bool]
private var _isDone: [Bool]
init(arity: Int, observer: O, cancel: Disposable) {
init(arity: Int, observer: O) {
_arity = arity
_hasValue = [Bool](count: arity, repeatedValue: false)
_isDone = [Bool](count: arity, repeatedValue: false)
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func getResult() throws -> Element {

View File

@ -12,8 +12,8 @@ import Foundation
class ConcatSink<S: SequenceType, O: ObserverType where S.Generator.Element : ObservableConvertibleType, S.Generator.Element.E == O.E> : TailRecursiveSink<S, O> {
typealias Element = O.E
override init(observer: O, cancel: Disposable) {
super.init(observer: observer, cancel: cancel)
override init(observer: O) {
super.init(observer: observer)
}
override func on(event: Event<Element>){
@ -47,11 +47,9 @@ class Concat<S: SequenceType where S.Generator.Element : ObservableConvertibleTy
_sources = sources
}
override func run<O: ObserverType where O.E == Element>
(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ConcatSink<S, O>(observer: observer, cancel: cancel)
setSink(sink)
return sink.run(_sources.generate())
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = ConcatSink<S, O>(observer: observer)
sink.disposable = sink.run(_sources.generate())
return sink
}
}

View File

@ -14,9 +14,9 @@ class Debug_<O: ObserverType> : Sink<O>, ObserverType {
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -45,10 +45,10 @@ class Debug<Element> : Producer<Element> {
_source = source
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
print("[\(_identifier)] subscribed")
let sink = Debug_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
let sink = Debug_(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -14,9 +14,9 @@ class DeferredSink<O: ObserverType> : Sink<O>, ObserverType {
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -54,10 +54,10 @@ class Deferred<Element> : Producer<Element> {
_observableFactory = observableFactory
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = DeferredSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DeferredSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
func eval() throws -> Observable<Element> {

View File

@ -8,15 +8,17 @@
import Foundation
class DelaySubscriptionSink<ElementType, O: ObserverType, S: SchedulerType where O.E == ElementType> : Sink<O>, ObserverType {
class DelaySubscriptionSink<ElementType, O: ObserverType, S: SchedulerType where O.E == ElementType>
: Sink<O>
, ObserverType {
typealias Parent = DelaySubscription<ElementType, S>
typealias E = O.E
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<E>) {
@ -41,11 +43,12 @@ class DelaySubscription<Element, S: SchedulerType>: Producer<Element> {
_scheduler = scheduler
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = DelaySubscriptionSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _scheduler.scheduleRelative((), dueTime: _dueTime) { _ in
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DelaySubscriptionSink(parent: self, observer: observer)
sink.disposable = _scheduler.scheduleRelative((), dueTime: _dueTime) { _ in
return self._source.subscribe(sink)
}
return sink
}
}

View File

@ -14,9 +14,9 @@ class DistinctUntilChangedSink<O: ObserverType, Key>: Sink<O>, ObserverType {
private let _parent: DistinctUntilChanged<E, Key>
private var _currentKey: Key? = nil
init(parent: DistinctUntilChanged<E, Key>, observer: O, cancel: Disposable) {
init(parent: DistinctUntilChanged<E, Key>, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<E>) {
@ -64,9 +64,9 @@ class DistinctUntilChanged<Element, Key>: Producer<Element> {
_comparer = comparer
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = DistinctUntilChangedSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DistinctUntilChangedSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -14,9 +14,9 @@ class DoSink<O: ObserverType> : Sink<O>, ObserverType {
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -45,11 +45,9 @@ class Do<Element> : Producer<Element> {
_eventHandler = eventHandler
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = DoSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DoSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -15,11 +15,11 @@ class ElementAtSink<SourceType, O: ObserverType where O.E == SourceType> : Sink<
let _parent: Parent
var _i: Int
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_i = parent._index
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<SourceType>) {
@ -71,9 +71,9 @@ class ElementAt<SourceType> : Producer<SourceType> {
self._throwOnEmpty = throwOnEmpty
}
override func run<O: ObserverType where O.E == SourceType>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ElementAtSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
override func run<O: ObserverType where O.E == SourceType>(observer: O) -> Disposable {
let sink = ElementAtSink(parent: self, observer: observer)
sink.disposable = _source.subscribeSafe(sink)
return sink
}
}

View File

@ -15,9 +15,9 @@ class FilterSink<O : ObserverType>: Sink<O>, ObserverType {
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -51,9 +51,9 @@ class Filter<Element> : Producer<Element> {
_predicate = predicate
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = FilterSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = FilterSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -67,9 +67,9 @@ class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType wher
private var _stopped = false
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func performMap(element: SourceType) throws -> S {
@ -128,8 +128,8 @@ class FlatMapSink<SourceType, S: ObservableConvertibleType, O: ObserverType wher
}
class FlatMapSink1<SourceType, S: ObservableConvertibleType, O : ObserverType where S.E == O.E> : FlatMapSink<SourceType, S, O> {
override init(parent: Parent, observer: O, cancel: Disposable) {
super.init(parent: parent, observer: observer, cancel: cancel)
override init(parent: Parent, observer: O) {
super.init(parent: parent, observer: observer)
}
override func performMap(element: SourceType) throws -> S {
@ -140,8 +140,8 @@ class FlatMapSink1<SourceType, S: ObservableConvertibleType, O : ObserverType wh
class FlatMapSink2<SourceType, S: ObservableConvertibleType, O: ObserverType where S.E == O.E> : FlatMapSink<SourceType, S, O> {
private var _index = 0
override init(parent: Parent, observer: O, cancel: Disposable) {
super.init(parent: parent, observer: observer, cancel: cancel)
override init(parent: Parent, observer: O) {
super.init(parent: parent, observer: observer)
}
override func performMap(element: SourceType) throws -> S {
@ -170,16 +170,18 @@ class FlatMap<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
_selector1 = nil
}
override func run<O: ObserverType where O.E == S.E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
let sink: FlatMapSink<SourceType, S, O>
if let _ = _selector1 {
let sink = FlatMapSink1(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
sink = FlatMapSink1(parent: self, observer: observer)
}
else {
let sink = FlatMapSink2(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
sink = FlatMapSink2(parent: self, observer: observer)
}
let subscription = sink.run()
sink.disposable = subscription
return sink
}
}

View File

@ -15,10 +15,10 @@ class GenerateSink<S, O: ObserverType> : Sink<O> {
private var _state: S
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_state = parent._initialState
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -63,9 +63,9 @@ class Generate<S, E> : Producer<E> {
super.init()
}
override func run<O : ObserverType where O.E == E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = GenerateSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
let sink = GenerateSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -15,9 +15,9 @@ class MapSink<SourceType, O : ObserverType> : Sink<O>, ObserverType {
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func performMap(element: SourceType) throws -> ResultType {
@ -50,8 +50,8 @@ class MapSink<SourceType, O : ObserverType> : Sink<O>, ObserverType {
class MapSink1<SourceType, O: ObserverType> : MapSink<SourceType, O> {
typealias ResultType = O.E
override init(parent: Map<SourceType, ResultType>, observer: O, cancel: Disposable) {
super.init(parent: parent, observer: observer, cancel: cancel)
override init(parent: Map<SourceType, ResultType>, observer: O) {
super.init(parent: parent, observer: observer)
}
override func performMap(element: SourceType) throws -> ResultType {
@ -64,8 +64,8 @@ class MapSink2<SourceType, O: ObserverType> : MapSink<SourceType, O> {
private var _index = 0
override init(parent: Map<SourceType, ResultType>, observer: O, cancel: Disposable) {
super.init(parent: parent, observer: observer, cancel: cancel)
override init(parent: Map<SourceType, ResultType>, observer: O) {
super.init(parent: parent, observer: observer)
}
override func performMap(element: SourceType) throws -> ResultType {
return try _parent._selector2!(element, try incrementChecked(&_index))
@ -93,16 +93,16 @@ class Map<SourceType, ResultType>: Producer<ResultType> {
_selector1 = nil
}
override func run<O: ObserverType where O.E == ResultType>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable {
if let _ = _selector1 {
let sink = MapSink1(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
let sink = MapSink1(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
else {
let sink = MapSink2(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
let sink = MapSink2(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -10,46 +10,59 @@ import Foundation
// sequential
class MergeSinkIter<S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : ObserverType {
class MergeSinkIter<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = O.E
typealias DisposeKey = Bag<Disposable>.KeyType
typealias Parent = MergeSink<S, O>
private let _parent: Parent
private let _disposeKey: DisposeKey
var _lock: NSRecursiveLock {
return _parent._lock
}
init(parent: Parent, disposeKey: DisposeKey) {
_parent = parent
_disposeKey = disposeKey
}
func on(event: Event<E>) {
_parent._lock.performLocked {
switch event {
case .Next:
_parent.observer?.on(event)
case .Error:
_parent.observer?.on(event)
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
_parent.observer?.on(event)
case .Error:
_parent.observer?.on(event)
_parent.dispose()
case .Completed:
_parent._group.removeDisposable(_disposeKey)
if _parent._stopped && _parent._group.count == 1 {
_parent.observer?.on(.Completed)
_parent.dispose()
case .Completed:
_parent._group.removeDisposable(_disposeKey)
if _parent._stopped && _parent._group.count == 1 {
_parent.observer?.on(.Completed)
_parent.dispose()
}
}
}
}
}
class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E> : Sink<O>, ObserverType {
class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = S
typealias Parent = Merge<S>
private let _parent: Parent
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _stopped = false
@ -57,10 +70,10 @@ class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
private let _group = CompositeDisposable()
private let _sourceSubscription = SingleAssignmentDisposable()
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -73,8 +86,7 @@ class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
}
func on(event: Event<E>) {
switch event {
case .Next(let value):
if case .Next(let value) = event {
let innerSubscription = SingleAssignmentDisposable()
let maybeKey = _group.addDisposable(innerSubscription)
@ -83,22 +95,29 @@ class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
let disposable = value.asObservable().subscribe(observer)
innerSubscription.disposable = disposable
}
return
}
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
rxFatalError("Next should have been handled")
case .Error(let error):
_lock.performLocked {
observer?.on(.Error(error))
observer?.on(.Error(error))
dispose()
case .Completed:
_stopped = true
if _group.count == 1 {
observer?.on(.Completed)
dispose()
}
case .Completed:
_lock.performLocked {
_stopped = true
if _group.count == 1 {
observer?.on(.Completed)
dispose()
}
else {
_sourceSubscription.dispose()
}
else {
_sourceSubscription.dispose()
}
}
}
@ -106,13 +125,20 @@ class MergeSink<S: ObservableConvertibleType, O: ObserverType where O.E == S.E>
// concurrent
class MergeConcurrentSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E == O.E> : ObserverType {
class MergeConcurrentSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = O.E
typealias DisposeKey = Bag<Disposable>.KeyType
typealias Parent = MergeConcurrentSink<S, O>
private let _parent: Parent
private let _disposeKey: DisposeKey
var _lock: NSRecursiveLock {
return _parent._lock
}
init(parent: Parent, disposeKey: DisposeKey) {
_parent = parent
@ -120,42 +146,48 @@ class MergeConcurrentSinkIter<S: ObservableConvertibleType, O: ObserverType wher
}
func on(event: Event<E>) {
_parent._lock.performLocked {
switch event {
case .Next:
_parent.observer?.on(event)
case .Error:
_parent.observer?.on(event)
_parent.dispose()
case .Completed:
_parent._group.removeDisposable(_disposeKey)
let queue = _parent._queue
if queue.value.count > 0 {
let s = queue.value.dequeue()
_parent.subscribe(s, group: _parent._group)
}
else {
_parent._activeCount = _parent._activeCount - 1
if _parent._stopped && _parent._activeCount == 0 {
_parent.observer?.on(.Completed)
_parent.dispose()
}
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
_parent.observer?.on(event)
case .Error:
_parent.observer?.on(event)
_parent.dispose()
case .Completed:
_parent._group.removeDisposable(_disposeKey)
let queue = _parent._queue
if queue.value.count > 0 {
let s = queue.value.dequeue()
_parent.subscribe(s, group: _parent._group)
}
else {
_parent._activeCount = _parent._activeCount - 1
if _parent._stopped && _parent._activeCount == 0 {
_parent.observer?.on(.Completed)
_parent.dispose()
}
}
}
}
}
class MergeConcurrentSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E> : Sink<O>, ObserverType {
class MergeConcurrentSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = S
typealias Parent = Merge<S>
typealias QueueType = Queue<S>
private let _parent: Parent
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _stopped = false
private var _activeCount = 0
@ -164,11 +196,11 @@ class MergeConcurrentSink<S: ObservableConvertibleType, O: ObserverType where S.
private let _sourceSubscription = SingleAssignmentDisposable()
private let _group = CompositeDisposable()
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_group.addDisposable(_sourceSubscription)
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -193,39 +225,38 @@ class MergeConcurrentSink<S: ObservableConvertibleType, O: ObserverType where S.
}
func on(event: Event<E>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next(let value):
let subscribe = _lock.calculateLocked { () -> Bool in
if _activeCount < _parent._maxConcurrent {
_activeCount += 1
return true
}
else {
_queue.value.enqueue(value)
return false
}
let subscribe: Bool
if _activeCount < _parent._maxConcurrent {
_activeCount += 1
subscribe = true
}
else {
_queue.value.enqueue(value)
subscribe = false
}
if subscribe {
self.subscribe(value, group: _group)
}
case .Error(let error):
_lock.performLocked {
observer?.on(.Error(error))
observer?.on(.Error(error))
dispose()
case .Completed:
if _activeCount == 0 {
observer?.on(.Completed)
dispose()
}
case .Completed:
_lock.performLocked {
if _activeCount == 0 {
observer?.on(.Completed)
dispose()
}
else {
_sourceSubscription.dispose()
}
_stopped = true
else {
_sourceSubscription.dispose()
}
_stopped = true
}
}
}
@ -239,16 +270,16 @@ class Merge<S: ObservableConvertibleType> : Producer<S.E> {
_maxConcurrent = maxConcurrent
}
override func run<O: ObserverType where O.E == S.E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O: ObserverType where O.E == S.E>(observer: O) -> Disposable {
if _maxConcurrent > 0 {
let sink = MergeConcurrentSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
let sink = MergeConcurrentSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
else {
let sink = MergeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
let sink = MergeSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
}

View File

@ -15,9 +15,9 @@ class MulticastSink<S: SubjectType, O: ObserverType>: Sink<O>, ObserverType {
private let _parent: MutlicastType
init(parent: MutlicastType, observer: O, cancel: Disposable) {
init(parent: MutlicastType, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -63,9 +63,9 @@ class Multicast<S: SubjectType, R>: Producer<R> {
_selector = selector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = MulticastSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = MulticastSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -21,10 +21,10 @@ class ObserveOn<E> : Producer<E> {
#endif
}
override func run<O : ObserverType where O.E == E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ObserveOnSink(scheduler: scheduler, observer: observer, cancel: cancel)
setSink(sink)
return source.subscribe(sink)
override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
let sink = ObserveOnSink(scheduler: scheduler, observer: observer)
sink._subscription.disposable = source.subscribe(sink)
return sink
}
#if TRACE_RESOURCES
@ -44,31 +44,30 @@ enum ObserveOnState : Int32 {
class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
typealias E = O.E
var cancel: Disposable
var lock = SpinLock()
let scheduler: ImmediateSchedulerType
var observer: O?
var state = ObserveOnState.Stopped
var queue = Queue<Event<E>>(capacity: 10)
let scheduleDisposable = SerialDisposable()
init(scheduler: ImmediateSchedulerType, observer: O, cancel: Disposable) {
self.cancel = cancel
self.scheduler = scheduler
self.observer = observer
let _scheduler: ImmediateSchedulerType
var _lock = SpinLock()
// state
var _state = ObserveOnState.Stopped
var _observer: O?
var _queue = Queue<Event<E>>(capacity: 10)
let _scheduleDisposable = SerialDisposable()
let _subscription = SingleAssignmentDisposable()
init(scheduler: ImmediateSchedulerType, observer: O) {
_scheduler = scheduler
_observer = observer
}
override func onCore(event: Event<E>) {
let shouldStart = lock.calculateLocked { () -> Bool in
self.queue.enqueue(event)
let shouldStart = _lock.calculateLocked { () -> Bool in
self._queue.enqueue(event)
switch self.state {
switch self._state {
case .Stopped:
self.state = .Running
self._state = .Running
return true
case .Running:
return false
@ -76,18 +75,18 @@ class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
}
if shouldStart {
scheduleDisposable.disposable = self.scheduler.scheduleRecursive((), action: self.run)
_scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
}
}
func run(state: Void, recurse: Void -> Void) {
let (nextEvent, observer) = self.lock.calculateLocked { () -> (Event<E>?, O?) in
if self.queue.count > 0 {
return (self.queue.dequeue(), self.observer)
let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<E>?, O?) in
if self._queue.count > 0 {
return (self._queue.dequeue(), self._observer)
}
else {
self.state = .Stopped
return (nil, self.observer)
self._state = .Stopped
return (nil, self._observer)
}
}
@ -101,32 +100,33 @@ class ObserveOnSink<O: ObserverType> : ObserverBase<O.E> {
return
}
let shouldContinue = self.lock.calculateLocked { () -> Bool in
if self.queue.count > 0 {
return true
}
else {
self.state = .Stopped
return false
}
}
let shouldContinue = _shouldContinue_synchronized()
if shouldContinue {
recurse()
}
}
func _shouldContinue_synchronized() -> Bool {
_lock.lock(); defer { _lock.unlock() } // {
if self._queue.count > 0 {
return true
}
else {
self._state = .Stopped
return false
}
// }
}
override func dispose() {
super.dispose()
let toDispose = lock.calculateLocked { () -> Disposable in
let originalCancel = self.cancel
self.cancel = NopDisposable.instance
self.scheduleDisposable.dispose()
self.observer = nil
return originalCancel
}
toDispose.dispose()
_subscription.dispose()
_scheduleDisposable.dispose()
_lock.lock(); defer { _lock.unlock() } // {
_observer = nil
// }
}
}

View File

@ -22,12 +22,9 @@ class ObserveOnSerialDispatchQueueSink<O: ObserverType> : ObserverBase<O.E> {
let scheduler: SerialDispatchQueueScheduler
let observer: O
var disposeLock = SpinLock()
let subscription = SingleAssignmentDisposable()
var cancel: Disposable
init(scheduler: SerialDispatchQueueScheduler, observer: O, cancel: Disposable) {
self.cancel = cancel
init(scheduler: SerialDispatchQueueScheduler, observer: O) {
self.scheduler = scheduler
self.observer = observer
super.init()
@ -47,14 +44,8 @@ class ObserveOnSerialDispatchQueueSink<O: ObserverType> : ObserverBase<O.E> {
override func dispose() {
super.dispose()
let toDispose = disposeLock.calculateLocked { () -> Disposable in
let originalCancel = self.cancel
self.cancel = NopDisposable.instance
return originalCancel
}
toDispose.dispose()
subscription.dispose()
}
}
@ -72,10 +63,10 @@ class ObserveOnSerialDispatchQueue<E> : Producer<E> {
#endif
}
override func run<O : ObserverType where O.E == E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer, cancel: cancel)
setSink(sink)
return source.subscribe(sink)
override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: scheduler, observer: observer)
sink.subscription.disposable = source.subscribe(sink)
return sink
}
#if TRACE_RESOURCES

View File

@ -14,30 +14,17 @@ class Producer<Element> : Observable<Element> {
}
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = SingleAssignmentDisposable()
let subscription = SingleAssignmentDisposable()
let d = BinaryDisposable(sink, subscription)
let setSink: (Disposable) -> Void = { d in sink.disposable = d }
if !CurrentThreadScheduler.isScheduleRequired {
let disposable = run(observer, cancel: subscription, setSink: setSink)
subscription.disposable = disposable
return run(observer)
}
else {
CurrentThreadScheduler.instance.schedule(sink) { sink in
let disposable = self.run(observer, cancel: subscription, setSink: setSink)
subscription.disposable = disposable
return NopDisposable.instance
return CurrentThreadScheduler.instance.schedule(()) { _ in
return self.run(observer)
}
}
return d
}
func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
abstractMethod()
}
}

View File

@ -27,10 +27,10 @@ class RangeProducer<_CompilerWorkaround> : Producer<Int> {
_scheduler = scheduler
}
override func run<O : ObserverType where O.E == Int>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Int>(observer: O) -> Disposable {
let sink = RangeSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -39,9 +39,9 @@ class RangeSink<_CompilerWorkaround, O: ObserverType where O.E == Int> : Sink<O>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {

View File

@ -15,11 +15,11 @@ class ReduceSink<SourceType, AccumulateType, O: ObserverType> : Sink<O>, Observe
private let _parent: Parent
private var _accumulation: AccumulateType
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_accumulation = parent._seed
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<SourceType>) {
@ -66,9 +66,9 @@ class Reduce<SourceType, AccumulateType, ResultType> : Producer<ResultType> {
_mapResult = mapResult
}
override func run<O: ObserverType where O.E == ResultType>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ReduceSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O: ObserverType where O.E == ResultType>(observer: O) -> Disposable {
let sink = ReduceSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -8,21 +8,23 @@
import Foundation
class RefCountSink<CO: ConnectableObservableType, O: ObserverType where CO.E == O.E> : Sink<O>, ObserverType {
class RefCountSink<CO: ConnectableObservableType, O: ObserverType where CO.E == O.E>
: Sink<O>
, ObserverType {
typealias Element = O.E
typealias Parent = RefCount<CO>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
let subscription = _parent._source.subscribeSafe(self)
_parent._lock.performLocked {
_parent._lock.lock(); defer { _parent._lock.unlock() } // {
if _parent._count == 0 {
_parent._count = 1
_parent._connectableSubscription = _parent._source.connect()
@ -30,11 +32,11 @@ class RefCountSink<CO: ConnectableObservableType, O: ObserverType where CO.E ==
else {
_parent._count = _parent._count + 1
}
}
// }
return AnonymousDisposable {
subscription.dispose()
self._parent._lock.performLocked {
self._parent._lock.lock(); defer { self._parent._lock.unlock() } // {
if self._parent._count == 1 {
self._parent._connectableSubscription!.dispose()
self._parent._count = 0
@ -46,7 +48,7 @@ class RefCountSink<CO: ConnectableObservableType, O: ObserverType where CO.E ==
else {
rxFatalError("Something went wrong with RefCount disposing mechanism")
}
}
// }
}
}
@ -74,9 +76,9 @@ class RefCount<CO: ConnectableObservableType>: Producer<CO.E> {
_source = source
}
override func run<O: ObserverType where O.E == CO.E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = RefCountSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == CO.E>(observer: O) -> Disposable {
let sink = RefCountSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -17,10 +17,11 @@ class RepeatElement<Element> : Producer<Element> {
_scheduler = scheduler
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = RepeatElementSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = RepeatElementSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -29,9 +30,9 @@ class RepeatElementSink<O: ObserverType> : Sink<O> {
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {

View File

@ -8,57 +8,70 @@
import Foundation
class SamplerSink<O: ObserverType, ElementType, SampleType where O.E == ElementType> : ObserverType {
class SamplerSink<O: ObserverType, ElementType, SampleType where O.E == ElementType>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = SampleType
typealias Parent = SampleSequenceSink<O, SampleType>
private let _parent: Parent
var _lock: NSRecursiveLock {
return _parent._lock
}
init(parent: Parent) {
_parent = parent
}
func on(event: Event<E>) {
_parent._lock.performLocked {
switch event {
case .Next:
if let element = _parent._element {
if _parent._parent._onlyNew {
_parent._element = nil
}
_parent.observer?.on(.Next(element))
}
synchronizedOn(event)
}
if _parent._atEnd {
_parent.observer?.on(.Completed)
_parent.dispose()
}
case .Error(let e):
_parent.observer?.on(.Error(e))
_parent.dispose()
case .Completed:
if let element = _parent._element {
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
if let element = _parent._element {
if _parent._parent._onlyNew {
_parent._element = nil
_parent.observer?.on(.Next(element))
}
if _parent._atEnd {
_parent.observer?.on(.Completed)
_parent.dispose()
}
_parent.observer?.on(.Next(element))
}
if _parent._atEnd {
_parent.observer?.on(.Completed)
_parent.dispose()
}
case .Error(let e):
_parent.observer?.on(.Error(e))
_parent.dispose()
case .Completed:
if let element = _parent._element {
_parent._element = nil
_parent.observer?.on(.Next(element))
}
if _parent._atEnd {
_parent.observer?.on(.Completed)
_parent.dispose()
}
}
}
}
class SampleSequenceSink<O: ObserverType, SampleType> : Sink<O>, ObserverType {
class SampleSequenceSink<O: ObserverType, SampleType>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias Element = O.E
typealias Parent = Sample<Element, SampleType>
private let _parent: Parent
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _element = nil as Element?
@ -66,30 +79,32 @@ class SampleSequenceSink<O: ObserverType, SampleType> : Sink<O>, ObserverType {
private let _sourceSubscription = SingleAssignmentDisposable()
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
_sourceSubscription.disposable = _parent._source.subscribe(self)
let samplerSubscription = _parent._sampler.subscribe(SamplerSink(parent: self))
return CompositeDisposable(_sourceSubscription, samplerSubscription)
return StableCompositeDisposable.create(_sourceSubscription, samplerSubscription)
}
func on(event: Event<Element>) {
_lock.performLocked {
switch event {
case .Next(let element):
_element = element
case .Error:
observer?.on(event)
dispose()
case .Completed:
_atEnd = true
_sourceSubscription.dispose()
}
synchronizedOn(event)
}
func _synchronized_on(event: Event<Element>) {
switch event {
case .Next(let element):
_element = element
case .Error:
observer?.on(event)
dispose()
case .Completed:
_atEnd = true
_sourceSubscription.dispose()
}
}
@ -106,9 +121,9 @@ class Sample<Element, SampleType> : Producer<Element> {
_onlyNew = onlyNew
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = SampleSequenceSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = SampleSequenceSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -15,10 +15,10 @@ class ScanSink<ElementType, Accumulate, O: ObserverType where O.E == Accumulate>
private let _parent: Parent
private var _accumulate: Accumulate
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_accumulate = parent._seed
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<ElementType>) {
@ -56,9 +56,9 @@ class Scan<Element, Accumulate>: Producer<Accumulate> {
_accumulator = accumulator
}
override func run<O : ObserverType where O.E == Accumulate>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ScanSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O : ObserverType where O.E == Accumulate>(observer: O) -> Disposable {
let sink = ScanSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -8,14 +8,12 @@
import Foundation
class Sink<O : ObserverType> : Disposable {
class Sink<O : ObserverType> : SingleAssignmentDisposable {
private var _lock = SpinLock()
// state
private var _observer: O?
private var _cancel: Disposable
private var _disposed: Bool = false
var observer: O? {
get {
_lock.lock(); defer { _lock.unlock() }
@ -23,32 +21,24 @@ class Sink<O : ObserverType> : Disposable {
}
}
init(observer: O, cancel: Disposable) {
init(observer: O) {
#if TRACE_RESOURCES
OSAtomicIncrement32(&resourceCount)
#endif
_observer = observer
_cancel = cancel
}
private func _disposeInternal() -> Disposable? {
private func _disposeObserver() {
_lock.lock(); defer { _lock.unlock() }
if _disposed {
return nil
}
let cancel = _cancel
_disposed = true
_observer = nil
_cancel = NopDisposable.instance
return cancel
}
func dispose() {
_disposeInternal()?.dispose()
override func dispose() {
if !disposed {
_disposeObserver()
}
super.dispose()
}
deinit {

View File

@ -18,10 +18,10 @@ class SkipCountSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
var remaining: Int
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
self.parent = parent
self.remaining = parent.count
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -54,10 +54,11 @@ class SkipCount<Element>: Producer<Element> {
self.count = count
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = SkipCountSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return source.subscribe(sink)
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = SkipCountSink(parent: self, observer: observer)
sink.disposable = source.subscribe(sink)
return sink
}
}
@ -72,9 +73,9 @@ class SkipTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == E
// state
var open = false
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -121,9 +122,9 @@ class SkipTime<Element, S: SchedulerType>: Producer<Element> {
self.duration = duration
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = SkipTimeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = SkipTimeSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -8,20 +8,27 @@
import Foundation
class SkipUntilSinkOther<ElementType, Other, O: ObserverType where O.E == ElementType> : ObserverType {
class SkipUntilSinkOther<ElementType, Other, O: ObserverType where O.E == ElementType>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias Parent = SkipUntilSink<ElementType, Other, O>
typealias E = Other
private let _parent: Parent
var _lock: NSRecursiveLock {
return _parent._lock
}
private let _singleAssignmentDisposable = SingleAssignmentDisposable()
var disposable: Disposable {
private let _subscription = SingleAssignmentDisposable()
var subscription: Disposable {
get {
abstractMethod()
}
set {
_singleAssignmentDisposable.disposable = newValue
_subscription.disposable = newValue
}
}
@ -33,19 +40,19 @@ class SkipUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
}
func on(event: Event<E>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
_parent._lock.performLocked {
_parent._forwardElements = true
_singleAssignmentDisposable.dispose()
}
_parent._forwardElements = true
_subscription.dispose()
case .Error(let e):
_parent._lock.performLocked {
_parent.observer?.onError(e)
_parent.dispose()
}
_parent.observer?.onError(e)
_parent.dispose()
case .Completed:
_singleAssignmentDisposable.dispose()
_subscription.dispose()
}
}
@ -58,46 +65,43 @@ class SkipUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
}
class SkipUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
class SkipUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = ElementType
typealias Parent = SkipUntil<E, Other>
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
private let _parent: Parent
private var _forwardElements = false
private let _singleAssignmentDisposable = SingleAssignmentDisposable()
var disposable: Disposable {
get {
abstractMethod()
}
set {
_singleAssignmentDisposable.disposable = newValue
}
}
init(parent: Parent, observer: O, cancel: Disposable) {
private let _sourceSubscription = SingleAssignmentDisposable()
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<E>) {
_lock.performLocked {
switch event {
case .Next:
if _forwardElements {
observer?.on(event)
}
case .Error:
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
if _forwardElements {
observer?.on(event)
dispose()
case .Completed:
if _forwardElements {
observer?.on(event)
}
_singleAssignmentDisposable.dispose()
}
case .Error:
observer?.on(event)
dispose()
case .Completed:
if _forwardElements {
observer?.on(event)
}
_sourceSubscription.dispose()
}
}
@ -105,10 +109,10 @@ class SkipUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
let sourceSubscription = _parent._source.subscribe(self)
let otherObserver = SkipUntilSinkOther(parent: self)
let otherSubscription = _parent._other.subscribe(otherObserver)
disposable = sourceSubscription
otherObserver.disposable = otherSubscription
_sourceSubscription.disposable = sourceSubscription
otherObserver.subscription = otherSubscription
return BinaryDisposable(sourceSubscription, otherSubscription)
return StableCompositeDisposable.create(sourceSubscription, otherSubscription)
}
}
@ -122,9 +126,9 @@ class SkipUntil<Element, Other>: Producer<Element> {
_other = other
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = SkipUntilSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = SkipUntilSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -14,9 +14,9 @@ class SkipWhileSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
private let _parent: Parent
private var _running = false
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -51,9 +51,9 @@ class SkipWhileSinkWithIndex<ElementType, O: ObserverType where O.E == ElementTy
private var _index = 0
private var _running = false
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -100,16 +100,16 @@ class SkipWhile<Element>: Producer<Element> {
_predicateWithIndex = predicate
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
if let _ = _predicate {
let sink = SkipWhileSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
let sink = SkipWhileSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
else {
let sink = SkipWhileSinkWithIndex(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
let sink = SkipWhileSinkWithIndex(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}
}

View File

@ -18,7 +18,7 @@ class StartWith<Element>: Producer<Element> {
super.init()
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
for e in elements {
observer.on(.Next(e))
}

View File

@ -14,9 +14,9 @@ class SubscribeOnSink<Ob: ObservableType, O: ObserverType where Ob.E == O.E> : S
let parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -52,9 +52,9 @@ class SubscribeOn<Ob: ObservableType> : Producer<Ob.E> {
self.scheduler = scheduler
}
override func run<O : ObserverType where O.E == Ob.E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Ob.E>(observer: O) -> Disposable {
let sink = SubscribeOnSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -8,7 +8,11 @@
import Foundation
class SwitchSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E> : Sink<O>, ObserverType {
class SwitchSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = S
typealias Parent = Switch<S>
@ -16,34 +20,36 @@ class SwitchSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
private let _innerSubscription: SerialDisposable = SerialDisposable()
private let _parent: Parent
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _stopped = false
private var _latest = 0
private var _hasLatest = false
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
let subscription = _parent._sources.subscribe(self)
_subscriptions.disposable = subscription
return CompositeDisposable(_subscriptions, _innerSubscription)
return StableCompositeDisposable.create(_subscriptions, _innerSubscription)
}
func on(event: Event<E>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next(let observable):
let latest: Int = _lock.calculateLocked {
_hasLatest = true
_latest = _latest &+ 1
return _latest
}
_hasLatest = true
_latest = _latest &+ 1
let latest = _latest
let d = SingleAssignmentDisposable()
_innerSubscription.disposable = d
@ -51,33 +57,36 @@ class SwitchSink<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
let disposable = observable.asObservable().subscribe(observer)
d.disposable = disposable
case .Error(let error):
_lock.performLocked {
observer?.on(.Error(error))
dispose()
}
observer?.on(.Error(error))
dispose()
case .Completed:
_lock.performLocked {
_stopped = true
_subscriptions.dispose()
if !_hasLatest {
observer?.on(.Completed)
dispose()
}
_stopped = true
_subscriptions.dispose()
if !_hasLatest {
observer?.on(.Completed)
dispose()
}
}
}
}
class SwitchSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E == O.E> : ObserverType {
class SwitchSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E == O.E>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = O.E
typealias Parent = SwitchSink<S, O>
private let _parent: Parent
private let _id: Int
private let _self: Disposable
var _lock: NSRecursiveLock {
return _parent._lock
}
init(parent: Parent, id: Int, _self: Disposable) {
_parent = parent
_id = id
@ -85,32 +94,33 @@ class SwitchSinkIter<S: ObservableConvertibleType, O: ObserverType where S.E ==
}
func on(event: Event<E>) {
return _parent._lock.calculateLocked {
switch event {
case .Next: break
case .Error, .Completed:
_self.dispose()
}
if _parent._latest != _id {
return
}
let observer = _parent.observer
switch event {
case .Next:
observer?.on(event)
case .Error:
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next: break
case .Error, .Completed:
_self.dispose()
}
if _parent._latest != _id {
return
}
let observer = _parent.observer
switch event {
case .Next:
observer?.on(event)
case .Error:
observer?.on(event)
_parent.dispose()
case .Completed:
_parent._hasLatest = false
if _parent._stopped {
observer?.on(event)
_parent.dispose()
case .Completed:
_parent._hasLatest = false
if _parent._stopped {
observer?.on(event)
_parent.dispose()
}
}
}
}
@ -123,9 +133,9 @@ class Switch<S: ObservableConvertibleType> : Producer<S.E> {
_sources = sources
}
override func run<O : ObserverType where O.E == S.E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = SwitchSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == S.E>(observer: O) -> Disposable {
let sink = SwitchSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -18,10 +18,10 @@ class TakeCountSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
private var _remaining: Int
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_remaining = parent._count
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<E>) {
@ -61,48 +61,54 @@ class TakeCount<Element>: Producer<Element> {
_count = count
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = TakeCountSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = TakeCountSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}
// time version
class TakeTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
class TakeTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == ElementType>
: Sink<O>
, LockOwnerType
, ObserverType
, SynchronizedOnType {
typealias Parent = TakeTime<ElementType, S>
typealias E = ElementType
private let _parent: Parent
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<E>) {
_lock.performLocked {
switch event {
case .Next(let value):
observer?.on(.Next(value))
case .Error:
observer?.on(event)
dispose()
case .Completed:
observer?.on(event)
dispose()
}
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next(let value):
observer?.on(.Next(value))
case .Error:
observer?.on(event)
dispose()
case .Completed:
observer?.on(event)
dispose()
}
}
func tick() {
_lock.performLocked {
observer?.on(.Completed)
dispose()
}
_lock.lock(); defer { _lock.unlock() }
observer?.on(.Completed)
dispose()
}
func run() -> Disposable {
@ -130,9 +136,9 @@ class TakeTime<Element, S: SchedulerType>: Producer<Element> {
_duration = duration
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = TakeTimeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = TakeTimeSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -17,10 +17,10 @@ class TakeLastSink<ElementType, O: ObserverType where O.E == ElementType> : Sink
private var _elements: Queue<ElementType>
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_elements = Queue<ElementType>(capacity: parent._count + 1)
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<E>) {
@ -55,9 +55,9 @@ class TakeLast<Element>: Producer<Element> {
_count = count
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = TakeLastSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = TakeLastSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -8,11 +8,18 @@
import Foundation
class TakeUntilSinkOther<ElementType, Other, O: ObserverType where O.E == ElementType> : ObserverType {
class TakeUntilSinkOther<ElementType, Other, O: ObserverType where O.E == ElementType>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias Parent = TakeUntilSink<ElementType, Other, O>
typealias E = Other
private let _parent: Parent
var _lock: NSRecursiveLock {
return _parent._lock
}
private let _singleAssignmentDisposable = SingleAssignmentDisposable()
@ -33,18 +40,20 @@ class TakeUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
}
func on(event: Event<E>) {
_parent._lock.performLocked {
switch event {
case .Next:
_parent.observer?.on(.Completed)
_parent.dispose()
case .Error(let e):
_parent.observer?.on(.Error(e))
_parent.dispose()
case .Completed:
_parent._open = true
_singleAssignmentDisposable.dispose()
}
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
_parent.observer?.on(.Completed)
_parent.dispose()
case .Error(let e):
_parent.observer?.on(.Error(e))
_parent.dispose()
case .Completed:
_parent._open = true
_singleAssignmentDisposable.dispose()
}
}
@ -55,43 +64,40 @@ class TakeUntilSinkOther<ElementType, Other, O: ObserverType where O.E == Elemen
#endif
}
class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType>
: Sink<O>
, LockOwnerType
, ObserverType
, SynchronizedOnType {
typealias E = ElementType
typealias Parent = TakeUntil<E, Other>
private let _parent: Parent
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _open = false
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<E>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
if _open {
observer?.on(event)
}
else {
_lock.performLocked {
observer?.on(event)
}
}
observer?.on(event)
case .Error:
_lock.performLocked {
observer?.on(event)
dispose()
}
observer?.on(event)
dispose()
case .Completed:
_lock.performLocked {
observer?.on(event)
dispose()
}
observer?.on(event)
dispose()
}
}
@ -101,7 +107,7 @@ class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
otherObserver.disposable = otherSubscription
let sourceSubscription = _parent._source.subscribe(self)
return CompositeDisposable(sourceSubscription, otherSubscription)
return StableCompositeDisposable.create(sourceSubscription, otherSubscription)
}
}
@ -115,9 +121,9 @@ class TakeUntil<Element, Other>: Producer<Element> {
_other = other
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = TakeUntilSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = TakeUntilSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -8,7 +8,9 @@
import Foundation
class TakeWhileSink<ElementType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
class TakeWhileSink<ElementType, O: ObserverType where O.E == ElementType>
: Sink<O>
, ObserverType {
typealias Parent = TakeWhile<ElementType>
typealias Element = ElementType
@ -16,9 +18,9 @@ class TakeWhileSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
private var _running = true
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -50,7 +52,9 @@ class TakeWhileSink<ElementType, O: ObserverType where O.E == ElementType> : Sin
}
class TakeWhileSinkWithIndex<ElementType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
class TakeWhileSinkWithIndex<ElementType, O: ObserverType where O.E == ElementType>
: Sink<O>
, ObserverType {
typealias Parent = TakeWhile<ElementType>
typealias Element = ElementType
@ -59,9 +63,9 @@ class TakeWhileSinkWithIndex<ElementType, O: ObserverType where O.E == ElementTy
private var _running = true
private var _index = 0
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<Element>) {
@ -114,15 +118,15 @@ class TakeWhile<Element>: Producer<Element> {
_predicateWithIndex = predicate
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
if let _ = _predicate {
let sink = TakeWhileSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
let sink = TakeWhileSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
} else {
let sink = TakeWhileSinkWithIndex(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
let sink = TakeWhileSinkWithIndex(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}
}

View File

@ -8,13 +8,17 @@
import Foundation
class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, ObserverType {
class ThrottleSink<O: ObserverType, Scheduler: SchedulerType>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias Element = O.E
typealias ParentType = Throttle<Element, Scheduler>
private let _parent: ParentType
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _id = 0 as UInt64
@ -22,81 +26,60 @@ class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, Observe
let cancellable = SerialDisposable()
init(parent: ParentType, observer: O, cancel: Disposable) {
init(parent: ParentType, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
let subscription = _parent._source.subscribe(self)
return CompositeDisposable(subscription, cancellable)
return StableCompositeDisposable.create(subscription, cancellable)
}
func on(event: Event<Element>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<Element>) {
switch event {
case .Next:
break
case .Error, .Completed:
cancellable.dispose()
}
let latestId = _lock.calculateLocked { () -> UInt64 in
let observer = self.observer
let oldValue = _value
case .Next(let element):
_id = _id &+ 1
switch event {
case .Next(let element):
_value = element
case .Error:
_value = nil
observer?.on(event)
dispose()
case .Completed:
_value = nil
if let value = oldValue {
observer?.on(.Next(value))
}
observer?.on(.Completed)
dispose()
}
return _id
}
switch event {
case .Next:
let d = SingleAssignmentDisposable()
self.cancellable.disposable = d
let currentId = _id
_value = element
let scheduler = _parent._scheduler
let dueTime = _parent._dueTime
let disposeTimer = scheduler.scheduleRelative(latestId, dueTime: dueTime) { (id) in
self.propagate()
return NopDisposable.instance
let d = SingleAssignmentDisposable()
self.cancellable.disposable = d
d.disposable = scheduler.scheduleRelative(currentId, dueTime: dueTime, action: self.propagate)
case .Error:
_value = nil
observer?.on(event)
dispose()
case .Completed:
if let value = _value {
_value = nil
observer?.on(.Next(value))
}
d.disposable = disposeTimer
default: break
observer?.on(.Completed)
dispose()
}
}
func propagate() {
let originalValue: Element? = _lock.calculateLocked {
func propagate(currentId: UInt64) -> Disposable {
_lock.lock(); defer { _lock.unlock() } // {
let originalValue = _value
_value = nil
return originalValue
}
if let value = originalValue {
observer?.on(.Next(value))
}
if let value = originalValue where _id == currentId {
_value = nil
observer?.on(.Next(value))
}
// }
return NopDisposable.instance
}
}
@ -112,10 +95,10 @@ class Throttle<Element, Scheduler: SchedulerType> : Producer<Element> {
_scheduler = scheduler
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ThrottleSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = ThrottleSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -13,9 +13,9 @@ class TimerSink<S: SchedulerType, O: ObserverType where O.E == Int64> : Sink<O>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -31,9 +31,9 @@ class TimerOneOffSink<S: SchedulerType, O: ObserverType where O.E == Int64> : Si
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -59,16 +59,16 @@ class Timer<S: SchedulerType>: Producer<Int64> {
_period = period
}
override func run<O : ObserverType where O.E == Int64>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
override func run<O : ObserverType where O.E == Int64>(observer: O) -> Disposable {
if let _ = _period {
let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
let sink = TimerSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
else {
let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
let sink = TimerOneOffSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
}

View File

@ -14,10 +14,10 @@ class ToArraySink<SourceType, O: ObserverType where O.E == [SourceType]> : Sink<
let _parent: Parent
var _list = Array<SourceType>()
init(parent: Parent, observer: O, cancel: Disposable) {
self._parent = parent
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<SourceType>) {
@ -42,9 +42,9 @@ class ToArray<SourceType> : Producer<[SourceType]> {
_source = source
}
override func run<O: ObserverType where O.E == [SourceType]>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ToArraySink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribe(sink)
override func run<O: ObserverType where O.E == [SourceType]>(observer: O) -> Disposable {
let sink = ToArraySink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}

View File

@ -15,9 +15,9 @@ class UsingSink<SourceType, ResourceType: Disposable, O: ObserverType where O.E
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -70,9 +70,9 @@ class Using<SourceType, ResourceType: Disposable>: Producer<SourceType> {
_observableFactory = observableFactory
}
override func run<O : ObserverType where O.E == E>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = UsingSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == E>(observer: O) -> Disposable {
let sink = UsingSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -8,21 +8,24 @@
import Foundation
class WithLatestFromSink<FirstType, SecondType, ResultType, O: ObserverType where O.E == ResultType > : Sink<O>, ObserverType {
class WithLatestFromSink<FirstType, SecondType, ResultType, O: ObserverType where O.E == ResultType>
: Sink<O>
, ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias Parent = WithLatestFrom<FirstType, SecondType, ResultType>
typealias E = FirstType
private let _parent: Parent
private var _lock = NSRecursiveLock()
var _lock = NSRecursiveLock()
private var _latest: SecondType?
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func run() -> Disposable {
@ -34,60 +37,70 @@ class WithLatestFromSink<FirstType, SecondType, ResultType, O: ObserverType wher
return StableCompositeDisposable.create(fstSubscription, sndSubscription)
}
func on(event: Event<E>) {
_lock.performLocked {
switch event {
case let .Next(value):
guard let latest = _latest else { return }
do {
let res = try _parent._resultSelector(value, latest)
observer?.onNext(res)
} catch let e {
observer?.onError(e)
dispose()
}
case .Completed:
observer?.onComplete()
dispose()
case let .Error(error):
observer?.onError(error)
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case let .Next(value):
guard let latest = _latest else { return }
do {
let res = try _parent._resultSelector(value, latest)
observer?.onNext(res)
} catch let e {
observer?.onError(e)
dispose()
}
case .Completed:
observer?.onComplete()
dispose()
case let .Error(error):
observer?.onError(error)
dispose()
}
}
}
class WithLatestFromSecond<FirstType, SecondType, ResultType, O: ObserverType where O.E == ResultType>: ObserverType {
class WithLatestFromSecond<FirstType, SecondType, ResultType, O: ObserverType where O.E == ResultType>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias Parent = WithLatestFromSink<FirstType, SecondType, ResultType, O>
typealias E = SecondType
private let _parent: Parent
private let _disposable: Disposable
var _lock: NSRecursiveLock {
get {
return _parent._lock
}
}
init(parent: Parent, disposable: Disposable) {
_parent = parent
_disposable = disposable
}
func on(event: Event<E>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case let .Next(value):
_parent._lock.performLocked {
_parent._latest = value
}
_parent._latest = value
case .Completed:
_disposable.dispose()
case let .Error(error):
_parent._lock.performLocked {
_parent.observer?.onError(error)
_parent.dispose()
}
_parent.observer?.onError(error)
_parent.dispose()
}
}
}
class WithLatestFrom<FirstType, SecondType, ResultType>: Producer<ResultType> {
@ -103,10 +116,9 @@ class WithLatestFrom<FirstType, SecondType, ResultType>: Producer<ResultType> {
_resultSelector = resultSelector
}
override func run<O : ObserverType where O.E == ResultType>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = WithLatestFromSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == ResultType>(observer: O) -> Disposable {
let sink = WithLatestFromSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -8,7 +8,8 @@
import Foundation
class ZipCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Generator.Element : ObservableConvertibleType, O.E == R> : Sink<O> {
class ZipCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Generator.Element : ObservableConvertibleType, O.E == R>
: Sink<O> {
typealias Parent = ZipCollectionType<C, R>
typealias SourceElement = C.Generator.Element.E
@ -23,7 +24,7 @@ class ZipCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Genera
private var _numberOfDone = 0
private var _subscriptions: [SingleAssignmentDisposable]
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
_values = [Queue<SourceElement>](count: parent.count, repeatedValue: Queue(capacity: 4))
_isDone = [Bool](count: parent.count, repeatedValue: false)
@ -34,11 +35,11 @@ class ZipCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Genera
_subscriptions.append(SingleAssignmentDisposable())
}
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func on(event: Event<SourceElement>, atIndex: Int) {
_lock.performLocked {
_lock.lock(); defer { _lock.unlock() } // {
switch event {
case .Next(let element):
_values[atIndex].enqueue(element)
@ -97,7 +98,7 @@ class ZipCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Genera
_subscriptions[atIndex].dispose()
}
}
}
// }
}
func run() -> Disposable {
@ -128,9 +129,9 @@ class ZipCollectionType<C: CollectionType, R where C.Generator.Element : Observa
self.count = Int(self.sources.count.toIntMax())
}
override func run<O : ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipCollectionTypeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O : ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipCollectionTypeSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -39,9 +39,9 @@ class ZipSink2_<E1, E2, O: ObserverType> : ZipSink<O> {
var _values1: Queue<E1> = Queue(capacity: 2)
var _values2: Queue<E2> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 2, observer: observer, cancel: cancel)
super.init(arity: 2, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -60,8 +60,8 @@ class ZipSink2_<E1, E2, O: ObserverType> : ZipSink<O> {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
@ -92,10 +92,10 @@ class Zip2<E1, E2, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink2_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink2_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -129,9 +129,9 @@ class ZipSink3_<E1, E2, E3, O: ObserverType> : ZipSink<O> {
var _values2: Queue<E2> = Queue(capacity: 2)
var _values3: Queue<E3> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 3, observer: observer, cancel: cancel)
super.init(arity: 3, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -152,9 +152,9 @@ class ZipSink3_<E1, E2, E3, O: ObserverType> : ZipSink<O> {
let subscription2 = SingleAssignmentDisposable()
let subscription3 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
@ -189,10 +189,10 @@ class Zip3<E1, E2, E3, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink3_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink3_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -227,9 +227,9 @@ class ZipSink4_<E1, E2, E3, E4, O: ObserverType> : ZipSink<O> {
var _values3: Queue<E3> = Queue(capacity: 2)
var _values4: Queue<E4> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 4, observer: observer, cancel: cancel)
super.init(arity: 4, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -252,10 +252,10 @@ class ZipSink4_<E1, E2, E3, E4, O: ObserverType> : ZipSink<O> {
let subscription3 = SingleAssignmentDisposable()
let subscription4 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
@ -294,10 +294,10 @@ class Zip4<E1, E2, E3, E4, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink4_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink4_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -333,9 +333,9 @@ class ZipSink5_<E1, E2, E3, E4, E5, O: ObserverType> : ZipSink<O> {
var _values4: Queue<E4> = Queue(capacity: 2)
var _values5: Queue<E5> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 5, observer: observer, cancel: cancel)
super.init(arity: 5, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -360,11 +360,11 @@ class ZipSink5_<E1, E2, E3, E4, E5, O: ObserverType> : ZipSink<O> {
let subscription4 = SingleAssignmentDisposable()
let subscription5 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
@ -407,10 +407,10 @@ class Zip5<E1, E2, E3, E4, E5, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink5_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink5_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -447,9 +447,9 @@ class ZipSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : ZipSink<O> {
var _values5: Queue<E5> = Queue(capacity: 2)
var _values6: Queue<E6> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 6, observer: observer, cancel: cancel)
super.init(arity: 6, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -476,12 +476,12 @@ class ZipSink6_<E1, E2, E3, E4, E5, E6, O: ObserverType> : ZipSink<O> {
let subscription5 = SingleAssignmentDisposable()
let subscription6 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
@ -528,10 +528,10 @@ class Zip6<E1, E2, E3, E4, E5, E6, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink6_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink6_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -569,9 +569,9 @@ class ZipSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : ZipSink<O> {
var _values6: Queue<E6> = Queue(capacity: 2)
var _values7: Queue<E7> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 7, observer: observer, cancel: cancel)
super.init(arity: 7, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -600,13 +600,13 @@ class ZipSink7_<E1, E2, E3, E4, E5, E6, E7, O: ObserverType> : ZipSink<O> {
let subscription6 = SingleAssignmentDisposable()
let subscription7 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7)
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
@ -657,10 +657,10 @@ class Zip7<E1, E2, E3, E4, E5, E6, E7, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink7_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink7_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}
@ -699,9 +699,9 @@ class ZipSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : ZipSink<O> {
var _values7: Queue<E7> = Queue(capacity: 2)
var _values8: Queue<E8> = Queue(capacity: 2)
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: 8, observer: observer, cancel: cancel)
super.init(arity: 8, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -732,14 +732,14 @@ class ZipSink8_<E1, E2, E3, E4, E5, E6, E7, E8, O: ObserverType> : ZipSink<O> {
let subscription7 = SingleAssignmentDisposable()
let subscription8 = SingleAssignmentDisposable()
let observer1 = ZipObserver(lock: lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7)
let observer8 = ZipObserver(lock: lock, parent: self, index: 7, setNextValue: { self._values8.enqueue($0) }, this: subscription8)
let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
let observer3 = ZipObserver(lock: _lock, parent: self, index: 2, setNextValue: { self._values3.enqueue($0) }, this: subscription3)
let observer4 = ZipObserver(lock: _lock, parent: self, index: 3, setNextValue: { self._values4.enqueue($0) }, this: subscription4)
let observer5 = ZipObserver(lock: _lock, parent: self, index: 4, setNextValue: { self._values5.enqueue($0) }, this: subscription5)
let observer6 = ZipObserver(lock: _lock, parent: self, index: 5, setNextValue: { self._values6.enqueue($0) }, this: subscription6)
let observer7 = ZipObserver(lock: _lock, parent: self, index: 6, setNextValue: { self._values7.enqueue($0) }, this: subscription7)
let observer8 = ZipObserver(lock: _lock, parent: self, index: 7, setNextValue: { self._values8.enqueue($0) }, this: subscription8)
subscription1.disposable = _parent.source1.subscribe(observer1)
subscription2.disposable = _parent.source2.subscribe(observer2)
@ -794,10 +794,10 @@ class Zip8<E1, E2, E3, E4, E5, E6, E7, E8, R> : Producer<R> {
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink8_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink8_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -38,9 +38,9 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(",
" var _values\($0): Queue<E\($0)> = Queue(capacity: 2)"
}).joinWithSeparator("\n") %>
init(parent: Parent, observer: O, cancel: Disposable) {
init(parent: Parent, observer: O) {
_parent = parent
super.init(arity: <%= i %>, observer: observer, cancel: cancel)
super.init(arity: <%= i %>, observer: observer)
}
override func hasElements(index: Int) -> Bool {
@ -61,7 +61,7 @@ class ZipSink<%= i %>_<<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(",
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
" let observer\($0) = ZipObserver(lock: lock, parent: self, index: \($0 - 1), setNextValue: { self._values\($0).enqueue($0) }, this: subscription\($0))"
" let observer\($0) = ZipObserver(lock: _lock, parent: self, index: \($0 - 1), setNextValue: { self._values\($0).enqueue($0) }, this: subscription\($0))"
}).joinWithSeparator("\n") %>
<%= (Array(1...i).map {
@ -93,10 +93,10 @@ class Zip<%= i %><<%= (Array(1...i).map { "E\($0)" }).joinWithSeparator(", ") %>
_resultSelector = resultSelector
}
override func run<O: ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = ZipSink<%= i %>_(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
override func run<O: ObserverType where O.E == R>(observer: O) -> Disposable {
let sink = ZipSink<%= i %>_(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -18,18 +18,18 @@ protocol ZipSinkProtocol : class
class ZipSink<O: ObserverType> : Sink<O>, ZipSinkProtocol {
typealias Element = O.E
let arity: Int
let lock = NSRecursiveLock()
let _arity: Int
let _lock = NSRecursiveLock()
// state
private var _isDone: [Bool]
init(arity: Int, observer: O, cancel: Disposable) {
init(arity: Int, observer: O) {
_isDone = [Bool](count: arity, repeatedValue: false)
self.arity = arity
_arity = arity
super.init(observer: observer, cancel: cancel)
super.init(observer: observer)
}
func getResult() throws -> Element {
@ -43,7 +43,7 @@ class ZipSink<O: ObserverType> : Sink<O>, ZipSinkProtocol {
func next(index: Int) {
var hasValueAll = true
for i in 0 ..< arity {
for i in 0 ..< _arity {
if !hasElements(i) {
hasValueAll = false;
break;
@ -102,13 +102,16 @@ class ZipSink<O: ObserverType> : Sink<O>, ZipSinkProtocol {
}
}
class ZipObserver<ElementType> : ObserverType {
class ZipObserver<ElementType>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
typealias E = ElementType
typealias ValueSetter = (ElementType) -> ()
private var _parent: ZipSinkProtocol?
private let _lock: NSRecursiveLock
let _lock: NSRecursiveLock
// state
private let _index: Int
@ -124,7 +127,10 @@ class ZipObserver<ElementType> : ObserverType {
}
func on(event: Event<E>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
if let _ = _parent {
switch event {
case .Next(_):
@ -136,17 +142,15 @@ class ZipObserver<ElementType> : ObserverType {
}
}
_lock.performLocked {
if let parent = _parent {
switch event {
case .Next(let value):
_setNextValue(value)
parent.next(_index)
case .Error(let error):
parent.fail(error)
case .Completed:
parent.done(_index)
}
if let parent = _parent {
switch event {
case .Next(let value):
_setNextValue(value)
parent.next(_index)
case .Error(let error):
parent.fail(error)
case .Completed:
parent.done(_index)
}
}
}

View File

@ -19,8 +19,8 @@ class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
// this is thread safe object
var _gate = AsyncLock()
override init(observer: O, cancel: Disposable) {
super.init(observer: observer, cancel: cancel)
override init(observer: O) {
super.init(observer: observer)
}
func run(sources: S.Generator) -> Disposable {

View File

@ -104,7 +104,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
if CurrentThreadScheduler.isScheduleRequired {
CurrentThreadScheduler.isScheduleRequired = false
action(state)
let disposable = action(state)
defer {
CurrentThreadScheduler.isScheduleRequired = true
@ -112,7 +112,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
}
guard let queue = CurrentThreadScheduler.queue else {
return NopDisposable.instance
return disposable
}
while let latest = queue.value.tryDequeue() {
@ -122,7 +122,7 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
latest.invoke()
}
return NopDisposable.instance
return disposable
}
let existingQueue = CurrentThreadScheduler.queue

View File

@ -25,17 +25,17 @@ compareTwoImplementations(benchmarkTime: true, first: {
//combineLatest(a,
publishSubject
//.shareReplay(1)
.shareReplay(1)
.map { $0 }
.filter { _ in true }// ){ x, _ in x }
//.map { $0 }
//.filter { _ in true }// ){ x, _ in x }
//.map { $0 }
.flatMap { just($0) }
//.flatMap { just($0) }
.subscribeNext { _ in
}
for i in 0..<100 {
for i in 0..<1000 {
publishSubject.on(.Next(i))
}

View File

@ -3057,12 +3057,16 @@ extension ObservableStandardSequenceOperatorsTest {
func testTakeLast_DecrementCountsFirst() {
let k = BehaviorSubject(value: false)
var elements = [Bool]()
_ = k.takeLast(1).subscribeNext { n in
elements.append(n)
k.on(.Next(!n))
}
k.on(.Completed)
XCTAssertEqual(elements, [false])
}
}

View File

@ -107,7 +107,7 @@ class RxTest: XCTestCase {
#if TRACE_RESOURCES
// give 5 sec to clean up resources
for var i = 0; i < 100; ++i {
for var i = 0; i < 10; ++i {
if self.startResourceCount < resourceCount {
// main schedulers need to finish work
NSRunLoop.currentRunLoop().runMode(NSDefaultRunLoopMode, beforeDate: NSDate(timeIntervalSinceNow: 0.05))

View File

@ -52,7 +52,7 @@
</AdditionalOptions>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
buildConfiguration = "Release"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"

View File

@ -52,7 +52,7 @@
</AdditionalOptions>
</TestAction>
<LaunchAction
buildConfiguration = "Release-Tests"
buildConfiguration = "Debug"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"

View File

@ -27,7 +27,7 @@
</AdditionalOptions>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
buildConfiguration = "Release"
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
launchStyle = "0"