Flowable handling

This commit is contained in:
Denis Karmyshakov 2018-03-27 17:40:34 +03:00
parent bbc73d3d12
commit cd9db1203a
2 changed files with 30 additions and 0 deletions

View File

@ -1,6 +1,7 @@
package ru.touchin.roboswag.components.utils.destroyable
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
@ -21,6 +22,16 @@ open class BaseDestroyable : Destroyable {
*/
fun onDestroy() = subscriptions.dispose()
override fun <T> untilDestroy(
flowable: Flowable<T>,
onNextAction: (T) -> Unit,
onErrorAction: (Throwable) -> Unit,
onCompletedAction: () -> Unit
): Disposable = flowable
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction, onErrorAction, onCompletedAction)
.also { subscriptions.add(it) }
override fun <T> untilDestroy(
observable: Observable<T>,
onNextAction: (T) -> Unit,

View File

@ -1,6 +1,7 @@
package ru.touchin.roboswag.components.utils.destroyable
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Maybe
import io.reactivex.Observable
import io.reactivex.Single
@ -23,6 +24,24 @@ interface Destroyable {
}
}
/**
* Method should be used to guarantee that observable won't be subscribed after onDestroy.
* It is automatically subscribing to the observable and calls onNextAction and onErrorAction on observable events.
* Don't forget to process errors if observable can emit them.
*
* @param flowable [Flowable] to subscribe until onDestroy;
* @param onNextAction Action which will raise on every [io.reactivex.Emitter.onNext] item;
* @param onErrorAction Action which will raise on every [io.reactivex.Emitter.onError] throwable;
* @param T Type of emitted by observable items;
* @return [Disposable] which is wrapping source observable to unsubscribe from it onDestroy.
*/
fun <T> untilDestroy(
flowable: Flowable<T>,
onNextAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept,
onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)),
onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run
): Disposable
/**
* Method should be used to guarantee that observable won't be subscribed after onDestroy.
* It is automatically subscribing to the observable and calls onNextAction and onErrorAction on observable events.