Compare commits

...

1 Commits

Author SHA1 Message Date
Junior B ff8586989d Operators Materialize/Dematerialize 2015-10-21 19:02:40 +02:00
5 changed files with 190 additions and 0 deletions

View File

@ -531,6 +531,14 @@
C8F0C0441BBBFBB9001B112F /* _RXSwizzling.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E881B8A732E0088E94D /* _RXSwizzling.h */; settings = {ATTRIBUTES = (Public, ); }; };
C8F0C0451BBBFBB9001B112F /* _RXKVOObserver.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E861B8A732E0088E94D /* _RXKVOObserver.h */; settings = {ATTRIBUTES = (Public, ); }; };
C8F0C04F1BBBFBCE001B112F /* ObservableConvertibleType+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift */; };
CBEE77371BD7CFCE00AD584C /* Materialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77361BD7CFCE00AD584C /* Materialize.swift */; settings = {ASSET_TAGS = (); }; };
CBEE77381BD7CFCE00AD584C /* Materialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77361BD7CFCE00AD584C /* Materialize.swift */; settings = {ASSET_TAGS = (); }; };
CBEE77391BD7CFCE00AD584C /* Materialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77361BD7CFCE00AD584C /* Materialize.swift */; settings = {ASSET_TAGS = (); }; };
CBEE773A1BD7CFCE00AD584C /* Materialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77361BD7CFCE00AD584C /* Materialize.swift */; settings = {ASSET_TAGS = (); }; };
CBEE773C1BD7CFD800AD584C /* Dematerialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE773B1BD7CFD800AD584C /* Dematerialize.swift */; settings = {ASSET_TAGS = (); }; };
CBEE773D1BD7CFD800AD584C /* Dematerialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE773B1BD7CFD800AD584C /* Dematerialize.swift */; settings = {ASSET_TAGS = (); }; };
CBEE773E1BD7CFD800AD584C /* Dematerialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE773B1BD7CFD800AD584C /* Dematerialize.swift */; settings = {ASSET_TAGS = (); }; };
CBEE773F1BD7CFD800AD584C /* Dematerialize.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE773B1BD7CFD800AD584C /* Dematerialize.swift */; settings = {ASSET_TAGS = (); }; };
D203C4F31BB9C4CA00D02D00 /* RxCollectionViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */; };
D203C4F41BB9C52400D02D00 /* RxTableViewReactiveArrayDataSource.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F21B8A752B00B02D69 /* RxTableViewReactiveArrayDataSource.swift */; };
D203C4F51BB9C52900D02D00 /* ItemEvents.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88253F41B8A752B00B02D69 /* ItemEvents.swift */; };
@ -978,6 +986,8 @@
C8F0C0021BBBFB8B001B112F /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; };
CBEE77361BD7CFCE00AD584C /* Materialize.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Materialize.swift; sourceTree = "<group>"; };
CBEE773B1BD7CFD800AD584C /* Dematerialize.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Dematerialize.swift; sourceTree = "<group>"; };
D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SkipWhile.swift; sourceTree = "<group>"; };
@ -1220,6 +1230,8 @@
C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */,
D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */,
D235B23D1BD003DD007E84DA /* Using.swift */,
CBEE77361BD7CFCE00AD584C /* Materialize.swift */,
CBEE773B1BD7CFD800AD584C /* Dematerialize.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -2100,6 +2112,7 @@
C8093CD41B8A72BE0088E94D /* Disposable.swift in Sources */,
C8093CEE1B8A72BE0088E94D /* SingleAssignmentDisposable.swift in Sources */,
C849BE2C1BAB5D070019AD27 /* ObservableConvertibleType.swift in Sources */,
CBEE77381BD7CFCE00AD584C /* Materialize.swift in Sources */,
C8C3DA0A1B93941E004D233E /* FailWith.swift in Sources */,
C8093D9C1B8A72BE0088E94D /* SchedulerServices+Emulation.swift in Sources */,
C8093D6A1B8A72BE0088E94D /* AnyObserver.swift in Sources */,
@ -2136,6 +2149,7 @@
C8093CC81B8A72BE0088E94D /* AsyncLock.swift in Sources */,
C8093CD81B8A72BE0088E94D /* BinaryDisposable.swift in Sources */,
C89CDB371BCB0DD7002063D9 /* ShareReplay1.swift in Sources */,
CBEE773D1BD7CFD800AD584C /* Dematerialize.swift in Sources */,
C8093D2A1B8A72BE0088E94D /* ObserveOn.swift in Sources */,
C8093D361B8A72BE0088E94D /* Sample.swift in Sources */,
D2752D621BC5551A0070C418 /* SkipUntil.swift in Sources */,
@ -2220,6 +2234,7 @@
C8093CD31B8A72BE0088E94D /* Disposable.swift in Sources */,
C8093CED1B8A72BE0088E94D /* SingleAssignmentDisposable.swift in Sources */,
C849BE2B1BAB5D070019AD27 /* ObservableConvertibleType.swift in Sources */,
CBEE77371BD7CFCE00AD584C /* Materialize.swift in Sources */,
C8C3DA091B93941E004D233E /* FailWith.swift in Sources */,
C8093D9B1B8A72BE0088E94D /* SchedulerServices+Emulation.swift in Sources */,
C8093D691B8A72BE0088E94D /* AnyObserver.swift in Sources */,
@ -2256,6 +2271,7 @@
C8093CC71B8A72BE0088E94D /* AsyncLock.swift in Sources */,
C8093CD71B8A72BE0088E94D /* BinaryDisposable.swift in Sources */,
C89CDB361BCB0DD7002063D9 /* ShareReplay1.swift in Sources */,
CBEE773C1BD7CFD800AD584C /* Dematerialize.swift in Sources */,
C8093D291B8A72BE0088E94D /* ObserveOn.swift in Sources */,
C8093D351B8A72BE0088E94D /* Sample.swift in Sources */,
D285BAC41BC0231000B3F602 /* SkipUntil.swift in Sources */,
@ -2340,6 +2356,7 @@
C8F0BF961BBBFB8B001B112F /* Disposable.swift in Sources */,
C8F0BF971BBBFB8B001B112F /* SingleAssignmentDisposable.swift in Sources */,
C89461751BC6C1210055219D /* ObservableConvertibleType.swift in Sources */,
CBEE773A1BD7CFCE00AD584C /* Materialize.swift in Sources */,
C8F0BF981BBBFB8B001B112F /* FailWith.swift in Sources */,
C8F0BF991BBBFB8B001B112F /* SchedulerServices+Emulation.swift in Sources */,
C8F0BF9A1BBBFB8B001B112F /* AnyObserver.swift in Sources */,
@ -2376,6 +2393,7 @@
C8F0BFB51BBBFB8B001B112F /* AsyncLock.swift in Sources */,
C8F0BFB61BBBFB8B001B112F /* BinaryDisposable.swift in Sources */,
C89CDB391BCB0DD7002063D9 /* ShareReplay1.swift in Sources */,
CBEE773F1BD7CFD800AD584C /* Dematerialize.swift in Sources */,
C8F0BFB71BBBFB8B001B112F /* ObserveOn.swift in Sources */,
C8F0BFB81BBBFB8B001B112F /* Sample.swift in Sources */,
D21C29311BC6A1C300448E70 /* SkipUntil.swift in Sources */,
@ -2610,6 +2628,7 @@
D2EBEB281BB9B6C1003A27DC /* Zip.swift in Sources */,
D2EBEB3E1BB9B6D8003A27DC /* SerialDispatchQueueScheduler.swift in Sources */,
C89461761BC6C1220055219D /* ObservableConvertibleType.swift in Sources */,
CBEE77391BD7CFCE00AD584C /* Materialize.swift in Sources */,
D2EBEAF71BB9B6B2003A27DC /* ScheduledDisposable.swift in Sources */,
D2EBEAE11BB9B697003A27DC /* ImmediateSchedulerType.swift in Sources */,
D2EBEB0B1BB9B6C1003A27DC /* Empty.swift in Sources */,
@ -2646,6 +2665,7 @@
D2EBEB091BB9B6C1003A27DC /* DistinctUntilChanged.swift in Sources */,
D2EBEB2A1BB9B6C5003A27DC /* Zip+CollectionType.swift in Sources */,
C89CDB381BCB0DD7002063D9 /* ShareReplay1.swift in Sources */,
CBEE773E1BD7CFD800AD584C /* Dematerialize.swift in Sources */,
D2EBEB401BB9B6DE003A27DC /* BehaviorSubject.swift in Sources */,
D2EBEB271BB9B6C1003A27DC /* Timer.swift in Sources */,
D2752D631BC5551B0070C418 /* SkipUntil.swift in Sources */,

View File

@ -0,0 +1,55 @@
//
// Dematerialize.swift
// Rx
//
// Created by Junior B. on 21/10/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class DematerializeSink<Element, O : ObserverType where O.E == Element>: Sink<O>, ObserverType {
override init(observer: O, cancel: Disposable){
super.init(observer: observer, cancel: cancel)
}
func on(event: Event<Event<Element>>) {
switch event {
case .Next(let val):
switch val {
case .Next(let value):
observer?.on(.Next(value))
case .Error(let e):
observer?.on(.Error(e))
observer?.on(.Completed)
self.dispose()
case .Completed:
observer?.on(.Completed)
self.dispose()
}
case .Error(let e):
observer?.on(.Error(e))
observer?.on(.Completed)
self.dispose()
case .Completed:
observer?.on(.Completed)
self.dispose()
}
}
}
class Dematerialize<Element> : Producer<Element> {
let source: Observable<Event<Element>>
init(source: Observable<Event<Element>>) {
self.source = source
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = DematerializeSink(observer: observer, cancel: cancel)
setSink(sink)
return source.subscribeSafe(sink)
}
}

View File

@ -0,0 +1,46 @@
//
// Materialize.swift
// Rx
//
// Created by Junior B. on 21/10/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class MaterializeSink<Element, O : ObserverType where O.E == Event<Element>>: Sink<O>, ObserverType {
override init(observer: O, cancel: Disposable){
super.init(observer: observer, cancel: cancel)
}
func on(event: Event<Element>) {
switch event {
case .Next(let value):
observer?.on(.Next(Event.Next(value)))
case .Error(let e):
observer?.on(.Next(Event.Error(e)))
observer?.on(.Completed)
self.dispose()
case .Completed:
observer?.on(.Next(Event.Completed))
observer?.on(.Completed)
self.dispose()
}
}
}
class Materialize<Element> : Producer<Event<Element>> {
let source: Observable<Element>
init(source: Observable<Element>) {
self.source = source
}
override func run<O: ObserverType where O.E == Event<Element>>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = MaterializeSink(observer: observer, cancel: cancel)
setSink(sink)
return source.subscribeSafe(sink)
}
}

View File

@ -179,4 +179,41 @@ extension ObservableType {
-> Observable<O.E> {
return FlatMap(source: self.asObservable(), selector: selector)
}
}
// MARK: materialize and dematerialize
extension ObservableType {
/**
A well-formed, finite Observable will invoke its observers onNext method zero or more times, and then will invoke either the `onCompleted` or `onError` method exactly once.
Materialize operator converts this series of invocations into a series of items emitted by an Observable.
To restore the sequence beahviour, check the revers operator `dematerialize`.
- returns: An observable sequence that contains all the events of the source sequence as wrapped events.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func materialize()
-> Observable<Event<E>> {
return Materialize(source: self.asObservable())
}
/**
The Dematerialize operator reverses the process of `materialize`.
It operates on an Observable that has previously been transformed by `materialize`
and returns it to its original form.
- returns: An observable that emits the correct events as sequence, restored after _materialization_.
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func dematerialize()
-> Observable<E> {
if let source = self as? Observable<Event<E>> {
return Dematerialize(source: source.asObservable())
} else {
return self.asObservable()
}
}
}

View File

@ -3644,4 +3644,36 @@ extension ObservableStandardSequenceOperators {
Subscription(200, 350)
])
}
}
// MARK: Materialize and Dematerialize
extension ObservableStandardSequenceOperators {
func testMaterialize_Return() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(100, -1),
next(200, -1),
next(300, 0),
completed(600)
])
var invoked = 0
let res = scheduler.start() { () -> Observable<Event<Int>> in
return xs.materialize()
}
XCTAssertEqual(res.messages, [
completed(330)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 330)
])
XCTAssertEqual(4, invoked)
}
}