Adds collection version of `combineLatest`.

This commit is contained in:
Krunoslav Zaher 2015-08-30 17:49:35 +02:00
parent d8b345fca9
commit 8e16a8fad4
7 changed files with 747 additions and 8 deletions

View File

@ -278,6 +278,8 @@
C8093F501B8A732E0088E94D /* RxCocoa.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093ECB1B8A732E0088E94D /* RxCocoa.h */; settings = {ATTRIBUTES = (Public, ); }; };
C8093F5E1B8A73A20088E94D /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; };
C8093F5F1B8A73A20088E94D /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; };
C80D342E1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */; settings = {ASSET_TAGS = (); }; };
C80D342F1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */; settings = {ASSET_TAGS = (); }; };
C88254151B8A752B00B02D69 /* CoreDataEntityEvent.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253EF1B8A752B00B02D69 /* CoreDataEntityEvent.swift */; };
C88254161B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; };
C88254171B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; };
@ -468,6 +470,7 @@
C8093ECB1B8A732E0088E94D /* RxCocoa.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RxCocoa.h; sourceTree = "<group>"; };
C8093F581B8A73A20088E94D /* Observable+Blocking.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Observable+Blocking.swift"; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
C8093F591B8A73A20088E94D /* README.md */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = net.daringfireball.markdown; path = README.md; sourceTree = "<group>"; };
C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+CollectionType.swift"; sourceTree = "<group>"; };
C88253EF1B8A752B00B02D69 /* CoreDataEntityEvent.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CoreDataEntityEvent.swift; sourceTree = "<group>"; };
C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxTableViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
@ -692,6 +695,7 @@
C8093C921B8A72BE0088E94D /* Zip+arity.swift */,
C8093C931B8A72BE0088E94D /* Zip+arity.tt */,
C8093C941B8A72BE0088E94D /* Zip.swift */,
C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -1333,6 +1337,7 @@
C8093CEA1B8A72BE0088E94D /* ScopedDispose.swift in Sources */,
C8093D261B8A72BE0088E94D /* Multicast.swift in Sources */,
C8093D861B8A72BE0088E94D /* Rx.swift in Sources */,
C80D342F1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */,
C8093DA61B8A72BE0088E94D /* SubjectType.swift in Sources */,
C8093D5C1B8A72BE0088E94D /* Observable+Debug.swift in Sources */,
C8093D581B8A72BE0088E94D /* Observable+Concurrency.swift in Sources */,
@ -1447,6 +1452,7 @@
C8093CE91B8A72BE0088E94D /* ScopedDispose.swift in Sources */,
C8093D251B8A72BE0088E94D /* Multicast.swift in Sources */,
C8093D851B8A72BE0088E94D /* Rx.swift in Sources */,
C80D342E1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */,
C8093DA51B8A72BE0088E94D /* SubjectType.swift in Sources */,
C8093D5B1B8A72BE0088E94D /* Observable+Debug.swift in Sources */,
C8093D571B8A72BE0088E94D /* Observable+Concurrency.swift in Sources */,

View File

@ -276,6 +276,7 @@
C8A468F21B8A8C2600BF917B /* RxCocoa.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C8A468ED1B8A8BCC00BF917B /* RxCocoa.framework */; };
C8A468F31B8A8C2600BF917B /* RxSwift.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C8A468EB1B8A8BC900BF917B /* RxSwift.framework */; };
C8A57F741B40AF7C00D5570A /* Random.xcdatamodeld in Sources */ = {isa = PBXBuildFile; fileRef = C8A57F721B40AF7C00D5570A /* Random.xcdatamodeld */; };
C8C3D9FC1B935D1E004D233E /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3D9FB1B935D1E004D233E /* CombineLatest+CollectionType.swift */; settings = {ASSET_TAGS = (); }; };
C8C46DA81B47F7110020D71E /* CollectionViewImageCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C46DA31B47F7110020D71E /* CollectionViewImageCell.swift */; };
C8C46DA91B47F7110020D71E /* WikipediaImageCell.xib in Resources */ = {isa = PBXBuildFile; fileRef = C8C46DA41B47F7110020D71E /* WikipediaImageCell.xib */; };
C8C46DAA1B47F7110020D71E /* WikipediaSearchCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C46DA51B47F7110020D71E /* WikipediaSearchCell.swift */; };
@ -435,7 +436,6 @@
C836EBF51B8A7A4500AB941D /* ObserverType+Extensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "ObserverType+Extensions.swift"; sourceTree = "<group>"; };
C836EBF61B8A7A4500AB941D /* ObserverType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObserverType.swift; sourceTree = "<group>"; };
C836EBF71B8A7A4500AB941D /* PeriodicScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PeriodicScheduler.swift; sourceTree = "<group>"; };
C836EBF91B8A7A4500AB941D /* Rx.pch */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = Rx.pch; sourceTree = "<group>"; };
C836EBFA1B8A7A4500AB941D /* Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Rx.swift; sourceTree = "<group>"; };
C836EBFB1B8A7A4500AB941D /* RxBox.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxBox.swift; sourceTree = "<group>"; };
C836EBFC1B8A7A4500AB941D /* RxResult.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxResult.swift; sourceTree = "<group>"; };
@ -543,6 +543,7 @@
C8A468EF1B8A8BD000BF917B /* RxBlocking.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8A57F731B40AF7C00D5570A /* Random.xcdatamodel */ = {isa = PBXFileReference; lastKnownFileType = wrapper.xcdatamodel; path = Random.xcdatamodel; sourceTree = "<group>"; };
C8AF26F11B49ABD300131C03 /* README.md */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = net.daringfireball.markdown; path = README.md; sourceTree = "<group>"; };
C8C3D9FB1B935D1E004D233E /* CombineLatest+CollectionType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+CollectionType.swift"; sourceTree = "<group>"; };
C8C46DA31B47F7110020D71E /* CollectionViewImageCell.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CollectionViewImageCell.swift; sourceTree = "<group>"; };
C8C46DA41B47F7110020D71E /* WikipediaImageCell.xib */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = file.xib; path = WikipediaImageCell.xib; sourceTree = "<group>"; };
C8C46DA51B47F7110020D71E /* WikipediaSearchCell.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WikipediaSearchCell.swift; sourceTree = "<group>"; };
@ -740,7 +741,6 @@
C836EBF51B8A7A4500AB941D /* ObserverType+Extensions.swift */,
C836EBF61B8A7A4500AB941D /* ObserverType.swift */,
C836EBF71B8A7A4500AB941D /* PeriodicScheduler.swift */,
C836EBF91B8A7A4500AB941D /* Rx.pch */,
C836EBFA1B8A7A4500AB941D /* Rx.swift */,
C836EBFB1B8A7A4500AB941D /* RxBox.swift */,
C836EBFC1B8A7A4500AB941D /* RxResult.swift */,
@ -817,9 +817,10 @@
C836EBB71B8A7A4500AB941D /* AnonymousObservable.swift */,
C836EBB81B8A7A4500AB941D /* AsObservable.swift */,
C836EBB91B8A7A4500AB941D /* Catch.swift */,
C836EBBC1B8A7A4500AB941D /* CombineLatest.swift */,
C836EBBA1B8A7A4500AB941D /* CombineLatest+arity.swift */,
C836EBBB1B8A7A4500AB941D /* CombineLatest+arity.tt */,
C836EBBC1B8A7A4500AB941D /* CombineLatest.swift */,
C8C3D9FB1B935D1E004D233E /* CombineLatest+CollectionType.swift */,
C836EBBD1B8A7A4500AB941D /* Concat.swift */,
C836EBBF1B8A7A4500AB941D /* ConnectableObservable.swift */,
C836EBC01B8A7A4500AB941D /* Debug.swift */,
@ -851,9 +852,9 @@
C836EBDA1B8A7A4500AB941D /* TakeWhile.swift */,
C836EBDB1B8A7A4500AB941D /* Throttle.swift */,
C836EBDC1B8A7A4500AB941D /* Timer.swift */,
C836EBDF1B8A7A4500AB941D /* Zip.swift */,
C836EBDD1B8A7A4500AB941D /* Zip+arity.swift */,
C836EBDE1B8A7A4500AB941D /* Zip+arity.tt */,
C836EBDF1B8A7A4500AB941D /* Zip.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -1510,6 +1511,7 @@
C836EC3A1B8A7A4500AB941D /* Do.swift in Sources */,
C836ECFB1B8A7AA600AB941D /* UISegmentedControl+Rx.swift in Sources */,
C836EC431B8A7A4500AB941D /* ObserveSingleOn.swift in Sources */,
C8C3D9FC1B935D1E004D233E /* CombineLatest+CollectionType.swift in Sources */,
C836EC0F1B8A7A4500AB941D /* Cancelable.swift in Sources */,
C836EC331B8A7A4500AB941D /* Concat.swift in Sources */,
C836ECDA1B8A7AA600AB941D /* NSURLSession+Rx.swift in Sources */,

View File

@ -0,0 +1,123 @@
//
// CombineLatest+CollectionType.swift
// Rx
//
// Created by Krunoslav Zaher on 8/29/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class CombineLatestCollectionTypeSink<C: CollectionType, R, O: ObserverType where C.Generator.Element : ObservableType, O.E == R> : Sink<O> {
typealias Parent = CombineLatestCollectionType<C, R>
typealias SourceElement = C.Generator.Element.E
let parent: Parent
let lock = NSRecursiveLock()
// state
var numberOfValues = 0
var values: [SourceElement?]
var isDone: [Bool]
var numberOfDone = 0
var subscriptions: [SingleAssignmentDisposable]
init(parent: Parent, observer: O, cancel: Disposable) {
self.parent = parent
self.values = [SourceElement?](count: parent.count, repeatedValue: nil)
self.isDone = [Bool](count: parent.count, repeatedValue: false)
self.subscriptions = Array<SingleAssignmentDisposable>()
self.subscriptions.reserveCapacity(parent.count)
for _ in 0 ..< parent.count {
self.subscriptions.append(SingleAssignmentDisposable())
}
super.init(observer: observer, cancel: cancel)
}
func on(event: Event<SourceElement>, atIndex: Int) {
lock.performLocked {
switch event {
case .Next(let element):
if values[atIndex] == nil {
numberOfValues++
}
values[atIndex] = element
if numberOfValues < parent.count {
let numberOfOthersThatAreDone = self.numberOfDone - (isDone[atIndex] ? 1 : 0)
if numberOfOthersThatAreDone == self.parent.count - 1 {
self.observer?.on(.Completed)
self.dispose()
}
return
}
do {
let result = try parent.resultSelector(values.map { $0! })
self.observer?.on(.Next(result))
}
catch let error {
self.observer?.on(.Error(error))
self.dispose()
}
case .Error(let error):
self.observer?.on(.Error(error))
self.dispose()
case .Completed:
if isDone[atIndex] {
return
}
isDone[atIndex] = true
numberOfDone++
if numberOfDone == self.parent.count {
self.observer?.on(.Completed)
self.dispose()
}
else {
self.subscriptions[atIndex].dispose()
}
}
}
}
func run() -> Disposable {
var j = 0
for i in parent.sources.startIndex ..< parent.sources.endIndex {
let index = j
self.subscriptions[j].disposable = self.parent.sources[i].subscribeSafe(ObserverOf { event in
self.on(event, atIndex: index)
})
j++
}
return CompositeDisposable(disposables: self.subscriptions.map { $0 })
}
}
class CombineLatestCollectionType<C: CollectionType, R where C.Generator.Element : ObservableType> : Producer<R> {
typealias ResultSelector = [C.Generator.Element.E] throws -> R
let sources: C
let resultSelector: ResultSelector
let count: Int
init(sources: C, resultSelector: ResultSelector) {
self.sources = sources
self.resultSelector = resultSelector
self.count = Int(self.sources.count.toIntMax())
}
override func run<O : ObserverType where O.E == R>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = CombineLatestCollectionTypeSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
}
}

View File

@ -8,6 +8,14 @@
import Foundation
// combineLatest
extension CollectionType where Generator.Element : ObservableType {
public func combineLatest<R>(resultSelector: [Generator.Element.E] throws -> R) -> Observable<R> {
return CombineLatestCollectionType(sources: self, resultSelector: resultSelector)
}
}
// switch
extension ObservableType where E : ObservableType {

View File

@ -40,4 +40,17 @@ struct Recorded<Element : Equatable> : CustomStringConvertible, Equatable {
func == <T: Equatable>(lhs: Recorded<T>, rhs: Recorded<T>) -> Bool {
return lhs.time == rhs.time && lhs.event == rhs.event
}
// workaround for swift compiler bug
struct EquatableArray<Element: Equatable> : Equatable {
let elements: [Element]
init(_ elements: [Element]) {
self.elements = elements
}
}
func == <E: Equatable>(lhs: EquatableArray<E>, rhs: EquatableArray<E>) -> Bool {
return lhs.elements == rhs.elements
}

View File

@ -35,10 +35,6 @@ class TestScheduler : VirtualTimeSchedulerBase {
super.init(initialClock: initialClock)
}
func advanceTimeFor(interval: Time) {
}
func createHotObservable<Element>(events: [Recorded<Element>]) -> HotObservable<Element> {
return HotObservable(testScheduler: self as AnyObject as! TestScheduler, recordedEvents: events)
}

View File

@ -2843,3 +2843,594 @@ extension ObservableMultipleTest {
])
}
}
// combineLatest + CollectionType
extension ObservableMultipleTest {
func testCombineLatest_NeverN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1)
])
let e1 = scheduler.createHotObservable([
next(150, 1)
])
let e2 = scheduler.createHotObservable([
next(150, 1)
])
let res = scheduler.start {
[e0, e1, e2].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [])
for e in [e0, e1, e2] {
XCTAssertEqual(e.subscriptions, [Subscription(200, 1000)])
}
}
func testCombineLatest_NeverEmptyN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
completed(210)
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 1000)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 210)])
}
func testCombineLatest_EmptyNeverN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
completed(210)
])
let e1 = scheduler.createHotObservable([
next(150, 1)
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 210)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 1000)])
}
func testCombineLatest_EmptyReturnN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
completed(210)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
completed(220)
])
let res = scheduler.start {
([e0, e1] as [HotObservable<Int>]).combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
completed(215)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 210)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 215)])
}
func testCombineLatest_ReturnReturnN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
completed(230)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(220, 3),
completed(240)
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
next(220, 2 + 3),
completed(240)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 240)])
}
func testCombineLatest_EmptyErrorN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
completed(230)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
error(220, testError),
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
error(220, testError)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)])
}
func testCombineLatest_ReturnErrorN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(210, 2),
completed(230)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
error(220, testError),
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
error(220, testError)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)])
}
func testCombineLatest_ErrorErrorN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
error(220, testError1)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
error(230, testError2),
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
error(220, testError1)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)])
}
func testCombineLatest_NeverErrorN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
])
let e1 = scheduler.createHotObservable([
next(150, 1),
error(220, testError2),
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
error(220, testError2)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)])
}
func testCombineLatest_SomeErrorN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
completed(230)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
error(220, testError2),
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertTrue(res.messages == [
error(220, testError2)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)])
}
func testCombineLatest_ErrorAfterCompletedN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
completed(220)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
error(230, testError2),
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertTrue(res.messages == [
error(230, testError2)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 230)])
}
func testCombineLatest_InterleavedWithTailN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
next(225, 4),
completed(230)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(220, 3),
next(230, 5),
next(235, 6),
next(240, 7),
completed(250)
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
next(220, 2 + 3),
next(225, 3 + 4),
next(230, 4 + 5),
next(235, 4 + 6),
next(240, 4 + 7),
completed(250)
] as [Recorded<Int>])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 250)])
}
func testCombineLatest_ConsecutiveN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
next(225, 4),
completed(230)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(235, 6),
next(240, 7),
completed(250)
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
next(235, 4 + 6),
next(240, 4 + 7),
completed(250)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 250)])
}
func testCombineLatest_ConsecutiveNWithErrorLeft() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
next(225, 4),
error(230, testError)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(235, 6),
next(240, 7),
completed(250)
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
error(230, testError)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 230)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 230)])
}
func testCombineLatest_ConsecutiveNWithErrorRight() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
next(225, 4),
completed(250)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(235, 6),
next(240, 7),
error(245, testError)
])
let res = scheduler.start {
[e0, e1].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
next(235, 4 + 6),
next(240, 4 + 7),
error(245, testError)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 245)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 245)])
}
func testCombineLatest_SelectorThrowsN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(215, 2),
completed(230)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(220, 3),
completed(240)
])
let res = scheduler.start {
[e0, e1].combineLatest { x throws -> Int in throw testError }
}
XCTAssertEqual(res.messages, [
error(220, testError)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 220)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 220)])
}
func testCombineLatest_willNeverBeAbleToCombineN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
completed(250)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
completed(260)
])
let e2 = scheduler.createHotObservable([
next(150, 1),
next(500, 2),
completed(800)
])
let res = scheduler.start {
[e0, e1, e2].combineLatest { _ in 42 }
}
XCTAssertEqual(res.messages, [
completed(500)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 250)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 260)])
XCTAssertEqual(e2.subscriptions, [Subscription(200, 500)])
}
func testCombineLatest_typicalN() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(210, 1),
next(410, 4),
completed(800)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(220, 2),
next(420, 5),
completed(800)
])
let e2 = scheduler.createHotObservable([
next(150, 1),
next(230, 3),
next(430, 6),
completed(800)
])
let res = scheduler.start {
[e0, e1, e2].combineLatest { $0.reduce(0, combine:+) }
}
XCTAssertEqual(res.messages, [
next(230, 6),
next(410, 9),
next(420, 12),
next(430, 15),
completed(800)
])
for e in [e0, e1, e2] {
XCTAssertEqual(e.subscriptions, [Subscription(200, 800)])
}
}
func testCombineLatest_NAry_symmetric() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(210, 1),
next(250, 4),
completed(420)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(220, 2),
next(240, 5),
completed(410)
])
let e2 = scheduler.createHotObservable([
next(150, 1),
next(230, 3),
next(260, 6),
completed(400)
])
let res = scheduler.start {
[e0, e1, e2].combineLatest { EquatableArray($0) }
}
XCTAssertEqual(res.messages, [
next(230, EquatableArray([1, 2, 3])),
next(240, EquatableArray([1, 5, 3])),
next(250, EquatableArray([4, 5, 3])),
next(260, EquatableArray([4, 5, 6])),
completed(420)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 420)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 410)])
XCTAssertEqual(e2.subscriptions, [Subscription(200, 400)])
}
func testCombineLatest_NAry_asymmetric() {
let scheduler = TestScheduler(initialClock: 0)
let e0 = scheduler.createHotObservable([
next(150, 1),
next(210, 1),
next(250, 4),
completed(270)
])
let e1 = scheduler.createHotObservable([
next(150, 1),
next(220, 2),
next(240, 5),
next(290, 7),
next(310, 9),
completed(410)
])
let e2 = scheduler.createHotObservable([
next(150, 1),
next(230, 3),
next(260, 6),
next(280, 8),
completed(300)
])
let res = scheduler.start {
[e0, e1, e2].combineLatest { EquatableArray($0) }
}
XCTAssertEqual(res.messages, [
next(230, EquatableArray([1, 2, 3])),
next(240, EquatableArray([1, 5, 3])),
next(250, EquatableArray([4, 5, 3])),
next(260, EquatableArray([4, 5, 6])),
next(280, EquatableArray([4, 5, 8])),
next(290, EquatableArray([4, 7, 8])),
next(310, EquatableArray([4, 9, 8])),
completed(410)
])
XCTAssertEqual(e0.subscriptions, [Subscription(200, 270)])
XCTAssertEqual(e1.subscriptions, [Subscription(200, 410)])
XCTAssertEqual(e2.subscriptions, [Subscription(200, 300)])
}
}