Adds `CurrentThreadScheduler` to blocking operators.
This commit is contained in:
parent
db478b1b9a
commit
c515ba4b50
|
|
@ -177,6 +177,9 @@ extension BlockingObservable {
|
|||
|
||||
lock.dispatch {
|
||||
d.disposable = self.source.subscribe { e in
|
||||
if d.disposed {
|
||||
return
|
||||
}
|
||||
switch e {
|
||||
case .Next(let e):
|
||||
do {
|
||||
|
|
@ -196,12 +199,10 @@ extension BlockingObservable {
|
|||
return
|
||||
case .Error(let e):
|
||||
error = e
|
||||
lock.stop()
|
||||
case .Completed:
|
||||
if error == nil && element == nil {
|
||||
if element == nil {
|
||||
error = RxError.NoElements
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
lock.stop()
|
||||
|
|
@ -209,7 +210,6 @@ extension BlockingObservable {
|
|||
}
|
||||
|
||||
lock.run()
|
||||
|
||||
d.dispose()
|
||||
|
||||
if let error = error {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,9 @@
|
|||
//
|
||||
|
||||
import Foundation
|
||||
#if !RX_NO_MODULE
|
||||
import RxSwift
|
||||
#endif
|
||||
|
||||
class RunLoopLock : NSObject {
|
||||
let currentRunLoop: CFRunLoopRef
|
||||
|
|
@ -16,7 +19,17 @@ class RunLoopLock : NSObject {
|
|||
}
|
||||
|
||||
func dispatch(action: () -> ()) {
|
||||
CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode, action)
|
||||
CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode) {
|
||||
if CurrentThreadScheduler.isScheduleRequired {
|
||||
CurrentThreadScheduler.instance.schedule(()) { _ in
|
||||
action()
|
||||
return NopDisposable.instance
|
||||
}
|
||||
}
|
||||
else {
|
||||
action()
|
||||
}
|
||||
}
|
||||
CFRunLoopWakeUp(currentRunLoop)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -225,7 +225,7 @@ extension ObservableBlockingTest {
|
|||
catch let e {
|
||||
XCTAssertTrue((e as! RxError)._code == RxError.MoreThanOneElement._code)
|
||||
}
|
||||
XCTAssertEqual(predicateVals, [42, 43, 44, 45])
|
||||
XCTAssertEqual(predicateVals, [42, 43, 44])
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -247,7 +247,7 @@ extension ObservableBlockingTest {
|
|||
func testSingle_predicate_throws() {
|
||||
var predicateVals = [Int]()
|
||||
do {
|
||||
try (sequenceOf(42, 43, 44, 45) as Observable<Int>).toBlocking().single( { e in
|
||||
try (sequenceOf(42, 43, 44, 45, scheduler: CurrentThreadScheduler.instance) as Observable<Int>).toBlocking().single( { e in
|
||||
predicateVals.append(e)
|
||||
if e < 43 { return false }
|
||||
throw testError
|
||||
|
|
@ -257,7 +257,7 @@ extension ObservableBlockingTest {
|
|||
catch let e {
|
||||
XCTAssertTrue(e as NSError === testError)
|
||||
}
|
||||
XCTAssertEqual(predicateVals, [42, 43, 44, 45])
|
||||
XCTAssertEqual(predicateVals, [42, 43])
|
||||
}
|
||||
|
||||
func testSingle_predicate_fail() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue