RxResult -> throws for take

This commit is contained in:
Krunoslav Zaher 2015-08-11 09:52:33 +02:00
parent 0ab1e8d828
commit cc5353792d
1 changed files with 8 additions and 8 deletions

View File

@ -31,18 +31,18 @@ class TakeCountSink<ElementType, O: ObserverType where O.Element == ElementType>
if remaining > 0 {
remaining--
trySendNext(observer, value)
observer?.on(.Next(value))
if remaining == 0 {
trySendCompleted(observer)
observer?.on(.Completed)
self.dispose()
}
}
case .Error:
trySend(observer, event)
observer?.on(event)
self.dispose()
case .Completed:
trySend(observer, event)
observer?.on(event)
self.dispose()
}
}
@ -83,12 +83,12 @@ class TakeTimeSink<ElementType, S: Scheduler, O: ObserverType where O.Element ==
lock.performLocked {
switch event {
case .Next(let value):
trySendNext(observer, value)
observer?.on(.Next(value))
case .Error:
trySend(observer, event)
observer?.on(event)
self.dispose()
case .Completed:
trySend(observer, event)
observer?.on(event)
self.dispose()
}
}
@ -96,7 +96,7 @@ class TakeTimeSink<ElementType, S: Scheduler, O: ObserverType where O.Element ==
func tick() {
lock.performLocked {
trySendCompleted(self.observer)
self.observer?.on(.Completed)
self.dispose()
}
}