Improves tail recursive optimizations for `concat`, documents then and adds additional unit tests.

This commit is contained in:
Krunoslav Zaher 2015-12-13 00:07:57 +01:00
parent 0b5b9fe97a
commit d201397fa3
9 changed files with 170 additions and 26 deletions

View File

@ -339,6 +339,21 @@ extension SequenceType where Generator.Element : DriverConvertibleType {
}
}
extension CollectionType where Generator.Element : DriverConvertibleType {
/**
Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully.
- returns: An observable sequence that contains the elements of each given sequence, in sequential order.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func concat()
-> Driver<Generator.Element.E> {
let source: Observable<Generator.Element.E> = self.map { $0.asDriver() }.concat()
return Driver<Generator.Element.E>(source)
}
}
extension CollectionType where Generator.Element : DriverConvertibleType {
/**

View File

@ -135,9 +135,9 @@ class CatchSequenceSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
self.dispose()
}
override func extract(observable: Observable<Element>) -> S.Generator? {
override func extract(observable: Observable<Element>) -> SequenceGenerator? {
if let onError = observable as? CatchSequence<S> {
return onError.sources.generate()
return (onError.sources.generate(), nil)
}
else {
return nil
@ -156,7 +156,7 @@ class CatchSequence<S: SequenceType where S.Generator.Element : ObservableConver
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = CatchSequenceSink<S, O>(observer: observer)
sink.disposable = sink.run(self.sources.generate())
sink.disposable = sink.run((self.sources.generate(), nil))
return sink
}
}

View File

@ -34,9 +34,9 @@ class ConcatSink<S: SequenceType, O: ObserverType where S.Generator.Element : Ob
return source.subscribe(self)
}
override func extract(observable: Observable<E>) -> S.Generator? {
override func extract(observable: Observable<E>) -> SequenceGenerator? {
if let source = observable as? Concat<S> {
return source._sources.generate()
return (source._sources.generate(), source._count)
}
else {
return nil
@ -48,14 +48,16 @@ class Concat<S: SequenceType where S.Generator.Element : ObservableConvertibleTy
typealias Element = S.Generator.Element.E
private let _sources: S
init(sources: S) {
private let _count: IntMax?
init(sources: S, count: IntMax?) {
_sources = sources
_count = count
}
override func run<O: ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = ConcatSink<S, O>(observer: observer)
sink.disposable = sink.run(_sources.generate())
sink.disposable = sink.run((_sources.generate(), _count))
return sink
}
}

View File

@ -111,13 +111,11 @@ class RetryWhenSequenceSink<S: SequenceType, O: ObserverType, TriggerObservable:
dispose()
}
override func extract(observable: Observable<E>) -> S.Generator? {
if let onError = observable as? RetryWhenSequence<S, TriggerObservable, Error> {
return onError._sources.generate()
}
else {
return nil
}
override func extract(observable: Observable<E>) -> SequenceGenerator? {
// It is important to always return `nil` here because there are sideffects in the `run` method
// that are dependant on particular `retryWhen` operator so single operator stack can't be reused in this
// case.
return nil
}
override func subscribeToNext(source: Observable<E>) -> Disposable {
@ -126,7 +124,7 @@ class RetryWhenSequenceSink<S: SequenceType, O: ObserverType, TriggerObservable:
return iter
}
override func run(sources: S.Generator) -> Disposable {
override func run(sources: SequenceGenerator) -> Disposable {
let triggerSubscription = _handler.subscribe(_notifier.asObserver())
let superSubscription = super.run(sources)
return StableCompositeDisposable.create(superSubscription, triggerSubscription)
@ -146,7 +144,7 @@ class RetryWhenSequence<S: SequenceType, TriggerObservable: ObservableType, Erro
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = RetryWhenSequenceSink<S, O, TriggerObservable, Error>(parent: self, observer: observer)
sink.disposable = sink.run(self._sources.generate())
sink.disposable = sink.run((self._sources.generate(), nil))
return sink
}
}

View File

@ -80,12 +80,39 @@ extension SequenceType where Generator.Element : ObservableConvertibleType {
/**
Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully.
This operator has tail recursive optimizations that will prevent stack overflow.
Optimizations will be performed in cases equivalent to following:
[1, [2, [3, .....].concat()].concat].concat()
- returns: An observable sequence that contains the elements of each given sequence, in sequential order.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func concat()
-> Observable<Generator.Element.E> {
return Concat(sources: self)
return Concat(sources: self, count: nil)
}
}
extension CollectionType where Generator.Element : ObservableConvertibleType {
/**
Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully.
This operator has tail recursive optimizations that will prevent stack overflow and enable generating
infinite observable sequences while using limited amount of memory during generation.
Optimizations will be performed in cases equivalent to following:
[1, [2, [3, .....].concat()].concat].concat()
- returns: An observable sequence that contains the elements of each given sequence, in sequential order.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func concat()
-> Observable<Generator.Element.E> {
return Concat(sources: self, count: self.count.toIntMax())
}
}

View File

@ -13,14 +13,19 @@ enum TailRecursiveSinkCommand {
case Dispose
}
#if DEBUG || TRACE_RESOURCES
public var maxTailRecursiveSinkStackSize = 0
#endif
/// This class is usually used with `Generator` version of the operators.
class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Element: ObservableConvertibleType, S.Generator.Element.E == O.E>
: Sink<O>
, InvocableWithValueType {
typealias Value = TailRecursiveSinkCommand
typealias E = O.E
typealias SequenceGenerator = (generator: S.Generator, remaining: IntMax?)
var _generators:[S.Generator] = []
var _generators: [SequenceGenerator] = []
var _disposed = false
var _subscription = SerialDisposable()
@ -31,7 +36,7 @@ class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
super.init(observer: observer)
}
func run(sources: S.Generator) -> Disposable {
func run(sources: SequenceGenerator) -> Disposable {
_generators.append(sources)
schedule(.MoveNext)
@ -58,7 +63,7 @@ class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
dispose()
}
func extract(observable: Observable<E>) -> S.Generator? {
func extract(observable: Observable<E>) -> SequenceGenerator? {
abstractMethod()
}
@ -76,11 +81,26 @@ class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
return
}
var e = _generators.last!
var (e, left) = _generators.last!
let nextCandidate = e.next()?.asObservable()
_generators.removeLast()
_generators.append(e)
// `left` is a hint of how many elements are left in generator.
// In case this is the last element, then there is no need to push
// that generator on stack.
//
// This is an optimization used to make sure in tail recursive case
// there is no memory leak in case this operator is used to generate non terminating
// sequence.
if let knownLeft = left {
left = knownLeft - 1
}
if left ?? 1 > 0 {
_generators.append((e, left))
}
if nextCandidate == nil {
_generators.removeLast()
@ -90,7 +110,12 @@ class TailRecursiveSink<S: SequenceType, O: ObserverType where S.Generator.Eleme
let nextGenerator = extract(nextCandidate!)
if let nextGenerator = nextGenerator {
self._generators.append(nextGenerator)
_generators.append(nextGenerator)
#if DEBUG || TRACE_RESOURCES
if maxTailRecursiveSinkStackSize < _generators.count {
maxTailRecursiveSinkStackSize = _generators.count
}
#endif
}
else {
next = nextCandidate

View File

@ -728,6 +728,32 @@ extension DriverTest {
// MARK: concat
extension DriverTest {
func testAsDriver_concat_sequenceType() {
let hotObservable1 = BackgroundThreadPrimitiveHotObservable<Int>()
let hotObservable2 = MainThreadPrimitiveHotObservable<Int>()
let driver = AnySequence([hotObservable1.asDriver(onErrorJustReturn: -1), hotObservable2.asDriver(onErrorJustReturn: -2)]).concat()
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
XCTAssertTrue(hotObservable1.subscriptions == [SubscribedToHotObservable])
hotObservable1.on(.Next(1))
hotObservable1.on(.Next(2))
hotObservable1.on(.Error(testError))
XCTAssertTrue(hotObservable1.subscriptions == [UnsunscribedFromHotObservable])
XCTAssertTrue(hotObservable2.subscriptions == [SubscribedToHotObservable])
hotObservable2.on(.Next(4))
hotObservable2.on(.Next(5))
hotObservable2.on(.Error(testError))
XCTAssertTrue(hotObservable2.subscriptions == [UnsunscribedFromHotObservable])
}
XCTAssertEqual(results, [1, 2, -1, 4, 5, -2])
}
func testAsDriver_concat() {
let hotObservable1 = BackgroundThreadPrimitiveHotObservable<Int>()
let hotObservable2 = MainThreadPrimitiveHotObservable<Int>()

View File

@ -915,6 +915,25 @@ extension ObservableMultipleTest {
}
}
// this generates
// [generator(0), [generator(1), [generator(2), ..].concat()].concat()].concat()
func generateCollection<T>(startIndex: Int, _ generator: Int -> Observable<T>) -> Observable<T> {
let all = [0, 1].lazy.map { i in
return i == 0 ? generator(startIndex) : generateCollection(startIndex + 1, generator)
}
return all.concat()
}
// this generates
// [generator(0), [generator(1), [generator(2), ..].concat()].concat()].concat()
// This should
func generateSequence<T>(startIndex: Int, _ generator: Int -> Observable<T>) -> Observable<T> {
let all = AnySequence([0, 1].lazy.map { i in
return i == 0 ? generator(startIndex) : generateSequence(startIndex + 1, generator)
})
return all.concat()
}
// MARK: concat
extension ObservableMultipleTest {
func testConcat_DefaultScheduler() {
@ -1501,6 +1520,34 @@ extension ObservableMultipleTest {
])
}
#if DEBUG || TRACE_RESOURCES
func testConcat_TailRecursionCollection() {
maxTailRecursiveSinkStackSize = 0
let elements = try! generateCollection(0) { i in
just(i, scheduler: CurrentThreadScheduler.instance)
}
.take(10000)
.toBlocking()
.toArray()
XCTAssertEqual(elements, Array(0 ..< 10000))
XCTAssertEqual(maxTailRecursiveSinkStackSize, 1)
}
func testConcat_TailRecursionSequence() {
maxTailRecursiveSinkStackSize = 0
let elements = try! generateSequence(0) { i in
just(i, scheduler: CurrentThreadScheduler.instance)
}
.take(10000)
.toBlocking()
.toArray()
XCTAssertEqual(elements, Array(0 ..< 10000))
XCTAssertTrue(maxTailRecursiveSinkStackSize > 1000)
}
#endif
}
// MARK: merge

View File

@ -31,7 +31,11 @@ if [ "${RELEASE_TEST}" -eq 1 ]; then
. scripts/automation-tests.sh
fi
CONFIGURATIONS=(Release)
CONFIGURATIONS=(Release-Tests)
if [ "${RELEASE_TEST}" -eq 1 ]; then
CONFIGURATIONS=(Release Release-Tests Debug)
fi
# make sure watchos builds
# temporary solution