Merge pull request #26 from TouchInstinct/storable_fix_take_1
Storable fix take 1
This commit is contained in:
commit
7584c06edd
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package ru.touchin.roboswag.core.log;
|
||||
|
||||
import android.annotation.SuppressLint;
|
||||
import android.os.Handler;
|
||||
import android.os.Looper;
|
||||
import android.support.annotation.NonNull;
|
||||
|
|
@ -262,6 +263,7 @@ public final class Lc {
|
|||
* @param tag Tag to be shown in logs.
|
||||
*/
|
||||
|
||||
@SuppressLint("LogConditional")
|
||||
public static void printStackTrace(@NonNull final String tag) {
|
||||
final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
|
||||
if (Log.isLoggable(tag, Log.DEBUG)) {
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import ru.touchin.roboswag.core.utils.ObjectUtils;
|
|||
import rx.Completable;
|
||||
import rx.Observable;
|
||||
import rx.Scheduler;
|
||||
import rx.Single;
|
||||
import rx.exceptions.OnErrorThrowable;
|
||||
import rx.schedulers.Schedulers;
|
||||
import rx.subjects.PublishSubject;
|
||||
|
|
@ -109,6 +110,7 @@ public class Storable<TKey, TObject, TStoreObject> {
|
|||
this.storeObjectType = storeObjectType;
|
||||
this.store = store;
|
||||
this.converter = converter;
|
||||
|
||||
final ObserveStrategy nonNullObserveStrategy
|
||||
= observeStrategy != null ? observeStrategy : getDefaultObserveStrategyFor(objectType, storeObjectType);
|
||||
scheduler = storeScheduler != null ? storeScheduler : Schedulers.from(Executors.newSingleThreadExecutor());
|
||||
|
|
@ -129,28 +131,29 @@ public class Storable<TKey, TObject, TStoreObject> {
|
|||
STORABLE_LC_GROUP.w(exception, "Exception while converting default value of '%s' from '%s' from store %s",
|
||||
key, defaultValue, store);
|
||||
throw OnErrorThrowable.from(exception);
|
||||
} catch (final RuntimeException throwable) {
|
||||
STORABLE_LC_GROUP.assertion(throwable);
|
||||
throw OnErrorThrowable.from(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
@NonNull
|
||||
private Observable<TStoreObject> createStoreInitialLoadingObservable(@Nullable final Migration<TKey> migration) {
|
||||
final Single<TStoreObject> loadObservable = store.loadObject(storeObjectType, key)
|
||||
.doOnError(throwable -> STORABLE_LC_GROUP.w(throwable, "Exception while trying to load value of '%s' from store %s", key, store));
|
||||
return (migration != null ? migration.migrateToLatestVersion(key).andThen(loadObservable) : loadObservable)
|
||||
.subscribeOn(scheduler)
|
||||
.observeOn(scheduler)
|
||||
.toObservable()
|
||||
.replay(1)
|
||||
.refCount()
|
||||
.take(1);
|
||||
}
|
||||
|
||||
@NonNull
|
||||
private Observable<TStoreObject> createStoreValueObservable(@NonNull final ObserveStrategy observeStrategy,
|
||||
@Nullable final Migration<TKey> migration,
|
||||
@Nullable final TObject defaultValue,
|
||||
final long cacheTimeMillis) {
|
||||
final Observable<TStoreObject> result = (migration != null
|
||||
? migration.migrateToLatestVersion(key).subscribeOn(scheduler)
|
||||
: Completable.complete())
|
||||
.andThen(store.loadObject(storeObjectType, key).toObservable().subscribeOn(scheduler))
|
||||
.doOnError(throwable -> {
|
||||
if (throwable instanceof RuntimeException) {
|
||||
STORABLE_LC_GROUP.assertion(throwable);
|
||||
} else {
|
||||
STORABLE_LC_GROUP.w(throwable, "Exception while trying to load value of '%s' from store %s", key, store);
|
||||
}
|
||||
})
|
||||
final Observable<TStoreObject> storeInitialLoadingObservable = createStoreInitialLoadingObservable(migration);
|
||||
final Observable<TStoreObject> result = storeInitialLoadingObservable
|
||||
.concatWith(newStoreValueEvent)
|
||||
.map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue));
|
||||
return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
|
||||
|
|
@ -163,17 +166,15 @@ public class Storable<TKey, TObject, TStoreObject> {
|
|||
@NonNull final ObserveStrategy observeStrategy,
|
||||
final long cacheTimeMillis) {
|
||||
final Observable<TObject> result = storeValueObservable
|
||||
.switchMap(storeObject -> Observable
|
||||
.fromCallable(() -> converter.toObject(objectType, storeObjectType, storeObject))
|
||||
.subscribeOn(scheduler)
|
||||
.doOnError(throwable -> {
|
||||
if (throwable instanceof RuntimeException) {
|
||||
STORABLE_LC_GROUP.assertion(throwable);
|
||||
} else {
|
||||
STORABLE_LC_GROUP.w(throwable, "Exception while trying to converting value of '%s' from store %s by %s",
|
||||
key, storeObject, store, converter);
|
||||
}
|
||||
}));
|
||||
.map(storeObject -> {
|
||||
try {
|
||||
return converter.toObject(objectType, storeObjectType, storeObject);
|
||||
} catch (final Converter.ConversionException exception) {
|
||||
STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s",
|
||||
key, storeObject, store, converter);
|
||||
throw OnErrorThrowable.from(exception);
|
||||
}
|
||||
});
|
||||
return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
|
||||
? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS))
|
||||
: result;
|
||||
|
|
@ -231,32 +232,34 @@ public class Storable<TKey, TObject, TStoreObject> {
|
|||
|
||||
@NonNull
|
||||
private Completable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) {
|
||||
return (checkForEqualityBeforeSet ? storeValueObservable.first() : Observable.just(null))
|
||||
.switchMap(oldStoreValue -> Observable
|
||||
.fromCallable(() -> converter.toStoreObject(objectType, storeObjectType, newValue))
|
||||
.subscribeOn(scheduler)
|
||||
.switchMap(newStoreValue -> {
|
||||
if (checkForEqualityBeforeSet && ObjectUtils.equals(newStoreValue, oldStoreValue)) {
|
||||
return Observable.empty();
|
||||
}
|
||||
return store.storeObject(storeObjectType, key, newStoreValue)
|
||||
.doOnCompleted(() -> {
|
||||
newStoreValueEvent.onNext(newStoreValue);
|
||||
if (checkForEqualityBeforeSet) {
|
||||
STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, oldStoreValue, newStoreValue);
|
||||
} else {
|
||||
STORABLE_LC_GROUP.i("Value of '%s' force changed to '%s'", key, newStoreValue);
|
||||
}
|
||||
})
|
||||
.toObservable();
|
||||
}))
|
||||
.doOnError(throwable -> {
|
||||
if (throwable instanceof RuntimeException) {
|
||||
STORABLE_LC_GROUP.assertion(throwable);
|
||||
} else {
|
||||
STORABLE_LC_GROUP.w(throwable, "Exception while trying to store value of '%s' from store %s by %s",
|
||||
return (checkForEqualityBeforeSet ? storeValueObservable.take(1) : Observable.just(null))
|
||||
.observeOn(scheduler)
|
||||
.switchMap(oldStoreValue -> {
|
||||
final TStoreObject newStoreValue;
|
||||
try {
|
||||
newStoreValue = converter.toStoreObject(objectType, storeObjectType, newValue);
|
||||
} catch (final Converter.ConversionException exception) {
|
||||
STORABLE_LC_GROUP.w(exception, "Exception while trying to store value of '%s' from store %s by %s",
|
||||
key, newValue, store, converter);
|
||||
return Observable.error(exception);
|
||||
}
|
||||
if (checkForEqualityBeforeSet && ObjectUtils.equals(newStoreValue, oldStoreValue)) {
|
||||
return Observable.empty();
|
||||
}
|
||||
return store.storeObject(storeObjectType, key, newStoreValue)
|
||||
.doOnError(throwable -> STORABLE_LC_GROUP.w(throwable,
|
||||
"Exception while trying to store value of '%s' from store %s by %s",
|
||||
key, newValue, store, converter))
|
||||
.observeOn(scheduler)
|
||||
.andThen(Completable.fromAction(() -> {
|
||||
newStoreValueEvent.onNext(newStoreValue);
|
||||
if (checkForEqualityBeforeSet) {
|
||||
STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, oldStoreValue, newStoreValue);
|
||||
} else {
|
||||
STORABLE_LC_GROUP.i("Value of '%s' force changed to '%s'", key, newStoreValue);
|
||||
}
|
||||
}))
|
||||
.toObservable();
|
||||
})
|
||||
.toCompletable();
|
||||
}
|
||||
|
|
@ -320,7 +323,7 @@ public class Storable<TKey, TObject, TStoreObject> {
|
|||
*/
|
||||
@NonNull
|
||||
public Observable<TObject> get() {
|
||||
return valueObservable.first();
|
||||
return valueObservable.take(1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Reference in New Issue