Merge branch 'feature-timeout' of https://github.com/tottakai/RxSwift into tottakai-feature-timeout

This commit is contained in:
Junior B 2015-11-24 16:29:31 +01:00
commit d7fa4060f1
6 changed files with 308 additions and 0 deletions

View File

@ -12,6 +12,10 @@
B1B7C3BE1BDD39DB0076934E /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */; };
B1B7C3BF1BDD39DB0076934E /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */; };
B1B7C3C01BDD39DB0076934E /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */; };
B1D8998F1BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; };
B1D899901BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; };
B1D899911BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; };
B1D899921BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; };
C8093CC51B8A72BE0088E94D /* Cancelable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C491B8A72BE0088E94D /* Cancelable.swift */; };
C8093CC61B8A72BE0088E94D /* Cancelable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C491B8A72BE0088E94D /* Cancelable.swift */; };
C8093CC71B8A72BE0088E94D /* AsyncLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C4B1B8A72BE0088E94D /* AsyncLock.swift */; };
@ -875,6 +879,7 @@
/* Begin PBXFileReference section */
A111CE961B91C97C00D0DCEE /* Info.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TakeLast.swift; sourceTree = "<group>"; };
B1D8998E1BF653410027B05C /* Timeout.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Timeout.swift; sourceTree = "<group>"; };
C809396D1B8A71760088E94D /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C80939E71B8A71840088E94D /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8093BC71B8A71F00088E94D /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
@ -1345,6 +1350,7 @@
C8093C8E1B8A72BE0088E94D /* TakeUntil.swift */,
C8093C8F1B8A72BE0088E94D /* TakeWhile.swift */,
C8093C901B8A72BE0088E94D /* Throttle.swift */,
B1D8998E1BF653410027B05C /* Timeout.swift */,
C8093C911B8A72BE0088E94D /* Timer.swift */,
CBEE771E1BD649A000AD584C /* ToArray.swift */,
D235B23D1BD003DD007E84DA /* Using.swift */,
@ -2344,6 +2350,7 @@
C8093D2C1B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift in Sources */,
C8093D3E1B8A72BE0088E94D /* StartWith.swift in Sources */,
C821DBA31BA4DCAB008F3809 /* Buffer.swift in Sources */,
B1D899901BF653410027B05C /* Timeout.swift in Sources */,
C8093D481B8A72BE0088E94D /* TakeWhile.swift in Sources */,
C8093D001B8A72BE0088E94D /* Amb.swift in Sources */,
C8093D1C1B8A72BE0088E94D /* Do.swift in Sources */,
@ -2482,6 +2489,7 @@
C8093D2B1B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift in Sources */,
C8093D3D1B8A72BE0088E94D /* StartWith.swift in Sources */,
C821DBA21BA4DCAB008F3809 /* Buffer.swift in Sources */,
B1D8998F1BF653410027B05C /* Timeout.swift in Sources */,
C8093D471B8A72BE0088E94D /* TakeWhile.swift in Sources */,
C8093CFF1B8A72BE0088E94D /* Amb.swift in Sources */,
C8093D1B1B8A72BE0088E94D /* Do.swift in Sources */,
@ -2620,6 +2628,7 @@
C8F0BFDA1BBBFB8B001B112F /* ObserveOnSerialDispatchQueue.swift in Sources */,
C8F0BFDB1BBBFB8B001B112F /* StartWith.swift in Sources */,
C8F0BFDC1BBBFB8B001B112F /* Buffer.swift in Sources */,
B1D899921BF653410027B05C /* Timeout.swift in Sources */,
C8F0BFDD1BBBFB8B001B112F /* TakeWhile.swift in Sources */,
C8F0BFDE1BBBFB8B001B112F /* Amb.swift in Sources */,
C8F0BFDF1BBBFB8B001B112F /* Do.swift in Sources */,
@ -2920,6 +2929,7 @@
D2EBEAFC1BB9B6BA003A27DC /* Amb.swift in Sources */,
D2EBEB231BB9B6C1003A27DC /* Take.swift in Sources */,
D2EBEAE31BB9B697003A27DC /* Observable+Extensions.swift in Sources */,
B1D899911BF653410027B05C /* Timeout.swift in Sources */,
D2EBEB371BB9B6D8003A27DC /* ScheduledItem.swift in Sources */,
D2EBEB121BB9B6C1003A27DC /* Merge.swift in Sources */,
D2EBEAEF1BB9B6A4003A27DC /* Queue.swift in Sources */,

View File

@ -17,6 +17,7 @@
07E300071B14995F00F00100 /* TableViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = 07E300061B14995F00F00100 /* TableViewController.swift */; };
07E300091B149A2A00F00100 /* User.swift in Sources */ = {isa = PBXBuildFile; fileRef = 07E300081B149A2A00F00100 /* User.swift */; };
07E3C2331B03605B0010338D /* Dependencies.swift in Sources */ = {isa = PBXBuildFile; fileRef = 07E3C2321B03605B0010338D /* Dependencies.swift */; };
A27C59051BFC794E00A70332 /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = A27C59041BFC794E00A70332 /* Timeout.swift */; };
B1604CB51BE49F8D002E1279 /* DownloadableImage.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1604CB41BE49F8D002E1279 /* DownloadableImage.swift */; };
B1604CC21BE5B895002E1279 /* ReachabilityService.swift in Sources */ = {isa = PBXBuildFile; fileRef = B18F3BE11BDB2E8F000AAC79 /* ReachabilityService.swift */; };
B1604CC31BE5B8BD002E1279 /* ReachabilityService.swift in Sources */ = {isa = PBXBuildFile; fileRef = B18F3BE11BDB2E8F000AAC79 /* ReachabilityService.swift */; };
@ -501,6 +502,7 @@
07E300061B14995F00F00100 /* TableViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TableViewController.swift; sourceTree = "<group>"; };
07E300081B149A2A00F00100 /* User.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = User.swift; sourceTree = "<group>"; };
07E3C2321B03605B0010338D /* Dependencies.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = Dependencies.swift; path = Examples/Dependencies.swift; sourceTree = "<group>"; };
A27C59041BFC794E00A70332 /* Timeout.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; name = Timeout.swift; path = ../RxSwift/Observables/Implementations/Timeout.swift; sourceTree = "<group>"; };
B1604CB41BE49F8D002E1279 /* DownloadableImage.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DownloadableImage.swift; sourceTree = "<group>"; };
B1604CC81BE5BBFA002E1279 /* UIImageView+DownloadableImage.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIImageView+DownloadableImage.swift"; sourceTree = "<group>"; };
B18F3BBB1BD92EC8000AAC79 /* Reachability.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Reachability.swift; sourceTree = "<group>"; };
@ -899,6 +901,7 @@
C83366D41AD0293800C668A7 = {
isa = PBXGroup;
children = (
A27C59041BFC794E00A70332 /* Timeout.swift */,
C81B39F11BC1C28400EF5A9F /* Rx.xcodeproj */,
C8A468EF1B8A8BD000BF917B /* RxBlocking.framework */,
C8A468ED1B8A8BCC00BF917B /* RxCocoa.framework */,
@ -1715,6 +1718,7 @@
isa = PBXSourcesBuildPhase;
buildActionMask = 2147483647;
files = (
A27C59051BFC794E00A70332 /* Timeout.swift in Sources */,
C84CC58B1BDD486300E06A64 /* LockOwnerType.swift in Sources */,
C89465971BC6C2BC0055219D /* UIScrollView+Rx.swift in Sources */,
C8297E2F1B6CF905000589EA /* RxTableViewSectionedAnimatedDataSource.swift in Sources */,

View File

@ -41,6 +41,10 @@ public enum RxError
Sequence contains more then one element.
*/
case MoreThanOneElement
/**
Timeout error.
*/
case Timeout
}
public extension RxError {
@ -61,6 +65,8 @@ public extension RxError {
return "Sequence doesn't contain any element."
case .MoreThanOneElement:
return "Sequence contains more then one element."
case .Timeout:
return "Sequence timeout"
}
}
}

View File

@ -0,0 +1,88 @@
//
// Timeout.swift
// Rx
//
// Created by Tomi Koskinen on 13/11/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class TimeoutSink<ElementType, Scheduler: SchedulerType, O: ObserverType where O.E == ElementType>
: Sink<O>
, LockOwnerType
, ObserverType
, SynchronizedOnType {
typealias E = ElementType
typealias Parent = Timeout<E, Scheduler>
private let _parent: Parent
let _lock = NSRecursiveLock()
private let _timerD = SerialDisposable()
init(parent: Parent, observer: O) {
_parent = parent
super.init(observer: observer)
}
func run() -> Disposable {
_createTimeoutTimer()
return StableCompositeDisposable.create(_timerD, _parent._source.subscribe(self))
}
func on(event: Event<E>) {
synchronizedOn(event)
}
func _synchronized_on(event: Event<E>) {
switch event {
case .Next:
forwardOn(event)
self._createTimeoutTimer()
case .Error:
forwardOn(event)
dispose()
case .Completed:
forwardOn(event)
dispose()
}
}
private func _createTimeoutTimer() {
if _timerD.disposed {
return
}
let nextTimer = SingleAssignmentDisposable()
_timerD.disposable = nextTimer
nextTimer.disposable = _parent._scheduler.scheduleRelative((), dueTime: _parent._dueTime) {
self.forwardOn(.Error(RxError.Timeout))
self.dispose()
return NopDisposable.instance
}
}
}
class Timeout<Element, Scheduler: SchedulerType> : Producer<Element> {
private let _dueTime: Scheduler.TimeInterval
private let _scheduler: Scheduler
private let _source: Observable<Element>
init(source: Observable<Element>, dueTime: Scheduler.TimeInterval, scheduler: Scheduler) {
_source = source
_dueTime = dueTime
_scheduler = scheduler
}
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = TimeoutSink(parent: self, observer: observer)
sink.disposable = sink.run()
return sink
}
}

View File

@ -245,3 +245,21 @@ extension ObservableType {
return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
}
// MARK: timeout
extension ObservableType {
/**
Applies a timeout policy for each element in the observable sequence. If the next element isn't received within the specified timeout duration starting from its predecessor, a TimeoutError is propagated to the observer.
- parameter dueTime: Maximum duration between values before a timeout occurs.
- parameter scheduler: Scheduler to run the timeout timer on.
- returns: An observable sequence with a TimeoutError in case of a timeout.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func timeout<S: SchedulerType>(dueTime: S.TimeInterval, _ scheduler: S)
-> Observable<E> {
return Timeout(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
}

View File

@ -1526,4 +1526,186 @@ extension ObservableTimeTest {
XCTAssertEqual(result!, "1 5")
}
}
// MARK: Timeout
extension ObservableTimeTest {
func testTimeout_Empty() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 0),
completed(300)
])
let res = scheduler.start {
xs.timeout(200, scheduler)
}
XCTAssertEqual(res.messages, [
completed(300)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 300)
])
}
func testTimeout_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 0),
error(300, testError)
])
let res = scheduler.start {
xs.timeout(200, scheduler)
}
XCTAssertEqual(res.messages, [
error(300, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 300)
])
}
func testTimeout_Never() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 0),
])
let res = scheduler.start {
xs.timeout(1000, scheduler)
}
XCTAssertEqual(res.messages, [])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 1000)
])
}
func testTimeout_Duetime_Simple() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(10, 42),
next(25, 43),
next(40, 44),
next(50, 45),
completed(60)
])
let res = scheduler.start {
xs.timeout(30, scheduler)
}
XCTAssertEqual(res.messages, [
next(210, 42),
next(225, 43),
next(240, 44),
next(250, 45),
completed(260)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 260)
])
}
func testTimeout_Duetime_Timeout_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(10, 42),
next(20, 43),
next(55, 44),
next(60, 45),
completed(70)
])
let res = scheduler.start {
xs.timeout(30, scheduler)
}
XCTAssertEqual(res.messages, [
next(210, 42),
next(220, 43),
error(250, RxError.Timeout)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250)
])
}
func testTimeout_Duetime_Timeout_Exact() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createColdObservable([
next(10, 42),
next(20, 43),
next(50, 44),
next(60, 45),
completed(70)
])
let res = scheduler.start {
xs.timeout(30, scheduler)
}
XCTAssertEqual(res.messages, [
next(210, 42),
next(220, 43),
next(250, 44),
next(260, 45),
completed(270)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 270)
])
}
func testTimeout_Duetime_Disposed() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(205, 1),
next(210, 2),
next(240, 3),
next(280, 4),
next(320, 5),
next(350, 6),
next(370, 7),
next(420, 8),
next(470, 9),
completed(600)
])
let res = scheduler.start(370) {
xs.timeout(40, scheduler)
}
XCTAssertEqual(res.messages, [
next(205, 1),
next(210, 2),
next(240, 3),
next(280, 4),
next(320, 5),
next(350, 6),
next(370, 7)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 370)
])
}
}