sockets fixed
This commit is contained in:
parent
9b9dbd2842
commit
faa68036f4
|
|
@ -87,9 +87,16 @@ public abstract class SocketConnection {
|
|||
subscriber.onCompleted();
|
||||
})
|
||||
.subscribeOn(scheduler)
|
||||
.switchMap(socket -> Observable.just(socket)
|
||||
.doOnSubscribe(socket::connect)
|
||||
.doOnUnsubscribe(socket::disconnect))
|
||||
.switchMap(socket -> Observable
|
||||
.<Socket>create(subscriber -> subscriber.onNext(socket))
|
||||
.doOnSubscribe(() -> {
|
||||
Lc.d("Socket connection requested");
|
||||
socket.connect();
|
||||
})
|
||||
.doOnUnsubscribe(() -> {
|
||||
Lc.d("Socket disconnection requested");
|
||||
socket.disconnect();
|
||||
}))
|
||||
.replay(1)
|
||||
.refCount();
|
||||
}
|
||||
|
|
@ -104,12 +111,14 @@ public abstract class SocketConnection {
|
|||
Observable result;
|
||||
synchronized (scheduler) {
|
||||
result = messagesObservableCache.get(socketEvent);
|
||||
if (result != null) {
|
||||
if (result == null) {
|
||||
result = getSocket()
|
||||
.switchMap(socket -> Observable
|
||||
.<T>create(subscriber -> socket.on(socketEvent.getName(),
|
||||
new SocketListener<>(socketEvent, subscriber::onNext)))
|
||||
.doOnUnsubscribe(() -> socket.off(socketEvent.getName())));
|
||||
.doOnUnsubscribe(() -> socket.off(socketEvent.getName())))
|
||||
.replay(1)
|
||||
.refCount();
|
||||
messagesObservableCache.put(socketEvent, result);
|
||||
}
|
||||
}
|
||||
|
|
@ -127,7 +136,7 @@ public abstract class SocketConnection {
|
|||
CONNECTION_ERROR
|
||||
}
|
||||
|
||||
public class SocketListener<T> implements Emitter.Listener {
|
||||
public static class SocketListener<T> implements Emitter.Listener {
|
||||
|
||||
@NonNull
|
||||
private final SocketEvent<T> socketEvent;
|
||||
|
|
@ -146,7 +155,7 @@ public abstract class SocketConnection {
|
|||
if (args != null) {
|
||||
final String response = args[0].toString();
|
||||
Lc.d("Got socket message: %s", response);
|
||||
T message = socketEvent.parse(response);
|
||||
final T message = socketEvent.parse(response);
|
||||
if (socketEvent.getEventDataHandler() != null) {
|
||||
socketEvent.getEventDataHandler().handleMessage(message);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,11 +63,11 @@ public class SocketEvent<T> {
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(final Object o) {
|
||||
return o instanceof SocketEvent
|
||||
&& ((SocketEvent) o).name.equals(name)
|
||||
&& ((SocketEvent) o).clz.equals(clz)
|
||||
&& ((SocketEvent) o).eventDataHandler == eventDataHandler;
|
||||
public boolean equals(final Object object) {
|
||||
return object instanceof SocketEvent
|
||||
&& ((SocketEvent) object).name.equals(name)
|
||||
&& ((SocketEvent) object).clz.equals(clz)
|
||||
&& ((SocketEvent) object).eventDataHandler == eventDataHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ import rx.functions.Action1;
|
|||
* Created by Gavriil Sitnikov on 20/10/2015.
|
||||
* TODO: fill description
|
||||
*/
|
||||
@SuppressWarnings("PMD.AccessorClassGeneration")
|
||||
public final class FrescoUtils {
|
||||
|
||||
public static final BaseBitmapDataSubscriber EMPTY_CALLBACK = new BaseBitmapDataSubscriber() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue