onerror/complete callbacks added to Sequence executor

This commit is contained in:
Gavriil Sitnikov 2017-04-04 17:05:49 +03:00
parent 3196ee4271
commit e67daf7805
1 changed files with 4 additions and 4 deletions

View File

@ -24,15 +24,15 @@ public class SequenceObservableExecutor {
.create(subscriber ->
subscriptionHolder.subscription = sendingScheduler.createWorker().schedule(() -> {
final CountDownLatch blocker = new CountDownLatch(1);
final Subscription sendSubscription = completable
final Subscription executeSubscription = completable
.doOnTerminate(blocker::countDown)
.subscribe();
.subscribe(subscriber::onError, subscriber::onCompleted);
try {
blocker.await();
} catch (final InterruptedException exception) {
sendSubscription.unsubscribe();
executeSubscription.unsubscribe();
subscriber.onError(exception);
}
subscriber.onCompleted();
}))
.doOnUnsubscribe(() -> {
if (subscriptionHolder.subscription != null && !subscriptionHolder.subscription.isUnsubscribed()) {