diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 25e340cf..b0fd1398 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -12,6 +12,10 @@ B1B7C3BE1BDD39DB0076934E /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */; }; B1B7C3BF1BDD39DB0076934E /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */; }; B1B7C3C01BDD39DB0076934E /* TakeLast.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */; }; + B1D8998F1BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; }; + B1D899901BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; }; + B1D899911BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; }; + B1D899921BF653410027B05C /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1D8998E1BF653410027B05C /* Timeout.swift */; }; C8093CC51B8A72BE0088E94D /* Cancelable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C491B8A72BE0088E94D /* Cancelable.swift */; }; C8093CC61B8A72BE0088E94D /* Cancelable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C491B8A72BE0088E94D /* Cancelable.swift */; }; C8093CC71B8A72BE0088E94D /* AsyncLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C4B1B8A72BE0088E94D /* AsyncLock.swift */; }; @@ -875,6 +879,7 @@ /* Begin PBXFileReference section */ A111CE961B91C97C00D0DCEE /* Info.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = ""; }; B1B7C3BC1BDD39DB0076934E /* TakeLast.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TakeLast.swift; sourceTree = ""; }; + B1D8998E1BF653410027B05C /* Timeout.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Timeout.swift; sourceTree = ""; }; C809396D1B8A71760088E94D /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C80939E71B8A71840088E94D /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8093BC71B8A71F00088E94D /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; @@ -1345,6 +1350,7 @@ C8093C8E1B8A72BE0088E94D /* TakeUntil.swift */, C8093C8F1B8A72BE0088E94D /* TakeWhile.swift */, C8093C901B8A72BE0088E94D /* Throttle.swift */, + B1D8998E1BF653410027B05C /* Timeout.swift */, C8093C911B8A72BE0088E94D /* Timer.swift */, CBEE771E1BD649A000AD584C /* ToArray.swift */, D235B23D1BD003DD007E84DA /* Using.swift */, @@ -2344,6 +2350,7 @@ C8093D2C1B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift in Sources */, C8093D3E1B8A72BE0088E94D /* StartWith.swift in Sources */, C821DBA31BA4DCAB008F3809 /* Buffer.swift in Sources */, + B1D899901BF653410027B05C /* Timeout.swift in Sources */, C8093D481B8A72BE0088E94D /* TakeWhile.swift in Sources */, C8093D001B8A72BE0088E94D /* Amb.swift in Sources */, C8093D1C1B8A72BE0088E94D /* Do.swift in Sources */, @@ -2482,6 +2489,7 @@ C8093D2B1B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift in Sources */, C8093D3D1B8A72BE0088E94D /* StartWith.swift in Sources */, C821DBA21BA4DCAB008F3809 /* Buffer.swift in Sources */, + B1D8998F1BF653410027B05C /* Timeout.swift in Sources */, C8093D471B8A72BE0088E94D /* TakeWhile.swift in Sources */, C8093CFF1B8A72BE0088E94D /* Amb.swift in Sources */, C8093D1B1B8A72BE0088E94D /* Do.swift in Sources */, @@ -2620,6 +2628,7 @@ C8F0BFDA1BBBFB8B001B112F /* ObserveOnSerialDispatchQueue.swift in Sources */, C8F0BFDB1BBBFB8B001B112F /* StartWith.swift in Sources */, C8F0BFDC1BBBFB8B001B112F /* Buffer.swift in Sources */, + B1D899921BF653410027B05C /* Timeout.swift in Sources */, C8F0BFDD1BBBFB8B001B112F /* TakeWhile.swift in Sources */, C8F0BFDE1BBBFB8B001B112F /* Amb.swift in Sources */, C8F0BFDF1BBBFB8B001B112F /* Do.swift in Sources */, @@ -2920,6 +2929,7 @@ D2EBEAFC1BB9B6BA003A27DC /* Amb.swift in Sources */, D2EBEB231BB9B6C1003A27DC /* Take.swift in Sources */, D2EBEAE31BB9B697003A27DC /* Observable+Extensions.swift in Sources */, + B1D899911BF653410027B05C /* Timeout.swift in Sources */, D2EBEB371BB9B6D8003A27DC /* ScheduledItem.swift in Sources */, D2EBEB121BB9B6C1003A27DC /* Merge.swift in Sources */, D2EBEAEF1BB9B6A4003A27DC /* Queue.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index 38b2aa91..de98626a 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -17,6 +17,7 @@ 07E300071B14995F00F00100 /* TableViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = 07E300061B14995F00F00100 /* TableViewController.swift */; }; 07E300091B149A2A00F00100 /* User.swift in Sources */ = {isa = PBXBuildFile; fileRef = 07E300081B149A2A00F00100 /* User.swift */; }; 07E3C2331B03605B0010338D /* Dependencies.swift in Sources */ = {isa = PBXBuildFile; fileRef = 07E3C2321B03605B0010338D /* Dependencies.swift */; }; + A27C59051BFC794E00A70332 /* Timeout.swift in Sources */ = {isa = PBXBuildFile; fileRef = A27C59041BFC794E00A70332 /* Timeout.swift */; }; B1604CB51BE49F8D002E1279 /* DownloadableImage.swift in Sources */ = {isa = PBXBuildFile; fileRef = B1604CB41BE49F8D002E1279 /* DownloadableImage.swift */; }; B1604CC21BE5B895002E1279 /* ReachabilityService.swift in Sources */ = {isa = PBXBuildFile; fileRef = B18F3BE11BDB2E8F000AAC79 /* ReachabilityService.swift */; }; B1604CC31BE5B8BD002E1279 /* ReachabilityService.swift in Sources */ = {isa = PBXBuildFile; fileRef = B18F3BE11BDB2E8F000AAC79 /* ReachabilityService.swift */; }; @@ -501,6 +502,7 @@ 07E300061B14995F00F00100 /* TableViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TableViewController.swift; sourceTree = ""; }; 07E300081B149A2A00F00100 /* User.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = User.swift; sourceTree = ""; }; 07E3C2321B03605B0010338D /* Dependencies.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = Dependencies.swift; path = Examples/Dependencies.swift; sourceTree = ""; }; + A27C59041BFC794E00A70332 /* Timeout.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; name = Timeout.swift; path = ../RxSwift/Observables/Implementations/Timeout.swift; sourceTree = ""; }; B1604CB41BE49F8D002E1279 /* DownloadableImage.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DownloadableImage.swift; sourceTree = ""; }; B1604CC81BE5BBFA002E1279 /* UIImageView+DownloadableImage.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UIImageView+DownloadableImage.swift"; sourceTree = ""; }; B18F3BBB1BD92EC8000AAC79 /* Reachability.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Reachability.swift; sourceTree = ""; }; @@ -899,6 +901,7 @@ C83366D41AD0293800C668A7 = { isa = PBXGroup; children = ( + A27C59041BFC794E00A70332 /* Timeout.swift */, C81B39F11BC1C28400EF5A9F /* Rx.xcodeproj */, C8A468EF1B8A8BD000BF917B /* RxBlocking.framework */, C8A468ED1B8A8BCC00BF917B /* RxCocoa.framework */, @@ -1715,6 +1718,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + A27C59051BFC794E00A70332 /* Timeout.swift in Sources */, C84CC58B1BDD486300E06A64 /* LockOwnerType.swift in Sources */, C89465971BC6C2BC0055219D /* UIScrollView+Rx.swift in Sources */, C8297E2F1B6CF905000589EA /* RxTableViewSectionedAnimatedDataSource.swift in Sources */, diff --git a/RxSwift/Error.swift b/RxSwift/Error.swift index e6a94387..e0817628 100644 --- a/RxSwift/Error.swift +++ b/RxSwift/Error.swift @@ -41,6 +41,10 @@ public enum RxError Sequence contains more then one element. */ case MoreThanOneElement + /** + Timeout error. + */ + case Timeout } public extension RxError { @@ -61,6 +65,8 @@ public extension RxError { return "Sequence doesn't contain any element." case .MoreThanOneElement: return "Sequence contains more then one element." + case .Timeout: + return "Sequence timeout" } } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Timeout.swift b/RxSwift/Observables/Implementations/Timeout.swift new file mode 100644 index 00000000..fe26b2f4 --- /dev/null +++ b/RxSwift/Observables/Implementations/Timeout.swift @@ -0,0 +1,88 @@ +// +// Timeout.swift +// Rx +// +// Created by Tomi Koskinen on 13/11/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class TimeoutSink + : Sink + , LockOwnerType + , ObserverType + , SynchronizedOnType { + typealias E = ElementType + typealias Parent = Timeout + + private let _parent: Parent + let _lock = NSRecursiveLock() + + private let _timerD = SerialDisposable() + + init(parent: Parent, observer: O) { + _parent = parent + super.init(observer: observer) + } + + func run() -> Disposable { + _createTimeoutTimer() + return StableCompositeDisposable.create(_timerD, _parent._source.subscribe(self)) + } + + func on(event: Event) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + switch event { + case .Next: + forwardOn(event) + self._createTimeoutTimer() + case .Error: + forwardOn(event) + dispose() + case .Completed: + forwardOn(event) + dispose() + } + } + + private func _createTimeoutTimer() { + if _timerD.disposed { + return + } + + let nextTimer = SingleAssignmentDisposable() + + _timerD.disposable = nextTimer + + nextTimer.disposable = _parent._scheduler.scheduleRelative((), dueTime: _parent._dueTime) { + self.forwardOn(.Error(RxError.Timeout)) + self.dispose() + + return NopDisposable.instance + } + } +} + + +class Timeout : Producer { + + private let _dueTime: Scheduler.TimeInterval + private let _scheduler: Scheduler + private let _source: Observable + + init(source: Observable, dueTime: Scheduler.TimeInterval, scheduler: Scheduler) { + _source = source + _dueTime = dueTime + _scheduler = scheduler + } + + override func run(observer: O) -> Disposable { + let sink = TimeoutSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink + } +} diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index 5413105b..4c811465 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -245,3 +245,21 @@ extension ObservableType { return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) } } + +// MARK: timeout + +extension ObservableType { + + /** + Applies a timeout policy for each element in the observable sequence. If the next element isn't received within the specified timeout duration starting from its predecessor, a TimeoutError is propagated to the observer. + + - parameter dueTime: Maximum duration between values before a timeout occurs. + - parameter scheduler: Scheduler to run the timeout timer on. + - returns: An observable sequence with a TimeoutError in case of a timeout. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func timeout(dueTime: S.TimeInterval, _ scheduler: S) + -> Observable { + return Timeout(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler) + } +} diff --git a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift index 1a624d41..2d04b2ff 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -1526,4 +1526,186 @@ extension ObservableTimeTest { XCTAssertEqual(result!, "1 5") } +} + + +// MARK: Timeout + +extension ObservableTimeTest { + func testTimeout_Empty() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 0), + completed(300) + ]) + + let res = scheduler.start { + xs.timeout(200, scheduler) + } + + XCTAssertEqual(res.messages, [ + completed(300) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 300) + ]) + } + + func testTimeout_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 0), + error(300, testError) + ]) + + let res = scheduler.start { + xs.timeout(200, scheduler) + } + + XCTAssertEqual(res.messages, [ + error(300, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 300) + ]) + } + + func testTimeout_Never() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 0), + ]) + + let res = scheduler.start { + xs.timeout(1000, scheduler) + } + + XCTAssertEqual(res.messages, []) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 1000) + ]) + } + + func testTimeout_Duetime_Simple() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(10, 42), + next(25, 43), + next(40, 44), + next(50, 45), + completed(60) + ]) + + let res = scheduler.start { + xs.timeout(30, scheduler) + } + + XCTAssertEqual(res.messages, [ + next(210, 42), + next(225, 43), + next(240, 44), + next(250, 45), + completed(260) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 260) + ]) + } + + func testTimeout_Duetime_Timeout_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(10, 42), + next(20, 43), + next(55, 44), + next(60, 45), + completed(70) + ]) + + let res = scheduler.start { + xs.timeout(30, scheduler) + } + + XCTAssertEqual(res.messages, [ + next(210, 42), + next(220, 43), + error(250, RxError.Timeout) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 250) + ]) + } + + func testTimeout_Duetime_Timeout_Exact() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createColdObservable([ + next(10, 42), + next(20, 43), + next(50, 44), + next(60, 45), + completed(70) + ]) + + let res = scheduler.start { + xs.timeout(30, scheduler) + } + + XCTAssertEqual(res.messages, [ + next(210, 42), + next(220, 43), + next(250, 44), + next(260, 45), + completed(270) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 270) + ]) + } + + func testTimeout_Duetime_Disposed() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + completed(600) + ]) + + let res = scheduler.start(370) { + xs.timeout(40, scheduler) + } + + XCTAssertEqual(res.messages, [ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 370) + ]) + } } \ No newline at end of file