`trySend` to `?.on` and cleanup for `scan`
This commit is contained in:
parent
95f072190a
commit
eac2e7a669
|
|
@ -24,18 +24,19 @@ class ScanSink<ElementType, Accumulate, O: ObserverType where O.Element == Accum
|
|||
func on(event: Event<ElementType>) {
|
||||
switch event {
|
||||
case .Next(let element):
|
||||
self.parent.accumulator(self.accumulate, element).map { result -> Void in
|
||||
self.accumulate = result
|
||||
trySendNext(observer, result)
|
||||
}.recover { error in
|
||||
trySendError(self.observer, error)
|
||||
do {
|
||||
self.accumulate = try self.parent.accumulator(self.accumulate, element)
|
||||
observer?.on(.Next(self.accumulate))
|
||||
}
|
||||
catch let error {
|
||||
self.observer?.on(.Error(error))
|
||||
self.dispose()
|
||||
}
|
||||
case .Error(let error):
|
||||
trySendError(observer, error)
|
||||
observer?.on(.Error(error))
|
||||
self.dispose()
|
||||
case .Completed:
|
||||
trySendCompleted(observer)
|
||||
observer?.on(.Completed)
|
||||
self.dispose()
|
||||
}
|
||||
}
|
||||
|
|
@ -43,7 +44,7 @@ class ScanSink<ElementType, Accumulate, O: ObserverType where O.Element == Accum
|
|||
}
|
||||
|
||||
class Scan<Element, Accumulate>: Producer<Accumulate> {
|
||||
typealias Accumulator = (Accumulate, Element) -> RxResult<Accumulate>
|
||||
typealias Accumulator = (Accumulate, Element) throws -> Accumulate
|
||||
|
||||
let source: Observable<Element>
|
||||
let seed: Accumulate
|
||||
|
|
|
|||
|
|
@ -115,13 +115,10 @@ extension ObservableType {
|
|||
// scan
|
||||
|
||||
extension ObservableType {
|
||||
public func scan<A>(seed: A, accumulator: (A, E) -> A)
|
||||
-> Observable<A> {
|
||||
return Scan(source: self.normalize(), seed: seed, accumulator: { success(accumulator($0, $1)) })
|
||||
}
|
||||
|
||||
public func scanOrDie<A>(seed: A, accumulator: (A, E) -> RxResult<A>)
|
||||
|
||||
public func scan<A>(seed: A, accumulator: (A, E) throws -> A)
|
||||
-> Observable<A> {
|
||||
return Scan(source: self.normalize(), seed: seed, accumulator: accumulator)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -908,11 +908,11 @@ extension ObservableSingleTest {
|
|||
let seed = 42
|
||||
|
||||
let res = scheduler.start {
|
||||
xs.scanOrDie(seed) { (a, e) in
|
||||
xs.scan(seed) { (a, e) in
|
||||
if e == 4 {
|
||||
return failure(testError)
|
||||
throw testError
|
||||
} else {
|
||||
return success(a + e)
|
||||
return a + e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue