load newer endless subscription fixed

This commit is contained in:
Gavriil Sitnikov 2016-09-23 16:23:26 +03:00
parent cf523eb20f
commit ad17e2ab77
1 changed files with 25 additions and 14 deletions

View File

@ -28,7 +28,6 @@ import ru.touchin.roboswag.core.log.Lc;
import ru.touchin.roboswag.core.observables.collections.ObservableCollection;
import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
import rx.Observable;
import rx.exceptions.OnErrorThrowable;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
@ -93,7 +92,7 @@ public class LoadingRenewableList<TItem, TReference, TNewerReference,
.switchOnNext(Observable.<Observable<TLoadedItems>>create(subscriber -> {
if (!renew) {
subscriber.onNext(Observable.concat(
//we need non-empty list to star loading newer items or we need to wait any change (should be insertion)
//we need non-empty list to start loading newer items or we need to wait any change (should be insertion)
isEmpty() ? observeChanges().first().switchMap(ignored -> Observable.empty()) : Observable.empty(),
createLoadRequestBasedObservable(this::createActualRequest,
loadRequest -> loadRequest.getNewerReference() == null && isEmpty()
@ -180,25 +179,37 @@ public class LoadingRenewableList<TItem, TReference, TNewerReference,
*/
@NonNull
public Observable<TLoadedItems> loadNewer() {
return loadingNewerObservable;
return loadingNewerObservable.first();
}
/**
* Returns {@link Observable} that will load some newer limited by maximum pages loading results.
* Returns {@link Observable} that will load all newer items.
*
* @return Returns {@link Observable} to limited load newer items.
*/
@NonNull
public Observable<?> loadToNewest() {
return loadToNewest(Integer.MAX_VALUE);
}
/**
* Returns {@link Observable} that will load some newer itemslimited by maximum pages loading results.
*
* @param maxPageDeep Limit to load pages;
* @return Returns {@link Observable} to limited load newer items.
*/
@NonNull
public Observable<TLoadedItems> loadNewest(final int maxPageDeep) {
return loadingNewerObservable
.doOnNext(loadedItems -> {
if (loadedItems.getNewerItemsCount() != 0) {
throw OnErrorThrowable.from(new NotLoadedYetException());
}
})
.retry((number, throwable) -> number <= maxPageDeep && throwable instanceof NotLoadedYetException)
.last();
public Observable<?> loadToNewest(final int maxPageDeep) {
return Observable
.switchOnNext(Observable
.<Observable<?>>create(subscriber -> {
subscriber.onNext(newerItemsCount.getValue() == 0
? Observable.empty()
: loadingNewerObservable.switchMap(ignored -> Observable.error(new NotLoadedYetException())));
subscriber.onCompleted();
})
.subscribeOn(getLoaderScheduler()))
.retry((number, throwable) -> number <= maxPageDeep && throwable instanceof NotLoadedYetException);
}
/**
@ -208,7 +219,7 @@ public class LoadingRenewableList<TItem, TReference, TNewerReference,
*/
@NonNull
public Observable<TLoadedItems> renew() {
return loadingNewestObservable;
return loadingNewestObservable.first();
}
private void updateNewerReference(@NonNull final TLoadedItems loadedItems) {