diff --git a/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java b/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java
new file mode 100644
index 0000000..d428aab
--- /dev/null
+++ b/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java
@@ -0,0 +1,214 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package ru.touchin.roboswag.core.observables;
+
+import android.support.annotation.NonNull;
+import android.support.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import rx.Observable.OnSubscribe;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+import rx.subscriptions.CompositeSubscription;
+import rx.subscriptions.Subscriptions;
+
+/**
+ * Returns an observable sequence that stays connected to the source as long as
+ * there is at least one subscription to the observable sequence and also it stays connected
+ * for cache time after everyone unsubscribe.
+ *
+ * @param the value type
+ */
+@SuppressWarnings({"PMD.AvoidUsingVolatile", "PMD.CompareObjectsWithEquals"})
+//AvoidUsingVolatile,CompareObjectsWithEquals: from OnSubscribeRefCount code
+public final class OnSubscribeRefCountWithCacheTime implements OnSubscribe {
+
+ @NonNull
+ private final ConnectableObservable extends T> source;
+ @NonNull
+ private volatile CompositeSubscription baseSubscription = new CompositeSubscription();
+ @NonNull
+ private final AtomicInteger subscriptionCount = new AtomicInteger(0);
+
+ @NonNull
+ private final Scheduler scheduler = Schedulers.computation();
+ private final long cacheTime;
+ @NonNull
+ private final TimeUnit cacheTimeUnit;
+ @Nullable
+ private Scheduler.Worker worker;
+
+ /**
+ * Use this lock for every subscription and disconnect action.
+ */
+ @NonNull
+ private final ReentrantLock lock = new ReentrantLock();
+
+ public OnSubscribeRefCountWithCacheTime(@NonNull final ConnectableObservable extends T> source,
+ final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) {
+ this.source = source;
+ this.cacheTime = cacheTime;
+ this.cacheTimeUnit = cacheTimeUnit;
+ }
+
+ @Override
+ public void call(@NonNull final Subscriber super T> subscriber) {
+
+ lock.lock();
+ if (subscriptionCount.incrementAndGet() == 1) {
+ if (worker != null) {
+ worker.unsubscribe();
+ worker = null;
+ }
+ final AtomicBoolean writeLocked = new AtomicBoolean(true);
+
+ try {
+ // need to use this overload of connect to ensure that
+ // baseSubscription is set in the case that source is a
+ // synchronous Observable
+ source.connect(onSubscribe(subscriber, writeLocked));
+ } finally {
+ // need to cover the case where the source is subscribed to
+ // outside of this class thus preventing the Action1 passed
+ // to source.connect above being called
+ if (writeLocked.get()) {
+ // Action1 passed to source.connect was not called
+ lock.unlock();
+ }
+ }
+ } else {
+ try {
+ // ready to subscribe to source so do it
+ doSubscribe(subscriber, baseSubscription);
+ } finally {
+ // release the read lock
+ lock.unlock();
+ }
+ }
+
+ }
+
+ @NonNull
+ private Action1 onSubscribe(@NonNull final Subscriber super T> subscriber,
+ @NonNull final AtomicBoolean writeLocked) {
+ return new Action1() {
+ @Override
+ public void call(@NonNull final Subscription subscription) {
+
+ try {
+ baseSubscription.add(subscription);
+ // ready to subscribe to source so do it
+ doSubscribe(subscriber, baseSubscription);
+ } finally {
+ // release the write lock
+ lock.unlock();
+ writeLocked.set(false);
+ }
+ }
+ };
+ }
+
+ private void doSubscribe(@NonNull final Subscriber super T> subscriber, @NonNull final CompositeSubscription currentBase) {
+ subscriber.add(disconnect(currentBase));
+ source.unsafeSubscribe(new Subscriber(subscriber) {
+ @Override
+ public void onError(@NonNull final Throwable throwable) {
+ cleanup();
+ subscriber.onError(throwable);
+ }
+
+ @Override
+ public void onNext(@Nullable final T item) {
+ subscriber.onNext(item);
+ }
+
+ @Override
+ public void onCompleted() {
+ cleanup();
+ subscriber.onCompleted();
+ }
+
+ private void cleanup() {
+ // on error or completion we need to unsubscribe the base subscription
+ // and set the subscriptionCount to 0
+ lock.lock();
+ try {
+ if (baseSubscription == currentBase) {
+ if (worker != null) {
+ worker.unsubscribe();
+ worker = null;
+ }
+ baseSubscription.unsubscribe();
+ baseSubscription = new CompositeSubscription();
+ subscriptionCount.set(0);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ }
+
+ @NonNull
+ private Subscription disconnect(@NonNull final CompositeSubscription current) {
+ return Subscriptions.create(new Action0() {
+
+ @Override
+ public void call() {
+ lock.lock();
+ try {
+ if (baseSubscription == current && subscriptionCount.decrementAndGet() == 0) {
+ if (worker != null) {
+ worker.unsubscribe();
+ } else {
+ worker = scheduler.createWorker();
+ }
+ worker.schedule(new Action0() {
+ @Override
+ public void call() {
+ lock.lock();
+ try {
+ if (subscriptionCount.get() == 0) {
+ baseSubscription.unsubscribe();
+ // need a new baseSubscription because once
+ // unsubscribed stays that way
+ worker.unsubscribe();
+ worker = null;
+ baseSubscription = new CompositeSubscription();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }, cacheTime, cacheTimeUnit);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java
index fe10134..a4cb3c5 100644
--- a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java
+++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java
@@ -22,8 +22,11 @@ package ru.touchin.roboswag.core.observables.storable;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
+import java.util.concurrent.TimeUnit;
+
import ru.touchin.roboswag.core.log.LcGroup;
import ru.touchin.roboswag.core.observables.ObservableResult;
+import ru.touchin.roboswag.core.observables.OnSubscribeRefCountWithCacheTime;
import ru.touchin.roboswag.core.observables.RxUtils;
import ru.touchin.roboswag.core.observables.storable.builders.MigratableStorableBuilder;
import ru.touchin.roboswag.core.observables.storable.builders.NonNullStorableBuilder;
@@ -54,6 +57,8 @@ public class Storable {
public static final LcGroup STORABLE_LC_GROUP = new LcGroup("STORABLE");
+ private static final long CACHE_TIME = TimeUnit.SECONDS.toMillis(5);
+
@NonNull
private final TKey key;
@NonNull
@@ -151,7 +156,9 @@ public class Storable {
.subscribeOn(storeScheduler != null ? storeScheduler : Schedulers.io())
.concatWith(newStoreValueEvent)
.map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue));
- return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE ? result.replay(1).refCount() : result;
+ return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE
+ ? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), CACHE_TIME, TimeUnit.MILLISECONDS))
+ : result;
}
@NonNull
@@ -173,7 +180,9 @@ public class Storable {
})
.subscribeOn(storeScheduler != null ? storeScheduler : Schedulers.computation());
- return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE ? result.replay(1).refCount() : result;
+ return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE
+ ? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), CACHE_TIME, TimeUnit.MILLISECONDS))
+ : result;
}
/**
@@ -226,6 +235,49 @@ public class Storable {
return converter;
}
+ @NonNull
+ private Observable> internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) {
+ return (checkForEqualityBeforeSet ? valueObservable.first() : Observable.just(null))
+ .switchMap(value -> {
+ if (checkForEqualityBeforeSet && ObjectUtils.equals(value, newValue)) {
+ return Observable.empty();
+ }
+ return Observable
+ .create(subscriber -> {
+ try {
+ final TStoreObject storeObject = converter.toStoreObject(objectClass, storeObjectClass, newValue);
+ store.storeObject(storeObjectClass, key, storeObject);
+ newStoreValueEvent.onNext(storeObject);
+ STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, value, newValue);
+ subscriber.onCompleted();
+ } catch (final Converter.ConversionException conversionException) {
+ STORABLE_LC_GROUP.w(conversionException, "Exception while converting value of '%s' from '%s' to store object",
+ key, newValue, store);
+ subscriber.onError(conversionException);
+ } catch (final Store.StoreException storeException) {
+ STORABLE_LC_GROUP.w(storeException, "Exception while trying to store value of '%s' to store %s", key, store);
+ subscriber.onError(storeException);
+ } catch (final RuntimeException throwable) {
+ STORABLE_LC_GROUP.assertion(throwable);
+ }
+ });
+ });
+ }
+
+ /**
+ * Creates observable which is async setting value to store.
+ * It is not checking if stored value equals new value.
+ * In result it will be faster to not get value from store and compare but it will emit item to {@link #observe()} subscribers.
+ * NOTE: It could emit ONLY completed and errors events. It is not providing onNext event!
+ *
+ * @param newValue Value to set;
+ * @return Observable of setting process.
+ */
+ @NonNull
+ public Observable> forceSet(@Nullable final TObject newValue) {
+ return internalSet(newValue, false);
+ }
+
/**
* Creates observable which is async setting value to store.
* NOTE: It could emit ONLY completed and errors events. It is not providing onNext event!
@@ -236,29 +288,7 @@ public class Storable {
*/
@NonNull
public Observable> set(@Nullable final TObject newValue) {
- return valueObservable
- .first()
- .switchMap(value -> ObjectUtils.equals(value, newValue)
- ? Observable.empty()
- : Observable
- .create(subscriber -> {
- try {
- final TStoreObject storeObject = converter.toStoreObject(objectClass, storeObjectClass, newValue);
- store.storeObject(storeObjectClass, key, storeObject);
- newStoreValueEvent.onNext(storeObject);
- STORABLE_LC_GROUP.i("Value of '%s' changed from '%s' to '%s'", key, value, newValue);
- subscriber.onCompleted();
- } catch (final Converter.ConversionException conversionException) {
- STORABLE_LC_GROUP.w(conversionException, "Exception while converting value of '%s' from '%s' to store object",
- key, newValue, store);
- subscriber.onError(conversionException);
- } catch (final Store.StoreException storeException) {
- STORABLE_LC_GROUP.w(storeException, "Exception while trying to store value of '%s' to store %s", key, store);
- subscriber.onError(storeException);
- } catch (final RuntimeException throwable) {
- STORABLE_LC_GROUP.assertion(throwable);
- }
- }));
+ return internalSet(newValue, true);
}
/**
diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java
index 21ac7b3..ed4e584 100644
--- a/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java
+++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullSafeListStorable.java
@@ -71,6 +71,14 @@ public class NonNullSafeListStorable {
return storable.set(list);
}
+ /**
+ * Wraps {@link Storable#forceSet(Object)} (Object)}.
+ */
+ @NonNull
+ public Observable> forseSet(@Nullable final List list) {
+ return storable.forceSet(list);
+ }
+
/**
* Wraps {@link Storable#setCalm(Object)}.
*/