static fixes

This commit is contained in:
Gavriil Sitnikov 2017-04-21 00:11:32 +03:00
parent 553f4f6bfa
commit 2602d56cbd
1 changed files with 85 additions and 66 deletions

View File

@ -41,19 +41,26 @@ import io.reactivex.schedulers.Schedulers;
*
* @param <T> the value type
*/
@SuppressWarnings({"PMD.CompareObjectsWithEquals", "PMD.AvoidUsingVolatile"})
//AvoidUsingVolatile: it's RxJava code
public final class ObservableRefCountWithCacheTime<T> extends Observable<T> implements HasUpstreamObservableSource<T> {
final ConnectableObservable<? extends T> source;
private final ObservableSource<T> source2;
@NonNull
private final ConnectableObservable<? extends T> connectableSource;
@NonNull
private final ObservableSource<T> actualSource;
volatile CompositeDisposable baseDisposable = new CompositeDisposable();
@NonNull
private volatile CompositeDisposable baseDisposable = new CompositeDisposable();
final AtomicInteger subscriptionCount = new AtomicInteger();
@NonNull
private final AtomicInteger subscriptionCount = new AtomicInteger();
/**
* Use this lock for every subscription and disconnect action.
*/
final ReentrantLock lock = new ReentrantLock();
@NonNull
private final ReentrantLock lock = new ReentrantLock();
@NonNull
private final Scheduler scheduler = Schedulers.computation();
@ -68,16 +75,18 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
*
* @param source observable to apply ref count to
*/
public ObservableRefCountWithCacheTime(ConnectableObservable<T> source,
public ObservableRefCountWithCacheTime(@NonNull final ConnectableObservable<T> source,
final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) {
this.source = source;
this.source2 = source;
super();
this.connectableSource = source;
this.actualSource = source;
this.cacheTime = cacheTime;
this.cacheTimeUnit = cacheTimeUnit;
}
@NonNull
public ObservableSource<T> source() {
return source2;
return actualSource;
}
private void cleanupWorker() {
@ -88,7 +97,7 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
}
@Override
public void subscribeActual(final Observer<? super T> subscriber) {
public void subscribeActual(@NonNull final Observer<? super T> subscriber) {
lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {
@ -99,7 +108,7 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
// need to use this overload of connect to ensure that
// baseDisposable is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
connectableSource.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Action1 passed
@ -121,56 +130,59 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
}
private Consumer<Disposable> onSubscribe(final Observer<? super T> observer,
final AtomicBoolean writeLocked) {
@NonNull
private Consumer<Disposable> onSubscribe(@NonNull final Observer<? super T> observer, @NonNull final AtomicBoolean writeLocked) {
return new DisposeConsumer(observer, writeLocked);
}
void doSubscribe(final Observer<? super T> observer, final CompositeDisposable currentBase) {
private void doSubscribe(@NonNull final Observer<? super T> observer, @NonNull final CompositeDisposable currentBase) {
// handle disposing from the base CompositeDisposable
Disposable d = disconnect(currentBase);
final Disposable disposable = disconnect(currentBase);
ConnectionObserver s = new ConnectionObserver(observer, currentBase, d);
observer.onSubscribe(s);
final ConnectionObserver connectionObserver = new ConnectionObserver(observer, currentBase, disposable);
observer.onSubscribe(connectionObserver);
source.subscribe(s);
connectableSource.subscribe(connectionObserver);
}
private Disposable disconnect(final CompositeDisposable current) {
@NonNull
private Disposable disconnect(@NonNull final CompositeDisposable current) {
return Disposables.fromRunnable(new DisposeTask(current));
}
final class ConnectionObserver
extends AtomicReference<Disposable>
implements Observer<T>, Disposable {
private final class ConnectionObserver extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 3813126992133394324L;
final Observer<? super T> subscriber;
final CompositeDisposable currentBase;
final Disposable resource;
@NonNull
private final Observer<? super T> subscriber;
@NonNull
private final CompositeDisposable currentBase;
@NonNull
private final Disposable resource;
ConnectionObserver(Observer<? super T> subscriber,
CompositeDisposable currentBase, Disposable resource) {
public ConnectionObserver(@NonNull final Observer<? super T> subscriber, @NonNull final CompositeDisposable currentBase,
@NonNull final Disposable resource) {
super();
this.subscriber = subscriber;
this.currentBase = currentBase;
this.resource = resource;
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this, s);
public void onSubscribe(@NonNull final Disposable disposable) {
DisposableHelper.setOnce(this, disposable);
}
@Override
public void onError(Throwable e) {
public void onError(@NonNull final Throwable throwable) {
cleanup();
subscriber.onError(e);
subscriber.onError(throwable);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
public void onNext(@NonNull final T item) {
subscriber.onNext(item);
}
@Override
@ -190,15 +202,15 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
return DisposableHelper.isDisposed(get());
}
void cleanup() {
private void cleanup() {
// on error or completion we need to dispose the base CompositeDisposable
// and set the subscriptionCount to 0
lock.lock();
try {
if (baseDisposable == currentBase) {
cleanupWorker();
if (source instanceof Disposable) {
((Disposable) source).dispose();
if (connectableSource instanceof Disposable) {
((Disposable) connectableSource).dispose();
}
baseDisposable.dispose();
@ -209,19 +221,23 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
lock.unlock();
}
}
}
final class DisposeConsumer implements Consumer<Disposable> {
private final class DisposeConsumer implements Consumer<Disposable> {
@NonNull
private final Observer<? super T> observer;
@NonNull
private final AtomicBoolean writeLocked;
DisposeConsumer(Observer<? super T> observer, AtomicBoolean writeLocked) {
public DisposeConsumer(@NonNull final Observer<? super T> observer, @NonNull final AtomicBoolean writeLocked) {
this.observer = observer;
this.writeLocked = writeLocked;
}
@Override
public void accept(Disposable subscription) {
public void accept(@NonNull final Disposable subscription) {
try {
baseDisposable.add(subscription);
// ready to subscribe to source so do it
@ -232,12 +248,15 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
writeLocked.set(false);
}
}
}
final class DisposeTask implements Runnable {
private final class DisposeTask implements Runnable {
@NonNull
private final CompositeDisposable current;
DisposeTask(CompositeDisposable current) {
public DisposeTask(@NonNull final CompositeDisposable current) {
this.current = current;
}
@ -245,36 +264,36 @@ public final class ObservableRefCountWithCacheTime<T> extends Observable<T> impl
public void run() {
lock.lock();
try {
if (baseDisposable == current) {
if (subscriptionCount.decrementAndGet() == 0) {
if (worker != null) {
worker.dispose();
} else {
worker = scheduler.createWorker();
}
worker.schedule(() -> {
lock.lock();
try {
if (subscriptionCount.get() == 0) {
cleanupWorker();
if (source instanceof Disposable) {
((Disposable) source).dispose();
}
baseDisposable.dispose();
// need a new baseDisposable because once
// disposed stays that way
baseDisposable = new CompositeDisposable();
}
} finally {
lock.unlock();
}
}, cacheTime, cacheTimeUnit);
if (baseDisposable == current && subscriptionCount.decrementAndGet() == 0) {
if (worker != null) {
worker.dispose();
} else {
worker = scheduler.createWorker();
}
worker.schedule(() -> {
lock.lock();
try {
if (subscriptionCount.get() == 0) {
cleanupWorker();
if (connectableSource instanceof Disposable) {
((Disposable) connectableSource).dispose();
}
baseDisposable.dispose();
// need a new baseDisposable because once
// disposed stays that way
baseDisposable = new CompositeDisposable();
}
} finally {
lock.unlock();
}
}, cacheTime, cacheTimeUnit);
}
} finally {
lock.unlock();
}
}
}
}