From 7fe9a87cb52adf192262f9e1ecbb413dabdad883 Mon Sep 17 00:00:00 2001 From: Junior B Date: Fri, 30 Oct 2015 14:57:13 +0100 Subject: [PATCH] Add `window` operator, time/count version --- Rx.xcodeproj/project.pbxproj | 10 + RxExample/RxExample.xcodeproj/project.pbxproj | 20 +- .../Observables/Implementations/AddRef.swift | 47 ++++ .../Observables/Implementations/Window.swift | 132 ++++++++++ RxSwift/Observables/Observable+Time.swift | 19 ++ .../Tests/Observable+TimeTest.swift | 233 ++++++++++++++++-- 6 files changed, 443 insertions(+), 18 deletions(-) create mode 100644 RxSwift/Observables/Implementations/AddRef.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index a2d22f45..d71823d7 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -576,6 +576,10 @@ CB883B461BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CB883B471BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CB883B481BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; CBEE771F1BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77201BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77211BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; @@ -1037,6 +1041,7 @@ CB883B3A1BE24355000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; + CB883B491BE369AA000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = ""; }; CBEE771E1BD649A000AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = ""; }; D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; @@ -1234,6 +1239,7 @@ C8093C6A1B8A72BE0088E94D /* Implementations */ = { isa = PBXGroup; children = ( + CB883B491BE369AA000AC2EE /* AddRef.swift */, C8093C6B1B8A72BE0088E94D /* Amb.swift */, C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */, C821DBA11BA4DCAB008F3809 /* Buffer.swift */, @@ -2271,6 +2277,7 @@ C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */, C8093CF41B8A72BE0088E94D /* Error.swift in Sources */, C8093D141B8A72BE0088E94D /* Debug.swift in Sources */, + CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */, C8093CCE1B8A72BE0088E94D /* Bag.swift in Sources */, C8093D301B8A72BE0088E94D /* Producer.swift in Sources */, C8093CF81B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */, @@ -2402,6 +2409,7 @@ C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */, C8093CF31B8A72BE0088E94D /* Error.swift in Sources */, C8093D131B8A72BE0088E94D /* Debug.swift in Sources */, + CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */, C8093CCD1B8A72BE0088E94D /* Bag.swift in Sources */, C8093D2F1B8A72BE0088E94D /* Producer.swift in Sources */, C8093CF71B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */, @@ -2533,6 +2541,7 @@ C8F0BFE71BBBFB8B001B112F /* Repeat.swift in Sources */, C8F0BFE81BBBFB8B001B112F /* Error.swift in Sources */, C8F0BFE91BBBFB8B001B112F /* Debug.swift in Sources */, + CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */, C8F0BFEA1BBBFB8B001B112F /* Bag.swift in Sources */, C8F0BFEB1BBBFB8B001B112F /* Producer.swift in Sources */, C8F0BFEC1BBBFB8B001B112F /* ImmediateSchedulerType.swift in Sources */, @@ -2815,6 +2824,7 @@ D2EBEB3D1BB9B6D8003A27DC /* SchedulerServices+Emulation.swift in Sources */, D2EBEB1C1BB9B6C1003A27DC /* Sample.swift in Sources */, D2EBEAFD1BB9B6BA003A27DC /* AnonymousObservable.swift in Sources */, + CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */, D2EBEAFA1BB9B6B2003A27DC /* SingleAssignmentDisposable.swift in Sources */, D2EBEAF31BB9B6AE003A27DC /* DisposeBag.swift in Sources */, D2EBEAED1BB9B6A4003A27DC /* Bag.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index 02b11de6..fcb9a82f 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -343,6 +343,10 @@ C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; }; C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; }; C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; }; + CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */; }; + CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */; }; + CB883B601BE3AC72000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5E1BE3AC72000AC2EE /* Window.swift */; }; + CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5F1BE3AC72000AC2EE /* AddRef.swift */; }; CBEE77541BD8C7B700AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77531BD8C7B700AD584C /* ToArray.swift */; }; D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; }; D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; }; @@ -733,6 +737,10 @@ C8DF92F01B0B3E67009BCF9A /* Info-OSX.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-OSX.plist"; sourceTree = ""; }; C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = ""; }; C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; + CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; + CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; + CB883B5E1BE3AC72000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; + CB883B5F1BE3AC72000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = ""; }; CBEE77531BD8C7B700AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = ""; }; D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; @@ -1105,12 +1113,14 @@ C84CC5831BDD484400E06A64 /* SubscriptionDisposable.swift */, C89464331BC6C2B00055219D /* AnonymousDisposable.swift */, C89464341BC6C2B00055219D /* BinaryDisposable.swift */, + CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */, C89464351BC6C2B00055219D /* CompositeDisposable.swift */, C89464361BC6C2B00055219D /* DisposeBag.swift */, C89464371BC6C2B00055219D /* DisposeBase.swift */, C89464381BC6C2B00055219D /* NAryDisposable.swift */, C89464391BC6C2B00055219D /* NAryDisposable.tt */, C894643A1BC6C2B00055219D /* NopDisposable.swift */, + CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */, C894643B1BC6C2B00055219D /* ScheduledDisposable.swift */, C894643C1BC6C2B00055219D /* ScopedDisposable.swift */, C894643D1BC6C2B00055219D /* SerialDisposable.swift */, @@ -1140,7 +1150,7 @@ C89464481BC6C2B00055219D /* Implementations */ = { isa = PBXGroup; children = ( - C84CC52D1BDC344100E06A64 /* ElementAt.swift */, + CB883B5F1BE3AC72000AC2EE /* AddRef.swift */, C89464491BC6C2B00055219D /* Amb.swift */, C894644A1BC6C2B00055219D /* AnonymousObservable.swift */, C894644C1BC6C2B00055219D /* Buffer.swift */, @@ -1156,6 +1166,7 @@ C89464561BC6C2B00055219D /* DelaySubscription.swift */, C89464571BC6C2B00055219D /* DistinctUntilChanged.swift */, C89464581BC6C2B00055219D /* Do.swift */, + C84CC52D1BDC344100E06A64 /* ElementAt.swift */, C89464591BC6C2B00055219D /* Empty.swift */, C894645A1BC6C2B00055219D /* FailWith.swift */, C894645B1BC6C2B00055219D /* Filter.swift */, @@ -1191,11 +1202,12 @@ C89464751BC6C2B00055219D /* Timer.swift */, CBEE77531BD8C7B700AD584C /* ToArray.swift */, D2AF91881BD2C51900A008C1 /* Using.swift */, + CB883B5E1BE3AC72000AC2EE /* Window.swift */, + D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */, C89464791BC6C2B00055219D /* Zip.swift */, C89464761BC6C2B00055219D /* Zip+arity.swift */, C89464771BC6C2B00055219D /* Zip+arity.tt */, C89464781BC6C2B00055219D /* Zip+CollectionType.swift */, - D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */, ); path = Implementations; sourceTree = ""; @@ -1735,6 +1747,7 @@ C89465881BC6C2BC0055219D /* RxScrollViewDelegateProxy.swift in Sources */, C89464B71BC6C2B00055219D /* Observable+Extensions.swift in Sources */, C89464A01BC6C2B00055219D /* Lock.swift in Sources */, + CB883B601BE3AC72000AC2EE /* Window.swift in Sources */, C89464C91BC6C2B00055219D /* Do.swift in Sources */, C89464A41BC6C2B00055219D /* Queue.swift in Sources */, C89464B91BC6C2B00055219D /* ObservableConvertibleType.swift in Sources */, @@ -1855,11 +1868,13 @@ C894656E1BC6C2BC0055219D /* DelegateProxy.swift in Sources */, C89464EF1BC6C2B00055219D /* Observable+Debug.swift in Sources */, C89464E91BC6C2B00055219D /* Zip+CollectionType.swift in Sources */, + CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */, C89465761BC6C2BC0055219D /* KVOObserver.swift in Sources */, C89465641BC6C2BC0055219D /* _RXSwizzling.m in Sources */, C89464CA1BC6C2B00055219D /* Empty.swift in Sources */, C803973B1BD3E17D009D8B26 /* ActivityIndicator.swift in Sources */, C89464C61BC6C2B00055219D /* Deferred.swift in Sources */, + CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */, D2AF91981BD3D95900A008C1 /* Using.swift in Sources */, C8297E501B6CF905000589EA /* TableViewController.swift in Sources */, C8297E511B6CF905000589EA /* PartialUpdatesViewController.swift in Sources */, @@ -1889,6 +1904,7 @@ C89465711BC6C2BC0055219D /* Observable+Bind.swift in Sources */, C89464B01BC6C2B00055219D /* SerialDisposable.swift in Sources */, C89464A21BC6C2B00055219D /* Bag.swift in Sources */, + CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */, C894657B1BC6C2BC0055219D /* RxCLLocationManagerDelegateProxy.swift in Sources */, C8297E581B6CF905000589EA /* User.swift in Sources */, C89464B41BC6C2B00055219D /* Event.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/AddRef.swift b/RxSwift/Observables/Implementations/AddRef.swift new file mode 100644 index 00000000..36176dff --- /dev/null +++ b/RxSwift/Observables/Implementations/AddRef.swift @@ -0,0 +1,47 @@ +// +// AddRef.swift +// Rx +// +// Created by Junior B. on 30/10/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class AddRefSink : Sink, ObserverType { + typealias Element = O.E + + override init(observer: O, cancel: Disposable) { + super.init(observer: observer, cancel: cancel) + } + + func on(event: Event) { + switch event { + case .Next(_): + observer?.on(event) + case .Completed, .Error(_): + observer?.on(event) + dispose() + } + } +} + +class AddRef : Producer { + typealias EventHandler = Event throws -> Void + + private let _source: Observable + private let _refCount: RefCountDisposable + + init(source: Observable, refCount: RefCountDisposable) { + _source = source + _refCount = refCount + } + + override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + let d = StableCompositeDisposable.create(_refCount.disposable, cancel) + + let sink = AddRefSink(observer: observer, cancel: d) + setSink(sink) + return _source.subscribeSafe(sink) + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Window.swift b/RxSwift/Observables/Implementations/Window.swift index c1a3a954..48a911de 100644 --- a/RxSwift/Observables/Implementations/Window.swift +++ b/RxSwift/Observables/Implementations/Window.swift @@ -8,3 +8,135 @@ import Foundation +class WindowTimeCountSink> : Sink, ObserverType { + typealias Parent = WindowTimeCount + typealias E = Element + + private let _parent: Parent + + private let _lock = NSRecursiveLock() + + private var _subject = PublishSubject() + private var _count = 0 + private var _windowId = 0 + + private let _timerD = SerialDisposable() + private let _refCountDisposable: RefCountDisposable + private let _groupDisposable = CompositeDisposable() + + init(parent: Parent, observer: O, cancel: Disposable) { + _parent = parent + + _groupDisposable.addDisposable(_timerD) + + _refCountDisposable = RefCountDisposable(disposable: _groupDisposable) + super.init(observer: observer, cancel: cancel) + } + + func run() -> Disposable { + + observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + createTimer(0) + + _groupDisposable.addDisposable(_parent._source.subscribeSafe(self)) + return _refCountDisposable + } + + func startNewWindowAndCompleteCurrentOne() { + _subject.on(.Completed) + _subject = PublishSubject() + + observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + } + + func on(event: Event) { + + var newWindow = false + var newId = 0 + + _lock.performLocked { + switch event { + case .Next(let element): + _subject.on(.Next(element)) + + do { + try incrementChecked(&_count) + } catch (let e) { + _subject.on(.Error(e as ErrorType)) + dispose() + } + + if (_count == _parent._count) { + newWindow = true + _count = 0 + newId = ++_windowId + self.startNewWindowAndCompleteCurrentOne() + } + + case .Error(let error): + _subject.on(.Error(error)) + observer?.on(.Error(error)) + dispose() + case .Completed: + _subject.on(.Completed) + observer?.on(.Completed) + dispose() + } + } + + if newWindow { + createTimer(newId) + } + } + + func createTimer(windowId: Int) { + if _timerD.disposed { + return + } + + if _windowId != windowId { + return + } + + _timerD.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in + + var newId = 0 + + self._lock.performLocked { + if previousWindowId != self._windowId { + return + } + + self._count = 0 + self._windowId = self._windowId &+ 1 + newId = self._windowId + self.startNewWindowAndCompleteCurrentOne() + } + + self.createTimer(newId) + + return NopDisposable.instance + } + } +} + +class WindowTimeCount : Producer> { + + private let _timeSpan: S.TimeInterval + private let _count: Int + private let _scheduler: S + private let _source: Observable + + init(source: Observable, timeSpan: S.TimeInterval, count: Int, scheduler: S) { + _source = source + _timeSpan = timeSpan + _count = count + _scheduler = scheduler + } + + override func run>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + let sink = WindowTimeCountSink(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return sink.run() + } +} diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index 6917b79a..7b46104a 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -209,3 +209,22 @@ extension ObservableType { return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) } } + +// MARK: window + +extension ObservableType { + + /** + Projects each element of an observable sequence into a window that is completed when either it’s full or a given amount of time has elapsed. + + - parameter timeSpan: Maximum time length of a window. + - parameter count: Maximum element count of a window. + - parameter scheduler: Scheduler to run windowing timers on. + - returns: An observable sequence of windows (instances of `Observable`). + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func window(timeSpan timeSpan: S.TimeInterval, count: Int, scheduler: S) + -> Observable> { + return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) + } +} diff --git a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift index 8ccc7784..5b7ce9cb 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -18,8 +18,7 @@ class ObservableTimeTest : RxTest { } } -// throttle - +// MARK: Throttle extension ObservableTimeTest { func test_ThrottleTimeSpan_AllPass() { let scheduler = TestScheduler(initialClock: 0) @@ -274,8 +273,7 @@ extension ObservableTimeTest { } } -// sample - +// MARK: Sample extension ObservableTimeTest { func testSample_Sampler_SamplerThrows() { let scheduler = TestScheduler(initialClock: 0) @@ -710,8 +708,7 @@ extension ObservableTimeTest { } } -// interval - +// MARK: Interval extension ObservableTimeTest { func testInterval_TimeSpan_Basic() { @@ -821,8 +818,7 @@ extension ObservableTimeTest { } } -// take - +// MARK: Take extension ObservableTimeTest { func testTake_TakeZero() { @@ -997,8 +993,7 @@ extension ObservableTimeTest { } -// take - +// MARK: Delay Subscription extension ObservableTimeTest { func testDelaySubscription_TimeSpan_Simple() { @@ -1073,7 +1068,7 @@ extension ObservableTimeTest { } } -// skip +// MARK: Skip extension ObservableTimeTest { func testSkip_Zero() { let scheduler = TestScheduler(initialClock: 0) @@ -1183,9 +1178,9 @@ extension ObservableTimeTest { } } -// +// MARK: Buffer extension ObservableTimeTest { - func bufferWithTimeOrCount_Basic() { + func testBufferWithTimeOrCount_Basic() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1221,7 +1216,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Error() { + func testBufferWithTimeOrCount_Error() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1256,7 +1251,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Disposed() { + func testBufferWithTimeOrCount_Disposed() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1287,7 +1282,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Default() { + func testBufferWithTimeOrCount_Default() { let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default) let result = try! range(1, 10, backgroundScheduler) @@ -1299,4 +1294,210 @@ extension ObservableTimeTest { XCTAssertEqual(result!, [4, 5, 6]) } +} + +// MARK: Window +extension ObservableTimeTest { + func testWindowWithTimeOrCount_Basic() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + completed(600) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7"), + next(420, "3 8"), + next(470, "4 9"), + completed(600) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + } + + func testWindowWithTimeOrCount_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + error(600, testError) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7"), + next(420, "3 8"), + next(470, "4 9"), + error(600, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + } + + func testWindowWithTimeOrCount_Disposed() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(105, 0), + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + completed(600) + ]) + + let res = scheduler.start(370) { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7") + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 370) + ]) + } + + /* + func testWindowWithTimeOrCount_BasicPeriod() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(210, 2), + next(240, 3), + next(270, 4), + next(320, 5), + next(360, 6), + next(390, 7), + next(410, 8), + next(460, 9), + next(470, 10), + completed(490) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 100, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + }.concat(just("\(i) end")) + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(210, "0 2"), + next(240, "0 3"), + next(270, "0 4"), + next(300, "0 end"), + next(320, "1 5"), + next(360, "1 6"), + next(390, "1 7"), + next(400, "1 end"), + next(410, "2 8"), + next(460, "2 9"), + next(470, "2 10"), + next(490, "2 end"), + completed(490) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 490) + ]) + + }*/ + + func windowWithTimeOrCount_Default() { + let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default) + + let result = try! range(1, 10, backgroundScheduler) + .window(timeSpan: 1000, count: 3, scheduler: backgroundScheduler) + .mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + .merge() + .skip(4) + .toBlocking() + .first() + + XCTAssertEqual(result!, "1 5") + } + } \ No newline at end of file