Add `window` operator, time/count version

This commit is contained in:
Junior B 2015-10-30 14:57:13 +01:00 committed by Krunoslav Zaher
parent e7723a2173
commit 7fe9a87cb5
6 changed files with 443 additions and 18 deletions

View File

@ -576,6 +576,10 @@
CB883B461BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; };
CB883B471BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; };
CB883B481BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; };
CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; };
CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; };
CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; };
CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; };
CBEE771F1BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; };
CBEE77201BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; };
CBEE77211BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; };
@ -1037,6 +1041,7 @@
CB883B3A1BE24355000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = "<group>"; };
CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = "<group>"; };
CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = "<group>"; };
CB883B491BE369AA000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = "<group>"; };
CBEE771E1BD649A000AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = "<group>"; };
D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; };
D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
@ -1234,6 +1239,7 @@
C8093C6A1B8A72BE0088E94D /* Implementations */ = {
isa = PBXGroup;
children = (
CB883B491BE369AA000AC2EE /* AddRef.swift */,
C8093C6B1B8A72BE0088E94D /* Amb.swift */,
C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */,
C821DBA11BA4DCAB008F3809 /* Buffer.swift */,
@ -2271,6 +2277,7 @@
C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */,
C8093CF41B8A72BE0088E94D /* Error.swift in Sources */,
C8093D141B8A72BE0088E94D /* Debug.swift in Sources */,
CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */,
C8093CCE1B8A72BE0088E94D /* Bag.swift in Sources */,
C8093D301B8A72BE0088E94D /* Producer.swift in Sources */,
C8093CF81B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */,
@ -2402,6 +2409,7 @@
C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */,
C8093CF31B8A72BE0088E94D /* Error.swift in Sources */,
C8093D131B8A72BE0088E94D /* Debug.swift in Sources */,
CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */,
C8093CCD1B8A72BE0088E94D /* Bag.swift in Sources */,
C8093D2F1B8A72BE0088E94D /* Producer.swift in Sources */,
C8093CF71B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */,
@ -2533,6 +2541,7 @@
C8F0BFE71BBBFB8B001B112F /* Repeat.swift in Sources */,
C8F0BFE81BBBFB8B001B112F /* Error.swift in Sources */,
C8F0BFE91BBBFB8B001B112F /* Debug.swift in Sources */,
CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */,
C8F0BFEA1BBBFB8B001B112F /* Bag.swift in Sources */,
C8F0BFEB1BBBFB8B001B112F /* Producer.swift in Sources */,
C8F0BFEC1BBBFB8B001B112F /* ImmediateSchedulerType.swift in Sources */,
@ -2815,6 +2824,7 @@
D2EBEB3D1BB9B6D8003A27DC /* SchedulerServices+Emulation.swift in Sources */,
D2EBEB1C1BB9B6C1003A27DC /* Sample.swift in Sources */,
D2EBEAFD1BB9B6BA003A27DC /* AnonymousObservable.swift in Sources */,
CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */,
D2EBEAFA1BB9B6B2003A27DC /* SingleAssignmentDisposable.swift in Sources */,
D2EBEAF31BB9B6AE003A27DC /* DisposeBag.swift in Sources */,
D2EBEAED1BB9B6A4003A27DC /* Bag.swift in Sources */,

View File

@ -343,6 +343,10 @@
C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; };
C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; };
C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; };
CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */; };
CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */; };
CB883B601BE3AC72000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5E1BE3AC72000AC2EE /* Window.swift */; };
CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5F1BE3AC72000AC2EE /* AddRef.swift */; };
CBEE77541BD8C7B700AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77531BD8C7B700AD584C /* ToArray.swift */; };
D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; };
D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; };
@ -733,6 +737,10 @@
C8DF92F01B0B3E67009BCF9A /* Info-OSX.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-OSX.plist"; sourceTree = "<group>"; };
C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = "<group>"; };
C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = "<group>"; };
CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = "<group>"; };
CB883B5E1BE3AC72000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = "<group>"; };
CB883B5F1BE3AC72000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = "<group>"; };
CBEE77531BD8C7B700AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = "<group>"; };
D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = "<group>"; };
D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = "<group>"; };
@ -1105,12 +1113,14 @@
C84CC5831BDD484400E06A64 /* SubscriptionDisposable.swift */,
C89464331BC6C2B00055219D /* AnonymousDisposable.swift */,
C89464341BC6C2B00055219D /* BinaryDisposable.swift */,
CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */,
C89464351BC6C2B00055219D /* CompositeDisposable.swift */,
C89464361BC6C2B00055219D /* DisposeBag.swift */,
C89464371BC6C2B00055219D /* DisposeBase.swift */,
C89464381BC6C2B00055219D /* NAryDisposable.swift */,
C89464391BC6C2B00055219D /* NAryDisposable.tt */,
C894643A1BC6C2B00055219D /* NopDisposable.swift */,
CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */,
C894643B1BC6C2B00055219D /* ScheduledDisposable.swift */,
C894643C1BC6C2B00055219D /* ScopedDisposable.swift */,
C894643D1BC6C2B00055219D /* SerialDisposable.swift */,
@ -1140,7 +1150,7 @@
C89464481BC6C2B00055219D /* Implementations */ = {
isa = PBXGroup;
children = (
C84CC52D1BDC344100E06A64 /* ElementAt.swift */,
CB883B5F1BE3AC72000AC2EE /* AddRef.swift */,
C89464491BC6C2B00055219D /* Amb.swift */,
C894644A1BC6C2B00055219D /* AnonymousObservable.swift */,
C894644C1BC6C2B00055219D /* Buffer.swift */,
@ -1156,6 +1166,7 @@
C89464561BC6C2B00055219D /* DelaySubscription.swift */,
C89464571BC6C2B00055219D /* DistinctUntilChanged.swift */,
C89464581BC6C2B00055219D /* Do.swift */,
C84CC52D1BDC344100E06A64 /* ElementAt.swift */,
C89464591BC6C2B00055219D /* Empty.swift */,
C894645A1BC6C2B00055219D /* FailWith.swift */,
C894645B1BC6C2B00055219D /* Filter.swift */,
@ -1191,11 +1202,12 @@
C89464751BC6C2B00055219D /* Timer.swift */,
CBEE77531BD8C7B700AD584C /* ToArray.swift */,
D2AF91881BD2C51900A008C1 /* Using.swift */,
CB883B5E1BE3AC72000AC2EE /* Window.swift */,
D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */,
C89464791BC6C2B00055219D /* Zip.swift */,
C89464761BC6C2B00055219D /* Zip+arity.swift */,
C89464771BC6C2B00055219D /* Zip+arity.tt */,
C89464781BC6C2B00055219D /* Zip+CollectionType.swift */,
D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */,
);
path = Implementations;
sourceTree = "<group>";
@ -1735,6 +1747,7 @@
C89465881BC6C2BC0055219D /* RxScrollViewDelegateProxy.swift in Sources */,
C89464B71BC6C2B00055219D /* Observable+Extensions.swift in Sources */,
C89464A01BC6C2B00055219D /* Lock.swift in Sources */,
CB883B601BE3AC72000AC2EE /* Window.swift in Sources */,
C89464C91BC6C2B00055219D /* Do.swift in Sources */,
C89464A41BC6C2B00055219D /* Queue.swift in Sources */,
C89464B91BC6C2B00055219D /* ObservableConvertibleType.swift in Sources */,
@ -1855,11 +1868,13 @@
C894656E1BC6C2BC0055219D /* DelegateProxy.swift in Sources */,
C89464EF1BC6C2B00055219D /* Observable+Debug.swift in Sources */,
C89464E91BC6C2B00055219D /* Zip+CollectionType.swift in Sources */,
CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */,
C89465761BC6C2BC0055219D /* KVOObserver.swift in Sources */,
C89465641BC6C2BC0055219D /* _RXSwizzling.m in Sources */,
C89464CA1BC6C2B00055219D /* Empty.swift in Sources */,
C803973B1BD3E17D009D8B26 /* ActivityIndicator.swift in Sources */,
C89464C61BC6C2B00055219D /* Deferred.swift in Sources */,
CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */,
D2AF91981BD3D95900A008C1 /* Using.swift in Sources */,
C8297E501B6CF905000589EA /* TableViewController.swift in Sources */,
C8297E511B6CF905000589EA /* PartialUpdatesViewController.swift in Sources */,
@ -1889,6 +1904,7 @@
C89465711BC6C2BC0055219D /* Observable+Bind.swift in Sources */,
C89464B01BC6C2B00055219D /* SerialDisposable.swift in Sources */,
C89464A21BC6C2B00055219D /* Bag.swift in Sources */,
CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */,
C894657B1BC6C2BC0055219D /* RxCLLocationManagerDelegateProxy.swift in Sources */,
C8297E581B6CF905000589EA /* User.swift in Sources */,
C89464B41BC6C2B00055219D /* Event.swift in Sources */,

View File

@ -0,0 +1,47 @@
//
// AddRef.swift
// Rx
//
// Created by Junior B. on 30/10/15.
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
//
import Foundation
class AddRefSink<O: ObserverType> : Sink<O>, ObserverType {
typealias Element = O.E
override init(observer: O, cancel: Disposable) {
super.init(observer: observer, cancel: cancel)
}
func on(event: Event<Element>) {
switch event {
case .Next(_):
observer?.on(event)
case .Completed, .Error(_):
observer?.on(event)
dispose()
}
}
}
class AddRef<Element> : Producer<Element> {
typealias EventHandler = Event<Element> throws -> Void
private let _source: Observable<Element>
private let _refCount: RefCountDisposable
init(source: Observable<Element>, refCount: RefCountDisposable) {
_source = source
_refCount = refCount
}
override func run<O: ObserverType where O.E == Element>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let d = StableCompositeDisposable.create(_refCount.disposable, cancel)
let sink = AddRefSink(observer: observer, cancel: d)
setSink(sink)
return _source.subscribeSafe(sink)
}
}

View File

@ -8,3 +8,135 @@
import Foundation
class WindowTimeCountSink<S: SchedulerType, Element, O: ObserverType where O.E == Observable<Element>> : Sink<O>, ObserverType {
typealias Parent = WindowTimeCount<Element, S>
typealias E = Element
private let _parent: Parent
private let _lock = NSRecursiveLock()
private var _subject = PublishSubject<Element>()
private var _count = 0
private var _windowId = 0
private let _timerD = SerialDisposable()
private let _refCountDisposable: RefCountDisposable
private let _groupDisposable = CompositeDisposable()
init(parent: Parent, observer: O, cancel: Disposable) {
_parent = parent
_groupDisposable.addDisposable(_timerD)
_refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
createTimer(0)
_groupDisposable.addDisposable(_parent._source.subscribeSafe(self))
return _refCountDisposable
}
func startNewWindowAndCompleteCurrentOne() {
_subject.on(.Completed)
_subject = PublishSubject<Element>()
observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
}
func on(event: Event<E>) {
var newWindow = false
var newId = 0
_lock.performLocked {
switch event {
case .Next(let element):
_subject.on(.Next(element))
do {
try incrementChecked(&_count)
} catch (let e) {
_subject.on(.Error(e as ErrorType))
dispose()
}
if (_count == _parent._count) {
newWindow = true
_count = 0
newId = ++_windowId
self.startNewWindowAndCompleteCurrentOne()
}
case .Error(let error):
_subject.on(.Error(error))
observer?.on(.Error(error))
dispose()
case .Completed:
_subject.on(.Completed)
observer?.on(.Completed)
dispose()
}
}
if newWindow {
createTimer(newId)
}
}
func createTimer(windowId: Int) {
if _timerD.disposed {
return
}
if _windowId != windowId {
return
}
_timerD.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in
var newId = 0
self._lock.performLocked {
if previousWindowId != self._windowId {
return
}
self._count = 0
self._windowId = self._windowId &+ 1
newId = self._windowId
self.startNewWindowAndCompleteCurrentOne()
}
self.createTimer(newId)
return NopDisposable.instance
}
}
}
class WindowTimeCount<Element, S: SchedulerType> : Producer<Observable<Element>> {
private let _timeSpan: S.TimeInterval
private let _count: Int
private let _scheduler: S
private let _source: Observable<Element>
init(source: Observable<Element>, timeSpan: S.TimeInterval, count: Int, scheduler: S) {
_source = source
_timeSpan = timeSpan
_count = count
_scheduler = scheduler
}
override func run<O : ObserverType where O.E == Observable<Element>>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
let sink = WindowTimeCountSink(parent: self, observer: observer, cancel: cancel)
setSink(sink)
return sink.run()
}
}

View File

@ -209,3 +209,22 @@ extension ObservableType {
return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
}
// MARK: window
extension ObservableType {
/**
Projects each element of an observable sequence into a window that is completed when either its full or a given amount of time has elapsed.
- parameter timeSpan: Maximum time length of a window.
- parameter count: Maximum element count of a window.
- parameter scheduler: Scheduler to run windowing timers on.
- returns: An observable sequence of windows (instances of `Observable`).
*/
@warn_unused_result(message="http://git.io/rxs.uo")
public func window<S: SchedulerType>(timeSpan timeSpan: S.TimeInterval, count: Int, scheduler: S)
-> Observable<Observable<E>> {
return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
}
}

View File

@ -18,8 +18,7 @@ class ObservableTimeTest : RxTest {
}
}
// throttle
// MARK: Throttle
extension ObservableTimeTest {
func test_ThrottleTimeSpan_AllPass() {
let scheduler = TestScheduler(initialClock: 0)
@ -274,8 +273,7 @@ extension ObservableTimeTest {
}
}
// sample
// MARK: Sample
extension ObservableTimeTest {
func testSample_Sampler_SamplerThrows() {
let scheduler = TestScheduler(initialClock: 0)
@ -710,8 +708,7 @@ extension ObservableTimeTest {
}
}
// interval
// MARK: Interval
extension ObservableTimeTest {
func testInterval_TimeSpan_Basic() {
@ -821,8 +818,7 @@ extension ObservableTimeTest {
}
}
// take
// MARK: Take
extension ObservableTimeTest {
func testTake_TakeZero() {
@ -997,8 +993,7 @@ extension ObservableTimeTest {
}
// take
// MARK: Delay Subscription
extension ObservableTimeTest {
func testDelaySubscription_TimeSpan_Simple() {
@ -1073,7 +1068,7 @@ extension ObservableTimeTest {
}
}
// skip
// MARK: Skip
extension ObservableTimeTest {
func testSkip_Zero() {
let scheduler = TestScheduler(initialClock: 0)
@ -1183,9 +1178,9 @@ extension ObservableTimeTest {
}
}
//
// MARK: Buffer
extension ObservableTimeTest {
func bufferWithTimeOrCount_Basic() {
func testBufferWithTimeOrCount_Basic() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
@ -1221,7 +1216,7 @@ extension ObservableTimeTest {
])
}
func bufferWithTimeOrCount_Error() {
func testBufferWithTimeOrCount_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
@ -1256,7 +1251,7 @@ extension ObservableTimeTest {
])
}
func bufferWithTimeOrCount_Disposed() {
func testBufferWithTimeOrCount_Disposed() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
@ -1287,7 +1282,7 @@ extension ObservableTimeTest {
])
}
func bufferWithTimeOrCount_Default() {
func testBufferWithTimeOrCount_Default() {
let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
let result = try! range(1, 10, backgroundScheduler)
@ -1299,4 +1294,210 @@ extension ObservableTimeTest {
XCTAssertEqual(result!, [4, 5, 6])
}
}
// MARK: Window
extension ObservableTimeTest {
func testWindowWithTimeOrCount_Basic() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(205, 1),
next(210, 2),
next(240, 3),
next(280, 4),
next(320, 5),
next(350, 6),
next(370, 7),
next(420, 8),
next(470, 9),
completed(600)
])
let res = scheduler.start { () -> Observable<String> in
let window: Observable<Observable<Int>> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler)
let mappedWithIndex = window.mapWithIndex { (o: Observable<Int>, i: Int) -> Observable<String> in
return o.map { (e: Int) -> String in
return "\(i) \(e)"
}
}
let result = mappedWithIndex.merge()
return result
}
XCTAssertEqual(res.messages, [
next(205, "0 1"),
next(210, "0 2"),
next(240, "0 3"),
next(280, "1 4"),
next(320, "2 5"),
next(350, "2 6"),
next(370, "2 7"),
next(420, "3 8"),
next(470, "4 9"),
completed(600)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 600)
])
}
func testWindowWithTimeOrCount_Error() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(205, 1),
next(210, 2),
next(240, 3),
next(280, 4),
next(320, 5),
next(350, 6),
next(370, 7),
next(420, 8),
next(470, 9),
error(600, testError)
])
let res = scheduler.start { () -> Observable<String> in
let window: Observable<Observable<Int>> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler)
let mappedWithIndex = window.mapWithIndex { (o: Observable<Int>, i: Int) -> Observable<String> in
return o.map { (e: Int) -> String in
return "\(i) \(e)"
}
}
let result = mappedWithIndex.merge()
return result
}
XCTAssertEqual(res.messages, [
next(205, "0 1"),
next(210, "0 2"),
next(240, "0 3"),
next(280, "1 4"),
next(320, "2 5"),
next(350, "2 6"),
next(370, "2 7"),
next(420, "3 8"),
next(470, "4 9"),
error(600, testError)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 600)
])
}
func testWindowWithTimeOrCount_Disposed() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(105, 0),
next(205, 1),
next(210, 2),
next(240, 3),
next(280, 4),
next(320, 5),
next(350, 6),
next(370, 7),
next(420, 8),
next(470, 9),
completed(600)
])
let res = scheduler.start(370) { () -> Observable<String> in
let window: Observable<Observable<Int>> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler)
let mappedWithIndex = window.mapWithIndex { (o: Observable<Int>, i: Int) -> Observable<String> in
return o.map { (e: Int) -> String in
return "\(i) \(e)"
}
}
let result = mappedWithIndex.merge()
return result
}
XCTAssertEqual(res.messages, [
next(205, "0 1"),
next(210, "0 2"),
next(240, "0 3"),
next(280, "1 4"),
next(320, "2 5"),
next(350, "2 6"),
next(370, "2 7")
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 370)
])
}
/*
func testWindowWithTimeOrCount_BasicPeriod() {
let scheduler = TestScheduler(initialClock: 0)
let xs = scheduler.createHotObservable([
next(150, 1),
next(210, 2),
next(240, 3),
next(270, 4),
next(320, 5),
next(360, 6),
next(390, 7),
next(410, 8),
next(460, 9),
next(470, 10),
completed(490)
])
let res = scheduler.start { () -> Observable<String> in
let window: Observable<Observable<Int>> = xs.window(timeSpan: 100, count: 3, scheduler: scheduler)
let mappedWithIndex = window.mapWithIndex { (o: Observable<Int>, i: Int) -> Observable<String> in
return o.map { (e: Int) -> String in
return "\(i) \(e)"
}.concat(just("\(i) end"))
}
let result = mappedWithIndex.merge()
return result
}
XCTAssertEqual(res.messages, [
next(210, "0 2"),
next(240, "0 3"),
next(270, "0 4"),
next(300, "0 end"),
next(320, "1 5"),
next(360, "1 6"),
next(390, "1 7"),
next(400, "1 end"),
next(410, "2 8"),
next(460, "2 9"),
next(470, "2 10"),
next(490, "2 end"),
completed(490)
])
XCTAssertEqual(xs.subscriptions, [
Subscription(200, 490)
])
}*/
func windowWithTimeOrCount_Default() {
let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default)
let result = try! range(1, 10, backgroundScheduler)
.window(timeSpan: 1000, count: 3, scheduler: backgroundScheduler)
.mapWithIndex { (o: Observable<Int>, i: Int) -> Observable<String> in
return o.map { (e: Int) -> String in
return "\(i) \(e)"
}
}
.merge()
.skip(4)
.toBlocking()
.first()
XCTAssertEqual(result!, "1 5")
}
}