diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 048461e9..5805cb62 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -278,6 +278,8 @@ C8093F501B8A732E0088E94D /* RxCocoa.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093ECB1B8A732E0088E94D /* RxCocoa.h */; settings = {ATTRIBUTES = (Public, ); }; }; C8093F5E1B8A73A20088E94D /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; }; C8093F5F1B8A73A20088E94D /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; }; + C80D342E1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */; settings = {ASSET_TAGS = (); }; }; + C80D342F1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */; settings = {ASSET_TAGS = (); }; }; C88254151B8A752B00B02D69 /* CoreDataEntityEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253EF1B8A752B00B02D69 /* CoreDataEntityEvent.swift */; }; C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; }; C88254171B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; }; @@ -468,6 +470,7 @@ C8093ECB1B8A732E0088E94D /* RxCocoa.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RxCocoa.h; sourceTree = ""; }; C8093F581B8A73A20088E94D /* Observable+Blocking.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+Blocking.swift"; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; C8093F591B8A73A20088E94D /* README.md */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = net.daringfireball.markdown; path = README.md; sourceTree = ""; }; + C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+CollectionType.swift"; sourceTree = ""; }; C88253EF1B8A752B00B02D69 /* CoreDataEntityEvent.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CoreDataEntityEvent.swift; sourceTree = ""; }; C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = ""; }; C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxTableViewReactiveArrayDataSource.swift; sourceTree = ""; }; @@ -692,6 +695,7 @@ C8093C921B8A72BE0088E94D /* Zip+arity.swift */, C8093C931B8A72BE0088E94D /* Zip+arity.tt */, C8093C941B8A72BE0088E94D /* Zip.swift */, + C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */, ); path = Implementations; sourceTree = ""; @@ -1333,6 +1337,7 @@ C8093CEA1B8A72BE0088E94D /* ScopedDispose.swift in Sources */, C8093D261B8A72BE0088E94D /* Multicast.swift in Sources */, C8093D861B8A72BE0088E94D /* Rx.swift in Sources */, + C80D342F1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */, C8093DA61B8A72BE0088E94D /* SubjectType.swift in Sources */, C8093D5C1B8A72BE0088E94D /* Observable+Debug.swift in Sources */, C8093D581B8A72BE0088E94D /* Observable+Concurrency.swift in Sources */, @@ -1447,6 +1452,7 @@ C8093CE91B8A72BE0088E94D /* ScopedDispose.swift in Sources */, C8093D251B8A72BE0088E94D /* Multicast.swift in Sources */, C8093D851B8A72BE0088E94D /* Rx.swift in Sources */, + C80D342E1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */, C8093DA51B8A72BE0088E94D /* SubjectType.swift in Sources */, C8093D5B1B8A72BE0088E94D /* Observable+Debug.swift in Sources */, C8093D571B8A72BE0088E94D /* Observable+Concurrency.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index 2d98c531..d12f3cab 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -276,6 +276,7 @@ C8A468F21B8A8C2600BF917B /* RxCocoa.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C8A468ED1B8A8BCC00BF917B /* RxCocoa.framework */; }; C8A468F31B8A8C2600BF917B /* RxSwift.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C8A468EB1B8A8BC900BF917B /* RxSwift.framework */; }; C8A57F741B40AF7C00D5570A /* Random.xcdatamodeld in Sources */ = {isa = PBXBuildFile; fileRef = C8A57F721B40AF7C00D5570A /* Random.xcdatamodeld */; }; + C8C3D9FC1B935D1E004D233E /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3D9FB1B935D1E004D233E /* CombineLatest+CollectionType.swift */; settings = {ASSET_TAGS = (); }; }; C8C46DA81B47F7110020D71E /* CollectionViewImageCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C46DA31B47F7110020D71E /* CollectionViewImageCell.swift */; }; C8C46DA91B47F7110020D71E /* WikipediaImageCell.xib in Resources */ = {isa = PBXBuildFile; fileRef = C8C46DA41B47F7110020D71E /* WikipediaImageCell.xib */; }; C8C46DAA1B47F7110020D71E /* WikipediaSearchCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C46DA51B47F7110020D71E /* WikipediaSearchCell.swift */; }; @@ -435,7 +436,6 @@ C836EBF51B8A7A4500AB941D /* ObserverType+Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "ObserverType+Extensions.swift"; sourceTree = ""; }; C836EBF61B8A7A4500AB941D /* ObserverType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObserverType.swift; sourceTree = ""; }; C836EBF71B8A7A4500AB941D /* PeriodicScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeriodicScheduler.swift; sourceTree = ""; }; - C836EBF91B8A7A4500AB941D /* Rx.pch */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Rx.pch; sourceTree = ""; }; C836EBFA1B8A7A4500AB941D /* Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Rx.swift; sourceTree = ""; }; C836EBFB1B8A7A4500AB941D /* RxBox.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxBox.swift; sourceTree = ""; }; C836EBFC1B8A7A4500AB941D /* RxResult.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxResult.swift; sourceTree = ""; }; @@ -543,6 +543,7 @@ C8A468EF1B8A8BD000BF917B /* RxBlocking.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8A57F731B40AF7C00D5570A /* Random.xcdatamodel */ = {isa = PBXFileReference; lastKnownFileType = wrapper.xcdatamodel; path = Random.xcdatamodel; sourceTree = ""; }; C8AF26F11B49ABD300131C03 /* README.md */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = net.daringfireball.markdown; path = README.md; sourceTree = ""; }; + C8C3D9FB1B935D1E004D233E /* CombineLatest+CollectionType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+CollectionType.swift"; sourceTree = ""; }; C8C46DA31B47F7110020D71E /* CollectionViewImageCell.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CollectionViewImageCell.swift; sourceTree = ""; }; C8C46DA41B47F7110020D71E /* WikipediaImageCell.xib */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = file.xib; path = WikipediaImageCell.xib; sourceTree = ""; }; C8C46DA51B47F7110020D71E /* WikipediaSearchCell.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WikipediaSearchCell.swift; sourceTree = ""; }; @@ -740,7 +741,6 @@ C836EBF51B8A7A4500AB941D /* ObserverType+Extensions.swift */, C836EBF61B8A7A4500AB941D /* ObserverType.swift */, C836EBF71B8A7A4500AB941D /* PeriodicScheduler.swift */, - C836EBF91B8A7A4500AB941D /* Rx.pch */, C836EBFA1B8A7A4500AB941D /* Rx.swift */, C836EBFB1B8A7A4500AB941D /* RxBox.swift */, C836EBFC1B8A7A4500AB941D /* RxResult.swift */, @@ -817,9 +817,10 @@ C836EBB71B8A7A4500AB941D /* AnonymousObservable.swift */, C836EBB81B8A7A4500AB941D /* AsObservable.swift */, C836EBB91B8A7A4500AB941D /* Catch.swift */, + C836EBBC1B8A7A4500AB941D /* CombineLatest.swift */, C836EBBA1B8A7A4500AB941D /* CombineLatest+arity.swift */, C836EBBB1B8A7A4500AB941D /* CombineLatest+arity.tt */, - C836EBBC1B8A7A4500AB941D /* CombineLatest.swift */, + C8C3D9FB1B935D1E004D233E /* CombineLatest+CollectionType.swift */, C836EBBD1B8A7A4500AB941D /* Concat.swift */, C836EBBF1B8A7A4500AB941D /* ConnectableObservable.swift */, C836EBC01B8A7A4500AB941D /* Debug.swift */, @@ -851,9 +852,9 @@ C836EBDA1B8A7A4500AB941D /* TakeWhile.swift */, C836EBDB1B8A7A4500AB941D /* Throttle.swift */, C836EBDC1B8A7A4500AB941D /* Timer.swift */, + C836EBDF1B8A7A4500AB941D /* Zip.swift */, C836EBDD1B8A7A4500AB941D /* Zip+arity.swift */, C836EBDE1B8A7A4500AB941D /* Zip+arity.tt */, - C836EBDF1B8A7A4500AB941D /* Zip.swift */, ); path = Implementations; sourceTree = ""; @@ -1510,6 +1511,7 @@ C836EC3A1B8A7A4500AB941D /* Do.swift in Sources */, C836ECFB1B8A7AA600AB941D /* UISegmentedControl+Rx.swift in Sources */, C836EC431B8A7A4500AB941D /* ObserveSingleOn.swift in Sources */, + C8C3D9FC1B935D1E004D233E /* CombineLatest+CollectionType.swift in Sources */, C836EC0F1B8A7A4500AB941D /* Cancelable.swift in Sources */, C836EC331B8A7A4500AB941D /* Concat.swift in Sources */, C836ECDA1B8A7AA600AB941D /* NSURLSession+Rx.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/CombineLatest+CollectionType.swift b/RxSwift/Observables/Implementations/CombineLatest+CollectionType.swift new file mode 100644 index 00000000..204fb6ca --- /dev/null +++ b/RxSwift/Observables/Implementations/CombineLatest+CollectionType.swift @@ -0,0 +1,123 @@ +// +// CombineLatest+CollectionType.swift +// Rx +// +// Created by Krunoslav Zaher on 8/29/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class CombineLatestCollectionTypeSink : Sink { + typealias Parent = CombineLatestCollectionType + typealias SourceElement = C.Generator.Element.E + + let parent: Parent + + let lock = NSRecursiveLock() + + // state + var numberOfValues = 0 + var values: [SourceElement?] + var isDone: [Bool] + var numberOfDone = 0 + var subscriptions: [SingleAssignmentDisposable] + + init(parent: Parent, observer: O, cancel: Disposable) { + self.parent = parent + self.values = [SourceElement?](count: parent.count, repeatedValue: nil) + self.isDone = [Bool](count: parent.count, repeatedValue: false) + self.subscriptions = Array() + self.subscriptions.reserveCapacity(parent.count) + + for _ in 0 ..< parent.count { + self.subscriptions.append(SingleAssignmentDisposable()) + } + + super.init(observer: observer, cancel: cancel) + } + + func on(event: Event, atIndex: Int) { + lock.performLocked { + switch event { + case .Next(let element): + if values[atIndex] == nil { + numberOfValues++ + } + + values[atIndex] = element + + if numberOfValues < parent.count { + let numberOfOthersThatAreDone = self.numberOfDone - (isDone[atIndex] ? 1 : 0) + if numberOfOthersThatAreDone == self.parent.count - 1 { + self.observer?.on(.Completed) + self.dispose() + } + return + } + + do { + let result = try parent.resultSelector(values.map { $0! }) + self.observer?.on(.Next(result)) + } + catch let error { + self.observer?.on(.Error(error)) + self.dispose() + } + + case .Error(let error): + self.observer?.on(.Error(error)) + self.dispose() + case .Completed: + if isDone[atIndex] { + return + } + + isDone[atIndex] = true + numberOfDone++ + + if numberOfDone == self.parent.count { + self.observer?.on(.Completed) + self.dispose() + } + else { + self.subscriptions[atIndex].dispose() + } + } + } + } + + func run() -> Disposable { + var j = 0 + for i in parent.sources.startIndex ..< parent.sources.endIndex { + let index = j + self.subscriptions[j].disposable = self.parent.sources[i].subscribeSafe(ObserverOf { event in + self.on(event, atIndex: index) + }) + + j++ + } + + return CompositeDisposable(disposables: self.subscriptions.map { $0 }) + } +} + +class CombineLatestCollectionType : Producer { + typealias ResultSelector = [C.Generator.Element.E] throws -> R + + let sources: C + let resultSelector: ResultSelector + let count: Int + + init(sources: C, resultSelector: ResultSelector) { + self.sources = sources + self.resultSelector = resultSelector + self.count = Int(self.sources.count.toIntMax()) + } + + override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + let sink = CombineLatestCollectionTypeSink(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return sink.run() + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Multiple.swift b/RxSwift/Observables/Observable+Multiple.swift index 8c8476e2..8aa21b6b 100644 --- a/RxSwift/Observables/Observable+Multiple.swift +++ b/RxSwift/Observables/Observable+Multiple.swift @@ -8,6 +8,14 @@ import Foundation +// combineLatest + +extension CollectionType where Generator.Element : ObservableType { + public func combineLatest(resultSelector: [Generator.Element.E] throws -> R) -> Observable { + return CombineLatestCollectionType(sources: self, resultSelector: resultSelector) + } +} + // switch extension ObservableType where E : ObservableType { diff --git a/RxTests/RxSwiftTests/TestImplementations/Recorded.swift b/RxTests/RxSwiftTests/TestImplementations/Recorded.swift index 580443e4..cfb227a0 100644 --- a/RxTests/RxSwiftTests/TestImplementations/Recorded.swift +++ b/RxTests/RxSwiftTests/TestImplementations/Recorded.swift @@ -40,4 +40,17 @@ struct Recorded : CustomStringConvertible, Equatable { func == (lhs: Recorded, rhs: Recorded) -> Bool { return lhs.time == rhs.time && lhs.event == rhs.event +} + + +// workaround for swift compiler bug +struct EquatableArray : Equatable { + let elements: [Element] + init(_ elements: [Element]) { + self.elements = elements + } +} + +func == (lhs: EquatableArray, rhs: EquatableArray) -> Bool { + return lhs.elements == rhs.elements } \ No newline at end of file diff --git a/RxTests/RxSwiftTests/TestImplementations/Schedulers/TestScheduler.swift b/RxTests/RxSwiftTests/TestImplementations/Schedulers/TestScheduler.swift index 4c867aa5..97a82293 100644 --- a/RxTests/RxSwiftTests/TestImplementations/Schedulers/TestScheduler.swift +++ b/RxTests/RxSwiftTests/TestImplementations/Schedulers/TestScheduler.swift @@ -35,10 +35,6 @@ class TestScheduler : VirtualTimeSchedulerBase { super.init(initialClock: initialClock) } - func advanceTimeFor(interval: Time) { - - } - func createHotObservable(events: [Recorded]) -> HotObservable { return HotObservable(testScheduler: self as AnyObject as! TestScheduler, recordedEvents: events) } diff --git a/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift b/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift index 99604dc2..ae1ef40b 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+MultipleTest.swift @@ -2843,3 +2843,594 @@ extension ObservableMultipleTest { ]) } } + +// combineLatest + CollectionType + +extension ObservableMultipleTest { + func testCombineLatest_NeverN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1) + ]) + + let e2 = scheduler.createHotObservable([ + next(150, 1) + ]) + + let res = scheduler.start { + [e0, e1, e2].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, []) + + for e in [e0, e1, e2] { + XCTAssertEqual(e.subscriptions, [Subscription(200, 1000)]) + } + } + + func testCombineLatest_NeverEmptyN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + completed(210) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, []) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 1000)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 210)]) + } + + func testCombineLatest_EmptyNeverN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + completed(210) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, []) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 210)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 1000)]) + } + + func testCombineLatest_EmptyReturnN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + completed(210) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + completed(220) + ]) + + let res = scheduler.start { + ([e0, e1] as [HotObservable]).combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + completed(215) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 210)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 215)]) + } + + func testCombineLatest_ReturnReturnN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + completed(230) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(220, 3), + completed(240) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + next(220, 2 + 3), + completed(240) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 240)]) + } + + func testCombineLatest_EmptyErrorN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + completed(230) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + error(220, testError), + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + error(220, testError) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)]) + } + + func testCombineLatest_ReturnErrorN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(210, 2), + completed(230) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + error(220, testError), + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + error(220, testError) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)]) + } + + func testCombineLatest_ErrorErrorN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + error(220, testError1) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + error(230, testError2), + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + error(220, testError1) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)]) + } + + func testCombineLatest_NeverErrorN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + error(220, testError2), + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + error(220, testError2) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)]) + } + + func testCombineLatest_SomeErrorN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + completed(230) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + error(220, testError2), + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertTrue(res.messages == [ + error(220, testError2) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)]) + } + + func testCombineLatest_ErrorAfterCompletedN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + completed(220) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + error(230, testError2), + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertTrue(res.messages == [ + error(230, testError2) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 230)]) + } + + func testCombineLatest_InterleavedWithTailN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + next(225, 4), + completed(230) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(220, 3), + next(230, 5), + next(235, 6), + next(240, 7), + completed(250) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + next(220, 2 + 3), + next(225, 3 + 4), + next(230, 4 + 5), + next(235, 4 + 6), + next(240, 4 + 7), + completed(250) + ] as [Recorded]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 250)]) + } + + func testCombineLatest_ConsecutiveN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + next(225, 4), + completed(230) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(235, 6), + next(240, 7), + completed(250) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + next(235, 4 + 6), + next(240, 4 + 7), + completed(250) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 250)]) + } + + func testCombineLatest_ConsecutiveNWithErrorLeft() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + next(225, 4), + error(230, testError) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(235, 6), + next(240, 7), + completed(250) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + error(230, testError) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 230)]) + } + + func testCombineLatest_ConsecutiveNWithErrorRight() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + next(225, 4), + completed(250) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(235, 6), + next(240, 7), + error(245, testError) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + next(235, 4 + 6), + next(240, 4 + 7), + error(245, testError) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 245)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 245)]) + } + + func testCombineLatest_SelectorThrowsN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(215, 2), + completed(230) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(220, 3), + completed(240) + ]) + + let res = scheduler.start { + [e0, e1].combineLatest { x throws -> Int in throw testError } + } + + XCTAssertEqual(res.messages, [ + error(220, testError) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)]) + } + + func testCombineLatest_willNeverBeAbleToCombineN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + completed(250) + ]) + + let e1 = scheduler.createHotObservable([ + next(150, 1), + completed(260) + ]) + + let e2 = scheduler.createHotObservable([ + next(150, 1), + next(500, 2), + completed(800) + ]) + + let res = scheduler.start { + [e0, e1, e2].combineLatest { _ in 42 } + } + + XCTAssertEqual(res.messages, [ + completed(500) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 250)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 260)]) + XCTAssertEqual(e2.subscriptions, [Subscription(200, 500)]) + } + + func testCombineLatest_typicalN() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(210, 1), + next(410, 4), + completed(800) + ]) + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(220, 2), + next(420, 5), + completed(800) + ]) + let e2 = scheduler.createHotObservable([ + next(150, 1), + next(230, 3), + next(430, 6), + completed(800) + ]) + + let res = scheduler.start { + [e0, e1, e2].combineLatest { $0.reduce(0, combine:+) } + } + + XCTAssertEqual(res.messages, [ + next(230, 6), + next(410, 9), + next(420, 12), + next(430, 15), + completed(800) + ]) + + for e in [e0, e1, e2] { + XCTAssertEqual(e.subscriptions, [Subscription(200, 800)]) + } + } + + func testCombineLatest_NAry_symmetric() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(210, 1), + next(250, 4), + completed(420) + ]) + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(220, 2), + next(240, 5), + completed(410) + ]) + let e2 = scheduler.createHotObservable([ + next(150, 1), + next(230, 3), + next(260, 6), + completed(400) + ]) + + let res = scheduler.start { + [e0, e1, e2].combineLatest { EquatableArray($0) } + } + + XCTAssertEqual(res.messages, [ + next(230, EquatableArray([1, 2, 3])), + next(240, EquatableArray([1, 5, 3])), + next(250, EquatableArray([4, 5, 3])), + next(260, EquatableArray([4, 5, 6])), + completed(420) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 420)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 410)]) + XCTAssertEqual(e2.subscriptions, [Subscription(200, 400)]) + + } + + func testCombineLatest_NAry_asymmetric() { + let scheduler = TestScheduler(initialClock: 0) + + let e0 = scheduler.createHotObservable([ + next(150, 1), + next(210, 1), + next(250, 4), + completed(270) + ]) + let e1 = scheduler.createHotObservable([ + next(150, 1), + next(220, 2), + next(240, 5), + next(290, 7), + next(310, 9), + completed(410) + ]) + let e2 = scheduler.createHotObservable([ + next(150, 1), + next(230, 3), + next(260, 6), + next(280, 8), + completed(300) + ]) + + let res = scheduler.start { + [e0, e1, e2].combineLatest { EquatableArray($0) } + } + + XCTAssertEqual(res.messages, [ + next(230, EquatableArray([1, 2, 3])), + next(240, EquatableArray([1, 5, 3])), + next(250, EquatableArray([4, 5, 3])), + next(260, EquatableArray([4, 5, 6])), + next(280, EquatableArray([4, 5, 8])), + next(290, EquatableArray([4, 7, 8])), + next(310, EquatableArray([4, 9, 8])), + completed(410) + ]) + + XCTAssertEqual(e0.subscriptions, [Subscription(200, 270)]) + XCTAssertEqual(e1.subscriptions, [Subscription(200, 410)]) + XCTAssertEqual(e2.subscriptions, [Subscription(200, 300)]) + + } +} \ No newline at end of file