Merge commit 'f23eb75adc8aad2f4c5ae620da9cf3acd16cca12' into rx_uisearchbar

* commit 'f23eb75adc8aad2f4c5ae620da9cf3acd16cca12':
  Make Bool as KVORepresentable, to avoid ambiguty in observe()
  Fixes unit tests.
  Fixes delay operator.
  Adds timeout parameter to blocking observable sequence.
  patch the implement of convert to serial queue.
  implement all tests.
  Add real scheduler test.
  Use `SchedulerType.scheduleRecursive` and `CompositeDisposable.removeDisposable`.
  move lock on Error case. I'm sorry I fix mistype in _group.addDisposable argmunets. I think it was missed when rename tempolary field.
  update API.md
  add delay operator
This commit is contained in:
Ryu Tanabe 2016-08-30 22:51:04 +09:00
commit f0863eaca1
12 changed files with 693 additions and 34 deletions

View File

@ -63,7 +63,7 @@ Operators are stateless by default.
#### Observable Utility Operators
* [`delaySubscription`](http://reactivex.io/documentation/operators/delay.html)
* [`delaySubscription` / `delay`](http://reactivex.io/documentation/operators/delay.html)
* [`do` / `doOnNext`](http://reactivex.io/documentation/operators/do.html)
* [`observeOn` / `observeSingleOn`](http://reactivex.io/documentation/operators/observeon.html)
* [`subscribe`](http://reactivex.io/documentation/operators/subscribe.html)

View File

@ -633,6 +633,10 @@
C86409FD1BA593F500D3C4E8 /* Range.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86409FB1BA593F500D3C4E8 /* Range.swift */; };
C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8640A021BA5B12A00D3C4E8 /* Repeat.swift */; };
C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8640A021BA5B12A00D3C4E8 /* Repeat.swift */; };
C86B0A561D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; };
C86B0A571D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; };
C86B0A581D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; };
C86B0A591D735CCC005D8A16 /* Delay.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B0A551D735CCC005D8A16 /* Delay.swift */; };
C86B1E221D42BF5200130546 /* SchedulerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B1E211D42BF5200130546 /* SchedulerTests.swift */; };
C86B1E231D42BF5200130546 /* SchedulerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B1E211D42BF5200130546 /* SchedulerTests.swift */; };
C86B1E241D42BF5200130546 /* SchedulerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = C86B1E211D42BF5200130546 /* SchedulerTests.swift */; };
@ -1662,6 +1666,7 @@
C85BA04B1C3878740075D68E /* PerformanceTests.app */ = {isa = PBXFileReference; explicitFileType = wrapper.application; includeInIndex = 0; path = PerformanceTests.app; sourceTree = BUILT_PRODUCTS_DIR; };
C86409FB1BA593F500D3C4E8 /* Range.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Range.swift; sourceTree = "<group>"; };
C8640A021BA5B12A00D3C4E8 /* Repeat.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Repeat.swift; sourceTree = "<group>"; };
C86B0A551D735CCC005D8A16 /* Delay.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Delay.swift; sourceTree = "<group>"; };
C86B1E211D42BF5200130546 /* SchedulerTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SchedulerTests.swift; sourceTree = "<group>"; };
C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxTableViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
@ -2032,6 +2037,7 @@
C8093C741B8A72BE0088E94D /* ConnectableObservable.swift */,
C8093C751B8A72BE0088E94D /* Debug.swift */,
C8093C761B8A72BE0088E94D /* Deferred.swift */,
C86B0A551D735CCC005D8A16 /* Delay.swift */,
C8093C771B8A72BE0088E94D /* DelaySubscription.swift */,
C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */,
C8093C791B8A72BE0088E94D /* Do.swift */,
@ -3926,6 +3932,7 @@
C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */,
79E9DE8A1C3417FD009970AF /* DispatchQueueSchedulerQOS.swift in Sources */,
C8093CF41B8A72BE0088E94D /* Errors.swift in Sources */,
C86B0A571D735CCC005D8A16 /* Delay.swift in Sources */,
C8093D141B8A72BE0088E94D /* Debug.swift in Sources */,
CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */,
C8554E2B1C3051620052E67D /* PriorityQueue.swift in Sources */,
@ -4149,6 +4156,7 @@
C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */,
79E9DE891C3417FD009970AF /* DispatchQueueSchedulerQOS.swift in Sources */,
C8093CF31B8A72BE0088E94D /* Errors.swift in Sources */,
C86B0A561D735CCC005D8A16 /* Delay.swift in Sources */,
C8093D131B8A72BE0088E94D /* Debug.swift in Sources */,
CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */,
C8093CCD1B8A72BE0088E94D /* Bag.swift in Sources */,
@ -4296,6 +4304,7 @@
C8F0BFE71BBBFB8B001B112F /* Repeat.swift in Sources */,
79E9DE8C1C3417FD009970AF /* DispatchQueueSchedulerQOS.swift in Sources */,
C8F0BFE81BBBFB8B001B112F /* Errors.swift in Sources */,
C86B0A591D735CCC005D8A16 /* Delay.swift in Sources */,
C8F0BFE91BBBFB8B001B112F /* Debug.swift in Sources */,
CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */,
C8554E2D1C3051620052E67D /* PriorityQueue.swift in Sources */,
@ -4638,6 +4647,7 @@
D2EBEB3D1BB9B6D8003A27DC /* SchedulerServices+Emulation.swift in Sources */,
D2EBEB1C1BB9B6C1003A27DC /* Sample.swift in Sources */,
D2EBEAFD1BB9B6BA003A27DC /* AnonymousObservable.swift in Sources */,
C86B0A581D735CCC005D8A16 /* Delay.swift in Sources */,
C8554E2C1C3051620052E67D /* PriorityQueue.swift in Sources */,
CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */,
D2EBEAFA1BB9B6B2003A27DC /* SingleAssignmentDisposable.swift in Sources */,

View File

@ -24,10 +24,14 @@ extension BlockingObservable {
var error: Swift.Error?
let lock = RunLoopLock()
let lock = RunLoopLock(timeout: timeout)
let d = SingleAssignmentDisposable()
defer {
d.dispose()
}
lock.dispatch {
d.disposable = self.source.subscribe { e in
if d.isDisposed {
@ -47,9 +51,7 @@ extension BlockingObservable {
}
}
lock.run()
d.dispose()
try lock.run()
if let error = error {
throw error
@ -74,7 +76,11 @@ extension BlockingObservable {
let d = SingleAssignmentDisposable()
let lock = RunLoopLock()
defer {
d.dispose()
}
let lock = RunLoopLock(timeout: timeout)
lock.dispatch {
d.disposable = self.source.subscribe { e in
@ -99,9 +105,7 @@ extension BlockingObservable {
}
}
lock.run()
d.dispose()
try lock.run()
if let error = error {
throw error
@ -126,7 +130,11 @@ extension BlockingObservable {
let d = SingleAssignmentDisposable()
let lock = RunLoopLock()
defer {
d.dispose()
}
let lock = RunLoopLock(timeout: timeout)
lock.dispatch {
d.disposable = self.source.subscribe { e in
@ -148,9 +156,7 @@ extension BlockingObservable {
}
}
lock.run()
d.dispose()
try lock.run()
if let error = error {
throw error
@ -186,8 +192,12 @@ extension BlockingObservable {
var error: Swift.Error?
let d = SingleAssignmentDisposable()
defer {
d.dispose()
}
let lock = RunLoopLock()
let lock = RunLoopLock(timeout: timeout)
lock.dispatch {
d.disposable = self.source.subscribe { e in
@ -224,9 +234,8 @@ extension BlockingObservable {
}
}
lock.run()
d.dispose()
try lock.run()
if let error = error {
throw error
}

View File

@ -20,5 +20,6 @@ If you think you need to use a `BlockingObservable` this is usually a sign that
design.
*/
public struct BlockingObservable<E> {
let timeout: RxTimeInterval?
let source: Observable<E>
}
}

View File

@ -15,10 +15,11 @@ extension ObservableConvertibleType {
/**
Converts an Observable into a `BlockingObservable` (an Observable with blocking operators).
- parameter timeout: Maximal time interval BlockingObservable can block without throwing `RxError.timeout`.
- returns: `BlockingObservable` version of `self`
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func toBlocking() -> BlockingObservable<E> {
return BlockingObservable(source: self.asObservable())
public func toBlocking(timeout: RxTimeInterval? = nil) -> BlockingObservable<E> {
return BlockingObservable(timeout: timeout, source: self.asObservable())
}
}

View File

@ -29,17 +29,19 @@ typealias AtomicInt = Int32
#endif
class RunLoopLock {
let currentRunLoop: CFRunLoop
let _currentRunLoop: CFRunLoop
var calledRun: AtomicInt = 0
var calledStop: AtomicInt = 0
var _calledRun: AtomicInt = 0
var _calledStop: AtomicInt = 0
var _timeout: RxTimeInterval?
init() {
currentRunLoop = CFRunLoopGetCurrent()
init(timeout: RxTimeInterval?) {
_timeout = timeout
_currentRunLoop = CFRunLoopGetCurrent()
}
func dispatch(_ action: @escaping () -> ()) {
CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
if CurrentThreadScheduler.isScheduleRequired {
_ = CurrentThreadScheduler.instance.schedule(()) { _ in
action()
@ -50,23 +52,37 @@ class RunLoopLock {
action()
}
}
CFRunLoopWakeUp(currentRunLoop)
CFRunLoopWakeUp(_currentRunLoop)
}
func stop() {
if AtomicIncrement(&calledStop) != 1 {
if AtomicIncrement(&_calledStop) != 1 {
return
}
CFRunLoopPerformBlock(currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
CFRunLoopStop(self.currentRunLoop)
CFRunLoopPerformBlock(_currentRunLoop, CFRunLoopMode.defaultMode.rawValue) {
CFRunLoopStop(self._currentRunLoop)
}
CFRunLoopWakeUp(currentRunLoop)
CFRunLoopWakeUp(_currentRunLoop)
}
func run() {
if AtomicIncrement(&calledRun) != 1 {
func run() throws {
if AtomicIncrement(&_calledRun) != 1 {
fatalError("Run can be only called once")
}
CFRunLoopRun()
if let timeout = _timeout {
switch CFRunLoopRunInMode(CFRunLoopMode.defaultMode, timeout, false) {
case .finished:
return
case .handledSource:
return
case .stopped:
return
case .timedOut:
throw RxError.timeout
}
}
else {
CFRunLoopRun()
}
}
}

View File

@ -74,6 +74,17 @@ extension UInt64 : KVORepresentable {
}
}
extension Bool : KVORepresentable {
public typealias KVOType = NSNumber
/**
Constructs `Self` using KVO value.
*/
public init?(KVOValue: KVOType) {
self.init(KVOValue.boolValue)
}
}
extension RawRepresentable where RawValue: KVORepresentable {
/**

View File

@ -132,6 +132,7 @@ import Foundation
var rx_text: ControlProperty<String> { get }
}
@available(*, deprecated)
extension NSTextField : RxTextInput {
/**
Reactive wrapper for `text` property.

View File

@ -0,0 +1,164 @@
//
// Delay.swift
// RxSwift
//
// Created by tarunon on 2016/02/09.
// Copyright © 2016 Krunoslav Zaher. All rights reserved.
//
import Foundation
class DelaySink<ElementType, O: ObserverType>
: Sink<O>
, ObserverType where O.E == ElementType {
typealias E = O.E
typealias Source = Observable<E>
typealias DisposeKey = Bag<Disposable>.KeyType
private let _lock = NSRecursiveLock()
private let _dueTime: RxTimeInterval
private let _scheduler: SchedulerType
private let _sourceSubscription = SingleAssignmentDisposable()
private let _cancelable = SerialDisposable()
// is scheduled some action
private var _active = false
// is "run loop" on different scheduler running
private var _running = false
private var _errorEvent: Event<E>? = nil
// state
private var _queue = Queue<(eventTime: RxTime, event: Event<E>)>(capacity: 0)
private var _disposed = false
init(observer: O, dueTime: RxTimeInterval, scheduler: SchedulerType) {
_dueTime = dueTime
_scheduler = scheduler
super.init(observer: observer)
}
// All of these complications in this method are caused by the fact that
// error should be propagated immediatelly. Error can bepotentially received on different
// scheduler so this process needs to be synchronized somehow.
//
// Another complication is that scheduler is potentially concurrent so internal queue is used.
func drainQueue(state: (), scheduler: AnyRecursiveScheduler<()>) {
_lock.lock() // {
let hasFailed = _errorEvent != nil
if !hasFailed {
_running = true
}
_lock.unlock() // }
if hasFailed {
return
}
var ranAtLeastOnce = false
while true {
_lock.lock() // {
let errorEvent = _errorEvent
let eventToForwardImmediatelly = ranAtLeastOnce ? nil : _queue.dequeue()?.event
let nextEventToScheduleOriginalTime: Date? = ranAtLeastOnce && !_queue.isEmpty ? _queue.peek().eventTime : nil
if let _ = errorEvent {
}
else {
if let _ = eventToForwardImmediatelly {
}
else if let _ = nextEventToScheduleOriginalTime {
_running = false
}
else {
_running = false
_active = false
}
}
_lock.unlock() // {
if let errorEvent = errorEvent {
self.forwardOn(errorEvent)
self.dispose()
return
}
else {
if let eventToForwardImmediatelly = eventToForwardImmediatelly {
ranAtLeastOnce = true
self.forwardOn(eventToForwardImmediatelly)
if case .completed = eventToForwardImmediatelly {
self.dispose()
return
}
}
else if let nextEventToScheduleOriginalTime = nextEventToScheduleOriginalTime {
let elapsedTime = _scheduler.now.timeIntervalSince(nextEventToScheduleOriginalTime)
let interval = _dueTime - elapsedTime
let normalizedInterval = interval < 0.0 ? 0.0 : interval
scheduler.schedule((), dueTime: normalizedInterval)
return
}
else {
return
}
}
}
}
func on(_ event: Event<E>) {
if event.isStopEvent {
_sourceSubscription.dispose()
}
switch event {
case .error(_):
_lock.lock() // {
let shouldSendImmediatelly = !_running
_queue = Queue(capacity: 0)
_errorEvent = event
_lock.unlock() // }
if shouldSendImmediatelly {
forwardOn(event)
dispose()
}
default:
_lock.lock() // {
let shouldSchedule = !_active
_active = true
_queue.enqueue((_scheduler.now, event))
_lock.unlock() // }
if shouldSchedule {
_cancelable.disposable = _scheduler.scheduleRecursive((), dueTime: _dueTime, action: self.drainQueue)
}
}
}
func run(source: Source) -> Disposable {
_sourceSubscription.disposable = source.subscribeSafe(self)
return Disposables.create(_sourceSubscription, _cancelable)
}
}
class Delay<Element>: Producer<Element> {
private let _source: Observable<Element>
private let _dueTime: RxTimeInterval
private let _scheduler: SchedulerType
init(source: Observable<Element>, dueTime: RxTimeInterval, scheduler: SchedulerType) {
_source = source
_dueTime = dueTime
_scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
let sink = DelaySink(observer: observer, dueTime: _dueTime, scheduler: _scheduler)
sink.disposable = sink.run(source: _source)
return sink
}
}

View File

@ -272,3 +272,23 @@ extension ObservableType {
return Timeout(source: self.asObservable(), dueTime: dueTime, other: other.asObservable(), scheduler: scheduler)
}
}
// MARK: delay
extension ObservableType {
/**
Returns an observable sequence by the source observable sequence shifted forward in time by a specified delay. Error events from the source observable sequence are not delayed.
- seealso: [delay operator on reactivex.io](http://reactivex.io/documentation/operators/delay.html)
- parameter dueTime: Relative time shift of the source by.
- parameter scheduler: Scheduler to run the subscription delay timer on.
- returns: the source Observable shifted in time by the specified delay.
*/
// @warn_unused_result(message="http://git.io/rxs.uo")
public func delay(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Delay(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
}
}

View File

@ -69,6 +69,21 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, [1, 2])
}
}
func testToArray_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).toArray()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {
}
else {
XCTFail()
}
}
}
}
// first
@ -126,6 +141,21 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, 1)
}
}
func testFirst_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).first()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {
}
else {
XCTFail()
}
}
}
}
// last
@ -183,6 +213,21 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, 1)
}
}
func testLast_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).last()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {
}
else {
XCTFail()
}
}
}
}
@ -360,4 +405,34 @@ extension ObservableBlockingTest {
XCTAssertEqual(d, 1)
}
}
func testSingle_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).single()
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {
}
else {
XCTFail()
}
}
}
func testSinglePredicate_timeout() {
do {
_ = try Observable<Int>.never().toBlocking(timeout: 0.01).single { _ in true }
XCTFail("It should fail")
}
catch let e {
if case .timeout = e as! RxError {
}
else {
XCTFail()
}
}
}
}

View File

@ -1780,3 +1780,354 @@ extension ObservableTimeTest {
])
}
}
// MARK: Delay
extension ObservableTimeTest {
func testDelay_TimeSpan_Simple1() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(250, 2),
next(350, 3),
next(450, 4),
completed(550)
])
let res = scheduler.start {
xs.delay(100, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
next(350, 2),
next(450, 3),
next(550, 4),
completed(650)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 550)
])
}
func testDelay_TimeSpan_Simple2() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(250, 2),
next(350, 3),
next(450, 4),
completed(550)
])
let res = scheduler.start {
xs.delay(50, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
next(300, 2),
next(400, 3),
next(500, 4),
completed(600)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 550)
])
}
func testDelay_TimeSpan_Simple3() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(250, 2),
next(350, 3),
next(450, 4),
completed(550)
])
let res = scheduler.start {
xs.delay(150, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
next(400, 2),
next(500, 3),
next(600, 4),
completed(700)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 550)
])
}
func testDelay_TimeSpan_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
error(250, testError)
])
let res = scheduler.start {
xs.delay(150, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
error(250, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250)
])
}
func testDelay_TimeSpan_Completed() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
completed(250)
])
let res = scheduler.start {
xs.delay(150, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
completed(400)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 250)
])
}
func testDelay_TimeSpan_Error1() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(250, 2),
next(350, 3),
next(450, 4),
error(550, testError)
])
let res = scheduler.start {
xs.delay(50, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
next(300, 2),
next(400, 3),
next(500, 4),
error(550, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 550)
])
}
func testDelay_TimeSpan_Error2() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(250, 2),
next(350, 3),
next(450, 4),
error(550, testError)
])
let res = scheduler.start {
xs.delay(150, scheduler: scheduler)
}
XCTAssertEqual(res.events, [
next(400, 2),
next(500, 3),
error(550, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 550)
])
}
func testDelay_TimeSpan_Real_Simple() {
let waitForError: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1)
let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default)
let s = PublishSubject<Int>()
let res = s.delay(0.01, scheduler: scheduler)
var array = [Int]()
let subscription = res.subscribe(
onNext: { i in
array.append(i)
},
onCompleted: {
waitForError.onCompleted()
})
DispatchQueue.global(qos: .default).async {
s.onNext(1)
s.onNext(2)
s.onNext(3)
s.onCompleted()
}
try! _ = waitForError.toBlocking(timeout: 5.0).first()
subscription.dispose()
XCTAssertEqual([1, 2, 3], array)
}
func testDelay_TimeSpan_Real_Error1() {
let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1)
let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default)
let s = PublishSubject<Int>()
let res = s.delay(0.01, scheduler: scheduler)
var array = [Int]()
var error: Swift.Error? = nil
let subscription = res.subscribe(
onNext: { i in
array.append(i)
},
onError: { e in
error = e
errorReceived.onCompleted()
})
DispatchQueue.global(qos: .default).async {
s.onNext(1)
s.onNext(2)
s.onNext(3)
s.onError(testError)
}
try! errorReceived.toBlocking(timeout: 5.0).first()
subscription.dispose()
XCTAssertEqual(error! as NSError, testError)
}
func testDelay_TimeSpan_Real_Error2() {
let elementProcessed: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1)
let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1)
let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default)
let s = PublishSubject<Int>()
let res = s.delay(0.01, scheduler: scheduler)
var array = [Int]()
var err: NSError!
let subscription = res.subscribe(
onNext: { i in
array.append(i)
elementProcessed.onCompleted()
},
onError: { ex in
err = ex as NSError
errorReceived.onCompleted()
})
DispatchQueue.global(qos: .default).async {
s.onNext(1)
try! _ = elementProcessed.toBlocking(timeout: 5.0).first()
s.onError(testError)
}
try! _ = errorReceived.toBlocking(timeout: 5.0).first()
subscription.dispose()
XCTAssertEqual([1], array)
XCTAssertEqual(testError, err)
}
func testDelay_TimeSpan_Real_Error3() {
let elementProcessed: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1)
let errorReceived: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1)
let acknowledged: ReplaySubject<()> = ReplaySubject.create(bufferSize: 1)
let scheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .default)
let s = PublishSubject<Int>()
let res = s.delay(0.01, scheduler: scheduler)
var array = [Int]()
var err: NSError!
let subscription = res.subscribe(
onNext: { i in
array.append(i)
elementProcessed.onCompleted()
try! _ = acknowledged.toBlocking(timeout: 5.0).first()
},
onError: { ex in
err = ex as NSError
errorReceived.onCompleted()
})
DispatchQueue.global(qos: .default).async {
s.onNext(1)
try! _ = elementProcessed.toBlocking(timeout: 5.0).first()
s.onError(testError)
acknowledged.onCompleted()
}
try! _ = errorReceived.toBlocking(timeout: 5.0).first()
subscription.dispose()
XCTAssertEqual([1], array)
XCTAssertEqual(testError, err)
}
func testDelay_TimeSpan_Positive() {
let scheduler = TestScheduler(initialClock: 0)
let msgs = [
next(150, 1),
next(250, 2),
next(350, 3),
next(450, 4),
completed(550)
]
let xs = scheduler.createHotObservable(msgs)
let delay: RxTimeInterval = 42
let res = scheduler.start {
xs.delay(delay, scheduler: scheduler)
}
XCTAssertEqual(res.events,
msgs.map { Recorded(time: $0.time + Int(delay), event: $0.value) }
.filter { $0.time > 200 })
}
func testDelay_TimeSpan_DefaultScheduler() {
let scheduler = MainScheduler.instance
XCTAssertEqual(try! Observable.just(1).delay(0.001, scheduler: scheduler).toBlocking(timeout: 5.0).toArray(), [1])
}
}