diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index c5fc008d..089159dc 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -1107,6 +1107,10 @@ D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; }; + EB8293841C698D1A00315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; + EB8293851C69A70700315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; + EB8293861C69A70700315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; + EB8293871C69A70800315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; }; F31F35B01BB4FED800961002 /* UIStepper+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */; }; /* End PBXBuildFile section */ @@ -1653,6 +1657,7 @@ D285BAC31BC0231000B3F602 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = ""; }; D2EA280C1BB9B5A200880ED3 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2EBEB811BB9B99D003A27DC /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + EB8293831C698D1A00315EB6 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = ""; }; F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIStepper+Rx.swift"; sourceTree = ""; }; /* End PBXFileReference section */ @@ -1915,6 +1920,7 @@ C8093C741B8A72BE0088E94D /* ConnectableObservable.swift */, C8093C751B8A72BE0088E94D /* Debug.swift */, C8093C761B8A72BE0088E94D /* Deferred.swift */, + EB8293831C698D1A00315EB6 /* Delay.swift */, C8093C771B8A72BE0088E94D /* DelaySubscription.swift */, C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */, C8093C791B8A72BE0088E94D /* Do.swift */, @@ -3685,6 +3691,7 @@ C8093D341B8A72BE0088E94D /* RefCount.swift in Sources */, C8093D0E1B8A72BE0088E94D /* Concat.swift in Sources */, C8093CCA1B8A72BE0088E94D /* Lock.swift in Sources */, + EB8293851C69A70700315EB6 /* Delay.swift in Sources */, C8093D441B8A72BE0088E94D /* Take.swift in Sources */, C84CC5591BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, C8093D321B8A72BE0088E94D /* Reduce.swift in Sources */, @@ -3907,6 +3914,7 @@ C8093D331B8A72BE0088E94D /* RefCount.swift in Sources */, C8093D0D1B8A72BE0088E94D /* Concat.swift in Sources */, C8093CC91B8A72BE0088E94D /* Lock.swift in Sources */, + EB8293841C698D1A00315EB6 /* Delay.swift in Sources */, C8093D431B8A72BE0088E94D /* Take.swift in Sources */, C84CC5581BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, C8093D311B8A72BE0088E94D /* Reduce.swift in Sources */, @@ -4053,6 +4061,7 @@ C8F0BFE11BBBFB8B001B112F /* RefCount.swift in Sources */, C8F0BFE21BBBFB8B001B112F /* Concat.swift in Sources */, C8F0BFE31BBBFB8B001B112F /* Lock.swift in Sources */, + EB8293871C69A70800315EB6 /* Delay.swift in Sources */, C8F0BFE41BBBFB8B001B112F /* Take.swift in Sources */, C84CC55B1BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, C8F0BFE51BBBFB8B001B112F /* Reduce.swift in Sources */, @@ -4370,6 +4379,7 @@ D2EBEB2B1BB9B6CA003A27DC /* Observable+Aggregate.swift in Sources */, D2EBEB291BB9B6C1003A27DC /* Zip+arity.swift in Sources */, D2EBEB241BB9B6C1003A27DC /* TakeUntil.swift in Sources */, + EB8293861C69A70700315EB6 /* Delay.swift in Sources */, C84CC55A1BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */, D2EBEB3B1BB9B6D8003A27DC /* OperationQueueScheduler.swift in Sources */, D2EBEAE51BB9B697003A27DC /* AnyObserver.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index d16cbc66..54e6f0aa 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -427,6 +427,7 @@ D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; }; D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; }; E3EE18D21C4D68F900834224 /* UIApplication+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = E3EE18D11C4D68F900834224 /* UIApplication+Rx.swift */; }; + EBF032401C69FAEB00C81573 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EBF0323F1C69FAEB00C81573 /* Delay.swift */; }; /* End PBXBuildFile section */ /* Begin PBXContainerItemProxy section */ @@ -935,6 +936,7 @@ D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; E3EE18D11C4D68F900834224 /* UIApplication+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIApplication+Rx.swift"; sourceTree = ""; }; + EBF0323F1C69FAEB00C81573 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -1406,6 +1408,7 @@ C89464531BC6C2B00055219D /* ConnectableObservable.swift */, C89464541BC6C2B00055219D /* Debug.swift */, C89464551BC6C2B00055219D /* Deferred.swift */, + EBF0323F1C69FAEB00C81573 /* Delay.swift */, C89464561BC6C2B00055219D /* DelaySubscription.swift */, C89464571BC6C2B00055219D /* DistinctUntilChanged.swift */, C89464581BC6C2B00055219D /* Do.swift */, @@ -2209,6 +2212,7 @@ C894657D1BC6C2BC0055219D /* RxTarget.swift in Sources */, C80DDED21BCE9046006A1832 /* ControlEvent+Driver.swift in Sources */, C89464EE1BC6C2B00055219D /* Observable+Creation.swift in Sources */, + EBF032401C69FAEB00C81573 /* Delay.swift in Sources */, C894659A1BC6C2BC0055219D /* UISlider+Rx.swift in Sources */, C89465891BC6C2BC0055219D /* RxSearchBarDelegateProxy.swift in Sources */, C89464C21BC6C2B00055219D /* CombineLatest.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/Delay.swift b/RxSwift/Observables/Implementations/Delay.swift new file mode 100644 index 00000000..a5955819 --- /dev/null +++ b/RxSwift/Observables/Implementations/Delay.swift @@ -0,0 +1,78 @@ +// +// Delay.swift +// Rx +// +// Created by tarunon on 2016/02/09. +// Copyright © 2016 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class DelaySink + : Sink +, ObserverType { + typealias Source = Observable + typealias E = O.E + + // state + private let _group = CompositeDisposable() + private let _sourceSubscription = SingleAssignmentDisposable() + + private let _lock = NSRecursiveLock() + + private let _dueTime: RxTimeInterval + private let _scheduler: SchedulerType + + init(observer: O, dueTime: RxTimeInterval, scheduler: SchedulerType) { + _dueTime = dueTime + _scheduler = scheduler + super.init(observer: observer) + } + + func on(event: Event) { + _lock.lock(); defer { _lock.unlock() } + switch event { + case .Error(_): + forwardOn(event) + dispose() + default: + let delayDisposable = SingleAssignmentDisposable() + if let _ = _group.addDisposable(disposable) { + delayDisposable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime) { _ in + self.forwardOn(event) + if event.isStopEvent { + self.dispose() + } + delayDisposable.dispose() + } + } + } + } + + func run(source: Source) -> Disposable { + _group.addDisposable(_sourceSubscription) + + let subscription = source.subscribe(self) + _sourceSubscription.disposable = subscription + + return _group + } +} + +class Delay: Producer { + private let _source: Observable + private let _dueTime: RxTimeInterval + private let _scheduler: SchedulerType + + init(source: Observable, dueTime: RxTimeInterval, scheduler: SchedulerType) { + _source = source + _dueTime = dueTime + _scheduler = scheduler + } + + override func run(observer: O) -> Disposable { + let sink = DelaySink(observer: observer, dueTime: _dueTime, scheduler: _scheduler) + sink.disposable = sink.run(_source) + return sink + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index 9050ce54..932d2711 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -272,3 +272,23 @@ extension ObservableType { return Timeout(source: self.asObservable(), dueTime: dueTime, other: other.asObservable(), scheduler: scheduler) } } + +// MARK: delay + +extension ObservableType { + + /** + Returns an observable sequence by the source observable sequence shifted forward in time by a specified delay. Error events from the source observable sequence are not delayed. + + - seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html) + + - parameter dueTime: Relative time shift of the source by. + - parameter scheduler: Scheduler to run the subscription delay timer on. + - returns: the source Observable shifted in time by the specified delay. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func delay(dueTime: RxTimeInterval, scheduler: SchedulerType) + -> Observable { + return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) + } +} diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index 6e4ba7f0..15e96c77 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1778,4 +1778,76 @@ extension ObservableTimeTest { XCTAssertEqual(ys.subscriptions, [ ]) } -} \ No newline at end of file +} + +// MARK: Delay +extension ObservableTimeTest { + + func testDelay_TimeSpan_Simple() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(50, 42), + next(60, 43), + completed(70) + ]) + + let res = scheduler.start { + xs.delay(30, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(280, 42), + next(290, 43), + completed(300) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + } + + func testDelay_TimeSpan_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(50, 42), + next(60, 43), + error(70, testError) + ]) + + let res = scheduler.start { + xs.delay(30, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + error(270, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + } + + func testDelay_TimeSpan_Dispose() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(50, 42), + next(60, 43), + error(70, testError) + ]) + + let res = scheduler.start(291) { + xs.delay(30, scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + error(270, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + } +}