From 26893a379b7c0c803dc5f7bc5ae8a4341732ae28 Mon Sep 17 00:00:00 2001 From: tarunon Date: Tue, 9 Feb 2016 19:48:06 +0900 Subject: [PATCH 01/11] add delay operator --- Rx.xcodeproj/project.pbxproj | 10 +++ RxExample/RxExample.xcodeproj/project.pbxproj | 4 + .../Observables/Implementations/Delay.swift | 78 +++++++++++++++++++ RxSwift/Observables/Observable+Time.swift | 20 +++++ .../Tests/Observable+TimeTest.swift | 74 +++++++++++++++++- 5 files changed, 185 insertions(+), 1 deletion(-) create mode 100644 RxSwift/Observables/Implementations/Delay.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index c5fc008d..089159dc 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -1107,6 +1107,10 @@ D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; + EB8293841C698D1A00315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; + EB8293851C69A70700315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; + EB8293861C69A70700315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; + EB8293871C69A70800315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; F31F35B01BB4FED800961002 /* UIStepper+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */; }; /* End PBXBuildFile section */ @@ -1653,6 +1657,7 @@ D285BAC31BC0231000B3F602 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = ""; }; D2EA280C1BB9B5A200880ED3 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2EBEB811BB9B99D003A27DC /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + EB8293831C698D1A00315EB6 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = ""; }; F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIStepper+Rx.swift"; sourceTree = ""; }; /* End PBXFileReference section */ @@ -1915,6 +1920,7 @@ C8093C741B8A72BE0088E94D /* ConnectableObservable.swift */, C8093C751B8A72BE0088E94D /* Debug.swift */, C8093C761B8A72BE0088E94D /* Deferred.swift */, + EB8293831C698D1A00315EB6 /* Delay.swift */, C8093C771B8A72BE0088E94D /* DelaySubscription.swift */, C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */, C8093C791B8A72BE0088E94D /* Do.swift */, @@ -3685,6 +3691,7 @@ C8093D341B8A72BE0088E94D /* RefCount.swift in Sources */, C8093D0E1B8A72BE0088E94D /* Concat.swift in Sources */, C8093CCA1B8A72BE0088E94D /* Lock.swift in Sources */, + EB8293851C69A70700315EB6 /* Delay.swift in Sources */, C8093D441B8A72BE0088E94D /* Take.swift in Sources */, C84CC5591BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, C8093D321B8A72BE0088E94D /* Reduce.swift in Sources */, @@ -3907,6 +3914,7 @@ C8093D331B8A72BE0088E94D /* RefCount.swift in Sources */, C8093D0D1B8A72BE0088E94D /* Concat.swift in Sources */, C8093CC91B8A72BE0088E94D /* Lock.swift in Sources */, + EB8293841C698D1A00315EB6 /* Delay.swift in Sources */, C8093D431B8A72BE0088E94D /* Take.swift in Sources */, C84CC5581BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, C8093D311B8A72BE0088E94D /* Reduce.swift in Sources */, @@ -4053,6 +4061,7 @@ C8F0BFE11BBBFB8B001B112F /* RefCount.swift in Sources */, C8F0BFE21BBBFB8B001B112F /* Concat.swift in Sources */, C8F0BFE31BBBFB8B001B112F /* Lock.swift in Sources */, + EB8293871C69A70800315EB6 /* Delay.swift in Sources */, C8F0BFE41BBBFB8B001B112F /* Take.swift in Sources */, C84CC55B1BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, C8F0BFE51BBBFB8B001B112F /* Reduce.swift in Sources */, @@ -4370,6 +4379,7 @@ D2EBEB2B1BB9B6CA003A27DC /* Observable+Aggregate.swift in Sources */, D2EBEB291BB9B6C1003A27DC /* Zip+arity.swift in Sources */, D2EBEB241BB9B6C1003A27DC /* TakeUntil.swift in Sources */, + EB8293861C69A70700315EB6 /* Delay.swift in Sources */, C84CC55A1BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, D2EBEB3B1BB9B6D8003A27DC /* OperationQueueScheduler.swift in Sources */, D2EBEAE51BB9B697003A27DC /* AnyObserver.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index d16cbc66..54e6f0aa 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -427,6 +427,7 @@ D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; }; D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; }; E3EE18D21C4D68F900834224 /* UIApplication+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = E3EE18D11C4D68F900834224 /* UIApplication+Rx.swift */; }; + EBF032401C69FAEB00C81573 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EBF0323F1C69FAEB00C81573 /* Delay.swift */; }; /* End PBXBuildFile section */ /* Begin PBXContainerItemProxy section */ @@ -935,6 +936,7 @@ D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; E3EE18D11C4D68F900834224 /* UIApplication+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIApplication+Rx.swift"; sourceTree = ""; }; + EBF0323F1C69FAEB00C81573 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -1406,6 +1408,7 @@ C89464531BC6C2B00055219D /* ConnectableObservable.swift */, C89464541BC6C2B00055219D /* Debug.swift */, C89464551BC6C2B00055219D /* Deferred.swift */, + EBF0323F1C69FAEB00C81573 /* Delay.swift */, C89464561BC6C2B00055219D /* DelaySubscription.swift */, C89464571BC6C2B00055219D /* DistinctUntilChanged.swift */, C89464581BC6C2B00055219D /* Do.swift */, @@ -2209,6 +2212,7 @@ C894657D1BC6C2BC0055219D /* RxTarget.swift in Sources */, C80DDED21BCE9046006A1832 /* ControlEvent+Driver.swift in Sources */, C89464EE1BC6C2B00055219D /* Observable+Creation.swift in Sources */, + EBF032401C69FAEB00C81573 /* Delay.swift in Sources */, C894659A1BC6C2BC0055219D /* UISlider+Rx.swift in Sources */, C89465891BC6C2BC0055219D /* RxSearchBarDelegateProxy.swift in Sources */, C89464C21BC6C2B00055219D /* CombineLatest.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift new file mode 100644 index 00000000..a5955819 --- /dev/null +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -0,0 +1,78 @@ +// +// Delay.swift +// Rx +// +// Created by tarunon on 2016/02/09. +// Copyright © 2016 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class DelaySink + : Sink +, ObserverType { + typealias Source = Observable + typealias E = O.E + + // state + private let _group = CompositeDisposable() + private let _sourceSubscription = SingleAssignmentDisposable() + + private let _lock = NSRecursiveLock() + + private let _dueTime: RxTimeInterval + private let _scheduler: SchedulerType + + init(observer: O, dueTime: RxTimeInterval, scheduler: SchedulerType) { + _dueTime = dueTime + _scheduler = scheduler + super.init(observer: observer) + } + + func on(event: Event) { + _lock.lock(); defer { _lock.unlock() } + switch event { + case .Error(_): + forwardOn(event) + dispose() + default: + let delayDisposable = SingleAssignmentDisposable() + if let _ = _group.addDisposable(disposable) { + delayDisposable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime) { _ in + self.forwardOn(event) + if event.isStopEvent { + self.dispose() + } + delayDisposable.dispose() + } + } + } + } + + func run(source: Source) -> Disposable { + _group.addDisposable(_sourceSubscription) + + let subscription = source.subscribe(self) + _sourceSubscription.disposable = subscription + + return _group + } +} + +class Delay: Producer { + private let _source: Observable + private let _dueTime: RxTimeInterval + private let _scheduler: SchedulerType + + init(source: Observable, dueTime: RxTimeInterval, scheduler: SchedulerType) { + _source = source + _dueTime = dueTime + _scheduler = scheduler + } + + override func run(observer: O) -> Disposable { + let sink = DelaySink(observer: observer, dueTime: _dueTime, scheduler: _scheduler) + sink.disposable = sink.run(_source) + return sink + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index 9050ce54..932d2711 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -272,3 +272,23 @@ extension ObservableType { return Timeout(source: self.asObservable(), dueTime: dueTime, other: other.asObservable(), scheduler: scheduler) } } + +// MARK: delay + +extension ObservableType { + + /** + Returns an observable sequence by the source observable sequence shifted forward in time by a specified delay. Error events from the source observable sequence are not delayed. + + - seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html) + + - parameter dueTime: Relative time shift of the source by. + - parameter scheduler: Scheduler to run the subscription delay timer on. + - returns: the source Observable shifted in time by the specified delay. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func delay(dueTime: RxTimeInterval, scheduler: SchedulerType) + -> Observable { + return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) + } +} diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index 6e4ba7f0..15e96c77 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1778,4 +1778,76 @@ extension ObservableTimeTest { XCTAssertEqual(ys.subscriptions, [ ]) } -} \ No newline at end of file +} + +// MARK: Delay +extension ObservableTimeTest { + + func testDelay_TimeSpan_Simple() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(50, 42), + next(60, 43), + completed(70) + ]) + + let res = scheduler.start { + xs.delay(30, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(280, 42), + next(290, 43), + completed(300) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + } + + func testDelay_TimeSpan_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(50, 42), + next(60, 43), + error(70, testError) + ]) + + let res = scheduler.start { + xs.delay(30, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + error(270, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + } + + func testDelay_TimeSpan_Dispose() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(50, 42), + next(60, 43), + error(70, testError) + ]) + + let res = scheduler.start(291) { + xs.delay(30, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + error(270, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + } +} From 5e9431af0f8a05524d8974fe3998ce240f8e2411 Mon Sep 17 00:00:00 2001 From: tarunon Date: Wed, 10 Feb 2016 16:43:04 +0900 Subject: [PATCH 02/11] update API.md --- Documentation/API.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Documentation/API.md b/Documentation/API.md index 32651338..1739e67d 100644 --- a/Documentation/API.md +++ b/Documentation/API.md @@ -61,7 +61,7 @@ Operators are stateless by default. #### Observable Utility Operators - * [`delaySubscription`](http://reactivex.io/documentation/operators/delay.html) + * [`delaySubscription` / `delay`](http://reactivex.io/documentation/operators/delay.html) * [`do` / `doOnNext`](http://reactivex.io/documentation/operators/do.html) * [`observeOn` / `observeSingleOn`](http://reactivex.io/documentation/operators/observeon.html) * [`subscribe`](http://reactivex.io/documentation/operators/subscribe.html) From 1f7da69e6575ee7b60ef195eb432faf0a274225d Mon Sep 17 00:00:00 2001 From: tarunon Date: Fri, 26 Feb 2016 16:04:38 +0900 Subject: [PATCH 03/11] move lock on Error case. I'm sorry I fix mistype in _group.addDisposable argmunets. I think it was missed when rename tempolary field. --- RxSwift/Observables/Implementations/Delay.swift | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift index a5955819..f90fa350 100644 --- a/RxSwift/Observables/Implementations/Delay.swift +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -30,14 +30,15 @@ class DelaySink } func on(event: Event) { - _lock.lock(); defer { _lock.unlock() } switch event { case .Error(_): - forwardOn(event) - dispose() + _lock.lock(); defer { _lock.unlock() } // lock { + forwardOn(event) + dispose() + // } default: let delayDisposable = SingleAssignmentDisposable() - if let _ = _group.addDisposable(disposable) { + if let _ = _group.addDisposable(delayDisposable) { delayDisposable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime) { _ in self.forwardOn(event) if event.isStopEvent { From 7c73060c5c9aa3fd72bc0540cdbbbb2d03e591fe Mon Sep 17 00:00:00 2001 From: tarunon Date: Mon, 29 Feb 2016 18:12:08 +0900 Subject: [PATCH 04/11] Use `SchedulerType.scheduleRecursive` and `CompositeDisposable.removeDisposable`. --- RxSwift/Observables/Implementations/Delay.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift index f90fa350..5b725a7a 100644 --- a/RxSwift/Observables/Implementations/Delay.swift +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -38,13 +38,14 @@ class DelaySink // } default: let delayDisposable = SingleAssignmentDisposable() - if let _ = _group.addDisposable(delayDisposable) { - delayDisposable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime) { _ in - self.forwardOn(event) + if let key = _group.addDisposable(delayDisposable) { + delayDisposable.disposable = _scheduler.scheduleRecursive((self, key), dueTime: _dueTime) { state, _ in + let (sink, key) = state + sink.forwardOn(event) + sink._group.removeDisposable(key) if event.isStopEvent { - self.dispose() + sink.dispose() } - delayDisposable.dispose() } } } From 66aee7d8bd69aa6a4ffea10814db93f30d979411 Mon Sep 17 00:00:00 2001 From: tarunon Date: Mon, 7 Mar 2016 16:18:10 +0900 Subject: [PATCH 05/11] Add real scheduler test. --- .../RxSwiftTests/Tests/Observable+TimeTest.swift | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index 15e96c77..96a0651e 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1850,4 +1850,20 @@ extension ObservableTimeTest { Subscription(200, 270) ]) } + + func test_DelayWithRealScheduler() { + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + + let start = NSDate() + + let a = try! [Observable.just(0), Observable.never()].toObservable().concat() + .delay(2.0, scheduler: scheduler) + .toBlocking() + .first() + + let end = NSDate() + + XCTAssertEqualWithAccuracy(2, end.timeIntervalSinceDate(start), accuracy: 0.5) + XCTAssertEqual(a, 0) + } } From 6e30c7812dc009b8fa4c0199f8f3aa6c6d892f6e Mon Sep 17 00:00:00 2001 From: tarunon Date: Mon, 14 Mar 2016 22:50:43 +0900 Subject: [PATCH 06/11] implement all tests. --- .../Tests/Observable+TimeTest.swift | 283 +++++++++++++++--- 1 file changed, 247 insertions(+), 36 deletions(-) diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index 96a0651e..31ae95c5 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1783,87 +1783,298 @@ extension ObservableTimeTest { // MARK: Delay extension ObservableTimeTest { - func testDelay_TimeSpan_Simple() { + func testDelay_TimeSpan_Simple1() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) + ]) + + let res = scheduler.start { + xs.delay(100, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(350, 2), + next(450, 3), + next(550, 4), + completed(650) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 550) + ]) + } + + func testDelay_TimeSpan_Simple2() { let scheduler = TestScheduler(initialClock: 0) - let xs = scheduler.createColdObservable([ - next(50, 42), - next(60, 43), - completed(70) + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) ]) let res = scheduler.start { - xs.delay(30, scheduler: scheduler) + xs.delay(50, scheduler: scheduler) } XCTAssertEqual(res.events, [ - next(280, 42), - next(290, 43), - completed(300) + next(300, 2), + next(400, 3), + next(500, 4), + completed(600) ]) XCTAssertEqual(xs.subscriptions, [ - Subscription(200, 270) + Subscription(200, 550) ]) } - func testDelay_TimeSpan_Error() { + func testDelay_TimeSpan_Simple3() { let scheduler = TestScheduler(initialClock: 0) - let xs = scheduler.createColdObservable([ - next(50, 42), - next(60, 43), - error(70, testError) + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) ]) let res = scheduler.start { - xs.delay(30, scheduler: scheduler) + xs.delay(150, scheduler: scheduler) } XCTAssertEqual(res.events, [ - error(270, testError) + next(400, 2), + next(500, 3), + next(600, 4), + completed(700) ]) XCTAssertEqual(xs.subscriptions, [ - Subscription(200, 270) + Subscription(200, 550) ]) } - func testDelay_TimeSpan_Dispose() { + func testDelay_TimeSpan_Error1() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + error(550, testError) + ]) + + let res = scheduler.start { + xs.delay(50, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(300, 2), + next(400, 3), + next(500, 4), + error(550, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 550) + ]) + } + + func testDelay_TimeSpan_Error2() { let scheduler = TestScheduler(initialClock: 0) - let xs = scheduler.createColdObservable([ - next(50, 42), - next(60, 43), - error(70, testError) + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + error(550, testError) ]) - let res = scheduler.start(291) { - xs.delay(30, scheduler: scheduler) + let res = scheduler.start { + xs.delay(150, scheduler: scheduler) } XCTAssertEqual(res.events, [ - error(270, testError) + next(400, 2), + next(500, 3), + error(550, testError) ]) XCTAssertEqual(xs.subscriptions, [ - Subscription(200, 270) + Subscription(200, 550) ]) } - func test_DelayWithRealScheduler() { + func testDelay_TimeSpan_Real_Simple() { + let expectation = self.expectationWithDescription("") let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) - let start = NSDate() + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() - let a = try! [Observable.just(0), Observable.never()].toObservable().concat() - .delay(2.0, scheduler: scheduler) - .toBlocking() - .first() + let subscription = res.subscribe( + onNext: { i in + array.append(i) + }, + onCompleted: { + expectation.fulfill() + }) - let end = NSDate() + dispatch_async(dispatch_get_global_queue(0, 0)) { + s.onNext(1) + s.onNext(2) + s.onNext(3) + s.onCompleted() + } - XCTAssertEqualWithAccuracy(2, end.timeIntervalSinceDate(start), accuracy: 0.5) - XCTAssertEqual(a, 0) + waitForExpectationsWithTimeout(1.0, handler: nil) + + subscription.dispose() + + XCTAssertEqual([1, 2, 3], array) + } + + func testDelay_TimeSpan_Real_Error1() { + let expectation = self.expectationWithDescription("") + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() + + let subscription = res.subscribe( + onNext: { i in + array.append(i) + }, + onError: { _ in + expectation.fulfill() + }) + + dispatch_async(dispatch_get_global_queue(0, 0)) { + s.onNext(1) + s.onNext(2) + s.onNext(3) + s.onError(testError) + } + + waitForExpectationsWithTimeout(1.0, handler: nil) + + subscription.dispose() + + XCTAssertEqual([], array) + } + + func testDelay_TimeSpan_Real_Error2() { + let expectation = self.expectationWithDescription("") + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() + var err: NSError! + + let subscription = res.subscribe( + onNext: { i in + array.append(i) + }, + onError: { ex in + err = ex as NSError + expectation.fulfill() + }) + + dispatch_async(dispatch_get_global_queue(0, 0)) { + s.onNext(1) + NSThread.sleepForTimeInterval(0.5) + s.onError(testError) + } + + waitForExpectationsWithTimeout(1.0, handler: nil) + + subscription.dispose() + + XCTAssertEqual([1], array) + XCTAssertEqual(testError, err) + } + + func testDelay_TimeSpan_Real_Error3() { + let expectation = self.expectationWithDescription("") + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() + var err: NSError! + + let subscription = res.subscribe( + onNext: { i in + array.append(i) + NSThread.sleepForTimeInterval(0.5) + }, + onError: { ex in + err = ex as NSError + expectation.fulfill() + }) + + dispatch_async(dispatch_get_global_queue(0, 0)) { + s.onNext(1) + NSThread.sleepForTimeInterval(0.5) + s.onError(testError) + } + + waitForExpectationsWithTimeout(1.0, handler: nil) + + subscription.dispose() + + XCTAssertEqual([1], array) + XCTAssertEqual(testError, err) + } + + func testDelay_TimeSpan_Positive() { + let scheduler = TestScheduler(initialClock: 0) + + let msgs = [ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) + ] + + let xs = scheduler.createHotObservable(msgs) + + let delay: RxTimeInterval = 42 + let res = scheduler.start { + xs.delay(delay, scheduler: scheduler) + } + + XCTAssertEqual(res.events, + msgs.map { Recorded(time: $0.time + Int(delay), event: $0.value) } + .filter { $0.time > 200 }) + } + + func testDelay_TimeSpan_DefaultScheduler() { + let scheduler = MainScheduler.instance + XCTAssertEqual(try! Observable.just(1).delay(0.001, scheduler: scheduler).toBlocking().toArray(), [1]) } } From 3dfd56d8dac20fd896d0b5d5c4d8132dcd2b031a Mon Sep 17 00:00:00 2001 From: tarunon Date: Mon, 14 Mar 2016 22:51:25 +0900 Subject: [PATCH 07/11] patch the implement of convert to serial queue. --- .../Observables/Implementations/Delay.swift | 60 +++++++++++++------ 1 file changed, 41 insertions(+), 19 deletions(-) diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift index 5b725a7a..311d25c1 100644 --- a/RxSwift/Observables/Implementations/Delay.swift +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -10,15 +10,20 @@ import Foundation class DelaySink : Sink -, ObserverType { - typealias Source = Observable + , ObserverType { typealias E = O.E + typealias Source = Observable + typealias DisposeKey = Bag.KeyType // state private let _group = CompositeDisposable() private let _sourceSubscription = SingleAssignmentDisposable() private let _lock = NSRecursiveLock() + + private var _queue = Queue<(onTime: RxTime, event: Event)>(capacity: 0) + private var _running = false + private var _disposed = false private let _dueTime: RxTimeInterval private let _scheduler: SchedulerType @@ -29,33 +34,50 @@ class DelaySink super.init(observer: observer) } + func drainQueue(key: DisposeKey) -> Disposable { + _lock.lock(); defer { _lock.unlock() } // lock { + if !_queue.isEmpty { + let (onTime, event) = _queue.peek() + let timeInterval = _scheduler.now.timeIntervalSinceDate(onTime) + if timeInterval < _dueTime { + return _scheduler.scheduleRelative(key, dueTime: _dueTime - timeInterval, action: drainQueue) + } + _queue.dequeue() + forwardOn(event) + if event.isStopEvent { + dispose() + } else { + return drainQueue(key) + } + } + _running = false + _group.removeDisposable(key) + return NopDisposable.instance + // } + } + func on(event: Event) { - switch event { - case .Error(_): - _lock.lock(); defer { _lock.unlock() } // lock { + _lock.lock(); defer { _lock.unlock() } // lock { + switch event { + case .Error(_): forwardOn(event) dispose() - // } - default: - let delayDisposable = SingleAssignmentDisposable() - if let key = _group.addDisposable(delayDisposable) { - delayDisposable.disposable = _scheduler.scheduleRecursive((self, key), dueTime: _dueTime) { state, _ in - let (sink, key) = state - sink.forwardOn(event) - sink._group.removeDisposable(key) - if event.isStopEvent { - sink.dispose() + default: + _queue.enqueue((_scheduler.now, event)) + if !_running { + _running = true + let delayDisposable = SingleAssignmentDisposable() + if let key = _group.addDisposable(delayDisposable) { + delayDisposable.disposable = _scheduler.scheduleRelative(key, dueTime: _dueTime, action: drainQueue) } } } - } + // } } func run(source: Source) -> Disposable { _group.addDisposable(_sourceSubscription) - - let subscription = source.subscribe(self) - _sourceSubscription.disposable = subscription + _sourceSubscription.disposable = source.subscribe(self) return _group } From 09a844e9de247ebb73e1801a8d47119ba99094e1 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sun, 28 Aug 2016 23:34:40 +0200 Subject: [PATCH 08/11] Adds timeout parameter to blocking observable sequence. --- RxBlocking/BlockingObservable+Operators.swift | 41 ++++++---- RxBlocking/BlockingObservable.swift | 3 +- .../ObservableConvertibleType+Blocking.swift | 5 +- RxBlocking/RunLoopLock.swift | 44 +++++++---- .../Tests/Observable+BlockingTest.swift | 75 +++++++++++++++++++ 5 files changed, 135 insertions(+), 33 deletions(-) diff --git a/RxBlocking/BlockingObservable+Operators.swift b/RxBlocking/BlockingObservable+Operators.swift index b8ee3506..3d4674b8 100644 --- a/RxBlocking/BlockingObservable+Operators.swift +++ b/RxBlocking/BlockingObservable+Operators.swift @@ -24,10 +24,14 @@ extension BlockingObservable { var error: Swift.Error? - let lock = RunLoopLock() + let lock = RunLoopLock(timeout: timeout) let d = SingleAssignmentDisposable() + defer { + d.dispose() + } + lock.dispatch { d.disposable = self.source.subscribe { e in if d.isDisposed { @@ -47,9 +51,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -74,7 +76,11 @@ extension BlockingObservable { let d = SingleAssignmentDisposable() - let lock = RunLoopLock() + defer { + d.dispose() + } + + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -99,9 +105,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -126,7 +130,11 @@ extension BlockingObservable { let d = SingleAssignmentDisposable() - let lock = RunLoopLock() + defer { + d.dispose() + } + + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -148,9 +156,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -186,8 +192,12 @@ extension BlockingObservable { var error: Swift.Error? let d = SingleAssignmentDisposable() + + defer { + d.dispose() + } - let lock = RunLoopLock() + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -224,9 +234,8 @@ extension BlockingObservable { } } - lock.run() - d.dispose() - + try lock.run() + if let error = error { throw error } diff --git a/RxBlocking/BlockingObservable.swift b/RxBlocking/BlockingObservable.swift index 2197ce13..062ac381 100644 --- a/RxBlocking/BlockingObservable.swift +++ b/RxBlocking/BlockingObservable.swift @@ -20,5 +20,6 @@ If you think you need to use a `BlockingObservable` this is usually a sign that design. */ public struct BlockingObservable { + let timeout: RxTimeInterval? let source: Observable -} \ No newline at end of file +} diff --git a/RxBlocking/ObservableConvertibleType+Blocking.swift b/RxBlocking/ObservableConvertibleType+Blocking.swift index 79a08d76..d44bc66d 100644 --- a/RxBlocking/ObservableConvertibleType+Blocking.swift +++ b/RxBlocking/ObservableConvertibleType+Blocking.swift @@ -15,10 +15,11 @@ extension ObservableConvertibleType { /** Converts an Observable into a `BlockingObservable` (an Observable with blocking operators). + - parameter timeout: Maximal time interval BlockingObservable can block without throwing `RxError.timeout`. - returns: `BlockingObservable` version of `self` */ // @warn_unused_result(message:"http://git.io/rxs.uo") - public func toBlocking() -> BlockingObservable { - return BlockingObservable(source: self.asObservable()) + public func toBlocking(timeout: RxTimeInterval? = nil) -> BlockingObservable { + return BlockingObservable(timeout: timeout, source: self.asObservable()) } } diff --git a/RxBlocking/RunLoopLock.swift b/RxBlocking/RunLoopLock.swift index dd802fa1..7df595bc 100644 --- a/RxBlocking/RunLoopLock.swift +++ b/RxBlocking/RunLoopLock.swift @@ -29,17 +29,19 @@ typealias AtomicInt = Int32 #endif class RunLoopLock { - let currentRunLoop: CFRunLoop + let _currentRunLoop: CFRunLoop - var calledRun: AtomicInt = 0 - var calledStop: AtomicInt = 0 + var _calledRun: AtomicInt = 0 + var _calledStop: AtomicInt = 0 + var _timeout: RxTimeInterval? - init() { - currentRunLoop = CFRunLoopGetCurrent() + init(timeout: RxTimeInterval?) { + _timeout = timeout + _currentRunLoop = CFRunLoopGetCurrent() } func dispatch(_ action: @escaping () -> ()) { - CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { + CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { if CurrentThreadScheduler.isScheduleRequired { _ = CurrentThreadScheduler.instance.schedule(()) { _ in action() @@ -50,23 +52,37 @@ class RunLoopLock { action() } } - CFRunLoopWakeUp(currentRunLoop) + CFRunLoopWakeUp(_currentRunLoop) } func stop() { - if AtomicIncrement(&calledStop) != 1 { + if AtomicIncrement(&_calledStop) != 1 { return } - CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { - CFRunLoopStop(self.currentRunLoop) + CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { + CFRunLoopStop(self._currentRunLoop) } - CFRunLoopWakeUp(currentRunLoop) + CFRunLoopWakeUp(_currentRunLoop) } - func run() { - if AtomicIncrement(&calledRun) != 1 { + func run() throws { + if AtomicIncrement(&_calledRun) != 1 { fatalError("Run can be only called once") } - CFRunLoopRun() + if let timeout = _timeout { + switch CFRunLoopRunInMode(CFRunLoopMode.defaultMode, timeout, false) { + case .finished: + return + case .handledSource: + return + case .stopped: + return + case .timedOut: + throw RxError.timeout + } + } + else { + CFRunLoopRun() + } } } diff --git a/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift b/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift index 2a298f45..328ef25f 100644 --- a/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift @@ -69,6 +69,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, [1, 2]) } } + + func testToArray_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).toArray() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } // first @@ -126,6 +141,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testFirst_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).first() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } // last @@ -183,6 +213,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testLast_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).last() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } @@ -360,4 +405,34 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testSingle_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).single() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } + + func testSinglePredicate_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).single { _ in true } + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } From 8c7fed0603cf1a189631f46146029082dc10b720 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sun, 28 Aug 2016 23:35:00 +0200 Subject: [PATCH 09/11] Fixes delay operator. --- .../Observables/Implementations/Delay.swift | 150 +++++++++++++----- RxSwift/Observables/Observable+Time.swift | 4 +- .../Tests/Observable+TimeTest.swift | 120 ++++++++++---- 3 files changed, 194 insertions(+), 80 deletions(-) diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift index 35d86dc6..cc1389aa 100644 --- a/RxSwift/Observables/Implementations/Delay.swift +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -15,71 +15,133 @@ class DelaySink typealias Source = Observable typealias DisposeKey = Bag.KeyType - // state - private let _group = CompositeDisposable() - private let _sourceSubscription = SingleAssignmentDisposable() - private let _lock = NSRecursiveLock() - - private var _queue = Queue<(time: RxTime, event: Event)>(capacity: 0) - private var _running = false - private var _disposed = false private let _dueTime: RxTimeInterval private let _scheduler: SchedulerType + private let _sourceSubscription = SingleAssignmentDisposable() + private let _cancelable = SerialDisposable() + + // is scheduled some action + private var _active = false + // is "run loop" on different scheduler running + private var _running = false + private var _errorEvent: Event? = nil + + // state + private var _queue = Queue<(eventTime: RxTime, event: Event)>(capacity: 0) + private var _disposed = false + init(observer: O, dueTime: RxTimeInterval, scheduler: SchedulerType) { _dueTime = dueTime _scheduler = scheduler super.init(observer: observer) } - - func drainQueue(key: DisposeKey) -> Disposable { - _lock.lock(); defer { _lock.unlock() } // lock { - if !_queue.isEmpty { - let (time, event) = _queue.peek() - let timeInterval = _scheduler.now.timeIntervalSince(time) - if timeInterval < _dueTime { - return _scheduler.scheduleRelative(key, dueTime: _dueTime - timeInterval, action: drainQueue) + + // All of these complications in this method are caused by the fact that + // error should be propagated immediatelly. Error can bepotentially received on different + // scheduler so this process needs to be synchronized somehow. + // + // Another complication is that scheduler is potentially concurrent so internal queue is used. + func drainQueue(state: (), scheduler: AnyRecursiveScheduler<()>) { + + _lock.lock() // { + let hasFailed = _errorEvent != nil + if !hasFailed { + _running = true + } + _lock.unlock() // } + + if hasFailed { + return + } + + var ranAtLeastOnce = false + + while true { + _lock.lock() // { + let errorEvent = _errorEvent + + let eventToForwardImmediatelly = ranAtLeastOnce ? nil : _queue.dequeue()?.event + let nextEventToScheduleOriginalTime: Date? = ranAtLeastOnce && !_queue.isEmpty ? _queue.peek().eventTime : nil + + if let _ = errorEvent { } - _queue.dequeue() - forwardOn(event) - if event.isStopEvent { - dispose() - } else { - return drainQueue(key: key) + else { + if let _ = eventToForwardImmediatelly { + } + else if let _ = nextEventToScheduleOriginalTime { + _running = false + } + else { + _running = false + _active = false + } + } + _lock.unlock() // { + + if let errorEvent = errorEvent { + self.forwardOn(errorEvent) + self.dispose() + return + } + else { + if let eventToForwardImmediatelly = eventToForwardImmediatelly { + ranAtLeastOnce = true + self.forwardOn(eventToForwardImmediatelly) + if case .completed = eventToForwardImmediatelly { + self.dispose() + return + } + } + else if let nextEventToScheduleOriginalTime = nextEventToScheduleOriginalTime { + let elapsedTime = _scheduler.now.timeIntervalSince(nextEventToScheduleOriginalTime) + let interval = _dueTime - elapsedTime + let normalizedInterval = interval < 0.0 ? 0.0 : interval + scheduler.schedule((), dueTime: normalizedInterval) + return + } + else { + return } } - _running = false - _grou(key) - return NopDisposable.instance - // } + } } func on(_ event: Event) { - _lock.lock(); defer { _lock.unlock() } // lock { - switch event { - case .error(_): + if event.isStopEvent { + _sourceSubscription.dispose() + } + + switch event { + case .error(_): + _lock.lock() // { + let shouldSendImmediatelly = !_running + _queue = Queue(capacity: 0) + _errorEvent = event + _lock.unlock() // } + + if shouldSendImmediatelly { forwardOn(event) dispose() - default: - _queue.enqueue((_scheduler.now, event)) - if !_running { - _running = true - let delayDisposable = SingleAssignmentDisposable() - if let key = _group.addDisposable(delayDisposable) { - delayDisposable.disposable = _scheduler.scheduleRelative(key, dueTime: _dueTime, action: drainQueue) - } - } } - // } + default: + _lock.lock() // { + let shouldSchedule = !_active + _active = true + _queue.enqueue((_scheduler.now, event)) + _lock.unlock() // } + + if shouldSchedule { + _cancelable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime, action: self.drainQueue) + } + } } func run(source: Source) -> Disposable { - _group.addDisposable(_sourceSubscription) - _sourceSubscription.disposable = source.subscribe(self) - - return _group + _sourceSubscription.disposable = source.subscribeSafe(self) + return Disposables.create(_sourceSubscription, _cancelable) } } @@ -93,7 +155,7 @@ class Delay: Producer { _dueTime = dueTime _scheduler = scheduler } - + override func run(_ observer: O) -> Disposable where O.E == Element { let sink = DelaySink(observer: observer, dueTime: _dueTime, scheduler: _scheduler) sink.disposable = sink.run(source: _source) diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index bdf0b9fc..dc526ad8 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -286,8 +286,8 @@ extension ObservableType { - parameter scheduler: Scheduler to run the subscription delay timer on. - returns: the source Observable shifted in time by the specified delay. */ - @warn_unused_result(message="http://git.io/rxs.uo") - public func delay(dueTime: RxTimeInterval, scheduler: SchedulerType) + // @warn_unused_result(message="http://git.io/rxs.uo") + public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable { return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) } diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index d09a655f..e160fb4f 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1864,7 +1864,49 @@ extension ObservableTimeTest { Subscription(200, 550) ]) } - + + func testDelay_TimeSpan_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + error(250, testError) + ]) + + let res = scheduler.start { + xs.delay(150, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + error(250, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 250) + ]) + } + + func testDelay_TimeSpan_Completed() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + completed(250) + ]) + + let res = scheduler.start { + xs.delay(150, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + completed(250) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 250) + ]) + } + func testDelay_TimeSpan_Error1() { let scheduler = TestScheduler(initialClock: 0) @@ -1919,8 +1961,8 @@ extension ObservableTimeTest { } func testDelay_TimeSpan_Real_Simple() { - let expectation = self.expectationWithDescription("") - let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + let waitForError: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) let s = PublishSubject() @@ -1933,17 +1975,17 @@ extension ObservableTimeTest { array.append(i) }, onCompleted: { - expectation.fulfill() + waitForError.onCompleted() }) - dispatch_async(dispatch_get_global_queue(0, 0)) { + DispatchQueue.global(qos: .default).async { s.onNext(1) s.onNext(2) s.onNext(3) s.onCompleted() } - - waitForExpectationsWithTimeout(1.0, handler: nil) + + try! _ = waitForError.toBlocking(timeout: 5.0).first() subscription.dispose() @@ -1951,40 +1993,44 @@ extension ObservableTimeTest { } func testDelay_TimeSpan_Real_Error1() { - let expectation = self.expectationWithDescription("") - let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) let s = PublishSubject() - + let res = s.delay(0.01, scheduler: scheduler) var array = [Int]() + + var error: Swift.Error? = nil let subscription = res.subscribe( onNext: { i in array.append(i) }, - onError: { _ in - expectation.fulfill() + onError: { e in + error = e + errorReceived.onCompleted() }) - dispatch_async(dispatch_get_global_queue(0, 0)) { + DispatchQueue.global(qos: .default).async { s.onNext(1) s.onNext(2) s.onNext(3) s.onError(testError) } - - waitForExpectationsWithTimeout(1.0, handler: nil) + + try! errorReceived.toBlocking(timeout: 5.0).first() subscription.dispose() - - XCTAssertEqual([], array) + + XCTAssertEqual(error! as NSError, testError) } func testDelay_TimeSpan_Real_Error2() { - let expectation = self.expectationWithDescription("") - let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + let elementProcessed: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) let s = PublishSubject() @@ -1996,29 +2042,33 @@ extension ObservableTimeTest { let subscription = res.subscribe( onNext: { i in array.append(i) + elementProcessed.onCompleted() }, onError: { ex in err = ex as NSError - expectation.fulfill() + errorReceived.onCompleted() }) - dispatch_async(dispatch_get_global_queue(0, 0)) { + DispatchQueue.global(qos: .default).async { s.onNext(1) - NSThread.sleepForTimeInterval(0.5) + try! _ = elementProcessed.toBlocking(timeout: 5.0).first() s.onError(testError) } - - waitForExpectationsWithTimeout(1.0, handler: nil) + + try! _ = errorReceived.toBlocking(timeout: 5.0).first() subscription.dispose() XCTAssertEqual([1], array) XCTAssertEqual(testError, err) } - + + func testDelay_TimeSpan_Real_Error3() { - let expectation = self.expectationWithDescription("") - let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .Default) + let elementProcessed: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let acknowledged: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) let s = PublishSubject() @@ -2030,20 +2080,22 @@ extension ObservableTimeTest { let subscription = res.subscribe( onNext: { i in array.append(i) - NSThread.sleepForTimeInterval(0.5) + elementProcessed.onCompleted() + try! _ = acknowledged.toBlocking(timeout: 5.0).first() }, onError: { ex in err = ex as NSError - expectation.fulfill() + errorReceived.onCompleted() }) - dispatch_async(dispatch_get_global_queue(0, 0)) { + DispatchQueue.global(qos: .default).async { s.onNext(1) - NSThread.sleepForTimeInterval(0.5) + try! _ = elementProcessed.toBlocking(timeout: 5.0).first() s.onError(testError) + acknowledged.onCompleted() } - - waitForExpectationsWithTimeout(1.0, handler: nil) + + try! _ = errorReceived.toBlocking(timeout: 5.0).first() subscription.dispose() @@ -2076,6 +2128,6 @@ extension ObservableTimeTest { func testDelay_TimeSpan_DefaultScheduler() { let scheduler = MainScheduler.instance - XCTAssertEqual(try! Observable.just(1).delay(0.001, scheduler: scheduler).toBlocking().toArray(), [1]) + XCTAssertEqual(try! Observable.just(1).delay(0.001, scheduler: scheduler).toBlocking(timeout: 5.0).toArray(), [1]) } } From 3b67c57328d859cd019048e9726a8d04c8fe5a15 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sun, 28 Aug 2016 23:38:38 +0200 Subject: [PATCH 10/11] Fixes unit tests. --- Tests/RxSwiftTests/Tests/Observable+TimeTest.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index e160fb4f..0db3abf2 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1899,7 +1899,7 @@ extension ObservableTimeTest { } XCTAssertEqual(res.events, [ - completed(250) + completed(400) ]) XCTAssertEqual(xs.subscriptions, [ From 76b1c00661624ac1d0fdc28885a748f93533157d Mon Sep 17 00:00:00 2001 From: Douglas Heriot Date: Mon, 29 Aug 2016 12:00:04 +1000 Subject: [PATCH 11/11] Make Bool as KVORepresentable, to avoid ambiguty in observe() Also mark rx_text usage as deprecated, to avoid warning. --- RxCocoa/Common/KVORepresentable+Swift.swift | 11 +++++++++++ RxCocoa/Common/TextInput.swift | 1 + 2 files changed, 12 insertions(+) diff --git a/RxCocoa/Common/KVORepresentable+Swift.swift b/RxCocoa/Common/KVORepresentable+Swift.swift index e77ba506..a0378a4a 100644 --- a/RxCocoa/Common/KVORepresentable+Swift.swift +++ b/RxCocoa/Common/KVORepresentable+Swift.swift @@ -74,6 +74,17 @@ extension UInt64 : KVORepresentable { } } +extension Bool : KVORepresentable { + public typealias KVOType = NSNumber + + /** + Constructs `Self` using KVO value. + */ + public init?(KVOValue: KVOType) { + self.init(KVOValue.boolValue) + } +} + extension RawRepresentable where RawValue: KVORepresentable { /** diff --git a/RxCocoa/Common/TextInput.swift b/RxCocoa/Common/TextInput.swift index 4c675af5..787a309a 100644 --- a/RxCocoa/Common/TextInput.swift +++ b/RxCocoa/Common/TextInput.swift @@ -132,6 +132,7 @@ import Foundation var rx_text: ControlProperty { get } } + @available(*, deprecated) extension NSTextField : RxTextInput { /** Reactive wrapper for `text` property.