Adds `shareReplayLatestWhileConnected()`.

This commit is contained in:
Krunoslav Zaher 2015-12-06 14:47:17 +01:00
parent 827ee57fff
commit fb970bb4e7
2 changed files with 109 additions and 1 deletions

View File

@ -0,0 +1,92 @@
//
// ShareReplay1WhileConnected.swift
// Rx
//
// Created by Krunoslav Zaher on 12/6/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
// optimized version of share replay for most common case
final class ShareReplay1WhileConnected<Element>
: Observable<Element>
, ObserverType
, SynchronizedUnsubscribeType {
typealias DisposeKey = Bag<AnyObserver<Element>>.KeyType
private let _source: Observable<Element>
private var _lock = NSRecursiveLock()
private var _connection: SingleAssignmentDisposable?
private var _element: Element?
private var _observers = Bag<AnyObserver<Element>>()
init(source: Observable<Element>) {
self._source = source
}
override func subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
_lock.lock(); defer { _lock.unlock() }
return _synchronized_subscribe(observer)
}
func _synchronized_subscribe<O : ObserverType where O.E == E>(observer: O) -> Disposable {
if let element = self._element {
observer.on(.Next(element))
}
let initialCount = self._observers.count
let disposeKey = self._observers.insert(AnyObserver(observer))
if initialCount == 0 {
let connection = SingleAssignmentDisposable()
_connection = connection
connection.disposable = self._source.subscribe(self)
}
return SubscriptionDisposable(owner: self, key: disposeKey)
}
func synchronizedUnsubscribe(disposeKey: DisposeKey) {
_lock.lock(); defer { _lock.unlock() }
_synchronized_unsubscribe(disposeKey)
}
func _synchronized_unsubscribe(disposeKey: DisposeKey) {
// if already unsubscribed, just return
if self._observers.removeKey(disposeKey) == nil {
return
}
if _observers.count == 0 {
_connection?.dispose()
_connection = nil
_element = nil
}
}
func on(event: Event<E>) {
_lock.lock(); defer { _lock.unlock() }
_synchronized_on(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next(let element):
_element = element
_observers.on(event)
case .Error, .Completed:
_element = nil
_connection?.dispose()
_connection = nil
let observers = _observers
_observers = Bag()
observers.on(event)
}
}
}

View File

@ -136,7 +136,7 @@ extension ObservableType {
extension ObservableType {
/**
Returns an observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.
Returns an observable sequence that shares a single subscription to the underlying sequence, and immediately upon subscription replays maximum number of elements in buffer.
This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
@ -153,4 +153,20 @@ extension ObservableType {
return self.replay(bufferSize).refCount()
}
}
/**
Returns an observable sequence that shares a single subscription to the underlying sequence, and immediately upon subscription replays latest element in buffer.
This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
Unlike `shareReplay(bufferSize: Int)`, this operator will clear latest element from replay buffer in case number of subscribers drops from one to zero. In case sequence
completes or errors out replay buffer is also cleared.
- returns: An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func shareReplayLatestWhileConnected()
-> Observable<E> {
return ShareReplay1WhileConnected(source: self.asObservable())
}
}