From 9872845aa6fa7ea963beae437cc7afae9029c070 Mon Sep 17 00:00:00 2001 From: Ilia Kurtov Date: Wed, 22 Mar 2017 20:02:46 +0300 Subject: [PATCH 1/5] added singles and completables to lifecycle bindable --- .../components/utils/LifecycleBindable.java | 186 ++++++++++++++++-- 1 file changed, 172 insertions(+), 14 deletions(-) diff --git a/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java b/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java index 2a4157a..5e2c778 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java +++ b/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java @@ -21,7 +21,9 @@ package ru.touchin.roboswag.components.utils; import android.support.annotation.NonNull; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; @@ -39,7 +41,7 @@ import rx.functions.Func1; public interface LifecycleBindable { /** - * Method should be used to subscribe to observable while this element is in started state. + * Method should be used to subscribe to the observable while this element is in started state. * Passed observable should NOT emit errors. It is illegal as in that case it stops emitting items and binding lost after error. * If you want to process errors return something via {@link Observable#onErrorReturn(Func1)} method and process in onNextAction. * @@ -53,7 +55,7 @@ public interface LifecycleBindable { /** * Method should be used to guarantee that observable won't be subscribed after onStop. - * It is automatically subscribing to observable. + * It is automatically subscribing to the observable. * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. * Don't forget to process errors if observable can emit them. * @@ -66,7 +68,7 @@ public interface LifecycleBindable { /** * Method should be used to guarantee that observable won't be subscribed after onStop. - * It is automatically subscribing to observable and calls onNextAction on every emitted item. + * It is automatically subscribing to the observable and calls onNextAction on every emitted item. * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. * Don't forget to process errors if observable can emit them. * @@ -80,7 +82,7 @@ public interface LifecycleBindable { /** * Method should be used to guarantee that observable won't be subscribed after onStop. - * It is automatically subscribing to observable and calls onNextAction, onErrorAction on observable events. + * It is automatically subscribing to the observable and calls onNextAction and onErrorAction on observable events. * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. * Don't forget to process errors if observable can emit them. * @@ -95,7 +97,7 @@ public interface LifecycleBindable { /** * Method should be used to guarantee that observable won't be subscribed after onStop. - * It is automatically subscribing to observable and calls onNextAction, onErrorAction, onCompletedAction on observable events. + * It is automatically subscribing to the observable and calls onNextAction, onErrorAction and onCompletedAction on observable events. * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. * Don't forget to process errors if observable can emit them. * @@ -104,54 +106,135 @@ public interface LifecycleBindable { * @param onErrorAction Action which will raise on every {@link Subscriber#onError(Throwable)} throwable; * @param onCompletedAction Action which will raise at {@link Subscriber#onCompleted()} on completion of observable; * @param Type of emitted by observable items; - * @return {@link Observable} which is wrapping source observable to unsubscribe from it onStop. + * @return {@link Subscription} which is wrapping source observable to unsubscribe from it onStop. */ @NonNull Subscription untilStop(@NonNull Observable observable, @NonNull Action1 onNextAction, @NonNull Action1 onErrorAction, @NonNull Action0 onCompletedAction); + /** + * Method should be used to guarantee that single won't be subscribed after onStop. + * It is automatically subscribing to the single. + * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. + * Don't forget to process errors if single can emit them. + * + * @param single {@link Single} to subscribe until onStop; + * @param Type of emitted by single item; + * @return {@link Subscription} which will unsubscribes from single onStop. + */ + @NonNull + Subscription untilStop(@NonNull Single single); + + /** + * Method should be used to guarantee that single won't be subscribed after onStop. + * It is automatically subscribing to the single and calls onSuccessAction on the emitted item. + * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. + * Don't forget to process errors if single can emit them. + * + * @param single {@link Single} to subscribe until onStop; + * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; + * @param Type of emitted by single item; + * @return {@link Subscription} which will unsubscribes from single onStop. + */ + @NonNull + Subscription untilStop(@NonNull Single single, @NonNull Action1 onSuccessAction); + + /** + * Method should be used to guarantee that single won't be subscribed after onStop. + * It is automatically subscribing to the single and calls onSuccessAction and onErrorAction on single events. + * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. + * Don't forget to process errors if single can emit them. + * + * @param single {@link Single} to subscribe until onStop; + * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; + * @param onErrorAction Action which will raise on every {@link rx.SingleSubscriber#onError(Throwable)} throwable; + * @param Type of emitted by observable items; + * @return {@link Subscription} which is wrapping source single to unsubscribe from it onStop. + */ + @NonNull + Subscription untilStop(@NonNull Single single, @NonNull Action1 onSuccessAction, @NonNull Action1 onErrorAction); + + /** + * Method should be used to guarantee that completable won't be subscribed after onStop. + * It is automatically subscribing to the completable. + * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. + * Don't forget to process errors if completable can emit them. + * + * @param completable {@link Completable} to subscribe until onStop; + * @return {@link Subscription} which will unsubscribes from completable onStop. + */ + @NonNull + Subscription untilStop(@NonNull Completable completable); + + /** + * Method should be used to guarantee that completable won't be subscribed after onStop. + * It is automatically subscribing to the completable and calls onCompletedAction on completable item. + * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. + * Don't forget to process errors if completable can emit them. + * + * @param completable {@link Completable} to subscribe until onStop; + * @param onCompletedAction Action which will raise at {@link rx.Completable.CompletableSubscriber#onCompleted()} on completion of observable; + * @return {@link Subscription} which is wrapping source completable to unsubscribe from it onStop. + */ + @NonNull + Subscription untilStop(@NonNull Completable completable, @NonNull Action0 onCompletedAction); + + /** + * Method should be used to guarantee that completable won't be subscribed after onStop. + * It is automatically subscribing to the completable and calls onCompletedAction and onErrorAction on completable item. + * Usually it is using to stop requests/execution while element is off or to not do illegal actions after onStop like fragment's stack changing. + * Don't forget to process errors if completable can emit them. + * + * @param completable {@link Completable} to subscribe until onStop; + * @param onCompletedAction Action which will raise at {@link rx.Completable.CompletableSubscriber#onCompleted()} on completion of observable; + * @param onErrorAction Action which will raise on every {@link rx.Completable.CompletableSubscriber#onError(Throwable)} throwable; + * @return {@link Subscription} which is wrapping source completable to unsubscribe from it onStop. + */ + @NonNull + Subscription untilStop(@NonNull Completable completable, @NonNull Action0 onCompletedAction, @NonNull Action1 onErrorAction); + /** * Method should be used to guarantee that observable won't be subscribed after onDestroy. - * It is automatically subscribing to observable. + * It is automatically subscribing to the observable. * Don't forget to process errors if observable can emit them. * * @param observable {@link Observable} to subscribe until onDestroy; * @param Type of emitted by observable items; - * @return {@link Observable} which is wrapping source observable to unsubscribe from it onDestroy. + * @return {@link Subscription} which is wrapping source observable to unsubscribe from it onDestroy. */ @NonNull Subscription untilDestroy(@NonNull Observable observable); /** * Method should be used to guarantee that observable won't be subscribed after onDestroy. - * It is automatically subscribing to observable and calls onNextAction on every emitted item. + * It is automatically subscribing to the observable and calls onNextAction on every emitted item. * Don't forget to process errors if observable can emit them. * * @param observable {@link Observable} to subscribe until onDestroy; * @param onNextAction Action which will raise on every {@link Subscriber#onNext(Object)} item; * @param Type of emitted by observable items; - * @return {@link Observable} which is wrapping source observable to unsubscribe from it onDestroy. + * @return {@link Subscription} which is wrapping source observable to unsubscribe from it onDestroy. */ @NonNull Subscription untilDestroy(@NonNull Observable observable, @NonNull Action1 onNextAction); /** * Method should be used to guarantee that observable won't be subscribed after onDestroy. - * It is automatically subscribing to observable and calls onNextAction, onErrorAction on observable events. + * It is automatically subscribing to the observable and calls onNextAction and onErrorAction on observable events. * Don't forget to process errors if observable can emit them. * * @param observable {@link Observable} to subscribe until onDestroy; * @param onNextAction Action which will raise on every {@link Subscriber#onNext(Object)} item; * @param onErrorAction Action which will raise on every {@link Subscriber#onError(Throwable)} throwable; * @param Type of emitted by observable items; - * @return {@link Observable} which is wrapping source observable to unsubscribe from it onDestroy. + * @return {@link Subscription} which is wrapping source observable to unsubscribe from it onDestroy. */ @NonNull Subscription untilDestroy(@NonNull Observable observable, @NonNull Action1 onNextAction, @NonNull Action1 onErrorAction); /** * Method should be used to guarantee that observable won't be subscribed after onDestroy. - * It is automatically subscribing to observable and calls onNextAction, onErrorAction, onCompletedAction on observable events. + * It is automatically subscribing to the observable and calls onNextAction, onErrorAction and onCompletedAction on observable events. * Don't forget to process errors if observable can emit them. * * @param observable {@link Observable} to subscribe until onDestroy; @@ -159,10 +242,85 @@ public interface LifecycleBindable { * @param onErrorAction Action which will raise on every {@link Subscriber#onError(Throwable)} throwable; * @param onCompletedAction Action which will raise at {@link Subscriber#onCompleted()} on completion of observable; * @param Type of emitted by observable items; - * @return {@link Observable} which is wrapping source observable to unsubscribe from it onDestroy. + * @return {@link Subscription} which is wrapping source observable to unsubscribe from it onDestroy. */ @NonNull Subscription untilDestroy(@NonNull Observable observable, @NonNull Action1 onNextAction, @NonNull Action1 onErrorAction, @NonNull Action0 onCompletedAction); + /** + * Method should be used to guarantee that single won't be subscribed after onDestroy. + * It is automatically subscribing to the single. + * Don't forget to process errors if single can emit them. + * + * @param single {@link Single} to subscribe until onDestroy; + * @param Type of emitted by single items; + * @return {@link Subscription} which is wrapping source single to unsubscribe from it onDestroy. + */ + @NonNull + Subscription untilDestroy(@NonNull Single single); + + /** + * Method should be used to guarantee that single won't be subscribed after onDestroy. + * It is automatically subscribing to the single and calls onSuccessAction on every emitted item. + * Don't forget to process errors if single can emit them. + * + * @param single {@link Single} to subscribe until onDestroy; + * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; + * @param Type of emitted by single items; + * @return {@link Subscription} which is wrapping source single to unsubscribe from it onDestroy. + */ + @NonNull + Subscription untilDestroy(@NonNull Single single, @NonNull Action1 onSuccessAction); + + /** + * Method should be used to guarantee that single won't be subscribed after onDestroy. + * It is automatically subscribing to the single and calls onSuccessAction and onErrorAction on single events. + * Don't forget to process errors if single can emit them. + * + * @param single {@link Single} to subscribe until onDestroy; + * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; + * @param onErrorAction Action which will raise on every {@link rx.SingleSubscriber#onError(Throwable)} throwable; + * @param Type of emitted by single items; + * @return {@link Subscription} which is wrapping source single to unsubscribe from it onDestroy. + */ + @NonNull + Subscription untilDestroy(@NonNull Single single, @NonNull Action1 onSuccessAction, @NonNull Action1 onErrorAction); + + /** + * Method should be used to guarantee that completable won't be subscribed after onDestroy. + * It is automatically subscribing to the completable. + * Don't forget to process errors if completable can emit them. + * + * @param completable {@link Completable} to subscribe until onDestroy; + * @return {@link Subscription} which is wrapping source completable to unsubscribe from it onDestroy. + */ + @NonNull + Subscription untilDestroy(@NonNull Completable completable); + + /** + * Method should be used to guarantee that completable won't be subscribed after onDestroy. + * It is automatically subscribing to the completable and calls onCompletedAction on completable item. + * Don't forget to process errors if single can emit them. + * + * @param completable {@link Completable} to subscribe until onDestroy; + * @param onCompletedAction Action which will raise on every {@link Completable.CompletableSubscriber#onCompleted()} item; + * @return {@link Subscription} which is wrapping source single to unsubscribe from it onDestroy. + */ + @NonNull + Subscription untilDestroy(@NonNull Completable completable, @NonNull Action0 onCompletedAction); + + /** + * Method should be used to guarantee that completable won't be subscribed after onDestroy. + * It is automatically subscribing to the completable and calls onCompletedAction and onErrorAction on completable events. + * Don't forget to process errors if completable can emit them. + * + * @param completable {@link Completable} to subscribe until onDestroy; + * @param onCompletedAction Action which will raise on every {@link Completable.CompletableSubscriber#onCompleted()} item; + * @param onErrorAction Action which will raise on every {@link rx.Completable.CompletableSubscriber#onError(Throwable)} throwable; + * @return {@link Subscription} which is wrapping source completable to unsubscribe from it onDestroy. + */ + @NonNull + Subscription untilDestroy(@NonNull Completable completable, @NonNull Action0 onCompletedAction, @NonNull Action1 onErrorAction); + } From 7cda509e8215506a3f1ff125dbe2524a96c6e639 Mon Sep 17 00:00:00 2001 From: Ilia Kurtov Date: Wed, 22 Mar 2017 20:03:19 +0300 Subject: [PATCH 2/5] added singles and completables to LifecycleBindab --- .../ru/touchin/roboswag/components/utils/LifecycleBindable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java b/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java index 5e2c778..26a932d 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java +++ b/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java @@ -323,4 +323,4 @@ public interface LifecycleBindable { @NonNull Subscription untilDestroy(@NonNull Completable completable, @NonNull Action0 onCompletedAction, @NonNull Action1 onErrorAction); -} +} \ No newline at end of file From a135d5ac16fa16e57603ee45f67e54d30b25ee9b Mon Sep 17 00:00:00 2001 From: Ilia Kurtov Date: Wed, 22 Mar 2017 20:36:08 +0300 Subject: [PATCH 3/5] new methods for BaseLifecycleBindable --- .../adapters/BindableViewHolder.java | 82 +++++++++ .../components/navigation/ViewController.java | 82 +++++++++ .../utils/BaseLifecycleBindable.java | 172 ++++++++++++++++-- 3 files changed, 321 insertions(+), 15 deletions(-) diff --git a/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java b/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java index 227d23d..37c2e28 100644 --- a/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java +++ b/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java @@ -33,7 +33,9 @@ import android.view.View; import ru.touchin.roboswag.components.utils.LifecycleBindable; import ru.touchin.roboswag.core.utils.ShouldNotHappenException; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; @@ -154,6 +156,46 @@ public class BindableViewHolder extends RecyclerView.ViewHolder implements Lifec return baseLifecycleBindable.untilStop(observable, onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single) { + return baseLifecycleBindable.untilStop(single); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + return baseLifecycleBindable.untilStop(single, onSuccessAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilStop(single, onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable) { + return baseLifecycleBindable.untilStop(completable); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction) { + return baseLifecycleBindable.untilStop(completable, onCompletedAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilStop(completable, onCompletedAction, onErrorAction); + } + @NonNull @Override public Subscription untilDestroy(@NonNull final Observable observable) { @@ -183,4 +225,44 @@ public class BindableViewHolder extends RecyclerView.ViewHolder implements Lifec return baseLifecycleBindable.untilDestroy(observable, onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single) { + return baseLifecycleBindable.untilDestroy(single); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + return baseLifecycleBindable.untilDestroy(single, onSuccessAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilDestroy(single, onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable) { + return baseLifecycleBindable.untilDestroy(completable); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction) { + return baseLifecycleBindable.untilDestroy(completable, onCompletedAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilDestroy(completable, onCompletedAction, onErrorAction); + } + } diff --git a/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java b/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java index a1d92a7..5d932cb 100644 --- a/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java +++ b/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java @@ -44,7 +44,9 @@ import ru.touchin.roboswag.components.utils.LifecycleBindable; import ru.touchin.roboswag.components.utils.UiUtils; import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.utils.ShouldNotHappenException; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; @@ -372,6 +374,46 @@ public class ViewController, return baseLifecycleBindable.untilStop(observable, onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single) { + return baseLifecycleBindable.untilStop(single); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + return baseLifecycleBindable.untilStop(single, onSuccessAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilStop(single, onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable) { + return baseLifecycleBindable.untilStop(completable); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction) { + return baseLifecycleBindable.untilStop(completable, onCompletedAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilStop(completable, onCompletedAction, onErrorAction); + } + @NonNull @Override public Subscription untilDestroy(@NonNull final Observable observable) { @@ -401,6 +443,46 @@ public class ViewController, return baseLifecycleBindable.untilDestroy(observable, onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single) { + return baseLifecycleBindable.untilDestroy(single); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + return baseLifecycleBindable.untilDestroy(single, onSuccessAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilDestroy(single, onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable) { + return baseLifecycleBindable.untilDestroy(completable); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction) { + return baseLifecycleBindable.untilDestroy(completable, onCompletedAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilDestroy(completable, onCompletedAction, onErrorAction); + } + @SuppressWarnings("CPD-END") //CPD: it is same as in other implementation based on BaseLifecycleBindable /** diff --git a/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java b/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java index 59106de..124a508 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java +++ b/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java @@ -23,7 +23,9 @@ import android.support.annotation.NonNull; import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.utils.ShouldNotHappenException; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.Subscription; import rx.android.schedulers.AndroidSchedulers; import rx.exceptions.OnErrorThrowable; @@ -38,6 +40,9 @@ import rx.subjects.BehaviorSubject; */ public class BaseLifecycleBindable implements LifecycleBindable { + private static final String UNTIL_DESTROY_METHOD = "untilDestroy"; + private static final String UNTIL_STOP_METHOD = "untilStop"; + @NonNull private final BehaviorSubject isCreatedSubject = BehaviorSubject.create(); @NonNull @@ -77,26 +82,21 @@ public class BaseLifecycleBindable implements LifecycleBindable { final String codePoint = Lc.getCodePoint(this, 2); return isStartedSubject.switchMap(started -> started ? observable.observeOn(AndroidSchedulers.mainThread()) : Observable.never()) .takeUntil(isCreatedSubject.filter(created -> !created)) - .subscribe(onNextAction, - throwable -> Lc.assertion(new ShouldNotHappenException("Unexpected error on untilStop at " + codePoint, throwable))); + .subscribe(onNextAction, getActionThrowableForAssertion(codePoint, UNTIL_STOP_METHOD)); } @NonNull @Override public Subscription untilStop(@NonNull final Observable observable) { final String codePoint = Lc.getCodePoint(this, 2); - return untilStop(observable, Actions.empty(), - throwable -> Lc.assertion(new ShouldNotHappenException("Unexpected error on untilStop at " + codePoint, throwable)), - Actions.empty()); + return untilStop(observable, Actions.empty(), getActionThrowableForAssertion(codePoint, UNTIL_STOP_METHOD), Actions.empty()); } @NonNull @Override public Subscription untilStop(@NonNull final Observable observable, @NonNull final Action1 onNextAction) { final String codePoint = Lc.getCodePoint(this, 2); - return untilStop(observable, onNextAction, - throwable -> Lc.assertion(new ShouldNotHappenException("Unexpected error on untilStop at " + codePoint, throwable)), - Actions.empty()); + return untilStop(observable, onNextAction, getActionThrowableForAssertion(codePoint, UNTIL_STOP_METHOD), Actions.empty()); } @NonNull @@ -116,13 +116,56 @@ public class BaseLifecycleBindable implements LifecycleBindable { return until(observable, isStartedSubject.map(started -> !started), onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilStop(single, Actions.empty(), getActionThrowableForAssertion(codePoint, UNTIL_STOP_METHOD)); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilStop(single, onSuccessAction, getActionThrowableForAssertion(codePoint, UNTIL_STOP_METHOD)); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return until(single, isStartedSubject.map(started -> !started), onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilStop(completable, Actions.empty(), getActionThrowableForAssertion(codePoint, UNTIL_STOP_METHOD)); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilStop(completable, onCompletedAction, getActionThrowableForAssertion(codePoint, UNTIL_STOP_METHOD)); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return until(completable, isStartedSubject.map(started -> !started), onCompletedAction, onErrorAction); + } + @NonNull @Override public Subscription untilDestroy(@NonNull final Observable observable) { final String codePoint = Lc.getCodePoint(this, 2); - return untilDestroy(observable, Actions.empty(), - throwable -> Lc.assertion(new ShouldNotHappenException("Unexpected error on untilDestroy at " + codePoint, throwable)), - Actions.empty()); + return untilDestroy(observable, Actions.empty(), getActionThrowableForAssertion(codePoint, UNTIL_DESTROY_METHOD), Actions.empty()); } @NonNull @@ -130,9 +173,7 @@ public class BaseLifecycleBindable implements LifecycleBindable { public Subscription untilDestroy(@NonNull final Observable observable, @NonNull final Action1 onNextAction) { final String codePoint = Lc.getCodePoint(this, 2); - return untilDestroy(observable, onNextAction, - throwable -> Lc.assertion(new ShouldNotHappenException("Unexpected error on untilDestroy at " + codePoint, throwable)), - Actions.empty()); + return untilDestroy(observable, onNextAction, getActionThrowableForAssertion(codePoint, UNTIL_DESTROY_METHOD), Actions.empty()); } @NonNull @@ -152,6 +193,51 @@ public class BaseLifecycleBindable implements LifecycleBindable { return until(observable, isCreatedSubject.map(created -> !created), onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilDestroy(single, Actions.empty(), getActionThrowableForAssertion(codePoint, UNTIL_DESTROY_METHOD)); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilDestroy(single, onSuccessAction, getActionThrowableForAssertion(codePoint, UNTIL_DESTROY_METHOD)); + } + + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return until(single, isCreatedSubject.map(created -> !created), onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilDestroy(completable, Actions.empty(), getActionThrowableForAssertion(codePoint, UNTIL_DESTROY_METHOD)); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction) { + final String codePoint = Lc.getCodePoint(this, 2); + return untilDestroy(completable, onCompletedAction, getActionThrowableForAssertion(codePoint, UNTIL_DESTROY_METHOD)); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return until(completable, isCreatedSubject.map(created -> !created), onCompletedAction, onErrorAction); + } + @NonNull private Subscription until(@NonNull final Observable observable, @NonNull final Observable conditionSubject, @@ -178,4 +264,60 @@ public class BaseLifecycleBindable implements LifecycleBindable { }); } -} + @NonNull + private Subscription until(@NonNull final Single single, + @NonNull final Observable conditionSubject, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + final Single actualSingle; + if (onSuccessAction == Actions.empty() && onErrorAction == (Action1) Actions.empty()) { + actualSingle = single; + } else { + actualSingle = single.observeOn(AndroidSchedulers.mainThread()); + } + return isCreatedSubject.first() + .flatMap(created -> created ? actualSingle.toObservable() : Observable.empty()) + .takeUntil(conditionSubject.filter(condition -> condition)) + .toSingle() + .subscribe(onSuccessAction, throwable -> { + final boolean isRxError = throwable instanceof OnErrorThrowable; + if ((!isRxError && throwable instanceof RuntimeException) + || (isRxError && throwable.getCause() instanceof RuntimeException)) { + Lc.assertion(throwable); + } + onErrorAction.call(throwable); + }); + } + + + @NonNull + private Subscription until(@NonNull final Completable completable, + @NonNull final Observable conditionSubject, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + final Completable actualCompletable; + if (onCompletedAction == Actions.empty() && onErrorAction == (Action1) Actions.empty()) { + actualCompletable = completable; + } else { + actualCompletable = completable.observeOn(AndroidSchedulers.mainThread()); + } + return isCreatedSubject.first() + .flatMap(created -> created ? actualCompletable.toObservable() : Observable.empty()) + .takeUntil(conditionSubject.filter(condition -> condition)) + .toCompletable() + .subscribe(throwable -> { + final boolean isRxError = throwable instanceof OnErrorThrowable; + if ((!isRxError && throwable instanceof RuntimeException) + || (isRxError && throwable.getCause() instanceof RuntimeException)) { + Lc.assertion(throwable); + } + onErrorAction.call(throwable); + }, onCompletedAction); + } + + @NonNull + private Action1 getActionThrowableForAssertion(@NonNull final String codePoint, @NonNull final String method) { + return throwable -> Lc.assertion(new ShouldNotHappenException("Unexpected error on " + method + " at " + codePoint, throwable)); + } + +} \ No newline at end of file From 031648451f445a7de38e359c02e1f8695231872e Mon Sep 17 00:00:00 2001 From: Ilia Kurtov Date: Wed, 22 Mar 2017 21:21:31 +0300 Subject: [PATCH 4/5] static --- .../adapters/BindableViewHolder.java | 1 + .../components/navigation/ViewController.java | 2 +- .../navigation/activities/BaseActivity.java | 82 +++++++++++++++++++ .../utils/BaseLifecycleBindable.java | 1 + .../components/utils/LifecycleBindable.java | 22 ++--- 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java b/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java index 37c2e28..2748721 100644 --- a/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java +++ b/src/main/java/ru/touchin/roboswag/components/adapters/BindableViewHolder.java @@ -44,6 +44,7 @@ import rx.functions.Action1; * Created by Gavriil Sitnikov on 12/8/2016. * ViewHolder that implements {@link LifecycleBindable} and uses parent bindable object as bridge (Activity, ViewController etc.). */ +@SuppressWarnings("PMD.TooManyMethods") public class BindableViewHolder extends RecyclerView.ViewHolder implements LifecycleBindable { @NonNull diff --git a/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java b/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java index 5d932cb..8d70cde 100644 --- a/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java +++ b/src/main/java/ru/touchin/roboswag/components/navigation/ViewController.java @@ -58,7 +58,7 @@ import rx.functions.Action1; * @param Type of activity where such {@link ViewController} could be; * @param Type of fragment where such {@link ViewController} could be; */ -@SuppressWarnings("PMD.TooManyMethods") +@SuppressWarnings({"PMD.TooManyMethods", "PMD.ExcessivePublicCount"}) public class ViewController, TFragment extends ViewControllerFragment> implements LifecycleBindable { diff --git a/src/main/java/ru/touchin/roboswag/components/navigation/activities/BaseActivity.java b/src/main/java/ru/touchin/roboswag/components/navigation/activities/BaseActivity.java index 84746be..c58337a 100644 --- a/src/main/java/ru/touchin/roboswag/components/navigation/activities/BaseActivity.java +++ b/src/main/java/ru/touchin/roboswag/components/navigation/activities/BaseActivity.java @@ -40,7 +40,9 @@ import ru.touchin.roboswag.components.utils.BaseLifecycleBindable; import ru.touchin.roboswag.components.utils.LifecycleBindable; import ru.touchin.roboswag.components.utils.UiUtils; import ru.touchin.roboswag.core.log.Lc; +import rx.Completable; import rx.Observable; +import rx.Single; import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; @@ -269,6 +271,46 @@ public abstract class BaseActivity extends AppCompatActivity return baseLifecycleBindable.untilStop(observable, onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single) { + return baseLifecycleBindable.untilStop(single); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + return baseLifecycleBindable.untilStop(single, onSuccessAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilStop(single, onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable) { + return baseLifecycleBindable.untilStop(completable); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction) { + return baseLifecycleBindable.untilStop(completable, onCompletedAction); + } + + @NonNull + @Override + public Subscription untilStop(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilStop(completable, onCompletedAction, onErrorAction); + } + @NonNull @Override public Subscription untilStop(@NonNull final Observable observable) { @@ -310,6 +352,46 @@ public abstract class BaseActivity extends AppCompatActivity return baseLifecycleBindable.untilDestroy(observable, onNextAction, onErrorAction, onCompletedAction); } + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single) { + return baseLifecycleBindable.untilDestroy(single); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, @NonNull final Action1 onSuccessAction) { + return baseLifecycleBindable.untilDestroy(single, onSuccessAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Single single, + @NonNull final Action1 onSuccessAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilDestroy(single, onSuccessAction, onErrorAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable) { + return baseLifecycleBindable.untilDestroy(completable); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction) { + return baseLifecycleBindable.untilDestroy(completable, onCompletedAction); + } + + @NonNull + @Override + public Subscription untilDestroy(@NonNull final Completable completable, + @NonNull final Action0 onCompletedAction, + @NonNull final Action1 onErrorAction) { + return baseLifecycleBindable.untilDestroy(completable, onCompletedAction, onErrorAction); + } + @SuppressWarnings("CPD-END") //CPD: it is same as in other implementation based on BaseLifecycleBindable /** diff --git a/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java b/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java index 124a508..193ebf7 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java +++ b/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java @@ -38,6 +38,7 @@ import rx.subjects.BehaviorSubject; * Created by Gavriil Sitnikov on 18/04/16. * Simple implementation of {@link LifecycleBindable}. Could be used to not implement interface but use such object inside. */ +@SuppressWarnings("PMD.TooManyMethods") public class BaseLifecycleBindable implements LifecycleBindable { private static final String UNTIL_DESTROY_METHOD = "untilDestroy"; diff --git a/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java b/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java index 26a932d..7db7f74 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java +++ b/src/main/java/ru/touchin/roboswag/components/utils/LifecycleBindable.java @@ -24,6 +24,7 @@ import android.support.annotation.NonNull; import rx.Completable; import rx.Observable; import rx.Single; +import rx.SingleSubscriber; import rx.Subscriber; import rx.Subscription; import rx.functions.Action0; @@ -38,6 +39,7 @@ import rx.functions.Func1; * Use {@link #untilStop(Observable)} method to subscribe to observable where you want and unsubscribe onStop. * Use {@link #untilDestroy(Observable)} method to subscribe to observable where you want and unsubscribe onDestroy. */ +@SuppressWarnings("PMD.TooManyMethods") public interface LifecycleBindable { /** @@ -132,7 +134,7 @@ public interface LifecycleBindable { * Don't forget to process errors if single can emit them. * * @param single {@link Single} to subscribe until onStop; - * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; + * @param onSuccessAction Action which will raise on every {@link SingleSubscriber#onSuccess(Object)} item; * @param Type of emitted by single item; * @return {@link Subscription} which will unsubscribes from single onStop. */ @@ -146,8 +148,8 @@ public interface LifecycleBindable { * Don't forget to process errors if single can emit them. * * @param single {@link Single} to subscribe until onStop; - * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; - * @param onErrorAction Action which will raise on every {@link rx.SingleSubscriber#onError(Throwable)} throwable; + * @param onSuccessAction Action which will raise on every {@link SingleSubscriber#onSuccess(Object)} item; + * @param onErrorAction Action which will raise on every {@link SingleSubscriber#onError(Throwable)} throwable; * @param Type of emitted by observable items; * @return {@link Subscription} which is wrapping source single to unsubscribe from it onStop. */ @@ -173,7 +175,7 @@ public interface LifecycleBindable { * Don't forget to process errors if completable can emit them. * * @param completable {@link Completable} to subscribe until onStop; - * @param onCompletedAction Action which will raise at {@link rx.Completable.CompletableSubscriber#onCompleted()} on completion of observable; + * @param onCompletedAction Action which will raise at {@link Completable.CompletableSubscriber#onCompleted()} on completion of observable; * @return {@link Subscription} which is wrapping source completable to unsubscribe from it onStop. */ @NonNull @@ -186,8 +188,8 @@ public interface LifecycleBindable { * Don't forget to process errors if completable can emit them. * * @param completable {@link Completable} to subscribe until onStop; - * @param onCompletedAction Action which will raise at {@link rx.Completable.CompletableSubscriber#onCompleted()} on completion of observable; - * @param onErrorAction Action which will raise on every {@link rx.Completable.CompletableSubscriber#onError(Throwable)} throwable; + * @param onCompletedAction Action which will raise at {@link Completable.CompletableSubscriber#onCompleted()} on completion of observable; + * @param onErrorAction Action which will raise on every {@link Completable.CompletableSubscriber#onError(Throwable)} throwable; * @return {@link Subscription} which is wrapping source completable to unsubscribe from it onStop. */ @NonNull @@ -266,7 +268,7 @@ public interface LifecycleBindable { * Don't forget to process errors if single can emit them. * * @param single {@link Single} to subscribe until onDestroy; - * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; + * @param onSuccessAction Action which will raise on every {@link SingleSubscriber#onSuccess(Object)} item; * @param Type of emitted by single items; * @return {@link Subscription} which is wrapping source single to unsubscribe from it onDestroy. */ @@ -279,8 +281,8 @@ public interface LifecycleBindable { * Don't forget to process errors if single can emit them. * * @param single {@link Single} to subscribe until onDestroy; - * @param onSuccessAction Action which will raise on every {@link rx.SingleSubscriber#onSuccess(Object)} item; - * @param onErrorAction Action which will raise on every {@link rx.SingleSubscriber#onError(Throwable)} throwable; + * @param onSuccessAction Action which will raise on every {@link SingleSubscriber#onSuccess(Object)} item; + * @param onErrorAction Action which will raise on every {@link SingleSubscriber#onError(Throwable)} throwable; * @param Type of emitted by single items; * @return {@link Subscription} which is wrapping source single to unsubscribe from it onDestroy. */ @@ -317,7 +319,7 @@ public interface LifecycleBindable { * * @param completable {@link Completable} to subscribe until onDestroy; * @param onCompletedAction Action which will raise on every {@link Completable.CompletableSubscriber#onCompleted()} item; - * @param onErrorAction Action which will raise on every {@link rx.Completable.CompletableSubscriber#onError(Throwable)} throwable; + * @param onErrorAction Action which will raise on every {@link Completable.CompletableSubscriber#onError(Throwable)} throwable; * @return {@link Subscription} which is wrapping source completable to unsubscribe from it onDestroy. */ @NonNull From 6d561612a2e402093e0af53ebf34952e966955a4 Mon Sep 17 00:00:00 2001 From: Ilia Kurtov Date: Thu, 23 Mar 2017 18:28:22 +0300 Subject: [PATCH 5/5] code review fixes --- .../utils/BaseLifecycleBindable.java | 59 ++----------------- 1 file changed, 4 insertions(+), 55 deletions(-) diff --git a/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java b/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java index 193ebf7..8d96351 100644 --- a/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java +++ b/src/main/java/ru/touchin/roboswag/components/utils/BaseLifecycleBindable.java @@ -136,7 +136,7 @@ public class BaseLifecycleBindable implements LifecycleBindable { public Subscription untilStop(@NonNull final Single single, @NonNull final Action1 onSuccessAction, @NonNull final Action1 onErrorAction) { - return until(single, isStartedSubject.map(started -> !started), onSuccessAction, onErrorAction); + return until(single.toObservable(), isStartedSubject.map(started -> !started), onSuccessAction, onErrorAction, Actions.empty()); } @NonNull @@ -159,7 +159,7 @@ public class BaseLifecycleBindable implements LifecycleBindable { public Subscription untilStop(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction, @NonNull final Action1 onErrorAction) { - return until(completable, isStartedSubject.map(started -> !started), onCompletedAction, onErrorAction); + return until(completable.toObservable(), isStartedSubject.map(started -> !started), Actions.empty(), onErrorAction, onCompletedAction); } @NonNull @@ -214,7 +214,7 @@ public class BaseLifecycleBindable implements LifecycleBindable { public Subscription untilDestroy(@NonNull final Single single, @NonNull final Action1 onSuccessAction, @NonNull final Action1 onErrorAction) { - return until(single, isCreatedSubject.map(created -> !created), onSuccessAction, onErrorAction); + return until(single.toObservable(), isCreatedSubject.map(created -> !created), onSuccessAction, onErrorAction, Actions.empty()); } @NonNull @@ -236,7 +236,7 @@ public class BaseLifecycleBindable implements LifecycleBindable { public Subscription untilDestroy(@NonNull final Completable completable, @NonNull final Action0 onCompletedAction, @NonNull final Action1 onErrorAction) { - return until(completable, isCreatedSubject.map(created -> !created), onCompletedAction, onErrorAction); + return until(completable.toObservable(), isCreatedSubject.map(created -> !created), Actions.empty(), onErrorAction, onCompletedAction); } @NonNull @@ -265,57 +265,6 @@ public class BaseLifecycleBindable implements LifecycleBindable { }); } - @NonNull - private Subscription until(@NonNull final Single single, - @NonNull final Observable conditionSubject, - @NonNull final Action1 onSuccessAction, - @NonNull final Action1 onErrorAction) { - final Single actualSingle; - if (onSuccessAction == Actions.empty() && onErrorAction == (Action1) Actions.empty()) { - actualSingle = single; - } else { - actualSingle = single.observeOn(AndroidSchedulers.mainThread()); - } - return isCreatedSubject.first() - .flatMap(created -> created ? actualSingle.toObservable() : Observable.empty()) - .takeUntil(conditionSubject.filter(condition -> condition)) - .toSingle() - .subscribe(onSuccessAction, throwable -> { - final boolean isRxError = throwable instanceof OnErrorThrowable; - if ((!isRxError && throwable instanceof RuntimeException) - || (isRxError && throwable.getCause() instanceof RuntimeException)) { - Lc.assertion(throwable); - } - onErrorAction.call(throwable); - }); - } - - - @NonNull - private Subscription until(@NonNull final Completable completable, - @NonNull final Observable conditionSubject, - @NonNull final Action0 onCompletedAction, - @NonNull final Action1 onErrorAction) { - final Completable actualCompletable; - if (onCompletedAction == Actions.empty() && onErrorAction == (Action1) Actions.empty()) { - actualCompletable = completable; - } else { - actualCompletable = completable.observeOn(AndroidSchedulers.mainThread()); - } - return isCreatedSubject.first() - .flatMap(created -> created ? actualCompletable.toObservable() : Observable.empty()) - .takeUntil(conditionSubject.filter(condition -> condition)) - .toCompletable() - .subscribe(throwable -> { - final boolean isRxError = throwable instanceof OnErrorThrowable; - if ((!isRxError && throwable instanceof RuntimeException) - || (isRxError && throwable.getCause() instanceof RuntimeException)) { - Lc.assertion(throwable); - } - onErrorAction.call(throwable); - }, onCompletedAction); - } - @NonNull private Action1 getActionThrowableForAssertion(@NonNull final String codePoint, @NonNull final String method) { return throwable -> Lc.assertion(new ShouldNotHappenException("Unexpected error on " + method + " at " + codePoint, throwable));