From c515ba4b508721377d0a4fdd9933c21abbce9032 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Mon, 16 Nov 2015 00:11:30 +0100 Subject: [PATCH] Adds `CurrentThreadScheduler` to blocking operators. --- RxBlocking/BlockingObservable+Operators.swift | 8 ++++---- RxBlocking/RunLoopLock.swift | 15 ++++++++++++++- .../Tests/Observable+BlockingTest.swift | 6 +++--- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/RxBlocking/BlockingObservable+Operators.swift b/RxBlocking/BlockingObservable+Operators.swift index 095165a4..9430803e 100644 --- a/RxBlocking/BlockingObservable+Operators.swift +++ b/RxBlocking/BlockingObservable+Operators.swift @@ -177,6 +177,9 @@ extension BlockingObservable { lock.dispatch { d.disposable = self.source.subscribe { e in + if d.disposed { + return + } switch e { case .Next(let e): do { @@ -196,12 +199,10 @@ extension BlockingObservable { return case .Error(let e): error = e - lock.stop() case .Completed: - if error == nil && element == nil { + if element == nil { error = RxError.NoElements } - break } lock.stop() @@ -209,7 +210,6 @@ extension BlockingObservable { } lock.run() - d.dispose() if let error = error { diff --git a/RxBlocking/RunLoopLock.swift b/RxBlocking/RunLoopLock.swift index 5ea9ba73..94fefa57 100644 --- a/RxBlocking/RunLoopLock.swift +++ b/RxBlocking/RunLoopLock.swift @@ -7,6 +7,9 @@ // import Foundation +#if !RX_NO_MODULE + import RxSwift +#endif class RunLoopLock : NSObject { let currentRunLoop: CFRunLoopRef @@ -16,7 +19,17 @@ class RunLoopLock : NSObject { } func dispatch(action: () -> ()) { - CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode, action) + CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode) { + if CurrentThreadScheduler.isScheduleRequired { + CurrentThreadScheduler.instance.schedule(()) { _ in + action() + return NopDisposable.instance + } + } + else { + action() + } + } CFRunLoopWakeUp(currentRunLoop) } diff --git a/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift b/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift index 0fd88c2c..378ee1d7 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift @@ -225,7 +225,7 @@ extension ObservableBlockingTest { catch let e { XCTAssertTrue((e as! RxError)._code == RxError.MoreThanOneElement._code) } - XCTAssertEqual(predicateVals, [42, 43, 44, 45]) + XCTAssertEqual(predicateVals, [42, 43, 44]) } @@ -247,7 +247,7 @@ extension ObservableBlockingTest { func testSingle_predicate_throws() { var predicateVals = [Int]() do { - try (sequenceOf(42, 43, 44, 45) as Observable).toBlocking().single( { e in + try (sequenceOf(42, 43, 44, 45, scheduler: CurrentThreadScheduler.instance) as Observable).toBlocking().single( { e in predicateVals.append(e) if e < 43 { return false } throw testError @@ -257,7 +257,7 @@ extension ObservableBlockingTest { catch let e { XCTAssertTrue(e as NSError === testError) } - XCTAssertEqual(predicateVals, [42, 43, 44, 45]) + XCTAssertEqual(predicateVals, [42, 43]) } func testSingle_predicate_fail() {