From 846fc1368f9c1fadba7564cee0e7acbf67cc2cec Mon Sep 17 00:00:00 2001 From: Anton Domnikov Date: Thu, 9 Mar 2017 19:05:10 +0300 Subject: [PATCH] fixed storable --- .../OnSubscribeRefCountWithCacheTime.java | 214 ++++++++++++++++++ .../core/observables/storable/Storable.java | 80 +++++-- .../concrete/NonNullSafeListStorable.java | 8 + 3 files changed, 277 insertions(+), 25 deletions(-) create mode 100644 src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java diff --git a/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java b/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java new file mode 100644 index 0000000..d428aab --- /dev/null +++ b/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java @@ -0,0 +1,214 @@ +/** + * Copyright 2014 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package ru.touchin.roboswag.core.observables; + +import android.support.annotation.NonNull; +import android.support.annotation.Nullable; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +import rx.Observable.OnSubscribe; +import rx.Scheduler; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.observables.ConnectableObservable; +import rx.schedulers.Schedulers; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; + +/** + * Returns an observable sequence that stays connected to the source as long as + * there is at least one subscription to the observable sequence and also it stays connected + * for cache time after everyone unsubscribe. + * + * @param the value type + */ +@SuppressWarnings({"PMD.AvoidUsingVolatile", "PMD.CompareObjectsWithEquals"}) +//AvoidUsingVolatile,CompareObjectsWithEquals: from OnSubscribeRefCount code +public final class OnSubscribeRefCountWithCacheTime implements OnSubscribe { + + @NonNull + private final ConnectableObservable source; + @NonNull + private volatile CompositeSubscription baseSubscription = new CompositeSubscription(); + @NonNull + private final AtomicInteger subscriptionCount = new AtomicInteger(0); + + @NonNull + private final Scheduler scheduler = Schedulers.computation(); + private final long cacheTime; + @NonNull + private final TimeUnit cacheTimeUnit; + @Nullable + private Scheduler.Worker worker; + + /** + * Use this lock for every subscription and disconnect action. + */ + @NonNull + private final ReentrantLock lock = new ReentrantLock(); + + public OnSubscribeRefCountWithCacheTime(@NonNull final ConnectableObservable source, + final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) { + this.source = source; + this.cacheTime = cacheTime; + this.cacheTimeUnit = cacheTimeUnit; + } + + @Override + public void call(@NonNull final Subscriber subscriber) { + + lock.lock(); + if (subscriptionCount.incrementAndGet() == 1) { + if (worker != null) { + worker.unsubscribe(); + worker = null; + } + final AtomicBoolean writeLocked = new AtomicBoolean(true); + + try { + // need to use this overload of connect to ensure that + // baseSubscription is set in the case that source is a + // synchronous Observable + source.connect(onSubscribe(subscriber, writeLocked)); + } finally { + // need to cover the case where the source is subscribed to + // outside of this class thus preventing the Action1 passed + // to source.connect above being called + if (writeLocked.get()) { + // Action1 passed to source.connect was not called + lock.unlock(); + } + } + } else { + try { + // ready to subscribe to source so do it + doSubscribe(subscriber, baseSubscription); + } finally { + // release the read lock + lock.unlock(); + } + } + + } + + @NonNull + private Action1 onSubscribe(@NonNull final Subscriber subscriber, + @NonNull final AtomicBoolean writeLocked) { + return new Action1() { + @Override + public void call(@NonNull final Subscription subscription) { + + try { + baseSubscription.add(subscription); + // ready to subscribe to source so do it + doSubscribe(subscriber, baseSubscription); + } finally { + // release the write lock + lock.unlock(); + writeLocked.set(false); + } + } + }; + } + + private void doSubscribe(@NonNull final Subscriber subscriber, @NonNull final CompositeSubscription currentBase) { + subscriber.add(disconnect(currentBase)); + source.unsafeSubscribe(new Subscriber(subscriber) { + @Override + public void onError(@NonNull final Throwable throwable) { + cleanup(); + subscriber.onError(throwable); + } + + @Override + public void onNext(@Nullable final T item) { + subscriber.onNext(item); + } + + @Override + public void onCompleted() { + cleanup(); + subscriber.onCompleted(); + } + + private void cleanup() { + // on error or completion we need to unsubscribe the base subscription + // and set the subscriptionCount to 0 + lock.lock(); + try { + if (baseSubscription == currentBase) { + if (worker != null) { + worker.unsubscribe(); + worker = null; + } + baseSubscription.unsubscribe(); + baseSubscription = new CompositeSubscription(); + subscriptionCount.set(0); + } + } finally { + lock.unlock(); + } + } + }); + } + + @NonNull + private Subscription disconnect(@NonNull final CompositeSubscription current) { + return Subscriptions.create(new Action0() { + + @Override + public void call() { + lock.lock(); + try { + if (baseSubscription == current && subscriptionCount.decrementAndGet() == 0) { + if (worker != null) { + worker.unsubscribe(); + } else { + worker = scheduler.createWorker(); + } + worker.schedule(new Action0() { + @Override + public void call() { + lock.lock(); + try { + if (subscriptionCount.get() == 0) { + baseSubscription.unsubscribe(); + // need a new baseSubscription because once + // unsubscribed stays that way + worker.unsubscribe(); + worker = null; + baseSubscription = new CompositeSubscription(); + } + } finally { + lock.unlock(); + } + } + }, cacheTime, cacheTimeUnit); + } + } finally { + lock.unlock(); + } + } + }); + } +} \ No newline at end of file diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java index fe10134..a4cb3c5 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java @@ -22,8 +22,11 @@ package ru.touchin.roboswag.core.observables.storable; import android.support.annotation.NonNull; import android.support.annotation.Nullable; +import java.util.concurrent.TimeUnit; + import ru.touchin.roboswag.core.log.LcGroup; import ru.touchin.roboswag.core.observables.ObservableResult; +import ru.touchin.roboswag.core.observables.OnSubscribeRefCountWithCacheTime; import ru.touchin.roboswag.core.observables.RxUtils; import ru.touchin.roboswag.core.observables.storable.builders.MigratableStorableBuilder; import ru.touchin.roboswag.core.observables.storable.builders.NonNullStorableBuilder; @@ -54,6 +57,8 @@ public class Storable { public static final LcGroup STORABLE_LC_GROUP = new LcGroup("STORABLE"); + private static final long CACHE_TIME = TimeUnit.SECONDS.toMillis(5); + @NonNull private final TKey key; @NonNull @@ -151,7 +156,9 @@ public class Storable { .subscribeOn(storeScheduler != null ? storeScheduler : Schedulers.io()) .concatWith(newStoreValueEvent) .map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue)); - return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE ? result.replay(1).refCount() : result; + return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE + ? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), CACHE_TIME, TimeUnit.MILLISECONDS)) + : result; } @NonNull @@ -173,7 +180,9 @@ public class Storable { }) .subscribeOn(storeScheduler != null ? storeScheduler : Schedulers.computation()); - return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE ? result.replay(1).refCount() : result; + return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE + ? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), CACHE_TIME, TimeUnit.MILLISECONDS)) + : result; } /** @@ -226,6 +235,49 @@ public class Storable { return converter; } + @NonNull + private Observable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) { + return (checkForEqualityBeforeSet ? valueObservable.first() : Observable.just(null)) + .switchMap(value -> { + if (checkForEqualityBeforeSet && ObjectUtils.equals(value, newValue)) { + return Observable.empty(); + } + return Observable + .create(subscriber -> { + try { + final TStoreObject storeObject = converter.toStoreObject(objectClass, storeObjectClass, newValue); + store.storeObject(storeObjectClass, key, storeObject); + newStoreValueEvent.onNext(storeObject); + STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, value, newValue); + subscriber.onCompleted(); + } catch (final Converter.ConversionException conversionException) { + STORABLE_LC_GROUP.w(conversionException, "Exception while converting value of '%s' from '%s' to store object", + key, newValue, store); + subscriber.onError(conversionException); + } catch (final Store.StoreException storeException) { + STORABLE_LC_GROUP.w(storeException, "Exception while trying to store value of '%s' to store %s", key, store); + subscriber.onError(storeException); + } catch (final RuntimeException throwable) { + STORABLE_LC_GROUP.assertion(throwable); + } + }); + }); + } + + /** + * Creates observable which is async setting value to store. + * It is not checking if stored value equals new value. + * In result it will be faster to not get value from store and compare but it will emit item to {@link #observe()} subscribers. + * NOTE: It could emit ONLY completed and errors events. It is not providing onNext event! + * + * @param newValue Value to set; + * @return Observable of setting process. + */ + @NonNull + public Observable forceSet(@Nullable final TObject newValue) { + return internalSet(newValue, false); + } + /** * Creates observable which is async setting value to store. * NOTE: It could emit ONLY completed and errors events. It is not providing onNext event! @@ -236,29 +288,7 @@ public class Storable { */ @NonNull public Observable set(@Nullable final TObject newValue) { - return valueObservable - .first() - .switchMap(value -> ObjectUtils.equals(value, newValue) - ? Observable.empty() - : Observable - .create(subscriber -> { - try { - final TStoreObject storeObject = converter.toStoreObject(objectClass, storeObjectClass, newValue); - store.storeObject(storeObjectClass, key, storeObject); - newStoreValueEvent.onNext(storeObject); - STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, value, newValue); - subscriber.onCompleted(); - } catch (final Converter.ConversionException conversionException) { - STORABLE_LC_GROUP.w(conversionException, "Exception while converting value of '%s' from '%s' to store object", - key, newValue, store); - subscriber.onError(conversionException); - } catch (final Store.StoreException storeException) { - STORABLE_LC_GROUP.w(storeException, "Exception while trying to store value of '%s' to store %s", key, store); - subscriber.onError(storeException); - } catch (final RuntimeException throwable) { - STORABLE_LC_GROUP.assertion(throwable); - } - })); + return internalSet(newValue, true); } /** diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java index 21ac7b3..ed4e584 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java @@ -71,6 +71,14 @@ public class NonNullSafeListStorable { return storable.set(list); } + /** + * Wraps {@link Storable#forceSet(Object)} (Object)}. + */ + @NonNull + public Observable forseSet(@Nullable final List list) { + return storable.forceSet(list); + } + /** * Wraps {@link Storable#setCalm(Object)}. */