Compare commits

...

37 Commits

Author SHA1 Message Date
Denis Karmyshakov 669782c5a0 Merge remote-tracking branch 'origin/master-kotlin-rxjava2' into noblocking_sync 2017-12-19 18:38:30 +03:00
Denis Karmyshakov d83fe2d510
Merge pull request #53 from TouchInstinct/gradle_update
Gradle update
2017-12-01 16:11:49 +03:00
Denis Karmyshakov 3aa53424a7 Gradle update 2017-11-30 18:02:23 +03:00
Arseniy Borisov 8a586a9638 Update by payload fixed (#50) 2017-11-13 11:58:10 +03:00
Oleg 8af90dcdd5 Merge pull request #49 from TouchInstinct/update/gradle
Versions in constants
2017-10-05 15:50:20 +03:00
Oleg 2a3c647a30 Versions in constants 2017-10-05 15:48:40 +03:00
Denis Karmyshakov f97a81a4fb Merge branch 'master-kotlin-rxjava2' into noblocking_sync 2017-10-04 12:41:52 +03:00
Denis Karmyshakov 4b78713f50 Merge pull request #47 from TouchInstinct/versions_in_constants
Versions in constants
2017-10-04 12:29:00 +03:00
Denis Karmyshakov 6e7cd65ab2 Versions in constants 2017-10-04 12:25:04 +03:00
Oleg e3e9b96fee buildToolsVersion 26.0.2 (#46) 2017-09-29 14:50:36 +03:00
Denis Karmyshakov ed31b8db1a Noblocking getSync method 2017-09-28 13:57:37 +03:00
Ilia Kurtov c496eba64e Merge pull request #44 from TouchInstinct/update/rxjava
update rxjava
2017-09-22 16:39:51 +03:00
Ilia Kurtov b4e3c018aa update rxjava 2017-09-22 16:36:57 +03:00
Ilia Kurtov 7142bed098 rxjava update (#40) 2017-08-15 21:02:46 +03:00
Ilia Kurtov b0610e1f51 libs update (#39) 2017-08-14 17:41:20 +03:00
Ilia Kurtov 0e4b89bb4d Merge branch 'kotlin_migration' into master-kotlin-rxjava2 2017-08-11 13:50:25 +03:00
Ilia Kurtov fb4634a6ed Rxjava2/merge kotlin (#38)
* remove retrolambda

* revert retrolambda

* Build tools update

* Support lib update
2017-08-09 20:27:02 +03:00
Arseniy Borisov e14f5b00cb Merge pull request #37 from TouchInstinct/support_lib_update
Support lib update
2017-08-03 18:22:07 +03:00
Denis Karmyshakov ac6b3e85ac Support lib update 2017-08-03 18:15:24 +03:00
Arseniy Borisov 5e4475d2b2 Merge pull request #36 from TouchInstinct/build_tools_update
Build tools update
2017-07-25 19:58:37 +03:00
Denis Karmyshakov 4166445bc6 Build tools update 2017-07-25 19:54:06 +03:00
Denis Karmyshakov e5d021c885 Merge remote-tracking branch 'origin/master-rx-java-2' into kotlin_migration 2017-07-24 12:48:56 +03:00
Denis Karmyshakov 5e9bfc701c Merge pull request #35 from TouchInstinct/idea_formatting
idea formatting
2017-07-24 12:45:36 +03:00
Arseniy Borisov 3bf6f76159 idea formatting 2017-07-24 12:31:50 +03:00
Anton Domnikov 8c1f3d35fc Merge branch 'master-rx-java-2' into kotlin_migration
# Conflicts:
#	build.gradle
2017-07-11 14:30:00 +03:00
Denis Karmyshakov e4d652b3ab Merge remote-tracking branch 'origin/master' into master-rx-java-2
# Conflicts:
#	build.gradle
#	src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableCollection.java
#	src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableFilteredList.java
#	src/main/java/ru/touchin/roboswag/core/observables/collections/loadable/LoadingMoreList.java
2017-06-30 19:15:24 +03:00
Anton Domnikov ef4632304e Merge branch 'master-rx-java-2' into kotlin_migration 2017-06-23 13:32:21 +03:00
Alexander Bubnov 6ed1cb4e92 fix missing import 2017-06-21 17:21:45 +03:00
Alexander Bubnov 50371ee530 update rxJava2 to 2.1.1 2017-06-21 17:06:07 +03:00
Anton Domnikov 20951bed03 revert retrolambda 2017-06-19 18:24:02 +03:00
Anton Domnikov 0cb3099d14 remove retrolambda 2017-06-14 17:55:17 +03:00
Gavriil Sitnikov 29dc6ce1c0 Merge branch 'master' into master-rx-java-2
# Conflicts:
#	src/main/java/ru/touchin/roboswag/core/observables/collections/ObservableFilteredList.java
2017-06-13 14:55:16 +03:00
gorodeckii 7e6513218e Merge branch 'master' into master-rx-java-2 2017-06-09 15:53:12 +03:00
Arhipov 768c83addc Merge branch 'master' into master-rx-java-2 2017-05-04 14:06:01 +03:00
gorodeckii 52c5b7fa98 rxjava version 2017-05-03 09:26:28 +03:00
Gavriil Sitnikov 2602d56cbd static fixes 2017-04-21 00:11:32 +03:00
Gavriil Sitnikov 553f4f6bfa RxJava2 migration 2017-04-17 03:11:09 +03:00
22 changed files with 515 additions and 404 deletions

View File

@ -1,9 +1,7 @@
apply plugin: 'com.android.library' apply plugin: 'com.android.library'
apply plugin: 'me.tatarka.retrolambda'
android { android {
compileSdkVersion 25 compileSdkVersion compileSdk
buildToolsVersion '25.0.3'
compileOptions { compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8 sourceCompatibility JavaVersion.VERSION_1_8
@ -16,7 +14,7 @@ android {
} }
dependencies { dependencies {
provided 'com.android.support:support-annotations:25.4.0' compileOnly "com.android.support:support-annotations:$supportLibraryVersion"
provided 'io.reactivex:rxandroid:1.2.1' compileOnly "io.reactivex.rxjava2:rxandroid:$rxAndroidVersion"
provided 'io.reactivex:rxjava:1.3.0' compileOnly "io.reactivex.rxjava2:rxjava:$rxJavaVersion"
} }

View File

@ -50,13 +50,13 @@ import java.util.List;
* and the cost of your comparison methods. Below are some average run times for reference: * and the cost of your comparison methods. Below are some average run times for reference:
* (The areSame list is composed of random UUID Strings and the tests are run on Nexus 5X with M) * (The areSame list is composed of random UUID Strings and the tests are run on Nexus 5X with M)
* <ul> * <ul>
* <li>100 items and 10 modifications: avg: 0.39 ms, median: 0.35 ms * <li>100 items and 10 modifications: avg: 0.39 ms, median: 0.35 ms
* <li>100 items and 100 modifications: 3.82 ms, median: 3.75 ms * <li>100 items and 100 modifications: 3.82 ms, median: 3.75 ms
* <li>100 items and 100 modifications without moves: 2.09 ms, median: 2.06 ms * <li>100 items and 100 modifications without moves: 2.09 ms, median: 2.06 ms
* <li>1000 items and 50 modifications: avg: 4.67 ms, median: 4.59 ms * <li>1000 items and 50 modifications: avg: 4.67 ms, median: 4.59 ms
* <li>1000 items and 50 modifications without moves: avg: 3.59 ms, median: 3.50 ms * <li>1000 items and 50 modifications without moves: avg: 3.59 ms, median: 3.50 ms
* <li>1000 items and 200 modifications: 27.07 ms, median: 26.92 ms * <li>1000 items and 200 modifications: 27.07 ms, median: 26.92 ms
* <li>1000 items and 200 modifications without moves: 13.54 ms, median: 13.36 ms * <li>1000 items and 200 modifications without moves: 13.54 ms, median: 13.36 ms
* </ul> * </ul>
* *
* <p>Due to implementation constraints, the max size of the list can be 2^26. * <p>Due to implementation constraints, the max size of the list can be 2^26.
@ -83,7 +83,6 @@ public class DiffUtil {
* Calculates the list of update operations that can covert one list into the other one. * Calculates the list of update operations that can covert one list into the other one.
* *
* @param cb The callback that acts as a gateway to the backing list data * @param cb The callback that acts as a gateway to the backing list data
*
* @return A DiffResult that contains the information about the edit sequence to convert the * @return A DiffResult that contains the information about the edit sequence to convert the
* old list into the new list. * old list into the new list.
*/ */
@ -98,9 +97,8 @@ public class DiffUtil {
* positions), you can disable move detection which takes <code>O(N^2)</code> time where * positions), you can disable move detection which takes <code>O(N^2)</code> time where
* N is the number of added, moved, removed items. * N is the number of added, moved, removed items.
* *
* @param cb The callback that acts as a gateway to the backing list data * @param cb The callback that acts as a gateway to the backing list data
* @param detectMoves True if DiffUtil should try to detect moved items, false otherwise. * @param detectMoves True if DiffUtil should try to detect moved items, false otherwise.
*
* @return A DiffResult that contains the information about the edit sequence to convert the * @return A DiffResult that contains the information about the edit sequence to convert the
* old list into the new list. * old list into the new list.
*/ */
@ -185,7 +183,7 @@ public class DiffUtil {
} }
private static Snake diffPartial(Callback cb, int startOld, int endOld, private static Snake diffPartial(Callback cb, int startOld, int endOld,
int startNew, int endNew, int[] forward, int[] backward, int kOffset) { int startNew, int endNew, int[] forward, int[] backward, int kOffset) {
final int oldSize = endOld - startOld; final int oldSize = endOld - startOld;
final int newSize = endNew - startNew; final int newSize = endNew - startNew;
@ -329,7 +327,6 @@ public class DiffUtil {
* *
* @param oldItemPosition The position of the item in the old list * @param oldItemPosition The position of the item in the old list
* @param newItemPosition The position of the item in the new list * @param newItemPosition The position of the item in the new list
*
* @return A payload object that represents the change between the two items. * @return A payload object that represents the change between the two items.
*/ */
@Nullable @Nullable
@ -453,14 +450,14 @@ public class DiffUtil {
private final boolean mDetectMoves; private final boolean mDetectMoves;
/** /**
* @param callback The callback that was used to calculate the diff * @param callback The callback that was used to calculate the diff
* @param snakes The list of Myers' snakes * @param snakes The list of Myers' snakes
* @param oldItemStatuses An int[] that can be re-purposed to keep metadata * @param oldItemStatuses An int[] that can be re-purposed to keep metadata
* @param newItemStatuses An int[] that can be re-purposed to keep metadata * @param newItemStatuses An int[] that can be re-purposed to keep metadata
* @param detectMoves True if this DiffResult will try to detect moved items * @param detectMoves True if this DiffResult will try to detect moved items
*/ */
DiffResult(Callback callback, List<Snake> snakes, int[] oldItemStatuses, DiffResult(Callback callback, List<Snake> snakes, int[] oldItemStatuses,
int[] newItemStatuses, boolean detectMoves) { int[] newItemStatuses, boolean detectMoves) {
mSnakes = snakes; mSnakes = snakes;
mOldItemStatuses = oldItemStatuses; mOldItemStatuses = oldItemStatuses;
mNewItemStatuses = newItemStatuses; mNewItemStatuses = newItemStatuses;
@ -556,15 +553,14 @@ public class DiffUtil {
* Finds a matching item that is before the given coordinates in the matrix * Finds a matching item that is before the given coordinates in the matrix
* (before : left and above). * (before : left and above).
* *
* @param x The x position in the matrix (position in the old list) * @param x The x position in the matrix (position in the old list)
* @param y The y position in the matrix (position in the new list) * @param y The y position in the matrix (position in the new list)
* @param snakeIndex The current snake index * @param snakeIndex The current snake index
* @param removal True if we are looking for a removal, false otherwise * @param removal True if we are looking for a removal, false otherwise
*
* @return True if such item is found. * @return True if such item is found.
*/ */
private boolean findMatchingItem(final int x, final int y, final int snakeIndex, private boolean findMatchingItem(final int x, final int y, final int snakeIndex,
final boolean removal) { final boolean removal) {
final int myItemPos; final int myItemPos;
int curX; int curX;
int curY; int curY;
@ -664,7 +660,7 @@ public class DiffUtil {
} }
private static PostponedUpdate removePostponedUpdate(List<PostponedUpdate> updates, private static PostponedUpdate removePostponedUpdate(List<PostponedUpdate> updates,
int pos, boolean removal) { int pos, boolean removal) {
for (int i = updates.size() - 1; i >= 0; i--) { for (int i = updates.size() - 1; i >= 0; i--) {
final PostponedUpdate update = updates.get(i); final PostponedUpdate update = updates.get(i);
if (update.posInOwnerList == pos && update.removal == removal) { if (update.posInOwnerList == pos && update.removal == removal) {
@ -680,7 +676,7 @@ public class DiffUtil {
} }
private void dispatchAdditions(List<PostponedUpdate> postponedUpdates, private void dispatchAdditions(List<PostponedUpdate> postponedUpdates,
ListUpdateCallback updateCallback, int start, int count, int globalIndex) { ListUpdateCallback updateCallback, int start, int count, int globalIndex) {
if (!mDetectMoves) { if (!mDetectMoves) {
updateCallback.onInserted(start, count); updateCallback.onInserted(start, count);
return; return;
@ -720,7 +716,7 @@ public class DiffUtil {
} }
private void dispatchRemovals(List<PostponedUpdate> postponedUpdates, private void dispatchRemovals(List<PostponedUpdate> postponedUpdates,
ListUpdateCallback updateCallback, int start, int count, int globalIndex) { ListUpdateCallback updateCallback, int start, int count, int globalIndex) {
if (!mDetectMoves) { if (!mDetectMoves) {
updateCallback.onRemoved(start, count); updateCallback.onRemoved(start, count);
return; return;

View File

@ -29,8 +29,8 @@ import java.io.Serializable;
import ru.touchin.roboswag.core.utils.ObjectUtils; import ru.touchin.roboswag.core.utils.ObjectUtils;
import ru.touchin.roboswag.core.utils.Optional; import ru.touchin.roboswag.core.utils.Optional;
import rx.Observable; import io.reactivex.Observable;
import rx.subjects.BehaviorSubject; import io.reactivex.subjects.BehaviorSubject;
/** /**
* Created by Gavriil Sitnikov on 24/03/2016. * Created by Gavriil Sitnikov on 24/03/2016.
@ -47,7 +47,7 @@ public abstract class BaseChangeable<TValue, TReturnValue> implements Serializab
private transient BehaviorSubject<Optional<TValue>> valueSubject; private transient BehaviorSubject<Optional<TValue>> valueSubject;
public BaseChangeable(@Nullable final TValue defaultValue) { public BaseChangeable(@Nullable final TValue defaultValue) {
valueSubject = BehaviorSubject.create(new Optional<>(defaultValue)); valueSubject = BehaviorSubject.createDefault(new Optional<>(defaultValue));
} }
@NonNull @NonNull
@ -88,7 +88,7 @@ public abstract class BaseChangeable<TValue, TReturnValue> implements Serializab
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void readObject(@NonNull final ObjectInputStream inputStream) throws IOException, ClassNotFoundException { private void readObject(@NonNull final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
valueSubject = BehaviorSubject.create((Optional<TValue>) inputStream.readObject()); valueSubject = BehaviorSubject.createDefault((Optional<TValue>) inputStream.readObject());
} }
@Override @Override

View File

@ -22,16 +22,15 @@ package ru.touchin.roboswag.core.observables;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import io.reactivex.Observable;
import ru.touchin.roboswag.core.utils.Optional; import ru.touchin.roboswag.core.utils.Optional;
import rx.Observable;
/** /**
* Created by Gavriil Sitnikov on 24/03/2016. * Created by Gavriil Sitnikov on 24/03/2016.
* Variant of {@link BaseChangeable} which is allows to set nullable values. * Variant of {@link BaseChangeable} which is allows to set nullable values.
* Needed to separate non-null Changeable from nullable Changeable. * Needed to separate non-null Changeable from nullable Changeable.
*/ */
//COMPATIBILITY NOTE: in RxJava2 it should extends BaseChangeable<T, Optional<T>> public class Changeable<T> extends BaseChangeable<T, Optional<T>> {
public class Changeable<T> extends BaseChangeable<T, T> {
public Changeable(@Nullable final T defaultValue) { public Changeable(@Nullable final T defaultValue) {
super(defaultValue); super(defaultValue);
@ -44,9 +43,8 @@ public class Changeable<T> extends BaseChangeable<T, T> {
*/ */
@NonNull @NonNull
@Override @Override
//COMPATIBILITY NOTE: in RxJava2 it should be Observable<Optional<T>> public Observable<Optional<T>> observe() {
public Observable<T> observe() { return observeOptionalValue();
return observeOptionalValue().map(Optional::get);
} }
} }

View File

@ -23,7 +23,7 @@ import android.support.annotation.NonNull;
import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.log.Lc;
import ru.touchin.roboswag.core.utils.ShouldNotHappenException; import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
import rx.Observable; import io.reactivex.Observable;
/** /**
* Created by Gavriil Sitnikov on 24/03/2016. * Created by Gavriil Sitnikov on 24/03/2016.

View File

@ -0,0 +1,299 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package ru.touchin.roboswag.core.observables;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
/**
* Returns an observable sequence that stays connected to the source as long as
* there is at least one subscription to the observable sequence.
*
* @param <T> the value type
*/
@SuppressWarnings({"PMD.CompareObjectsWithEquals", "PMD.AvoidUsingVolatile"})
//AvoidUsingVolatile: it's RxJava code
public final class ObservableRefCountWithCacheTime<T> extends Observable<T> implements HasUpstreamObservableSource<T> {
@NonNull
private final ConnectableObservable<? extends T> connectableSource;
@NonNull
private final ObservableSource<T> actualSource;
@NonNull
private volatile CompositeDisposable baseDisposable = new CompositeDisposable();
@NonNull
private final AtomicInteger subscriptionCount = new AtomicInteger();
/**
* Use this lock for every subscription and disconnect action.
*/
@NonNull
private final ReentrantLock lock = new ReentrantLock();
@NonNull
private final Scheduler scheduler = Schedulers.computation();
private final long cacheTime;
@NonNull
private final TimeUnit cacheTimeUnit;
@Nullable
private Scheduler.Worker worker;
/**
* Constructor.
*
* @param source observable to apply ref count to
*/
public ObservableRefCountWithCacheTime(@NonNull final ConnectableObservable<T> source,
final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) {
super();
this.connectableSource = source;
this.actualSource = source;
this.cacheTime = cacheTime;
this.cacheTimeUnit = cacheTimeUnit;
}
@NonNull
public ObservableSource<T> source() {
return actualSource;
}
private void cleanupWorker() {
if (worker != null) {
worker.dispose();
worker = null;
}
}
@Override
public void subscribeActual(@NonNull final Observer<? super T> subscriber) {
lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {
cleanupWorker();
final AtomicBoolean writeLocked = new AtomicBoolean(true);
try {
// need to use this overload of connect to ensure that
// baseDisposable is set in the case that source is a
// synchronous Observable
connectableSource.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Action1 passed
// to source.connect above being called
if (writeLocked.get()) {
// Action1 passed to source.connect was not called
lock.unlock();
}
}
} else {
try {
// ready to subscribe to source so do it
doSubscribe(subscriber, baseDisposable);
} finally {
// release the read lock
lock.unlock();
}
}
}
@NonNull
private Consumer<Disposable> onSubscribe(@NonNull final Observer<? super T> observer, @NonNull final AtomicBoolean writeLocked) {
return new DisposeConsumer(observer, writeLocked);
}
private void doSubscribe(@NonNull final Observer<? super T> observer, @NonNull final CompositeDisposable currentBase) {
// handle disposing from the base CompositeDisposable
final Disposable disposable = disconnect(currentBase);
final ConnectionObserver connectionObserver = new ConnectionObserver(observer, currentBase, disposable);
observer.onSubscribe(connectionObserver);
connectableSource.subscribe(connectionObserver);
}
@NonNull
private Disposable disconnect(@NonNull final CompositeDisposable current) {
return Disposables.fromRunnable(new DisposeTask(current));
}
private final class ConnectionObserver extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 3813126992133394324L;
@NonNull
private final Observer<? super T> subscriber;
@NonNull
private final CompositeDisposable currentBase;
@NonNull
private final Disposable resource;
public ConnectionObserver(@NonNull final Observer<? super T> subscriber, @NonNull final CompositeDisposable currentBase,
@NonNull final Disposable resource) {
super();
this.subscriber = subscriber;
this.currentBase = currentBase;
this.resource = resource;
}
@Override
public void onSubscribe(@NonNull final Disposable disposable) {
DisposableHelper.setOnce(this, disposable);
}
@Override
public void onError(@NonNull final Throwable throwable) {
cleanup();
subscriber.onError(throwable);
}
@Override
public void onNext(@NonNull final T item) {
subscriber.onNext(item);
}
@Override
public void onComplete() {
cleanup();
subscriber.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
resource.dispose();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
private void cleanup() {
// on error or completion we need to dispose the base CompositeDisposable
// and set the subscriptionCount to 0
lock.lock();
try {
if (baseDisposable == currentBase) {
cleanupWorker();
if (connectableSource instanceof Disposable) {
((Disposable) connectableSource).dispose();
}
baseDisposable.dispose();
baseDisposable = new CompositeDisposable();
subscriptionCount.set(0);
}
} finally {
lock.unlock();
}
}
}
private final class DisposeConsumer implements Consumer<Disposable> {
@NonNull
private final Observer<? super T> observer;
@NonNull
private final AtomicBoolean writeLocked;
public DisposeConsumer(@NonNull final Observer<? super T> observer, @NonNull final AtomicBoolean writeLocked) {
this.observer = observer;
this.writeLocked = writeLocked;
}
@Override
public void accept(@NonNull final Disposable subscription) {
try {
baseDisposable.add(subscription);
// ready to subscribe to source so do it
doSubscribe(observer, baseDisposable);
} finally {
// release the write lock
lock.unlock();
writeLocked.set(false);
}
}
}
private final class DisposeTask implements Runnable {
@NonNull
private final CompositeDisposable current;
public DisposeTask(@NonNull final CompositeDisposable current) {
this.current = current;
}
@Override
public void run() {
lock.lock();
try {
if (baseDisposable == current && subscriptionCount.decrementAndGet() == 0) {
if (worker != null) {
worker.dispose();
} else {
worker = scheduler.createWorker();
}
worker.schedule(() -> {
lock.lock();
try {
if (subscriptionCount.get() == 0) {
cleanupWorker();
if (connectableSource instanceof Disposable) {
((Disposable) connectableSource).dispose();
}
baseDisposable.dispose();
// need a new baseDisposable because once
// disposed stays that way
baseDisposable = new CompositeDisposable();
}
} finally {
lock.unlock();
}
}, cacheTime, cacheTimeUnit);
}
} finally {
lock.unlock();
}
}
}
}

View File

@ -1,213 +0,0 @@
/**
* Copyright 2014 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package ru.touchin.roboswag.core.observables;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import rx.Observable.OnSubscribe;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
/**
* Returns an observable sequence that stays connected to the source as long as
* there is at least one subscription to the observable sequence and also it stays connected
* for cache time after everyone unsubscribe.
*
* @param <T> the value type
*/
@SuppressWarnings({"PMD.AvoidUsingVolatile", "PMD.CompareObjectsWithEquals"})
//AvoidUsingVolatile,CompareObjectsWithEquals: from OnSubscribeRefCount code
public final class OnSubscribeRefCountWithCacheTime<T> implements OnSubscribe<T> {
@NonNull
private final ConnectableObservable<? extends T> source;
@NonNull
private volatile CompositeSubscription baseSubscription = new CompositeSubscription();
@NonNull
private final AtomicInteger subscriptionCount = new AtomicInteger(0);
@NonNull
private final Scheduler scheduler = Schedulers.computation();
private final long cacheTime;
@NonNull
private final TimeUnit cacheTimeUnit;
@Nullable
private Scheduler.Worker worker;
/**
* Use this lock for every subscription and disconnect action.
*/
@NonNull
private final ReentrantLock lock = new ReentrantLock();
public OnSubscribeRefCountWithCacheTime(@NonNull final ConnectableObservable<? extends T> source,
final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) {
this.source = source;
this.cacheTime = cacheTime;
this.cacheTimeUnit = cacheTimeUnit;
}
@Override
public void call(@NonNull final Subscriber<? super T> subscriber) {
lock.lock();
if (subscriptionCount.incrementAndGet() == 1) {
if (worker != null) {
worker.unsubscribe();
worker = null;
}
final AtomicBoolean writeLocked = new AtomicBoolean(true);
try {
// need to use this overload of connect to ensure that
// baseSubscription is set in the case that source is a
// synchronous Observable
source.connect(onSubscribe(subscriber, writeLocked));
} finally {
// need to cover the case where the source is subscribed to
// outside of this class thus preventing the Action1 passed
// to source.connect above being called
if (writeLocked.get()) {
// Action1 passed to source.connect was not called
lock.unlock();
}
}
} else {
try {
// ready to subscribe to source so do it
doSubscribe(subscriber, baseSubscription);
} finally {
// release the read lock
lock.unlock();
}
}
}
@NonNull
private Action1<Subscription> onSubscribe(@NonNull final Subscriber<? super T> subscriber,
@NonNull final AtomicBoolean writeLocked) {
return subscription -> {
try {
baseSubscription.add(subscription);
// ready to subscribe to source so do it
doSubscribe(subscriber, baseSubscription);
} finally {
// release the write lock
lock.unlock();
writeLocked.set(false);
}
};
}
private void doSubscribe(@NonNull final Subscriber<? super T> subscriber, @NonNull final CompositeSubscription currentBase) {
subscriber.add(disconnect(currentBase));
source.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onError(@NonNull final Throwable throwable) {
cleanup();
subscriber.onError(throwable);
}
@Override
public void onNext(@Nullable final T item) {
subscriber.onNext(item);
}
@Override
public void onCompleted() {
cleanup();
subscriber.onCompleted();
}
private void cleanup() {
// on error or completion we need to unsubscribe the base subscription and set the subscriptionCount to 0
lock.lock();
try {
if (baseSubscription == currentBase) {
cleanupWorker();
// backdoor into the ConnectableObservable to cleanup and reset its state
if (source instanceof Subscription) {
((Subscription) source).unsubscribe();
}
baseSubscription.unsubscribe();
baseSubscription = new CompositeSubscription();
subscriptionCount.set(0);
}
} finally {
lock.unlock();
}
}
});
}
@NonNull
private Subscription disconnect(@NonNull final CompositeSubscription current) {
return Subscriptions.create(() -> {
lock.lock();
try {
if (baseSubscription == current && subscriptionCount.decrementAndGet() == 0) {
if (worker != null) {
worker.unsubscribe();
} else {
worker = scheduler.createWorker();
}
worker.schedule(() -> {
lock.lock();
try {
if (subscriptionCount.get() == 0) {
cleanupWorker();
// backdoor into the ConnectableObservable to cleanup and reset its state
if (source instanceof Subscription) {
((Subscription) source).unsubscribe();
}
baseSubscription.unsubscribe();
// need a new baseSubscription because once
// unsubscribed stays that way
baseSubscription = new CompositeSubscription();
}
} finally {
lock.unlock();
}
}, cacheTime, cacheTimeUnit);
}
} finally {
lock.unlock();
}
});
}
private void cleanupWorker() {
if (worker != null) {
worker.unsubscribe();
worker = null;
}
}
}

View File

@ -33,12 +33,12 @@ import android.support.annotation.Nullable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import io.reactivex.Emitter;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.log.Lc;
import ru.touchin.roboswag.core.utils.ServiceBinder; import ru.touchin.roboswag.core.utils.ServiceBinder;
import rx.Emitter;
import rx.Observable;
import rx.Scheduler;
import rx.android.schedulers.AndroidSchedulers;
/** /**
* Created by Gavriil Sitnikov on 10/01/2016. * Created by Gavriil Sitnikov on 10/01/2016.
@ -62,8 +62,8 @@ public final class RxAndroidUtils {
.<T>create(emitter -> { .<T>create(emitter -> {
onSubscribeServiceConnection.emitter = emitter; onSubscribeServiceConnection.emitter = emitter;
context.bindService(new Intent(context, serviceClass), onSubscribeServiceConnection, Context.BIND_AUTO_CREATE); context.bindService(new Intent(context, serviceClass), onSubscribeServiceConnection, Context.BIND_AUTO_CREATE);
}, Emitter.BackpressureMode.LATEST) })
.doOnUnsubscribe(() -> { .doOnDispose(() -> {
context.unbindService(onSubscribeServiceConnection); context.unbindService(onSubscribeServiceConnection);
onSubscribeServiceConnection.emitter = null; onSubscribeServiceConnection.emitter = null;
})) }))
@ -87,8 +87,8 @@ public final class RxAndroidUtils {
.<Intent>create(emitter -> { .<Intent>create(emitter -> {
onOnSubscribeBroadcastReceiver.emitter = emitter; onOnSubscribeBroadcastReceiver.emitter = emitter;
context.registerReceiver(onOnSubscribeBroadcastReceiver, intentFilter); context.registerReceiver(onOnSubscribeBroadcastReceiver, intentFilter);
}, Emitter.BackpressureMode.LATEST) })
.doOnUnsubscribe(() -> { .doOnDispose(() -> {
context.unregisterReceiver(onOnSubscribeBroadcastReceiver); context.unregisterReceiver(onOnSubscribeBroadcastReceiver);
onOnSubscribeBroadcastReceiver.emitter = null; onOnSubscribeBroadcastReceiver.emitter = null;
})) }))

View File

@ -31,8 +31,8 @@ import java.util.List;
import ru.touchin.roboswag.core.observables.collections.changes.Change; import ru.touchin.roboswag.core.observables.collections.changes.Change;
import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges; import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges;
import rx.Emitter; import io.reactivex.Emitter;
import rx.Observable; import io.reactivex.Observable;
/** /**
* Created by Gavriil Sitnikov on 23/05/16. * Created by Gavriil Sitnikov on 23/05/16.
@ -60,8 +60,8 @@ public abstract class ObservableCollection<TItem> {
@NonNull @NonNull
private Observable<CollectionChanges<TItem>> createChangesObservable() { private Observable<CollectionChanges<TItem>> createChangesObservable() {
return Observable return Observable
.<CollectionChanges<TItem>>create(emitter -> this.changesEmitter = emitter, Emitter.BackpressureMode.BUFFER) .<CollectionChanges<TItem>>create(emitter -> this.changesEmitter = emitter)
.doOnUnsubscribe(() -> this.changesEmitter = null) .doOnDispose(() -> this.changesEmitter = null)
.share(); .share();
} }
@ -98,8 +98,8 @@ public abstract class ObservableCollection<TItem> {
* Method to notify that collection have changed. * Method to notify that collection have changed.
* *
* @param insertedItems Collection of inserted items; * @param insertedItems Collection of inserted items;
* @param removedItems Collection of removed items; * @param removedItems Collection of removed items;
* @param changes Changes of collection. * @param changes Changes of collection.
*/ */
protected void notifyAboutChanges(@NonNull final List<TItem> insertedItems, protected void notifyAboutChanges(@NonNull final List<TItem> insertedItems,
@NonNull final List<TItem> removedItems, @NonNull final List<TItem> removedItems,

View File

@ -10,10 +10,11 @@ import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import ru.touchin.roboswag.core.observables.collections.changes.DefaultCollectionsChangesCalculator; import ru.touchin.roboswag.core.observables.collections.changes.DefaultCollectionsChangesCalculator;
import rx.Scheduler; import io.reactivex.Scheduler;
import rx.Subscription; import io.reactivex.disposables.Disposable;
import rx.functions.Func1; import io.reactivex.functions.Function;
import rx.schedulers.Schedulers; import io.reactivex.schedulers.Schedulers;
import ru.touchin.roboswag.core.log.Lc;
/** /**
* Created by Gavriil Sitnikov on 02/06/2016. * Created by Gavriil Sitnikov on 02/06/2016.
@ -29,15 +30,19 @@ public class ObservableFilteredList<TItem> extends ObservableCollection<TItem> {
@NonNull @NonNull
private static <TItem> List<TItem> filterCollection(@NonNull final Collection<TItem> sourceCollection, private static <TItem> List<TItem> filterCollection(@NonNull final Collection<TItem> sourceCollection,
@Nullable final Func1<TItem, Boolean> filter) { @Nullable final Function<TItem, Boolean> filter) {
if (filter == null) { if (filter == null) {
return new ArrayList<>(sourceCollection); return new ArrayList<>(sourceCollection);
} }
final List<TItem> result = new ArrayList<>(sourceCollection.size()); final List<TItem> result = new ArrayList<>(sourceCollection.size());
for (final TItem item : sourceCollection) { try {
if (filter.call(item)) { for (final TItem item : sourceCollection) {
result.add(item); if (filter.apply(item)) {
result.add(item);
}
} }
} catch (final Exception exception) {
Lc.assertion(exception);
} }
return result; return result;
} }
@ -47,23 +52,23 @@ public class ObservableFilteredList<TItem> extends ObservableCollection<TItem> {
@NonNull @NonNull
private ObservableCollection<TItem> sourceCollection; private ObservableCollection<TItem> sourceCollection;
@Nullable @Nullable
private Func1<TItem, Boolean> filter; private Function<TItem, Boolean> filter;
@Nullable @Nullable
private Subscription sourceCollectionSubscription; private Disposable sourceCollectionSubscription;
public ObservableFilteredList() { public ObservableFilteredList() {
this(new ArrayList<>(), null); this(new ArrayList<>(), null);
} }
public ObservableFilteredList(@NonNull final Func1<TItem, Boolean> filter) { public ObservableFilteredList(@NonNull final Function<TItem, Boolean> filter) {
this(new ArrayList<>(), filter); this(new ArrayList<>(), filter);
} }
public ObservableFilteredList(@NonNull final Collection<TItem> sourceCollection, @Nullable final Func1<TItem, Boolean> filter) { public ObservableFilteredList(@NonNull final Collection<TItem> sourceCollection, @Nullable final Function<TItem, Boolean> filter) {
this(new ObservableList<>(sourceCollection), filter); this(new ObservableList<>(sourceCollection), filter);
} }
public ObservableFilteredList(@NonNull final ObservableCollection<TItem> sourceCollection, @Nullable final Func1<TItem, Boolean> filter) { public ObservableFilteredList(@NonNull final ObservableCollection<TItem> sourceCollection, @Nullable final Function<TItem, Boolean> filter) {
super(); super();
this.filter = filter; this.filter = filter;
this.sourceCollection = sourceCollection; this.sourceCollection = sourceCollection;
@ -96,14 +101,14 @@ public class ObservableFilteredList<TItem> extends ObservableCollection<TItem> {
* *
* @param filter Function to filter item. True - item will stay, false - item will be filtered. * @param filter Function to filter item. True - item will stay, false - item will be filtered.
*/ */
public void setFilter(@Nullable final Func1<TItem, Boolean> filter) { public void setFilter(@Nullable final Function<TItem, Boolean> filter) {
this.filter = filter; this.filter = filter;
updateInternal(); updateInternal();
} }
private void updateInternal() { private void updateInternal() {
if (sourceCollectionSubscription != null) { if (sourceCollectionSubscription != null) {
sourceCollectionSubscription.unsubscribe(); sourceCollectionSubscription.dispose();
sourceCollectionSubscription = null; sourceCollectionSubscription = null;
} }
sourceCollectionSubscription = sourceCollection.observeItems() sourceCollectionSubscription = sourceCollection.observeItems()

View File

@ -58,6 +58,8 @@ public class ObservableList<TItem> extends ObservableCollection<TItem> implement
private SameItemsPredicate<TItem> sameItemsPredicate; private SameItemsPredicate<TItem> sameItemsPredicate;
@Nullable @Nullable
private ChangePayloadProducer<TItem> changePayloadProducer; private ChangePayloadProducer<TItem> changePayloadProducer;
@Nullable
private ObservableList<TItem> diffUtilsSource;
public ObservableList() { public ObservableList() {
super(); super();
@ -227,9 +229,19 @@ public class ObservableList<TItem> extends ObservableCollection<TItem> implement
synchronized (this) { synchronized (this) {
final List<TItem> oldList = new ArrayList<>(items); final List<TItem> oldList = new ArrayList<>(items);
final List<TItem> newList = new ArrayList<>(newItems); final List<TItem> newList = new ArrayList<>(newItems);
final CollectionsChangesCalculator<TItem> calculator = sameItemsPredicate != null final CollectionsChangesCalculator<TItem> calculator;
? new DiffCollectionsChangesCalculator<>(oldList, newList, detectMoves, sameItemsPredicate, changePayloadProducer) if (diffUtilsSource != null) {
: new DefaultCollectionsChangesCalculator<>(oldList, newList, false); if (diffUtilsSource.sameItemsPredicate != null) {
calculator = new DiffCollectionsChangesCalculator<>(oldList, newList,
diffUtilsSource.detectMoves, diffUtilsSource.sameItemsPredicate, diffUtilsSource.changePayloadProducer);
} else {
calculator = new DefaultCollectionsChangesCalculator<>(oldList, newList, false);
}
} else if (sameItemsPredicate != null) {
calculator = new DiffCollectionsChangesCalculator<>(oldList, newList, detectMoves, sameItemsPredicate, changePayloadProducer);
} else {
calculator = new DefaultCollectionsChangesCalculator<>(oldList, newList, false);
}
items.clear(); items.clear();
items.addAll(newItems); items.addAll(newItems);
notifyAboutChanges(calculator.calculateInsertedItems(), calculator.calculateRemovedItems(), calculator.calculateChanges()); notifyAboutChanges(calculator.calculateInsertedItems(), calculator.calculateRemovedItems(), calculator.calculateChanges());
@ -246,8 +258,8 @@ public class ObservableList<TItem> extends ObservableCollection<TItem> implement
/** /**
* Enable diff utils algorithm in collection changes. * Enable diff utils algorithm in collection changes.
* *
* @param detectMoves The flag that determines whether the {@link Change.Moved} changes will be generated or not; * @param detectMoves The flag that determines whether the {@link Change.Moved} changes will be generated or not;
* @param sameItemsPredicate Predicate for the determination of the same elements; * @param sameItemsPredicate Predicate for the determination of the same elements;
* @param changePayloadProducer Function that calculate change payload when items the same but contents are different. * @param changePayloadProducer Function that calculate change payload when items the same but contents are different.
*/ */
public void enableDiffUtils(final boolean detectMoves, public void enableDiffUtils(final boolean detectMoves,
@ -271,7 +283,16 @@ public class ObservableList<TItem> extends ObservableCollection<TItem> implement
* @return true if diff utils is enabled. * @return true if diff utils is enabled.
*/ */
public boolean diffUtilsIsEnabled() { public boolean diffUtilsIsEnabled() {
return sameItemsPredicate != null; return diffUtilsSource != null ? diffUtilsSource.diffUtilsIsEnabled() : sameItemsPredicate != null;
}
/**
* Sets observableCollection as a source of diff utils parameters;
*
* @param diffUtilsSource Source of diff utils parameters.
*/
public void setDiffUtilsSource(@Nullable final ObservableList<TItem> diffUtilsSource) {
this.diffUtilsSource = diffUtilsSource;
} }
/** /**

View File

@ -26,22 +26,21 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.concurrent.Callable;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.BehaviorSubject;
import ru.touchin.roboswag.core.log.Lc; import ru.touchin.roboswag.core.log.Lc;
import ru.touchin.roboswag.core.observables.collections.ObservableCollection; import ru.touchin.roboswag.core.observables.collections.ObservableCollection;
import ru.touchin.roboswag.core.observables.collections.ObservableList; import ru.touchin.roboswag.core.observables.collections.ObservableList;
import ru.touchin.roboswag.core.observables.collections.changes.Change; import ru.touchin.roboswag.core.observables.collections.changes.Change;
import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges; import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges;
import ru.touchin.roboswag.core.utils.ShouldNotHappenException; import ru.touchin.roboswag.core.utils.Optional;
import rx.Observable;
import rx.Scheduler;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;
/** /**
* Created by Gavriil Sitnikov on 23/05/16. * Created by Gavriil Sitnikov on 23/05/16.
@ -67,7 +66,7 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
@NonNull @NonNull
private Observable<TLoadedItems> loadingMoreObservable; private Observable<TLoadedItems> loadingMoreObservable;
@NonNull @NonNull
private final BehaviorSubject<Integer> moreItemsCount = BehaviorSubject.create(LoadedItems.UNKNOWN_ITEMS_COUNT); private final BehaviorSubject<Integer> moreItemsCount = BehaviorSubject.createDefault(LoadedItems.UNKNOWN_ITEMS_COUNT);
@NonNull @NonNull
private final ObservableList<TItem> innerList = new ObservableList<>(); private final ObservableList<TItem> innerList = new ObservableList<>();
@Nullable @Nullable
@ -85,14 +84,8 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
@Nullable final LoadedItems<TItem, TMoreReference> initialItems) { @Nullable final LoadedItems<TItem, TMoreReference> initialItems) {
super(); super();
this.loadingMoreObservable = Observable this.loadingMoreObservable = Observable
.switchOnNext(Observable.fromCallable(() -> createLoadRequestBasedObservable(this::createActualRequest, moreMoreItemsLoader::load))) .switchOnNext(Observable
.single() .fromCallable(() -> createLoadRequestBasedObservable(this::createActualRequest, moreMoreItemsLoader::load).toObservable()))
.doOnError(throwable -> {
if (throwable instanceof IllegalArgumentException || throwable instanceof NoSuchElementException) {
Lc.assertion(new ShouldNotHappenException("Updates during loading not supported."
+ " MoreItemsLoader should emit only one result.", throwable));
}
})
.doOnNext(loadedItems -> onItemsLoaded(loadedItems, size(), false)) .doOnNext(loadedItems -> onItemsLoaded(loadedItems, size(), false))
.replay(1) .replay(1)
.refCount(); .refCount();
@ -113,16 +106,16 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
} }
@NonNull @NonNull
protected <T, TRequest> Observable<T> createLoadRequestBasedObservable(@NonNull final Func0<TRequest> requestCreator, protected <T, TRequest> Single<T> createLoadRequestBasedObservable(@NonNull final Callable<TRequest> requestCreator,
@NonNull final Func1<TRequest, Observable<T>> observableCreator) { @NonNull final Function<TRequest, Single<T>> observableCreator) {
return Observable return Single
.fromCallable(requestCreator) .fromCallable(requestCreator)
.switchMap(loadRequest -> observableCreator.call(loadRequest) .flatMap(loadRequest -> observableCreator.apply(loadRequest)
.subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.io())
.observeOn(loaderScheduler) .observeOn(loaderScheduler)
.doOnNext(ignored -> { .doOnSuccess(ignored -> {
if (!requestCreator.call().equals(loadRequest)) { if (!requestCreator.call().equals(loadRequest)) {
throw OnErrorThrowable.from(new RequestChangedDuringLoadingException()); throw new RequestChangedDuringLoadingException();
} }
})) }))
.retry((number, throwable) -> .retry((number, throwable) ->
@ -297,20 +290,20 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
* @return {@link Observable} to load item. * @return {@link Observable} to load item.
*/ */
@NonNull @NonNull
public Observable<TItem> loadItem(final int position) { public Single<Optional<TItem>> loadItem(final int position) {
return Observable return Observable.switchOnNext(Observable
.switchOnNext(Observable .fromCallable(() -> {
.fromCallable(() -> { if (position < size()) {
if (position < size()) { return Observable.just(new Optional<>(get(position)));
return Observable.just(get(position)); } else if (moreItemsCount.getValue() == 0) {
} else if (moreItemsCount.getValue() == 0) { return Observable.just(new Optional<TItem>(null));
return Observable.just((TItem) null); } else {
} else { return loadingMoreObservable.switchMap(ignored -> Observable.<Optional<TItem>>error(new NotLoadedYetException()));
return loadingMoreObservable.switchMap(ignored -> Observable.<TItem>error(new NotLoadedYetException())); }
} }))
}) .subscribeOn(loaderScheduler)
.subscribeOn(loaderScheduler)) .retry((number, throwable) -> throwable instanceof NotLoadedYetException)
.retry((number, throwable) -> throwable instanceof NotLoadedYetException); .firstOrError();
} }
/** /**
@ -322,15 +315,24 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
* @return {@link Observable} to load items. * @return {@link Observable} to load items.
*/ */
@NonNull @NonNull
public Observable<Collection<TItem>> loadRange(final int first, final int last) { @SuppressWarnings("unchecked")
final List<Observable<TItem>> itemsRequests = new ArrayList<>(); //unchecked: it's OK for such zip operator
public Single<Collection<TItem>> loadRange(final int first, final int last) {
final List<Single<Optional<TItem>>> itemsRequests = new ArrayList<>();
for (int i = first; i <= last; i++) { for (int i = first; i <= last; i++) {
itemsRequests.add(loadItem(i)); itemsRequests.add(loadItem(i));
} }
return Observable.concatEager(itemsRequests) return Single.zip(itemsRequests,
.filter(loadedItem -> loadedItem != null) items -> {
.toList() final List<TItem> result = new ArrayList<>();
.map(Collections::unmodifiableCollection); for (final Object item : items) {
final Optional<TItem> optional = (Optional<TItem>) item;
if (optional.get() != null) {
result.add(optional.get());
}
}
return Collections.unmodifiableCollection(result);
});
} }
/** /**

View File

@ -21,7 +21,7 @@ package ru.touchin.roboswag.core.observables.collections.loadable;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import rx.Observable; import io.reactivex.Single;
/** /**
* Created by Gavriil Sitnikov on 02/06/2016. * Created by Gavriil Sitnikov on 02/06/2016.
@ -34,12 +34,12 @@ import rx.Observable;
public interface MoreItemsLoader<TItem, TMoreReference, TLoadedItems extends LoadedItems<TItem, TMoreReference>> { public interface MoreItemsLoader<TItem, TMoreReference, TLoadedItems extends LoadedItems<TItem, TMoreReference>> {
/** /**
* Returns {@link Observable} that could load next part of items. * Returns {@link Single} that could load next part of items.
* *
* @param moreLoadRequest Request with info inside to load next part of items; * @param moreLoadRequest Request with info inside to load next part of items;
* @return {@link Observable} of loading items. * @return {@link Single} of loading items.
*/ */
@NonNull @NonNull
Observable<TLoadedItems> load(@NonNull final MoreLoadRequest<TMoreReference> moreLoadRequest); Single<TLoadedItems> load(@NonNull final MoreLoadRequest<TMoreReference> moreLoadRequest);
} }

View File

@ -26,19 +26,17 @@ import java.lang.reflect.Type;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import ru.touchin.roboswag.core.log.Lc; import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import ru.touchin.roboswag.core.log.LcGroup; import ru.touchin.roboswag.core.log.LcGroup;
import ru.touchin.roboswag.core.observables.OnSubscribeRefCountWithCacheTime; import ru.touchin.roboswag.core.observables.ObservableRefCountWithCacheTime;
import ru.touchin.roboswag.core.utils.ObjectUtils; import ru.touchin.roboswag.core.utils.ObjectUtils;
import ru.touchin.roboswag.core.utils.Optional; import ru.touchin.roboswag.core.utils.Optional;
import rx.Completable;
import rx.Observable;
import rx.Scheduler;
import rx.Single;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Actions;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
/** /**
* Created by Gavriil Sitnikov on 04/10/2015. * Created by Gavriil Sitnikov on 04/10/2015.
@ -123,7 +121,8 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
} }
@Nullable @Nullable
private Optional<TStoreObject> returnDefaultValueIfNull(@NonNull final Optional<TStoreObject> storeObject, @Nullable final TObject defaultValue) { private Optional<TStoreObject> returnDefaultValueIfNull(@NonNull final Optional<TStoreObject> storeObject, @Nullable final TObject defaultValue)
throws Converter.ConversionException {
if (storeObject.get() != null || defaultValue == null) { if (storeObject.get() != null || defaultValue == null) {
return storeObject; return storeObject;
} }
@ -133,7 +132,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
} catch (final Converter.ConversionException exception) { } catch (final Converter.ConversionException exception) {
STORABLE_LC_GROUP.w(exception, "Exception while converting default value of '%s' from '%s' from store %s", STORABLE_LC_GROUP.w(exception, "Exception while converting default value of '%s' from '%s' from store %s",
key, defaultValue, store); key, defaultValue, store);
throw OnErrorThrowable.from(exception); throw exception;
} }
} }
@ -160,7 +159,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
.concatWith(newStoreValueEvent) .concatWith(newStoreValueEvent)
.map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue)); .map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue));
return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
? Observable.unsafeCreate(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS)) ? RxJavaPlugins.onAssembly(new ObservableRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS))
: result; : result;
} }
@ -175,11 +174,11 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
} catch (final Converter.ConversionException exception) { } catch (final Converter.ConversionException exception) {
STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s", STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s",
key, storeObject, store, converter); key, storeObject, store, converter);
throw OnErrorThrowable.from(exception); throw exception;
} }
}); });
return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
? Observable.unsafeCreate(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS)) ? RxJavaPlugins.onAssembly(new ObservableRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS))
: result; : result;
} }
@ -235,7 +234,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
@NonNull @NonNull
private Completable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) { private Completable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) {
return (checkForEqualityBeforeSet ? storeValueObservable.take(1).toSingle() : Single.just(new Optional<>(null))) return (checkForEqualityBeforeSet ? storeValueObservable.firstOrError() : Single.just(new Optional<>(null)))
.observeOn(scheduler) .observeOn(scheduler)
.flatMapCompletable(oldStoreValue -> { .flatMapCompletable(oldStoreValue -> {
final TStoreObject newStoreValue; final TStoreObject newStoreValue;
@ -274,10 +273,9 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
* @param newValue Value to set; * @param newValue Value to set;
* @return Observable of setting process. * @return Observable of setting process.
*/ */
//COMPATIBILITY NOTE: it is not Completable to prevent migration of old code
@NonNull @NonNull
public Observable<?> forceSet(@Nullable final TObject newValue) { public Completable forceSet(@Nullable final TObject newValue) {
return internalSet(newValue, false).toObservable(); return internalSet(newValue, false);
} }
/** /**
@ -290,16 +288,9 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
* @param newValue Value to set; * @param newValue Value to set;
* @return Observable of setting process. * @return Observable of setting process.
*/ */
//COMPATIBILITY NOTE: it is not Completable to prevent migration of old code
@NonNull @NonNull
public Observable<?> set(@Nullable final TObject newValue) { public Completable set(@Nullable final TObject newValue) {
return internalSet(newValue, true).toObservable(); return internalSet(newValue, true);
}
@Deprecated
//COMPATIBILITY NOTE: it is deprecated as it's execution not bound to Android lifecycle objects
public void setCalm(@Nullable final TObject newValue) {
set(newValue).subscribe(Actions.empty(), Lc::assertion);
} }
/** /**
@ -310,7 +301,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
@Deprecated @Deprecated
//deprecation: it should be used for debug only and in very rare cases. //deprecation: it should be used for debug only and in very rare cases.
public void setSync(@Nullable final TObject newValue) { public void setSync(@Nullable final TObject newValue) {
set(newValue).toBlocking().subscribe(); set(newValue).blockingAwait();
} }
@NonNull @NonNull
@ -334,9 +325,8 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
* @return Returns observable of value. * @return Returns observable of value.
*/ */
@NonNull @NonNull
//COMPATIBILITY NOTE: it is not Single to prevent migration of old code public Single<TReturnObject> get() {
public Observable<TReturnObject> get() { return observe().firstOrError();
return observe().take(1);
} }
/** /**
@ -344,11 +334,16 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
* *
* @return Returns value; * @return Returns value;
*/ */
@Deprecated
//deprecation: it should be used for debug only and in very rare cases.
@Nullable @Nullable
public TReturnObject getSync() { public TObject getSync() {
return get().toBlocking().first(); final TStoreObject storeObject = store.getObject(storeObjectType, key);
try {
return converter.toObject(objectType, storeObjectType, storeObject);
} catch (final Converter.ConversionException exception) {
STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s",
key, storeObject, store, converter);
return null;
}
} }
/** /**

View File

@ -38,7 +38,7 @@ public interface Converter<TObject, TStoreObject> {
* *
* @param objectType Type of object; * @param objectType Type of object;
* @param storeObjectType Type of store object allowed to store; * @param storeObjectType Type of store object allowed to store;
* @param object Object to be converted to store object; * @param object Object to be converted to store object;
* @return Object that is allowed to store into specific {@link Store}; * @return Object that is allowed to store into specific {@link Store};
* @throws ConversionException Exception during conversion. Usually it indicates illegal state. * @throws ConversionException Exception during conversion. Usually it indicates illegal state.
*/ */
@ -51,7 +51,7 @@ public interface Converter<TObject, TStoreObject> {
* *
* @param objectType Type of object; * @param objectType Type of object;
* @param storeObjectType Type of store object allowed to store; * @param storeObjectType Type of store object allowed to store;
* @param storeObject Object from specific {@link Store}; * @param storeObject Object from specific {@link Store};
* @return Object converted from store object; * @return Object converted from store object;
* @throws ConversionException Exception during conversion. Usually it indicates illegal state. * @throws ConversionException Exception during conversion. Usually it indicates illegal state.
*/ */

View File

@ -24,10 +24,9 @@ import android.support.annotation.NonNull;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import rx.Completable; import io.reactivex.Completable;
import rx.Observable; import io.reactivex.Flowable;
import rx.Single; import io.reactivex.Single;
import rx.exceptions.OnErrorThrowable;
/** /**
* Created by Gavriil Sitnikov on 06/10/2015. * Created by Gavriil Sitnikov on 06/10/2015.
@ -91,15 +90,16 @@ public class Migration<TKey> {
return makeMigrationChain(key, versionUpdater) return makeMigrationChain(key, versionUpdater)
.doOnSuccess(lastUpdatedVersion -> { .doOnSuccess(lastUpdatedVersion -> {
if (lastUpdatedVersion < latestVersion) { if (lastUpdatedVersion < latestVersion) {
throw OnErrorThrowable.from(new NextLoopMigrationException()); throw new NextLoopMigrationException();
} }
if (versionUpdater.initialVersion == versionUpdater.oldVersion) { if (versionUpdater.initialVersion == versionUpdater.oldVersion) {
throw new MigrationException(String.format("Version of '%s' not updated from %s", throw new MigrationException(String.format("Version of '%s' not updated from %s",
key, versionUpdater.initialVersion)); key, versionUpdater.initialVersion));
} }
}) })
.retryWhen(attempts -> attempts.switchMap(throwable -> throwable instanceof NextLoopMigrationException .retryWhen(attempts -> attempts
? Observable.just(null) : Observable.error(throwable))); .switchMap(throwable -> throwable instanceof NextLoopMigrationException
? Flowable.just(new Object()) : Flowable.error(throwable)));
}) })
.toCompletable() .toCompletable()
.andThen(versionsStore.storeObject(Long.class, key, latestVersion)) .andThen(versionsStore.storeObject(Long.class, key, latestVersion))

View File

@ -21,7 +21,7 @@ package ru.touchin.roboswag.core.observables.storable;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import rx.Single; import io.reactivex.Single;
/** /**
* Created by Gavriil Sitnikov on 05/10/2015. * Created by Gavriil Sitnikov on 05/10/2015.

View File

@ -17,19 +17,16 @@
* *
*/ */
package ru.touchin.roboswag.core.observables.storable.concrete; package ru.touchin.roboswag.core.observables.storable;
import android.support.annotation.NonNull; import android.support.annotation.NonNull;
import android.support.annotation.Nullable; import android.support.annotation.Nullable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import ru.touchin.roboswag.core.observables.storable.BaseStorable;
import ru.touchin.roboswag.core.observables.storable.Migration;
import ru.touchin.roboswag.core.observables.storable.Storable;
import ru.touchin.roboswag.core.utils.ShouldNotHappenException; import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
import rx.Observable; import io.reactivex.Observable;
import rx.Scheduler; import io.reactivex.Scheduler;
/** /**
* Created by Gavriil Sitnikov on 04/10/2015. * Created by Gavriil Sitnikov on 04/10/2015.

View File

@ -25,10 +25,9 @@ import android.support.annotation.Nullable;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import ru.touchin.roboswag.core.observables.storable.concrete.NonNullStorable;
import ru.touchin.roboswag.core.utils.Optional; import ru.touchin.roboswag.core.utils.Optional;
import rx.Observable; import io.reactivex.Observable;
import rx.Scheduler; import io.reactivex.Scheduler;
/** /**
* Created by Gavriil Sitnikov on 04/10/2015. * Created by Gavriil Sitnikov on 04/10/2015.
@ -43,8 +42,7 @@ import rx.Scheduler;
* @param <TObject> Type of actual object; * @param <TObject> Type of actual object;
* @param <TStoreObject> Type of store object. Could be same as {@link TObject}. * @param <TStoreObject> Type of store object. Could be same as {@link TObject}.
*/ */
//COMPATIBILITY NOTE: in RxJava2 it should extends BaseStorable<TKey, TObject, TStoreObject, Optional<TObject>> public class Storable<TKey, TObject, TStoreObject> extends BaseStorable<TKey, TObject, TStoreObject, Optional<TObject>> {
public class Storable<TKey, TObject, TStoreObject> extends BaseStorable<TKey, TObject, TStoreObject, TObject> {
public Storable(@NonNull final BuilderCore<TKey, TObject, TStoreObject> builderCore) { public Storable(@NonNull final BuilderCore<TKey, TObject, TStoreObject> builderCore) {
super(builderCore); super(builderCore);
@ -52,8 +50,8 @@ public class Storable<TKey, TObject, TStoreObject> extends BaseStorable<TKey, TO
@NonNull @NonNull
@Override @Override
public Observable<TObject> observe() { public Observable<Optional<TObject>> observe() {
return observeOptionalValue().map(Optional::get); return observeOptionalValue();
} }
/** /**

View File

@ -25,8 +25,8 @@ import android.support.annotation.Nullable;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import ru.touchin.roboswag.core.utils.Optional; import ru.touchin.roboswag.core.utils.Optional;
import rx.Completable; import io.reactivex.Completable;
import rx.Single; import io.reactivex.Single;
/** /**
* Created by Gavriil Sitnikov on 04/10/2015. * Created by Gavriil Sitnikov on 04/10/2015.
@ -67,4 +67,14 @@ public interface Store<TKey, TStoreObject> {
@NonNull @NonNull
Single<Optional<TStoreObject>> loadObject(@NonNull Type storeObjectType, @NonNull TKey key); Single<Optional<TStoreObject>> loadObject(@NonNull Type storeObjectType, @NonNull TKey key);
/**
* Gets object from store by key.
*
* @param storeObjectType Type of object to store;
* @param key Key related to object;
* @return Object from store found by key;
*/
@Nullable
TStoreObject getObject(@NonNull Type storeObjectType, @NonNull TKey key);
} }

View File

@ -23,7 +23,8 @@ import android.support.annotation.NonNull;
import java.security.MessageDigest; import java.security.MessageDigest;
import rx.functions.Func1; import io.reactivex.functions.Function;
import ru.touchin.roboswag.core.log.Lc;
/** /**
* Created by Gavriil Sitnikov on 29/08/2016. * Created by Gavriil Sitnikov on 29/08/2016.
@ -65,11 +66,15 @@ public final class StringUtils {
* @param condition Condition of symbol; * @param condition Condition of symbol;
* @return True if some character satisfies condition. * @return True if some character satisfies condition.
*/ */
public static boolean containsCharLike(@NonNull final String text, @NonNull final Func1<Character, Boolean> condition) { public static boolean containsCharLike(@NonNull final String text, @NonNull final Function<Character, Boolean> condition) {
for (int i = 0; i < text.length(); i++) { try {
if (condition.call(text.charAt(i))) { for (int i = 0; i < text.length(); i++) {
return true; if (condition.apply(text.charAt(i))) {
return true;
}
} }
} catch (final Exception exception) {
Lc.assertion(exception);
} }
return false; return false;
} }

View File

@ -32,7 +32,7 @@ import ru.touchin.roboswag.core.utils.ObjectUtils;
* Both arguments are not null. * Both arguments are not null.
* Note that if you want to save this pair in state, you need make TFirst and TSecond Serializable too. * Note that if you want to save this pair in state, you need make TFirst and TSecond Serializable too.
* *
* @param <TFirst> type of the first nonnull argument. * @param <TFirst> type of the first nonnull argument.
* @param <TSecond> type of the second nonnull argument. * @param <TSecond> type of the second nonnull argument.
*/ */
public class NonNullPair<TFirst, TSecond> implements Serializable { public class NonNullPair<TFirst, TSecond> implements Serializable {