diff --git a/src/main/java/ru/touchin/roboswag/components/utils/destroyable/BaseDestroyable.kt b/src/main/java/ru/touchin/roboswag/components/utils/destroyable/BaseDestroyable.kt index dae5cd3..f2a47a9 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/destroyable/BaseDestroyable.kt +++ b/src/main/java/ru/touchin/roboswag/components/utils/destroyable/BaseDestroyable.kt @@ -1,6 +1,7 @@ package ru.touchin.roboswag.components.utils.destroyable import io.reactivex.Completable +import io.reactivex.Flowable import io.reactivex.Maybe import io.reactivex.Observable import io.reactivex.Single @@ -21,6 +22,16 @@ open class BaseDestroyable : Destroyable { */ fun onDestroy() = subscriptions.dispose() + override fun untilDestroy( + flowable: Flowable, + onNextAction: (T) -> Unit, + onErrorAction: (Throwable) -> Unit, + onCompletedAction: () -> Unit + ): Disposable = flowable + .observeOn(AndroidSchedulers.mainThread()) + .subscribe(onNextAction, onErrorAction, onCompletedAction) + .also { subscriptions.add(it) } + override fun untilDestroy( observable: Observable, onNextAction: (T) -> Unit, diff --git a/src/main/java/ru/touchin/roboswag/components/utils/destroyable/Destroyable.kt b/src/main/java/ru/touchin/roboswag/components/utils/destroyable/Destroyable.kt index ea2e1eb..6d2aea7 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/destroyable/Destroyable.kt +++ b/src/main/java/ru/touchin/roboswag/components/utils/destroyable/Destroyable.kt @@ -1,6 +1,7 @@ package ru.touchin.roboswag.components.utils.destroyable import io.reactivex.Completable +import io.reactivex.Flowable import io.reactivex.Maybe import io.reactivex.Observable import io.reactivex.Single @@ -23,6 +24,24 @@ interface Destroyable { } } + /** + * Method should be used to guarantee that observable won't be subscribed after onDestroy. + * It is automatically subscribing to the observable and calls onNextAction and onErrorAction on observable events. + * Don't forget to process errors if observable can emit them. + * + * @param flowable [Flowable] to subscribe until onDestroy; + * @param onNextAction Action which will raise on every [io.reactivex.Emitter.onNext] item; + * @param onErrorAction Action which will raise on every [io.reactivex.Emitter.onError] throwable; + * @param T Type of emitted by observable items; + * @return [Disposable] which is wrapping source observable to unsubscribe from it onDestroy. + */ + fun untilDestroy( + flowable: Flowable, + onNextAction: (T) -> Unit = Functions.emptyConsumer()::accept, + onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)), + onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run + ): Disposable + /** * Method should be used to guarantee that observable won't be subscribed after onDestroy. * It is automatically subscribing to the observable and calls onNextAction and onErrorAction on observable events.