Update all known call sites to use the new insert/remove methods on CompositeDisposable.
This commit is contained in:
parent
2cb931d8e9
commit
fbee131886
|
|
@ -42,7 +42,7 @@ class MergeLimitedSinkIter<S: ObservableConvertibleType, O: ObserverType where S
|
|||
_parent.forwardOn(event)
|
||||
_parent.dispose()
|
||||
case .completed:
|
||||
_parent._group.removeDisposable(_disposeKey)
|
||||
_parent._group.removeDisposable(forKey: _disposeKey)
|
||||
if let next = _parent._queue.dequeue() {
|
||||
_parent.subscribe(next, group: _parent._group)
|
||||
}
|
||||
|
|
@ -81,12 +81,12 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
|
|||
init(maxConcurrent: Int, observer: O) {
|
||||
_maxConcurrent = maxConcurrent
|
||||
|
||||
let _ = _group.addDisposable(_sourceSubscription)
|
||||
let _ = _group.insert(_sourceSubscription)
|
||||
super.init(observer: observer)
|
||||
}
|
||||
|
||||
func run(_ source: Observable<S>) -> Disposable {
|
||||
let _ = _group.addDisposable(_sourceSubscription)
|
||||
let _ = _group.insert(_sourceSubscription)
|
||||
|
||||
let disposable = source.subscribe(self)
|
||||
_sourceSubscription.disposable = disposable
|
||||
|
|
@ -96,7 +96,7 @@ class MergeLimitedSink<S: ObservableConvertibleType, O: ObserverType where S.E =
|
|||
func subscribe(_ innerSource: E, group: CompositeDisposable) {
|
||||
let subscription = SingleAssignmentDisposable()
|
||||
|
||||
let key = group.addDisposable(subscription)
|
||||
let key = group.insert(subscription)
|
||||
|
||||
if let key = key {
|
||||
let observer = MergeLimitedSinkIter(parent: self, disposeKey: key)
|
||||
|
|
@ -253,7 +253,7 @@ class MergeSinkIter<SourceType, S: ObservableConvertibleType, O: ObserverType wh
|
|||
_parent.dispose()
|
||||
// }
|
||||
case .completed:
|
||||
_parent._group.removeDisposable(_disposeKey)
|
||||
_parent._group.removeDisposable(forKey: _disposeKey)
|
||||
// If this has returned true that means that `Completed` should be sent.
|
||||
// In case there is a race who will sent first completed,
|
||||
// lock will sort it out. When first Completed message is sent
|
||||
|
|
@ -331,7 +331,7 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
|
|||
|
||||
func subscribeInner(_ source: Observable<O.E>) {
|
||||
let iterDisposable = SingleAssignmentDisposable()
|
||||
if let disposeKey = _group.addDisposable(iterDisposable) {
|
||||
if let disposeKey = _group.insert(iterDisposable) {
|
||||
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
|
||||
let subscription = source.subscribe(iter)
|
||||
iterDisposable.disposable = subscription
|
||||
|
|
@ -339,7 +339,7 @@ class MergeSink<SourceType, S: ObservableConvertibleType, O: ObserverType where
|
|||
}
|
||||
|
||||
func run(_ source: Observable<SourceType>) -> Disposable {
|
||||
let _ = _group.addDisposable(_sourceSubscription)
|
||||
let _ = _group.insert(_sourceSubscription)
|
||||
|
||||
let subscription = source.subscribe(self)
|
||||
_sourceSubscription.disposable = subscription
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
|
|||
init(parent: Parent, observer: O) {
|
||||
_parent = parent
|
||||
|
||||
let _ = _groupDisposable.addDisposable(_timerD)
|
||||
let _ = _groupDisposable.insert(_timerD)
|
||||
|
||||
_refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
|
||||
super.init(observer: observer)
|
||||
|
|
@ -42,7 +42,7 @@ class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Eleme
|
|||
forwardOn(.next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
|
||||
createTimer(_windowId)
|
||||
|
||||
let _ = _groupDisposable.addDisposable(_parent._source.subscribeSafe(self))
|
||||
let _ = _groupDisposable.insert(_parent._source.subscribeSafe(self))
|
||||
return _refCountDisposable
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -60,12 +60,12 @@ extension DispatchQueueConfiguration {
|
|||
if compositeDisposable.disposed {
|
||||
return
|
||||
}
|
||||
_ = compositeDisposable.addDisposable(action(state))
|
||||
_ = compositeDisposable.insert(action(state))
|
||||
cancelTimer.dispose()
|
||||
})
|
||||
timer.resume()
|
||||
|
||||
_ = compositeDisposable.addDisposable(cancelTimer)
|
||||
_ = compositeDisposable.insert(cancelTimer)
|
||||
|
||||
return compositeDisposable
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,12 +44,12 @@ public class OperationQueueScheduler: ImmediateSchedulerType {
|
|||
}
|
||||
|
||||
let disposable = action(state)
|
||||
let _ = compositeDisposableWeak?.addDisposable(disposable)
|
||||
let _ = compositeDisposableWeak?.insert(disposable)
|
||||
}
|
||||
|
||||
self.operationQueue.addOperation(operation)
|
||||
|
||||
let _ = compositeDisposable.addDisposable(AnonymousDisposable(operation.cancel))
|
||||
let _ = compositeDisposable.insert(AnonymousDisposable(operation.cancel))
|
||||
|
||||
return compositeDisposable
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ class AnyRecursiveScheduler<State> {
|
|||
|
||||
let action = self._lock.calculateLocked { () -> Action? in
|
||||
if isAdded {
|
||||
self._group.removeDisposable(removeKey!)
|
||||
self._group.removeDisposable(forKey: removeKey!)
|
||||
}
|
||||
else {
|
||||
isDone = true
|
||||
|
|
@ -65,7 +65,7 @@ class AnyRecursiveScheduler<State> {
|
|||
|
||||
_lock.performLocked {
|
||||
if !isDone {
|
||||
removeKey = _group.addDisposable(d)
|
||||
removeKey = _group.insert(d)
|
||||
isAdded = true
|
||||
}
|
||||
}
|
||||
|
|
@ -90,7 +90,7 @@ class AnyRecursiveScheduler<State> {
|
|||
|
||||
let action = self._lock.calculateLocked { () -> Action? in
|
||||
if isAdded {
|
||||
self._group.removeDisposable(removeKey!)
|
||||
self._group.removeDisposable(forKey: removeKey!)
|
||||
}
|
||||
else {
|
||||
isDone = true
|
||||
|
|
@ -108,7 +108,7 @@ class AnyRecursiveScheduler<State> {
|
|||
|
||||
_lock.performLocked {
|
||||
if !isDone {
|
||||
removeKey = _group.addDisposable(d)
|
||||
removeKey = _group.insert(d)
|
||||
isAdded = true
|
||||
}
|
||||
}
|
||||
|
|
@ -160,7 +160,7 @@ class RecursiveImmediateScheduler<State> {
|
|||
|
||||
let action = self._lock.calculateLocked { () -> Action? in
|
||||
if isAdded {
|
||||
self._group.removeDisposable(removeKey!)
|
||||
self._group.removeDisposable(forKey: removeKey!)
|
||||
}
|
||||
else {
|
||||
isDone = true
|
||||
|
|
@ -178,7 +178,7 @@ class RecursiveImmediateScheduler<State> {
|
|||
|
||||
_lock.performLocked {
|
||||
if !isDone {
|
||||
removeKey = _group.addDisposable(d)
|
||||
removeKey = _group.insert(d)
|
||||
isAdded = true
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -128,7 +128,7 @@ public class VirtualTimeScheduler<Converter: VirtualTimeConverterType>
|
|||
|
||||
_schedulerQueue.enqueue(item)
|
||||
|
||||
_ = compositeDisposable.addDisposable(item)
|
||||
_ = compositeDisposable.insert(item)
|
||||
|
||||
return compositeDisposable
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,11 +73,11 @@ class DisposableTest : RxTest {
|
|||
var numberDisposed = 0
|
||||
let compositeDisposable = CompositeDisposable()
|
||||
|
||||
let result1 = compositeDisposable.addDisposable(AnonymousDisposable {
|
||||
let result1 = compositeDisposable.insert(AnonymousDisposable {
|
||||
numberDisposed += 1
|
||||
})
|
||||
|
||||
_ = compositeDisposable.addDisposable(AnonymousDisposable {
|
||||
_ = compositeDisposable.insert(AnonymousDisposable {
|
||||
numberDisposed += 1
|
||||
})
|
||||
|
||||
|
|
@ -89,7 +89,7 @@ class DisposableTest : RxTest {
|
|||
XCTAssertEqual(numberDisposed, 2)
|
||||
XCTAssertEqual(compositeDisposable.count, 0)
|
||||
|
||||
let result = compositeDisposable.addDisposable(AnonymousDisposable {
|
||||
let result = compositeDisposable.insert(AnonymousDisposable {
|
||||
numberDisposed += 1
|
||||
})
|
||||
|
||||
|
|
@ -131,11 +131,11 @@ class DisposableTest : RxTest {
|
|||
var numberDisposed = 0
|
||||
let compositeDisposable = CompositeDisposable()
|
||||
|
||||
let result1 = compositeDisposable.addDisposable(AnonymousDisposable {
|
||||
let result1 = compositeDisposable.insert(AnonymousDisposable {
|
||||
numberDisposed += 1
|
||||
})
|
||||
|
||||
let result2 = compositeDisposable.addDisposable(AnonymousDisposable {
|
||||
let result2 = compositeDisposable.insert(AnonymousDisposable {
|
||||
numberDisposed += 1
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ extension ObservableConcurrencyTest {
|
|||
let compositeDisposable = CompositeDisposable()
|
||||
|
||||
for test in tests {
|
||||
_ = compositeDisposable.addDisposable(runDispatchQueueSchedulerTests(scheduler, tests: test))
|
||||
_ = compositeDisposable.insert(runDispatchQueueSchedulerTests(scheduler, tests: test))
|
||||
}
|
||||
|
||||
compositeDisposable.dispose()
|
||||
|
|
|
|||
Loading…
Reference in New Issue