fixed storable

This commit is contained in:
Anton Domnikov 2017-03-09 19:05:10 +03:00
parent 8b498098de
commit 846fc1368f
3 changed files with 277 additions and 25 deletions

View File

@ -0,0 +1,214 @@
/**
* Copyright 2014 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package ru.touchin.roboswag.core.observables;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import rx.Observable.OnSubscribe;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
/**
* Returns an observable sequence that stays connected to the source as long as
* there is at least one subscription to the observable sequence and also it stays connected
* for cache time after everyone unsubscribe.
*
* @param <T> the value type
*/
@SuppressWarnings({"PMD.AvoidUsingVolatile", "PMD.CompareObjectsWithEquals"})
//AvoidUsingVolatile,CompareObjectsWithEquals: from OnSubscribeRefCount code
public final class OnSubscribeRefCountWithCacheTime<T> implements OnSubscribe<T> {
@NonNull
private final ConnectableObservable<? extends T> source;
@NonNull
private volatile CompositeSubscription baseSubscription = new CompositeSubscription();
@NonNull
private final AtomicInteger subscriptionCount = new AtomicInteger(0);
@NonNull
private final Scheduler scheduler = Schedulers.computation();
private final long cacheTime;
@NonNull
private final TimeUnit cacheTimeUnit;
@Nullable
private Scheduler.Worker worker;
/**
* Use this lock for every subscription and disconnect action.
*/
@NonNull
private final ReentrantLock lock = new ReentrantLock();
public OnSubscribeRefCountWithCacheTime(@NonNull final ConnectableObservable<? extends T> source,
final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) {
this.source = source;
this.cacheTime = cacheTime;
this.cacheTimeUnit = cacheTimeUnit;
}
@Override
public void call(@NonNull final Subscriber<? super T> subscriber) {
lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {
if (worker != null) {
worker.unsubscribe();
worker = null;
}
final AtomicBoolean writeLocked = new AtomicBoolean(true);
try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Action1 passed
// to source.connect above being called
if (writeLocked.get()) {
// Action1 passed to source.connect was not called
lock.unlock();
}
}
} else {
try {
// ready to subscribe to source so do it
doSubscribe(subscriber, baseSubscription);
} finally {
// release the read lock
lock.unlock();
}
}
}
@NonNull
private Action1<Subscription> onSubscribe(@NonNull final Subscriber<? super T> subscriber,
@NonNull final AtomicBoolean writeLocked) {
return new Action1<Subscription>() {
@Override
public void call(@NonNull final Subscription subscription) {
try {
baseSubscription.add(subscription);
// ready to subscribe to source so do it
doSubscribe(subscriber, baseSubscription);
} finally {
// release the write lock
lock.unlock();
writeLocked.set(false);
}
}
};
}
private void doSubscribe(@NonNull final Subscriber<? super T> subscriber, @NonNull final CompositeSubscription currentBase) {
subscriber.add(disconnect(currentBase));
source.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onError(@NonNull final Throwable throwable) {
cleanup();
subscriber.onError(throwable);
}
@Override
public void onNext(@Nullable final T item) {
subscriber.onNext(item);
}
@Override
public void onCompleted() {
cleanup();
subscriber.onCompleted();
}
private void cleanup() {
// on error or completion we need to unsubscribe the base subscription
// and set the subscriptionCount to 0
lock.lock();
try {
if (baseSubscription == currentBase) {
if (worker != null) {
worker.unsubscribe();
worker = null;
}
baseSubscription.unsubscribe();
baseSubscription = new CompositeSubscription();
subscriptionCount.set(0);
}
} finally {
lock.unlock();
}
}
});
}
@NonNull
private Subscription disconnect(@NonNull final CompositeSubscription current) {
return Subscriptions.create(new Action0() {
@Override
public void call() {
lock.lock();
try {
if (baseSubscription == current && subscriptionCount.decrementAndGet() == 0) {
if (worker != null) {
worker.unsubscribe();
} else {
worker = scheduler.createWorker();
}
worker.schedule(new Action0() {
@Override
public void call() {
lock.lock();
try {
if (subscriptionCount.get() == 0) {
baseSubscription.unsubscribe();
// need a new baseSubscription because once
// unsubscribed stays that way
worker.unsubscribe();
worker = null;
baseSubscription = new CompositeSubscription();
}
} finally {
lock.unlock();
}
}
}, cacheTime, cacheTimeUnit);
}
} finally {
lock.unlock();
}
}
});
}
}

View File

@ -22,8 +22,11 @@ package ru.touchin.roboswag.core.observables.storable;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import ru.touchin.roboswag.core.log.LcGroup;
import ru.touchin.roboswag.core.observables.ObservableResult;
import ru.touchin.roboswag.core.observables.OnSubscribeRefCountWithCacheTime;
import ru.touchin.roboswag.core.observables.RxUtils;
import ru.touchin.roboswag.core.observables.storable.builders.MigratableStorableBuilder;
import ru.touchin.roboswag.core.observables.storable.builders.NonNullStorableBuilder;
@ -54,6 +57,8 @@ public class Storable<TKey, TObject, TStoreObject> {
public static final LcGroup STORABLE_LC_GROUP = new LcGroup("STORABLE");
private static final long CACHE_TIME = TimeUnit.SECONDS.toMillis(5);
@NonNull
private final TKey key;
@NonNull
@ -151,7 +156,9 @@ public class Storable<TKey, TObject, TStoreObject> {
.subscribeOn(storeScheduler != null ? storeScheduler : Schedulers.io())
.concatWith(newStoreValueEvent)
.map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue));
return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE ? result.replay(1).refCount() : result;
return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE
? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), CACHE_TIME, TimeUnit.MILLISECONDS))
: result;
}
@NonNull
@ -173,7 +180,9 @@ public class Storable<TKey, TObject, TStoreObject> {
})
.subscribeOn(storeScheduler != null ? storeScheduler : Schedulers.computation());
return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE ? result.replay(1).refCount() : result;
return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE
? Observable.create(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), CACHE_TIME, TimeUnit.MILLISECONDS))
: result;
}
/**
@ -226,21 +235,14 @@ public class Storable<TKey, TObject, TStoreObject> {
return converter;
}
/**
* Creates observable which is async setting value to store.
* NOTE: It could emit ONLY completed and errors events. It is not providing onNext event!
* Errors won't be emitted if {@link #getStore()} implements {@link SafeStore} and {@link #getConverter()} implements {@link SafeConverter}.
*
* @param newValue Value to set;
* @return Observable of setting process.
*/
@NonNull
public Observable<?> set(@Nullable final TObject newValue) {
return valueObservable
.first()
.switchMap(value -> ObjectUtils.equals(value, newValue)
? Observable.empty()
: Observable
private Observable<?> internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) {
return (checkForEqualityBeforeSet ? valueObservable.first() : Observable.just(null))
.switchMap(value -> {
if (checkForEqualityBeforeSet && ObjectUtils.equals(value, newValue)) {
return Observable.empty();
}
return Observable
.<TStoreObject>create(subscriber -> {
try {
final TStoreObject storeObject = converter.toStoreObject(objectClass, storeObjectClass, newValue);
@ -258,7 +260,35 @@ public class Storable<TKey, TObject, TStoreObject> {
} catch (final RuntimeException throwable) {
STORABLE_LC_GROUP.assertion(throwable);
}
}));
});
});
}
/**
* Creates observable which is async setting value to store.
* It is not checking if stored value equals new value.
* In result it will be faster to not get value from store and compare but it will emit item to {@link #observe()} subscribers.
* NOTE: It could emit ONLY completed and errors events. It is not providing onNext event!
*
* @param newValue Value to set;
* @return Observable of setting process.
*/
@NonNull
public Observable<?> forceSet(@Nullable final TObject newValue) {
return internalSet(newValue, false);
}
/**
* Creates observable which is async setting value to store.
* NOTE: It could emit ONLY completed and errors events. It is not providing onNext event!
* Errors won't be emitted if {@link #getStore()} implements {@link SafeStore} and {@link #getConverter()} implements {@link SafeConverter}.
*
* @param newValue Value to set;
* @return Observable of setting process.
*/
@NonNull
public Observable<?> set(@Nullable final TObject newValue) {
return internalSet(newValue, true);
}
/**

View File

@ -71,6 +71,14 @@ public class NonNullSafeListStorable<TKey, TItemObject, TStoreObject> {
return storable.set(list);
}
/**
* Wraps {@link Storable#forceSet(Object)} (Object)}.
*/
@NonNull
public Observable<?> forseSet(@Nullable final List<TItemObject> list) {
return storable.forceSet(list);
}
/**
* Wraps {@link Storable#setCalm(Object)}.
*/