Remove SkipUntilTime
This commit is contained in:
parent
c524c8a6ce
commit
f065189ba9
|
|
@ -121,71 +121,3 @@ class SkipUntil<Element, Other>: Producer<Element> {
|
|||
return sink.run()
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: SkipUntil time
|
||||
|
||||
class SkipUntilTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
|
||||
|
||||
typealias E = ElementType
|
||||
typealias Parent = SkipUntilTime<E, S>
|
||||
|
||||
private let _parent: Parent
|
||||
|
||||
// state
|
||||
private var _open: Bool = false
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
_parent = parent
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
switch(event) {
|
||||
case let .Next(element):
|
||||
if _open {
|
||||
observer?.onNext(element)
|
||||
}
|
||||
case let .Error(error):
|
||||
_observer?.onError(error)
|
||||
dispose()
|
||||
case .Completed:
|
||||
_observer?.onComplete()
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
// Actually it should be abs time here. Or diff from now
|
||||
let disposeTimer = _parent._scheduler.scheduleRelative((), dueTime:_parent._startTime) {
|
||||
self._tick()
|
||||
return NopDisposable.instance
|
||||
}
|
||||
let disposeSubscription = _parent._source.subscribeSafe(self)
|
||||
return BinaryDisposable(disposeTimer, disposeSubscription)
|
||||
}
|
||||
|
||||
private func _tick() {
|
||||
_open = true
|
||||
}
|
||||
}
|
||||
|
||||
class SkipUntilTime<Element, S: SchedulerType>: Producer<Element> {
|
||||
typealias TimeInterval = S.TimeInterval
|
||||
|
||||
private let _source: Observable<Element>
|
||||
private let _startTime: TimeInterval
|
||||
private let _scheduler: S
|
||||
|
||||
init(source: Observable<Element>, startTime: TimeInterval, scheduler: S) {
|
||||
_source = source
|
||||
_startTime = startTime
|
||||
_scheduler = scheduler
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
let sink = SkipUntilTimeSink(parent: self, observer: observer, cancel: cancel)
|
||||
setSink(sink)
|
||||
return sink.run()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -198,11 +198,3 @@ extension ObservableType {
|
|||
return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
|
||||
}
|
||||
}
|
||||
|
||||
// SkipUntil time
|
||||
|
||||
extension ObservableType {
|
||||
public func skipUntil<S: SchedulerType>(startTime: S.TimeInterval, _ scheduler: S) -> Observable<E> {
|
||||
return SkipUntilTime(source: self.asObservable(), startTime: startTime, scheduler: scheduler)
|
||||
}
|
||||
}
|
||||
|
|
@ -1304,168 +1304,4 @@ extension ObservableTimeTest {
|
|||
XCTAssertEqual(result!, [4, 5, 6])
|
||||
}
|
||||
|
||||
// MARK: SkipUntil
|
||||
|
||||
func testSkipUntil_Zero() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(210, 1),
|
||||
next(220, 2),
|
||||
completed(230)
|
||||
])
|
||||
|
||||
let res = scheduler.start() {
|
||||
xs.skipUntil(0, scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
next(210, 1),
|
||||
next(220, 2),
|
||||
completed(230)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 230)
|
||||
])
|
||||
}
|
||||
|
||||
func testSkipUntil_Some() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(210, 1),
|
||||
next(220, 2),
|
||||
completed(230)
|
||||
])
|
||||
|
||||
let res = scheduler.start() {
|
||||
xs.skipUntil(215, scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
next(220, 2),
|
||||
completed(230)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 230)
|
||||
])
|
||||
}
|
||||
|
||||
func testSkipUntil_Late() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(210, 1),
|
||||
next(220, 2),
|
||||
completed(230)
|
||||
])
|
||||
|
||||
let res = scheduler.start() {
|
||||
xs.skipUntil(250, scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
completed(230)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 230)
|
||||
])
|
||||
}
|
||||
|
||||
func testSkipUntil_Error() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs:HotObservable<Int> = scheduler.createHotObservable([
|
||||
error(210, testError)
|
||||
])
|
||||
|
||||
let res = scheduler.start() {
|
||||
xs.skipUntil(250, scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
error(210, testError)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 210)
|
||||
])
|
||||
}
|
||||
|
||||
func testSkipUntil_Never() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs:HotObservable<Int> = scheduler.createHotObservable([])
|
||||
|
||||
let res = scheduler.start() {
|
||||
xs.skipUntil(250, scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 1000)
|
||||
])
|
||||
}
|
||||
|
||||
func testSkipUntil_Twice1() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(210, 1),
|
||||
next(220, 2),
|
||||
next(230, 3),
|
||||
next(240, 4),
|
||||
next(250, 5),
|
||||
next(260, 6),
|
||||
completed(270)
|
||||
])
|
||||
|
||||
let res = scheduler.start() {
|
||||
xs.skipUntil(215, scheduler).skipUntil(230, scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
next(240, 4),
|
||||
next(250, 5),
|
||||
next(260, 6),
|
||||
completed(270)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 270)
|
||||
])
|
||||
}
|
||||
|
||||
func testSkipUntil_Twice2() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(210, 1),
|
||||
next(220, 2),
|
||||
next(230, 3),
|
||||
next(240, 4),
|
||||
next(250, 5),
|
||||
next(260, 6),
|
||||
completed(270)
|
||||
])
|
||||
|
||||
let res = scheduler.start() {
|
||||
xs.skipUntil(230, scheduler).skipUntil(215, scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
next(240, 4),
|
||||
next(250, 5),
|
||||
next(260, 6),
|
||||
completed(270)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 270)
|
||||
])
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue