diff --git a/Documentation/API.md b/Documentation/API.md index c25d5dbc..dd737c06 100644 --- a/Documentation/API.md +++ b/Documentation/API.md @@ -63,7 +63,7 @@ Operators are stateless by default. #### Observable Utility Operators - * [`delaySubscription`](http://reactivex.io/documentation/operators/delay.html) + * [`delaySubscription` / `delay`](http://reactivex.io/documentation/operators/delay.html) * [`do` / `doOnNext`](http://reactivex.io/documentation/operators/do.html) * [`observeOn` / `observeSingleOn`](http://reactivex.io/documentation/operators/observeon.html) * [`subscribe`](http://reactivex.io/documentation/operators/subscribe.html) diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index a2dddcd6..fbbb1b52 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -633,6 +633,10 @@ C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; }; C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8640A021BA5B12A00D3C4E8 /* Repeat.swift */; }; C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8640A021BA5B12A00D3C4E8 /* Repeat.swift */; }; + C86B0A561D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; }; + C86B0A571D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; }; + C86B0A581D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; }; + C86B0A591D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; }; C86B1E221D42BF5200130546 /* SchedulerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B1E211D42BF5200130546 /* SchedulerTests.swift */; }; C86B1E231D42BF5200130546 /* SchedulerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B1E211D42BF5200130546 /* SchedulerTests.swift */; }; C86B1E241D42BF5200130546 /* SchedulerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B1E211D42BF5200130546 /* SchedulerTests.swift */; }; @@ -1662,6 +1666,7 @@ C85BA04B1C3878740075D68E /* PerformanceTests.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = PerformanceTests.app; sourceTree = BUILT_PRODUCTS_DIR; }; C86409FB1BA593F500D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = ""; }; C8640A021BA5B12A00D3C4E8 /* Repeat.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Repeat.swift; sourceTree = ""; }; + C86B0A551D735CCC005D8A16 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = ""; }; C86B1E211D42BF5200130546 /* SchedulerTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SchedulerTests.swift; sourceTree = ""; }; C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = ""; }; C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxTableViewReactiveArrayDataSource.swift; sourceTree = ""; }; @@ -2032,6 +2037,7 @@ C8093C741B8A72BE0088E94D /* ConnectableObservable.swift */, C8093C751B8A72BE0088E94D /* Debug.swift */, C8093C761B8A72BE0088E94D /* Deferred.swift */, + C86B0A551D735CCC005D8A16 /* Delay.swift */, C8093C771B8A72BE0088E94D /* DelaySubscription.swift */, C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */, C8093C791B8A72BE0088E94D /* Do.swift */, @@ -3926,6 +3932,7 @@ C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */, 79E9DE8A1C3417FD009970AF /* DispatchQueueSchedulerQOS.swift in Sources */, C8093CF41B8A72BE0088E94D /* Errors.swift in Sources */, + C86B0A571D735CCC005D8A16 /* Delay.swift in Sources */, C8093D141B8A72BE0088E94D /* Debug.swift in Sources */, CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */, C8554E2B1C3051620052E67D /* PriorityQueue.swift in Sources */, @@ -4149,6 +4156,7 @@ C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */, 79E9DE891C3417FD009970AF /* DispatchQueueSchedulerQOS.swift in Sources */, C8093CF31B8A72BE0088E94D /* Errors.swift in Sources */, + C86B0A561D735CCC005D8A16 /* Delay.swift in Sources */, C8093D131B8A72BE0088E94D /* Debug.swift in Sources */, CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */, C8093CCD1B8A72BE0088E94D /* Bag.swift in Sources */, @@ -4296,6 +4304,7 @@ C8F0BFE71BBBFB8B001B112F /* Repeat.swift in Sources */, 79E9DE8C1C3417FD009970AF /* DispatchQueueSchedulerQOS.swift in Sources */, C8F0BFE81BBBFB8B001B112F /* Errors.swift in Sources */, + C86B0A591D735CCC005D8A16 /* Delay.swift in Sources */, C8F0BFE91BBBFB8B001B112F /* Debug.swift in Sources */, CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */, C8554E2D1C3051620052E67D /* PriorityQueue.swift in Sources */, @@ -4638,6 +4647,7 @@ D2EBEB3D1BB9B6D8003A27DC /* SchedulerServices+Emulation.swift in Sources */, D2EBEB1C1BB9B6C1003A27DC /* Sample.swift in Sources */, D2EBEAFD1BB9B6BA003A27DC /* AnonymousObservable.swift in Sources */, + C86B0A581D735CCC005D8A16 /* Delay.swift in Sources */, C8554E2C1C3051620052E67D /* PriorityQueue.swift in Sources */, CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */, D2EBEAFA1BB9B6B2003A27DC /* SingleAssignmentDisposable.swift in Sources */, diff --git a/RxBlocking/BlockingObservable+Operators.swift b/RxBlocking/BlockingObservable+Operators.swift index b8ee3506..3d4674b8 100644 --- a/RxBlocking/BlockingObservable+Operators.swift +++ b/RxBlocking/BlockingObservable+Operators.swift @@ -24,10 +24,14 @@ extension BlockingObservable { var error: Swift.Error? - let lock = RunLoopLock() + let lock = RunLoopLock(timeout: timeout) let d = SingleAssignmentDisposable() + defer { + d.dispose() + } + lock.dispatch { d.disposable = self.source.subscribe { e in if d.isDisposed { @@ -47,9 +51,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -74,7 +76,11 @@ extension BlockingObservable { let d = SingleAssignmentDisposable() - let lock = RunLoopLock() + defer { + d.dispose() + } + + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -99,9 +105,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -126,7 +130,11 @@ extension BlockingObservable { let d = SingleAssignmentDisposable() - let lock = RunLoopLock() + defer { + d.dispose() + } + + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -148,9 +156,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -186,8 +192,12 @@ extension BlockingObservable { var error: Swift.Error? let d = SingleAssignmentDisposable() + + defer { + d.dispose() + } - let lock = RunLoopLock() + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -224,9 +234,8 @@ extension BlockingObservable { } } - lock.run() - d.dispose() - + try lock.run() + if let error = error { throw error } diff --git a/RxBlocking/BlockingObservable.swift b/RxBlocking/BlockingObservable.swift index 2197ce13..062ac381 100644 --- a/RxBlocking/BlockingObservable.swift +++ b/RxBlocking/BlockingObservable.swift @@ -20,5 +20,6 @@ If you think you need to use a `BlockingObservable` this is usually a sign that design. */ public struct BlockingObservable { + let timeout: RxTimeInterval? let source: Observable -} \ No newline at end of file +} diff --git a/RxBlocking/ObservableConvertibleType+Blocking.swift b/RxBlocking/ObservableConvertibleType+Blocking.swift index 79a08d76..d44bc66d 100644 --- a/RxBlocking/ObservableConvertibleType+Blocking.swift +++ b/RxBlocking/ObservableConvertibleType+Blocking.swift @@ -15,10 +15,11 @@ extension ObservableConvertibleType { /** Converts an Observable into a `BlockingObservable` (an Observable with blocking operators). + - parameter timeout: Maximal time interval BlockingObservable can block without throwing `RxError.timeout`. - returns: `BlockingObservable` version of `self` */ // @warn_unused_result(message:"http://git.io/rxs.uo") - public func toBlocking() -> BlockingObservable { - return BlockingObservable(source: self.asObservable()) + public func toBlocking(timeout: RxTimeInterval? = nil) -> BlockingObservable { + return BlockingObservable(timeout: timeout, source: self.asObservable()) } } diff --git a/RxBlocking/RunLoopLock.swift b/RxBlocking/RunLoopLock.swift index dd802fa1..7df595bc 100644 --- a/RxBlocking/RunLoopLock.swift +++ b/RxBlocking/RunLoopLock.swift @@ -29,17 +29,19 @@ typealias AtomicInt = Int32 #endif class RunLoopLock { - let currentRunLoop: CFRunLoop + let _currentRunLoop: CFRunLoop - var calledRun: AtomicInt = 0 - var calledStop: AtomicInt = 0 + var _calledRun: AtomicInt = 0 + var _calledStop: AtomicInt = 0 + var _timeout: RxTimeInterval? - init() { - currentRunLoop = CFRunLoopGetCurrent() + init(timeout: RxTimeInterval?) { + _timeout = timeout + _currentRunLoop = CFRunLoopGetCurrent() } func dispatch(_ action: @escaping () -> ()) { - CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { + CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { if CurrentThreadScheduler.isScheduleRequired { _ = CurrentThreadScheduler.instance.schedule(()) { _ in action() @@ -50,23 +52,37 @@ class RunLoopLock { action() } } - CFRunLoopWakeUp(currentRunLoop) + CFRunLoopWakeUp(_currentRunLoop) } func stop() { - if AtomicIncrement(&calledStop) != 1 { + if AtomicIncrement(&_calledStop) != 1 { return } - CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { - CFRunLoopStop(self.currentRunLoop) + CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { + CFRunLoopStop(self._currentRunLoop) } - CFRunLoopWakeUp(currentRunLoop) + CFRunLoopWakeUp(_currentRunLoop) } - func run() { - if AtomicIncrement(&calledRun) != 1 { + func run() throws { + if AtomicIncrement(&_calledRun) != 1 { fatalError("Run can be only called once") } - CFRunLoopRun() + if let timeout = _timeout { + switch CFRunLoopRunInMode(CFRunLoopMode.defaultMode, timeout, false) { + case .finished: + return + case .handledSource: + return + case .stopped: + return + case .timedOut: + throw RxError.timeout + } + } + else { + CFRunLoopRun() + } } } diff --git a/RxCocoa/Common/KVORepresentable+Swift.swift b/RxCocoa/Common/KVORepresentable+Swift.swift index e77ba506..a0378a4a 100644 --- a/RxCocoa/Common/KVORepresentable+Swift.swift +++ b/RxCocoa/Common/KVORepresentable+Swift.swift @@ -74,6 +74,17 @@ extension UInt64 : KVORepresentable { } } +extension Bool : KVORepresentable { + public typealias KVOType = NSNumber + + /** + Constructs `Self` using KVO value. + */ + public init?(KVOValue: KVOType) { + self.init(KVOValue.boolValue) + } +} + extension RawRepresentable where RawValue: KVORepresentable { /** diff --git a/RxCocoa/Common/TextInput.swift b/RxCocoa/Common/TextInput.swift index 4c675af5..787a309a 100644 --- a/RxCocoa/Common/TextInput.swift +++ b/RxCocoa/Common/TextInput.swift @@ -132,6 +132,7 @@ import Foundation var rx_text: ControlProperty { get } } + @available(*, deprecated) extension NSTextField : RxTextInput { /** Reactive wrapper for `text` property. diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift new file mode 100644 index 00000000..cc1389aa --- /dev/null +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -0,0 +1,164 @@ +// +// Delay.swift +// RxSwift +// +// Created by tarunon on 2016/02/09. +// Copyright © 2016 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class DelaySink + : Sink + , ObserverType where O.E == ElementType { + typealias E = O.E + typealias Source = Observable + typealias DisposeKey = Bag.KeyType + + private let _lock = NSRecursiveLock() + + private let _dueTime: RxTimeInterval + private let _scheduler: SchedulerType + + private let _sourceSubscription = SingleAssignmentDisposable() + private let _cancelable = SerialDisposable() + + // is scheduled some action + private var _active = false + // is "run loop" on different scheduler running + private var _running = false + private var _errorEvent: Event? = nil + + // state + private var _queue = Queue<(eventTime: RxTime, event: Event)>(capacity: 0) + private var _disposed = false + + init(observer: O, dueTime: RxTimeInterval, scheduler: SchedulerType) { + _dueTime = dueTime + _scheduler = scheduler + super.init(observer: observer) + } + + // All of these complications in this method are caused by the fact that + // error should be propagated immediatelly. Error can bepotentially received on different + // scheduler so this process needs to be synchronized somehow. + // + // Another complication is that scheduler is potentially concurrent so internal queue is used. + func drainQueue(state: (), scheduler: AnyRecursiveScheduler<()>) { + + _lock.lock() // { + let hasFailed = _errorEvent != nil + if !hasFailed { + _running = true + } + _lock.unlock() // } + + if hasFailed { + return + } + + var ranAtLeastOnce = false + + while true { + _lock.lock() // { + let errorEvent = _errorEvent + + let eventToForwardImmediatelly = ranAtLeastOnce ? nil : _queue.dequeue()?.event + let nextEventToScheduleOriginalTime: Date? = ranAtLeastOnce && !_queue.isEmpty ? _queue.peek().eventTime : nil + + if let _ = errorEvent { + } + else { + if let _ = eventToForwardImmediatelly { + } + else if let _ = nextEventToScheduleOriginalTime { + _running = false + } + else { + _running = false + _active = false + } + } + _lock.unlock() // { + + if let errorEvent = errorEvent { + self.forwardOn(errorEvent) + self.dispose() + return + } + else { + if let eventToForwardImmediatelly = eventToForwardImmediatelly { + ranAtLeastOnce = true + self.forwardOn(eventToForwardImmediatelly) + if case .completed = eventToForwardImmediatelly { + self.dispose() + return + } + } + else if let nextEventToScheduleOriginalTime = nextEventToScheduleOriginalTime { + let elapsedTime = _scheduler.now.timeIntervalSince(nextEventToScheduleOriginalTime) + let interval = _dueTime - elapsedTime + let normalizedInterval = interval < 0.0 ? 0.0 : interval + scheduler.schedule((), dueTime: normalizedInterval) + return + } + else { + return + } + } + } + } + + func on(_ event: Event) { + if event.isStopEvent { + _sourceSubscription.dispose() + } + + switch event { + case .error(_): + _lock.lock() // { + let shouldSendImmediatelly = !_running + _queue = Queue(capacity: 0) + _errorEvent = event + _lock.unlock() // } + + if shouldSendImmediatelly { + forwardOn(event) + dispose() + } + default: + _lock.lock() // { + let shouldSchedule = !_active + _active = true + _queue.enqueue((_scheduler.now, event)) + _lock.unlock() // } + + if shouldSchedule { + _cancelable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime, action: self.drainQueue) + } + } + } + + func run(source: Source) -> Disposable { + _sourceSubscription.disposable = source.subscribeSafe(self) + return Disposables.create(_sourceSubscription, _cancelable) + } +} + +class Delay: Producer { + private let _source: Observable + private let _dueTime: RxTimeInterval + private let _scheduler: SchedulerType + + init(source: Observable, dueTime: RxTimeInterval, scheduler: SchedulerType) { + _source = source + _dueTime = dueTime + _scheduler = scheduler + } + + override func run(_ observer: O) -> Disposable where O.E == Element { + let sink = DelaySink(observer: observer, dueTime: _dueTime, scheduler: _scheduler) + sink.disposable = sink.run(source: _source) + return sink + } +} diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index fdd9b4af..dc526ad8 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -272,3 +272,23 @@ extension ObservableType { return Timeout(source: self.asObservable(), dueTime: dueTime, other: other.asObservable(), scheduler: scheduler) } } + +// MARK: delay + +extension ObservableType { + + /** + Returns an observable sequence by the source observable sequence shifted forward in time by a specified delay. Error events from the source observable sequence are not delayed. + + - seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html) + + - parameter dueTime: Relative time shift of the source by. + - parameter scheduler: Scheduler to run the subscription delay timer on. + - returns: the source Observable shifted in time by the specified delay. + */ + // @warn_unused_result(message="http://git.io/rxs.uo") + public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType) + -> Observable { + return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) + } +} diff --git a/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift b/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift index 2a298f45..328ef25f 100644 --- a/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift @@ -69,6 +69,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, [1, 2]) } } + + func testToArray_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).toArray() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } // first @@ -126,6 +141,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testFirst_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).first() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } // last @@ -183,6 +213,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testLast_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).last() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } @@ -360,4 +405,34 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testSingle_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).single() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } + + func testSinglePredicate_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).single { _ in true } + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index 50187435..0db3abf2 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1780,3 +1780,354 @@ extension ObservableTimeTest { ]) } } + +// MARK: Delay +extension ObservableTimeTest { + + func testDelay_TimeSpan_Simple1() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) + ]) + + let res = scheduler.start { + xs.delay(100, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(350, 2), + next(450, 3), + next(550, 4), + completed(650) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 550) + ]) + } + + func testDelay_TimeSpan_Simple2() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) + ]) + + let res = scheduler.start { + xs.delay(50, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(300, 2), + next(400, 3), + next(500, 4), + completed(600) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 550) + ]) + } + + func testDelay_TimeSpan_Simple3() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) + ]) + + let res = scheduler.start { + xs.delay(150, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(400, 2), + next(500, 3), + next(600, 4), + completed(700) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 550) + ]) + } + + func testDelay_TimeSpan_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + error(250, testError) + ]) + + let res = scheduler.start { + xs.delay(150, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + error(250, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 250) + ]) + } + + func testDelay_TimeSpan_Completed() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + completed(250) + ]) + + let res = scheduler.start { + xs.delay(150, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + completed(400) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 250) + ]) + } + + func testDelay_TimeSpan_Error1() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + error(550, testError) + ]) + + let res = scheduler.start { + xs.delay(50, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(300, 2), + next(400, 3), + next(500, 4), + error(550, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 550) + ]) + } + + func testDelay_TimeSpan_Error2() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + error(550, testError) + ]) + + let res = scheduler.start { + xs.delay(150, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(400, 2), + next(500, 3), + error(550, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 550) + ]) + } + + func testDelay_TimeSpan_Real_Simple() { + let waitForError: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) + + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() + + let subscription = res.subscribe( + onNext: { i in + array.append(i) + }, + onCompleted: { + waitForError.onCompleted() + }) + + DispatchQueue.global(qos: .default).async { + s.onNext(1) + s.onNext(2) + s.onNext(3) + s.onCompleted() + } + + try! _ = waitForError.toBlocking(timeout: 5.0).first() + + subscription.dispose() + + XCTAssertEqual([1, 2, 3], array) + } + + func testDelay_TimeSpan_Real_Error1() { + let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) + + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() + + var error: Swift.Error? = nil + + let subscription = res.subscribe( + onNext: { i in + array.append(i) + }, + onError: { e in + error = e + errorReceived.onCompleted() + }) + + DispatchQueue.global(qos: .default).async { + s.onNext(1) + s.onNext(2) + s.onNext(3) + s.onError(testError) + } + + try! errorReceived.toBlocking(timeout: 5.0).first() + + subscription.dispose() + + XCTAssertEqual(error! as NSError, testError) + } + + func testDelay_TimeSpan_Real_Error2() { + let elementProcessed: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) + + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() + var err: NSError! + + let subscription = res.subscribe( + onNext: { i in + array.append(i) + elementProcessed.onCompleted() + }, + onError: { ex in + err = ex as NSError + errorReceived.onCompleted() + }) + + DispatchQueue.global(qos: .default).async { + s.onNext(1) + try! _ = elementProcessed.toBlocking(timeout: 5.0).first() + s.onError(testError) + } + + try! _ = errorReceived.toBlocking(timeout: 5.0).first() + + subscription.dispose() + + XCTAssertEqual([1], array) + XCTAssertEqual(testError, err) + } + + + func testDelay_TimeSpan_Real_Error3() { + let elementProcessed: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let acknowledged: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1) + let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default) + + let s = PublishSubject() + + let res = s.delay(0.01, scheduler: scheduler) + + var array = [Int]() + var err: NSError! + + let subscription = res.subscribe( + onNext: { i in + array.append(i) + elementProcessed.onCompleted() + try! _ = acknowledged.toBlocking(timeout: 5.0).first() + }, + onError: { ex in + err = ex as NSError + errorReceived.onCompleted() + }) + + DispatchQueue.global(qos: .default).async { + s.onNext(1) + try! _ = elementProcessed.toBlocking(timeout: 5.0).first() + s.onError(testError) + acknowledged.onCompleted() + } + + try! _ = errorReceived.toBlocking(timeout: 5.0).first() + + subscription.dispose() + + XCTAssertEqual([1], array) + XCTAssertEqual(testError, err) + } + + func testDelay_TimeSpan_Positive() { + let scheduler = TestScheduler(initialClock: 0) + + let msgs = [ + next(150, 1), + next(250, 2), + next(350, 3), + next(450, 4), + completed(550) + ] + + let xs = scheduler.createHotObservable(msgs) + + let delay: RxTimeInterval = 42 + let res = scheduler.start { + xs.delay(delay, scheduler: scheduler) + } + + XCTAssertEqual(res.events, + msgs.map { Recorded(time: $0.time + Int(delay), event: $0.value) } + .filter { $0.time > 200 }) + } + + func testDelay_TimeSpan_DefaultScheduler() { + let scheduler = MainScheduler.instance + XCTAssertEqual(try! Observable.just(1).delay(0.001, scheduler: scheduler).toBlocking(timeout: 5.0).toArray(), [1]) + } +}