add delay operator

This commit is contained in:
tarunon 2016-02-09 19:48:06 +09:00
parent f4bb68696c
commit 26893a379b
5 changed files with 185 additions and 1 deletions

View File

@ -1107,6 +1107,10 @@
D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */ = {isa = PBXBuildFile; fileRef = D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */; };
EB8293841C698D1A00315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; };
EB8293851C69A70700315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; };
EB8293861C69A70700315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; };
EB8293871C69A70800315EB6 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EB8293831C698D1A00315EB6 /* Delay.swift */; };
F31F35B01BB4FED800961002 /* UIStepper+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */; };
/* End PBXBuildFile section */
@ -1653,6 +1657,7 @@
D285BAC31BC0231000B3F602 /* SkipUntil.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipUntil.swift; sourceTree = "<group>"; };
D2EA280C1BB9B5A200880ED3 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D2EBEB811BB9B99D003A27DC /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
EB8293831C698D1A00315EB6 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = "<group>"; };
F31F35AF1BB4FED800961002 /* UIStepper+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIStepper+Rx.swift"; sourceTree = "<group>"; };
/* End PBXFileReference section */
@ -1915,6 +1920,7 @@
C8093C741B8A72BE0088E94D /* ConnectableObservable.swift */,
C8093C751B8A72BE0088E94D /* Debug.swift */,
C8093C761B8A72BE0088E94D /* Deferred.swift */,
EB8293831C698D1A00315EB6 /* Delay.swift */,
C8093C771B8A72BE0088E94D /* DelaySubscription.swift */,
C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */,
C8093C791B8A72BE0088E94D /* Do.swift */,
@ -3685,6 +3691,7 @@
C8093D341B8A72BE0088E94D /* RefCount.swift in Sources */,
C8093D0E1B8A72BE0088E94D /* Concat.swift in Sources */,
C8093CCA1B8A72BE0088E94D /* Lock.swift in Sources */,
EB8293851C69A70700315EB6 /* Delay.swift in Sources */,
C8093D441B8A72BE0088E94D /* Take.swift in Sources */,
C84CC5591BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */,
C8093D321B8A72BE0088E94D /* Reduce.swift in Sources */,
@ -3907,6 +3914,7 @@
C8093D331B8A72BE0088E94D /* RefCount.swift in Sources */,
C8093D0D1B8A72BE0088E94D /* Concat.swift in Sources */,
C8093CC91B8A72BE0088E94D /* Lock.swift in Sources */,
EB8293841C698D1A00315EB6 /* Delay.swift in Sources */,
C8093D431B8A72BE0088E94D /* Take.swift in Sources */,
C84CC5581BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */,
C8093D311B8A72BE0088E94D /* Reduce.swift in Sources */,
@ -4053,6 +4061,7 @@
C8F0BFE11BBBFB8B001B112F /* RefCount.swift in Sources */,
C8F0BFE21BBBFB8B001B112F /* Concat.swift in Sources */,
C8F0BFE31BBBFB8B001B112F /* Lock.swift in Sources */,
EB8293871C69A70800315EB6 /* Delay.swift in Sources */,
C8F0BFE41BBBFB8B001B112F /* Take.swift in Sources */,
C84CC55B1BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */,
C8F0BFE51BBBFB8B001B112F /* Reduce.swift in Sources */,
@ -4370,6 +4379,7 @@
D2EBEB2B1BB9B6CA003A27DC /* Observable+Aggregate.swift in Sources */,
D2EBEB291BB9B6C1003A27DC /* Zip+arity.swift in Sources */,
D2EBEB241BB9B6C1003A27DC /* TakeUntil.swift in Sources */,
EB8293861C69A70700315EB6 /* Delay.swift in Sources */,
C84CC55A1BDCF51200E06A64 /* SynchronizedSubscribeType.swift in Sources */,
D2EBEB3B1BB9B6D8003A27DC /* OperationQueueScheduler.swift in Sources */,
D2EBEAE51BB9B697003A27DC /* AnyObserver.swift in Sources */,

View File

@ -427,6 +427,7 @@
D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; };
D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; };
E3EE18D21C4D68F900834224 /* UIApplication+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = E3EE18D11C4D68F900834224 /* UIApplication+Rx.swift */; };
EBF032401C69FAEB00C81573 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = EBF0323F1C69FAEB00C81573 /* Delay.swift */; };
/* End PBXBuildFile section */
/* Begin PBXContainerItemProxy section */
@ -935,6 +936,7 @@
D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = "<group>"; };
E3EE18D11C4D68F900834224 /* UIApplication+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIApplication+Rx.swift"; sourceTree = "<group>"; };
EBF0323F1C69FAEB00C81573 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */
@ -1406,6 +1408,7 @@
C89464531BC6C2B00055219D /* ConnectableObservable.swift */,
C89464541BC6C2B00055219D /* Debug.swift */,
C89464551BC6C2B00055219D /* Deferred.swift */,
EBF0323F1C69FAEB00C81573 /* Delay.swift */,
C89464561BC6C2B00055219D /* DelaySubscription.swift */,
C89464571BC6C2B00055219D /* DistinctUntilChanged.swift */,
C89464581BC6C2B00055219D /* Do.swift */,
@ -2209,6 +2212,7 @@
C894657D1BC6C2BC0055219D /* RxTarget.swift in Sources */,
C80DDED21BCE9046006A1832 /* ControlEvent+Driver.swift in Sources */,
C89464EE1BC6C2B00055219D /* Observable+Creation.swift in Sources */,
EBF032401C69FAEB00C81573 /* Delay.swift in Sources */,
C894659A1BC6C2BC0055219D /* UISlider+Rx.swift in Sources */,
C89465891BC6C2BC0055219D /* RxSearchBarDelegateProxy.swift in Sources */,
C89464C21BC6C2B00055219D /* CombineLatest.swift in Sources */,

View File

@ -0,0 +1,78 @@
//
// Delay.swift
// Rx
//
// Created by tarunon on 2016/02/09.
// Copyright © 2016 Krunoslav Zaher. All rights reserved.
//
import Foundation
class DelaySink<ElementType, O: ObserverType where O.E == ElementType>
: Sink<O>
, ObserverType {
typealias Source = Observable<ElementType>
typealias E = O.E
// state
private let _group = CompositeDisposable()
private let _sourceSubscription = SingleAssignmentDisposable()
private let _lock = NSRecursiveLock()
private let _dueTime: RxTimeInterval
private let _scheduler: SchedulerType
init(observer: O, dueTime: RxTimeInterval, scheduler: SchedulerType) {
_dueTime = dueTime
_scheduler = scheduler
super.init(observer: observer)
}
func on(event: Event<E>) {
_lock.lock(); defer { _lock.unlock() }
switch event {
case .Error(_):
forwardOn(event)
dispose()
default:
let delayDisposable = SingleAssignmentDisposable()
if let _ = _group.addDisposable(disposable) {
delayDisposable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime) { _ in
self.forwardOn(event)
if event.isStopEvent {
self.dispose()
}
delayDisposable.dispose()
}
}
}
}
func run(source: Source) -> Disposable {
_group.addDisposable(_sourceSubscription)
let subscription = source.subscribe(self)
_sourceSubscription.disposable = subscription
return _group
}
}
class Delay<Element>: Producer<Element> {
private let _source: Observable<Element>
private let _dueTime: RxTimeInterval
private let _scheduler: SchedulerType
init(source: Observable<Element>, dueTime: RxTimeInterval, scheduler: SchedulerType) {
_source = source
_dueTime = dueTime
_scheduler = scheduler
}
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = DelaySink(observer: observer, dueTime: _dueTime, scheduler: _scheduler)
sink.disposable = sink.run(_source)
return sink
}
}

View File

@ -272,3 +272,23 @@ extension ObservableType {
return Timeout(source: self.asObservable(), dueTime: dueTime, other: other.asObservable(), scheduler: scheduler)
}
}
// MARK: delay
extension ObservableType {
/**
Returns an observable sequence by the source observable sequence shifted forward in time by a specified delay. Error events from the source observable sequence are not delayed.
- seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html)
- parameter dueTime: Relative time shift of the source by.
- parameter scheduler: Scheduler to run the subscription delay timer on.
- returns: the source Observable shifted in time by the specified delay.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func delay(dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
}

View File

@ -1778,4 +1778,76 @@ extension ObservableTimeTest {
XCTAssertEqual(ys.subscriptions, [
])
}
}
}
// MARK: Delay
extension ObservableTimeTest {
func testDelay_TimeSpan_Simple() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(50, 42),
next(60, 43),
completed(70)
])
let res = scheduler.start {
xs.delay(30, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
next(280, 42),
next(290, 43),
completed(300)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 270)
])
}
func testDelay_TimeSpan_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(50, 42),
next(60, 43),
error(70, testError)
])
let res = scheduler.start {
xs.delay(30, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
error(270, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 270)
])
}
func testDelay_TimeSpan_Dispose() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(50, 42),
next(60, 43),
error(70, testError)
])
let res = scheduler.start(291) {
xs.delay(30, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
error(270, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 270)
])
}
}