diff --git a/src/main/java/ru/touchin/roboswag/core/observables/SequenceObservableExecutor.java b/src/main/java/ru/touchin/roboswag/core/observables/SequenceObservableExecutor.java index 7daa855..3dda576 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/SequenceObservableExecutor.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/SequenceObservableExecutor.java @@ -18,34 +18,47 @@ public class SequenceObservableExecutor { @NonNull private final Scheduler sendingScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); + @NonNull public Completable execute(@NonNull final Completable completable) { - final SubscriptionHolder subscriptionHolder = new SubscriptionHolder(); + final Task task = new Task(completable); return Completable - .create(subscriber -> - subscriptionHolder.subscription = sendingScheduler.createWorker().schedule(() -> { - final CountDownLatch blocker = new CountDownLatch(1); - final Subscription executeSubscription = completable - .doOnTerminate(blocker::countDown) - .subscribe(subscriber::onError, subscriber::onCompleted); - try { - blocker.await(); - } catch (final InterruptedException exception) { - executeSubscription.unsubscribe(); - subscriber.onError(exception); - } - })) - .doOnUnsubscribe(() -> { - if (subscriptionHolder.subscription != null && !subscriptionHolder.subscription.isUnsubscribed()) { - subscriptionHolder.subscription.unsubscribe(); - } - }); + .create(task) + .doOnUnsubscribe(task::cancel); } - private class SubscriptionHolder { + private class Task implements Completable.CompletableOnSubscribe { + @NonNull + private final Completable completable; @Nullable private Subscription subscription; + public Task(@NonNull final Completable completable) { + this.completable = completable; + } + + @Override + public void call(@NonNull final Completable.CompletableSubscriber subscriber) { + subscription = sendingScheduler.createWorker().schedule(() -> { + final CountDownLatch blocker = new CountDownLatch(1); + final Subscription executeSubscription = completable + .doOnTerminate(blocker::countDown) + .subscribe(subscriber::onError, subscriber::onCompleted); + try { + blocker.await(); + } catch (final InterruptedException exception) { + executeSubscription.unsubscribe(); + subscriber.onError(exception); + } + }); + } + + public void cancel() { + if (subscription != null && !subscription.isUnsubscribed()) { + subscription.unsubscribe(); + } + } + } }