diff --git a/src/main/java/ru/touchin/templates/chat/Chat.java b/src/main/java/ru/touchin/templates/chat/Chat.java index 17884a1..6b800a0 100644 --- a/src/main/java/ru/touchin/templates/chat/Chat.java +++ b/src/main/java/ru/touchin/templates/chat/Chat.java @@ -24,6 +24,8 @@ import android.support.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -59,7 +61,7 @@ public abstract class Chat { @NonNull private final Scheduler sendingScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); @NonNull - private final Observable messagesToSendObservable; + private final Observable messagesToSendObservable; @Nullable private Subscription activationSubscription; @@ -70,17 +72,22 @@ public abstract class Chat { messagesToSendObservable = sendingMessages.observeItems() .first() - .concatMap(initialMessages -> Observable.from(initialMessages) - .concatWith(sendingMessages.observeChanges().concatMap(changes -> { - final Collection insertedMessages = new ArrayList<>(); - for (final Change change : changes.getChanges()) { - if (change.getType() == Change.Type.INSERTED) { - insertedMessages.addAll(change.getChangedItems()); + .concatMap(initialMessages -> { + final List reversedMessages = new ArrayList<>(initialMessages); + Collections.reverse(reversedMessages); + return Observable.from(reversedMessages) + .concatWith(sendingMessages.observeChanges().concatMap(changes -> { + final Collection insertedMessages = new ArrayList<>(); + for (final Change change : changes.getChanges()) { + if (change.getType() == Change.Type.INSERTED) { + insertedMessages.addAll(change.getChangedItems()); + } } - } - return insertedMessages.isEmpty() ? Observable.empty() : Observable.from(insertedMessages); - }))) - .doOnNext(message -> sendingScheduler.createWorker().schedule(() -> internalSendMessage(message))); + return insertedMessages.isEmpty() ? Observable.empty() : Observable.from(insertedMessages); + })) + //observe on some scheduler? + .flatMap(this::internalSendMessage); + }); } /** @@ -180,28 +187,49 @@ public abstract class Chat { activationSubscription = null; } - private void internalSendMessage(@NonNull final TOutgoingMessage message) { - final CountDownLatch blocker = new CountDownLatch(1); - final Subscription subscription = Observable - .combineLatest(isMessageInCacheObservable(message), isMessageInActualObservable(message), - (messageInCache, messageInActual) -> !messageInCache && !messageInActual) - .subscribeOn(Schedulers.computation()) - .first() - .switchMap(shouldSendMessage -> shouldSendMessage ? createSendMessageObservable(message).ignoreElements() : Observable.empty()) - .retryWhen(attempts -> attempts.switchMap(ignored -> { - isSendingInError.onNext(true); - return Observable - .merge(retrySendingRequest, Observable.timer(RETRY_SENDING_DELAY, TimeUnit.MILLISECONDS)) - .first() - .doOnCompleted(() -> isSendingInError.onNext(false)); - })) - .doOnUnsubscribe(blocker::countDown) - .subscribe(Actions.empty(), Lc::assertion, () -> sendingMessages.remove(message)); - try { - blocker.await(); - } catch (final InterruptedException exception) { - subscription.unsubscribe(); - } + @NonNull + private Observable internalSendMessage(@NonNull final TOutgoingMessage message) { + final SubscriptionHolder subscriptionHolder = new SubscriptionHolder(); + return Observable + .create(subscriber -> { + subscriptionHolder.subscription = sendingScheduler.createWorker().schedule(() -> { + final CountDownLatch blocker = new CountDownLatch(1); + final Subscription sendSubscription = Observable + .combineLatest(isMessageInCacheObservable(message), isMessageInActualObservable(message), + (messageInCache, messageInActual) -> !messageInCache && !messageInActual) + .subscribeOn(Schedulers.computation()) + .first() + .switchMap(shouldSendMessage -> shouldSendMessage + ? createSendMessageObservable(message).ignoreElements() : Observable.empty()) + .retryWhen(attempts -> attempts.switchMap(ignored -> { + isSendingInError.onNext(true); + return Observable + .merge(retrySendingRequest, Observable.timer(RETRY_SENDING_DELAY, TimeUnit.MILLISECONDS)) + .first() + .doOnCompleted(() -> isSendingInError.onNext(false)); + })) + .doOnUnsubscribe(blocker::countDown) + .subscribe(Actions.empty(), Lc::assertion, () -> sendingMessages.remove(message)); + try { + blocker.await(); + } catch (final InterruptedException exception) { + sendSubscription.unsubscribe(); + } + subscriber.onCompleted(); + }); + }) + .doOnUnsubscribe(() -> { + if (subscriptionHolder.subscription != null && !subscriptionHolder.subscription.isUnsubscribed()) { + subscriptionHolder.subscription.unsubscribe(); + } + }); + } + + private class SubscriptionHolder { + + @Nullable + private Subscription subscription; + } }