diff --git a/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift b/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift index fff37635..570124bf 100644 --- a/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift +++ b/RxCocoa/Common/CocoaUnits/Driver/Driver+Operators.swift @@ -84,6 +84,25 @@ extension DriverConvertibleType { } } +extension DriverConvertibleType { + + /** + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + If element is received while there is some projected observable sequence being merged it will simply be ignored. + + - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func flatMapFirst(selector: (E) -> Driver) + -> Driver { + let source: Observable = self + .asObservable() + .flatMapFirst(selector) + return Driver(source) + } +} + extension DriverConvertibleType { /** diff --git a/RxSwift/Observables/Observable+StandardSequenceOperators.swift b/RxSwift/Observables/Observable+StandardSequenceOperators.swift index 755a2007..f3d7778e 100644 --- a/RxSwift/Observables/Observable+StandardSequenceOperators.swift +++ b/RxSwift/Observables/Observable+StandardSequenceOperators.swift @@ -206,10 +206,11 @@ extension ObservableType { extension ObservableType { /** - Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. - - - parameter selector: A transform function to apply to each element. - - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence. + Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence. + If element is received while there is some projected observable sequence being merged it will simply be ignored. + + - parameter selector: A transform function to apply to element that was observed while no observable is executing in parallel. + - returns: An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence that was received while no other sequence was being calculated. */ @warn_unused_result(message="http://git.io/rxs.uo") public func flatMapFirst(selector: (E) throws -> O) diff --git a/RxTests/RxSwiftTests/Tests/Driver+Test.swift b/RxTests/RxSwiftTests/Tests/Driver+Test.swift index bbd41db7..3c5fe358 100644 --- a/RxTests/RxSwiftTests/Tests/Driver+Test.swift +++ b/RxTests/RxSwiftTests/Tests/Driver+Test.swift @@ -278,6 +278,50 @@ extension DriverTest { } } +// MARK: flatMapFirst +extension DriverTest { + func testAsDriver_flatMapFirst() { + let hotObservable = BackgroundThreadPrimitiveHotObservable() + let hotObservable1 = MainThreadPrimitiveHotObservable() + let hotObservable2 = MainThreadPrimitiveHotObservable() + let errorHotObservable = MainThreadPrimitiveHotObservable() + + let drivers: [Driver] = [ + hotObservable1.asDriver(onErrorJustReturn: -2), + hotObservable2.asDriver(onErrorJustReturn: -3), + errorHotObservable.asDriver(onErrorJustReturn: -4), + ] + + let driver = hotObservable.asDriver(onErrorJustReturn: 2).flatMapFirst { drivers[$0] } + + let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) { + XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable]) + + hotObservable.on(.Next(0)) + hotObservable.on(.Next(1)) + + hotObservable1.on(.Next(1)) + hotObservable1.on(.Next(2)) + hotObservable1.on(.Error(testError)) + + hotObservable2.on(.Next(10)) + hotObservable2.on(.Next(11)) + hotObservable2.on(.Error(testError)) + + hotObservable.on(.Error(testError)) + + errorHotObservable.on(.Completed) + hotObservable.on(.Completed) + + XCTAssertTrue(hotObservable.subscriptions == [UnsunscribedFromHotObservable]) + } + + XCTAssertEqual(results, [ + 1, 2, -2, + ]) + } +} + // MARK: doOn extension DriverTest { func testAsDriver_doOn() {