add task subclass to sequence executor

This commit is contained in:
Gavriil Sitnikov 2017-04-04 18:22:29 +03:00
parent e67daf7805
commit ab3a16136c
1 changed files with 33 additions and 20 deletions

View File

@ -18,34 +18,47 @@ public class SequenceObservableExecutor {
@NonNull
private final Scheduler sendingScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
@NonNull
public Completable execute(@NonNull final Completable completable) {
final SubscriptionHolder subscriptionHolder = new SubscriptionHolder();
final Task task = new Task(completable);
return Completable
.create(subscriber ->
subscriptionHolder.subscription = sendingScheduler.createWorker().schedule(() -> {
final CountDownLatch blocker = new CountDownLatch(1);
final Subscription executeSubscription = completable
.doOnTerminate(blocker::countDown)
.subscribe(subscriber::onError, subscriber::onCompleted);
try {
blocker.await();
} catch (final InterruptedException exception) {
executeSubscription.unsubscribe();
subscriber.onError(exception);
}
}))
.doOnUnsubscribe(() -> {
if (subscriptionHolder.subscription != null && !subscriptionHolder.subscription.isUnsubscribed()) {
subscriptionHolder.subscription.unsubscribe();
}
});
.create(task)
.doOnUnsubscribe(task::cancel);
}
private class SubscriptionHolder {
private class Task implements Completable.CompletableOnSubscribe {
@NonNull
private final Completable completable;
@Nullable
private Subscription subscription;
public Task(@NonNull final Completable completable) {
this.completable = completable;
}
@Override
public void call(@NonNull final Completable.CompletableSubscriber subscriber) {
subscription = sendingScheduler.createWorker().schedule(() -> {
final CountDownLatch blocker = new CountDownLatch(1);
final Subscription executeSubscription = completable
.doOnTerminate(blocker::countDown)
.subscribe(subscriber::onError, subscriber::onCompleted);
try {
blocker.await();
} catch (final InterruptedException exception) {
executeSubscription.unsubscribe();
subscriber.onError(exception);
}
});
}
public void cancel() {
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
}
}
}
}