From 2602d56cbdc19f60c2869ac85904293fe56530ca Mon Sep 17 00:00:00 2001 From: Gavriil Sitnikov Date: Fri, 21 Apr 2017 00:11:32 +0300 Subject: [PATCH] static fixes --- .../ObservableRefCountWithCacheTime.java | 151 ++++++++++-------- 1 file changed, 85 insertions(+), 66 deletions(-) diff --git a/src/main/java/ru/touchin/roboswag/core/observables/ObservableRefCountWithCacheTime.java b/src/main/java/ru/touchin/roboswag/core/observables/ObservableRefCountWithCacheTime.java index 1e580a1..08fb9c4 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/ObservableRefCountWithCacheTime.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/ObservableRefCountWithCacheTime.java @@ -41,19 +41,26 @@ import io.reactivex.schedulers.Schedulers; * * @param the value type */ +@SuppressWarnings({"PMD.CompareObjectsWithEquals", "PMD.AvoidUsingVolatile"}) +//AvoidUsingVolatile: it's RxJava code public final class ObservableRefCountWithCacheTime extends Observable implements HasUpstreamObservableSource { - final ConnectableObservable source; - private final ObservableSource source2; + @NonNull + private final ConnectableObservable connectableSource; + @NonNull + private final ObservableSource actualSource; - volatile CompositeDisposable baseDisposable = new CompositeDisposable(); + @NonNull + private volatile CompositeDisposable baseDisposable = new CompositeDisposable(); - final AtomicInteger subscriptionCount = new AtomicInteger(); + @NonNull + private final AtomicInteger subscriptionCount = new AtomicInteger(); /** * Use this lock for every subscription and disconnect action. */ - final ReentrantLock lock = new ReentrantLock(); + @NonNull + private final ReentrantLock lock = new ReentrantLock(); @NonNull private final Scheduler scheduler = Schedulers.computation(); @@ -68,16 +75,18 @@ public final class ObservableRefCountWithCacheTime extends Observable impl * * @param source observable to apply ref count to */ - public ObservableRefCountWithCacheTime(ConnectableObservable source, + public ObservableRefCountWithCacheTime(@NonNull final ConnectableObservable source, final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) { - this.source = source; - this.source2 = source; + super(); + this.connectableSource = source; + this.actualSource = source; this.cacheTime = cacheTime; this.cacheTimeUnit = cacheTimeUnit; } + @NonNull public ObservableSource source() { - return source2; + return actualSource; } private void cleanupWorker() { @@ -88,7 +97,7 @@ public final class ObservableRefCountWithCacheTime extends Observable impl } @Override - public void subscribeActual(final Observer subscriber) { + public void subscribeActual(@NonNull final Observer subscriber) { lock.lock(); if (subscriptionCount.incrementAndGet() == 1) { @@ -99,7 +108,7 @@ public final class ObservableRefCountWithCacheTime extends Observable impl // need to use this overload of connect to ensure that // baseDisposable is set in the case that source is a // synchronous Observable - source.connect(onSubscribe(subscriber, writeLocked)); + connectableSource.connect(onSubscribe(subscriber, writeLocked)); } finally { // need to cover the case where the source is subscribed to // outside of this class thus preventing the Action1 passed @@ -121,56 +130,59 @@ public final class ObservableRefCountWithCacheTime extends Observable impl } - private Consumer onSubscribe(final Observer observer, - final AtomicBoolean writeLocked) { + @NonNull + private Consumer onSubscribe(@NonNull final Observer observer, @NonNull final AtomicBoolean writeLocked) { return new DisposeConsumer(observer, writeLocked); } - void doSubscribe(final Observer observer, final CompositeDisposable currentBase) { + private void doSubscribe(@NonNull final Observer observer, @NonNull final CompositeDisposable currentBase) { // handle disposing from the base CompositeDisposable - Disposable d = disconnect(currentBase); + final Disposable disposable = disconnect(currentBase); - ConnectionObserver s = new ConnectionObserver(observer, currentBase, d); - observer.onSubscribe(s); + final ConnectionObserver connectionObserver = new ConnectionObserver(observer, currentBase, disposable); + observer.onSubscribe(connectionObserver); - source.subscribe(s); + connectableSource.subscribe(connectionObserver); } - private Disposable disconnect(final CompositeDisposable current) { + @NonNull + private Disposable disconnect(@NonNull final CompositeDisposable current) { return Disposables.fromRunnable(new DisposeTask(current)); } - final class ConnectionObserver - extends AtomicReference - implements Observer, Disposable { + private final class ConnectionObserver extends AtomicReference implements Observer, Disposable { private static final long serialVersionUID = 3813126992133394324L; - final Observer subscriber; - final CompositeDisposable currentBase; - final Disposable resource; + @NonNull + private final Observer subscriber; + @NonNull + private final CompositeDisposable currentBase; + @NonNull + private final Disposable resource; - ConnectionObserver(Observer subscriber, - CompositeDisposable currentBase, Disposable resource) { + public ConnectionObserver(@NonNull final Observer subscriber, @NonNull final CompositeDisposable currentBase, + @NonNull final Disposable resource) { + super(); this.subscriber = subscriber; this.currentBase = currentBase; this.resource = resource; } @Override - public void onSubscribe(Disposable s) { - DisposableHelper.setOnce(this, s); + public void onSubscribe(@NonNull final Disposable disposable) { + DisposableHelper.setOnce(this, disposable); } @Override - public void onError(Throwable e) { + public void onError(@NonNull final Throwable throwable) { cleanup(); - subscriber.onError(e); + subscriber.onError(throwable); } @Override - public void onNext(T t) { - subscriber.onNext(t); + public void onNext(@NonNull final T item) { + subscriber.onNext(item); } @Override @@ -190,15 +202,15 @@ public final class ObservableRefCountWithCacheTime extends Observable impl return DisposableHelper.isDisposed(get()); } - void cleanup() { + private void cleanup() { // on error or completion we need to dispose the base CompositeDisposable // and set the subscriptionCount to 0 lock.lock(); try { if (baseDisposable == currentBase) { cleanupWorker(); - if (source instanceof Disposable) { - ((Disposable) source).dispose(); + if (connectableSource instanceof Disposable) { + ((Disposable) connectableSource).dispose(); } baseDisposable.dispose(); @@ -209,19 +221,23 @@ public final class ObservableRefCountWithCacheTime extends Observable impl lock.unlock(); } } + } - final class DisposeConsumer implements Consumer { + private final class DisposeConsumer implements Consumer { + + @NonNull private final Observer observer; + @NonNull private final AtomicBoolean writeLocked; - DisposeConsumer(Observer observer, AtomicBoolean writeLocked) { + public DisposeConsumer(@NonNull final Observer observer, @NonNull final AtomicBoolean writeLocked) { this.observer = observer; this.writeLocked = writeLocked; } @Override - public void accept(Disposable subscription) { + public void accept(@NonNull final Disposable subscription) { try { baseDisposable.add(subscription); // ready to subscribe to source so do it @@ -232,12 +248,15 @@ public final class ObservableRefCountWithCacheTime extends Observable impl writeLocked.set(false); } } + } - final class DisposeTask implements Runnable { + private final class DisposeTask implements Runnable { + + @NonNull private final CompositeDisposable current; - DisposeTask(CompositeDisposable current) { + public DisposeTask(@NonNull final CompositeDisposable current) { this.current = current; } @@ -245,36 +264,36 @@ public final class ObservableRefCountWithCacheTime extends Observable impl public void run() { lock.lock(); try { - if (baseDisposable == current) { - if (subscriptionCount.decrementAndGet() == 0) { - if (worker != null) { - worker.dispose(); - } else { - worker = scheduler.createWorker(); - } - worker.schedule(() -> { - lock.lock(); - try { - if (subscriptionCount.get() == 0) { - cleanupWorker(); - if (source instanceof Disposable) { - ((Disposable) source).dispose(); - } - - baseDisposable.dispose(); - // need a new baseDisposable because once - // disposed stays that way - baseDisposable = new CompositeDisposable(); - } - } finally { - lock.unlock(); - } - }, cacheTime, cacheTimeUnit); + if (baseDisposable == current && subscriptionCount.decrementAndGet() == 0) { + if (worker != null) { + worker.dispose(); + } else { + worker = scheduler.createWorker(); } + worker.schedule(() -> { + lock.lock(); + try { + if (subscriptionCount.get() == 0) { + cleanupWorker(); + if (connectableSource instanceof Disposable) { + ((Disposable) connectableSource).dispose(); + } + + baseDisposable.dispose(); + // need a new baseDisposable because once + // disposed stays that way + baseDisposable = new CompositeDisposable(); + } + } finally { + lock.unlock(); + } + }, cacheTime, cacheTimeUnit); } } finally { lock.unlock(); } } + } + } \ No newline at end of file