Add WithLatestFrom operator

This commit is contained in:
yury 2015-10-20 12:43:35 +03:00
parent 9a182a8f50
commit e453820c08
5 changed files with 431 additions and 1 deletions

View File

@ -585,6 +585,10 @@
D2138C981BB9BEEE00339B5C /* RxCocoa.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9B1B8A732E0088E94D /* RxCocoa.swift */; };
D2138C991BB9BEEE00339B5C /* RxTarget.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9C1B8A732E0088E94D /* RxTarget.swift */; };
D21C29311BC6A1C300448E70 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; };
D2245A1B1BD5657300E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; };
D2245A1C1BD63C4600E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; };
D2245A1D1BD63C4700E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; };
D2245A1E1BD63C4A00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */; };
D22B6D261BC8504A00BCE0AB /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
D235B23E1BD003DD007E84DA /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D235B23D1BD003DD007E84DA /* Using.swift */; };
D235B23F1BD003DD007E84DA /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D235B23D1BD003DD007E84DA /* Using.swift */; };
@ -965,6 +969,7 @@
C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = "<group>"; };
D235B23D1BD003DD007E84DA /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = "<group>"; };
D285BAC31BC0231000B3F602 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = "<group>"; };
@ -1151,6 +1156,7 @@
C8093C6A1B8A72BE0088E94D /* Implementations */ = {
isa = PBXGroup;
children = (
D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */,
C8093C6B1B8A72BE0088E94D /* Amb.swift */,
C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */,
C8093C6D1B8A72BE0088E94D /* AsObservable.swift */,
@ -2084,6 +2090,7 @@
C8093D3C1B8A72BE0088E94D /* Skip.swift in Sources */,
C8B144FC1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */,
C8093CF01B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */,
D2245A1C1BD63C4600E7146F /* WithLatestFrom.swift in Sources */,
C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */,
C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */,
C8C3DA071B9393AC004D233E /* Empty.swift in Sources */,
@ -2203,6 +2210,7 @@
C8093D3B1B8A72BE0088E94D /* Skip.swift in Sources */,
C8B144FB1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */,
C8093CEF1B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */,
D2245A1B1BD5657300E7146F /* WithLatestFrom.swift in Sources */,
C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */,
C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */,
C8C3DA061B9393AC004D233E /* Empty.swift in Sources */,
@ -2322,6 +2330,7 @@
C8F0BF9B1BBBFB8B001B112F /* Skip.swift in Sources */,
C8B144FE1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */,
C8F0BF9C1BBBFB8B001B112F /* StableCompositeDisposable.swift in Sources */,
D2245A1E1BD63C4A00E7146F /* WithLatestFrom.swift in Sources */,
C8F0BF9D1BBBFB8B001B112F /* Zip+arity.swift in Sources */,
C8F0BF9E1BBBFB8B001B112F /* Timer.swift in Sources */,
C8F0BF9F1BBBFB8B001B112F /* Empty.swift in Sources */,
@ -2589,6 +2598,7 @@
D2EBEAF11BB9B6AE003A27DC /* BinaryDisposable.swift in Sources */,
C8B144FD1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */,
D2EBEB1B1BB9B6C1003A27DC /* Repeat.swift in Sources */,
D2245A1D1BD63C4700E7146F /* WithLatestFrom.swift in Sources */,
D2EBEAF81BB9B6B2003A27DC /* ScopedDisposable.swift in Sources */,
D2EBEAEA1BB9B697003A27DC /* SchedulerType.swift in Sources */,
D2EBEB031BB9B6C1003A27DC /* CombineLatest+CollectionType.swift in Sources */,

View File

@ -311,7 +311,8 @@
C8DF92EA1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; };
C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; };
C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; };
C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; settings = {ASSET_TAGS = (); }; };
C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; };
D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; };
D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; };
EC91FB951BBA144400973245 /* GitHubSearchRepositoriesViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */; };
/* End PBXBuildFile section */
@ -688,6 +689,7 @@
C8DF92F01B0B3E67009BCF9A /* Info-OSX.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-OSX.plist"; sourceTree = "<group>"; };
C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = "<group>"; };
C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = "<group>"; };
EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = GitHubSearchRepositoriesViewController.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
@ -1135,6 +1137,7 @@
C89464761BC6C2B00055219D /* Zip+arity.swift */,
C89464771BC6C2B00055219D /* Zip+arity.tt */,
C89464781BC6C2B00055219D /* Zip+CollectionType.swift */,
D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -1707,6 +1710,7 @@
C89464B11BC6C2B00055219D /* SingleAssignmentDisposable.swift in Sources */,
C89464AA1BC6C2B00055219D /* DisposeBase.swift in Sources */,
C89465871BC6C2BC0055219D /* RxCollectionViewDelegateProxy.swift in Sources */,
D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */,
C8297E3D1B6CF905000589EA /* SearchViewModel.swift in Sources */,
C89464E61BC6C2B00055219D /* Timer.swift in Sources */,
C8297E3E1B6CF905000589EA /* DetailViewController.swift in Sources */,

View File

@ -0,0 +1,131 @@
//
// WithLatestFrom.swift
// RxExample
//
// Created by Yury Korolev on 10/19/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class WithLatestFromSink<FirstO: ObservableType, SecondO: ObservableType, ResultType, O: ObserverType where O.E == ResultType > : Sink<O> {
typealias Parent = WithLatestFrom<FirstO, SecondO, ResultType>
typealias SecondType = SecondO.E
private let _parent: Parent
private var _lock = NSRecursiveLock()
private var _latest: SecondO.E?
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
let sndSubscription = SingleAssignmentDisposable()
let fstO = WithLatestFromFirst(parent: self)
let sndO = WithLatestFromSecond(parent: self, disposable: sndSubscription)
let fstSubscription = _parent._first.subscribeSafe(fstO)
sndSubscription.disposable = _parent._second.subscribeSafe(sndO)
return StableCompositeDisposable.create(fstSubscription, sndSubscription)
}
}
class WithLatestFromFirst<FirstO: ObservableType, SecondO: ObservableType, ResultType, O: ObserverType where O.E == ResultType>: ObserverType {
typealias Parent = WithLatestFromSink<FirstO, SecondO, ResultType, O>
typealias E = FirstO.E
private let _parent: Parent
init(parent: Parent) {
_parent = parent
}
func on(event: Event<E>) {
switch event {
case let .Next(value):
guard let latest = _parent._latest else { return }
do {
let res = try _parent._parent._resultSelector(value, latest)
_parent._lock.performLocked {
_parent.observer?.onNext(res)
}
} catch let e {
_parent._lock.performLocked {
_parent.observer?.onError(e)
_parent.dispose()
}
}
case .Completed:
_parent._lock.performLocked {
_parent.observer?.onComplete()
_parent.dispose()
}
case let .Error(error):
_parent._lock.performLocked {
_parent.observer?.onError(error)
_parent.dispose()
}
}
}
}
class WithLatestFromSecond<FirstO: ObservableType, SecondO: ObservableType, ResultType, O: ObserverType where O.E == ResultType>: ObserverType {
typealias Parent = WithLatestFromSink<FirstO, SecondO, ResultType, O>
typealias E = SecondO.E
private let _parent: Parent
private let _disposable: Disposable
init(parent: Parent, disposable: Disposable) {
_parent = parent
_disposable = disposable
}
func on(event: Event<E>) {
switch event {
case let .Next(value):
_parent._latest = value
case .Completed:
_disposable.dispose()
case let .Error(error):
_parent._lock.performLocked {
_parent.observer?.onError(error)
_parent.dispose()
}
}
}
}
class WithLatestFrom<FirstO: ObservableType, SecondO: ObservableType, ResultType>: Producer<ResultType> {
typealias FirstType = FirstO.E
typealias SecondType = SecondO.E
typealias ResultSelector = (FirstType, SecondType) throws -> ResultType
private let _first: FirstO
private let _second: SecondO
private let _resultSelector: ResultSelector
init(first: FirstO, second: SecondO, resultSelector: ResultSelector) {
_first = first
_second = second
_resultSelector = resultSelector
}
override func run<O : ObserverType where O.E == ResultType>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = WithLatestFromSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
}
}

View File

@ -224,3 +224,19 @@ extension SequenceType where Generator.Element : ObservableConvertibleType {
}
}
}
// withLatestFrom
extension ObservableType {
/**
Merges two observable sequences into one observable sequence by combining each element from the first source with the latest element from the second source, if any.
- parameter second: Second observable source.
- parameter resultSelector: Function to invoke for each element from the self source combined with the latest element from the second source, if any.
- returns: An observable sequence containing the result of combining each element of the self source with the latest element from the second source, if any, using the specified result selector function.
*/
public func withLatestFrom<SecondO: ObservableType, ResultType>(second: SecondO, resultSelector: (E, SecondO.E) throws -> ResultType) -> Observable<ResultType> {
return WithLatestFrom(first: self, second: second, resultSelector: resultSelector)
}
}

View File

@ -3948,4 +3948,273 @@ extension ObservableMultipleTest {
XCTAssert(disposed, "disposed")
}
}
// MARK: withLatestFrom
extension ObservableMultipleTest {
func testWithLatestFrom_Simple1() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, 1),
next(180, 2),
next(250, 3),
next(260, 4),
next(310, 5),
next(340, 6),
next(410, 7),
next(420, 8),
next(470, 9),
next(550, 10),
completed(590)
])
let ys = scheduler.createHotObservable([
next(255, "bar"),
next(330, "foo"),
next(350, "qux"),
completed(400)
])
let res = scheduler.start {
xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} )
}
XCTAssertEqual(res.messages, [
next(260, "4bar"),
next(310, "5bar"),
next(340, "6foo"),
next(410, "7qux"),
next(420, "8qux"),
next(470, "9qux"),
next(550, "10qux"),
completed(590)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 590)
])
XCTAssertEqual(ys.subscriptions, [
Subscription(200, 400)
])
}
func testWithLatestFrom_Simple2() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, 1),
next(180, 2),
next(250, 3),
next(260, 4),
next(310, 5),
next(340, 6),
completed(390)
])
let ys = scheduler.createHotObservable([
next(255, "bar"),
next(330, "foo"),
next(350, "qux"),
next(370, "baz"),
completed(400)
])
let res = scheduler.start {
xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} )
}
XCTAssertEqual(res.messages, [
next(260, "4bar"),
next(310, "5bar"),
next(340, "6foo"),
completed(390)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 390)
])
XCTAssertEqual(ys.subscriptions, [
Subscription(200, 390)
])
}
func testWithLatestFrom_Simple3() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, 1),
next(180, 2),
next(250, 3),
next(260, 4),
next(310, 5),
next(340, 6),
completed(390)
])
let ys = scheduler.createHotObservable([
next(245, "bar"),
next(330, "foo"),
next(350, "qux"),
next(370, "baz"),
completed(400)
])
let res = scheduler.start {
xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} )
}
XCTAssertEqual(res.messages, [
next(250, "3bar"),
next(260, "4bar"),
next(310, "5bar"),
next(340, "6foo"),
completed(390)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 390)
])
XCTAssertEqual(ys.subscriptions, [
Subscription(200, 390)
])
}
func testWithLatestFrom_Error1() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, 1),
next(180, 2),
next(250, 3),
next(260, 4),
next(310, 5),
next(340, 6),
next(410, 7),
next(420, 8),
next(470, 9),
next(550, 10),
error(590, testError)
])
let ys = scheduler.createHotObservable([
next(255, "bar"),
next(330, "foo"),
next(350, "qux"),
completed(400)
])
let res = scheduler.start {
xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} )
}
XCTAssertEqual(res.messages, [
next(260, "4bar"),
next(310, "5bar"),
next(340, "6foo"),
next(410, "7qux"),
next(420, "8qux"),
next(470, "9qux"),
next(550, "10qux"),
error(590, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 590)
])
XCTAssertEqual(ys.subscriptions, [
Subscription(200, 400)
])
}
func testWithLatestFrom_Error2() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, 1),
next(180, 2),
next(250, 3),
next(260, 4),
next(310, 5),
next(340, 6),
completed(390)
])
let ys = scheduler.createHotObservable([
next(255, "bar"),
next(330, "foo"),
next(350, "qux"),
error(370, testError)
])
let res = scheduler.start {
xs.withLatestFrom(ys, resultSelector: { x, y in "\(x)\(y)"} )
}
XCTAssertEqual(res.messages, [
next(260, "4bar"),
next(310, "5bar"),
next(340, "6foo"),
error(370, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 370)
])
XCTAssertEqual(ys.subscriptions, [
Subscription(200, 370)
])
}
func testWithLatestFrom_Error3() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, 1),
next(180, 2),
next(250, 3),
next(260, 4),
next(310, 5),
next(340, 6),
completed(390)
])
let ys = scheduler.createHotObservable([
next(255, "bar"),
next(330, "foo"),
next(350, "qux"),
completed(400)
])
let res = scheduler.start {
xs.withLatestFrom(ys) {
(x, y) throws -> String in
if x == 5 {
throw testError
}
return "\(x)\(y)"
}
}
XCTAssertEqual(res.messages, [
next(260, "4bar"),
error(310, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 310)
])
XCTAssertEqual(ys.subscriptions, [
Subscription(200, 310)
])
}
}