socket logic added

This commit is contained in:
Gavriil Sitnikov 2016-02-29 20:43:23 +03:00
parent 108ceb671a
commit ff93f0ea47
5 changed files with 292 additions and 0 deletions

View File

@ -41,4 +41,9 @@ dependencies {
}
provided 'com.squareup.okhttp:okhttp:2.7.4'
provided 'com.facebook.fresco:fbcore:0.9.0'
provided 'com.facebook.fresco:imagepipeline-okhttp:0.9.0'
provided('io.socket:socket.io-client:0.7.0') {
exclude group: 'org.json', module: 'json'
}
}

View File

@ -0,0 +1,143 @@
package ru.touchin.roboswag.components.socket;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.HashMap;
import java.util.Map;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import ru.touchin.roboswag.core.log.Lc;
import ru.touchin.roboswag.core.utils.android.RxAndroidUtils;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Action1;
import rx.subjects.BehaviorSubject;
/**
* Created by Gavriil Sitnikov on 29/02/16.
* TODO: description
*/
public abstract class SocketConnection {
private final BehaviorSubject<State> stateSubject = BehaviorSubject.create(State.DISCONNECTED);
private final Scheduler scheduler = RxAndroidUtils.createLooperScheduler();
private final Map<SocketEvent, Observable> messagesObservableCache = new HashMap<>();
@Nullable
private Observable<Socket> socketObservable;
@NonNull
public Scheduler getScheduler() {
return scheduler;
}
protected Observable<Socket> getSocket() {
synchronized (scheduler) {
if (socketObservable == null) {
socketObservable = createSocketObservable();
}
}
return socketObservable;
}
protected abstract Socket createSocket() throws Exception;
private Observable<Socket> createSocketObservable() {
return Observable
.<Socket>create(subscriber -> {
try {
final Socket socket = createSocket();
socket.on(Socket.EVENT_CONNECT, args -> stateSubject.onNext(State.CONNECTED));
socket.on(Socket.EVENT_CONNECTING, args -> stateSubject.onNext(State.CONNECTING));
socket.on(Socket.EVENT_CONNECT_ERROR, args -> stateSubject.onNext(State.CONNECTION_ERROR));
socket.on(Socket.EVENT_CONNECT_TIMEOUT, args -> stateSubject.onNext(State.CONNECTION_ERROR));
socket.on(Socket.EVENT_DISCONNECT, args -> stateSubject.onNext(State.DISCONNECTED));
socket.on(Socket.EVENT_RECONNECT_ATTEMPT, args -> stateSubject.onNext(State.CONNECTING));
socket.on(Socket.EVENT_RECONNECTING, args -> stateSubject.onNext(State.CONNECTING));
socket.on(Socket.EVENT_RECONNECT, args -> stateSubject.onNext(State.CONNECTED));
socket.on(Socket.EVENT_RECONNECT_ERROR, args -> stateSubject.onNext(State.CONNECTION_ERROR));
socket.on(Socket.EVENT_RECONNECT_FAILED, args -> stateSubject.onNext(State.CONNECTION_ERROR));
subscriber.onNext(socket);
} catch (final Exception exception) {
Lc.assertion(exception);
}
subscriber.onCompleted();
})
.subscribeOn(scheduler)
.switchMap(socket -> Observable.just(socket)
.doOnSubscribe(socket::connect)
.doOnUnsubscribe(socket::disconnect))
.replay(1)
.refCount();
}
@SuppressWarnings("unchecked")
protected <T> Observable<T> observeEvent(@NonNull final SocketEvent<T> socketEvent) {
Observable result;
synchronized (scheduler) {
result = messagesObservableCache.get(socketEvent);
if (result != null) {
result = getSocket()
.switchMap(socket -> Observable
.<T>create(subscriber -> socket.on(socketEvent.getName(),
new SocketListener<>(socketEvent, subscriber::onNext)))
.doOnUnsubscribe(() -> socket.off(socketEvent.getName())));
messagesObservableCache.put(socketEvent, result);
}
}
return result;
}
public Observable<State> observeSocketState() {
return stateSubject.distinctUntilChanged();
}
public enum State {
DISCONNECTED,
CONNECTING,
CONNECTED,
CONNECTION_ERROR
}
public class SocketListener<T> implements Emitter.Listener {
@NonNull
private final SocketEvent<T> socketEvent;
@NonNull
private final Action1<T> action;
public SocketListener(@NonNull final SocketEvent<T> socketEvent,
@NonNull final Action1<T> action) {
this.socketEvent = socketEvent;
this.action = action;
}
@Override
public void call(final Object... args) {
try {
if (args != null) {
final String response = args[0].toString();
Lc.d("Got socket message: %s", response);
T message = socketEvent.parse(response);
if (socketEvent.getEventDataHandler() != null) {
socketEvent.getEventDataHandler().handleMessage(message);
}
action.call(message);
}
} catch (final RuntimeException throwable) {
Lc.assertion(throwable);
} catch (final JsonProcessingException exception) {
Lc.assertion(exception);
} catch (final Exception exception) {
Lc.e("Socket processing error", exception);
}
}
}
}

View File

@ -0,0 +1,59 @@
package ru.touchin.roboswag.components.socket;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import java.io.StringReader;
/**
* Created by Ilia Kurtov on 22.01.2016.
*/
public class SocketEvent<T> {
private static final JsonFactory DEFAULT_JSON_FACTORY = new JacksonFactory();
@NonNull
private final String name;
@NonNull
private final Class<T> clz;
@Nullable
private final SocketMessageHandler<T> eventDataHandler;
public SocketEvent(@NonNull final String name, @NonNull final Class<T> clz, @Nullable final SocketMessageHandler<T> eventDataHandler) {
this.name = name;
this.clz = clz;
this.eventDataHandler = eventDataHandler;
}
@NonNull
public String getName() {
return name;
}
@Nullable
public SocketMessageHandler<T> getEventDataHandler() {
return eventDataHandler;
}
@NonNull
public T parse(@NonNull final String source) throws Exception {
return DEFAULT_JSON_FACTORY.createJsonObjectParser().parseAndClose(new StringReader(source), clz);
}
@Override
public boolean equals(final Object o) {
return o instanceof SocketEvent
&& ((SocketEvent) o).name.equals(name)
&& ((SocketEvent) o).clz.equals(clz)
&& ((SocketEvent) o).eventDataHandler == eventDataHandler;
}
@Override
public int hashCode() {
return name.hashCode() + clz.hashCode();
}
}

View File

@ -0,0 +1,13 @@
package ru.touchin.roboswag.components.socket;
import android.support.annotation.NonNull;
/**
* Created by Gavriil Sitnikov on 29/02/16.
* TODO: description
*/
public interface SocketMessageHandler<T> {
T handleMessage(@NonNull T message) throws Exception;
}

View File

@ -19,10 +19,27 @@
package ru.touchin.roboswag.components.utils;
import android.content.Context;
import android.graphics.Bitmap;
import android.net.Uri;
import android.support.annotation.NonNull;
import com.facebook.cache.common.CacheKey;
import com.facebook.common.executors.CallerThreadExecutor;
import com.facebook.common.references.CloseableReference;
import com.facebook.common.util.UriUtil;
import com.facebook.datasource.DataSource;
import com.facebook.imagepipeline.bitmaps.PlatformBitmapFactory;
import com.facebook.imagepipeline.datasource.BaseBitmapDataSubscriber;
import com.facebook.imagepipeline.image.CloseableImage;
import com.facebook.imagepipeline.request.ImageRequest;
import com.facebook.imagepipeline.request.ImageRequestBuilder;
import com.facebook.imagepipeline.request.Postprocessor;
import com.facebook.drawee.backends.pipeline.Fresco;
import javax.annotation.Nullable;
import rx.functions.Action1;
/**
* Created by Gavriil Sitnikov on 20/10/2015.
@ -30,12 +47,67 @@ import com.facebook.common.util.UriUtil;
*/
public final class FrescoUtils {
private static final BaseBitmapDataSubscriber EMPTY_CALLBACK = new BaseBitmapDataSubscriber() {
@Override
protected void onNewResultImpl(final Bitmap bitmap) {
//do nothing
}
@Override
protected void onFailureImpl(final DataSource<CloseableReference<CloseableImage>> dataSource) {
//do nothing
}
};
@NonNull
public static Uri getResourceUri(final int resourceId) {
return new Uri.Builder().scheme(UriUtil.LOCAL_RESOURCE_SCHEME).path(String.valueOf(resourceId)).build();
}
public static void loadAndHandleBitmap(@NonNull final Context context,
@NonNull final Uri imageUrl,
@NonNull final Action1<Bitmap> bitmapHandler) {
final ImageRequest imageRequest = ImageRequestBuilder
.newBuilderWithSource(imageUrl)
.setPostprocessor(new RealCallback(bitmapHandler))
.build();
Fresco.getImagePipeline()
.fetchDecodedImage(imageRequest, context)
.subscribe(EMPTY_CALLBACK, CallerThreadExecutor.getInstance());
}
private FrescoUtils() {
}
private static class RealCallback implements Postprocessor {
private final Action1<Bitmap> bitmapHandler;
private RealCallback(final Action1<Bitmap> bitmapHandler) {
this.bitmapHandler = bitmapHandler;
}
@Override
public CloseableReference<Bitmap> process(final Bitmap sourceBitmap, final PlatformBitmapFactory bitmapFactory) {
final CloseableReference<Bitmap> result
= bitmapFactory.createBitmap(sourceBitmap.getWidth(), sourceBitmap.getHeight());
bitmapHandler.call(result.get());
return result;
}
@Override
public String getName() {
return null;
}
@Nullable
@Override
public CacheKey getPostprocessorCacheKey() {
return null;
}
}
}