Compare commits
37 Commits
master
...
noblocking
| Author | SHA1 | Date |
|---|---|---|
|
|
669782c5a0 | |
|
|
d83fe2d510 | |
|
|
3aa53424a7 | |
|
|
8a586a9638 | |
|
|
8af90dcdd5 | |
|
|
2a3c647a30 | |
|
|
f97a81a4fb | |
|
|
4b78713f50 | |
|
|
6e7cd65ab2 | |
|
|
e3e9b96fee | |
|
|
ed31b8db1a | |
|
|
c496eba64e | |
|
|
b4e3c018aa | |
|
|
7142bed098 | |
|
|
b0610e1f51 | |
|
|
0e4b89bb4d | |
|
|
fb4634a6ed | |
|
|
e14f5b00cb | |
|
|
ac6b3e85ac | |
|
|
5e4475d2b2 | |
|
|
4166445bc6 | |
|
|
e5d021c885 | |
|
|
5e9bfc701c | |
|
|
3bf6f76159 | |
|
|
8c1f3d35fc | |
|
|
e4d652b3ab | |
|
|
ef4632304e | |
|
|
6ed1cb4e92 | |
|
|
50371ee530 | |
|
|
20951bed03 | |
|
|
0cb3099d14 | |
|
|
29dc6ce1c0 | |
|
|
7e6513218e | |
|
|
768c83addc | |
|
|
52c5b7fa98 | |
|
|
2602d56cbd | |
|
|
553f4f6bfa |
10
build.gradle
10
build.gradle
|
|
@ -1,9 +1,7 @@
|
||||||
apply plugin: 'com.android.library'
|
apply plugin: 'com.android.library'
|
||||||
apply plugin: 'me.tatarka.retrolambda'
|
|
||||||
|
|
||||||
android {
|
android {
|
||||||
compileSdkVersion 25
|
compileSdkVersion compileSdk
|
||||||
buildToolsVersion '25.0.3'
|
|
||||||
|
|
||||||
compileOptions {
|
compileOptions {
|
||||||
sourceCompatibility JavaVersion.VERSION_1_8
|
sourceCompatibility JavaVersion.VERSION_1_8
|
||||||
|
|
@ -16,7 +14,7 @@ android {
|
||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
provided 'com.android.support:support-annotations:25.4.0'
|
compileOnly "com.android.support:support-annotations:$supportLibraryVersion"
|
||||||
provided 'io.reactivex:rxandroid:1.2.1'
|
compileOnly "io.reactivex.rxjava2:rxandroid:$rxAndroidVersion"
|
||||||
provided 'io.reactivex:rxjava:1.3.0'
|
compileOnly "io.reactivex.rxjava2:rxjava:$rxJavaVersion"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -83,7 +83,6 @@ public class DiffUtil {
|
||||||
* Calculates the list of update operations that can covert one list into the other one.
|
* Calculates the list of update operations that can covert one list into the other one.
|
||||||
*
|
*
|
||||||
* @param cb The callback that acts as a gateway to the backing list data
|
* @param cb The callback that acts as a gateway to the backing list data
|
||||||
*
|
|
||||||
* @return A DiffResult that contains the information about the edit sequence to convert the
|
* @return A DiffResult that contains the information about the edit sequence to convert the
|
||||||
* old list into the new list.
|
* old list into the new list.
|
||||||
*/
|
*/
|
||||||
|
|
@ -100,7 +99,6 @@ public class DiffUtil {
|
||||||
*
|
*
|
||||||
* @param cb The callback that acts as a gateway to the backing list data
|
* @param cb The callback that acts as a gateway to the backing list data
|
||||||
* @param detectMoves True if DiffUtil should try to detect moved items, false otherwise.
|
* @param detectMoves True if DiffUtil should try to detect moved items, false otherwise.
|
||||||
*
|
|
||||||
* @return A DiffResult that contains the information about the edit sequence to convert the
|
* @return A DiffResult that contains the information about the edit sequence to convert the
|
||||||
* old list into the new list.
|
* old list into the new list.
|
||||||
*/
|
*/
|
||||||
|
|
@ -329,7 +327,6 @@ public class DiffUtil {
|
||||||
*
|
*
|
||||||
* @param oldItemPosition The position of the item in the old list
|
* @param oldItemPosition The position of the item in the old list
|
||||||
* @param newItemPosition The position of the item in the new list
|
* @param newItemPosition The position of the item in the new list
|
||||||
*
|
|
||||||
* @return A payload object that represents the change between the two items.
|
* @return A payload object that represents the change between the two items.
|
||||||
*/
|
*/
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -560,7 +557,6 @@ public class DiffUtil {
|
||||||
* @param y The y position in the matrix (position in the new list)
|
* @param y The y position in the matrix (position in the new list)
|
||||||
* @param snakeIndex The current snake index
|
* @param snakeIndex The current snake index
|
||||||
* @param removal True if we are looking for a removal, false otherwise
|
* @param removal True if we are looking for a removal, false otherwise
|
||||||
*
|
|
||||||
* @return True if such item is found.
|
* @return True if such item is found.
|
||||||
*/
|
*/
|
||||||
private boolean findMatchingItem(final int x, final int y, final int snakeIndex,
|
private boolean findMatchingItem(final int x, final int y, final int snakeIndex,
|
||||||
|
|
|
||||||
|
|
@ -29,8 +29,8 @@ import java.io.Serializable;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.utils.ObjectUtils;
|
import ru.touchin.roboswag.core.utils.ObjectUtils;
|
||||||
import ru.touchin.roboswag.core.utils.Optional;
|
import ru.touchin.roboswag.core.utils.Optional;
|
||||||
import rx.Observable;
|
import io.reactivex.Observable;
|
||||||
import rx.subjects.BehaviorSubject;
|
import io.reactivex.subjects.BehaviorSubject;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 24/03/2016.
|
* Created by Gavriil Sitnikov on 24/03/2016.
|
||||||
|
|
@ -47,7 +47,7 @@ public abstract class BaseChangeable<TValue, TReturnValue> implements Serializab
|
||||||
private transient BehaviorSubject<Optional<TValue>> valueSubject;
|
private transient BehaviorSubject<Optional<TValue>> valueSubject;
|
||||||
|
|
||||||
public BaseChangeable(@Nullable final TValue defaultValue) {
|
public BaseChangeable(@Nullable final TValue defaultValue) {
|
||||||
valueSubject = BehaviorSubject.create(new Optional<>(defaultValue));
|
valueSubject = BehaviorSubject.createDefault(new Optional<>(defaultValue));
|
||||||
}
|
}
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
|
|
@ -88,7 +88,7 @@ public abstract class BaseChangeable<TValue, TReturnValue> implements Serializab
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private void readObject(@NonNull final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
|
private void readObject(@NonNull final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
|
||||||
valueSubject = BehaviorSubject.create((Optional<TValue>) inputStream.readObject());
|
valueSubject = BehaviorSubject.createDefault((Optional<TValue>) inputStream.readObject());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -22,16 +22,15 @@ package ru.touchin.roboswag.core.observables;
|
||||||
import android.support.annotation.NonNull;
|
import android.support.annotation.NonNull;
|
||||||
import android.support.annotation.Nullable;
|
import android.support.annotation.Nullable;
|
||||||
|
|
||||||
|
import io.reactivex.Observable;
|
||||||
import ru.touchin.roboswag.core.utils.Optional;
|
import ru.touchin.roboswag.core.utils.Optional;
|
||||||
import rx.Observable;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 24/03/2016.
|
* Created by Gavriil Sitnikov on 24/03/2016.
|
||||||
* Variant of {@link BaseChangeable} which is allows to set nullable values.
|
* Variant of {@link BaseChangeable} which is allows to set nullable values.
|
||||||
* Needed to separate non-null Changeable from nullable Changeable.
|
* Needed to separate non-null Changeable from nullable Changeable.
|
||||||
*/
|
*/
|
||||||
//COMPATIBILITY NOTE: in RxJava2 it should extends BaseChangeable<T, Optional<T>>
|
public class Changeable<T> extends BaseChangeable<T, Optional<T>> {
|
||||||
public class Changeable<T> extends BaseChangeable<T, T> {
|
|
||||||
|
|
||||||
public Changeable(@Nullable final T defaultValue) {
|
public Changeable(@Nullable final T defaultValue) {
|
||||||
super(defaultValue);
|
super(defaultValue);
|
||||||
|
|
@ -44,9 +43,8 @@ public class Changeable<T> extends BaseChangeable<T, T> {
|
||||||
*/
|
*/
|
||||||
@NonNull
|
@NonNull
|
||||||
@Override
|
@Override
|
||||||
//COMPATIBILITY NOTE: in RxJava2 it should be Observable<Optional<T>>
|
public Observable<Optional<T>> observe() {
|
||||||
public Observable<T> observe() {
|
return observeOptionalValue();
|
||||||
return observeOptionalValue().map(Optional::get);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -23,7 +23,7 @@ import android.support.annotation.NonNull;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.log.Lc;
|
import ru.touchin.roboswag.core.log.Lc;
|
||||||
import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
|
import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
|
||||||
import rx.Observable;
|
import io.reactivex.Observable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 24/03/2016.
|
* Created by Gavriil Sitnikov on 24/03/2016.
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,299 @@
|
||||||
|
/**
|
||||||
|
* Copyright (c) 2016-present, RxJava Contributors.
|
||||||
|
* <p>
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
|
||||||
|
* compliance with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is
|
||||||
|
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
|
||||||
|
* the License for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package ru.touchin.roboswag.core.observables;
|
||||||
|
|
||||||
|
import android.support.annotation.NonNull;
|
||||||
|
import android.support.annotation.Nullable;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import io.reactivex.Observable;
|
||||||
|
import io.reactivex.ObservableSource;
|
||||||
|
import io.reactivex.Observer;
|
||||||
|
import io.reactivex.Scheduler;
|
||||||
|
import io.reactivex.disposables.CompositeDisposable;
|
||||||
|
import io.reactivex.disposables.Disposable;
|
||||||
|
import io.reactivex.disposables.Disposables;
|
||||||
|
import io.reactivex.functions.Consumer;
|
||||||
|
import io.reactivex.internal.disposables.DisposableHelper;
|
||||||
|
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
|
||||||
|
import io.reactivex.observables.ConnectableObservable;
|
||||||
|
import io.reactivex.schedulers.Schedulers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an observable sequence that stays connected to the source as long as
|
||||||
|
* there is at least one subscription to the observable sequence.
|
||||||
|
*
|
||||||
|
* @param <T> the value type
|
||||||
|
*/
|
||||||
|
@SuppressWarnings({"PMD.CompareObjectsWithEquals", "PMD.AvoidUsingVolatile"})
|
||||||
|
//AvoidUsingVolatile: it's RxJava code
|
||||||
|
public final class ObservableRefCountWithCacheTime<T> extends Observable<T> implements HasUpstreamObservableSource<T> {
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private final ConnectableObservable<? extends T> connectableSource;
|
||||||
|
@NonNull
|
||||||
|
private final ObservableSource<T> actualSource;
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private volatile CompositeDisposable baseDisposable = new CompositeDisposable();
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private final AtomicInteger subscriptionCount = new AtomicInteger();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use this lock for every subscription and disconnect action.
|
||||||
|
*/
|
||||||
|
@NonNull
|
||||||
|
private final ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private final Scheduler scheduler = Schedulers.computation();
|
||||||
|
private final long cacheTime;
|
||||||
|
@NonNull
|
||||||
|
private final TimeUnit cacheTimeUnit;
|
||||||
|
@Nullable
|
||||||
|
private Scheduler.Worker worker;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
*
|
||||||
|
* @param source observable to apply ref count to
|
||||||
|
*/
|
||||||
|
public ObservableRefCountWithCacheTime(@NonNull final ConnectableObservable<T> source,
|
||||||
|
final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) {
|
||||||
|
super();
|
||||||
|
this.connectableSource = source;
|
||||||
|
this.actualSource = source;
|
||||||
|
this.cacheTime = cacheTime;
|
||||||
|
this.cacheTimeUnit = cacheTimeUnit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
public ObservableSource<T> source() {
|
||||||
|
return actualSource;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupWorker() {
|
||||||
|
if (worker != null) {
|
||||||
|
worker.dispose();
|
||||||
|
worker = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void subscribeActual(@NonNull final Observer<? super T> subscriber) {
|
||||||
|
|
||||||
|
lock.lock();
|
||||||
|
if (subscriptionCount.incrementAndGet() == 1) {
|
||||||
|
cleanupWorker();
|
||||||
|
final AtomicBoolean writeLocked = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// need to use this overload of connect to ensure that
|
||||||
|
// baseDisposable is set in the case that source is a
|
||||||
|
// synchronous Observable
|
||||||
|
connectableSource.connect(onSubscribe(subscriber, writeLocked));
|
||||||
|
} finally {
|
||||||
|
// need to cover the case where the source is subscribed to
|
||||||
|
// outside of this class thus preventing the Action1 passed
|
||||||
|
// to source.connect above being called
|
||||||
|
if (writeLocked.get()) {
|
||||||
|
// Action1 passed to source.connect was not called
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
// ready to subscribe to source so do it
|
||||||
|
doSubscribe(subscriber, baseDisposable);
|
||||||
|
} finally {
|
||||||
|
// release the read lock
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private Consumer<Disposable> onSubscribe(@NonNull final Observer<? super T> observer, @NonNull final AtomicBoolean writeLocked) {
|
||||||
|
return new DisposeConsumer(observer, writeLocked);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doSubscribe(@NonNull final Observer<? super T> observer, @NonNull final CompositeDisposable currentBase) {
|
||||||
|
// handle disposing from the base CompositeDisposable
|
||||||
|
final Disposable disposable = disconnect(currentBase);
|
||||||
|
|
||||||
|
final ConnectionObserver connectionObserver = new ConnectionObserver(observer, currentBase, disposable);
|
||||||
|
observer.onSubscribe(connectionObserver);
|
||||||
|
|
||||||
|
connectableSource.subscribe(connectionObserver);
|
||||||
|
}
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private Disposable disconnect(@NonNull final CompositeDisposable current) {
|
||||||
|
return Disposables.fromRunnable(new DisposeTask(current));
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class ConnectionObserver extends AtomicReference<Disposable> implements Observer<T>, Disposable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 3813126992133394324L;
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private final Observer<? super T> subscriber;
|
||||||
|
@NonNull
|
||||||
|
private final CompositeDisposable currentBase;
|
||||||
|
@NonNull
|
||||||
|
private final Disposable resource;
|
||||||
|
|
||||||
|
public ConnectionObserver(@NonNull final Observer<? super T> subscriber, @NonNull final CompositeDisposable currentBase,
|
||||||
|
@NonNull final Disposable resource) {
|
||||||
|
super();
|
||||||
|
this.subscriber = subscriber;
|
||||||
|
this.currentBase = currentBase;
|
||||||
|
this.resource = resource;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(@NonNull final Disposable disposable) {
|
||||||
|
DisposableHelper.setOnce(this, disposable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(@NonNull final Throwable throwable) {
|
||||||
|
cleanup();
|
||||||
|
subscriber.onError(throwable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(@NonNull final T item) {
|
||||||
|
subscriber.onNext(item);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
cleanup();
|
||||||
|
subscriber.onComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void dispose() {
|
||||||
|
DisposableHelper.dispose(this);
|
||||||
|
resource.dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDisposed() {
|
||||||
|
return DisposableHelper.isDisposed(get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanup() {
|
||||||
|
// on error or completion we need to dispose the base CompositeDisposable
|
||||||
|
// and set the subscriptionCount to 0
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
if (baseDisposable == currentBase) {
|
||||||
|
cleanupWorker();
|
||||||
|
if (connectableSource instanceof Disposable) {
|
||||||
|
((Disposable) connectableSource).dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
baseDisposable.dispose();
|
||||||
|
baseDisposable = new CompositeDisposable();
|
||||||
|
subscriptionCount.set(0);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class DisposeConsumer implements Consumer<Disposable> {
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private final Observer<? super T> observer;
|
||||||
|
@NonNull
|
||||||
|
private final AtomicBoolean writeLocked;
|
||||||
|
|
||||||
|
public DisposeConsumer(@NonNull final Observer<? super T> observer, @NonNull final AtomicBoolean writeLocked) {
|
||||||
|
this.observer = observer;
|
||||||
|
this.writeLocked = writeLocked;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void accept(@NonNull final Disposable subscription) {
|
||||||
|
try {
|
||||||
|
baseDisposable.add(subscription);
|
||||||
|
// ready to subscribe to source so do it
|
||||||
|
doSubscribe(observer, baseDisposable);
|
||||||
|
} finally {
|
||||||
|
// release the write lock
|
||||||
|
lock.unlock();
|
||||||
|
writeLocked.set(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class DisposeTask implements Runnable {
|
||||||
|
|
||||||
|
@NonNull
|
||||||
|
private final CompositeDisposable current;
|
||||||
|
|
||||||
|
public DisposeTask(@NonNull final CompositeDisposable current) {
|
||||||
|
this.current = current;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
if (baseDisposable == current && subscriptionCount.decrementAndGet() == 0) {
|
||||||
|
if (worker != null) {
|
||||||
|
worker.dispose();
|
||||||
|
} else {
|
||||||
|
worker = scheduler.createWorker();
|
||||||
|
}
|
||||||
|
worker.schedule(() -> {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
if (subscriptionCount.get() == 0) {
|
||||||
|
cleanupWorker();
|
||||||
|
if (connectableSource instanceof Disposable) {
|
||||||
|
((Disposable) connectableSource).dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
baseDisposable.dispose();
|
||||||
|
// need a new baseDisposable because once
|
||||||
|
// disposed stays that way
|
||||||
|
baseDisposable = new CompositeDisposable();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}, cacheTime, cacheTimeUnit);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,213 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright 2014 Netflix, Inc.
|
|
||||||
* <p>
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
||||||
* use this file except in compliance with the License. You may obtain a copy of
|
|
||||||
* the License at
|
|
||||||
* <p>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
* License for the specific language governing permissions and limitations under
|
|
||||||
* the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package ru.touchin.roboswag.core.observables;
|
|
||||||
|
|
||||||
import android.support.annotation.NonNull;
|
|
||||||
import android.support.annotation.Nullable;
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
|
|
||||||
import rx.Observable.OnSubscribe;
|
|
||||||
import rx.Scheduler;
|
|
||||||
import rx.Subscriber;
|
|
||||||
import rx.Subscription;
|
|
||||||
import rx.functions.Action1;
|
|
||||||
import rx.observables.ConnectableObservable;
|
|
||||||
import rx.schedulers.Schedulers;
|
|
||||||
import rx.subscriptions.CompositeSubscription;
|
|
||||||
import rx.subscriptions.Subscriptions;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an observable sequence that stays connected to the source as long as
|
|
||||||
* there is at least one subscription to the observable sequence and also it stays connected
|
|
||||||
* for cache time after everyone unsubscribe.
|
|
||||||
*
|
|
||||||
* @param <T> the value type
|
|
||||||
*/
|
|
||||||
@SuppressWarnings({"PMD.AvoidUsingVolatile", "PMD.CompareObjectsWithEquals"})
|
|
||||||
//AvoidUsingVolatile,CompareObjectsWithEquals: from OnSubscribeRefCount code
|
|
||||||
public final class OnSubscribeRefCountWithCacheTime<T> implements OnSubscribe<T> {
|
|
||||||
|
|
||||||
@NonNull
|
|
||||||
private final ConnectableObservable<? extends T> source;
|
|
||||||
@NonNull
|
|
||||||
private volatile CompositeSubscription baseSubscription = new CompositeSubscription();
|
|
||||||
@NonNull
|
|
||||||
private final AtomicInteger subscriptionCount = new AtomicInteger(0);
|
|
||||||
|
|
||||||
@NonNull
|
|
||||||
private final Scheduler scheduler = Schedulers.computation();
|
|
||||||
private final long cacheTime;
|
|
||||||
@NonNull
|
|
||||||
private final TimeUnit cacheTimeUnit;
|
|
||||||
@Nullable
|
|
||||||
private Scheduler.Worker worker;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Use this lock for every subscription and disconnect action.
|
|
||||||
*/
|
|
||||||
@NonNull
|
|
||||||
private final ReentrantLock lock = new ReentrantLock();
|
|
||||||
|
|
||||||
public OnSubscribeRefCountWithCacheTime(@NonNull final ConnectableObservable<? extends T> source,
|
|
||||||
final long cacheTime, @NonNull final TimeUnit cacheTimeUnit) {
|
|
||||||
this.source = source;
|
|
||||||
this.cacheTime = cacheTime;
|
|
||||||
this.cacheTimeUnit = cacheTimeUnit;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void call(@NonNull final Subscriber<? super T> subscriber) {
|
|
||||||
|
|
||||||
lock.lock();
|
|
||||||
if (subscriptionCount.incrementAndGet() == 1) {
|
|
||||||
if (worker != null) {
|
|
||||||
worker.unsubscribe();
|
|
||||||
worker = null;
|
|
||||||
}
|
|
||||||
final AtomicBoolean writeLocked = new AtomicBoolean(true);
|
|
||||||
|
|
||||||
try {
|
|
||||||
// need to use this overload of connect to ensure that
|
|
||||||
// baseSubscription is set in the case that source is a
|
|
||||||
// synchronous Observable
|
|
||||||
source.connect(onSubscribe(subscriber, writeLocked));
|
|
||||||
} finally {
|
|
||||||
// need to cover the case where the source is subscribed to
|
|
||||||
// outside of this class thus preventing the Action1 passed
|
|
||||||
// to source.connect above being called
|
|
||||||
if (writeLocked.get()) {
|
|
||||||
// Action1 passed to source.connect was not called
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
// ready to subscribe to source so do it
|
|
||||||
doSubscribe(subscriber, baseSubscription);
|
|
||||||
} finally {
|
|
||||||
// release the read lock
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@NonNull
|
|
||||||
private Action1<Subscription> onSubscribe(@NonNull final Subscriber<? super T> subscriber,
|
|
||||||
@NonNull final AtomicBoolean writeLocked) {
|
|
||||||
return subscription -> {
|
|
||||||
try {
|
|
||||||
baseSubscription.add(subscription);
|
|
||||||
// ready to subscribe to source so do it
|
|
||||||
doSubscribe(subscriber, baseSubscription);
|
|
||||||
} finally {
|
|
||||||
// release the write lock
|
|
||||||
lock.unlock();
|
|
||||||
writeLocked.set(false);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private void doSubscribe(@NonNull final Subscriber<? super T> subscriber, @NonNull final CompositeSubscription currentBase) {
|
|
||||||
subscriber.add(disconnect(currentBase));
|
|
||||||
source.unsafeSubscribe(new Subscriber<T>(subscriber) {
|
|
||||||
@Override
|
|
||||||
public void onError(@NonNull final Throwable throwable) {
|
|
||||||
cleanup();
|
|
||||||
subscriber.onError(throwable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(@Nullable final T item) {
|
|
||||||
subscriber.onNext(item);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onCompleted() {
|
|
||||||
cleanup();
|
|
||||||
subscriber.onCompleted();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void cleanup() {
|
|
||||||
// on error or completion we need to unsubscribe the base subscription and set the subscriptionCount to 0
|
|
||||||
lock.lock();
|
|
||||||
try {
|
|
||||||
if (baseSubscription == currentBase) {
|
|
||||||
cleanupWorker();
|
|
||||||
// backdoor into the ConnectableObservable to cleanup and reset its state
|
|
||||||
if (source instanceof Subscription) {
|
|
||||||
((Subscription) source).unsubscribe();
|
|
||||||
}
|
|
||||||
baseSubscription.unsubscribe();
|
|
||||||
baseSubscription = new CompositeSubscription();
|
|
||||||
subscriptionCount.set(0);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@NonNull
|
|
||||||
private Subscription disconnect(@NonNull final CompositeSubscription current) {
|
|
||||||
return Subscriptions.create(() -> {
|
|
||||||
lock.lock();
|
|
||||||
try {
|
|
||||||
if (baseSubscription == current && subscriptionCount.decrementAndGet() == 0) {
|
|
||||||
if (worker != null) {
|
|
||||||
worker.unsubscribe();
|
|
||||||
} else {
|
|
||||||
worker = scheduler.createWorker();
|
|
||||||
}
|
|
||||||
worker.schedule(() -> {
|
|
||||||
lock.lock();
|
|
||||||
try {
|
|
||||||
if (subscriptionCount.get() == 0) {
|
|
||||||
cleanupWorker();
|
|
||||||
// backdoor into the ConnectableObservable to cleanup and reset its state
|
|
||||||
if (source instanceof Subscription) {
|
|
||||||
((Subscription) source).unsubscribe();
|
|
||||||
}
|
|
||||||
baseSubscription.unsubscribe();
|
|
||||||
// need a new baseSubscription because once
|
|
||||||
// unsubscribed stays that way
|
|
||||||
baseSubscription = new CompositeSubscription();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}, cacheTime, cacheTimeUnit);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private void cleanupWorker() {
|
|
||||||
if (worker != null) {
|
|
||||||
worker.unsubscribe();
|
|
||||||
worker = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -33,12 +33,12 @@ import android.support.annotation.Nullable;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import io.reactivex.Emitter;
|
||||||
|
import io.reactivex.Observable;
|
||||||
|
import io.reactivex.Scheduler;
|
||||||
|
import io.reactivex.android.schedulers.AndroidSchedulers;
|
||||||
import ru.touchin.roboswag.core.log.Lc;
|
import ru.touchin.roboswag.core.log.Lc;
|
||||||
import ru.touchin.roboswag.core.utils.ServiceBinder;
|
import ru.touchin.roboswag.core.utils.ServiceBinder;
|
||||||
import rx.Emitter;
|
|
||||||
import rx.Observable;
|
|
||||||
import rx.Scheduler;
|
|
||||||
import rx.android.schedulers.AndroidSchedulers;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 10/01/2016.
|
* Created by Gavriil Sitnikov on 10/01/2016.
|
||||||
|
|
@ -62,8 +62,8 @@ public final class RxAndroidUtils {
|
||||||
.<T>create(emitter -> {
|
.<T>create(emitter -> {
|
||||||
onSubscribeServiceConnection.emitter = emitter;
|
onSubscribeServiceConnection.emitter = emitter;
|
||||||
context.bindService(new Intent(context, serviceClass), onSubscribeServiceConnection, Context.BIND_AUTO_CREATE);
|
context.bindService(new Intent(context, serviceClass), onSubscribeServiceConnection, Context.BIND_AUTO_CREATE);
|
||||||
}, Emitter.BackpressureMode.LATEST)
|
})
|
||||||
.doOnUnsubscribe(() -> {
|
.doOnDispose(() -> {
|
||||||
context.unbindService(onSubscribeServiceConnection);
|
context.unbindService(onSubscribeServiceConnection);
|
||||||
onSubscribeServiceConnection.emitter = null;
|
onSubscribeServiceConnection.emitter = null;
|
||||||
}))
|
}))
|
||||||
|
|
@ -87,8 +87,8 @@ public final class RxAndroidUtils {
|
||||||
.<Intent>create(emitter -> {
|
.<Intent>create(emitter -> {
|
||||||
onOnSubscribeBroadcastReceiver.emitter = emitter;
|
onOnSubscribeBroadcastReceiver.emitter = emitter;
|
||||||
context.registerReceiver(onOnSubscribeBroadcastReceiver, intentFilter);
|
context.registerReceiver(onOnSubscribeBroadcastReceiver, intentFilter);
|
||||||
}, Emitter.BackpressureMode.LATEST)
|
})
|
||||||
.doOnUnsubscribe(() -> {
|
.doOnDispose(() -> {
|
||||||
context.unregisterReceiver(onOnSubscribeBroadcastReceiver);
|
context.unregisterReceiver(onOnSubscribeBroadcastReceiver);
|
||||||
onOnSubscribeBroadcastReceiver.emitter = null;
|
onOnSubscribeBroadcastReceiver.emitter = null;
|
||||||
}))
|
}))
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,8 @@ import java.util.List;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.observables.collections.changes.Change;
|
import ru.touchin.roboswag.core.observables.collections.changes.Change;
|
||||||
import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges;
|
import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges;
|
||||||
import rx.Emitter;
|
import io.reactivex.Emitter;
|
||||||
import rx.Observable;
|
import io.reactivex.Observable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 23/05/16.
|
* Created by Gavriil Sitnikov on 23/05/16.
|
||||||
|
|
@ -60,8 +60,8 @@ public abstract class ObservableCollection<TItem> {
|
||||||
@NonNull
|
@NonNull
|
||||||
private Observable<CollectionChanges<TItem>> createChangesObservable() {
|
private Observable<CollectionChanges<TItem>> createChangesObservable() {
|
||||||
return Observable
|
return Observable
|
||||||
.<CollectionChanges<TItem>>create(emitter -> this.changesEmitter = emitter, Emitter.BackpressureMode.BUFFER)
|
.<CollectionChanges<TItem>>create(emitter -> this.changesEmitter = emitter)
|
||||||
.doOnUnsubscribe(() -> this.changesEmitter = null)
|
.doOnDispose(() -> this.changesEmitter = null)
|
||||||
.share();
|
.share();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,11 @@ import java.util.List;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.observables.collections.changes.DefaultCollectionsChangesCalculator;
|
import ru.touchin.roboswag.core.observables.collections.changes.DefaultCollectionsChangesCalculator;
|
||||||
import rx.Scheduler;
|
import io.reactivex.Scheduler;
|
||||||
import rx.Subscription;
|
import io.reactivex.disposables.Disposable;
|
||||||
import rx.functions.Func1;
|
import io.reactivex.functions.Function;
|
||||||
import rx.schedulers.Schedulers;
|
import io.reactivex.schedulers.Schedulers;
|
||||||
|
import ru.touchin.roboswag.core.log.Lc;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 02/06/2016.
|
* Created by Gavriil Sitnikov on 02/06/2016.
|
||||||
|
|
@ -29,16 +30,20 @@ public class ObservableFilteredList<TItem> extends ObservableCollection<TItem> {
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
private static <TItem> List<TItem> filterCollection(@NonNull final Collection<TItem> sourceCollection,
|
private static <TItem> List<TItem> filterCollection(@NonNull final Collection<TItem> sourceCollection,
|
||||||
@Nullable final Func1<TItem, Boolean> filter) {
|
@Nullable final Function<TItem, Boolean> filter) {
|
||||||
if (filter == null) {
|
if (filter == null) {
|
||||||
return new ArrayList<>(sourceCollection);
|
return new ArrayList<>(sourceCollection);
|
||||||
}
|
}
|
||||||
final List<TItem> result = new ArrayList<>(sourceCollection.size());
|
final List<TItem> result = new ArrayList<>(sourceCollection.size());
|
||||||
|
try {
|
||||||
for (final TItem item : sourceCollection) {
|
for (final TItem item : sourceCollection) {
|
||||||
if (filter.call(item)) {
|
if (filter.apply(item)) {
|
||||||
result.add(item);
|
result.add(item);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (final Exception exception) {
|
||||||
|
Lc.assertion(exception);
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -47,23 +52,23 @@ public class ObservableFilteredList<TItem> extends ObservableCollection<TItem> {
|
||||||
@NonNull
|
@NonNull
|
||||||
private ObservableCollection<TItem> sourceCollection;
|
private ObservableCollection<TItem> sourceCollection;
|
||||||
@Nullable
|
@Nullable
|
||||||
private Func1<TItem, Boolean> filter;
|
private Function<TItem, Boolean> filter;
|
||||||
@Nullable
|
@Nullable
|
||||||
private Subscription sourceCollectionSubscription;
|
private Disposable sourceCollectionSubscription;
|
||||||
|
|
||||||
public ObservableFilteredList() {
|
public ObservableFilteredList() {
|
||||||
this(new ArrayList<>(), null);
|
this(new ArrayList<>(), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ObservableFilteredList(@NonNull final Func1<TItem, Boolean> filter) {
|
public ObservableFilteredList(@NonNull final Function<TItem, Boolean> filter) {
|
||||||
this(new ArrayList<>(), filter);
|
this(new ArrayList<>(), filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ObservableFilteredList(@NonNull final Collection<TItem> sourceCollection, @Nullable final Func1<TItem, Boolean> filter) {
|
public ObservableFilteredList(@NonNull final Collection<TItem> sourceCollection, @Nullable final Function<TItem, Boolean> filter) {
|
||||||
this(new ObservableList<>(sourceCollection), filter);
|
this(new ObservableList<>(sourceCollection), filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ObservableFilteredList(@NonNull final ObservableCollection<TItem> sourceCollection, @Nullable final Func1<TItem, Boolean> filter) {
|
public ObservableFilteredList(@NonNull final ObservableCollection<TItem> sourceCollection, @Nullable final Function<TItem, Boolean> filter) {
|
||||||
super();
|
super();
|
||||||
this.filter = filter;
|
this.filter = filter;
|
||||||
this.sourceCollection = sourceCollection;
|
this.sourceCollection = sourceCollection;
|
||||||
|
|
@ -96,14 +101,14 @@ public class ObservableFilteredList<TItem> extends ObservableCollection<TItem> {
|
||||||
*
|
*
|
||||||
* @param filter Function to filter item. True - item will stay, false - item will be filtered.
|
* @param filter Function to filter item. True - item will stay, false - item will be filtered.
|
||||||
*/
|
*/
|
||||||
public void setFilter(@Nullable final Func1<TItem, Boolean> filter) {
|
public void setFilter(@Nullable final Function<TItem, Boolean> filter) {
|
||||||
this.filter = filter;
|
this.filter = filter;
|
||||||
updateInternal();
|
updateInternal();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateInternal() {
|
private void updateInternal() {
|
||||||
if (sourceCollectionSubscription != null) {
|
if (sourceCollectionSubscription != null) {
|
||||||
sourceCollectionSubscription.unsubscribe();
|
sourceCollectionSubscription.dispose();
|
||||||
sourceCollectionSubscription = null;
|
sourceCollectionSubscription = null;
|
||||||
}
|
}
|
||||||
sourceCollectionSubscription = sourceCollection.observeItems()
|
sourceCollectionSubscription = sourceCollection.observeItems()
|
||||||
|
|
|
||||||
|
|
@ -58,6 +58,8 @@ public class ObservableList<TItem> extends ObservableCollection<TItem> implement
|
||||||
private SameItemsPredicate<TItem> sameItemsPredicate;
|
private SameItemsPredicate<TItem> sameItemsPredicate;
|
||||||
@Nullable
|
@Nullable
|
||||||
private ChangePayloadProducer<TItem> changePayloadProducer;
|
private ChangePayloadProducer<TItem> changePayloadProducer;
|
||||||
|
@Nullable
|
||||||
|
private ObservableList<TItem> diffUtilsSource;
|
||||||
|
|
||||||
public ObservableList() {
|
public ObservableList() {
|
||||||
super();
|
super();
|
||||||
|
|
@ -227,9 +229,19 @@ public class ObservableList<TItem> extends ObservableCollection<TItem> implement
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
final List<TItem> oldList = new ArrayList<>(items);
|
final List<TItem> oldList = new ArrayList<>(items);
|
||||||
final List<TItem> newList = new ArrayList<>(newItems);
|
final List<TItem> newList = new ArrayList<>(newItems);
|
||||||
final CollectionsChangesCalculator<TItem> calculator = sameItemsPredicate != null
|
final CollectionsChangesCalculator<TItem> calculator;
|
||||||
? new DiffCollectionsChangesCalculator<>(oldList, newList, detectMoves, sameItemsPredicate, changePayloadProducer)
|
if (diffUtilsSource != null) {
|
||||||
: new DefaultCollectionsChangesCalculator<>(oldList, newList, false);
|
if (diffUtilsSource.sameItemsPredicate != null) {
|
||||||
|
calculator = new DiffCollectionsChangesCalculator<>(oldList, newList,
|
||||||
|
diffUtilsSource.detectMoves, diffUtilsSource.sameItemsPredicate, diffUtilsSource.changePayloadProducer);
|
||||||
|
} else {
|
||||||
|
calculator = new DefaultCollectionsChangesCalculator<>(oldList, newList, false);
|
||||||
|
}
|
||||||
|
} else if (sameItemsPredicate != null) {
|
||||||
|
calculator = new DiffCollectionsChangesCalculator<>(oldList, newList, detectMoves, sameItemsPredicate, changePayloadProducer);
|
||||||
|
} else {
|
||||||
|
calculator = new DefaultCollectionsChangesCalculator<>(oldList, newList, false);
|
||||||
|
}
|
||||||
items.clear();
|
items.clear();
|
||||||
items.addAll(newItems);
|
items.addAll(newItems);
|
||||||
notifyAboutChanges(calculator.calculateInsertedItems(), calculator.calculateRemovedItems(), calculator.calculateChanges());
|
notifyAboutChanges(calculator.calculateInsertedItems(), calculator.calculateRemovedItems(), calculator.calculateChanges());
|
||||||
|
|
@ -271,7 +283,16 @@ public class ObservableList<TItem> extends ObservableCollection<TItem> implement
|
||||||
* @return true if diff utils is enabled.
|
* @return true if diff utils is enabled.
|
||||||
*/
|
*/
|
||||||
public boolean diffUtilsIsEnabled() {
|
public boolean diffUtilsIsEnabled() {
|
||||||
return sameItemsPredicate != null;
|
return diffUtilsSource != null ? diffUtilsSource.diffUtilsIsEnabled() : sameItemsPredicate != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets observableCollection as a source of diff utils parameters;
|
||||||
|
*
|
||||||
|
* @param diffUtilsSource Source of diff utils parameters.
|
||||||
|
*/
|
||||||
|
public void setDiffUtilsSource(@Nullable final ObservableList<TItem> diffUtilsSource) {
|
||||||
|
this.diffUtilsSource = diffUtilsSource;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -26,22 +26,21 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NoSuchElementException;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
import io.reactivex.Observable;
|
||||||
|
import io.reactivex.Scheduler;
|
||||||
|
import io.reactivex.Single;
|
||||||
|
import io.reactivex.functions.Function;
|
||||||
|
import io.reactivex.schedulers.Schedulers;
|
||||||
|
import io.reactivex.subjects.BehaviorSubject;
|
||||||
import ru.touchin.roboswag.core.log.Lc;
|
import ru.touchin.roboswag.core.log.Lc;
|
||||||
import ru.touchin.roboswag.core.observables.collections.ObservableCollection;
|
import ru.touchin.roboswag.core.observables.collections.ObservableCollection;
|
||||||
import ru.touchin.roboswag.core.observables.collections.ObservableList;
|
import ru.touchin.roboswag.core.observables.collections.ObservableList;
|
||||||
import ru.touchin.roboswag.core.observables.collections.changes.Change;
|
import ru.touchin.roboswag.core.observables.collections.changes.Change;
|
||||||
import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges;
|
import ru.touchin.roboswag.core.observables.collections.changes.CollectionChanges;
|
||||||
import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
|
import ru.touchin.roboswag.core.utils.Optional;
|
||||||
import rx.Observable;
|
|
||||||
import rx.Scheduler;
|
|
||||||
import rx.exceptions.OnErrorThrowable;
|
|
||||||
import rx.functions.Func0;
|
|
||||||
import rx.functions.Func1;
|
|
||||||
import rx.schedulers.Schedulers;
|
|
||||||
import rx.subjects.BehaviorSubject;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 23/05/16.
|
* Created by Gavriil Sitnikov on 23/05/16.
|
||||||
|
|
@ -67,7 +66,7 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
|
||||||
@NonNull
|
@NonNull
|
||||||
private Observable<TLoadedItems> loadingMoreObservable;
|
private Observable<TLoadedItems> loadingMoreObservable;
|
||||||
@NonNull
|
@NonNull
|
||||||
private final BehaviorSubject<Integer> moreItemsCount = BehaviorSubject.create(LoadedItems.UNKNOWN_ITEMS_COUNT);
|
private final BehaviorSubject<Integer> moreItemsCount = BehaviorSubject.createDefault(LoadedItems.UNKNOWN_ITEMS_COUNT);
|
||||||
@NonNull
|
@NonNull
|
||||||
private final ObservableList<TItem> innerList = new ObservableList<>();
|
private final ObservableList<TItem> innerList = new ObservableList<>();
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -85,14 +84,8 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
|
||||||
@Nullable final LoadedItems<TItem, TMoreReference> initialItems) {
|
@Nullable final LoadedItems<TItem, TMoreReference> initialItems) {
|
||||||
super();
|
super();
|
||||||
this.loadingMoreObservable = Observable
|
this.loadingMoreObservable = Observable
|
||||||
.switchOnNext(Observable.fromCallable(() -> createLoadRequestBasedObservable(this::createActualRequest, moreMoreItemsLoader::load)))
|
.switchOnNext(Observable
|
||||||
.single()
|
.fromCallable(() -> createLoadRequestBasedObservable(this::createActualRequest, moreMoreItemsLoader::load).toObservable()))
|
||||||
.doOnError(throwable -> {
|
|
||||||
if (throwable instanceof IllegalArgumentException || throwable instanceof NoSuchElementException) {
|
|
||||||
Lc.assertion(new ShouldNotHappenException("Updates during loading not supported."
|
|
||||||
+ " MoreItemsLoader should emit only one result.", throwable));
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.doOnNext(loadedItems -> onItemsLoaded(loadedItems, size(), false))
|
.doOnNext(loadedItems -> onItemsLoaded(loadedItems, size(), false))
|
||||||
.replay(1)
|
.replay(1)
|
||||||
.refCount();
|
.refCount();
|
||||||
|
|
@ -113,16 +106,16 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
|
||||||
}
|
}
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
protected <T, TRequest> Observable<T> createLoadRequestBasedObservable(@NonNull final Func0<TRequest> requestCreator,
|
protected <T, TRequest> Single<T> createLoadRequestBasedObservable(@NonNull final Callable<TRequest> requestCreator,
|
||||||
@NonNull final Func1<TRequest, Observable<T>> observableCreator) {
|
@NonNull final Function<TRequest, Single<T>> observableCreator) {
|
||||||
return Observable
|
return Single
|
||||||
.fromCallable(requestCreator)
|
.fromCallable(requestCreator)
|
||||||
.switchMap(loadRequest -> observableCreator.call(loadRequest)
|
.flatMap(loadRequest -> observableCreator.apply(loadRequest)
|
||||||
.subscribeOn(Schedulers.io())
|
.subscribeOn(Schedulers.io())
|
||||||
.observeOn(loaderScheduler)
|
.observeOn(loaderScheduler)
|
||||||
.doOnNext(ignored -> {
|
.doOnSuccess(ignored -> {
|
||||||
if (!requestCreator.call().equals(loadRequest)) {
|
if (!requestCreator.call().equals(loadRequest)) {
|
||||||
throw OnErrorThrowable.from(new RequestChangedDuringLoadingException());
|
throw new RequestChangedDuringLoadingException();
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
.retry((number, throwable) ->
|
.retry((number, throwable) ->
|
||||||
|
|
@ -297,20 +290,20 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
|
||||||
* @return {@link Observable} to load item.
|
* @return {@link Observable} to load item.
|
||||||
*/
|
*/
|
||||||
@NonNull
|
@NonNull
|
||||||
public Observable<TItem> loadItem(final int position) {
|
public Single<Optional<TItem>> loadItem(final int position) {
|
||||||
return Observable
|
return Observable.switchOnNext(Observable
|
||||||
.switchOnNext(Observable
|
|
||||||
.fromCallable(() -> {
|
.fromCallable(() -> {
|
||||||
if (position < size()) {
|
if (position < size()) {
|
||||||
return Observable.just(get(position));
|
return Observable.just(new Optional<>(get(position)));
|
||||||
} else if (moreItemsCount.getValue() == 0) {
|
} else if (moreItemsCount.getValue() == 0) {
|
||||||
return Observable.just((TItem) null);
|
return Observable.just(new Optional<TItem>(null));
|
||||||
} else {
|
} else {
|
||||||
return loadingMoreObservable.switchMap(ignored -> Observable.<TItem>error(new NotLoadedYetException()));
|
return loadingMoreObservable.switchMap(ignored -> Observable.<Optional<TItem>>error(new NotLoadedYetException()));
|
||||||
}
|
}
|
||||||
})
|
}))
|
||||||
.subscribeOn(loaderScheduler))
|
.subscribeOn(loaderScheduler)
|
||||||
.retry((number, throwable) -> throwable instanceof NotLoadedYetException);
|
.retry((number, throwable) -> throwable instanceof NotLoadedYetException)
|
||||||
|
.firstOrError();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -322,15 +315,24 @@ public class LoadingMoreList<TItem, TMoreReference, TLoadedItems extends LoadedI
|
||||||
* @return {@link Observable} to load items.
|
* @return {@link Observable} to load items.
|
||||||
*/
|
*/
|
||||||
@NonNull
|
@NonNull
|
||||||
public Observable<Collection<TItem>> loadRange(final int first, final int last) {
|
@SuppressWarnings("unchecked")
|
||||||
final List<Observable<TItem>> itemsRequests = new ArrayList<>();
|
//unchecked: it's OK for such zip operator
|
||||||
|
public Single<Collection<TItem>> loadRange(final int first, final int last) {
|
||||||
|
final List<Single<Optional<TItem>>> itemsRequests = new ArrayList<>();
|
||||||
for (int i = first; i <= last; i++) {
|
for (int i = first; i <= last; i++) {
|
||||||
itemsRequests.add(loadItem(i));
|
itemsRequests.add(loadItem(i));
|
||||||
}
|
}
|
||||||
return Observable.concatEager(itemsRequests)
|
return Single.zip(itemsRequests,
|
||||||
.filter(loadedItem -> loadedItem != null)
|
items -> {
|
||||||
.toList()
|
final List<TItem> result = new ArrayList<>();
|
||||||
.map(Collections::unmodifiableCollection);
|
for (final Object item : items) {
|
||||||
|
final Optional<TItem> optional = (Optional<TItem>) item;
|
||||||
|
if (optional.get() != null) {
|
||||||
|
result.add(optional.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Collections.unmodifiableCollection(result);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ package ru.touchin.roboswag.core.observables.collections.loadable;
|
||||||
|
|
||||||
import android.support.annotation.NonNull;
|
import android.support.annotation.NonNull;
|
||||||
|
|
||||||
import rx.Observable;
|
import io.reactivex.Single;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 02/06/2016.
|
* Created by Gavriil Sitnikov on 02/06/2016.
|
||||||
|
|
@ -34,12 +34,12 @@ import rx.Observable;
|
||||||
public interface MoreItemsLoader<TItem, TMoreReference, TLoadedItems extends LoadedItems<TItem, TMoreReference>> {
|
public interface MoreItemsLoader<TItem, TMoreReference, TLoadedItems extends LoadedItems<TItem, TMoreReference>> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns {@link Observable} that could load next part of items.
|
* Returns {@link Single} that could load next part of items.
|
||||||
*
|
*
|
||||||
* @param moreLoadRequest Request with info inside to load next part of items;
|
* @param moreLoadRequest Request with info inside to load next part of items;
|
||||||
* @return {@link Observable} of loading items.
|
* @return {@link Single} of loading items.
|
||||||
*/
|
*/
|
||||||
@NonNull
|
@NonNull
|
||||||
Observable<TLoadedItems> load(@NonNull final MoreLoadRequest<TMoreReference> moreLoadRequest);
|
Single<TLoadedItems> load(@NonNull final MoreLoadRequest<TMoreReference> moreLoadRequest);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -26,19 +26,17 @@ import java.lang.reflect.Type;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.log.Lc;
|
import io.reactivex.Completable;
|
||||||
|
import io.reactivex.Observable;
|
||||||
|
import io.reactivex.Scheduler;
|
||||||
|
import io.reactivex.Single;
|
||||||
|
import io.reactivex.plugins.RxJavaPlugins;
|
||||||
|
import io.reactivex.schedulers.Schedulers;
|
||||||
|
import io.reactivex.subjects.PublishSubject;
|
||||||
import ru.touchin.roboswag.core.log.LcGroup;
|
import ru.touchin.roboswag.core.log.LcGroup;
|
||||||
import ru.touchin.roboswag.core.observables.OnSubscribeRefCountWithCacheTime;
|
import ru.touchin.roboswag.core.observables.ObservableRefCountWithCacheTime;
|
||||||
import ru.touchin.roboswag.core.utils.ObjectUtils;
|
import ru.touchin.roboswag.core.utils.ObjectUtils;
|
||||||
import ru.touchin.roboswag.core.utils.Optional;
|
import ru.touchin.roboswag.core.utils.Optional;
|
||||||
import rx.Completable;
|
|
||||||
import rx.Observable;
|
|
||||||
import rx.Scheduler;
|
|
||||||
import rx.Single;
|
|
||||||
import rx.exceptions.OnErrorThrowable;
|
|
||||||
import rx.functions.Actions;
|
|
||||||
import rx.schedulers.Schedulers;
|
|
||||||
import rx.subjects.PublishSubject;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 04/10/2015.
|
* Created by Gavriil Sitnikov on 04/10/2015.
|
||||||
|
|
@ -123,7 +121,8 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private Optional<TStoreObject> returnDefaultValueIfNull(@NonNull final Optional<TStoreObject> storeObject, @Nullable final TObject defaultValue) {
|
private Optional<TStoreObject> returnDefaultValueIfNull(@NonNull final Optional<TStoreObject> storeObject, @Nullable final TObject defaultValue)
|
||||||
|
throws Converter.ConversionException {
|
||||||
if (storeObject.get() != null || defaultValue == null) {
|
if (storeObject.get() != null || defaultValue == null) {
|
||||||
return storeObject;
|
return storeObject;
|
||||||
}
|
}
|
||||||
|
|
@ -133,7 +132,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
} catch (final Converter.ConversionException exception) {
|
} catch (final Converter.ConversionException exception) {
|
||||||
STORABLE_LC_GROUP.w(exception, "Exception while converting default value of '%s' from '%s' from store %s",
|
STORABLE_LC_GROUP.w(exception, "Exception while converting default value of '%s' from '%s' from store %s",
|
||||||
key, defaultValue, store);
|
key, defaultValue, store);
|
||||||
throw OnErrorThrowable.from(exception);
|
throw exception;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -160,7 +159,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
.concatWith(newStoreValueEvent)
|
.concatWith(newStoreValueEvent)
|
||||||
.map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue));
|
.map(storeObject -> returnDefaultValueIfNull(storeObject, defaultValue));
|
||||||
return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
|
return observeStrategy == ObserveStrategy.CACHE_STORE_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
|
||||||
? Observable.unsafeCreate(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS))
|
? RxJavaPlugins.onAssembly(new ObservableRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS))
|
||||||
: result;
|
: result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,11 +174,11 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
} catch (final Converter.ConversionException exception) {
|
} catch (final Converter.ConversionException exception) {
|
||||||
STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s",
|
STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s",
|
||||||
key, storeObject, store, converter);
|
key, storeObject, store, converter);
|
||||||
throw OnErrorThrowable.from(exception);
|
throw exception;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
|
return observeStrategy == ObserveStrategy.CACHE_ACTUAL_VALUE || observeStrategy == ObserveStrategy.CACHE_STORE_AND_ACTUAL_VALUE
|
||||||
? Observable.unsafeCreate(new OnSubscribeRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS))
|
? RxJavaPlugins.onAssembly(new ObservableRefCountWithCacheTime<>(result.replay(1), cacheTimeMillis, TimeUnit.MILLISECONDS))
|
||||||
: result;
|
: result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -235,7 +234,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
private Completable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) {
|
private Completable internalSet(@Nullable final TObject newValue, final boolean checkForEqualityBeforeSet) {
|
||||||
return (checkForEqualityBeforeSet ? storeValueObservable.take(1).toSingle() : Single.just(new Optional<>(null)))
|
return (checkForEqualityBeforeSet ? storeValueObservable.firstOrError() : Single.just(new Optional<>(null)))
|
||||||
.observeOn(scheduler)
|
.observeOn(scheduler)
|
||||||
.flatMapCompletable(oldStoreValue -> {
|
.flatMapCompletable(oldStoreValue -> {
|
||||||
final TStoreObject newStoreValue;
|
final TStoreObject newStoreValue;
|
||||||
|
|
@ -274,10 +273,9 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
* @param newValue Value to set;
|
* @param newValue Value to set;
|
||||||
* @return Observable of setting process.
|
* @return Observable of setting process.
|
||||||
*/
|
*/
|
||||||
//COMPATIBILITY NOTE: it is not Completable to prevent migration of old code
|
|
||||||
@NonNull
|
@NonNull
|
||||||
public Observable<?> forceSet(@Nullable final TObject newValue) {
|
public Completable forceSet(@Nullable final TObject newValue) {
|
||||||
return internalSet(newValue, false).toObservable();
|
return internalSet(newValue, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -290,16 +288,9 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
* @param newValue Value to set;
|
* @param newValue Value to set;
|
||||||
* @return Observable of setting process.
|
* @return Observable of setting process.
|
||||||
*/
|
*/
|
||||||
//COMPATIBILITY NOTE: it is not Completable to prevent migration of old code
|
|
||||||
@NonNull
|
@NonNull
|
||||||
public Observable<?> set(@Nullable final TObject newValue) {
|
public Completable set(@Nullable final TObject newValue) {
|
||||||
return internalSet(newValue, true).toObservable();
|
return internalSet(newValue, true);
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
//COMPATIBILITY NOTE: it is deprecated as it's execution not bound to Android lifecycle objects
|
|
||||||
public void setCalm(@Nullable final TObject newValue) {
|
|
||||||
set(newValue).subscribe(Actions.empty(), Lc::assertion);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -310,7 +301,7 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
//deprecation: it should be used for debug only and in very rare cases.
|
//deprecation: it should be used for debug only and in very rare cases.
|
||||||
public void setSync(@Nullable final TObject newValue) {
|
public void setSync(@Nullable final TObject newValue) {
|
||||||
set(newValue).toBlocking().subscribe();
|
set(newValue).blockingAwait();
|
||||||
}
|
}
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
|
|
@ -334,9 +325,8 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
* @return Returns observable of value.
|
* @return Returns observable of value.
|
||||||
*/
|
*/
|
||||||
@NonNull
|
@NonNull
|
||||||
//COMPATIBILITY NOTE: it is not Single to prevent migration of old code
|
public Single<TReturnObject> get() {
|
||||||
public Observable<TReturnObject> get() {
|
return observe().firstOrError();
|
||||||
return observe().take(1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -344,11 +334,16 @@ public abstract class BaseStorable<TKey, TObject, TStoreObject, TReturnObject> {
|
||||||
*
|
*
|
||||||
* @return Returns value;
|
* @return Returns value;
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
//deprecation: it should be used for debug only and in very rare cases.
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public TReturnObject getSync() {
|
public TObject getSync() {
|
||||||
return get().toBlocking().first();
|
final TStoreObject storeObject = store.getObject(storeObjectType, key);
|
||||||
|
try {
|
||||||
|
return converter.toObject(objectType, storeObjectType, storeObject);
|
||||||
|
} catch (final Converter.ConversionException exception) {
|
||||||
|
STORABLE_LC_GROUP.w(exception, "Exception while trying to converting value of '%s' from store %s by %s",
|
||||||
|
key, storeObject, store, converter);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -24,10 +24,9 @@ import android.support.annotation.NonNull;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import rx.Completable;
|
import io.reactivex.Completable;
|
||||||
import rx.Observable;
|
import io.reactivex.Flowable;
|
||||||
import rx.Single;
|
import io.reactivex.Single;
|
||||||
import rx.exceptions.OnErrorThrowable;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 06/10/2015.
|
* Created by Gavriil Sitnikov on 06/10/2015.
|
||||||
|
|
@ -91,15 +90,16 @@ public class Migration<TKey> {
|
||||||
return makeMigrationChain(key, versionUpdater)
|
return makeMigrationChain(key, versionUpdater)
|
||||||
.doOnSuccess(lastUpdatedVersion -> {
|
.doOnSuccess(lastUpdatedVersion -> {
|
||||||
if (lastUpdatedVersion < latestVersion) {
|
if (lastUpdatedVersion < latestVersion) {
|
||||||
throw OnErrorThrowable.from(new NextLoopMigrationException());
|
throw new NextLoopMigrationException();
|
||||||
}
|
}
|
||||||
if (versionUpdater.initialVersion == versionUpdater.oldVersion) {
|
if (versionUpdater.initialVersion == versionUpdater.oldVersion) {
|
||||||
throw new MigrationException(String.format("Version of '%s' not updated from %s",
|
throw new MigrationException(String.format("Version of '%s' not updated from %s",
|
||||||
key, versionUpdater.initialVersion));
|
key, versionUpdater.initialVersion));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.retryWhen(attempts -> attempts.switchMap(throwable -> throwable instanceof NextLoopMigrationException
|
.retryWhen(attempts -> attempts
|
||||||
? Observable.just(null) : Observable.error(throwable)));
|
.switchMap(throwable -> throwable instanceof NextLoopMigrationException
|
||||||
|
? Flowable.just(new Object()) : Flowable.error(throwable)));
|
||||||
})
|
})
|
||||||
.toCompletable()
|
.toCompletable()
|
||||||
.andThen(versionsStore.storeObject(Long.class, key, latestVersion))
|
.andThen(versionsStore.storeObject(Long.class, key, latestVersion))
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ package ru.touchin.roboswag.core.observables.storable;
|
||||||
|
|
||||||
import android.support.annotation.NonNull;
|
import android.support.annotation.NonNull;
|
||||||
|
|
||||||
import rx.Single;
|
import io.reactivex.Single;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 05/10/2015.
|
* Created by Gavriil Sitnikov on 05/10/2015.
|
||||||
|
|
|
||||||
|
|
@ -17,19 +17,16 @@
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package ru.touchin.roboswag.core.observables.storable.concrete;
|
package ru.touchin.roboswag.core.observables.storable;
|
||||||
|
|
||||||
import android.support.annotation.NonNull;
|
import android.support.annotation.NonNull;
|
||||||
import android.support.annotation.Nullable;
|
import android.support.annotation.Nullable;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.observables.storable.BaseStorable;
|
|
||||||
import ru.touchin.roboswag.core.observables.storable.Migration;
|
|
||||||
import ru.touchin.roboswag.core.observables.storable.Storable;
|
|
||||||
import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
|
import ru.touchin.roboswag.core.utils.ShouldNotHappenException;
|
||||||
import rx.Observable;
|
import io.reactivex.Observable;
|
||||||
import rx.Scheduler;
|
import io.reactivex.Scheduler;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 04/10/2015.
|
* Created by Gavriil Sitnikov on 04/10/2015.
|
||||||
|
|
@ -25,10 +25,9 @@ import android.support.annotation.Nullable;
|
||||||
import java.lang.reflect.Type;
|
import java.lang.reflect.Type;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.observables.storable.concrete.NonNullStorable;
|
|
||||||
import ru.touchin.roboswag.core.utils.Optional;
|
import ru.touchin.roboswag.core.utils.Optional;
|
||||||
import rx.Observable;
|
import io.reactivex.Observable;
|
||||||
import rx.Scheduler;
|
import io.reactivex.Scheduler;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 04/10/2015.
|
* Created by Gavriil Sitnikov on 04/10/2015.
|
||||||
|
|
@ -43,8 +42,7 @@ import rx.Scheduler;
|
||||||
* @param <TObject> Type of actual object;
|
* @param <TObject> Type of actual object;
|
||||||
* @param <TStoreObject> Type of store object. Could be same as {@link TObject}.
|
* @param <TStoreObject> Type of store object. Could be same as {@link TObject}.
|
||||||
*/
|
*/
|
||||||
//COMPATIBILITY NOTE: in RxJava2 it should extends BaseStorable<TKey, TObject, TStoreObject, Optional<TObject>>
|
public class Storable<TKey, TObject, TStoreObject> extends BaseStorable<TKey, TObject, TStoreObject, Optional<TObject>> {
|
||||||
public class Storable<TKey, TObject, TStoreObject> extends BaseStorable<TKey, TObject, TStoreObject, TObject> {
|
|
||||||
|
|
||||||
public Storable(@NonNull final BuilderCore<TKey, TObject, TStoreObject> builderCore) {
|
public Storable(@NonNull final BuilderCore<TKey, TObject, TStoreObject> builderCore) {
|
||||||
super(builderCore);
|
super(builderCore);
|
||||||
|
|
@ -52,8 +50,8 @@ public class Storable<TKey, TObject, TStoreObject> extends BaseStorable<TKey, TO
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
@Override
|
@Override
|
||||||
public Observable<TObject> observe() {
|
public Observable<Optional<TObject>> observe() {
|
||||||
return observeOptionalValue().map(Optional::get);
|
return observeOptionalValue();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -25,8 +25,8 @@ import android.support.annotation.Nullable;
|
||||||
import java.lang.reflect.Type;
|
import java.lang.reflect.Type;
|
||||||
|
|
||||||
import ru.touchin.roboswag.core.utils.Optional;
|
import ru.touchin.roboswag.core.utils.Optional;
|
||||||
import rx.Completable;
|
import io.reactivex.Completable;
|
||||||
import rx.Single;
|
import io.reactivex.Single;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 04/10/2015.
|
* Created by Gavriil Sitnikov on 04/10/2015.
|
||||||
|
|
@ -67,4 +67,14 @@ public interface Store<TKey, TStoreObject> {
|
||||||
@NonNull
|
@NonNull
|
||||||
Single<Optional<TStoreObject>> loadObject(@NonNull Type storeObjectType, @NonNull TKey key);
|
Single<Optional<TStoreObject>> loadObject(@NonNull Type storeObjectType, @NonNull TKey key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Gets object from store by key.
|
||||||
|
*
|
||||||
|
* @param storeObjectType Type of object to store;
|
||||||
|
* @param key Key related to object;
|
||||||
|
* @return Object from store found by key;
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
TStoreObject getObject(@NonNull Type storeObjectType, @NonNull TKey key);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,8 @@ import android.support.annotation.NonNull;
|
||||||
|
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
|
|
||||||
import rx.functions.Func1;
|
import io.reactivex.functions.Function;
|
||||||
|
import ru.touchin.roboswag.core.log.Lc;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Created by Gavriil Sitnikov on 29/08/2016.
|
* Created by Gavriil Sitnikov on 29/08/2016.
|
||||||
|
|
@ -65,12 +66,16 @@ public final class StringUtils {
|
||||||
* @param condition Condition of symbol;
|
* @param condition Condition of symbol;
|
||||||
* @return True if some character satisfies condition.
|
* @return True if some character satisfies condition.
|
||||||
*/
|
*/
|
||||||
public static boolean containsCharLike(@NonNull final String text, @NonNull final Func1<Character, Boolean> condition) {
|
public static boolean containsCharLike(@NonNull final String text, @NonNull final Function<Character, Boolean> condition) {
|
||||||
|
try {
|
||||||
for (int i = 0; i < text.length(); i++) {
|
for (int i = 0; i < text.length(); i++) {
|
||||||
if (condition.call(text.charAt(i))) {
|
if (condition.apply(text.charAt(i))) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (final Exception exception) {
|
||||||
|
Lc.assertion(exception);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue