diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 4f997bc2..c6e0a40e 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -541,6 +541,7 @@ D2138C981BB9BEEE00339B5C /* RxCocoa.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9B1B8A732E0088E94D /* RxCocoa.swift */; }; D2138C991BB9BEEE00339B5C /* RxTarget.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093E9C1B8A732E0088E94D /* RxTarget.swift */; }; D21C29311BC6A1C300448E70 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; }; + D22B6D261BC8504A00BCE0AB /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; D2752D621BC5551A0070C418 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; }; D2752D631BC5551B0070C418 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; }; D285BAC41BC0231000B3F602 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D285BAC31BC0231000B3F602 /* SkipUntil.swift */; }; @@ -650,6 +651,9 @@ D2EBEB431BB9B6DE003A27DC /* SubjectType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CC11B8A72BE0088E94D /* SubjectType.swift */; }; D2EBEB441BB9B6DE003A27DC /* Variable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093CC21B8A72BE0088E94D /* Variable.swift */; }; D2EBEB8A1BB9B9EE003A27DC /* Observable+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* Observable+Blocking.swift */; }; + D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; + D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; + D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; F31F35B01BB4FED800961002 /* UIStepper+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */; }; /* End PBXBuildFile section */ @@ -901,6 +905,7 @@ C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = ""; }; D285BAC31BC0231000B3F602 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = ""; }; D2EA280C1BB9B5A200880ED3 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2EBEB811BB9B99D003A27DC /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; @@ -1134,6 +1139,7 @@ C8093C921B8A72BE0088E94D /* Zip+arity.swift */, C8093C931B8A72BE0088E94D /* Zip+arity.tt */, C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */, + D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */, ); path = Implementations; sourceTree = ""; @@ -1997,6 +2003,7 @@ C8093D041B8A72BE0088E94D /* AsObservable.swift in Sources */, C8093D061B8A72BE0088E94D /* Catch.swift in Sources */, C8093D0C1B8A72BE0088E94D /* CombineLatest.swift in Sources */, + D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */, C8093D5E1B8A72BE0088E94D /* Observable+Multiple.swift in Sources */, C8093D741B8A72BE0088E94D /* ObserverBase.swift in Sources */, C8093D121B8A72BE0088E94D /* ConnectableObservable.swift in Sources */, @@ -2110,6 +2117,7 @@ C8093D031B8A72BE0088E94D /* AsObservable.swift in Sources */, C8093D051B8A72BE0088E94D /* Catch.swift in Sources */, C8093D0B1B8A72BE0088E94D /* CombineLatest.swift in Sources */, + D22B6D261BC8504A00BCE0AB /* SkipWhile.swift in Sources */, C8093D5D1B8A72BE0088E94D /* Observable+Multiple.swift in Sources */, C8093D731B8A72BE0088E94D /* ObserverBase.swift in Sources */, C8093D111B8A72BE0088E94D /* ConnectableObservable.swift in Sources */, @@ -2223,6 +2231,7 @@ C8F0BFAB1BBBFB8B001B112F /* AsObservable.swift in Sources */, C8F0BFAC1BBBFB8B001B112F /* Catch.swift in Sources */, C8F0BFAD1BBBFB8B001B112F /* CombineLatest.swift in Sources */, + D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */, C8F0BFAE1BBBFB8B001B112F /* Observable+Multiple.swift in Sources */, C8F0BFAF1BBBFB8B001B112F /* ObserverBase.swift in Sources */, C8F0BFB01BBBFB8B001B112F /* ConnectableObservable.swift in Sources */, @@ -2470,6 +2479,7 @@ D2EBEB381BB9B6D8003A27DC /* ConcurrentDispatchQueueScheduler.swift in Sources */, D2EBEB131BB9B6C1003A27DC /* Multicast.swift in Sources */, D2EBEB111BB9B6C1003A27DC /* Map.swift in Sources */, + D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */, D2EBEB071BB9B6C1003A27DC /* Deferred.swift in Sources */, D2EBEB2C1BB9B6CA003A27DC /* Observable+Binding.swift in Sources */, D2EBEB041BB9B6C1003A27DC /* Concat.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index 0372b305..01ac11d0 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -293,6 +293,7 @@ C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; }; C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; }; D245D9F41BC6CA0900CAB388 /* SkipUntil.swift in Sources */ = {isa = PBXBuildFile; fileRef = D245D9E61BC6C60800CAB388 /* SkipUntil.swift */; }; + D2FC15C41BCBAA13007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2FC15B61BCBAA01007361FF /* SkipWhile.swift */; }; EC91FB951BBA144400973245 /* GitHubSearchRepositoriesViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */; }; /* End PBXBuildFile section */ @@ -652,6 +653,7 @@ C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = ""; }; C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; D245D9E61BC6C60800CAB388 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = ""; }; + D2FC15B61BCBAA01007361FF /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = ""; }; EC91FB941BBA144400973245 /* GitHubSearchRepositoriesViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = GitHubSearchRepositoriesViewController.swift; sourceTree = ""; }; /* End PBXFileReference section */ @@ -1081,6 +1083,7 @@ C864093C1BA5909000D3C4E8 /* Implementations */ = { isa = PBXGroup; children = ( + D2FC15B61BCBAA01007361FF /* SkipWhile.swift */, D245D9E61BC6C60800CAB388 /* SkipUntil.swift */, C864093D1BA5909000D3C4E8 /* Amb.swift */, C864093E1BA5909000D3C4E8 /* AnonymousObservable.swift */, @@ -1599,6 +1602,7 @@ C86409FA1BA5909000D3C4E8 /* Variable.swift in Sources */, C8297E3A1B6CF905000589EA /* WikipediaSearchViewController.swift in Sources */, C84B3A5A1BA4345A001B7D88 /* UIGestureRecognizer+Rx.swift in Sources */, + D2FC15C41BCBAA13007361FF /* SkipWhile.swift in Sources */, C86409AC1BA5909000D3C4E8 /* AnonymousObservable.swift in Sources */, C84B3A401BA4345A001B7D88 /* NSURLSession+Rx.swift in Sources */, C8297E3B1B6CF905000589EA /* String+extensions.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/SkipWhile.swift b/RxSwift/Observables/Implementations/SkipWhile.swift new file mode 100644 index 00000000..cd5d6e53 --- /dev/null +++ b/RxSwift/Observables/Implementations/SkipWhile.swift @@ -0,0 +1,115 @@ +// +// SkipWhile.swift +// Rx +// +// Created by Yury Korolev on 10/9/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +class SkipWhileSink : Sink, ObserverType { + + typealias Parent = SkipWhile + typealias Element = ElementType + + private let _parent: Parent + private var _running = false + + init(parent: Parent, observer: O, cancel: Disposable) { + _parent = parent + super.init(observer: observer, cancel: cancel) + } + + func on(event: Event) { + switch event { + case .Next(let value): + if !_running { + do { + _running = try !_parent._predicate(value) + } catch let e { + _observer?.onError(e) + dispose() + return + } + } + + if _running { + _observer?.onNext(value) + } + case .Error, .Completed: + _observer?.on(event) + dispose() + } + } +} + +class SkipWhileSinkIndexed : Sink, ObserverType { + + typealias Parent = SkipWhile + typealias Element = ElementType + + private let _parent: Parent + private var _index = 0 + private var _running = false + + init(parent: Parent, observer: O, cancel: Disposable) { + _parent = parent + super.init(observer: observer, cancel: cancel) + } + + func on(event: Event) { + switch event { + case .Next(let value): + if !_running { + do { + _running = try !_parent._predicateIndexed(value, _index) + _index += 1 + } catch let e { + _observer?.onError(e) + dispose() + return + } + } + + if _running { + _observer?.onNext(value) + } + case .Error, .Completed: + _observer?.on(event) + dispose() + } + } +} + +class SkipWhile: Producer { + typealias Predicate = (Element) throws -> Bool + typealias PredicateIndexed = (Element, Int) throws -> Bool + + private let _source: Observable + private let _predicate: Predicate! + private let _predicateIndexed: PredicateIndexed! + + init(source: Observable, predicate: Predicate) { + _source = source + _predicate = predicate + _predicateIndexed = nil + } + + init(source: Observable, predicate: PredicateIndexed) { + _source = source + _predicate = nil + _predicateIndexed = predicate + } + + override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + if let _ = _predicate { + let sink = SkipWhileSink(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return _source.subscribeSafe(sink) + } + else { + let sink = SkipWhileSinkIndexed(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return _source.subscribeSafe(sink) + } + } +} diff --git a/RxSwift/Observables/Observable+StandardSequenceOperators.swift b/RxSwift/Observables/Observable+StandardSequenceOperators.swift index 021f6c65..f4cc161a 100644 --- a/RxSwift/Observables/Observable+StandardSequenceOperators.swift +++ b/RxSwift/Observables/Observable+StandardSequenceOperators.swift @@ -90,6 +90,19 @@ extension ObservableType { } } +// SkipWhile + +extension ObservableType { + + public func skipWhile(predicate: (E) throws -> Bool) -> Observable { + return SkipWhile(source: self.asObservable(), predicate: predicate) + } + + public func skipWhileIndexed(predicate: (E, Int) throws -> Bool) -> Observable { + return SkipWhile(source: self.asObservable(), predicate: predicate) + } +} + // map aka select extension ObservableType { diff --git a/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift b/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift index 5a27aff3..bca0a779 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+StandardSequenceOperatorsTest.swift @@ -3128,4 +3128,439 @@ extension ObservableStandardSequenceOperators { Subscription(200, 400) ]) } +} + +// MARK: SkipWhile +extension ObservableStandardSequenceOperators { + + func testSkipWhile_Complete_Before() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + completed(330), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + var invoked = 0 + + let res = scheduler.start() { + xs.skipWhile { x in + invoked += 1 + return isPrime(x) + } + } + + XCTAssertEqual(res.messages, [ + completed(330) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 330) + ]) + + XCTAssertEqual(4, invoked) + } + + func testSkipWhile_Complete_After() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + var invoked = 0 + + let res = scheduler.start() { + xs.skipWhile { x in + invoked += 1 + return isPrime(x) + } + } + + XCTAssertEqual(res.messages, [ + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + + XCTAssertEqual(6, invoked) + } + + func testSkipWhile_Error_Before() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(210, 2), + next(260, 5), + error(270, testError), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23) + ]) + + var invoked = 0 + + let res = scheduler.start() { + xs.skipWhile { x in + invoked += 1 + return isPrime(x) + } + } + + + + XCTAssertEqual(res.messages, [ + error(270, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + + XCTAssertEqual(2, invoked) + } + + func testSkipWhile_Error_After() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + error(600, testError) + ]) + + var invoked = 0 + + let res = scheduler.start() { + xs.skipWhile { x in + invoked += 1 + return isPrime(x) + } + } + + XCTAssertEqual(res.messages, [ + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + error(600, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + + XCTAssertEqual(6, invoked) + } + + func testSkipWhile_Dispose_Before() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + var invoked = 0 + + let res = scheduler.start(300) { + xs.skipWhile { x in + invoked += 1 + return isPrime(x) + } + } + + XCTAssertEqual(res.messages, []) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 300) + ]) + + XCTAssertEqual(3, invoked) + } + + func testSkipWhile_Dispose_After() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + var invoked = 0 + + let res = scheduler.start(470) { + xs.skipWhile { x in + invoked += 1 + return isPrime(x) + } + } + + XCTAssertEqual(res.messages, [ + next(390, 4), + next(410, 17), + next(450, 8) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 470) + ]) + + XCTAssertEqual(6, invoked) + } + + func testSkipWhile_Zero() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(205, 100), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + var invoked = 0 + + let res = scheduler.start() { + xs.skipWhile { x in + invoked += 1 + return isPrime(x) + } + } + + XCTAssertEqual(res.messages, [ + next(205, 100), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + + XCTAssertEqual(1, invoked) + } + + func testSkipWhile_Throw() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + var invoked = 0 + + let res = scheduler.start() { + xs.skipWhile { x in + invoked += 1 + if invoked == 3 { + throw testError + } + return isPrime(x) + } + } + + XCTAssertEqual(res.messages, [ + error(290, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 290) + ]) + + XCTAssertEqual(3, invoked) + } + + func testSkipWhile_Index() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(205, 100), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + let res = scheduler.start() { + xs.skipWhileIndexed { x, i in i < 5 } + } + + XCTAssertEqual(res.messages, [ + next(350, 7), + next(390, 4), + next(410, 17), + next(450, 8), + next(500, 23), + completed(600) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + } + + func testSkipWhile_Index_Throw() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(205, 100), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + error(400, testError) + ]) + + let res = scheduler.start() { + xs.skipWhileIndexed { x, i in i < 5 } + } + + XCTAssertEqual(res.messages, [ + next(350, 7), + next(390, 4), + error(400, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 400) + ]) + } + + func testSkipWhile_Index_SelectorThrows() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(90, -1), + next(110, -1), + next(205, 100), + next(210, 2), + next(260, 5), + next(290, 13), + next(320, 3), + next(350, 7), + next(390, 4), + completed(400) + ]) + + let res = scheduler.start() { + xs.skipWhileIndexed { x, i in + if i < 5 { + return true + } + throw testError + } + } + + XCTAssertEqual(res.messages, [ + error(350, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 350) + ]) + } } \ No newline at end of file