From d4cda2430e354d99583df4deec12a458c0a14288 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Thu, 5 Nov 2015 12:34:20 +0100 Subject: [PATCH 1/5] Blocking operators run runloop while blocking. --- Rx.xcodeproj/project.pbxproj | 10 + RxBlocking/BlockingObservable+Operators.swift | 189 ++++++++---------- RxBlocking/RunLoopLock.swift | 33 +++ .../Tests/Observable+BlockingTest.swift | 12 +- 4 files changed, 135 insertions(+), 109 deletions(-) create mode 100644 RxBlocking/RunLoopLock.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 99c1d709..0d2175a6 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -358,6 +358,10 @@ C88254341B8A752B00B02D69 /* UITableView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254121B8A752B00B02D69 /* UITableView+Rx.swift */; }; C88254351B8A752B00B02D69 /* UITextField+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254131B8A752B00B02D69 /* UITextField+Rx.swift */; }; C88254361B8A752B00B02D69 /* UITextView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254141B8A752B00B02D69 /* UITextView+Rx.swift */; }; + C88E296B1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; + C88E296C1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; + C88E296D1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; + C88E296E1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; C8941BDF1BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; }; C8941BE01BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; }; C8941BE11BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; }; @@ -1001,6 +1005,7 @@ C88254131B8A752B00B02D69 /* UITextField+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITextField+Rx.swift"; sourceTree = ""; }; C88254141B8A752B00B02D69 /* UITextView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITextView+Rx.swift"; sourceTree = ""; }; C88BB8711B07E5ED0064D411 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RunLoopLock.swift; sourceTree = ""; }; C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BlockingObservable.swift; sourceTree = ""; }; C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "BlockingObservable+Operators.swift"; sourceTree = ""; }; C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = ShareReplay1.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; @@ -1400,6 +1405,7 @@ C8093F581B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift */, C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */, C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */, + C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */, C8093F591B8A73A20088E94D /* README.md */, ); path = RxBlocking; @@ -2121,6 +2127,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296B1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BDF1BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE41BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, C8093F5E1B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift in Sources */, @@ -2131,6 +2138,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296C1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BE01BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE51BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, C8093F5F1B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift in Sources */, @@ -2595,6 +2603,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296E1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BE21BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE71BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, C8F0C04F1BBBFBCE001B112F /* ObservableConvertibleType+Blocking.swift in Sources */, @@ -2803,6 +2812,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296D1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BE11BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE61BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, D2EBEB8A1BB9B9EE003A27DC /* ObservableConvertibleType+Blocking.swift in Sources */, diff --git a/RxBlocking/BlockingObservable+Operators.swift b/RxBlocking/BlockingObservable+Operators.swift index 76e816f0..3f05801c 100644 --- a/RxBlocking/BlockingObservable+Operators.swift +++ b/RxBlocking/BlockingObservable+Operators.swift @@ -13,44 +13,39 @@ import Foundation extension BlockingObservable { /** - Blocks current thread until sequence terminates. - - If sequence terminates with error, terminating error will be thrown. - - - returns: All elements of sequence. - */ - public func toArray() throws -> [E] { - let condition = NSCondition() - - var elements: [E] = Array() - - var error: ErrorType? - - var ended = false + Blocks current thread until sequence terminates. - _ = self.source.subscribe { e in - switch e { - case .Next(let element): - elements.append(element) - case .Error(let e): - error = e - condition.lock() - ended = true - condition.signal() - condition.unlock() - case .Completed: - condition.lock() - ended = true - condition.signal() - condition.unlock() + If sequence terminates with error, terminating error will be thrown. + + - returns: All elements of sequence. + */ + public func toArray() throws -> [E] { + var elements: [E] = Array() + + var error: ErrorType? + + let lock = RunLoopLock() + + let d = SingleAssignmentDisposable() + + lock.dispatch { + d.disposable = self.source.subscribe { e in + switch e { + case .Next(let element): + elements.append(element) + case .Error(let e): + error = e + lock.stop() + case .Completed: + lock.stop() + } } } - condition.lock() - while !ended { - condition.wait() - } - condition.unlock() - + + lock.run() + + d.dispose() + if let error = error { throw error } @@ -61,99 +56,87 @@ extension BlockingObservable { extension BlockingObservable { /** - Blocks current thread until sequence produces first element. - - If sequence terminates with error before producing first element, terminating error will be thrown. - - - returns: First element of sequence. If sequence is empty `nil` is returned. - */ + Blocks current thread until sequence produces first element. + + If sequence terminates with error before producing first element, terminating error will be thrown. + + - returns: First element of sequence. If sequence is empty `nil` is returned. + */ public func first() throws -> E? { - let condition = NSCondition() - var element: E? - + var error: ErrorType? - - var ended = false - + let d = SingleAssignmentDisposable() - - d.disposable = self.source.subscribe { e in - switch e { - case .Next(let e): - if element == nil { - element = e + + let lock = RunLoopLock() + + lock.dispatch { + d.disposable = self.source.subscribe { e in + switch e { + case .Next(let e): + if element == nil { + element = e + } + break + case .Error(let e): + error = e + default: + break } - break - case .Error(let e): - error = e - default: - break + + lock.stop() } - - condition.lock() - ended = true - condition.signal() - condition.unlock() - } - - condition.lock() - while !ended { - condition.wait() } + + lock.run() + d.dispose() - condition.unlock() - + if let error = error { throw error } - + return element } } extension BlockingObservable { /** - Blocks current thread until sequence terminates. - - If sequence terminates with error, terminating error will be thrown. - - - returns: Last element in the sequence. If sequence is empty `nil` is returned. - */ + Blocks current thread until sequence terminates. + + If sequence terminates with error, terminating error will be thrown. + + - returns: Last element in the sequence. If sequence is empty `nil` is returned. + */ public func last() throws -> E? { - let condition = NSCondition() - var element: E? - + var error: ErrorType? - - var ended = false - + let d = SingleAssignmentDisposable() - - d.disposable = self.source.subscribe { e in - switch e { - case .Next(let e): - element = e - return - case .Error(let e): - error = e - default: - break + + let lock = RunLoopLock() + + lock.dispatch { + d.disposable = self.source.subscribe { e in + switch e { + case .Next(let e): + element = e + return + case .Error(let e): + error = e + default: + break + } + + lock.stop() } - - condition.lock() - ended = true - condition.signal() - condition.unlock() } - condition.lock() - while !ended { - condition.wait() - } + lock.run() + d.dispose() - condition.unlock() if let error = error { throw error diff --git a/RxBlocking/RunLoopLock.swift b/RxBlocking/RunLoopLock.swift new file mode 100644 index 00000000..5ea9ba73 --- /dev/null +++ b/RxBlocking/RunLoopLock.swift @@ -0,0 +1,33 @@ +// +// RunLoopLock.swift +// Rx +// +// Created by Krunoslav Zaher on 11/5/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class RunLoopLock : NSObject { + let currentRunLoop: CFRunLoopRef + + override init() { + currentRunLoop = CFRunLoopGetCurrent() + } + + func dispatch(action: () -> ()) { + CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode, action) + CFRunLoopWakeUp(currentRunLoop) + } + + func stop() { + CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode) { + CFRunLoopStop(self.currentRunLoop) + } + CFRunLoopWakeUp(currentRunLoop) + } + + func run() { + CFRunLoopRun() + } +} diff --git a/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift b/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift index 448cc3ae..00b4c410 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift @@ -30,8 +30,8 @@ extension ObservableBlockingTest { try (failWith(testError) as Observable).toBlocking().toArray() XCTFail("It should fail") } - catch { - + catch let e { + XCTAssertTrue(e as NSError === testError) } } @@ -67,8 +67,8 @@ extension ObservableBlockingTest { try (failWith(testError) as Observable).toBlocking().first() XCTFail() } - catch { - + catch let e { + XCTAssertTrue(e as NSError === testError) } } @@ -104,8 +104,8 @@ extension ObservableBlockingTest { try (failWith(testError) as Observable).toBlocking().last() XCTFail() } - catch { - + catch let e { + XCTAssertTrue(e as NSError === testError) } } From f20618ed756d442fd662dea56f6cb29c8b7a4f24 Mon Sep 17 00:00:00 2001 From: Fernando Paredes Date: Thu, 5 Nov 2015 09:57:57 -0800 Subject: [PATCH 2/5] Update README to use standard Carthage syntax --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2f475f0e..1a8b0aa6 100644 --- a/README.md +++ b/README.md @@ -480,7 +480,7 @@ $ pod install Add this to `Cartfile` ``` -git "git@github.com:ReactiveX/RxSwift.git" "2.0.0-beta.2" +github "ReactiveX/RxSwift" "2.0.0-beta.2" ``` ``` From e7723a21738a87e875a42fcdb60c9445bbebed52 Mon Sep 17 00:00:00 2001 From: Junior B Date: Fri, 30 Oct 2015 09:06:10 +0100 Subject: [PATCH 3/5] Adds `RefCountDisposable` and `BooleanDisposable` and necessary tests --- Rx.xcodeproj/project.pbxproj | 38 ++++- RxSwift/Disposables/BooleanDisposable.swift | 47 +++++++ RxSwift/Disposables/RefCountDisposable.swift | 131 ++++++++++++++++++ .../Observables/Implementations/Window.swift | 10 ++ .../RxSwiftTests/Tests/DisposableTest.swift | 46 ++++++ 5 files changed, 268 insertions(+), 4 deletions(-) create mode 100644 RxSwift/Disposables/BooleanDisposable.swift create mode 100644 RxSwift/Disposables/RefCountDisposable.swift create mode 100644 RxSwift/Observables/Implementations/Window.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index 0d2175a6..a2d22f45 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -564,6 +564,18 @@ C8F0C0441BBBFBB9001B112F /* _RXSwizzling.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E881B8A732E0088E94D /* _RXSwizzling.h */; settings = {ATTRIBUTES = (Public, ); }; }; C8F0C0451BBBFBB9001B112F /* _RXKVOObserver.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E861B8A732E0088E94D /* _RXKVOObserver.h */; settings = {ATTRIBUTES = (Public, ); }; }; C8F0C04F1BBBFBCE001B112F /* ObservableConvertibleType+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift */; }; + CB883B3B1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B3C1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B3D1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B3E1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B401BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B411BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B421BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B431BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B451BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B461BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B471BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B481BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CBEE771F1BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77201BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77211BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; @@ -1022,6 +1034,9 @@ C8F0C0021BBBFB8B001B112F /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + CB883B3A1BE24355000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; + CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; + CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; CBEE771E1BD649A000AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = ""; }; D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; @@ -1181,6 +1196,7 @@ children = ( C8093C541B8A72BE0088E94D /* AnonymousDisposable.swift */, C8093C551B8A72BE0088E94D /* BinaryDisposable.swift */, + CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */, C8093C571B8A72BE0088E94D /* CompositeDisposable.swift */, C8093C581B8A72BE0088E94D /* DisposeBag.swift */, C8093C591B8A72BE0088E94D /* DisposeBase.swift */, @@ -1188,6 +1204,7 @@ C8093C5A1B8A72BE0088E94D /* NAryDisposable.swift */, C8093C5B1B8A72BE0088E94D /* NAryDisposable.tt */, C8093C5C1B8A72BE0088E94D /* NopDisposable.swift */, + CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */, C8093C5D1B8A72BE0088E94D /* ScheduledDisposable.swift */, C8093C5E1B8A72BE0088E94D /* ScopedDisposable.swift */, C8093C5F1B8A72BE0088E94D /* SerialDisposable.swift */, @@ -1217,8 +1234,6 @@ C8093C6A1B8A72BE0088E94D /* Implementations */ = { isa = PBXGroup; children = ( - C84CC53F1BDC3B3700E06A64 /* ElementAt.swift */, - D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */, C8093C6B1B8A72BE0088E94D /* Amb.swift */, C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */, C821DBA11BA4DCAB008F3809 /* Buffer.swift */, @@ -1234,6 +1249,7 @@ C8093C771B8A72BE0088E94D /* DelaySubscription.swift */, C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */, C8093C791B8A72BE0088E94D /* Do.swift */, + C84CC53F1BDC3B3700E06A64 /* ElementAt.swift */, C8C3DA051B9393AC004D233E /* Empty.swift */, C8C3DA081B93941E004D233E /* FailWith.swift */, C8093C7A1B8A72BE0088E94D /* Filter.swift */, @@ -1257,6 +1273,7 @@ C8093C881B8A72BE0088E94D /* Sink.swift */, C8093C891B8A72BE0088E94D /* Skip.swift */, D285BAC31BC0231000B3F602 /* SkipUntil.swift */, + D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */, C8093C8A1B8A72BE0088E94D /* StartWith.swift */, C8093C8B1B8A72BE0088E94D /* SubscribeOn.swift */, C8093C8C1B8A72BE0088E94D /* Switch.swift */, @@ -1267,12 +1284,13 @@ C8093C901B8A72BE0088E94D /* Throttle.swift */, C8093C911B8A72BE0088E94D /* Timer.swift */, CBEE771E1BD649A000AD584C /* ToArray.swift */, + D235B23D1BD003DD007E84DA /* Using.swift */, + CB883B3A1BE24355000AC2EE /* Window.swift */, + D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */, C8093C941B8A72BE0088E94D /* Zip.swift */, C8093C921B8A72BE0088E94D /* Zip+arity.swift */, C8093C931B8A72BE0088E94D /* Zip+arity.tt */, C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */, - D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */, - D235B23D1BD003DD007E84DA /* Using.swift */, ); path = Implementations; sourceTree = ""; @@ -2164,12 +2182,14 @@ C8093CF01B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, D2245A1C1BD63C4600E7146F /* WithLatestFrom.swift in Sources */, C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */, + CB883B3C1BE24355000AC2EE /* Window.swift in Sources */, C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA071B9393AC004D233E /* Empty.swift in Sources */, C8093D881B8A72BE0088E94D /* RxBox.swift in Sources */, C8093D3A1B8A72BE0088E94D /* Sink.swift in Sources */, C8093D461B8A72BE0088E94D /* TakeUntil.swift in Sources */, C8093D941B8A72BE0088E94D /* MainScheduler.swift in Sources */, + CB883B461BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, C84B38EF1BA433CD001B7D88 /* Generate.swift in Sources */, C8093D161B8A72BE0088E94D /* Deferred.swift in Sources */, C8093DA41B8A72BE0088E94D /* ReplaySubject.swift in Sources */, @@ -2193,6 +2213,7 @@ C8093CD81B8A72BE0088E94D /* BinaryDisposable.swift in Sources */, C89CDB371BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, C8093D2A1B8A72BE0088E94D /* ObserveOn.swift in Sources */, + CB883B411BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, C8093D361B8A72BE0088E94D /* Sample.swift in Sources */, C84CC54F1BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D2752D621BC5551A0070C418 /* SkipUntil.swift in Sources */, @@ -2292,12 +2313,14 @@ C8093CEF1B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, D2245A1B1BD5657300E7146F /* WithLatestFrom.swift in Sources */, C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */, + CB883B3B1BE24355000AC2EE /* Window.swift in Sources */, C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA061B9393AC004D233E /* Empty.swift in Sources */, C8093D871B8A72BE0088E94D /* RxBox.swift in Sources */, C8093D391B8A72BE0088E94D /* Sink.swift in Sources */, C8093D451B8A72BE0088E94D /* TakeUntil.swift in Sources */, C8093D931B8A72BE0088E94D /* MainScheduler.swift in Sources */, + CB883B451BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, C84B38EE1BA433CD001B7D88 /* Generate.swift in Sources */, C8093D151B8A72BE0088E94D /* Deferred.swift in Sources */, C8093DA31B8A72BE0088E94D /* ReplaySubject.swift in Sources */, @@ -2321,6 +2344,7 @@ C8093CD71B8A72BE0088E94D /* BinaryDisposable.swift in Sources */, C89CDB361BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, C8093D291B8A72BE0088E94D /* ObserveOn.swift in Sources */, + CB883B401BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, C8093D351B8A72BE0088E94D /* Sample.swift in Sources */, C84CC54E1BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D285BAC41BC0231000B3F602 /* SkipUntil.swift in Sources */, @@ -2420,12 +2444,14 @@ C8F0BF9C1BBBFB8B001B112F /* StableCompositeDisposable.swift in Sources */, D2245A1E1BD63C4A00E7146F /* WithLatestFrom.swift in Sources */, C8F0BF9D1BBBFB8B001B112F /* Zip+arity.swift in Sources */, + CB883B3E1BE24355000AC2EE /* Window.swift in Sources */, C8F0BF9E1BBBFB8B001B112F /* Timer.swift in Sources */, C8F0BF9F1BBBFB8B001B112F /* Empty.swift in Sources */, C8F0BFA01BBBFB8B001B112F /* RxBox.swift in Sources */, C8F0BFA11BBBFB8B001B112F /* Sink.swift in Sources */, C8F0BFA21BBBFB8B001B112F /* TakeUntil.swift in Sources */, C8F0BFA31BBBFB8B001B112F /* MainScheduler.swift in Sources */, + CB883B481BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, C8F0BFA41BBBFB8B001B112F /* Generate.swift in Sources */, C8F0BFA51BBBFB8B001B112F /* Deferred.swift in Sources */, C8F0BFA61BBBFB8B001B112F /* ReplaySubject.swift in Sources */, @@ -2449,6 +2475,7 @@ C8F0BFB61BBBFB8B001B112F /* BinaryDisposable.swift in Sources */, C89CDB391BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, C8F0BFB71BBBFB8B001B112F /* ObserveOn.swift in Sources */, + CB883B431BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, C8F0BFB81BBBFB8B001B112F /* Sample.swift in Sources */, C84CC5511BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D21C29311BC6A1C300448E70 /* SkipUntil.swift in Sources */, @@ -2699,12 +2726,14 @@ D2EBEB1B1BB9B6C1003A27DC /* Repeat.swift in Sources */, D2245A1D1BD63C4700E7146F /* WithLatestFrom.swift in Sources */, D2EBEAF81BB9B6B2003A27DC /* ScopedDisposable.swift in Sources */, + CB883B3D1BE24355000AC2EE /* Window.swift in Sources */, D2EBEAEA1BB9B697003A27DC /* SchedulerType.swift in Sources */, D2EBEB031BB9B6C1003A27DC /* CombineLatest+CollectionType.swift in Sources */, D2EBEADC1BB9B697003A27DC /* Cancelable.swift in Sources */, D2EBEAE41BB9B697003A27DC /* ObservableType.swift in Sources */, D2EBEB331BB9B6CA003A27DC /* Observable+Time.swift in Sources */, D2EBEB191BB9B6C1003A27DC /* Reduce.swift in Sources */, + CB883B471BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, D2EBEB001BB9B6BA003A27DC /* Catch.swift in Sources */, D2EBEB161BB9B6C1003A27DC /* ObserveOnSerialDispatchQueue.swift in Sources */, D2EBEB061BB9B6C1003A27DC /* Debug.swift in Sources */, @@ -2728,6 +2757,7 @@ D2EBEB091BB9B6C1003A27DC /* DistinctUntilChanged.swift in Sources */, D2EBEB2A1BB9B6C5003A27DC /* Zip+CollectionType.swift in Sources */, C89CDB381BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, + CB883B421BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, D2EBEB401BB9B6DE003A27DC /* BehaviorSubject.swift in Sources */, C84CC5501BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D2EBEB271BB9B6C1003A27DC /* Timer.swift in Sources */, diff --git a/RxSwift/Disposables/BooleanDisposable.swift b/RxSwift/Disposables/BooleanDisposable.swift new file mode 100644 index 00000000..ec33b80b --- /dev/null +++ b/RxSwift/Disposables/BooleanDisposable.swift @@ -0,0 +1,47 @@ +// +// BooleanDisposable.swift +// Rx +// +// Created by Junior B. on 10/29/15. +// Copyright (c) 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +/** +Represents a disposable resource that can be checked for disposal status. +*/ +public class BooleanDisposable : Disposable, Cancelable { + + internal static let BooleanDisposableTrue = BooleanDisposable(disposed: true) + private var _disposed = false + + /** + Initializes a new instance of the `BooleanDisposable` class + */ + public init() { + } + + /** + Initializes a new instance of the `BooleanDisposable` class with given value + */ + public init(disposed: Bool) { + self._disposed = disposed + } + + /** + - returns: Was resource disposed. + */ + public var disposed: Bool { + get { + return _disposed + } + } + + /** + Sets the status to disposed, which can be observer through the `disposed` property. + */ + public func dispose() { + _disposed = true + } +} \ No newline at end of file diff --git a/RxSwift/Disposables/RefCountDisposable.swift b/RxSwift/Disposables/RefCountDisposable.swift new file mode 100644 index 00000000..4b44b848 --- /dev/null +++ b/RxSwift/Disposables/RefCountDisposable.swift @@ -0,0 +1,131 @@ +// +// RefCountDisposable.swift +// Rx +// +// Created by Junior B. on 10/29/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +/** + Represents a disposable resource that only disposes its underlying disposable resource when all dependent disposable objects have been disposed. + */ +public class RefCountDisposable : DisposeBase, Cancelable { + private var _lock = SpinLock() + private var _disposable = nil as Disposable? + private var _primaryDisposed = false + private var _count = 0 + + /** + - returns: Was resource disposed. + */ + public var disposed: Bool { + get { + return _disposable == nil + } + } + + /** + Initializes a new instance of the `RefCountDisposable`. + */ + public init(disposable: Disposable) { + _disposable = disposable + super.init() + } + + /** + Holds a dependent disposable that when disposed decreases the refcount on the underlying disposable. + + When getter is called, a dependent disposable contributing to the reference count that manages the underlying disposable's lifetime is returned. + */ + public var disposable: Disposable { + get { + return _lock.calculateLocked { + if let _ = _disposable { + + do{ + try incrementChecked(&_count) + } catch (_){ + rxFatalError("RefCountDisposable increment failed") + } + + return RefCountInnerDisposable(self) + } else { + return NopDisposable.instance + } + } + } + } + + /** + Disposes the underlying disposable only when all dependent disposables have been disposed. + */ + public func dispose() { + let disposable: Disposable? = _lock.calculateLocked { + if let d = _disposable where !_primaryDisposed + { + _primaryDisposed = true; + + if (_count == 0) + { + _disposable = nil + return d + } + } + + return nil + } + + if let disposable = disposable { + disposable.dispose() + } + } + + private func release() { + let disposable: Disposable? = _lock.calculateLocked { + if let d = _disposable + { + do{ + try decrementChecked(&_count) + } catch (_){ + rxFatalError("RefCountDisposable decrement on release failed") + } + + guard _count >= 0 else { + rxFatalError("RefCountDisposable counter is lower than 0") + } + + if _primaryDisposed && _count == 0 { + _disposable = nil + return d + } + } + + return nil + } + + if let disposable = disposable { + disposable.dispose() + } + } +} + +internal final class RefCountInnerDisposable: DisposeBase, Disposable +{ + private let _parent: RefCountDisposable + private var _disposed: Int32 = 0 + + init(_ parent: RefCountDisposable) + { + _parent = parent; + super.init() + } + + internal func dispose() + { + if OSAtomicCompareAndSwap32(0, 1, &_disposed) { + _parent.release() + } + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Window.swift b/RxSwift/Observables/Implementations/Window.swift new file mode 100644 index 00000000..c1a3a954 --- /dev/null +++ b/RxSwift/Observables/Implementations/Window.swift @@ -0,0 +1,10 @@ +// +// Buffer.swift +// Rx +// +// Created by Junior B. on 29/10/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + diff --git a/RxTests/RxSwiftTests/Tests/DisposableTest.swift b/RxTests/RxSwiftTests/Tests/DisposableTest.swift index 4a309809..1b56f12e 100644 --- a/RxTests/RxSwiftTests/Tests/DisposableTest.swift +++ b/RxTests/RxSwiftTests/Tests/DisposableTest.swift @@ -123,4 +123,50 @@ class DisposableTest : RxTest { XCTAssertEqual(numberDisposed, 2) XCTAssertEqual(compositeDisposable.count, 0) } + + func testRefCountDisposable_RefCounting() { + let d = BooleanDisposable() + let r = RefCountDisposable(disposable: d) + + XCTAssertEqual(r.disposed, false) + + let d1 = r.disposable; + let d2 = r.disposable; + + XCTAssertEqual(d.disposed, false) + + d1.dispose() + XCTAssertEqual(d.disposed, false) + + d2.dispose() + XCTAssertEqual(d.disposed, false) + + r.dispose() + XCTAssertEqual(d.disposed, true) + + let d3 = r.disposable; + d3.dispose() + } + + func testRefCountDisposable_PrimaryDisposesFirst() { + let d = BooleanDisposable() + let r = RefCountDisposable(disposable: d) + + XCTAssertEqual(r.disposed, false) + + let d1 = r.disposable; + let d2 = r.disposable; + + XCTAssertEqual(d.disposed, false) + + d1.dispose() + XCTAssertEqual(d.disposed, false) + + r.dispose() + XCTAssertEqual(d.disposed, false) + + d2.dispose() + XCTAssertEqual(d.disposed, true) + + } } \ No newline at end of file From 7fe9a87cb52adf192262f9e1ecbb413dabdad883 Mon Sep 17 00:00:00 2001 From: Junior B Date: Fri, 30 Oct 2015 14:57:13 +0100 Subject: [PATCH 4/5] Add `window` operator, time/count version --- Rx.xcodeproj/project.pbxproj | 10 + RxExample/RxExample.xcodeproj/project.pbxproj | 20 +- .../Observables/Implementations/AddRef.swift | 47 ++++ .../Observables/Implementations/Window.swift | 132 ++++++++++ RxSwift/Observables/Observable+Time.swift | 19 ++ .../Tests/Observable+TimeTest.swift | 233 ++++++++++++++++-- 6 files changed, 443 insertions(+), 18 deletions(-) create mode 100644 RxSwift/Observables/Implementations/AddRef.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index a2d22f45..d71823d7 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -576,6 +576,10 @@ CB883B461BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CB883B471BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; CB883B481BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; CBEE771F1BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77201BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77211BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; @@ -1037,6 +1041,7 @@ CB883B3A1BE24355000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; + CB883B491BE369AA000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = ""; }; CBEE771E1BD649A000AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = ""; }; D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; @@ -1234,6 +1239,7 @@ C8093C6A1B8A72BE0088E94D /* Implementations */ = { isa = PBXGroup; children = ( + CB883B491BE369AA000AC2EE /* AddRef.swift */, C8093C6B1B8A72BE0088E94D /* Amb.swift */, C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */, C821DBA11BA4DCAB008F3809 /* Buffer.swift */, @@ -2271,6 +2277,7 @@ C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */, C8093CF41B8A72BE0088E94D /* Error.swift in Sources */, C8093D141B8A72BE0088E94D /* Debug.swift in Sources */, + CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */, C8093CCE1B8A72BE0088E94D /* Bag.swift in Sources */, C8093D301B8A72BE0088E94D /* Producer.swift in Sources */, C8093CF81B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */, @@ -2402,6 +2409,7 @@ C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */, C8093CF31B8A72BE0088E94D /* Error.swift in Sources */, C8093D131B8A72BE0088E94D /* Debug.swift in Sources */, + CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */, C8093CCD1B8A72BE0088E94D /* Bag.swift in Sources */, C8093D2F1B8A72BE0088E94D /* Producer.swift in Sources */, C8093CF71B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */, @@ -2533,6 +2541,7 @@ C8F0BFE71BBBFB8B001B112F /* Repeat.swift in Sources */, C8F0BFE81BBBFB8B001B112F /* Error.swift in Sources */, C8F0BFE91BBBFB8B001B112F /* Debug.swift in Sources */, + CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */, C8F0BFEA1BBBFB8B001B112F /* Bag.swift in Sources */, C8F0BFEB1BBBFB8B001B112F /* Producer.swift in Sources */, C8F0BFEC1BBBFB8B001B112F /* ImmediateSchedulerType.swift in Sources */, @@ -2815,6 +2824,7 @@ D2EBEB3D1BB9B6D8003A27DC /* SchedulerServices+Emulation.swift in Sources */, D2EBEB1C1BB9B6C1003A27DC /* Sample.swift in Sources */, D2EBEAFD1BB9B6BA003A27DC /* AnonymousObservable.swift in Sources */, + CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */, D2EBEAFA1BB9B6B2003A27DC /* SingleAssignmentDisposable.swift in Sources */, D2EBEAF31BB9B6AE003A27DC /* DisposeBag.swift in Sources */, D2EBEAED1BB9B6A4003A27DC /* Bag.swift in Sources */, diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index 02b11de6..fcb9a82f 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -343,6 +343,10 @@ C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; }; C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; }; C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; }; + CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */; }; + CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */; }; + CB883B601BE3AC72000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5E1BE3AC72000AC2EE /* Window.swift */; }; + CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5F1BE3AC72000AC2EE /* AddRef.swift */; }; CBEE77541BD8C7B700AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77531BD8C7B700AD584C /* ToArray.swift */; }; D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; }; D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; }; @@ -733,6 +737,10 @@ C8DF92F01B0B3E67009BCF9A /* Info-OSX.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-OSX.plist"; sourceTree = ""; }; C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = ""; }; C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; + CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; + CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; + CB883B5E1BE3AC72000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; + CB883B5F1BE3AC72000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = ""; }; CBEE77531BD8C7B700AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = ""; }; D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; @@ -1105,12 +1113,14 @@ C84CC5831BDD484400E06A64 /* SubscriptionDisposable.swift */, C89464331BC6C2B00055219D /* AnonymousDisposable.swift */, C89464341BC6C2B00055219D /* BinaryDisposable.swift */, + CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */, C89464351BC6C2B00055219D /* CompositeDisposable.swift */, C89464361BC6C2B00055219D /* DisposeBag.swift */, C89464371BC6C2B00055219D /* DisposeBase.swift */, C89464381BC6C2B00055219D /* NAryDisposable.swift */, C89464391BC6C2B00055219D /* NAryDisposable.tt */, C894643A1BC6C2B00055219D /* NopDisposable.swift */, + CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */, C894643B1BC6C2B00055219D /* ScheduledDisposable.swift */, C894643C1BC6C2B00055219D /* ScopedDisposable.swift */, C894643D1BC6C2B00055219D /* SerialDisposable.swift */, @@ -1140,7 +1150,7 @@ C89464481BC6C2B00055219D /* Implementations */ = { isa = PBXGroup; children = ( - C84CC52D1BDC344100E06A64 /* ElementAt.swift */, + CB883B5F1BE3AC72000AC2EE /* AddRef.swift */, C89464491BC6C2B00055219D /* Amb.swift */, C894644A1BC6C2B00055219D /* AnonymousObservable.swift */, C894644C1BC6C2B00055219D /* Buffer.swift */, @@ -1156,6 +1166,7 @@ C89464561BC6C2B00055219D /* DelaySubscription.swift */, C89464571BC6C2B00055219D /* DistinctUntilChanged.swift */, C89464581BC6C2B00055219D /* Do.swift */, + C84CC52D1BDC344100E06A64 /* ElementAt.swift */, C89464591BC6C2B00055219D /* Empty.swift */, C894645A1BC6C2B00055219D /* FailWith.swift */, C894645B1BC6C2B00055219D /* Filter.swift */, @@ -1191,11 +1202,12 @@ C89464751BC6C2B00055219D /* Timer.swift */, CBEE77531BD8C7B700AD584C /* ToArray.swift */, D2AF91881BD2C51900A008C1 /* Using.swift */, + CB883B5E1BE3AC72000AC2EE /* Window.swift */, + D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */, C89464791BC6C2B00055219D /* Zip.swift */, C89464761BC6C2B00055219D /* Zip+arity.swift */, C89464771BC6C2B00055219D /* Zip+arity.tt */, C89464781BC6C2B00055219D /* Zip+CollectionType.swift */, - D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */, ); path = Implementations; sourceTree = ""; @@ -1735,6 +1747,7 @@ C89465881BC6C2BC0055219D /* RxScrollViewDelegateProxy.swift in Sources */, C89464B71BC6C2B00055219D /* Observable+Extensions.swift in Sources */, C89464A01BC6C2B00055219D /* Lock.swift in Sources */, + CB883B601BE3AC72000AC2EE /* Window.swift in Sources */, C89464C91BC6C2B00055219D /* Do.swift in Sources */, C89464A41BC6C2B00055219D /* Queue.swift in Sources */, C89464B91BC6C2B00055219D /* ObservableConvertibleType.swift in Sources */, @@ -1855,11 +1868,13 @@ C894656E1BC6C2BC0055219D /* DelegateProxy.swift in Sources */, C89464EF1BC6C2B00055219D /* Observable+Debug.swift in Sources */, C89464E91BC6C2B00055219D /* Zip+CollectionType.swift in Sources */, + CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */, C89465761BC6C2BC0055219D /* KVOObserver.swift in Sources */, C89465641BC6C2BC0055219D /* _RXSwizzling.m in Sources */, C89464CA1BC6C2B00055219D /* Empty.swift in Sources */, C803973B1BD3E17D009D8B26 /* ActivityIndicator.swift in Sources */, C89464C61BC6C2B00055219D /* Deferred.swift in Sources */, + CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */, D2AF91981BD3D95900A008C1 /* Using.swift in Sources */, C8297E501B6CF905000589EA /* TableViewController.swift in Sources */, C8297E511B6CF905000589EA /* PartialUpdatesViewController.swift in Sources */, @@ -1889,6 +1904,7 @@ C89465711BC6C2BC0055219D /* Observable+Bind.swift in Sources */, C89464B01BC6C2B00055219D /* SerialDisposable.swift in Sources */, C89464A21BC6C2B00055219D /* Bag.swift in Sources */, + CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */, C894657B1BC6C2BC0055219D /* RxCLLocationManagerDelegateProxy.swift in Sources */, C8297E581B6CF905000589EA /* User.swift in Sources */, C89464B41BC6C2B00055219D /* Event.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/AddRef.swift b/RxSwift/Observables/Implementations/AddRef.swift new file mode 100644 index 00000000..36176dff --- /dev/null +++ b/RxSwift/Observables/Implementations/AddRef.swift @@ -0,0 +1,47 @@ +// +// AddRef.swift +// Rx +// +// Created by Junior B. on 30/10/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class AddRefSink : Sink, ObserverType { + typealias Element = O.E + + override init(observer: O, cancel: Disposable) { + super.init(observer: observer, cancel: cancel) + } + + func on(event: Event) { + switch event { + case .Next(_): + observer?.on(event) + case .Completed, .Error(_): + observer?.on(event) + dispose() + } + } +} + +class AddRef : Producer { + typealias EventHandler = Event throws -> Void + + private let _source: Observable + private let _refCount: RefCountDisposable + + init(source: Observable, refCount: RefCountDisposable) { + _source = source + _refCount = refCount + } + + override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + let d = StableCompositeDisposable.create(_refCount.disposable, cancel) + + let sink = AddRefSink(observer: observer, cancel: d) + setSink(sink) + return _source.subscribeSafe(sink) + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Window.swift b/RxSwift/Observables/Implementations/Window.swift index c1a3a954..48a911de 100644 --- a/RxSwift/Observables/Implementations/Window.swift +++ b/RxSwift/Observables/Implementations/Window.swift @@ -8,3 +8,135 @@ import Foundation +class WindowTimeCountSink> : Sink, ObserverType { + typealias Parent = WindowTimeCount + typealias E = Element + + private let _parent: Parent + + private let _lock = NSRecursiveLock() + + private var _subject = PublishSubject() + private var _count = 0 + private var _windowId = 0 + + private let _timerD = SerialDisposable() + private let _refCountDisposable: RefCountDisposable + private let _groupDisposable = CompositeDisposable() + + init(parent: Parent, observer: O, cancel: Disposable) { + _parent = parent + + _groupDisposable.addDisposable(_timerD) + + _refCountDisposable = RefCountDisposable(disposable: _groupDisposable) + super.init(observer: observer, cancel: cancel) + } + + func run() -> Disposable { + + observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + createTimer(0) + + _groupDisposable.addDisposable(_parent._source.subscribeSafe(self)) + return _refCountDisposable + } + + func startNewWindowAndCompleteCurrentOne() { + _subject.on(.Completed) + _subject = PublishSubject() + + observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + } + + func on(event: Event) { + + var newWindow = false + var newId = 0 + + _lock.performLocked { + switch event { + case .Next(let element): + _subject.on(.Next(element)) + + do { + try incrementChecked(&_count) + } catch (let e) { + _subject.on(.Error(e as ErrorType)) + dispose() + } + + if (_count == _parent._count) { + newWindow = true + _count = 0 + newId = ++_windowId + self.startNewWindowAndCompleteCurrentOne() + } + + case .Error(let error): + _subject.on(.Error(error)) + observer?.on(.Error(error)) + dispose() + case .Completed: + _subject.on(.Completed) + observer?.on(.Completed) + dispose() + } + } + + if newWindow { + createTimer(newId) + } + } + + func createTimer(windowId: Int) { + if _timerD.disposed { + return + } + + if _windowId != windowId { + return + } + + _timerD.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in + + var newId = 0 + + self._lock.performLocked { + if previousWindowId != self._windowId { + return + } + + self._count = 0 + self._windowId = self._windowId &+ 1 + newId = self._windowId + self.startNewWindowAndCompleteCurrentOne() + } + + self.createTimer(newId) + + return NopDisposable.instance + } + } +} + +class WindowTimeCount : Producer> { + + private let _timeSpan: S.TimeInterval + private let _count: Int + private let _scheduler: S + private let _source: Observable + + init(source: Observable, timeSpan: S.TimeInterval, count: Int, scheduler: S) { + _source = source + _timeSpan = timeSpan + _count = count + _scheduler = scheduler + } + + override func run>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { + let sink = WindowTimeCountSink(parent: self, observer: observer, cancel: cancel) + setSink(sink) + return sink.run() + } +} diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index 6917b79a..7b46104a 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -209,3 +209,22 @@ extension ObservableType { return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) } } + +// MARK: window + +extension ObservableType { + + /** + Projects each element of an observable sequence into a window that is completed when either it’s full or a given amount of time has elapsed. + + - parameter timeSpan: Maximum time length of a window. + - parameter count: Maximum element count of a window. + - parameter scheduler: Scheduler to run windowing timers on. + - returns: An observable sequence of windows (instances of `Observable`). + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func window(timeSpan timeSpan: S.TimeInterval, count: Int, scheduler: S) + -> Observable> { + return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) + } +} diff --git a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift index 8ccc7784..5b7ce9cb 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -18,8 +18,7 @@ class ObservableTimeTest : RxTest { } } -// throttle - +// MARK: Throttle extension ObservableTimeTest { func test_ThrottleTimeSpan_AllPass() { let scheduler = TestScheduler(initialClock: 0) @@ -274,8 +273,7 @@ extension ObservableTimeTest { } } -// sample - +// MARK: Sample extension ObservableTimeTest { func testSample_Sampler_SamplerThrows() { let scheduler = TestScheduler(initialClock: 0) @@ -710,8 +708,7 @@ extension ObservableTimeTest { } } -// interval - +// MARK: Interval extension ObservableTimeTest { func testInterval_TimeSpan_Basic() { @@ -821,8 +818,7 @@ extension ObservableTimeTest { } } -// take - +// MARK: Take extension ObservableTimeTest { func testTake_TakeZero() { @@ -997,8 +993,7 @@ extension ObservableTimeTest { } -// take - +// MARK: Delay Subscription extension ObservableTimeTest { func testDelaySubscription_TimeSpan_Simple() { @@ -1073,7 +1068,7 @@ extension ObservableTimeTest { } } -// skip +// MARK: Skip extension ObservableTimeTest { func testSkip_Zero() { let scheduler = TestScheduler(initialClock: 0) @@ -1183,9 +1178,9 @@ extension ObservableTimeTest { } } -// +// MARK: Buffer extension ObservableTimeTest { - func bufferWithTimeOrCount_Basic() { + func testBufferWithTimeOrCount_Basic() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1221,7 +1216,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Error() { + func testBufferWithTimeOrCount_Error() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1256,7 +1251,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Disposed() { + func testBufferWithTimeOrCount_Disposed() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1287,7 +1282,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Default() { + func testBufferWithTimeOrCount_Default() { let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default) let result = try! range(1, 10, backgroundScheduler) @@ -1299,4 +1294,210 @@ extension ObservableTimeTest { XCTAssertEqual(result!, [4, 5, 6]) } +} + +// MARK: Window +extension ObservableTimeTest { + func testWindowWithTimeOrCount_Basic() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + completed(600) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7"), + next(420, "3 8"), + next(470, "4 9"), + completed(600) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + } + + func testWindowWithTimeOrCount_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + error(600, testError) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7"), + next(420, "3 8"), + next(470, "4 9"), + error(600, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + } + + func testWindowWithTimeOrCount_Disposed() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(105, 0), + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + completed(600) + ]) + + let res = scheduler.start(370) { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7") + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 370) + ]) + } + + /* + func testWindowWithTimeOrCount_BasicPeriod() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(210, 2), + next(240, 3), + next(270, 4), + next(320, 5), + next(360, 6), + next(390, 7), + next(410, 8), + next(460, 9), + next(470, 10), + completed(490) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 100, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + }.concat(just("\(i) end")) + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(210, "0 2"), + next(240, "0 3"), + next(270, "0 4"), + next(300, "0 end"), + next(320, "1 5"), + next(360, "1 6"), + next(390, "1 7"), + next(400, "1 end"), + next(410, "2 8"), + next(460, "2 9"), + next(470, "2 10"), + next(490, "2 end"), + completed(490) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 490) + ]) + + }*/ + + func windowWithTimeOrCount_Default() { + let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default) + + let result = try! range(1, 10, backgroundScheduler) + .window(timeSpan: 1000, count: 3, scheduler: backgroundScheduler) + .mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + .merge() + .skip(4) + .toBlocking() + .first() + + XCTAssertEqual(result!, "1 5") + } + } \ No newline at end of file From a99409d6e4761f4b5779ad3e35f483231e874bf0 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sat, 7 Nov 2015 12:56:01 +0100 Subject: [PATCH 5/5] Unification with optimization refactorings. --- RxSwift/Disposables/RefCountDisposable.swift | 48 +++++----- .../Observables/Implementations/AddRef.swift | 20 ++-- .../Observables/Implementations/Buffer.swift | 6 +- .../Observables/Implementations/Window.swift | 93 ++++++++++--------- .../RxSwiftTests/Tests/DisposableTest.swift | 10 +- 5 files changed, 94 insertions(+), 83 deletions(-) diff --git a/RxSwift/Disposables/RefCountDisposable.swift b/RxSwift/Disposables/RefCountDisposable.swift index 4b44b848..53e308d4 100644 --- a/RxSwift/Disposables/RefCountDisposable.swift +++ b/RxSwift/Disposables/RefCountDisposable.swift @@ -22,6 +22,7 @@ public class RefCountDisposable : DisposeBase, Cancelable { */ public var disposed: Bool { get { + _lock.lock(); defer { _lock.unlock() } return _disposable == nil } } @@ -39,21 +40,19 @@ public class RefCountDisposable : DisposeBase, Cancelable { When getter is called, a dependent disposable contributing to the reference count that manages the underlying disposable's lifetime is returned. */ - public var disposable: Disposable { - get { - return _lock.calculateLocked { - if let _ = _disposable { - - do{ - try incrementChecked(&_count) - } catch (_){ - rxFatalError("RefCountDisposable increment failed") - } - - return RefCountInnerDisposable(self) - } else { - return NopDisposable.instance + public func retain() -> Disposable { + return _lock.calculateLocked { + if let _ = _disposable { + + do { + try incrementChecked(&_count) + } catch (_) { + rxFatalError("RefCountDisposable increment failed") } + + return RefCountInnerDisposable(self) + } else { + return NopDisposable.instance } } } @@ -62,33 +61,32 @@ public class RefCountDisposable : DisposeBase, Cancelable { Disposes the underlying disposable only when all dependent disposables have been disposed. */ public func dispose() { - let disposable: Disposable? = _lock.calculateLocked { - if let d = _disposable where !_primaryDisposed + let oldDisposable: Disposable? = _lock.calculateLocked { + if let oldDisposable = _disposable where !_primaryDisposed { _primaryDisposed = true; if (_count == 0) { _disposable = nil - return d + return oldDisposable } } return nil } - if let disposable = disposable { + if let disposable = oldDisposable { disposable.dispose() } } private func release() { - let disposable: Disposable? = _lock.calculateLocked { - if let d = _disposable - { - do{ + let oldDisposable: Disposable? = _lock.calculateLocked { + if let oldDisposable = _disposable { + do { try decrementChecked(&_count) - } catch (_){ + } catch (_) { rxFatalError("RefCountDisposable decrement on release failed") } @@ -98,14 +96,14 @@ public class RefCountDisposable : DisposeBase, Cancelable { if _primaryDisposed && _count == 0 { _disposable = nil - return d + return oldDisposable } } return nil } - if let disposable = disposable { + if let disposable = oldDisposable { disposable.dispose() } } diff --git a/RxSwift/Observables/Implementations/AddRef.swift b/RxSwift/Observables/Implementations/AddRef.swift index 36176dff..50358ec3 100644 --- a/RxSwift/Observables/Implementations/AddRef.swift +++ b/RxSwift/Observables/Implementations/AddRef.swift @@ -11,16 +11,16 @@ import Foundation class AddRefSink : Sink, ObserverType { typealias Element = O.E - override init(observer: O, cancel: Disposable) { - super.init(observer: observer, cancel: cancel) + override init(observer: O) { + super.init(observer: observer) } func on(event: Event) { switch event { case .Next(_): - observer?.on(event) + forwardOn(event) case .Completed, .Error(_): - observer?.on(event) + forwardOn(event) dispose() } } @@ -37,11 +37,11 @@ class AddRef : Producer { _refCount = refCount } - override func run(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let d = StableCompositeDisposable.create(_refCount.disposable, cancel) - - let sink = AddRefSink(observer: observer, cancel: d) - setSink(sink) - return _source.subscribeSafe(sink) + override func run(observer: O) -> Disposable { + let releaseDisposable = _refCount.retain() + let sink = AddRefSink(observer: observer) + sink.disposable = StableCompositeDisposable.create(releaseDisposable, _source.subscribeSafe(sink)) + + return sink } } \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Buffer.swift b/RxSwift/Observables/Implementations/Buffer.swift index b941e64f..bc07a900 100644 --- a/RxSwift/Observables/Implementations/Buffer.swift +++ b/RxSwift/Observables/Implementations/Buffer.swift @@ -99,8 +99,12 @@ class BufferTimeCountSink> : Sink, ObserverType { +class WindowTimeCountSink> + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { typealias Parent = WindowTimeCount typealias E = Element private let _parent: Parent - private let _lock = NSRecursiveLock() + let _lock = NSRecursiveLock() private var _subject = PublishSubject() private var _count = 0 @@ -24,19 +28,19 @@ class WindowTimeCountSink Disposable { - observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) - createTimer(0) + forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + createTimer(_windowId) _groupDisposable.addDisposable(_parent._source.subscribeSafe(self)) return _refCountDisposable @@ -46,44 +50,45 @@ class WindowTimeCountSink() - observer?.on(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) } - + func on(event: Event) { - + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { var newWindow = false var newId = 0 - _lock.performLocked { - switch event { - case .Next(let element): - _subject.on(.Next(element)) - - do { - try incrementChecked(&_count) - } catch (let e) { - _subject.on(.Error(e as ErrorType)) - dispose() - } - - if (_count == _parent._count) { - newWindow = true - _count = 0 - newId = ++_windowId - self.startNewWindowAndCompleteCurrentOne() - } - - case .Error(let error): - _subject.on(.Error(error)) - observer?.on(.Error(error)) - dispose() - case .Completed: - _subject.on(.Completed) - observer?.on(.Completed) + switch event { + case .Next(let element): + _subject.on(.Next(element)) + + do { + try incrementChecked(&_count) + } catch (let e) { + _subject.on(.Error(e as ErrorType)) dispose() } + + if (_count == _parent._count) { + newWindow = true + _count = 0 + newId = ++_windowId + self.startNewWindowAndCompleteCurrentOne() + } + + case .Error(let error): + _subject.on(.Error(error)) + forwardOn(.Error(error)) + dispose() + case .Completed: + _subject.on(.Completed) + forwardOn(.Completed) + dispose() } - + if newWindow { createTimer(newId) } @@ -97,8 +102,12 @@ class WindowTimeCountSink : Producer> _scheduler = scheduler } - override func run>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable { - let sink = WindowTimeCountSink(parent: self, observer: observer, cancel: cancel) - setSink(sink) - return sink.run() + override func run>(observer: O) -> Disposable { + let sink = WindowTimeCountSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink } } diff --git a/RxTests/RxSwiftTests/Tests/DisposableTest.swift b/RxTests/RxSwiftTests/Tests/DisposableTest.swift index 1b56f12e..5ed560b8 100644 --- a/RxTests/RxSwiftTests/Tests/DisposableTest.swift +++ b/RxTests/RxSwiftTests/Tests/DisposableTest.swift @@ -130,8 +130,8 @@ class DisposableTest : RxTest { XCTAssertEqual(r.disposed, false) - let d1 = r.disposable; - let d2 = r.disposable; + let d1 = r.retain() + let d2 = r.retain() XCTAssertEqual(d.disposed, false) @@ -144,7 +144,7 @@ class DisposableTest : RxTest { r.dispose() XCTAssertEqual(d.disposed, true) - let d3 = r.disposable; + let d3 = r.retain(); d3.dispose() } @@ -154,8 +154,8 @@ class DisposableTest : RxTest { XCTAssertEqual(r.disposed, false) - let d1 = r.disposable; - let d2 = r.disposable; + let d1 = r.retain() + let d2 = r.retain() XCTAssertEqual(d.disposed, false)