diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java index dea99cd..f9fcf0f 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java @@ -33,6 +33,7 @@ import ru.touchin.roboswag.core.utils.ObjectUtils; import rx.Completable; import rx.Observable; import rx.Scheduler; +import rx.Single; import rx.exceptions.OnErrorThrowable; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; @@ -130,28 +131,28 @@ public class Storable { STORABLE_LC_GROUP.w(exception, "Exception while converting default value of '%s' from '%s' from store %s", key, defaultValue, store); throw OnErrorThrowable.from(exception); - } catch (final RuntimeException throwable) { - STORABLE_LC_GROUP.assertion(throwable); - throw OnErrorThrowable.from(throwable); } } + @NonNull + private Observable createStoreInitialLoadingObservable(@Nullable final Migration migration) { + final Single loadObservable = store.loadObject(storeObjectType, key) + .doOnError(throwable -> STORABLE_LC_GROUP.w(throwable, "Exception while trying to load value of '%s' from store %s", key, store)); + return (migration != null ? migration.migrateToLatestVersion(key).andThen(loadObservable) : loadObservable) + .subscribeOn(scheduler) + .observeOn(scheduler) + .toObservable() + .publish() + .refCount(); + } + @NonNull private Observable createStoreValueObservable(@NonNull final ObserveStrategy observeStrategy, @Nullable final Migration migration, @Nullable final TObject defaultValue, final long cacheTimeMillis) { - final Observable result = (migration != null - ? migration.migrateToLatestVersion(key).subscribeOn(scheduler) - : Completable.complete()) - .andThen(store.loadObject(storeObjectType, key).toObservable().subscribeOn(scheduler)) - .doOnError(throwable -> { - if (throwable instanceof RuntimeException) { - STORABLE_LC_GROUP.assertion(throwable); - } else { - STORABLE_LC_GROUP.w(throwable, "Exception while trying to load value of '%s' from store %s", key, store); - } - }) + final Observable storeInitialLoadingObservable = createStoreInitialLoadingObservable(migration); + final Observable result = storeInitialLoadingObservable .concatWith(newStoreValueEvent) .map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue)); return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE @@ -164,17 +165,15 @@ public class Storable { @NonNull final ObserveStrategy observeStrategy, final long cacheTimeMillis) { final Observable result = storeValueObservable - .switchMap(storeObject -> Observable - .fromCallable(() -> converter.toObject(objectType, storeObjectType, storeObject)) - .subscribeOn(scheduler) - .doOnError(throwable -> { - if (throwable instanceof RuntimeException) { - STORABLE_LC_GROUP.assertion(throwable); - } else { - STORABLE_LC_GROUP.w(throwable, "Exception while trying to converting value of '%s' from store %s by %s", - key, storeObject, store, converter); - } - })); + .map(storeObject -> { + try { + return converter.toObject(objectType, storeObjectType, storeObject); + } catch (final Converter.ConversionException exception) { + STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s", + key, storeObject, store, converter); + throw OnErrorThrowable.from(exception); + } + }); return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE ? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS)) : result; @@ -233,31 +232,33 @@ public class Storable { @NonNull private Completable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) { return (checkForEqualityBeforeSet ? storeValueObservable.first() : Observable.just(null)) - .switchMap(oldStoreValue -> Observable - .fromCallable(() -> converter.toStoreObject(objectType, storeObjectType, newValue)) - .subscribeOn(scheduler) - .switchMap(newStoreValue -> { - if (checkForEqualityBeforeSet && ObjectUtils.equals(newStoreValue, oldStoreValue)) { - return Observable.empty(); - } - return store.storeObject(storeObjectType, key, newStoreValue) - .doOnCompleted(() -> { - newStoreValueEvent.onNext(newStoreValue); - if (checkForEqualityBeforeSet) { - STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, oldStoreValue, newStoreValue); - } else { - STORABLE_LC_GROUP.i("Value of '%s' force changed to '%s'", key, newStoreValue); - } - }) - .toObservable(); - })) - .doOnError(throwable -> { - if (throwable instanceof RuntimeException) { - STORABLE_LC_GROUP.assertion(throwable); - } else { - STORABLE_LC_GROUP.w(throwable, "Exception while trying to store value of '%s' from store %s by %s", + .observeOn(scheduler) + .switchMap(oldStoreValue -> { + final TStoreObject newStoreValue; + try { + newStoreValue = converter.toStoreObject(objectType, storeObjectType, newValue); + } catch (final Converter.ConversionException exception) { + STORABLE_LC_GROUP.w(exception, "Exception while trying to store value of '%s' from store %s by %s", key, newValue, store, converter); + return Observable.error(exception); } + if (checkForEqualityBeforeSet && ObjectUtils.equals(newStoreValue, oldStoreValue)) { + return Observable.empty(); + } + return store.storeObject(storeObjectType, key, newStoreValue) + .doOnError(throwable -> STORABLE_LC_GROUP.w(throwable, + "Exception while trying to store value of '%s' from store %s by %s", + key, newValue, store, converter)) + .observeOn(scheduler) + .andThen(Completable.fromAction(() -> { + newStoreValueEvent.onNext(newStoreValue); + if (checkForEqualityBeforeSet) { + STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, oldStoreValue, newStoreValue); + } else { + STORABLE_LC_GROUP.i("Value of '%s' force changed to '%s'", key, newStoreValue); + } + })) + .toObservable(); }) .toCompletable(); }