untilDestroy moved to extension

This commit is contained in:
Denis Karmyshakov 2018-07-03 15:00:36 +03:00
parent 42750812dd
commit 448501aa06
3 changed files with 23 additions and 30 deletions

View File

@ -14,3 +14,11 @@ inline fun <T> Delegates.observable(
): ReadWriteProperty<Any?, T> = object : ObservableProperty<T>(initialValue) { ): ReadWriteProperty<Any?, T> = object : ObservableProperty<T>(initialValue) {
override fun afterChange(property: KProperty<*>, oldValue: T, newValue: T) = onChange(newValue) override fun afterChange(property: KProperty<*>, oldValue: T, newValue: T) = onChange(newValue)
} }
inline fun <T> Delegates.distinctUntilChanged(
initialValue: T,
crossinline onChange: (newValue: T) -> Unit
): ReadWriteProperty<Any?, T> = object : ObservableProperty<T>(initialValue) {
override fun afterChange(property: KProperty<*>, oldValue: T, newValue: T) =
if (newValue != null && oldValue != newValue) onChange(newValue) else Unit
}

View File

@ -24,51 +24,41 @@ open class BaseDestroyable : Destroyable {
*/ */
fun onDestroy() = subscriptions.dispose() fun onDestroy() = subscriptions.dispose()
override fun <T> untilDestroy( override fun <T> Flowable<T>.untilDestroy(
flowable: Flowable<T>,
onNextAction: (T) -> Unit, onNextAction: (T) -> Unit,
onErrorAction: (Throwable) -> Unit, onErrorAction: (Throwable) -> Unit,
onCompletedAction: () -> Unit onCompletedAction: () -> Unit
): Disposable = flowable ): Disposable = observeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction, onErrorAction, onCompletedAction) .subscribe(onNextAction, onErrorAction, onCompletedAction)
.also { subscriptions.add(it) } .also { subscriptions.add(it) }
override fun <T> untilDestroy( override fun <T> Observable<T>.untilDestroy(
observable: Observable<T>,
onNextAction: (T) -> Unit, onNextAction: (T) -> Unit,
onErrorAction: (Throwable) -> Unit, onErrorAction: (Throwable) -> Unit,
onCompletedAction: () -> Unit onCompletedAction: () -> Unit
): Disposable = observable ): Disposable = observeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onNextAction, onErrorAction, onCompletedAction) .subscribe(onNextAction, onErrorAction, onCompletedAction)
.also { subscriptions.add(it) } .also { subscriptions.add(it) }
override fun <T> untilDestroy( override fun <T> Single<T>.untilDestroy(
single: Single<T>,
onSuccessAction: (T) -> Unit, onSuccessAction: (T) -> Unit,
onErrorAction: (Throwable) -> Unit onErrorAction: (Throwable) -> Unit
): Disposable = single ): Disposable = observeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onSuccessAction, onErrorAction) .subscribe(onSuccessAction, onErrorAction)
.also { subscriptions.add(it) } .also { subscriptions.add(it) }
override fun untilDestroy( override fun Completable.untilDestroy(
completable: Completable,
onCompletedAction: () -> Unit, onCompletedAction: () -> Unit,
onErrorAction: (Throwable) -> Unit onErrorAction: (Throwable) -> Unit
): Disposable = completable ): Disposable = observeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onCompletedAction, onErrorAction) .subscribe(onCompletedAction, onErrorAction)
.also { subscriptions.add(it) } .also { subscriptions.add(it) }
override fun <T> untilDestroy( override fun <T> Maybe<T>.untilDestroy(
maybe: Maybe<T>,
onSuccessAction: (T) -> Unit, onSuccessAction: (T) -> Unit,
onErrorAction: (Throwable) -> Unit, onErrorAction: (Throwable) -> Unit,
onCompletedAction: () -> Unit onCompletedAction: () -> Unit
): Disposable = maybe ): Disposable = observeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onSuccessAction, onErrorAction, onCompletedAction) .subscribe(onSuccessAction, onErrorAction, onCompletedAction)
.also { subscriptions.add(it) } .also { subscriptions.add(it) }

View File

@ -40,8 +40,7 @@ interface Destroyable {
* @param T Type of emitted by observable items; * @param T Type of emitted by observable items;
* @return [Disposable] which is wrapping source observable to unsubscribe from it onDestroy. * @return [Disposable] which is wrapping source observable to unsubscribe from it onDestroy.
*/ */
fun <T> untilDestroy( fun <T> Flowable<T>.untilDestroy(
flowable: Flowable<T>,
onNextAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept, onNextAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept,
onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)), onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)),
onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run
@ -58,8 +57,7 @@ interface Destroyable {
* @param T Type of emitted by observable items; * @param T Type of emitted by observable items;
* @return [Disposable] which is wrapping source observable to unsubscribe from it onDestroy. * @return [Disposable] which is wrapping source observable to unsubscribe from it onDestroy.
*/ */
fun <T> untilDestroy( fun <T> Observable<T>.untilDestroy(
observable: Observable<T>,
onNextAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept, onNextAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept,
onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)), onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)),
onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run
@ -76,8 +74,7 @@ interface Destroyable {
* @param T Type of emitted by single items; * @param T Type of emitted by single items;
* @return [Disposable] which is wrapping source single to unsubscribe from it onDestroy. * @return [Disposable] which is wrapping source single to unsubscribe from it onDestroy.
*/ */
fun <T> untilDestroy( fun <T> Single<T>.untilDestroy(
single: Single<T>,
onSuccessAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept, onSuccessAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept,
onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)) onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2))
): Disposable ): Disposable
@ -92,8 +89,7 @@ interface Destroyable {
* @param onErrorAction Action which will raise on every [io.reactivex.CompletableEmitter.onError] throwable; * @param onErrorAction Action which will raise on every [io.reactivex.CompletableEmitter.onError] throwable;
* @return [Disposable] which is wrapping source completable to unsubscribe from it onDestroy. * @return [Disposable] which is wrapping source completable to unsubscribe from it onDestroy.
*/ */
fun untilDestroy( fun Completable.untilDestroy(
completable: Completable,
onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run, onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run,
onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)) onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2))
): Disposable ): Disposable
@ -108,8 +104,7 @@ interface Destroyable {
* @param onErrorAction Action which will raise on every [io.reactivex.MaybeEmitter.onError] throwable; * @param onErrorAction Action which will raise on every [io.reactivex.MaybeEmitter.onError] throwable;
* @return [Disposable] which is wrapping source maybe to unsubscribe from it onDestroy. * @return [Disposable] which is wrapping source maybe to unsubscribe from it onDestroy.
*/ */
fun <T> untilDestroy( fun <T> Maybe<T>.untilDestroy(
maybe: Maybe<T>,
onSuccessAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept, onSuccessAction: (T) -> Unit = Functions.emptyConsumer<T>()::accept,
onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)), onErrorAction: (Throwable) -> Unit = getActionThrowableForAssertion(Lc.getCodePoint(this, 2)),
onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run onCompletedAction: () -> Unit = Functions.EMPTY_ACTION::run