Fixes anomaly with `Timeout`. #367
This commit is contained in:
parent
79a6f59de2
commit
b7e0ca68fe
|
|
@ -89,9 +89,7 @@ class TimeoutSink<ElementType, O: ObserverType where O.E == ElementType>: Sink<O
|
|||
}
|
||||
|
||||
if timerWins {
|
||||
if let other = self._parent._other {
|
||||
self._subscription.disposable = other.subscribeSafe(self.forwarder())
|
||||
}
|
||||
self._subscription.disposable = self._parent._other.subscribeSafe(self.forwarder())
|
||||
}
|
||||
|
||||
return NopDisposable.instance
|
||||
|
|
@ -104,10 +102,10 @@ class Timeout<Element> : Producer<Element> {
|
|||
|
||||
private let _source: Observable<Element>
|
||||
private let _dueTime: RxTimeInterval
|
||||
private let _other: Observable<Element>?
|
||||
private let _other: Observable<Element>
|
||||
private let _scheduler: SchedulerType
|
||||
|
||||
init(source: Observable<Element>, dueTime: RxTimeInterval, other: Observable<Element>?, scheduler: SchedulerType) {
|
||||
init(source: Observable<Element>, dueTime: RxTimeInterval, other: Observable<Element>, scheduler: SchedulerType) {
|
||||
_source = source
|
||||
_dueTime = dueTime
|
||||
_other = other
|
||||
|
|
|
|||
|
|
@ -354,13 +354,13 @@ extension ObservableType {
|
|||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func timeout(dueTime: RxTimeInterval, scheduler: SchedulerType)
|
||||
-> Observable<E> {
|
||||
return Timeout(source: self.asObservable(), dueTime: dueTime, other: nil, scheduler: scheduler)
|
||||
return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(RxError.Timeout), scheduler: scheduler)
|
||||
}
|
||||
|
||||
@available(*, deprecated=2.0.0, message="Please use version with named scheduler parameter.")
|
||||
public func timeout(dueTime: RxTimeInterval, _ scheduler: SchedulerType)
|
||||
-> Observable<E> {
|
||||
return Timeout(source: self.asObservable(), dueTime: dueTime, other: nil, scheduler: scheduler)
|
||||
return Timeout(source: self.asObservable(), dueTime: dueTime, other: Observable.error(RxError.Timeout), scheduler: scheduler)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1650,6 +1650,32 @@ extension ObservableTimeTest {
|
|||
Subscription(200, 270)
|
||||
])
|
||||
}
|
||||
|
||||
func testTimeout_Duetime_Timeout() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createColdObservable([
|
||||
next(10, 42),
|
||||
next(20, 43),
|
||||
next(50, 44),
|
||||
next(60, 45),
|
||||
completed(70)
|
||||
])
|
||||
|
||||
let res = scheduler.start {
|
||||
xs.timeout(25, scheduler: scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.events, [
|
||||
next(210, 42),
|
||||
next(220, 43),
|
||||
error(245, RxError.Timeout)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 245)
|
||||
])
|
||||
}
|
||||
|
||||
func testTimeout_Duetime_Disposed() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
|
|
|||
Loading…
Reference in New Issue