diff --git a/RxBlocking/BlockingObservable+Operators.swift b/RxBlocking/BlockingObservable+Operators.swift index b8ee3506..3d4674b8 100644 --- a/RxBlocking/BlockingObservable+Operators.swift +++ b/RxBlocking/BlockingObservable+Operators.swift @@ -24,10 +24,14 @@ extension BlockingObservable { var error: Swift.Error? - let lock = RunLoopLock() + let lock = RunLoopLock(timeout: timeout) let d = SingleAssignmentDisposable() + defer { + d.dispose() + } + lock.dispatch { d.disposable = self.source.subscribe { e in if d.isDisposed { @@ -47,9 +51,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -74,7 +76,11 @@ extension BlockingObservable { let d = SingleAssignmentDisposable() - let lock = RunLoopLock() + defer { + d.dispose() + } + + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -99,9 +105,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -126,7 +130,11 @@ extension BlockingObservable { let d = SingleAssignmentDisposable() - let lock = RunLoopLock() + defer { + d.dispose() + } + + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -148,9 +156,7 @@ extension BlockingObservable { } } - lock.run() - - d.dispose() + try lock.run() if let error = error { throw error @@ -186,8 +192,12 @@ extension BlockingObservable { var error: Swift.Error? let d = SingleAssignmentDisposable() + + defer { + d.dispose() + } - let lock = RunLoopLock() + let lock = RunLoopLock(timeout: timeout) lock.dispatch { d.disposable = self.source.subscribe { e in @@ -224,9 +234,8 @@ extension BlockingObservable { } } - lock.run() - d.dispose() - + try lock.run() + if let error = error { throw error } diff --git a/RxBlocking/BlockingObservable.swift b/RxBlocking/BlockingObservable.swift index 2197ce13..062ac381 100644 --- a/RxBlocking/BlockingObservable.swift +++ b/RxBlocking/BlockingObservable.swift @@ -20,5 +20,6 @@ If you think you need to use a `BlockingObservable` this is usually a sign that design. */ public struct BlockingObservable { + let timeout: RxTimeInterval? let source: Observable -} \ No newline at end of file +} diff --git a/RxBlocking/ObservableConvertibleType+Blocking.swift b/RxBlocking/ObservableConvertibleType+Blocking.swift index 79a08d76..d44bc66d 100644 --- a/RxBlocking/ObservableConvertibleType+Blocking.swift +++ b/RxBlocking/ObservableConvertibleType+Blocking.swift @@ -15,10 +15,11 @@ extension ObservableConvertibleType { /** Converts an Observable into a `BlockingObservable` (an Observable with blocking operators). + - parameter timeout: Maximal time interval BlockingObservable can block without throwing `RxError.timeout`. - returns: `BlockingObservable` version of `self` */ // @warn_unused_result(message:"http://git.io/rxs.uo") - public func toBlocking() -> BlockingObservable { - return BlockingObservable(source: self.asObservable()) + public func toBlocking(timeout: RxTimeInterval? = nil) -> BlockingObservable { + return BlockingObservable(timeout: timeout, source: self.asObservable()) } } diff --git a/RxBlocking/RunLoopLock.swift b/RxBlocking/RunLoopLock.swift index dd802fa1..7df595bc 100644 --- a/RxBlocking/RunLoopLock.swift +++ b/RxBlocking/RunLoopLock.swift @@ -29,17 +29,19 @@ typealias AtomicInt = Int32 #endif class RunLoopLock { - let currentRunLoop: CFRunLoop + let _currentRunLoop: CFRunLoop - var calledRun: AtomicInt = 0 - var calledStop: AtomicInt = 0 + var _calledRun: AtomicInt = 0 + var _calledStop: AtomicInt = 0 + var _timeout: RxTimeInterval? - init() { - currentRunLoop = CFRunLoopGetCurrent() + init(timeout: RxTimeInterval?) { + _timeout = timeout + _currentRunLoop = CFRunLoopGetCurrent() } func dispatch(_ action: @escaping () -> ()) { - CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { + CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { if CurrentThreadScheduler.isScheduleRequired { _ = CurrentThreadScheduler.instance.schedule(()) { _ in action() @@ -50,23 +52,37 @@ class RunLoopLock { action() } } - CFRunLoopWakeUp(currentRunLoop) + CFRunLoopWakeUp(_currentRunLoop) } func stop() { - if AtomicIncrement(&calledStop) != 1 { + if AtomicIncrement(&_calledStop) != 1 { return } - CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { - CFRunLoopStop(self.currentRunLoop) + CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) { + CFRunLoopStop(self._currentRunLoop) } - CFRunLoopWakeUp(currentRunLoop) + CFRunLoopWakeUp(_currentRunLoop) } - func run() { - if AtomicIncrement(&calledRun) != 1 { + func run() throws { + if AtomicIncrement(&_calledRun) != 1 { fatalError("Run can be only called once") } - CFRunLoopRun() + if let timeout = _timeout { + switch CFRunLoopRunInMode(CFRunLoopMode.defaultMode, timeout, false) { + case .finished: + return + case .handledSource: + return + case .stopped: + return + case .timedOut: + throw RxError.timeout + } + } + else { + CFRunLoopRun() + } } } diff --git a/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift b/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift index 2a298f45..328ef25f 100644 --- a/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+BlockingTest.swift @@ -69,6 +69,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, [1, 2]) } } + + func testToArray_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).toArray() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } // first @@ -126,6 +141,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testFirst_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).first() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } // last @@ -183,6 +213,21 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testLast_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).last() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } } @@ -360,4 +405,34 @@ extension ObservableBlockingTest { XCTAssertEqual(d, 1) } } + + func testSingle_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).single() + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } + + func testSinglePredicate_timeout() { + do { + _ = try Observable.never().toBlocking(timeout: 0.01).single { _ in true } + XCTFail("It should fail") + } + catch let e { + if case .timeout = e as! RxError { + + } + else { + XCTFail() + } + } + } }