diff --git a/build.gradle b/build.gradle index e711b15..d73e7ba 100644 --- a/build.gradle +++ b/build.gradle @@ -18,6 +18,6 @@ android { dependencies { provided 'com.android.support:support-annotations:25.3.1' - provided 'io.reactivex:rxandroid:1.2.1' - provided 'io.reactivex:rxjava:1.2.9' + provided 'io.reactivex.rxjava2:rxandroid:2.0.1' + provided 'io.reactivex.rxjava2:rxjava:2.0.8' } diff --git a/src/main/java/ru/touchin/roboswag/core/observables/BaseChangeable.java b/src/main/java/ru/touchin/roboswag/core/observables/BaseChangeable.java index 86ca15d..5c650ac 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/BaseChangeable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/BaseChangeable.java @@ -29,8 +29,8 @@ import java.io.Serializable; import ru.touchin.roboswag.core.utils.ObjectUtils; import ru.touchin.roboswag.core.utils.Optional; -import rx.Observable; -import rx.subjects.BehaviorSubject; +import io.reactivex.Observable; +import io.reactivex.subjects.BehaviorSubject; /** * Created by Gavriil Sitnikov on 24/03/2016. @@ -47,7 +47,7 @@ public abstract class BaseChangeable implements Serializab private transient BehaviorSubject> valueSubject; public BaseChangeable(@Nullable final TValue defaultValue) { - valueSubject = BehaviorSubject.create(new Optional<>(defaultValue)); + valueSubject = BehaviorSubject.createDefault(new Optional<>(defaultValue)); } @NonNull @@ -88,7 +88,7 @@ public abstract class BaseChangeable implements Serializab @SuppressWarnings("unchecked") private void readObject(@NonNull final ObjectInputStream inputStream) throws IOException, ClassNotFoundException { - valueSubject = BehaviorSubject.create((Optional) inputStream.readObject()); + valueSubject = BehaviorSubject.createDefault((Optional) inputStream.readObject()); } @Override diff --git a/src/main/java/ru/touchin/roboswag/core/observables/Changeable.java b/src/main/java/ru/touchin/roboswag/core/observables/Changeable.java index 243009c..ad3cdea 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/Changeable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/Changeable.java @@ -22,16 +22,15 @@ package ru.touchin.roboswag.core.observables; import android.support.annotation.NonNull; import android.support.annotation.Nullable; +import io.reactivex.Observable; import ru.touchin.roboswag.core.utils.Optional; -import rx.Observable; /** * Created by Gavriil Sitnikov on 24/03/2016. * Variant of {@link BaseChangeable} which is allows to set nullable values. * Needed to separate non-null Changeable from nullable Changeable. */ -//COMPATIBILITY NOTE: in RxJava2 it should extends BaseChangeable> -public class Changeable extends BaseChangeable { +public class Changeable extends BaseChangeable> { public Changeable(@Nullable final T defaultValue) { super(defaultValue); @@ -44,9 +43,8 @@ public class Changeable extends BaseChangeable { */ @NonNull @Override - //COMPATIBILITY NOTE: in RxJava2 it should be Observable> - public Observable observe() { - return observeOptionalValue().map(Optional::get); + public Observable> observe() { + return observeOptionalValue(); } } \ No newline at end of file diff --git a/src/main/java/ru/touchin/roboswag/core/observables/NonNullChangeable.java b/src/main/java/ru/touchin/roboswag/core/observables/NonNullChangeable.java index 731913d..f3bb38a 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/NonNullChangeable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/NonNullChangeable.java @@ -23,7 +23,7 @@ import android.support.annotation.NonNull; import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.utils.ShouldNotHappenException; -import rx.Observable; +import io.reactivex.Observable; /** * Created by Gavriil Sitnikov on 24/03/2016. diff --git a/src/main/java/ru/touchin/roboswag/core/observables/ObservableRefCountWithCacheTime.java b/src/main/java/ru/touchin/roboswag/core/observables/ObservableRefCountWithCacheTime.java new file mode 100644 index 0000000..1e580a1 --- /dev/null +++ b/src/main/java/ru/touchin/roboswag/core/observables/ObservableRefCountWithCacheTime.java @@ -0,0 +1,280 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package ru.touchin.roboswag.core.observables; + +import android.support.annotation.NonNull; +import android.support.annotation.Nullable; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import io.reactivex.Observable; +import io.reactivex.ObservableSource; +import io.reactivex.Observer; +import io.reactivex.Scheduler; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.Disposables; +import io.reactivex.functions.Consumer; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.fuseable.HasUpstreamObservableSource; +import io.reactivex.observables.ConnectableObservable; +import io.reactivex.schedulers.Schedulers; + +/** + * Returns an observable sequence that stays connected to the source as long as + * there is at least one subscription to the observable sequence. + * + * @param the value type + */ +public final class ObservableRefCountWithCacheTime extends Observable implements HasUpstreamObservableSource { + + final ConnectableObservable source; + private final ObservableSource source2; + + volatile CompositeDisposable baseDisposable = new CompositeDisposable(); + + final AtomicInteger subscriptionCount = new AtomicInteger(); + + /** + * Use this lock for every subscription and disconnect action. + */ + final ReentrantLock lock = new ReentrantLock(); + + @NonNull + private final Scheduler scheduler = Schedulers.computation(); + private final long cacheTime; + @NonNull + private final TimeUnit cacheTimeUnit; + @Nullable + private Scheduler.Worker worker; + + /** + * Constructor. + * + * @param source observable to apply ref count to + */ + public ObservableRefCountWithCacheTime(ConnectableObservable source, + final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) { + this.source = source; + this.source2 = source; + this.cacheTime = cacheTime; + this.cacheTimeUnit = cacheTimeUnit; + } + + public ObservableSource source() { + return source2; + } + + private void cleanupWorker() { + if (worker != null) { + worker.dispose(); + worker = null; + } + } + + @Override + public void subscribeActual(final Observer subscriber) { + + lock.lock(); + if (subscriptionCount.incrementAndGet() == 1) { + cleanupWorker(); + final AtomicBoolean writeLocked = new AtomicBoolean(true); + + try { + // need to use this overload of connect to ensure that + // baseDisposable is set in the case that source is a + // synchronous Observable + source.connect(onSubscribe(subscriber, writeLocked)); + } finally { + // need to cover the case where the source is subscribed to + // outside of this class thus preventing the Action1 passed + // to source.connect above being called + if (writeLocked.get()) { + // Action1 passed to source.connect was not called + lock.unlock(); + } + } + } else { + try { + // ready to subscribe to source so do it + doSubscribe(subscriber, baseDisposable); + } finally { + // release the read lock + lock.unlock(); + } + } + + } + + private Consumer onSubscribe(final Observer observer, + final AtomicBoolean writeLocked) { + return new DisposeConsumer(observer, writeLocked); + } + + void doSubscribe(final Observer observer, final CompositeDisposable currentBase) { + // handle disposing from the base CompositeDisposable + Disposable d = disconnect(currentBase); + + ConnectionObserver s = new ConnectionObserver(observer, currentBase, d); + observer.onSubscribe(s); + + source.subscribe(s); + } + + private Disposable disconnect(final CompositeDisposable current) { + return Disposables.fromRunnable(new DisposeTask(current)); + } + + final class ConnectionObserver + extends AtomicReference + implements Observer, Disposable { + + private static final long serialVersionUID = 3813126992133394324L; + + final Observer subscriber; + final CompositeDisposable currentBase; + final Disposable resource; + + ConnectionObserver(Observer subscriber, + CompositeDisposable currentBase, Disposable resource) { + this.subscriber = subscriber; + this.currentBase = currentBase; + this.resource = resource; + } + + @Override + public void onSubscribe(Disposable s) { + DisposableHelper.setOnce(this, s); + } + + @Override + public void onError(Throwable e) { + cleanup(); + subscriber.onError(e); + } + + @Override + public void onNext(T t) { + subscriber.onNext(t); + } + + @Override + public void onComplete() { + cleanup(); + subscriber.onComplete(); + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + resource.dispose(); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + void cleanup() { + // on error or completion we need to dispose the base CompositeDisposable + // and set the subscriptionCount to 0 + lock.lock(); + try { + if (baseDisposable == currentBase) { + cleanupWorker(); + if (source instanceof Disposable) { + ((Disposable) source).dispose(); + } + + baseDisposable.dispose(); + baseDisposable = new CompositeDisposable(); + subscriptionCount.set(0); + } + } finally { + lock.unlock(); + } + } + } + + final class DisposeConsumer implements Consumer { + private final Observer observer; + private final AtomicBoolean writeLocked; + + DisposeConsumer(Observer observer, AtomicBoolean writeLocked) { + this.observer = observer; + this.writeLocked = writeLocked; + } + + @Override + public void accept(Disposable subscription) { + try { + baseDisposable.add(subscription); + // ready to subscribe to source so do it + doSubscribe(observer, baseDisposable); + } finally { + // release the write lock + lock.unlock(); + writeLocked.set(false); + } + } + } + + final class DisposeTask implements Runnable { + private final CompositeDisposable current; + + DisposeTask(CompositeDisposable current) { + this.current = current; + } + + @Override + public void run() { + lock.lock(); + try { + if (baseDisposable == current) { + if (subscriptionCount.decrementAndGet() == 0) { + if (worker != null) { + worker.dispose(); + } else { + worker = scheduler.createWorker(); + } + worker.schedule(() -> { + lock.lock(); + try { + if (subscriptionCount.get() == 0) { + cleanupWorker(); + if (source instanceof Disposable) { + ((Disposable) source).dispose(); + } + + baseDisposable.dispose(); + // need a new baseDisposable because once + // disposed stays that way + baseDisposable = new CompositeDisposable(); + } + } finally { + lock.unlock(); + } + }, cacheTime, cacheTimeUnit); + } + } + } finally { + lock.unlock(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java b/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java deleted file mode 100644 index e6b7de2..0000000 --- a/src/main/java/ru/touchin/roboswag/core/observables/OnSubscribeRefCountWithCacheTime.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package ru.touchin.roboswag.core.observables; - -import android.support.annotation.NonNull; -import android.support.annotation.Nullable; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; - -import rx.Observable.OnSubscribe; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action1; -import rx.observables.ConnectableObservable; -import rx.schedulers.Schedulers; -import rx.subscriptions.CompositeSubscription; -import rx.subscriptions.Subscriptions; - -/** - * Returns an observable sequence that stays connected to the source as long as - * there is at least one subscription to the observable sequence and also it stays connected - * for cache time after everyone unsubscribe. - * - * @param the value type - */ -@SuppressWarnings({"PMD.AvoidUsingVolatile", "PMD.CompareObjectsWithEquals"}) -//AvoidUsingVolatile,CompareObjectsWithEquals: from OnSubscribeRefCount code -public final class OnSubscribeRefCountWithCacheTime implements OnSubscribe { - - @NonNull - private final ConnectableObservable source; - @NonNull - private volatile CompositeSubscription baseSubscription = new CompositeSubscription(); - @NonNull - private final AtomicInteger subscriptionCount = new AtomicInteger(0); - - @NonNull - private final Scheduler scheduler = Schedulers.computation(); - private final long cacheTime; - @NonNull - private final TimeUnit cacheTimeUnit; - @Nullable - private Scheduler.Worker worker; - - /** - * Use this lock for every subscription and disconnect action. - */ - @NonNull - private final ReentrantLock lock = new ReentrantLock(); - - public OnSubscribeRefCountWithCacheTime(@NonNull final ConnectableObservable source, - final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) { - this.source = source; - this.cacheTime = cacheTime; - this.cacheTimeUnit = cacheTimeUnit; - } - - @Override - public void call(@NonNull final Subscriber subscriber) { - - lock.lock(); - if (subscriptionCount.incrementAndGet() == 1) { - if (worker != null) { - worker.unsubscribe(); - worker = null; - } - final AtomicBoolean writeLocked = new AtomicBoolean(true); - - try { - // need to use this overload of connect to ensure that - // baseSubscription is set in the case that source is a - // synchronous Observable - source.connect(onSubscribe(subscriber, writeLocked)); - } finally { - // need to cover the case where the source is subscribed to - // outside of this class thus preventing the Action1 passed - // to source.connect above being called - if (writeLocked.get()) { - // Action1 passed to source.connect was not called - lock.unlock(); - } - } - } else { - try { - // ready to subscribe to source so do it - doSubscribe(subscriber, baseSubscription); - } finally { - // release the read lock - lock.unlock(); - } - } - - } - - @NonNull - private Action1 onSubscribe(@NonNull final Subscriber subscriber, - @NonNull final AtomicBoolean writeLocked) { - return subscription -> { - try { - baseSubscription.add(subscription); - // ready to subscribe to source so do it - doSubscribe(subscriber, baseSubscription); - } finally { - // release the write lock - lock.unlock(); - writeLocked.set(false); - } - }; - } - - private void doSubscribe(@NonNull final Subscriber subscriber, @NonNull final CompositeSubscription currentBase) { - subscriber.add(disconnect(currentBase)); - source.unsafeSubscribe(new Subscriber(subscriber) { - @Override - public void onError(@NonNull final Throwable throwable) { - cleanup(); - subscriber.onError(throwable); - } - - @Override - public void onNext(@Nullable final T item) { - subscriber.onNext(item); - } - - @Override - public void onCompleted() { - cleanup(); - subscriber.onCompleted(); - } - - private void cleanup() { - // on error or completion we need to unsubscribe the base subscription and set the subscriptionCount to 0 - lock.lock(); - try { - if (baseSubscription == currentBase) { - cleanupWorker(); - // backdoor into the ConnectableObservable to cleanup and reset its state - if (source instanceof Subscription) { - ((Subscription) source).unsubscribe(); - } - baseSubscription.unsubscribe(); - baseSubscription = new CompositeSubscription(); - subscriptionCount.set(0); - } - } finally { - lock.unlock(); - } - } - }); - } - - @NonNull - private Subscription disconnect(@NonNull final CompositeSubscription current) { - return Subscriptions.create(() -> { - lock.lock(); - try { - if (baseSubscription == current && subscriptionCount.decrementAndGet() == 0) { - if (worker != null) { - worker.unsubscribe(); - } else { - worker = scheduler.createWorker(); - } - worker.schedule(() -> { - lock.lock(); - try { - if (subscriptionCount.get() == 0) { - cleanupWorker(); - // backdoor into the ConnectableObservable to cleanup and reset its state - if (source instanceof Subscription) { - ((Subscription) source).unsubscribe(); - } - baseSubscription.unsubscribe(); - // need a new baseSubscription because once - // unsubscribed stays that way - baseSubscription = new CompositeSubscription(); - } - } finally { - lock.unlock(); - } - }, cacheTime, cacheTimeUnit); - } - } finally { - lock.unlock(); - } - }); - } - - private void cleanupWorker() { - if (worker != null) { - worker.unsubscribe(); - worker = null; - } - } - -} \ No newline at end of file diff --git a/src/main/java/ru/touchin/roboswag/core/observables/RxAndroidUtils.java b/src/main/java/ru/touchin/roboswag/core/observables/RxAndroidUtils.java index ef35ab1..bc674b2 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/RxAndroidUtils.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/RxAndroidUtils.java @@ -33,12 +33,12 @@ import android.support.annotation.Nullable; import java.util.concurrent.CountDownLatch; +import io.reactivex.Emitter; +import io.reactivex.Observable; +import io.reactivex.Scheduler; +import io.reactivex.android.schedulers.AndroidSchedulers; import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.utils.ServiceBinder; -import rx.Emitter; -import rx.Observable; -import rx.Scheduler; -import rx.android.schedulers.AndroidSchedulers; /** * Created by Gavriil Sitnikov on 10/01/2016. @@ -62,8 +62,8 @@ public final class RxAndroidUtils { .create(emitter -> { onSubscribeServiceConnection.emitter = emitter; context.bindService(new Intent(context, serviceClass), onSubscribeServiceConnection, Context.BIND_AUTO_CREATE); - }, Emitter.BackpressureMode.LATEST) - .doOnUnsubscribe(() -> { + }) + .doOnDispose(() -> { context.unbindService(onSubscribeServiceConnection); onSubscribeServiceConnection.emitter = null; })) @@ -87,8 +87,8 @@ public final class RxAndroidUtils { .create(emitter -> { onOnSubscribeBroadcastReceiver.emitter = emitter; context.registerReceiver(onOnSubscribeBroadcastReceiver, intentFilter); - }, Emitter.BackpressureMode.LATEST) - .doOnUnsubscribe(() -> { + }) + .doOnDispose(() -> { context.unregisterReceiver(onOnSubscribeBroadcastReceiver); onOnSubscribeBroadcastReceiver.emitter = null; })) diff --git a/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableCollection.java b/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableCollection.java index d759085..4aee98c 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableCollection.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableCollection.java @@ -28,8 +28,8 @@ import java.io.ObjectOutputStream; import java.util.Collection; import java.util.Collections; -import rx.Emitter; -import rx.Observable; +import io.reactivex.Emitter; +import io.reactivex.Observable; /** * Created by Gavriil Sitnikov on 23/05/16. @@ -57,8 +57,8 @@ public abstract class ObservableCollection { @NonNull private Observable> createChangesObservable() { return Observable - .>create(emitter -> this.changesEmitter = emitter, Emitter.BackpressureMode.BUFFER) - .doOnUnsubscribe(() -> this.changesEmitter = null) + .>create(emitter -> this.changesEmitter = emitter) + .doOnDispose(() -> this.changesEmitter = null) .share(); } diff --git a/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableFilteredList.java b/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableFilteredList.java index 0d4ff8f..ce67884 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableFilteredList.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableFilteredList.java @@ -8,9 +8,10 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import rx.Subscription; -import rx.functions.Func1; -import rx.schedulers.Schedulers; +import io.reactivex.disposables.Disposable; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; +import ru.touchin.roboswag.core.log.Lc; /** * Created by Gavriil Sitnikov on 02/06/2016. @@ -23,15 +24,19 @@ public class ObservableFilteredList extends ObservableCollection { @NonNull private static List filterCollection(@NonNull final Collection sourceCollection, - @Nullable final Func1 filter) { + @Nullable final Function filter) { if (filter == null) { return new ArrayList<>(sourceCollection); } final List result = new ArrayList<>(sourceCollection.size()); - for (final TItem item : sourceCollection) { - if (filter.call(item)) { - result.add(item); + try { + for (final TItem item : sourceCollection) { + if (filter.apply(item)) { + result.add(item); + } } + } catch (final Exception exception) { + Lc.assertion(exception); } return result; } @@ -41,19 +46,19 @@ public class ObservableFilteredList extends ObservableCollection { @NonNull private ObservableCollection sourceCollection; @Nullable - private Func1 filter; + private Function filter; @Nullable - private Subscription sourceCollectionSubscription; + private Disposable sourceCollectionSubscription; - public ObservableFilteredList(@NonNull final Func1 filter) { + public ObservableFilteredList(@NonNull final Function filter) { this(new ArrayList<>(), filter); } - public ObservableFilteredList(@NonNull final Collection sourceCollection, @Nullable final Func1 filter) { + public ObservableFilteredList(@NonNull final Collection sourceCollection, @Nullable final Function filter) { this(new ObservableList<>(sourceCollection), filter); } - public ObservableFilteredList(@NonNull final ObservableCollection sourceCollection, @Nullable final Func1 filter) { + public ObservableFilteredList(@NonNull final ObservableCollection sourceCollection, @Nullable final Function filter) { super(); this.filter = filter; this.sourceCollection = sourceCollection; @@ -86,7 +91,7 @@ public class ObservableFilteredList extends ObservableCollection { * * @param filter Function to filter item. True - item will stay, false - item will be filtered. */ - public void setFilter(@Nullable final Func1 filter) { + public void setFilter(@Nullable final Function filter) { this.filter = filter; update(); } @@ -96,7 +101,7 @@ public class ObservableFilteredList extends ObservableCollection { */ private void update() { if (sourceCollectionSubscription != null) { - sourceCollectionSubscription.unsubscribe(); + sourceCollectionSubscription.dispose(); sourceCollectionSubscription = null; } sourceCollectionSubscription = sourceCollection.observeItems() diff --git a/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/LoadingMoreList.java b/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/LoadingMoreList.java index 38412ee..b7cccec 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/LoadingMoreList.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/LoadingMoreList.java @@ -26,21 +26,20 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.NoSuchElementException; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import io.reactivex.Observable; +import io.reactivex.Scheduler; +import io.reactivex.Single; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.BehaviorSubject; import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.observables.collections.Change; import ru.touchin.roboswag.core.observables.collections.ObservableCollection; import ru.touchin.roboswag.core.observables.collections.ObservableList; -import ru.touchin.roboswag.core.utils.ShouldNotHappenException; -import rx.Observable; -import rx.Scheduler; -import rx.exceptions.OnErrorThrowable; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.schedulers.Schedulers; -import rx.subjects.BehaviorSubject; +import ru.touchin.roboswag.core.utils.Optional; /** * Created by Gavriil Sitnikov on 23/05/16. @@ -66,7 +65,7 @@ public class LoadingMoreList loadingMoreObservable; @NonNull - private final BehaviorSubject moreItemsCount = BehaviorSubject.create(LoadedItems.UNKNOWN_ITEMS_COUNT); + private final BehaviorSubject moreItemsCount = BehaviorSubject.createDefault(LoadedItems.UNKNOWN_ITEMS_COUNT); @NonNull private final ObservableList innerList = new ObservableList<>(); @Nullable @@ -84,14 +83,8 @@ public class LoadingMoreList initialItems) { super(); this.loadingMoreObservable = Observable - .switchOnNext(Observable.fromCallable(() -> createLoadRequestBasedObservable(this::createActualRequest, moreMoreItemsLoader::load))) - .single() - .doOnError(throwable -> { - if (throwable instanceof IllegalArgumentException || throwable instanceof NoSuchElementException) { - Lc.assertion(new ShouldNotHappenException("Updates during loading not supported." - + " MoreItemsLoader should emit only one result.", throwable)); - } - }) + .switchOnNext(Observable + .fromCallable(() -> createLoadRequestBasedObservable(this::createActualRequest, moreMoreItemsLoader::load).toObservable())) .doOnNext(loadedItems -> onItemsLoaded(loadedItems, size(), false)) .replay(1) .refCount(); @@ -112,16 +105,16 @@ public class LoadingMoreList Observable createLoadRequestBasedObservable(@NonNull final Func0 requestCreator, - @NonNull final Func1> observableCreator) { - return Observable + protected Single createLoadRequestBasedObservable(@NonNull final Callable requestCreator, + @NonNull final Function> observableCreator) { + return Single .fromCallable(requestCreator) - .switchMap(loadRequest -> observableCreator.call(loadRequest) + .flatMap(loadRequest -> observableCreator.apply(loadRequest) .subscribeOn(Schedulers.io()) .observeOn(loaderScheduler) - .doOnNext(ignored -> { + .doOnSuccess(ignored -> { if (!requestCreator.call().equals(loadRequest)) { - throw OnErrorThrowable.from(new RequestChangedDuringLoadingException()); + throw new RequestChangedDuringLoadingException(); } })) .retry((number, throwable) -> @@ -294,20 +287,20 @@ public class LoadingMoreList loadItem(final int position) { - return Observable - .switchOnNext(Observable - .fromCallable(() -> { - if (position < size()) { - return Observable.just(get(position)); - } else if (moreItemsCount.getValue() == 0) { - return Observable.just((TItem) null); - } else { - return loadingMoreObservable.switchMap(ignored -> Observable.error(new NotLoadedYetException())); - } - }) - .subscribeOn(loaderScheduler)) - .retry((number, throwable) -> throwable instanceof NotLoadedYetException); + public Single> loadItem(final int position) { + return Observable.switchOnNext(Observable + .fromCallable(() -> { + if (position < size()) { + return Observable.just(new Optional<>(get(position))); + } else if (moreItemsCount.getValue() == 0) { + return Observable.just(new Optional(null)); + } else { + return loadingMoreObservable.switchMap(ignored -> Observable.>error(new NotLoadedYetException())); + } + })) + .subscribeOn(loaderScheduler) + .retry((number, throwable) -> throwable instanceof NotLoadedYetException) + .firstOrError(); } /** @@ -319,15 +312,24 @@ public class LoadingMoreList> loadRange(final int first, final int last) { - final List> itemsRequests = new ArrayList<>(); + @SuppressWarnings("unchecked") + //unchecked: it's OK for such zip operator + public Single> loadRange(final int first, final int last) { + final List>> itemsRequests = new ArrayList<>(); for (int i = first; i <= last; i++) { itemsRequests.add(loadItem(i)); } - return Observable.concatEager(itemsRequests) - .filter(loadedItem -> loadedItem != null) - .toList() - .map(Collections::unmodifiableCollection); + return Single.zip(itemsRequests, + items -> { + final List result = new ArrayList<>(); + for (final Object item : items) { + final Optional optional = (Optional) item; + if (optional.get() != null) { + result.add(optional.get()); + } + } + return Collections.unmodifiableCollection(result); + }); } /** diff --git a/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/MoreItemsLoader.java b/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/MoreItemsLoader.java index be991ca..8a64530 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/MoreItemsLoader.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/MoreItemsLoader.java @@ -21,7 +21,7 @@ package ru.touchin.roboswag.core.observables.collections.loadable; import android.support.annotation.NonNull; -import rx.Observable; +import io.reactivex.Single; /** * Created by Gavriil Sitnikov on 02/06/2016. @@ -34,12 +34,12 @@ import rx.Observable; public interface MoreItemsLoader> { /** - * Returns {@link Observable} that could load next part of items. + * Returns {@link Single} that could load next part of items. * * @param moreLoadRequest Request with info inside to load next part of items; - * @return {@link Observable} of loading items. + * @return {@link Single} of loading items. */ @NonNull - Observable load(@NonNull final MoreLoadRequest moreLoadRequest); + Single load(@NonNull final MoreLoadRequest moreLoadRequest); } \ No newline at end of file diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/BaseStorable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/BaseStorable.java index f4e2f30..390a024 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/BaseStorable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/BaseStorable.java @@ -26,19 +26,17 @@ import java.lang.reflect.Type; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import ru.touchin.roboswag.core.log.Lc; +import io.reactivex.Completable; +import io.reactivex.Observable; +import io.reactivex.Scheduler; +import io.reactivex.Single; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; import ru.touchin.roboswag.core.log.LcGroup; -import ru.touchin.roboswag.core.observables.OnSubscribeRefCountWithCacheTime; +import ru.touchin.roboswag.core.observables.ObservableRefCountWithCacheTime; import ru.touchin.roboswag.core.utils.ObjectUtils; import ru.touchin.roboswag.core.utils.Optional; -import rx.Completable; -import rx.Observable; -import rx.Scheduler; -import rx.Single; -import rx.exceptions.OnErrorThrowable; -import rx.functions.Actions; -import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; /** * Created by Gavriil Sitnikov on 04/10/2015. @@ -123,7 +121,8 @@ public abstract class BaseStorable { } @Nullable - private Optional returnDefaultValueIfNull(@NonNull final Optional storeObject, @Nullable final TObject defaultValue) { + private Optional returnDefaultValueIfNull(@NonNull final Optional storeObject, @Nullable final TObject defaultValue) + throws Converter.ConversionException { if (storeObject.get() != null || defaultValue == null) { return storeObject; } @@ -133,7 +132,7 @@ public abstract class BaseStorable { } catch (final Converter.ConversionException exception) { STORABLE_LC_GROUP.w(exception, "Exception while converting default value of '%s' from '%s' from store %s", key, defaultValue, store); - throw OnErrorThrowable.from(exception); + throw exception; } } @@ -160,7 +159,7 @@ public abstract class BaseStorable { .concatWith(newStoreValueEvent) .map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue)); return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE - ? Observable.unsafeCreate(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS)) + ? RxJavaPlugins.onAssembly(new ObservableRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS)) : result; } @@ -175,11 +174,11 @@ public abstract class BaseStorable { } catch (final Converter.ConversionException exception) { STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s", key, storeObject, store, converter); - throw OnErrorThrowable.from(exception); + throw exception; } }); return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE - ? Observable.unsafeCreate(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS)) + ? RxJavaPlugins.onAssembly(new ObservableRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS)) : result; } @@ -235,7 +234,7 @@ public abstract class BaseStorable { @NonNull private Completable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) { - return (checkForEqualityBeforeSet ? storeValueObservable.take(1).toSingle() : Single.just(new Optional<>(null))) + return (checkForEqualityBeforeSet ? storeValueObservable.firstOrError() : Single.just(new Optional<>(null))) .observeOn(scheduler) .flatMapCompletable(oldStoreValue -> { final TStoreObject newStoreValue; @@ -274,10 +273,9 @@ public abstract class BaseStorable { * @param newValue Value to set; * @return Observable of setting process. */ - //COMPATIBILITY NOTE: it is not Completable to prevent migration of old code @NonNull - public Observable forceSet(@Nullable final TObject newValue) { - return internalSet(newValue, false).toObservable(); + public Completable forceSet(@Nullable final TObject newValue) { + return internalSet(newValue, false); } /** @@ -290,16 +288,9 @@ public abstract class BaseStorable { * @param newValue Value to set; * @return Observable of setting process. */ - //COMPATIBILITY NOTE: it is not Completable to prevent migration of old code @NonNull - public Observable set(@Nullable final TObject newValue) { - return internalSet(newValue, true).toObservable(); - } - - @Deprecated - //COMPATIBILITY NOTE: it is deprecated as it's execution not bound to Android lifecycle objects - public void setCalm(@Nullable final TObject newValue) { - set(newValue).subscribe(Actions.empty(), Lc::assertion); + public Completable set(@Nullable final TObject newValue) { + return internalSet(newValue, true); } /** @@ -310,7 +301,7 @@ public abstract class BaseStorable { @Deprecated //deprecation: it should be used for debug only and in very rare cases. public void setSync(@Nullable final TObject newValue) { - set(newValue).toBlocking().subscribe(); + set(newValue).blockingAwait(); } @NonNull @@ -334,9 +325,8 @@ public abstract class BaseStorable { * @return Returns observable of value. */ @NonNull - //COMPATIBILITY NOTE: it is not Single to prevent migration of old code - public Observable get() { - return observe().take(1); + public Single get() { + return observe().firstOrError(); } /** @@ -348,7 +338,7 @@ public abstract class BaseStorable { //deprecation: it should be used for debug only and in very rare cases. @Nullable public TReturnObject getSync() { - return get().toBlocking().first(); + return get().blockingGet(); } /** diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/Migration.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/Migration.java index 714fb4d..3fa2475 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/Migration.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/Migration.java @@ -24,10 +24,9 @@ import android.support.annotation.NonNull; import java.util.Arrays; import java.util.List; -import rx.Completable; -import rx.Observable; -import rx.Single; -import rx.exceptions.OnErrorThrowable; +import io.reactivex.Completable; +import io.reactivex.Flowable; +import io.reactivex.Single; /** * Created by Gavriil Sitnikov on 06/10/2015. @@ -91,15 +90,16 @@ public class Migration { return makeMigrationChain(key, versionUpdater) .doOnSuccess(lastUpdatedVersion -> { if (lastUpdatedVersion < latestVersion) { - throw OnErrorThrowable.from(new NextLoopMigrationException()); + throw new NextLoopMigrationException(); } if (versionUpdater.initialVersion == versionUpdater.oldVersion) { throw new MigrationException(String.format("Version of '%s' not updated from %s", key, versionUpdater.initialVersion)); } }) - .retryWhen(attempts -> attempts.switchMap(throwable -> throwable instanceof NextLoopMigrationException - ? Observable.just(null) : Observable.error(throwable))); + .retryWhen(attempts -> attempts + .switchMap(throwable -> throwable instanceof NextLoopMigrationException + ? Flowable.just(new Object()) : Flowable.error(throwable))); }) .toCompletable() .andThen(versionsStore.storeObject(Long.class, key, latestVersion)) diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/Migrator.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/Migrator.java index 7ef8539..41a3c53 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/Migrator.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/Migrator.java @@ -21,7 +21,7 @@ package ru.touchin.roboswag.core.observables.storable; import android.support.annotation.NonNull; -import rx.Single; +import io.reactivex.Single; /** * Created by Gavriil Sitnikov on 05/10/2015. diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullStorable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/NonNullStorable.java similarity index 94% rename from src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullStorable.java rename to src/main/java/ru/touchin/roboswag/core/observables/storable/NonNullStorable.java index 2b7b0aa..a32c8c2 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/concrete/NonNullStorable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/NonNullStorable.java @@ -17,19 +17,16 @@ * */ -package ru.touchin.roboswag.core.observables.storable.concrete; +package ru.touchin.roboswag.core.observables.storable; import android.support.annotation.NonNull; import android.support.annotation.Nullable; import java.util.concurrent.TimeUnit; -import ru.touchin.roboswag.core.observables.storable.BaseStorable; -import ru.touchin.roboswag.core.observables.storable.Migration; -import ru.touchin.roboswag.core.observables.storable.Storable; import ru.touchin.roboswag.core.utils.ShouldNotHappenException; -import rx.Observable; -import rx.Scheduler; +import io.reactivex.Observable; +import io.reactivex.Scheduler; /** * Created by Gavriil Sitnikov on 04/10/2015. diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java index d7f88a6..e91643f 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/Storable.java @@ -25,10 +25,9 @@ import android.support.annotation.Nullable; import java.lang.reflect.Type; import java.util.concurrent.TimeUnit; -import ru.touchin.roboswag.core.observables.storable.concrete.NonNullStorable; import ru.touchin.roboswag.core.utils.Optional; -import rx.Observable; -import rx.Scheduler; +import io.reactivex.Observable; +import io.reactivex.Scheduler; /** * Created by Gavriil Sitnikov on 04/10/2015. @@ -43,8 +42,7 @@ import rx.Scheduler; * @param Type of actual object; * @param Type of store object. Could be same as {@link TObject}. */ -//COMPATIBILITY NOTE: in RxJava2 it should extends BaseStorable> -public class Storable extends BaseStorable { +public class Storable extends BaseStorable> { public Storable(@NonNull final BuilderCore builderCore) { super(builderCore); @@ -52,8 +50,8 @@ public class Storable extends BaseStorable observe() { - return observeOptionalValue().map(Optional::get); + public Observable> observe() { + return observeOptionalValue(); } /** diff --git a/src/main/java/ru/touchin/roboswag/core/observables/storable/Store.java b/src/main/java/ru/touchin/roboswag/core/observables/storable/Store.java index 5a294a1..afc2609 100644 --- a/src/main/java/ru/touchin/roboswag/core/observables/storable/Store.java +++ b/src/main/java/ru/touchin/roboswag/core/observables/storable/Store.java @@ -25,8 +25,8 @@ import android.support.annotation.Nullable; import java.lang.reflect.Type; import ru.touchin.roboswag.core.utils.Optional; -import rx.Completable; -import rx.Single; +import io.reactivex.Completable; +import io.reactivex.Single; /** * Created by Gavriil Sitnikov on 04/10/2015. diff --git a/src/main/java/ru/touchin/roboswag/core/utils/StringUtils.java b/src/main/java/ru/touchin/roboswag/core/utils/StringUtils.java index 4355c34..79fee45 100644 --- a/src/main/java/ru/touchin/roboswag/core/utils/StringUtils.java +++ b/src/main/java/ru/touchin/roboswag/core/utils/StringUtils.java @@ -23,7 +23,8 @@ import android.support.annotation.NonNull; import java.security.MessageDigest; -import rx.functions.Func1; +import io.reactivex.functions.Function; +import ru.touchin.roboswag.core.log.Lc; /** * Created by Gavriil Sitnikov on 29/08/2016. @@ -65,11 +66,15 @@ public final class StringUtils { * @param condition Condition of symbol; * @return True if some character satisfies condition. */ - public static boolean containsCharLike(@NonNull final String text, @NonNull final Func1 condition) { - for (int i = 0; i < text.length(); i++) { - if (condition.call(text.charAt(i))) { - return true; + public static boolean containsCharLike(@NonNull final String text, @NonNull final Function condition) { + try { + for (int i = 0; i < text.length(); i++) { + if (condition.apply(text.charAt(i))) { + return true; + } } + } catch (final Exception exception) { + Lc.assertion(exception); } return false; }