diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 5065dec5..ad928926 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -585,6 +585,10 @@ D2138C981BB9BEEE00339B5C /* RxCocoa.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9B1B8A732E0088E94D /* RxCocoa.swift */; }; D2138C991BB9BEEE00339B5C /* RxTarget.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9C1B8A732E0088E94D /* RxTarget.swift */; }; D21C29311BC6A1C300448E70 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; }; + D2245A1B1BD5657300E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; }; + D2245A1C1BD63C4600E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; }; + D2245A1D1BD63C4700E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; }; + D2245A1E1BD63C4A00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; }; D22B6D261BC8504A00BCE0AB /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; D235B23E1BD003DD007E84DA /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D235B23D1BD003DD007E84DA /* Using.swift */; }; D235B23F1BD003DD007E84DA /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D235B23D1BD003DD007E84DA /* Using.swift */; }; @@ -965,6 +969,7 @@ C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = ""; }; D235B23D1BD003DD007E84DA /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; D285BAC31BC0231000B3F602 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = ""; }; @@ -1151,6 +1156,7 @@ C8093C6A1B8A72BE0088E94D /* Implementations */ = { isa = PBXGroup; children = ( + D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */, C8093C6B1B8A72BE0088E94D /* Amb.swift */, C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */, C8093C6D1B8A72BE0088E94D /* AsObservable.swift */, @@ -2084,6 +2090,7 @@ C8093D3C1B8A72BE0088E94D /* Skip.swift in Sources */, C8B144FC1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, C8093CF01B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, + D2245A1C1BD63C4600E7146F /* WithLatestFrom.swift in Sources */, C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */, C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA071B9393AC004D233E /* Empty.swift in Sources */, @@ -2203,6 +2210,7 @@ C8093D3B1B8A72BE0088E94D /* Skip.swift in Sources */, C8B144FB1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, C8093CEF1B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, + D2245A1B1BD5657300E7146F /* WithLatestFrom.swift in Sources */, C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */, C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA061B9393AC004D233E /* Empty.swift in Sources */, @@ -2322,6 +2330,7 @@ C8F0BF9B1BBBFB8B001B112F /* Skip.swift in Sources */, C8B144FE1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, C8F0BF9C1BBBFB8B001B112F /* StableCompositeDisposable.swift in Sources */, + D2245A1E1BD63C4A00E7146F /* WithLatestFrom.swift in Sources */, C8F0BF9D1BBBFB8B001B112F /* Zip+arity.swift in Sources */, C8F0BF9E1BBBFB8B001B112F /* Timer.swift in Sources */, C8F0BF9F1BBBFB8B001B112F /* Empty.swift in Sources */, @@ -2589,6 +2598,7 @@ D2EBEAF11BB9B6AE003A27DC /* BinaryDisposable.swift in Sources */, C8B144FD1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, D2EBEB1B1BB9B6C1003A27DC /* Repeat.swift in Sources */, + D2245A1D1BD63C4700E7146F /* WithLatestFrom.swift in Sources */, D2EBEAF81BB9B6B2003A27DC /* ScopedDisposable.swift in Sources */, D2EBEAEA1BB9B697003A27DC /* SchedulerType.swift in Sources */, D2EBEB031BB9B6C1003A27DC /* CombineLatest+CollectionType.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index ebc65c94..44ac88cc 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -311,7 +311,8 @@ C8DF92EA1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; }; C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; }; C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; }; - C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; settings = {ASSET_TAGS = (); }; }; + C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; }; + D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; }; D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; }; EC91FB951BBA144400973245 /* GitHubSearchRepositoriesViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */; }; /* End PBXBuildFile section */ @@ -688,6 +689,7 @@ C8DF92F01B0B3E67009BCF9A /* Info-OSX.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-OSX.plist"; sourceTree = ""; }; C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = ""; }; C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; + D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = GitHubSearchRepositoriesViewController.swift; sourceTree = ""; }; /* End PBXFileReference section */ @@ -1135,6 +1137,7 @@ C89464761BC6C2B00055219D /* Zip+arity.swift */, C89464771BC6C2B00055219D /* Zip+arity.tt */, C89464781BC6C2B00055219D /* Zip+CollectionType.swift */, + D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */, ); path = Implementations; sourceTree = ""; @@ -1707,6 +1710,7 @@ C89464B11BC6C2B00055219D /* SingleAssignmentDisposable.swift in Sources */, C89464AA1BC6C2B00055219D /* DisposeBase.swift in Sources */, C89465871BC6C2BC0055219D /* RxCollectionViewDelegateProxy.swift in Sources */, + D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */, C8297E3D1B6CF905000589EA /* SearchViewModel.swift in Sources */, C89464E61BC6C2B00055219D /* Timer.swift in Sources */, C8297E3E1B6CF905000589EA /* DetailViewController.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/WithLatestFrom.swift b/RxSwift/Observables/Implementations/WithLatestFrom.swift new file mode 100644 index 00000000..f205e1e7 --- /dev/null +++ b/RxSwift/Observables/Implementations/WithLatestFrom.swift @@ -0,0 +1,131 @@ +// +// WithLatestFrom.swift +// RxExample +// +// Created by Yury Korolev on 10/19/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class WithLatestFromSink : Sink { + + typealias Parent = WithLatestFrom + typealias SecondType = SecondO.E + + private let _parent: Parent + + private var _lock = NSRecursiveLock() + private var _latest: SecondO.E? + + + init(parent: Parent, observer: O, cancel: Disposable) { + _parent = parent + + super.init(observer: observer, cancel: cancel) + } + + func run() -> Disposable { + let sndSubscription = SingleAssignmentDisposable() + let fstO = WithLatestFromFirst(parent: self) + let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription) + + let fstSubscription = _parent._first.subscribeSafe(fstO) + sndSubscription.disposable = _parent._second.subscribeSafe(sndO) + + return StableCompositeDisposable.create(fstSubscription, sndSubscription) + } +} + +class WithLatestFromFirst: ObserverType { + + typealias Parent = WithLatestFromSink + typealias E = FirstO.E + private let _parent: Parent + + init(parent: Parent) { + _parent = parent + } + + func on(event: Event) { + switch event { + case let .Next(value): + guard let latest = _parent._latest else { return } + do { + let res = try _parent._parent._resultSelector(value, latest) + + _parent._lock.performLocked { + _parent.observer?.onNext(res) + } + } catch let e { + _parent._lock.performLocked { + _parent.observer?.onError(e) + _parent.dispose() + } + } + case .Completed: + _parent._lock.performLocked { + _parent.observer?.onComplete() + _parent.dispose() + } + case let .Error(error): + _parent._lock.performLocked { + _parent.observer?.onError(error) + _parent.dispose() + } + } + } +} + +class WithLatestFromSecond: ObserverType { + + typealias Parent = WithLatestFromSink + typealias E = SecondO.E + + private let _parent: Parent + private let _disposable: Disposable + + init(parent: Parent, disposable: Disposable) { + _parent = parent + _disposable = disposable + } + + func on(event: Event) { + switch event { + case let .Next(value): + _parent._latest = value + case .Completed: + _disposable.dispose() + case let .Error(error): + _parent._lock.performLocked { + _parent.observer?.onError(error) + _parent.dispose() + } + } + } + +} + +class WithLatestFrom: Producer { + + typealias FirstType = FirstO.E + typealias SecondType = SecondO.E + typealias ResultSelector = (FirstType, SecondType) throws -> ResultType + + private let _first: FirstO + private let _second: SecondO + private let _resultSelector: ResultSelector + + init(first: FirstO, second: SecondO, resultSelector: ResultSelector) { + _first = first + _second = second + _resultSelector = resultSelector + } + + override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + + let sink = WithLatestFromSink(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return sink.run() + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Multiple.swift b/RxSwift/Observables/Observable+Multiple.swift index 6cb14adb..0e0af8ce 100644 --- a/RxSwift/Observables/Observable+Multiple.swift +++ b/RxSwift/Observables/Observable+Multiple.swift @@ -224,3 +224,19 @@ extension SequenceType where Generator.Element : ObservableConvertibleType { } } } + +// withLatestFrom + +extension ObservableType { + + /** + Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any. + + - parameter second: Second observable source. + - parameter resultSelector: Function to invoke for each element from the self source combined with the latest element from the second source, if any. + - returns: An observable sequence containing the result of combining each element of the self source with the latest element from the second source, if any, using the specified result selector function. + */ + public func withLatestFrom(second: SecondO, resultSelector: (E, SecondO.E) throws -> ResultType) -> Observable { + return WithLatestFrom(first: self, second: second, resultSelector: resultSelector) + } +} diff --git a/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift b/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift index 7b424157..b016476e 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift @@ -3948,4 +3948,273 @@ extension ObservableMultipleTest { XCTAssert(disposed, "disposed") } +} + + +// MARK: withLatestFrom + +extension ObservableMultipleTest { + + func testWithLatestFrom_Simple1() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, 1), + next(180, 2), + next(250, 3), + next(260, 4), + next(310, 5), + next(340, 6), + next(410, 7), + next(420, 8), + next(470, 9), + next(550, 10), + completed(590) + ]) + + let ys = scheduler.createHotObservable([ + next(255, "bar"), + next(330, "foo"), + next(350, "qux"), + completed(400) + ]) + + let res = scheduler.start { + xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} ) + } + + XCTAssertEqual(res.messages, [ + next(260, "4bar"), + next(310, "5bar"), + next(340, "6foo"), + next(410, "7qux"), + next(420, "8qux"), + next(470, "9qux"), + next(550, "10qux"), + completed(590) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 590) + ]) + + XCTAssertEqual(ys.subscriptions, [ + Subscription(200, 400) + ]) + } + + func testWithLatestFrom_Simple2() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, 1), + next(180, 2), + next(250, 3), + next(260, 4), + next(310, 5), + next(340, 6), + completed(390) + ]) + + let ys = scheduler.createHotObservable([ + next(255, "bar"), + next(330, "foo"), + next(350, "qux"), + next(370, "baz"), + completed(400) + ]) + + let res = scheduler.start { + xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} ) + } + + XCTAssertEqual(res.messages, [ + next(260, "4bar"), + next(310, "5bar"), + next(340, "6foo"), + completed(390) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 390) + ]) + + XCTAssertEqual(ys.subscriptions, [ + Subscription(200, 390) + ]) + } + + func testWithLatestFrom_Simple3() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, 1), + next(180, 2), + next(250, 3), + next(260, 4), + next(310, 5), + next(340, 6), + completed(390) + ]) + + let ys = scheduler.createHotObservable([ + next(245, "bar"), + next(330, "foo"), + next(350, "qux"), + next(370, "baz"), + completed(400) + ]) + + let res = scheduler.start { + xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} ) + } + + XCTAssertEqual(res.messages, [ + next(250, "3bar"), + next(260, "4bar"), + next(310, "5bar"), + next(340, "6foo"), + completed(390) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 390) + ]) + + XCTAssertEqual(ys.subscriptions, [ + Subscription(200, 390) + ]) + } + + func testWithLatestFrom_Error1() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, 1), + next(180, 2), + next(250, 3), + next(260, 4), + next(310, 5), + next(340, 6), + next(410, 7), + next(420, 8), + next(470, 9), + next(550, 10), + error(590, testError) + ]) + + let ys = scheduler.createHotObservable([ + next(255, "bar"), + next(330, "foo"), + next(350, "qux"), + completed(400) + ]) + + let res = scheduler.start { + xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} ) + } + + XCTAssertEqual(res.messages, [ + next(260, "4bar"), + next(310, "5bar"), + next(340, "6foo"), + next(410, "7qux"), + next(420, "8qux"), + next(470, "9qux"), + next(550, "10qux"), + error(590, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 590) + ]) + + XCTAssertEqual(ys.subscriptions, [ + Subscription(200, 400) + ]) + } + + func testWithLatestFrom_Error2() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, 1), + next(180, 2), + next(250, 3), + next(260, 4), + next(310, 5), + next(340, 6), + completed(390) + ]) + + let ys = scheduler.createHotObservable([ + next(255, "bar"), + next(330, "foo"), + next(350, "qux"), + error(370, testError) + ]) + + let res = scheduler.start { + xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} ) + } + + XCTAssertEqual(res.messages, [ + next(260, "4bar"), + next(310, "5bar"), + next(340, "6foo"), + error(370, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 370) + ]) + + XCTAssertEqual(ys.subscriptions, [ + Subscription(200, 370) + ]) + } + + func testWithLatestFrom_Error3() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, 1), + next(180, 2), + next(250, 3), + next(260, 4), + next(310, 5), + next(340, 6), + completed(390) + ]) + + let ys = scheduler.createHotObservable([ + next(255, "bar"), + next(330, "foo"), + next(350, "qux"), + completed(400) + ]) + + let res = scheduler.start { + xs.withLatestFrom(ys) { + (x, y) throws -> String in + if x == 5 { + throw testError + } + return "\(x)\(y)" + } + } + + XCTAssertEqual(res.messages, [ + next(260, "4bar"), + error(310, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 310) + ]) + + XCTAssertEqual(ys.subscriptions, [ + Subscription(200, 310) + ]) + } } \ No newline at end of file