Refactors `ReplaySubject` and `BehaviorSubject` to be consistent with `ReplaySubject`.

This commit is contained in:
Krunoslav Zaher 2015-11-01 12:47:06 +01:00
parent d0824eca4f
commit d4cd9cf69b
2 changed files with 128 additions and 153 deletions

View File

@ -8,37 +8,24 @@
import Foundation
private class BehaviorSubjectSubscription<Element> : Disposable {
typealias Parent = BehaviorSubject<Element>
typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType
private let _parent: Parent
private var _disposeKey: DisposeKey?
init(parent: BehaviorSubject<Element>, disposeKey: DisposeKey) {
_parent = parent
_disposeKey = disposeKey
}
func dispose() {
_parent._lock.performLocked {
if let disposeKey = _disposeKey {
_parent._observers.removeKey(disposeKey)
_disposeKey = nil
}
}
}
}
/**
Represents a value that changes over time.
Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
*/
public final class BehaviorSubject<Element> : Observable<Element>, SubjectType, ObserverType, Disposable {
public final class BehaviorSubject<Element>
: Observable<Element>
, SubjectType
, ObserverType
, LockOwnerType
, SynchronizedOnType
, SynchronizedSubscribeType
, SynchronizedUnsubscribeType
, Disposable {
public typealias SubjectObserverType = BehaviorSubject<Element>
typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _disposed = false
@ -50,9 +37,7 @@ public final class BehaviorSubject<Element> : Observable<Element>, SubjectType,
Indicates whether the subject has been disposed.
*/
public var disposed: Bool {
return _lock.calculateLocked {
return _disposed
}
return _disposed
}
/**
@ -70,7 +55,7 @@ public final class BehaviorSubject<Element> : Observable<Element>, SubjectType,
- returns: Latest value.
*/
public func value() throws -> Element {
return try _lock.calculateLockedOrFail {
_lock.lock(); defer { _lock.unlock() } // {
if _disposed {
throw RxError.DisposedError
}
@ -82,7 +67,7 @@ public final class BehaviorSubject<Element> : Observable<Element>, SubjectType,
else {
return _value
}
}
//}
}
/**
@ -91,20 +76,22 @@ public final class BehaviorSubject<Element> : Observable<Element>, SubjectType,
- parameter event: Event to send to the observers.
*/
public func on(event: Event<E>) {
_lock.performLocked {
if _stoppedEvent != nil || _disposed {
return
}
switch event {
case .Next(let value):
_value = value
case .Error, .Completed:
_stoppedEvent = event
}
_observers.on(event)
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
if _stoppedEvent != nil || _disposed {
return
}
switch event {
case .Next(let value):
_value = value
case .Error, .Completed:
_stoppedEvent = event
}
_observers.on(event)
}
/**
@ -114,22 +101,32 @@ public final class BehaviorSubject<Element> : Observable<Element>, SubjectType,
- returns: Disposable object that can be used to unsubscribe the observer from the subject.
*/
public override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
return _lock.calculateLocked {
if _disposed {
observer.on(.Error(RxError.DisposedError))
return NopDisposable.instance
}
if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent)
return NopDisposable.instance
}
let key = _observers.insert(observer.asObserver())
observer.on(.Next(_value))
return BehaviorSubjectSubscription(parent: self, disposeKey: key)
return synchronizedSubscribe(observer)
}
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
if _disposed {
observer.on(.Error(RxError.DisposedError))
return NopDisposable.instance
}
if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent)
return NopDisposable.instance
}
let key = _observers.insert(observer.asObserver())
observer.on(.Next(_value))
return SubscriptionDisposable(owner: self, key: key)
}
func _synchronized_unsubscribe(disposeKey: DisposeKey) {
if _disposed {
return
}
_ = _observers.removeKey(disposeKey)
}
/**

View File

@ -13,7 +13,11 @@ Represents an object that is both an observable sequence as well as an observer.
Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.
*/
public class ReplaySubject<Element> : Observable<Element>, SubjectType, ObserverType, Disposable {
public class ReplaySubject<Element>
: Observable<Element>
, SubjectType
, ObserverType
, Disposable {
public typealias SubjectObserverType = ReplaySubject<Element>
typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType
@ -60,9 +64,15 @@ public class ReplaySubject<Element> : Observable<Element>, SubjectType, Observer
}
}
class ReplayBufferBase<Element> : ReplaySubject<Element> {
class ReplayBufferBase<Element>
: ReplaySubject<Element>
, LockOwnerType
, SynchronizedOnType
, SynchronizedSubscribeType
, SynchronizedUnsubscribeType
, SynchronizedDisposeType {
private let _lock = NSRecursiveLock()
let _lock = NSRecursiveLock()
// state
private var _disposed = false
@ -86,73 +96,72 @@ class ReplayBufferBase<Element> : ReplaySubject<Element> {
}
override func on(event: Event<Element>) {
_lock.performLocked {
if _disposed {
return
}
if _stoppedEvent != nil {
return
}
switch event {
case .Next(let value):
addValueToBuffer(value)
trim()
_observers.on(event)
case .Error, .Completed:
_stoppedEvent = event
trim()
_observers.on(event)
_observers.removeAll()
}
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
if _disposed {
return
}
if _stoppedEvent != nil {
return
}
switch event {
case .Next(let value):
addValueToBuffer(value)
trim()
_observers.on(event)
case .Error, .Completed:
_stoppedEvent = event
trim()
_observers.on(event)
_observers.removeAll()
}
}
override func subscribe<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
return _lock.calculateLocked {
if _disposed {
observer.on(.Error(RxError.DisposedError))
return NopDisposable.instance
}
let AnyObserver = observer.asObserver()
replayBuffer(AnyObserver)
if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent)
return NopDisposable.instance
}
else {
let key = _observers.insert(AnyObserver)
return ReplaySubscription(subject: self, disposeKey: key)
}
}
return synchronizedSubscribe(observer)
}
override func unsubscribe(key: DisposeKey) {
_lock.performLocked {
if _disposed {
return
}
_ = _observers.removeKey(key)
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
if _disposed {
observer.on(.Error(RxError.DisposedError))
return NopDisposable.instance
}
let AnyObserver = observer.asObserver()
replayBuffer(AnyObserver)
if let stoppedEvent = _stoppedEvent {
observer.on(stoppedEvent)
return NopDisposable.instance
}
else {
let key = _observers.insert(AnyObserver)
return SubscriptionDisposable(owner: self, key: key)
}
}
func lockedDispose() {
_disposed = true
_stoppedEvent = nil
_observers.removeAll()
func _synchronized_unsubscribe(disposeKey: DisposeKey) {
if _disposed {
return
}
_ = _observers.removeKey(disposeKey)
}
override func dispose() {
super.dispose()
_lock.performLocked {
lockedDispose()
}
synchronizedDispose()
}
func _synchronized_dispose() {
_disposed = true
_stoppedEvent = nil
_observers.removeAll()
}
}
@ -176,10 +185,9 @@ class ReplayOne<Element> : ReplayBufferBase<Element> {
observer.on(.Next(value))
}
}
override func lockedDispose() {
super.lockedDispose()
override func _synchronized_dispose() {
super._synchronized_dispose()
_value = nil
}
}
@ -200,9 +208,9 @@ class ReplayManyBase<Element> : ReplayBufferBase<Element> {
observer.on(.Next(item))
}
}
override func lockedDispose() {
super.lockedDispose()
override func _synchronized_dispose() {
super._synchronized_dispose()
_queue = Queue(capacity: 0)
}
}
@ -231,34 +239,4 @@ class ReplayAll<Element> : ReplayManyBase<Element> {
override func trim() {
}
}
class ReplaySubscription<Element> : Disposable {
typealias Subject = ReplaySubject<Element>
typealias DisposeKey = ReplayBufferBase<Element>.DisposeKey
private var _lock = SpinLock()
// state
private var _subject: Subject?
private var _disposeKey: DisposeKey?
init(subject: Subject, disposeKey: DisposeKey) {
_subject = subject
_disposeKey = disposeKey
}
func dispose() {
let oldState = _lock.calculateLocked { () -> (Subject?, DisposeKey?) in
let state = (self._subject, self._disposeKey)
self._subject = nil
self._disposeKey = nil
return state
}
if let subject = oldState.0, let disposeKey = oldState.1 {
subject.unsubscribe(disposeKey)
}
}
}
}