Add SkipWhile operator

This commit is contained in:
yury 2015-10-12 11:59:20 +03:00
parent b4d4e4b4d6
commit cbdf027d3d
5 changed files with 577 additions and 0 deletions

View File

@ -541,6 +541,7 @@
D2138C981BB9BEEE00339B5C /* RxCocoa.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9B1B8A732E0088E94D /* RxCocoa.swift */; };
D2138C991BB9BEEE00339B5C /* RxTarget.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9C1B8A732E0088E94D /* RxTarget.swift */; };
D21C29311BC6A1C300448E70 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; };
D22B6D261BC8504A00BCE0AB /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
D2752D621BC5551A0070C418 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; };
D2752D631BC5551B0070C418 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; };
D285BAC41BC0231000B3F602 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; };
@ -650,6 +651,9 @@
D2EBEB431BB9B6DE003A27DC /* SubjectType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CC11B8A72BE0088E94D /* SubjectType.swift */; };
D2EBEB441BB9B6DE003A27DC /* Variable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CC21B8A72BE0088E94D /* Variable.swift */; };
D2EBEB8A1BB9B9EE003A27DC /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; };
D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
F31F35B01BB4FED800961002 /* UIStepper+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */; };
/* End PBXBuildFile section */
@ -901,6 +905,7 @@
C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = "<group>"; };
D285BAC31BC0231000B3F602 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = "<group>"; };
D2EA280C1BB9B5A200880ED3 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D2EBEB811BB9B99D003A27DC /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
@ -1134,6 +1139,7 @@
C8093C921B8A72BE0088E94D /* Zip+arity.swift */,
C8093C931B8A72BE0088E94D /* Zip+arity.tt */,
C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */,
D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -1997,6 +2003,7 @@
C8093D041B8A72BE0088E94D /* AsObservable.swift in Sources */,
C8093D061B8A72BE0088E94D /* Catch.swift in Sources */,
C8093D0C1B8A72BE0088E94D /* CombineLatest.swift in Sources */,
D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */,
C8093D5E1B8A72BE0088E94D /* Observable+Multiple.swift in Sources */,
C8093D741B8A72BE0088E94D /* ObserverBase.swift in Sources */,
C8093D121B8A72BE0088E94D /* ConnectableObservable.swift in Sources */,
@ -2110,6 +2117,7 @@
C8093D031B8A72BE0088E94D /* AsObservable.swift in Sources */,
C8093D051B8A72BE0088E94D /* Catch.swift in Sources */,
C8093D0B1B8A72BE0088E94D /* CombineLatest.swift in Sources */,
D22B6D261BC8504A00BCE0AB /* SkipWhile.swift in Sources */,
C8093D5D1B8A72BE0088E94D /* Observable+Multiple.swift in Sources */,
C8093D731B8A72BE0088E94D /* ObserverBase.swift in Sources */,
C8093D111B8A72BE0088E94D /* ConnectableObservable.swift in Sources */,
@ -2223,6 +2231,7 @@
C8F0BFAB1BBBFB8B001B112F /* AsObservable.swift in Sources */,
C8F0BFAC1BBBFB8B001B112F /* Catch.swift in Sources */,
C8F0BFAD1BBBFB8B001B112F /* CombineLatest.swift in Sources */,
D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */,
C8F0BFAE1BBBFB8B001B112F /* Observable+Multiple.swift in Sources */,
C8F0BFAF1BBBFB8B001B112F /* ObserverBase.swift in Sources */,
C8F0BFB01BBBFB8B001B112F /* ConnectableObservable.swift in Sources */,
@ -2470,6 +2479,7 @@
D2EBEB381BB9B6D8003A27DC /* ConcurrentDispatchQueueScheduler.swift in Sources */,
D2EBEB131BB9B6C1003A27DC /* Multicast.swift in Sources */,
D2EBEB111BB9B6C1003A27DC /* Map.swift in Sources */,
D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */,
D2EBEB071BB9B6C1003A27DC /* Deferred.swift in Sources */,
D2EBEB2C1BB9B6CA003A27DC /* Observable+Binding.swift in Sources */,
D2EBEB041BB9B6C1003A27DC /* Concat.swift in Sources */,

View File

@ -293,6 +293,7 @@
C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; };
C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; };
D245D9F41BC6CA0900CAB388 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D245D9E61BC6C60800CAB388 /* SkipUntil.swift */; };
D2FC15C41BCBAA13007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2FC15B61BCBAA01007361FF /* SkipWhile.swift */; };
EC91FB951BBA144400973245 /* GitHubSearchRepositoriesViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */; };
/* End PBXBuildFile section */
@ -652,6 +653,7 @@
C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = "<group>"; };
C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
D245D9E61BC6C60800CAB388 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = "<group>"; };
D2FC15B61BCBAA01007361FF /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = "<group>"; };
EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = GitHubSearchRepositoriesViewController.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
@ -1081,6 +1083,7 @@
C864093C1BA5909000D3C4E8 /* Implementations */ = {
isa = PBXGroup;
children = (
D2FC15B61BCBAA01007361FF /* SkipWhile.swift */,
D245D9E61BC6C60800CAB388 /* SkipUntil.swift */,
C864093D1BA5909000D3C4E8 /* Amb.swift */,
C864093E1BA5909000D3C4E8 /* AnonymousObservable.swift */,
@ -1599,6 +1602,7 @@
C86409FA1BA5909000D3C4E8 /* Variable.swift in Sources */,
C8297E3A1B6CF905000589EA /* WikipediaSearchViewController.swift in Sources */,
C84B3A5A1BA4345A001B7D88 /* UIGestureRecognizer+Rx.swift in Sources */,
D2FC15C41BCBAA13007361FF /* SkipWhile.swift in Sources */,
C86409AC1BA5909000D3C4E8 /* AnonymousObservable.swift in Sources */,
C84B3A401BA4345A001B7D88 /* NSURLSession+Rx.swift in Sources */,
C8297E3B1B6CF905000589EA /* String+extensions.swift in Sources */,

View File

@ -0,0 +1,115 @@
//
// SkipWhile.swift
// Rx
//
// Created by Yury Korolev on 10/9/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
class SkipWhileSink<ElementType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
typealias Parent = SkipWhile<ElementType>
typealias Element = ElementType
private let _parent: Parent
private var _running = false
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
super.init(observer: observer, cancel: cancel)
}
func on(event: Event<Element>) {
switch event {
case .Next(let value):
if !_running {
do {
_running = try !_parent._predicate(value)
} catch let e {
_observer?.onError(e)
dispose()
return
}
}
if _running {
_observer?.onNext(value)
}
case .Error, .Completed:
_observer?.on(event)
dispose()
}
}
}
class SkipWhileSinkIndexed<ElementType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
typealias Parent = SkipWhile<ElementType>
typealias Element = ElementType
private let _parent: Parent
private var _index = 0
private var _running = false
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
super.init(observer: observer, cancel: cancel)
}
func on(event: Event<Element>) {
switch event {
case .Next(let value):
if !_running {
do {
_running = try !_parent._predicateIndexed(value, _index)
_index += 1
} catch let e {
_observer?.onError(e)
dispose()
return
}
}
if _running {
_observer?.onNext(value)
}
case .Error, .Completed:
_observer?.on(event)
dispose()
}
}
}
class SkipWhile<Element>: Producer<Element> {
typealias Predicate = (Element) throws -> Bool
typealias PredicateIndexed = (Element, Int) throws -> Bool
private let _source: Observable<Element>
private let _predicate: Predicate!
private let _predicateIndexed: PredicateIndexed!
init(source: Observable<Element>, predicate: Predicate) {
_source = source
_predicate = predicate
_predicateIndexed = nil
}
init(source: Observable<Element>, predicate: PredicateIndexed) {
_source = source
_predicate = nil
_predicateIndexed = predicate
}
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
if let _ = _predicate {
let sink = SkipWhileSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
}
else {
let sink = SkipWhileSinkIndexed(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return _source.subscribeSafe(sink)
}
}
}

View File

@ -90,6 +90,19 @@ extension ObservableType {
}
}
// SkipWhile
extension ObservableType {
public func skipWhile(predicate: (E) throws -> Bool) -> Observable<E> {
return SkipWhile(source: self.asObservable(), predicate: predicate)
}
public func skipWhileIndexed(predicate: (E, Int) throws -> Bool) -> Observable<E> {
return SkipWhile(source: self.asObservable(), predicate: predicate)
}
}
// map aka select
extension ObservableType {

View File

@ -3128,4 +3128,439 @@ extension ObservableStandardSequenceOperators {
Subscription(200, 400)
])
}
}
// MARK: SkipWhile
extension ObservableStandardSequenceOperators {
func testSkipWhile_Complete_Before() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
completed(330),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
var invoked = 0
let res = scheduler.start() {
xs.skipWhile { x in
invoked += 1
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [
completed(330)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 330)
])
XCTAssertEqual(4, invoked)
}
func testSkipWhile_Complete_After() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
var invoked = 0
let res = scheduler.start() {
xs.skipWhile { x in
invoked += 1
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 600)
])
XCTAssertEqual(6, invoked)
}
func testSkipWhile_Error_Before() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(210, 2),
next(260, 5),
error(270, testError),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23)
])
var invoked = 0
let res = scheduler.start() {
xs.skipWhile { x in
invoked += 1
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [
error(270, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 270)
])
XCTAssertEqual(2, invoked)
}
func testSkipWhile_Error_After() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
error(600, testError)
])
var invoked = 0
let res = scheduler.start() {
xs.skipWhile { x in
invoked += 1
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
error(600, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 600)
])
XCTAssertEqual(6, invoked)
}
func testSkipWhile_Dispose_Before() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
var invoked = 0
let res = scheduler.start(300) {
xs.skipWhile { x in
invoked += 1
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 300)
])
XCTAssertEqual(3, invoked)
}
func testSkipWhile_Dispose_After() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
var invoked = 0
let res = scheduler.start(470) {
xs.skipWhile { x in
invoked += 1
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [
next(390, 4),
next(410, 17),
next(450, 8)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 470)
])
XCTAssertEqual(6, invoked)
}
func testSkipWhile_Zero() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(205, 100),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
var invoked = 0
let res = scheduler.start() {
xs.skipWhile { x in
invoked += 1
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [
next(205, 100),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 600)
])
XCTAssertEqual(1, invoked)
}
func testSkipWhile_Throw() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
var invoked = 0
let res = scheduler.start() {
xs.skipWhile { x in
invoked += 1
if invoked == 3 {
throw testError
}
return isPrime(x)
}
}
XCTAssertEqual(res.messages, [
error(290, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 290)
])
XCTAssertEqual(3, invoked)
}
func testSkipWhile_Index() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(205, 100),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
let res = scheduler.start() {
xs.skipWhileIndexed { x, i in i < 5 }
}
XCTAssertEqual(res.messages, [
next(350, 7),
next(390, 4),
next(410, 17),
next(450, 8),
next(500, 23),
completed(600)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 600)
])
}
func testSkipWhile_Index_Throw() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(205, 100),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
error(400, testError)
])
let res = scheduler.start() {
xs.skipWhileIndexed { x, i in i < 5 }
}
XCTAssertEqual(res.messages, [
next(350, 7),
next(390, 4),
error(400, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 400)
])
}
func testSkipWhile_Index_SelectorThrows() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(90, -1),
next(110, -1),
next(205, 100),
next(210, 2),
next(260, 5),
next(290, 13),
next(320, 3),
next(350, 7),
next(390, 4),
completed(400)
])
let res = scheduler.start() {
xs.skipWhileIndexed { x, i in
if i < 5 {
return true
}
throw testError
}
}
XCTAssertEqual(res.messages, [
error(350, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 350)
])
}
}