From fb970bb4e75bf355f97a9ecefdd7bb69684b3e10 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sun, 6 Dec 2015 14:47:17 +0100 Subject: [PATCH] Adds `shareReplayLatestWhileConnected()`. --- .../ShareReplay1WhileConnected.swift | 92 +++++++++++++++++++ RxSwift/Observables/Observable+Binding.swift | 18 +++- 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 RxSwift/Observables/Implementations/ShareReplay1WhileConnected.swift diff --git a/RxSwift/Observables/Implementations/ShareReplay1WhileConnected.swift b/RxSwift/Observables/Implementations/ShareReplay1WhileConnected.swift new file mode 100644 index 00000000..af776b1c --- /dev/null +++ b/RxSwift/Observables/Implementations/ShareReplay1WhileConnected.swift @@ -0,0 +1,92 @@ +// +// ShareReplay1WhileConnected.swift +// Rx +// +// Created by Krunoslav Zaher on 12/6/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +// optimized version of share replay for most common case +final class ShareReplay1WhileConnected + : Observable + , ObserverType + , SynchronizedUnsubscribeType { + + typealias DisposeKey = Bag>.KeyType + + private let _source: Observable + + private var _lock = NSRecursiveLock() + + private var _connection: SingleAssignmentDisposable? + private var _element: Element? + private var _observers = Bag>() + + init(source: Observable) { + self._source = source + } + + override func subscribe(observer: O) -> Disposable { + _lock.lock(); defer { _lock.unlock() } + return _synchronized_subscribe(observer) + } + + func _synchronized_subscribe(observer: O) -> Disposable { + if let element = self._element { + observer.on(.Next(element)) + } + + let initialCount = self._observers.count + + let disposeKey = self._observers.insert(AnyObserver(observer)) + + if initialCount == 0 { + let connection = SingleAssignmentDisposable() + _connection = connection + + connection.disposable = self._source.subscribe(self) + } + + return SubscriptionDisposable(owner: self, key: disposeKey) + } + + func synchronizedUnsubscribe(disposeKey: DisposeKey) { + _lock.lock(); defer { _lock.unlock() } + _synchronized_unsubscribe(disposeKey) + } + + func _synchronized_unsubscribe(disposeKey: DisposeKey) { + // if already unsubscribed, just return + if self._observers.removeKey(disposeKey) == nil { + return + } + + if _observers.count == 0 { + _connection?.dispose() + _connection = nil + _element = nil + } + } + + func on(event: Event) { + _lock.lock(); defer { _lock.unlock() } + _synchronized_on(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next(let element): + _element = element + _observers.on(event) + case .Error, .Completed: + _element = nil + _connection?.dispose() + _connection = nil + let observers = _observers + _observers = Bag() + observers.on(event) + } + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Binding.swift b/RxSwift/Observables/Observable+Binding.swift index 907eca3f..e6804930 100644 --- a/RxSwift/Observables/Observable+Binding.swift +++ b/RxSwift/Observables/Observable+Binding.swift @@ -136,7 +136,7 @@ extension ObservableType { extension ObservableType { /** - Returns an observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer. + Returns an observable sequence that shares a single subscription to the underlying sequence, and immediately upon subscription replays maximum number of elements in buffer. This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed. @@ -153,4 +153,20 @@ extension ObservableType { return self.replay(bufferSize).refCount() } } + + /** + Returns an observable sequence that shares a single subscription to the underlying sequence, and immediately upon subscription replays latest element in buffer. + + This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed. + + Unlike `shareReplay(bufferSize: Int)`, this operator will clear latest element from replay buffer in case number of subscribers drops from one to zero. In case sequence + completes or errors out replay buffer is also cleared. + + - returns: An observable sequence that contains the elements of a sequence produced by multicasting the source sequence. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func shareReplayLatestWhileConnected() + -> Observable { + return ShareReplay1WhileConnected(source: self.asObservable()) + } } \ No newline at end of file