Merge pull request #29 from TouchInstinct/chat_fixes

dirty fix of sending messages order
This commit is contained in:
Gavriil 2017-02-17 19:23:21 +03:00 committed by GitHub
commit 0258586512
1 changed files with 61 additions and 33 deletions

View File

@ -24,6 +24,8 @@ import android.support.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -59,7 +61,7 @@ public abstract class Chat<TOutgoingMessage> {
@NonNull
private final Scheduler sendingScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
@NonNull
private final Observable<TOutgoingMessage> messagesToSendObservable;
private final Observable<?> messagesToSendObservable;
@Nullable
private Subscription activationSubscription;
@ -70,17 +72,22 @@ public abstract class Chat<TOutgoingMessage> {
messagesToSendObservable = sendingMessages.observeItems()
.first()
.concatMap(initialMessages -> Observable.from(initialMessages)
.concatWith(sendingMessages.observeChanges().concatMap(changes -> {
final Collection<TOutgoingMessage> insertedMessages = new ArrayList<>();
for (final Change<TOutgoingMessage> change : changes.getChanges()) {
if (change.getType() == Change.Type.INSERTED) {
insertedMessages.addAll(change.getChangedItems());
.concatMap(initialMessages -> {
final List<TOutgoingMessage> reversedMessages = new ArrayList<>(initialMessages);
Collections.reverse(reversedMessages);
return Observable.from(reversedMessages)
.concatWith(sendingMessages.observeChanges().concatMap(changes -> {
final Collection<TOutgoingMessage> insertedMessages = new ArrayList<>();
for (final Change<TOutgoingMessage> change : changes.getChanges()) {
if (change.getType() == Change.Type.INSERTED) {
insertedMessages.addAll(change.getChangedItems());
}
}
}
return insertedMessages.isEmpty() ? Observable.empty() : Observable.from(insertedMessages);
})))
.doOnNext(message -> sendingScheduler.createWorker().schedule(() -> internalSendMessage(message)));
return insertedMessages.isEmpty() ? Observable.empty() : Observable.from(insertedMessages);
}))
//observe on some scheduler?
.flatMap(this::internalSendMessage);
});
}
/**
@ -180,28 +187,49 @@ public abstract class Chat<TOutgoingMessage> {
activationSubscription = null;
}
private void internalSendMessage(@NonNull final TOutgoingMessage message) {
final CountDownLatch blocker = new CountDownLatch(1);
final Subscription subscription = Observable
.combineLatest(isMessageInCacheObservable(message), isMessageInActualObservable(message),
(messageInCache, messageInActual) -> !messageInCache && !messageInActual)
.subscribeOn(Schedulers.computation())
.first()
.switchMap(shouldSendMessage -> shouldSendMessage ? createSendMessageObservable(message).ignoreElements() : Observable.empty())
.retryWhen(attempts -> attempts.switchMap(ignored -> {
isSendingInError.onNext(true);
return Observable
.merge(retrySendingRequest, Observable.timer(RETRY_SENDING_DELAY, TimeUnit.MILLISECONDS))
.first()
.doOnCompleted(() -> isSendingInError.onNext(false));
}))
.doOnUnsubscribe(blocker::countDown)
.subscribe(Actions.empty(), Lc::assertion, () -> sendingMessages.remove(message));
try {
blocker.await();
} catch (final InterruptedException exception) {
subscription.unsubscribe();
}
@NonNull
private Observable<?> internalSendMessage(@NonNull final TOutgoingMessage message) {
final SubscriptionHolder subscriptionHolder = new SubscriptionHolder();
return Observable
.create(subscriber -> {
subscriptionHolder.subscription = sendingScheduler.createWorker().schedule(() -> {
final CountDownLatch blocker = new CountDownLatch(1);
final Subscription sendSubscription = Observable
.combineLatest(isMessageInCacheObservable(message), isMessageInActualObservable(message),
(messageInCache, messageInActual) -> !messageInCache && !messageInActual)
.subscribeOn(Schedulers.computation())
.first()
.switchMap(shouldSendMessage -> shouldSendMessage
? createSendMessageObservable(message).ignoreElements() : Observable.empty())
.retryWhen(attempts -> attempts.switchMap(ignored -> {
isSendingInError.onNext(true);
return Observable
.merge(retrySendingRequest, Observable.timer(RETRY_SENDING_DELAY, TimeUnit.MILLISECONDS))
.first()
.doOnCompleted(() -> isSendingInError.onNext(false));
}))
.doOnUnsubscribe(blocker::countDown)
.subscribe(Actions.empty(), Lc::assertion, () -> sendingMessages.remove(message));
try {
blocker.await();
} catch (final InterruptedException exception) {
sendSubscription.unsubscribe();
}
subscriber.onCompleted();
});
})
.doOnUnsubscribe(() -> {
if (subscriptionHolder.subscription != null && !subscriptionHolder.subscription.isUnsubscribed()) {
subscriptionHolder.subscription.unsubscribe();
}
});
}
private class SubscriptionHolder {
@Nullable
private Subscription subscription;
}
}