diff --git a/RxSwift/Subjects/BehaviorSubject.swift b/RxSwift/Subjects/BehaviorSubject.swift index 7623332d..4059ce87 100644 --- a/RxSwift/Subjects/BehaviorSubject.swift +++ b/RxSwift/Subjects/BehaviorSubject.swift @@ -8,37 +8,24 @@ import Foundation -private class BehaviorSubjectSubscription : Disposable { - typealias Parent = BehaviorSubject - typealias DisposeKey = Bag>.KeyType - - private let _parent: Parent - private var _disposeKey: DisposeKey? - - init(parent: BehaviorSubject, disposeKey: DisposeKey) { - _parent = parent - _disposeKey = disposeKey - } - - func dispose() { - _parent._lock.performLocked { - if let disposeKey = _disposeKey { - _parent._observers.removeKey(disposeKey) - _disposeKey = nil - } - } - } -} - /** Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications. */ -public final class BehaviorSubject : Observable, SubjectType, ObserverType, Disposable { +public final class BehaviorSubject + : Observable + , SubjectType + , ObserverType + , LockOwnerType + , SynchronizedOnType + , SynchronizedSubscribeType + , SynchronizedUnsubscribeType + , Disposable { public typealias SubjectObserverType = BehaviorSubject + typealias DisposeKey = Bag>.KeyType - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private var _disposed = false @@ -50,9 +37,7 @@ public final class BehaviorSubject : Observable, SubjectType, Indicates whether the subject has been disposed. */ public var disposed: Bool { - return _lock.calculateLocked { - return _disposed - } + return _disposed } /** @@ -70,7 +55,7 @@ public final class BehaviorSubject : Observable, SubjectType, - returns: Latest value. */ public func value() throws -> Element { - return try _lock.calculateLockedOrFail { + _lock.lock(); defer { _lock.unlock() } // { if _disposed { throw RxError.DisposedError } @@ -82,7 +67,7 @@ public final class BehaviorSubject : Observable, SubjectType, else { return _value } - } + //} } /** @@ -91,20 +76,22 @@ public final class BehaviorSubject : Observable, SubjectType, - parameter event: Event to send to the observers. */ public func on(event: Event) { - _lock.performLocked { - if _stoppedEvent != nil || _disposed { - return - } - - switch event { - case .Next(let value): - _value = value - case .Error, .Completed: - _stoppedEvent = event - } - - _observers.on(event) + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + if _stoppedEvent != nil || _disposed { + return } + + switch event { + case .Next(let value): + _value = value + case .Error, .Completed: + _stoppedEvent = event + } + + _observers.on(event) } /** @@ -114,22 +101,32 @@ public final class BehaviorSubject : Observable, SubjectType, - returns: Disposable object that can be used to unsubscribe the observer from the subject. */ public override func subscribe(observer: O) -> Disposable { - return _lock.calculateLocked { - if _disposed { - observer.on(.Error(RxError.DisposedError)) - return NopDisposable.instance - } - - if let stoppedEvent = _stoppedEvent { - observer.on(stoppedEvent) - return NopDisposable.instance - } - - let key = _observers.insert(observer.asObserver()) - observer.on(.Next(_value)) - - return BehaviorSubjectSubscription(parent: self, disposeKey: key) + return synchronizedSubscribe(observer) + } + + func _synchronized_subscribe(observer: O) -> Disposable { + if _disposed { + observer.on(.Error(RxError.DisposedError)) + return NopDisposable.instance } + + if let stoppedEvent = _stoppedEvent { + observer.on(stoppedEvent) + return NopDisposable.instance + } + + let key = _observers.insert(observer.asObserver()) + observer.on(.Next(_value)) + + return SubscriptionDisposable(owner: self, key: key) + } + + func _synchronized_unsubscribe(disposeKey: DisposeKey) { + if _disposed { + return + } + + _ = _observers.removeKey(disposeKey) } /** diff --git a/RxSwift/Subjects/ReplaySubject.swift b/RxSwift/Subjects/ReplaySubject.swift index 97332e4c..3ad58d4f 100644 --- a/RxSwift/Subjects/ReplaySubject.swift +++ b/RxSwift/Subjects/ReplaySubject.swift @@ -13,7 +13,11 @@ Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies. */ -public class ReplaySubject : Observable, SubjectType, ObserverType, Disposable { +public class ReplaySubject + : Observable + , SubjectType + , ObserverType + , Disposable { public typealias SubjectObserverType = ReplaySubject typealias DisposeKey = Bag>.KeyType @@ -60,9 +64,15 @@ public class ReplaySubject : Observable, SubjectType, Observer } } -class ReplayBufferBase : ReplaySubject { +class ReplayBufferBase + : ReplaySubject + , LockOwnerType + , SynchronizedOnType + , SynchronizedSubscribeType + , SynchronizedUnsubscribeType + , SynchronizedDisposeType { - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() // state private var _disposed = false @@ -86,73 +96,72 @@ class ReplayBufferBase : ReplaySubject { } override func on(event: Event) { - _lock.performLocked { - if _disposed { - return - } - - if _stoppedEvent != nil { - return - } - - switch event { - case .Next(let value): - addValueToBuffer(value) - trim() - _observers.on(event) - case .Error, .Completed: - _stoppedEvent = event - trim() - _observers.on(event) - _observers.removeAll() - } - + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + if _disposed { + return + } + + if _stoppedEvent != nil { + return + } + + switch event { + case .Next(let value): + addValueToBuffer(value) + trim() + _observers.on(event) + case .Error, .Completed: + _stoppedEvent = event + trim() + _observers.on(event) + _observers.removeAll() } } override func subscribe(observer: O) -> Disposable { - return _lock.calculateLocked { - if _disposed { - observer.on(.Error(RxError.DisposedError)) - return NopDisposable.instance - } - - let AnyObserver = observer.asObserver() - - replayBuffer(AnyObserver) - if let stoppedEvent = _stoppedEvent { - observer.on(stoppedEvent) - return NopDisposable.instance - } - else { - let key = _observers.insert(AnyObserver) - return ReplaySubscription(subject: self, disposeKey: key) - } - } + return synchronizedSubscribe(observer) } - - override func unsubscribe(key: DisposeKey) { - _lock.performLocked { - if _disposed { - return - } - - _ = _observers.removeKey(key) + + func _synchronized_subscribe(observer: O) -> Disposable { + if _disposed { + observer.on(.Error(RxError.DisposedError)) + return NopDisposable.instance + } + + let AnyObserver = observer.asObserver() + + replayBuffer(AnyObserver) + if let stoppedEvent = _stoppedEvent { + observer.on(stoppedEvent) + return NopDisposable.instance + } + else { + let key = _observers.insert(AnyObserver) + return SubscriptionDisposable(owner: self, key: key) } } - func lockedDispose() { - _disposed = true - _stoppedEvent = nil - _observers.removeAll() + func _synchronized_unsubscribe(disposeKey: DisposeKey) { + if _disposed { + return + } + + _ = _observers.removeKey(disposeKey) } override func dispose() { super.dispose() - - _lock.performLocked { - lockedDispose() - } + + synchronizedDispose() + } + + func _synchronized_dispose() { + _disposed = true + _stoppedEvent = nil + _observers.removeAll() } } @@ -176,10 +185,9 @@ class ReplayOne : ReplayBufferBase { observer.on(.Next(value)) } } - - override func lockedDispose() { - super.lockedDispose() - + + override func _synchronized_dispose() { + super._synchronized_dispose() _value = nil } } @@ -200,9 +208,9 @@ class ReplayManyBase : ReplayBufferBase { observer.on(.Next(item)) } } - - override func lockedDispose() { - super.lockedDispose() + + override func _synchronized_dispose() { + super._synchronized_dispose() _queue = Queue(capacity: 0) } } @@ -231,34 +239,4 @@ class ReplayAll : ReplayManyBase { override func trim() { } -} - -class ReplaySubscription : Disposable { - typealias Subject = ReplaySubject - typealias DisposeKey = ReplayBufferBase.DisposeKey - - private var _lock = SpinLock() - - // state - private var _subject: Subject? - private var _disposeKey: DisposeKey? - - init(subject: Subject, disposeKey: DisposeKey) { - _subject = subject - _disposeKey = disposeKey - } - - func dispose() { - let oldState = _lock.calculateLocked { () -> (Subject?, DisposeKey?) in - let state = (self._subject, self._disposeKey) - self._subject = nil - self._disposeKey = nil - - return state - } - - if let subject = oldState.0, let disposeKey = oldState.1 { - subject.unsubscribe(disposeKey) - } - } -} +} \ No newline at end of file