Merge branch 'swift-3.0' of github.com:kzaher/RxSwift into swift-3.0

This commit is contained in:
Krunoslav Zaher 2016-08-01 10:47:06 +02:00
commit 3b34850675
17 changed files with 116 additions and 76 deletions

View File

@ -123,7 +123,7 @@ extension DriverConvertibleType {
public func doOn(_ eventHandler: (Event<E>) -> Void)
-> Driver<E> {
let source = self.asObservable()
.doOn(eventHandler: eventHandler)
.doOn(eventHandler)
return Driver(source)
}
@ -137,7 +137,7 @@ extension DriverConvertibleType {
- returns: The source sequence with the side-effecting behavior applied.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func doOn(_ onNext: ((E) -> Void)? = nil, onError: ((ErrorProtocol) -> Void)? = nil, onCompleted: (() -> Void)? = nil)
public func doOn(onNext: ((E) -> Void)? = nil, onError: ((ErrorProtocol) -> Void)? = nil, onCompleted: (() -> Void)? = nil)
-> Driver<E> {
let source = self.asObservable()
.doOn(onNext: onNext, onError: onError, onCompleted: onCompleted)
@ -152,9 +152,9 @@ extension DriverConvertibleType {
- returns: The source sequence with the side-effecting behavior applied.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func doOnNext(_ onNext: ((E) -> Void))
public func `do`(onNext: ((E) -> Void))
-> Driver<E> {
return self.doOn(onNext)
return self.doOn(onNext: onNext)
}
/**
@ -164,7 +164,7 @@ extension DriverConvertibleType {
- returns: The source sequence with the side-effecting behavior applied.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func doOnCompleted(_ onCompleted: (() -> Void))
public func `do`(onCompleted: (() -> Void))
-> Driver<E> {
return self.doOn(onCompleted: onCompleted)
}
@ -399,7 +399,7 @@ extension Collection where Iterator.Element : DriverConvertibleType {
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func zip<R>(_ resultSelector: ([Generator.Element.E]) throws -> R) -> Driver<R> {
let source = self.map { $0.asDriver().asObservable() }.zip(resultSelector: resultSelector)
let source = self.map { $0.asDriver().asObservable() }.zip(resultSelector)
return Driver<R>(source)
}
}
@ -415,7 +415,7 @@ extension Collection where Iterator.Element : DriverConvertibleType {
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func combineLatest<R>(_ resultSelector: ([Generator.Element.E]) throws -> R) -> Driver<R> {
let source = self.map { $0.asDriver().asObservable() }.combineLatest(resultSelector: resultSelector)
let source = self.map { $0.asDriver().asObservable() }.combineLatest(resultSelector)
return Driver<R>(source)
}
}

View File

@ -75,17 +75,17 @@ class DefaultImageService: ImageService {
else {
// fetch from network
decodedImage = self.$.URLSession.rx_data(URLRequest(url: url))
.doOnNext { data in
.do(onNext: { data in
self._imageDataCache.setObject(data, forKey: url)
}
})
.flatMap(self.decodeImage)
.trackActivity(self.loadingImage)
}
}
return decodedImage.doOnNext { image in
return decodedImage.do(onNext: { image in
self._imageCache.setObject(image, forKey: url)
}
})
}
}

View File

@ -297,7 +297,15 @@ extension Bag where T: ObserverType {
/**
Dispatches `dispose` to all disposables contained inside bag.
*/
@available(*, deprecated, renamed: "disposeAll(in:)")
public func disposeAllIn(_ bag: Bag<Disposable>) {
disposeAll(in: bag)
}
/**
Dispatches `dispose` to all disposables contained inside bag.
*/
public func disposeAll(in bag: Bag<Disposable>) {
if bag._onlyFastPath {
bag._value0?.dispose()
return

View File

@ -77,17 +77,29 @@ public class CompositeDisposable : DisposeBase, Disposable, Cancelable {
- returns: Key that can be used to remove disposable from composite disposable. In case dispose bag was already
disposed `nil` will be returned.
*/
@available(*, deprecated, renamed: "insert(_:)")
public func addDisposable(_ disposable: Disposable) -> DisposeKey? {
let key = _addDisposable(disposable)
return insert(disposable)
}
/**
Adds a disposable to the CompositeDisposable or disposes the disposable if the CompositeDisposable is disposed.
- parameter disposable: Disposable to add.
- returns: Key that can be used to remove disposable from composite disposable. In case dispose bag was already
disposed `nil` will be returned.
*/
public func insert(_ disposable: Disposable) -> DisposeKey? {
let key = _insert(disposable)
if key == nil {
disposable.dispose()
}
return key
}
private func _addDisposable(_ disposable: Disposable) -> DisposeKey? {
private func _insert(_ disposable: Disposable) -> DisposeKey? {
_lock.lock(); defer { _lock.unlock() }
return _disposables?.insert(disposable)
@ -102,15 +114,25 @@ public class CompositeDisposable : DisposeBase, Disposable, Cancelable {
}
/**
Removes and disposes the disposable identified by `disposeKey` from the CompositeDisposable.
- parameter disposeKey: Key used to identify disposable to be removed.
*/
Removes and disposes the disposable identified by `disposeKey` from the CompositeDisposable.
- parameter disposeKey: Key used to identify disposable to be removed.
*/
@available(*, deprecated, renamed: "remove(for:)")
public func removeDisposable(_ disposeKey: DisposeKey) {
_removeDisposable(disposeKey)?.dispose()
remove(for: disposeKey)
}
private func _removeDisposable(_ disposeKey: DisposeKey) -> Disposable? {
/**
Removes and disposes the disposable identified by `disposeKey` from the CompositeDisposable.
- parameter disposeKey: Key used to identify disposable to be removed.
*/
public func remove(for disposeKey: DisposeKey) {
_remove(for: disposeKey)?.dispose()
}
private func _remove(for disposeKey: DisposeKey) -> Disposable? {
_lock.lock(); defer { _lock.unlock() }
return _disposables?.removeKey(disposeKey)
}
@ -120,7 +142,7 @@ public class CompositeDisposable : DisposeBase, Disposable, Cancelable {
*/
public func dispose() {
if let disposables = _dispose() {
disposeAllIn(disposables)
disposeAll(in: disposables)
}
}

View File

@ -15,7 +15,7 @@ extension Disposable {
- parameter bag: `DisposeBag` to add `self` to.
*/
public func addDisposableTo(_ bag: DisposeBag) {
bag.addDisposable(self)
bag.insert(self)
}
}
@ -51,11 +51,21 @@ public class DisposeBag: DisposeBase {
- parameter disposable: Disposable to add.
*/
@available(*, deprecated, renamed: "insert(_:)")
public func addDisposable(_ disposable: Disposable) {
_addDisposable(disposable)?.dispose()
insert(disposable)
}
private func _addDisposable(_ disposable: Disposable) -> Disposable? {
/**
Adds `disposable` to be disposed when dispose bag is being deinited.
- parameter disposable: Disposable to add.
*/
public func insert(_ disposable: Disposable) {
_insert(disposable)?.dispose()
}
private func _insert(_ disposable: Disposable) -> Disposable? {
_lock.lock(); defer { _lock.unlock() }
if _disposed {
return disposable

View File

@ -42,7 +42,7 @@ class MergeLimitedSinkIter<S: ObservableConvertibleType, O: ObserverType where S
_parent.forwardOn(event)
_parent.dispose()
case .completed:
_parent._group.removeDisposable(_disposeKey)
_parent._group.remove(for: _disposeKey)
if let next = _parent._queue.dequeue() {
_parent.subscribe(next, group: _parent._group)
}
@ -81,12 +81,12 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
init(maxConcurrent: Int, observer: O) {
_maxConcurrent = maxConcurrent
let _ = _group.addDisposable(_sourceSubscription)
let _ = _group.insert(_sourceSubscription)
super.init(observer: observer)
}
func run(_ source: Observable<S>) -> Disposable {
let _ = _group.addDisposable(_sourceSubscription)
let _ = _group.insert(_sourceSubscription)
let disposable = source.subscribe(self)
_sourceSubscription.disposable = disposable
@ -96,7 +96,7 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
func subscribe(_ innerSource: E, group: CompositeDisposable) {
let subscription = SingleAssignmentDisposable()
let key = group.addDisposable(subscription)
let key = group.insert(subscription)
if let key = key {
let observer = MergeLimitedSinkIter(parent: self, disposeKey: key)
@ -253,7 +253,7 @@ class MergeSinkIter<SourceType, S: ObservableConvertibleType, O: ObserverType wh
_parent.dispose()
// }
case .completed:
_parent._group.removeDisposable(_disposeKey)
_parent._group.remove(for: _disposeKey)
// If this has returned true that means that `Completed` should be sent.
// In case there is a race who will sent first completed,
// lock will sort it out. When first Completed message is sent
@ -331,7 +331,7 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
func subscribeInner(_ source: Observable<O.E>) {
let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = _group.addDisposable(iterDisposable) {
if let disposeKey = _group.insert(iterDisposable) {
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribe(iter)
iterDisposable.disposable = subscription
@ -339,7 +339,7 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
}
func run(_ source: Observable<SourceType>) -> Disposable {
let _ = _group.addDisposable(_sourceSubscription)
let _ = _group.insert(_sourceSubscription)
let subscription = source.subscribe(self)
_sourceSubscription.disposable = subscription

View File

@ -31,7 +31,7 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
init(parent: Parent, observer: O) {
_parent = parent
let _ = _groupDisposable.addDisposable(_timerD)
let _ = _groupDisposable.insert(_timerD)
_refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
super.init(observer: observer)
@ -42,7 +42,7 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
forwardOn(.next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
createTimer(_windowId)
let _ = _groupDisposable.addDisposable(_parent._source.subscribeSafe(self))
let _ = _groupDisposable.insert(_parent._source.subscribeSafe(self))
return _refCountDisposable
}

View File

@ -21,7 +21,7 @@ extension Collection where Iterator.Element : ObservableType {
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func combineLatest<R>(resultSelector: ([Generator.Element.E]) throws -> R) -> Observable<R> {
public func combineLatest<R>(_ resultSelector: ([Generator.Element.E]) throws -> R) -> Observable<R> {
return CombineLatestCollectionType(sources: self, resultSelector: resultSelector)
}
}
@ -39,7 +39,7 @@ extension Collection where Iterator.Element : ObservableType {
- returns: An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func zip<R>(resultSelector: ([Generator.Element.E]) throws -> R) -> Observable<R> {
public func zip<R>(_ resultSelector: ([Generator.Element.E]) throws -> R) -> Observable<R> {
return ZipCollectionType(sources: self, resultSelector: resultSelector)
}
}

View File

@ -84,7 +84,7 @@ extension ObservableType {
- returns: The source sequence with the side-effecting behavior applied.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func doOn(eventHandler: (Event<E>) throws -> Void)
public func doOn(_ eventHandler: (Event<E>) throws -> Void)
-> Observable<E> {
return Do(source: self.asObservable(), eventHandler: eventHandler)
}
@ -121,7 +121,7 @@ extension ObservableType {
- returns: The source sequence with the side-effecting behavior applied.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func doOnNext(onNext: ((E) throws -> Void))
public func `do`(onNext: ((E) throws -> Void))
-> Observable<E> {
return self.doOn(onNext: onNext)
}
@ -133,7 +133,7 @@ extension ObservableType {
- returns: The source sequence with the side-effecting behavior applied.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func doOnError(onError: ((ErrorProtocol) throws -> Void))
public func `do`(onError: ((ErrorProtocol) throws -> Void))
-> Observable<E> {
return self.doOn(onError: onError)
}
@ -145,7 +145,7 @@ extension ObservableType {
- returns: The source sequence with the side-effecting behavior applied.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func doOnCompleted(onCompleted: (() throws -> Void))
public func `do`(onCompleted: (() throws -> Void))
-> Observable<E> {
return self.doOn(onCompleted: onCompleted)
}
@ -229,7 +229,7 @@ extension ObservableType {
- returns: An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully or is notified to error or complete.
*/
// @warn_unused_result(message:"http://git.io/rxs.uo")
public func retryWhen<TriggerObservable: ObservableType>(notificationHandler: (Observable<ErrorProtocol>) -> TriggerObservable)
public func retryWhen<TriggerObservable: ObservableType>(_ notificationHandler: (Observable<ErrorProtocol>) -> TriggerObservable)
-> Observable<E> {
return RetryWhenSequence(sources: InfiniteSequence(repeatedValue: self.asObservable()), notificationHandler: notificationHandler)
}

View File

@ -60,12 +60,12 @@ extension DispatchQueueConfiguration {
if compositeDisposable.disposed {
return
}
_ = compositeDisposable.addDisposable(action(state))
_ = compositeDisposable.insert(action(state))
cancelTimer.dispose()
})
timer.resume()
_ = compositeDisposable.addDisposable(cancelTimer)
_ = compositeDisposable.insert(cancelTimer)
return compositeDisposable
}

View File

@ -44,12 +44,12 @@ public class OperationQueueScheduler: ImmediateSchedulerType {
}
let disposable = action(state)
let _ = compositeDisposableWeak?.addDisposable(disposable)
let _ = compositeDisposableWeak?.insert(disposable)
}
self.operationQueue.addOperation(operation)
let _ = compositeDisposable.addDisposable(AnonymousDisposable(operation.cancel))
let _ = compositeDisposable.insert(AnonymousDisposable(operation.cancel))
return compositeDisposable
}

View File

@ -47,7 +47,7 @@ class AnyRecursiveScheduler<State> {
let action = self._lock.calculateLocked { () -> Action? in
if isAdded {
self._group.removeDisposable(removeKey!)
self._group.remove(for: removeKey!)
}
else {
isDone = true
@ -65,7 +65,7 @@ class AnyRecursiveScheduler<State> {
_lock.performLocked {
if !isDone {
removeKey = _group.addDisposable(d)
removeKey = _group.insert(d)
isAdded = true
}
}
@ -90,7 +90,7 @@ class AnyRecursiveScheduler<State> {
let action = self._lock.calculateLocked { () -> Action? in
if isAdded {
self._group.removeDisposable(removeKey!)
self._group.remove(for: removeKey!)
}
else {
isDone = true
@ -108,7 +108,7 @@ class AnyRecursiveScheduler<State> {
_lock.performLocked {
if !isDone {
removeKey = _group.addDisposable(d)
removeKey = _group.insert(d)
isAdded = true
}
}
@ -160,7 +160,7 @@ class RecursiveImmediateScheduler<State> {
let action = self._lock.calculateLocked { () -> Action? in
if isAdded {
self._group.removeDisposable(removeKey!)
self._group.remove(for: removeKey!)
}
else {
isDone = true
@ -178,7 +178,7 @@ class RecursiveImmediateScheduler<State> {
_lock.performLocked {
if !isDone {
removeKey = _group.addDisposable(d)
removeKey = _group.insert(d)
isAdded = true
}
}

View File

@ -128,7 +128,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
_schedulerQueue.enqueue(item)
_ = compositeDisposable.addDisposable(item)
_ = compositeDisposable.insert(item)
return compositeDisposable
}

View File

@ -548,10 +548,10 @@ extension DriverTest {
var events = [Int]()
let driver = hotObservable.asDriver(onErrorJustReturn: -1).doOnNext { e in
let driver = hotObservable.asDriver(onErrorJustReturn: -1).do(onNext: { e in
XCTAssertTrue(isMainThread())
events.append(e)
}
})
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])
@ -572,10 +572,10 @@ extension DriverTest {
let hotObservable = BackgroundThreadPrimitiveHotObservable<Int>()
var completed = false
let driver = hotObservable.asDriver(onErrorJustReturn: -1).doOnCompleted { e in
let driver = hotObservable.asDriver(onErrorJustReturn: -1).do(onCompleted: { e in
XCTAssertTrue(isMainThread())
completed = true
}
})
let results = subscribeTwiceOnBackgroundSchedulerAndOnlyOneSubscription(driver) {
XCTAssertTrue(hotObservable.subscriptions == [SubscribedToHotObservable])

View File

@ -73,11 +73,11 @@ class DisposableTest : RxTest {
var numberDisposed = 0
let compositeDisposable = CompositeDisposable()
let result1 = compositeDisposable.addDisposable(AnonymousDisposable {
let result1 = compositeDisposable.insert(AnonymousDisposable {
numberDisposed += 1
})
_ = compositeDisposable.addDisposable(AnonymousDisposable {
_ = compositeDisposable.insert(AnonymousDisposable {
numberDisposed += 1
})
@ -89,7 +89,7 @@ class DisposableTest : RxTest {
XCTAssertEqual(numberDisposed, 2)
XCTAssertEqual(compositeDisposable.count, 0)
let result = compositeDisposable.addDisposable(AnonymousDisposable {
let result = compositeDisposable.insert(AnonymousDisposable {
numberDisposed += 1
})
@ -131,11 +131,11 @@ class DisposableTest : RxTest {
var numberDisposed = 0
let compositeDisposable = CompositeDisposable()
let result1 = compositeDisposable.addDisposable(AnonymousDisposable {
let result1 = compositeDisposable.insert(AnonymousDisposable {
numberDisposed += 1
})
let result2 = compositeDisposable.addDisposable(AnonymousDisposable {
let result2 = compositeDisposable.insert(AnonymousDisposable {
numberDisposed += 1
})

View File

@ -60,7 +60,7 @@ extension ObservableConcurrencyTest {
let compositeDisposable = CompositeDisposable()
for test in tests {
_ = compositeDisposable.addDisposable(runDispatchQueueSchedulerTests(scheduler, tests: test))
_ = compositeDisposable.insert(runDispatchQueueSchedulerTests(scheduler, tests: test))
}
compositeDisposable.dispose()

View File

@ -573,9 +573,9 @@ extension ObservableSingleTest {
var numberOfTimesInvoked = 0
let res = scheduler.start { xs.doOnNext { error in
let res = scheduler.start { xs.do(onNext: { error in
numberOfTimesInvoked = numberOfTimesInvoked + 1
}
})
}
let correctMessages = [
@ -610,12 +610,12 @@ extension ObservableSingleTest {
var numberOfTimesInvoked = 0
let res = scheduler.start { xs.doOnNext { error in
let res = scheduler.start { xs.do(onNext: { error in
if numberOfTimesInvoked > 2 {
throw testError
}
numberOfTimesInvoked = numberOfTimesInvoked + 1
}
})
}
let correctMessages = [
@ -647,10 +647,10 @@ extension ObservableSingleTest {
var recordedError: ErrorProtocol!
var numberOfTimesInvoked = 0
let res = scheduler.start { xs.doOnError { error in
let res = scheduler.start { xs.do(onError: { error in
recordedError = error
numberOfTimesInvoked = numberOfTimesInvoked + 1
}
})
}
let correctMessages = [
@ -678,9 +678,9 @@ extension ObservableSingleTest {
error(250, testError)
])
let res = scheduler.start { xs.doOnError { _ in
let res = scheduler.start { xs.do(onError: { _ in
throw testError1
}
})
}
let correctMessages = [
@ -710,9 +710,9 @@ extension ObservableSingleTest {
var didComplete = false
let res = scheduler.start { xs.doOnCompleted { error in
let res = scheduler.start { xs.do(onCompleted: { error in
didComplete = true
}
})
}
let correctMessages = [
@ -745,9 +745,9 @@ extension ObservableSingleTest {
completed(250)
])
let res = scheduler.start { xs.doOnCompleted { error in
let res = scheduler.start { xs.do(onCompleted: { error in
throw testError
}
})
}
let correctMessages = [