Merge pull request #401 from thanegill/doOn-shortcuts
Add doOn shortcut operators
This commit is contained in:
commit
bed5d882df
|
|
@ -129,13 +129,37 @@ extension DriverConvertibleType {
|
|||
- returns: The source sequence with the side-effecting behavior applied.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func doOn(onNext onNext: (E -> Void)? = nil, onError: (ErrorType -> Void)? = nil, onCompleted: (() -> Void)? = nil)
|
||||
public func doOn(onNext onNext: (E -> Void)? = nil, onCompleted: (() -> Void)? = nil)
|
||||
-> Driver<E> {
|
||||
let source = self.asObservable()
|
||||
.doOn(onNext: onNext, onError: onError, onCompleted: onCompleted)
|
||||
.doOn(onNext: onNext, onCompleted: onCompleted)
|
||||
|
||||
return Driver(source)
|
||||
}
|
||||
|
||||
/**
|
||||
Invokes an action for each Next event in the observable sequence, and propagates all observer messages through the result sequence.
|
||||
|
||||
- parameter onNext: Action to invoke for each element in the observable sequence.
|
||||
- returns: The source sequence with the side-effecting behavior applied.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func doOnNext(onNext: (E -> Void))
|
||||
-> Driver<E> {
|
||||
return self.doOn(onNext: onNext)
|
||||
}
|
||||
|
||||
/**
|
||||
Invokes an action for the Completed event in the observable sequence, and propagates all observer messages through the result sequence.
|
||||
|
||||
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
|
||||
- returns: The source sequence with the side-effecting behavior applied.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func doOnCompleted(onCompleted: (() -> Void))
|
||||
-> Driver<E> {
|
||||
return self.doOn(onCompleted: onCompleted)
|
||||
}
|
||||
}
|
||||
|
||||
extension DriverConvertibleType {
|
||||
|
|
|
|||
|
|
@ -75,17 +75,17 @@ class DefaultImageService: ImageService {
|
|||
else {
|
||||
// fetch from network
|
||||
decodedImage = self.$.URLSession.rx_data(NSURLRequest(URL: URL))
|
||||
.doOn(onNext: { data in
|
||||
.doOnNext { data in
|
||||
self._imageDataCache.setObject(data, forKey: URL)
|
||||
})
|
||||
}
|
||||
.flatMap(self.decodeImage)
|
||||
.trackActivity(self.loadingImage)
|
||||
}
|
||||
}
|
||||
|
||||
return decodedImage.doOn(onNext: { image in
|
||||
return decodedImage.doOnNext { image in
|
||||
self._imageCache.setObject(image, forKey: URL)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ extension ObservableType {
|
|||
}
|
||||
}
|
||||
|
||||
// MARK: do
|
||||
// MARK: doOn
|
||||
|
||||
extension ObservableType {
|
||||
|
||||
|
|
@ -113,6 +113,42 @@ extension ObservableType {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Invokes an action for each Next event in the observable sequence, and propagates all observer messages through the result sequence.
|
||||
|
||||
- parameter onNext: Action to invoke for each element in the observable sequence.
|
||||
- returns: The source sequence with the side-effecting behavior applied.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func doOnNext(onNext: (E throws -> Void))
|
||||
-> Observable<E> {
|
||||
return self.doOn(onNext: onNext)
|
||||
}
|
||||
|
||||
/**
|
||||
Invokes an action for the Error event in the observable sequence, and propagates all observer messages through the result sequence.
|
||||
|
||||
- parameter onError: Action to invoke upon errored termination of the observable sequence.
|
||||
- returns: The source sequence with the side-effecting behavior applied.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func doOnError(onError: (ErrorType throws -> Void))
|
||||
-> Observable<E> {
|
||||
return self.doOn(onError: onError)
|
||||
}
|
||||
|
||||
/**
|
||||
Invokes an action for the Completed event in the observable sequence, and propagates all observer messages through the result sequence.
|
||||
|
||||
- parameter onCompleted: Action to invoke upon graceful termination of the observable sequence.
|
||||
- returns: The source sequence with the side-effecting behavior applied.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func doOnCompleted(onCompleted: (() throws -> Void))
|
||||
-> Observable<E> {
|
||||
return self.doOn(onCompleted: onCompleted)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: startWith
|
||||
|
|
|
|||
|
|
@ -537,6 +537,55 @@ extension DriverTest {
|
|||
let expectedEvents = [.Next(1), .Next(2), .Next(-1), .Completed] as [Event<Int>]
|
||||
XCTAssertEqual(events, expectedEvents)
|
||||
}
|
||||
|
||||
|
||||
func testAsDriver_doOnNext() {
|
||||
let hotObservable = BackgroundThreadPrimitiveHotObservable<Int>()
|
||||
|
||||
var events = [Int]()
|
||||
|
||||
let driver = hotObservable.asDriver(onErrorJustReturn: -1).doOnNext { e in
|
||||
XCTAssertTrue(isMainThread())
|
||||
events.append(e)
|
||||
}
|
||||
|
||||
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
|
||||
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
|
||||
|
||||
hotObservable.on(.Next(1))
|
||||
hotObservable.on(.Next(2))
|
||||
hotObservable.on(.Error(testError))
|
||||
|
||||
XCTAssertTrue(hotObservable.subscriptions == [UnsunscribedFromHotObservable])
|
||||
}
|
||||
|
||||
XCTAssertEqual(results, [1, 2, -1])
|
||||
let expectedEvents = [1, 2, -1]
|
||||
XCTAssertEqual(events, expectedEvents)
|
||||
}
|
||||
|
||||
func testAsDriver_doOnCompleted() {
|
||||
let hotObservable = BackgroundThreadPrimitiveHotObservable<Int>()
|
||||
|
||||
var completed = false
|
||||
let driver = hotObservable.asDriver(onErrorJustReturn: -1).doOnCompleted { e in
|
||||
XCTAssertTrue(isMainThread())
|
||||
completed = true
|
||||
}
|
||||
|
||||
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
|
||||
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
|
||||
|
||||
hotObservable.on(.Next(1))
|
||||
hotObservable.on(.Next(2))
|
||||
hotObservable.on(.Error(testError))
|
||||
|
||||
XCTAssertTrue(hotObservable.subscriptions == [UnsunscribedFromHotObservable])
|
||||
}
|
||||
|
||||
XCTAssertEqual(results, [1, 2, -1])
|
||||
XCTAssertEqual(completed, true)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: distinct until change
|
||||
|
|
|
|||
|
|
@ -271,9 +271,9 @@ extension ObservableSingleTest {
|
|||
}
|
||||
}
|
||||
|
||||
// Do
|
||||
// doOn
|
||||
extension ObservableSingleTest {
|
||||
func testDo_shouldSeeAllValues() {
|
||||
func testDoOn_shouldSeeAllValues() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
|
|
@ -317,7 +317,7 @@ extension ObservableSingleTest {
|
|||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDo_plainAction() {
|
||||
func testDoOn_plainAction() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
|
|
@ -356,8 +356,8 @@ extension ObservableSingleTest {
|
|||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDo_nextCompleted() {
|
||||
|
||||
func testDoOn_nextCompleted() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
|
|
@ -404,7 +404,7 @@ extension ObservableSingleTest {
|
|||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDo_completedNever() {
|
||||
func testDoOn_completedNever() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let recordedEvents: [Recorded<Event<Int>>] = [
|
||||
|
|
@ -439,7 +439,7 @@ extension ObservableSingleTest {
|
|||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDo_nextError() {
|
||||
func testDoOn_nextError() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
|
|
@ -486,7 +486,7 @@ extension ObservableSingleTest {
|
|||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDo_nextErrorNot() {
|
||||
func testDoOn_nextErrorNot() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
|
|
@ -533,7 +533,7 @@ extension ObservableSingleTest {
|
|||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDo_Throws() {
|
||||
func testDoOn_Throws() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
|
|
@ -558,6 +558,213 @@ extension ObservableSingleTest {
|
|||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDoOnNext_normal() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(150, 1),
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
next(240, 5),
|
||||
completed(250)
|
||||
])
|
||||
|
||||
var numberOfTimesInvoked = 0
|
||||
|
||||
let res = scheduler.start { xs.doOnNext { error in
|
||||
numberOfTimesInvoked = numberOfTimesInvoked + 1
|
||||
}
|
||||
}
|
||||
|
||||
let correctMessages = [
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
next(240, 5),
|
||||
completed(250)
|
||||
]
|
||||
|
||||
let correctSubscriptions = [
|
||||
Subscription(200, 250)
|
||||
]
|
||||
|
||||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
|
||||
XCTAssertEqual(numberOfTimesInvoked, 4)
|
||||
}
|
||||
|
||||
func testDoOnNext_throws() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(150, 1),
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
next(240, 5),
|
||||
completed(250)
|
||||
])
|
||||
|
||||
var numberOfTimesInvoked = 0
|
||||
|
||||
let res = scheduler.start { xs.doOnNext { error in
|
||||
if numberOfTimesInvoked > 2 {
|
||||
throw testError
|
||||
}
|
||||
numberOfTimesInvoked = numberOfTimesInvoked + 1
|
||||
}
|
||||
}
|
||||
|
||||
let correctMessages = [
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
error(240, testError)
|
||||
]
|
||||
|
||||
let correctSubscriptions = [
|
||||
Subscription(200, 240)
|
||||
]
|
||||
|
||||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
|
||||
XCTAssertEqual(numberOfTimesInvoked, 3)
|
||||
}
|
||||
|
||||
func testDoOnError_normal() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(150, 1),
|
||||
next(210, 2),
|
||||
error(250, testError)
|
||||
])
|
||||
|
||||
var recordedError: ErrorType!
|
||||
var numberOfTimesInvoked = 0
|
||||
|
||||
let res = scheduler.start { xs.doOnError { error in
|
||||
recordedError = error
|
||||
numberOfTimesInvoked = numberOfTimesInvoked + 1
|
||||
}
|
||||
}
|
||||
|
||||
let correctMessages = [
|
||||
next(210, 2),
|
||||
error(250, testError)
|
||||
]
|
||||
|
||||
let correctSubscriptions = [
|
||||
Subscription(200, 250)
|
||||
]
|
||||
|
||||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
|
||||
XCTAssertEqual(recordedError as NSError, testError)
|
||||
XCTAssertEqual(numberOfTimesInvoked, 1)
|
||||
}
|
||||
|
||||
func testDoOnError_throws() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(150, 1),
|
||||
next(210, 2),
|
||||
error(250, testError)
|
||||
])
|
||||
|
||||
let res = scheduler.start { xs.doOnError { _ in
|
||||
throw testError1
|
||||
}
|
||||
}
|
||||
|
||||
let correctMessages = [
|
||||
next(210, 2),
|
||||
error(250, testError1)
|
||||
]
|
||||
|
||||
let correctSubscriptions = [
|
||||
Subscription(200, 250)
|
||||
]
|
||||
|
||||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
|
||||
func testDoOnCompleted_normal() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(150, 1),
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
next(240, 5),
|
||||
completed(250)
|
||||
])
|
||||
|
||||
var didComplete = false
|
||||
|
||||
let res = scheduler.start { xs.doOnCompleted { error in
|
||||
didComplete = true
|
||||
}
|
||||
}
|
||||
|
||||
let correctMessages = [
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
next(240, 5),
|
||||
completed(250)
|
||||
]
|
||||
|
||||
let correctSubscriptions = [
|
||||
Subscription(200, 250)
|
||||
]
|
||||
|
||||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
|
||||
XCTAssertEqual(didComplete, true)
|
||||
}
|
||||
|
||||
func testDoOnCompleted_throws() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(150, 1),
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
next(240, 5),
|
||||
completed(250)
|
||||
])
|
||||
|
||||
let res = scheduler.start { xs.doOnCompleted { error in
|
||||
throw testError
|
||||
}
|
||||
}
|
||||
|
||||
let correctMessages = [
|
||||
next(210, 2),
|
||||
next(220, 3),
|
||||
next(230, 4),
|
||||
next(240, 5),
|
||||
error(250, testError)
|
||||
]
|
||||
|
||||
let correctSubscriptions = [
|
||||
Subscription(200, 250)
|
||||
]
|
||||
|
||||
XCTAssertEqual(res.events, correctMessages)
|
||||
XCTAssertEqual(xs.subscriptions, correctSubscriptions)
|
||||
}
|
||||
}
|
||||
|
||||
// retry
|
||||
|
|
|
|||
Loading…
Reference in New Issue