diff --git a/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift b/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift index 570124bf..5ec70627 100644 --- a/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift +++ b/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift @@ -339,6 +339,21 @@ extension SequenceType where Generator.Element : DriverConvertibleType { } } +extension CollectionType where Generator.Element : DriverConvertibleType { + + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func concat() + -> Driver { + let source: Observable = self.map { $0.asDriver() }.concat() + return Driver(source) + } +} + extension CollectionType where Generator.Element : DriverConvertibleType { /** diff --git a/RxSwift/Observables/Implementations/Catch.swift b/RxSwift/Observables/Implementations/Catch.swift index adf53c0e..abf02c6d 100644 --- a/RxSwift/Observables/Implementations/Catch.swift +++ b/RxSwift/Observables/Implementations/Catch.swift @@ -135,9 +135,9 @@ class CatchSequenceSink) -> S.Generator? { + override func extract(observable: Observable) -> SequenceGenerator? { if let onError = observable as? CatchSequence { - return onError.sources.generate() + return (onError.sources.generate(), nil) } else { return nil @@ -156,7 +156,7 @@ class CatchSequence(observer: O) -> Disposable { let sink = CatchSequenceSink(observer: observer) - sink.disposable = sink.run(self.sources.generate()) + sink.disposable = sink.run((self.sources.generate(), nil)) return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Concat.swift b/RxSwift/Observables/Implementations/Concat.swift index c8f5205a..ed360918 100644 --- a/RxSwift/Observables/Implementations/Concat.swift +++ b/RxSwift/Observables/Implementations/Concat.swift @@ -34,9 +34,9 @@ class ConcatSink) -> S.Generator? { + override func extract(observable: Observable) -> SequenceGenerator? { if let source = observable as? Concat { - return source._sources.generate() + return (source._sources.generate(), source._count) } else { return nil @@ -48,14 +48,16 @@ class Concat(observer: O) -> Disposable { let sink = ConcatSink(observer: observer) - sink.disposable = sink.run(_sources.generate()) + sink.disposable = sink.run((_sources.generate(), _count)) return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/RetryWhen.swift b/RxSwift/Observables/Implementations/RetryWhen.swift index 3ab3fa0d..724218ed 100644 --- a/RxSwift/Observables/Implementations/RetryWhen.swift +++ b/RxSwift/Observables/Implementations/RetryWhen.swift @@ -111,13 +111,11 @@ class RetryWhenSequenceSink) -> S.Generator? { - if let onError = observable as? RetryWhenSequence { - return onError._sources.generate() - } - else { - return nil - } + override func extract(observable: Observable) -> SequenceGenerator? { + // It is important to always return `nil` here because there are sideffects in the `run` method + // that are dependant on particular `retryWhen` operator so single operator stack can't be reused in this + // case. + return nil } override func subscribeToNext(source: Observable) -> Disposable { @@ -126,7 +124,7 @@ class RetryWhenSequenceSink Disposable { + override func run(sources: SequenceGenerator) -> Disposable { let triggerSubscription = _handler.subscribe(_notifier.asObserver()) let superSubscription = super.run(sources) return StableCompositeDisposable.create(superSubscription, triggerSubscription) @@ -146,7 +144,7 @@ class RetryWhenSequence(observer: O) -> Disposable { let sink = RetryWhenSequenceSink(parent: self, observer: observer) - sink.disposable = sink.run(self._sources.generate()) + sink.disposable = sink.run((self._sources.generate(), nil)) return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Multiple.swift b/RxSwift/Observables/Observable+Multiple.swift index a425d747..b47f990f 100644 --- a/RxSwift/Observables/Observable+Multiple.swift +++ b/RxSwift/Observables/Observable+Multiple.swift @@ -80,12 +80,39 @@ extension SequenceType where Generator.Element : ObservableConvertibleType { /** Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + This operator has tail recursive optimizations that will prevent stack overflow. + + Optimizations will be performed in cases equivalent to following: + + [1, [2, [3, .....].concat()].concat].concat() + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. */ @warn_unused_result(message="http://git.io/rxs.uo") public func concat() -> Observable { - return Concat(sources: self) + return Concat(sources: self, count: nil) + } +} + +extension CollectionType where Generator.Element : ObservableConvertibleType { + + /** + Concatenates all observable sequences in the given sequence, as long as the previous observable sequence terminated successfully. + + This operator has tail recursive optimizations that will prevent stack overflow and enable generating + infinite observable sequences while using limited amount of memory during generation. + + Optimizations will be performed in cases equivalent to following: + + [1, [2, [3, .....].concat()].concat].concat() + + - returns: An observable sequence that contains the elements of each given sequence, in sequential order. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func concat() + -> Observable { + return Concat(sources: self, count: self.count.toIntMax()) } } diff --git a/RxSwift/Observers/TailRecursiveSink.swift b/RxSwift/Observers/TailRecursiveSink.swift index 6f4a85f6..44fb0554 100644 --- a/RxSwift/Observers/TailRecursiveSink.swift +++ b/RxSwift/Observers/TailRecursiveSink.swift @@ -13,14 +13,19 @@ enum TailRecursiveSinkCommand { case Dispose } +#if DEBUG || TRACE_RESOURCES +public var maxTailRecursiveSinkStackSize = 0 +#endif + /// This class is usually used with `Generator` version of the operators. class TailRecursiveSink : Sink , InvocableWithValueType { typealias Value = TailRecursiveSinkCommand typealias E = O.E + typealias SequenceGenerator = (generator: S.Generator, remaining: IntMax?) - var _generators:[S.Generator] = [] + var _generators: [SequenceGenerator] = [] var _disposed = false var _subscription = SerialDisposable() @@ -31,7 +36,7 @@ class TailRecursiveSink Disposable { + func run(sources: SequenceGenerator) -> Disposable { _generators.append(sources) schedule(.MoveNext) @@ -58,7 +63,7 @@ class TailRecursiveSink) -> S.Generator? { + func extract(observable: Observable) -> SequenceGenerator? { abstractMethod() } @@ -76,11 +81,26 @@ class TailRecursiveSink 0 { + _generators.append((e, left)) + } if nextCandidate == nil { _generators.removeLast() @@ -90,7 +110,12 @@ class TailRecursiveSink() + let hotObservable2 = MainThreadPrimitiveHotObservable() + + let driver = AnySequence([hotObservable1.asDriver(onErrorJustReturn: -1), hotObservable2.asDriver(onErrorJustReturn: -2)]).concat() + + let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) { + XCTAssertTrue(hotObservable1.subscriptions == [SubscribedToHotObservable]) + + hotObservable1.on(.Next(1)) + hotObservable1.on(.Next(2)) + hotObservable1.on(.Error(testError)) + + XCTAssertTrue(hotObservable1.subscriptions == [UnsunscribedFromHotObservable]) + XCTAssertTrue(hotObservable2.subscriptions == [SubscribedToHotObservable]) + + hotObservable2.on(.Next(4)) + hotObservable2.on(.Next(5)) + hotObservable2.on(.Error(testError)) + + XCTAssertTrue(hotObservable2.subscriptions == [UnsunscribedFromHotObservable]) + } + + XCTAssertEqual(results, [1, 2, -1, 4, 5, -2]) + } + func testAsDriver_concat() { let hotObservable1 = BackgroundThreadPrimitiveHotObservable() let hotObservable2 = MainThreadPrimitiveHotObservable() diff --git a/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift b/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift index d34c0231..15feb966 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift @@ -915,6 +915,25 @@ extension ObservableMultipleTest { } } +// this generates +// [generator(0), [generator(1), [generator(2), ..].concat()].concat()].concat() +func generateCollection(startIndex: Int, _ generator: Int -> Observable) -> Observable { + let all = [0, 1].lazy.map { i in + return i == 0 ? generator(startIndex) : generateCollection(startIndex + 1, generator) + } + return all.concat() +} + +// this generates +// [generator(0), [generator(1), [generator(2), ..].concat()].concat()].concat() +// This should +func generateSequence(startIndex: Int, _ generator: Int -> Observable) -> Observable { + let all = AnySequence([0, 1].lazy.map { i in + return i == 0 ? generator(startIndex) : generateSequence(startIndex + 1, generator) + }) + return all.concat() +} + // MARK: concat extension ObservableMultipleTest { func testConcat_DefaultScheduler() { @@ -1501,6 +1520,34 @@ extension ObservableMultipleTest { ]) } + +#if DEBUG || TRACE_RESOURCES + func testConcat_TailRecursionCollection() { + maxTailRecursiveSinkStackSize = 0 + let elements = try! generateCollection(0) { i in + just(i, scheduler: CurrentThreadScheduler.instance) + } + .take(10000) + .toBlocking() + .toArray() + + XCTAssertEqual(elements, Array(0 ..< 10000)) + XCTAssertEqual(maxTailRecursiveSinkStackSize, 1) + } + + func testConcat_TailRecursionSequence() { + maxTailRecursiveSinkStackSize = 0 + let elements = try! generateSequence(0) { i in + just(i, scheduler: CurrentThreadScheduler.instance) + } + .take(10000) + .toBlocking() + .toArray() + + XCTAssertEqual(elements, Array(0 ..< 10000)) + XCTAssertTrue(maxTailRecursiveSinkStackSize > 1000) + } +#endif } // MARK: merge diff --git a/scripts/pre-release-tests.sh b/scripts/pre-release-tests.sh index d8ae809f..0d68d3d8 100755 --- a/scripts/pre-release-tests.sh +++ b/scripts/pre-release-tests.sh @@ -31,7 +31,11 @@ if [ "${RELEASE_TEST}" -eq 1 ]; then . scripts/automation-tests.sh fi -CONFIGURATIONS=(Release) +CONFIGURATIONS=(Release-Tests) + +if [ "${RELEASE_TEST}" -eq 1 ]; then + CONFIGURATIONS=(Release Release-Tests Debug) +fi # make sure watchos builds # temporary solution