diff --git a/README.md b/README.md index 2f475f0e..1a8b0aa6 100644 --- a/README.md +++ b/README.md @@ -480,7 +480,7 @@ $ pod install Add this to `Cartfile` ``` -git "git@github.com:ReactiveX/RxSwift.git" "2.0.0-beta.2" +github "ReactiveX/RxSwift" "2.0.0-beta.2" ``` ``` diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index d8557ff4..91cab0be 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -358,6 +358,10 @@ C88254341B8A752B00B02D69 /* UITableView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254121B8A752B00B02D69 /* UITableView+Rx.swift */; }; C88254351B8A752B00B02D69 /* UITextField+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254131B8A752B00B02D69 /* UITextField+Rx.swift */; }; C88254361B8A752B00B02D69 /* UITextView+Rx.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88254141B8A752B00B02D69 /* UITextView+Rx.swift */; }; + C88E296B1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; + C88E296C1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; + C88E296D1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; + C88E296E1BEB712E001CCB92 /* RunLoopLock.swift in Sources */ = {isa = PBXBuildFile; fileRef = C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */; }; C8941BDF1BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; }; C8941BE01BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; }; C8941BE11BD5695C00A0E874 /* BlockingObservable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */; }; @@ -560,6 +564,22 @@ C8F0C0441BBBFBB9001B112F /* _RXSwizzling.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E881B8A732E0088E94D /* _RXSwizzling.h */; settings = {ATTRIBUTES = (Public, ); }; }; C8F0C0451BBBFBB9001B112F /* _RXKVOObserver.h in Headers */ = {isa = PBXBuildFile; fileRef = C8093E861B8A732E0088E94D /* _RXKVOObserver.h */; settings = {ATTRIBUTES = (Public, ); }; }; C8F0C04F1BBBFBCE001B112F /* ObservableConvertibleType+Blocking.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093F581B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift */; }; + CB883B3B1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B3C1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B3D1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B3E1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; + CB883B401BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B411BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B421BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B431BE24C15000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */; }; + CB883B451BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B461BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B471BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B481BE256D4000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */; }; + CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; + CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B491BE369AA000AC2EE /* AddRef.swift */; }; CBEE771F1BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77201BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; CBEE77211BD649A000AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE771E1BD649A000AD584C /* ToArray.swift */; }; @@ -1005,6 +1025,7 @@ C88254131B8A752B00B02D69 /* UITextField+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITextField+Rx.swift"; sourceTree = ""; }; C88254141B8A752B00B02D69 /* UITextView+Rx.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "UITextView+Rx.swift"; sourceTree = ""; }; C88BB8711B07E5ED0064D411 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RunLoopLock.swift; sourceTree = ""; }; C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BlockingObservable.swift; sourceTree = ""; }; C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "BlockingObservable+Operators.swift"; sourceTree = ""; }; C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = ShareReplay1.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; @@ -1021,6 +1042,10 @@ C8F0C0021BBBFB8B001B112F /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C04B1BBBFBB9001B112F /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8F0C0581BBBFBCE001B112F /* RxBlocking.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + CB883B3A1BE24355000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; + CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; + CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; + CB883B491BE369AA000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = ""; }; CBEE771E1BD649A000AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = ""; }; CB255BD61BC46A9C00798A4C /* RetryWhen.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RetryWhen.swift; sourceTree = ""; }; D2138C751BB9BE9800339B5C /* RxCocoa.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxCocoa.framework; sourceTree = BUILT_PRODUCTS_DIR; }; @@ -1181,6 +1206,7 @@ children = ( C8093C541B8A72BE0088E94D /* AnonymousDisposable.swift */, C8093C551B8A72BE0088E94D /* BinaryDisposable.swift */, + CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */, C8093C571B8A72BE0088E94D /* CompositeDisposable.swift */, C8093C581B8A72BE0088E94D /* DisposeBag.swift */, C8093C591B8A72BE0088E94D /* DisposeBase.swift */, @@ -1188,6 +1214,7 @@ C8093C5A1B8A72BE0088E94D /* NAryDisposable.swift */, C8093C5B1B8A72BE0088E94D /* NAryDisposable.tt */, C8093C5C1B8A72BE0088E94D /* NopDisposable.swift */, + CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */, C8093C5D1B8A72BE0088E94D /* ScheduledDisposable.swift */, C8093C5E1B8A72BE0088E94D /* ScopedDisposable.swift */, C8093C5F1B8A72BE0088E94D /* SerialDisposable.swift */, @@ -1217,8 +1244,7 @@ C8093C6A1B8A72BE0088E94D /* Implementations */ = { isa = PBXGroup; children = ( - C84CC53F1BDC3B3700E06A64 /* ElementAt.swift */, - D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */, + CB883B491BE369AA000AC2EE /* AddRef.swift */, C8093C6B1B8A72BE0088E94D /* Amb.swift */, C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */, C821DBA11BA4DCAB008F3809 /* Buffer.swift */, @@ -1234,6 +1260,7 @@ C8093C771B8A72BE0088E94D /* DelaySubscription.swift */, C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */, C8093C791B8A72BE0088E94D /* Do.swift */, + C84CC53F1BDC3B3700E06A64 /* ElementAt.swift */, C8C3DA051B9393AC004D233E /* Empty.swift */, C8C3DA081B93941E004D233E /* FailWith.swift */, C8093C7A1B8A72BE0088E94D /* Filter.swift */, @@ -1258,6 +1285,7 @@ C8093C881B8A72BE0088E94D /* Sink.swift */, C8093C891B8A72BE0088E94D /* Skip.swift */, D285BAC31BC0231000B3F602 /* SkipUntil.swift */, + D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */, C8093C8A1B8A72BE0088E94D /* StartWith.swift */, C8093C8B1B8A72BE0088E94D /* SubscribeOn.swift */, C8093C8C1B8A72BE0088E94D /* Switch.swift */, @@ -1268,12 +1296,13 @@ C8093C901B8A72BE0088E94D /* Throttle.swift */, C8093C911B8A72BE0088E94D /* Timer.swift */, CBEE771E1BD649A000AD584C /* ToArray.swift */, + D235B23D1BD003DD007E84DA /* Using.swift */, + CB883B3A1BE24355000AC2EE /* Window.swift */, + D2245A1A1BD5657300E7146F /* WithLatestFrom.swift */, C8093C941B8A72BE0088E94D /* Zip.swift */, C8093C921B8A72BE0088E94D /* Zip+arity.swift */, C8093C931B8A72BE0088E94D /* Zip+arity.tt */, C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */, - D22B6D251BC8504A00BCE0AB /* SkipWhile.swift */, - D235B23D1BD003DD007E84DA /* Using.swift */, ); path = Implementations; sourceTree = ""; @@ -1406,6 +1435,7 @@ C8093F581B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift */, C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */, C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */, + C88E296A1BEB712E001CCB92 /* RunLoopLock.swift */, C8093F591B8A73A20088E94D /* README.md */, ); path = RxBlocking; @@ -2127,6 +2157,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296B1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BDF1BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE41BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, C8093F5E1B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift in Sources */, @@ -2137,6 +2168,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296C1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BE01BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE51BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, C8093F5F1B8A73A20088E94D /* ObservableConvertibleType+Blocking.swift in Sources */, @@ -2162,6 +2194,7 @@ C8093CF01B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, D2245A1C1BD63C4600E7146F /* WithLatestFrom.swift in Sources */, C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */, + CB883B3C1BE24355000AC2EE /* Window.swift in Sources */, C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA071B9393AC004D233E /* Empty.swift in Sources */, CB255BD81BC46A9C00798A4C /* RetryWhen.swift in Sources */, @@ -2169,6 +2202,7 @@ C8093D3A1B8A72BE0088E94D /* Sink.swift in Sources */, C8093D461B8A72BE0088E94D /* TakeUntil.swift in Sources */, C8093D941B8A72BE0088E94D /* MainScheduler.swift in Sources */, + CB883B461BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, C84B38EF1BA433CD001B7D88 /* Generate.swift in Sources */, C8093D161B8A72BE0088E94D /* Deferred.swift in Sources */, C8093DA41B8A72BE0088E94D /* ReplaySubject.swift in Sources */, @@ -2192,6 +2226,7 @@ C8093CD81B8A72BE0088E94D /* BinaryDisposable.swift in Sources */, C89CDB371BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, C8093D2A1B8A72BE0088E94D /* ObserveOn.swift in Sources */, + CB883B411BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, C8093D361B8A72BE0088E94D /* Sample.swift in Sources */, C84CC54F1BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D2752D621BC5551A0070C418 /* SkipUntil.swift in Sources */, @@ -2249,6 +2284,7 @@ C8640A041BA5B12A00D3C4E8 /* Repeat.swift in Sources */, C8093CF41B8A72BE0088E94D /* Error.swift in Sources */, C8093D141B8A72BE0088E94D /* Debug.swift in Sources */, + CB883B4B1BE369AA000AC2EE /* AddRef.swift in Sources */, C8093CCE1B8A72BE0088E94D /* Bag.swift in Sources */, C8093D301B8A72BE0088E94D /* Producer.swift in Sources */, C8093CF81B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */, @@ -2291,6 +2327,7 @@ C8093CEF1B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, D2245A1B1BD5657300E7146F /* WithLatestFrom.swift in Sources */, C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */, + CB883B3B1BE24355000AC2EE /* Window.swift in Sources */, C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */, C8C3DA061B9393AC004D233E /* Empty.swift in Sources */, CB255BD71BC46A9C00798A4C /* RetryWhen.swift in Sources */, @@ -2298,6 +2335,7 @@ C8093D391B8A72BE0088E94D /* Sink.swift in Sources */, C8093D451B8A72BE0088E94D /* TakeUntil.swift in Sources */, C8093D931B8A72BE0088E94D /* MainScheduler.swift in Sources */, + CB883B451BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, C84B38EE1BA433CD001B7D88 /* Generate.swift in Sources */, C8093D151B8A72BE0088E94D /* Deferred.swift in Sources */, C8093DA31B8A72BE0088E94D /* ReplaySubject.swift in Sources */, @@ -2321,6 +2359,7 @@ C8093CD71B8A72BE0088E94D /* BinaryDisposable.swift in Sources */, C89CDB361BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, C8093D291B8A72BE0088E94D /* ObserveOn.swift in Sources */, + CB883B401BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, C8093D351B8A72BE0088E94D /* Sample.swift in Sources */, C84CC54E1BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D285BAC41BC0231000B3F602 /* SkipUntil.swift in Sources */, @@ -2378,6 +2417,7 @@ C8640A031BA5B12A00D3C4E8 /* Repeat.swift in Sources */, C8093CF31B8A72BE0088E94D /* Error.swift in Sources */, C8093D131B8A72BE0088E94D /* Debug.swift in Sources */, + CB883B4A1BE369AA000AC2EE /* AddRef.swift in Sources */, C8093CCD1B8A72BE0088E94D /* Bag.swift in Sources */, C8093D2F1B8A72BE0088E94D /* Producer.swift in Sources */, C8093CF71B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */, @@ -2420,6 +2460,7 @@ C8F0BF9C1BBBFB8B001B112F /* StableCompositeDisposable.swift in Sources */, D2245A1E1BD63C4A00E7146F /* WithLatestFrom.swift in Sources */, C8F0BF9D1BBBFB8B001B112F /* Zip+arity.swift in Sources */, + CB883B3E1BE24355000AC2EE /* Window.swift in Sources */, C8F0BF9E1BBBFB8B001B112F /* Timer.swift in Sources */, C8F0BF9F1BBBFB8B001B112F /* Empty.swift in Sources */, CB255BDA1BC46A9C00798A4C /* RetryWhen.swift in Sources */, @@ -2427,6 +2468,7 @@ C8F0BFA11BBBFB8B001B112F /* Sink.swift in Sources */, C8F0BFA21BBBFB8B001B112F /* TakeUntil.swift in Sources */, C8F0BFA31BBBFB8B001B112F /* MainScheduler.swift in Sources */, + CB883B481BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, C8F0BFA41BBBFB8B001B112F /* Generate.swift in Sources */, C8F0BFA51BBBFB8B001B112F /* Deferred.swift in Sources */, C8F0BFA61BBBFB8B001B112F /* ReplaySubject.swift in Sources */, @@ -2450,6 +2492,7 @@ C8F0BFB61BBBFB8B001B112F /* BinaryDisposable.swift in Sources */, C89CDB391BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, C8F0BFB71BBBFB8B001B112F /* ObserveOn.swift in Sources */, + CB883B431BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, C8F0BFB81BBBFB8B001B112F /* Sample.swift in Sources */, C84CC5511BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D21C29311BC6A1C300448E70 /* SkipUntil.swift in Sources */, @@ -2507,6 +2550,7 @@ C8F0BFE71BBBFB8B001B112F /* Repeat.swift in Sources */, C8F0BFE81BBBFB8B001B112F /* Error.swift in Sources */, C8F0BFE91BBBFB8B001B112F /* Debug.swift in Sources */, + CB883B4D1BE369AA000AC2EE /* AddRef.swift in Sources */, C8F0BFEA1BBBFB8B001B112F /* Bag.swift in Sources */, C8F0BFEB1BBBFB8B001B112F /* Producer.swift in Sources */, C8F0BFEC1BBBFB8B001B112F /* ImmediateSchedulerType.swift in Sources */, @@ -2604,6 +2648,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296E1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BE21BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE71BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, C8F0C04F1BBBFBCE001B112F /* ObservableConvertibleType+Blocking.swift in Sources */, @@ -2699,6 +2744,7 @@ D2EBEB1B1BB9B6C1003A27DC /* Repeat.swift in Sources */, D2245A1D1BD63C4700E7146F /* WithLatestFrom.swift in Sources */, D2EBEAF81BB9B6B2003A27DC /* ScopedDisposable.swift in Sources */, + CB883B3D1BE24355000AC2EE /* Window.swift in Sources */, D2EBEAEA1BB9B697003A27DC /* SchedulerType.swift in Sources */, D2EBEB031BB9B6C1003A27DC /* CombineLatest+CollectionType.swift in Sources */, CB255BD91BC46A9C00798A4C /* RetryWhen.swift in Sources */, @@ -2706,6 +2752,7 @@ D2EBEAE41BB9B697003A27DC /* ObservableType.swift in Sources */, D2EBEB331BB9B6CA003A27DC /* Observable+Time.swift in Sources */, D2EBEB191BB9B6C1003A27DC /* Reduce.swift in Sources */, + CB883B471BE256D4000AC2EE /* BooleanDisposable.swift in Sources */, D2EBEB001BB9B6BA003A27DC /* Catch.swift in Sources */, D2EBEB161BB9B6C1003A27DC /* ObserveOnSerialDispatchQueue.swift in Sources */, D2EBEB061BB9B6C1003A27DC /* Debug.swift in Sources */, @@ -2729,6 +2776,7 @@ D2EBEB091BB9B6C1003A27DC /* DistinctUntilChanged.swift in Sources */, D2EBEB2A1BB9B6C5003A27DC /* Zip+CollectionType.swift in Sources */, C89CDB381BCB0DD7002063D9 /* ShareReplay1.swift in Sources */, + CB883B421BE24C15000AC2EE /* RefCountDisposable.swift in Sources */, D2EBEB401BB9B6DE003A27DC /* BehaviorSubject.swift in Sources */, C84CC5501BDCF48200E06A64 /* LockOwnerType.swift in Sources */, D2EBEB271BB9B6C1003A27DC /* Timer.swift in Sources */, @@ -2786,6 +2834,7 @@ D2EBEB3D1BB9B6D8003A27DC /* SchedulerServices+Emulation.swift in Sources */, D2EBEB1C1BB9B6C1003A27DC /* Sample.swift in Sources */, D2EBEAFD1BB9B6BA003A27DC /* AnonymousObservable.swift in Sources */, + CB883B4C1BE369AA000AC2EE /* AddRef.swift in Sources */, D2EBEAFA1BB9B6B2003A27DC /* SingleAssignmentDisposable.swift in Sources */, D2EBEAF31BB9B6AE003A27DC /* DisposeBag.swift in Sources */, D2EBEAED1BB9B6A4003A27DC /* Bag.swift in Sources */, @@ -2813,6 +2862,7 @@ isa = PBXSourcesBuildPhase; buildActionMask = 2147483647; files = ( + C88E296D1BEB712E001CCB92 /* RunLoopLock.swift in Sources */, C8941BE11BD5695C00A0E874 /* BlockingObservable.swift in Sources */, C8941BE61BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */, D2EBEB8A1BB9B9EE003A27DC /* ObservableConvertibleType+Blocking.swift in Sources */, diff --git a/RxBlocking/BlockingObservable+Operators.swift b/RxBlocking/BlockingObservable+Operators.swift index 76e816f0..3f05801c 100644 --- a/RxBlocking/BlockingObservable+Operators.swift +++ b/RxBlocking/BlockingObservable+Operators.swift @@ -13,44 +13,39 @@ import Foundation extension BlockingObservable { /** - Blocks current thread until sequence terminates. - - If sequence terminates with error, terminating error will be thrown. - - - returns: All elements of sequence. - */ - public func toArray() throws -> [E] { - let condition = NSCondition() - - var elements: [E] = Array() - - var error: ErrorType? - - var ended = false + Blocks current thread until sequence terminates. - _ = self.source.subscribe { e in - switch e { - case .Next(let element): - elements.append(element) - case .Error(let e): - error = e - condition.lock() - ended = true - condition.signal() - condition.unlock() - case .Completed: - condition.lock() - ended = true - condition.signal() - condition.unlock() + If sequence terminates with error, terminating error will be thrown. + + - returns: All elements of sequence. + */ + public func toArray() throws -> [E] { + var elements: [E] = Array() + + var error: ErrorType? + + let lock = RunLoopLock() + + let d = SingleAssignmentDisposable() + + lock.dispatch { + d.disposable = self.source.subscribe { e in + switch e { + case .Next(let element): + elements.append(element) + case .Error(let e): + error = e + lock.stop() + case .Completed: + lock.stop() + } } } - condition.lock() - while !ended { - condition.wait() - } - condition.unlock() - + + lock.run() + + d.dispose() + if let error = error { throw error } @@ -61,99 +56,87 @@ extension BlockingObservable { extension BlockingObservable { /** - Blocks current thread until sequence produces first element. - - If sequence terminates with error before producing first element, terminating error will be thrown. - - - returns: First element of sequence. If sequence is empty `nil` is returned. - */ + Blocks current thread until sequence produces first element. + + If sequence terminates with error before producing first element, terminating error will be thrown. + + - returns: First element of sequence. If sequence is empty `nil` is returned. + */ public func first() throws -> E? { - let condition = NSCondition() - var element: E? - + var error: ErrorType? - - var ended = false - + let d = SingleAssignmentDisposable() - - d.disposable = self.source.subscribe { e in - switch e { - case .Next(let e): - if element == nil { - element = e + + let lock = RunLoopLock() + + lock.dispatch { + d.disposable = self.source.subscribe { e in + switch e { + case .Next(let e): + if element == nil { + element = e + } + break + case .Error(let e): + error = e + default: + break } - break - case .Error(let e): - error = e - default: - break + + lock.stop() } - - condition.lock() - ended = true - condition.signal() - condition.unlock() - } - - condition.lock() - while !ended { - condition.wait() } + + lock.run() + d.dispose() - condition.unlock() - + if let error = error { throw error } - + return element } } extension BlockingObservable { /** - Blocks current thread until sequence terminates. - - If sequence terminates with error, terminating error will be thrown. - - - returns: Last element in the sequence. If sequence is empty `nil` is returned. - */ + Blocks current thread until sequence terminates. + + If sequence terminates with error, terminating error will be thrown. + + - returns: Last element in the sequence. If sequence is empty `nil` is returned. + */ public func last() throws -> E? { - let condition = NSCondition() - var element: E? - + var error: ErrorType? - - var ended = false - + let d = SingleAssignmentDisposable() - - d.disposable = self.source.subscribe { e in - switch e { - case .Next(let e): - element = e - return - case .Error(let e): - error = e - default: - break + + let lock = RunLoopLock() + + lock.dispatch { + d.disposable = self.source.subscribe { e in + switch e { + case .Next(let e): + element = e + return + case .Error(let e): + error = e + default: + break + } + + lock.stop() } - - condition.lock() - ended = true - condition.signal() - condition.unlock() } - condition.lock() - while !ended { - condition.wait() - } + lock.run() + d.dispose() - condition.unlock() if let error = error { throw error diff --git a/RxBlocking/RunLoopLock.swift b/RxBlocking/RunLoopLock.swift new file mode 100644 index 00000000..5ea9ba73 --- /dev/null +++ b/RxBlocking/RunLoopLock.swift @@ -0,0 +1,33 @@ +// +// RunLoopLock.swift +// Rx +// +// Created by Krunoslav Zaher on 11/5/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class RunLoopLock : NSObject { + let currentRunLoop: CFRunLoopRef + + override init() { + currentRunLoop = CFRunLoopGetCurrent() + } + + func dispatch(action: () -> ()) { + CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode, action) + CFRunLoopWakeUp(currentRunLoop) + } + + func stop() { + CFRunLoopPerformBlock(currentRunLoop, kCFRunLoopDefaultMode) { + CFRunLoopStop(self.currentRunLoop) + } + CFRunLoopWakeUp(currentRunLoop) + } + + func run() { + CFRunLoopRun() + } +} diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index 02b11de6..fcb9a82f 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -343,6 +343,10 @@ C8DF92EB1B0B38C0009BCF9A /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = C8DF92E91B0B38C0009BCF9A /* Images.xcassets */; }; C8DF92F61B0B43A4009BCF9A /* IntroductionExampleViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */; }; C8E9D2AF1BD3FD960079D0DB /* ActivityIndicator.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80397391BD3E17D009D8B26 /* ActivityIndicator.swift */; }; + CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */; }; + CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */; }; + CB883B601BE3AC72000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5E1BE3AC72000AC2EE /* Window.swift */; }; + CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B5F1BE3AC72000AC2EE /* AddRef.swift */; }; CBEE77541BD8C7B700AD584C /* ToArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = CBEE77531BD8C7B700AD584C /* ToArray.swift */; }; D2245A191BD5654C00E7146F /* WithLatestFrom.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */; }; D2AF91981BD3D95900A008C1 /* Using.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2AF91881BD2C51900A008C1 /* Using.swift */; }; @@ -733,6 +737,10 @@ C8DF92F01B0B3E67009BCF9A /* Info-OSX.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-OSX.plist"; sourceTree = ""; }; C8DF92F21B0B3E71009BCF9A /* Info-iOS.plist */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.plist.xml; path = "Info-iOS.plist"; sourceTree = ""; }; C8DF92F51B0B43A4009BCF9A /* IntroductionExampleViewController.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = IntroductionExampleViewController.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; + CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; + CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; + CB883B5E1BE3AC72000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; + CB883B5F1BE3AC72000AC2EE /* AddRef.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AddRef.swift; sourceTree = ""; }; CBEE77531BD8C7B700AD584C /* ToArray.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ToArray.swift; sourceTree = ""; }; D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WithLatestFrom.swift; sourceTree = ""; }; D2AF91881BD2C51900A008C1 /* Using.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Using.swift; sourceTree = ""; }; @@ -1105,12 +1113,14 @@ C84CC5831BDD484400E06A64 /* SubscriptionDisposable.swift */, C89464331BC6C2B00055219D /* AnonymousDisposable.swift */, C89464341BC6C2B00055219D /* BinaryDisposable.swift */, + CB883B4E1BE3AC54000AC2EE /* BooleanDisposable.swift */, C89464351BC6C2B00055219D /* CompositeDisposable.swift */, C89464361BC6C2B00055219D /* DisposeBag.swift */, C89464371BC6C2B00055219D /* DisposeBase.swift */, C89464381BC6C2B00055219D /* NAryDisposable.swift */, C89464391BC6C2B00055219D /* NAryDisposable.tt */, C894643A1BC6C2B00055219D /* NopDisposable.swift */, + CB883B4F1BE3AC54000AC2EE /* RefCountDisposable.swift */, C894643B1BC6C2B00055219D /* ScheduledDisposable.swift */, C894643C1BC6C2B00055219D /* ScopedDisposable.swift */, C894643D1BC6C2B00055219D /* SerialDisposable.swift */, @@ -1140,7 +1150,7 @@ C89464481BC6C2B00055219D /* Implementations */ = { isa = PBXGroup; children = ( - C84CC52D1BDC344100E06A64 /* ElementAt.swift */, + CB883B5F1BE3AC72000AC2EE /* AddRef.swift */, C89464491BC6C2B00055219D /* Amb.swift */, C894644A1BC6C2B00055219D /* AnonymousObservable.swift */, C894644C1BC6C2B00055219D /* Buffer.swift */, @@ -1156,6 +1166,7 @@ C89464561BC6C2B00055219D /* DelaySubscription.swift */, C89464571BC6C2B00055219D /* DistinctUntilChanged.swift */, C89464581BC6C2B00055219D /* Do.swift */, + C84CC52D1BDC344100E06A64 /* ElementAt.swift */, C89464591BC6C2B00055219D /* Empty.swift */, C894645A1BC6C2B00055219D /* FailWith.swift */, C894645B1BC6C2B00055219D /* Filter.swift */, @@ -1191,11 +1202,12 @@ C89464751BC6C2B00055219D /* Timer.swift */, CBEE77531BD8C7B700AD584C /* ToArray.swift */, D2AF91881BD2C51900A008C1 /* Using.swift */, + CB883B5E1BE3AC72000AC2EE /* Window.swift */, + D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */, C89464791BC6C2B00055219D /* Zip.swift */, C89464761BC6C2B00055219D /* Zip+arity.swift */, C89464771BC6C2B00055219D /* Zip+arity.tt */, C89464781BC6C2B00055219D /* Zip+CollectionType.swift */, - D2245A0B1BD564A700E7146F /* WithLatestFrom.swift */, ); path = Implementations; sourceTree = ""; @@ -1735,6 +1747,7 @@ C89465881BC6C2BC0055219D /* RxScrollViewDelegateProxy.swift in Sources */, C89464B71BC6C2B00055219D /* Observable+Extensions.swift in Sources */, C89464A01BC6C2B00055219D /* Lock.swift in Sources */, + CB883B601BE3AC72000AC2EE /* Window.swift in Sources */, C89464C91BC6C2B00055219D /* Do.swift in Sources */, C89464A41BC6C2B00055219D /* Queue.swift in Sources */, C89464B91BC6C2B00055219D /* ObservableConvertibleType.swift in Sources */, @@ -1855,11 +1868,13 @@ C894656E1BC6C2BC0055219D /* DelegateProxy.swift in Sources */, C89464EF1BC6C2B00055219D /* Observable+Debug.swift in Sources */, C89464E91BC6C2B00055219D /* Zip+CollectionType.swift in Sources */, + CB883B511BE3AC54000AC2EE /* RefCountDisposable.swift in Sources */, C89465761BC6C2BC0055219D /* KVOObserver.swift in Sources */, C89465641BC6C2BC0055219D /* _RXSwizzling.m in Sources */, C89464CA1BC6C2B00055219D /* Empty.swift in Sources */, C803973B1BD3E17D009D8B26 /* ActivityIndicator.swift in Sources */, C89464C61BC6C2B00055219D /* Deferred.swift in Sources */, + CB883B611BE3AC72000AC2EE /* AddRef.swift in Sources */, D2AF91981BD3D95900A008C1 /* Using.swift in Sources */, C8297E501B6CF905000589EA /* TableViewController.swift in Sources */, C8297E511B6CF905000589EA /* PartialUpdatesViewController.swift in Sources */, @@ -1889,6 +1904,7 @@ C89465711BC6C2BC0055219D /* Observable+Bind.swift in Sources */, C89464B01BC6C2B00055219D /* SerialDisposable.swift in Sources */, C89464A21BC6C2B00055219D /* Bag.swift in Sources */, + CB883B501BE3AC54000AC2EE /* BooleanDisposable.swift in Sources */, C894657B1BC6C2BC0055219D /* RxCLLocationManagerDelegateProxy.swift in Sources */, C8297E581B6CF905000589EA /* User.swift in Sources */, C89464B41BC6C2B00055219D /* Event.swift in Sources */, diff --git a/RxSwift/Disposables/BooleanDisposable.swift b/RxSwift/Disposables/BooleanDisposable.swift new file mode 100644 index 00000000..ec33b80b --- /dev/null +++ b/RxSwift/Disposables/BooleanDisposable.swift @@ -0,0 +1,47 @@ +// +// BooleanDisposable.swift +// Rx +// +// Created by Junior B. on 10/29/15. +// Copyright (c) 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +/** +Represents a disposable resource that can be checked for disposal status. +*/ +public class BooleanDisposable : Disposable, Cancelable { + + internal static let BooleanDisposableTrue = BooleanDisposable(disposed: true) + private var _disposed = false + + /** + Initializes a new instance of the `BooleanDisposable` class + */ + public init() { + } + + /** + Initializes a new instance of the `BooleanDisposable` class with given value + */ + public init(disposed: Bool) { + self._disposed = disposed + } + + /** + - returns: Was resource disposed. + */ + public var disposed: Bool { + get { + return _disposed + } + } + + /** + Sets the status to disposed, which can be observer through the `disposed` property. + */ + public func dispose() { + _disposed = true + } +} \ No newline at end of file diff --git a/RxSwift/Disposables/RefCountDisposable.swift b/RxSwift/Disposables/RefCountDisposable.swift new file mode 100644 index 00000000..53e308d4 --- /dev/null +++ b/RxSwift/Disposables/RefCountDisposable.swift @@ -0,0 +1,129 @@ +// +// RefCountDisposable.swift +// Rx +// +// Created by Junior B. on 10/29/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +/** + Represents a disposable resource that only disposes its underlying disposable resource when all dependent disposable objects have been disposed. + */ +public class RefCountDisposable : DisposeBase, Cancelable { + private var _lock = SpinLock() + private var _disposable = nil as Disposable? + private var _primaryDisposed = false + private var _count = 0 + + /** + - returns: Was resource disposed. + */ + public var disposed: Bool { + get { + _lock.lock(); defer { _lock.unlock() } + return _disposable == nil + } + } + + /** + Initializes a new instance of the `RefCountDisposable`. + */ + public init(disposable: Disposable) { + _disposable = disposable + super.init() + } + + /** + Holds a dependent disposable that when disposed decreases the refcount on the underlying disposable. + + When getter is called, a dependent disposable contributing to the reference count that manages the underlying disposable's lifetime is returned. + */ + public func retain() -> Disposable { + return _lock.calculateLocked { + if let _ = _disposable { + + do { + try incrementChecked(&_count) + } catch (_) { + rxFatalError("RefCountDisposable increment failed") + } + + return RefCountInnerDisposable(self) + } else { + return NopDisposable.instance + } + } + } + + /** + Disposes the underlying disposable only when all dependent disposables have been disposed. + */ + public func dispose() { + let oldDisposable: Disposable? = _lock.calculateLocked { + if let oldDisposable = _disposable where !_primaryDisposed + { + _primaryDisposed = true; + + if (_count == 0) + { + _disposable = nil + return oldDisposable + } + } + + return nil + } + + if let disposable = oldDisposable { + disposable.dispose() + } + } + + private func release() { + let oldDisposable: Disposable? = _lock.calculateLocked { + if let oldDisposable = _disposable { + do { + try decrementChecked(&_count) + } catch (_) { + rxFatalError("RefCountDisposable decrement on release failed") + } + + guard _count >= 0 else { + rxFatalError("RefCountDisposable counter is lower than 0") + } + + if _primaryDisposed && _count == 0 { + _disposable = nil + return oldDisposable + } + } + + return nil + } + + if let disposable = oldDisposable { + disposable.dispose() + } + } +} + +internal final class RefCountInnerDisposable: DisposeBase, Disposable +{ + private let _parent: RefCountDisposable + private var _disposed: Int32 = 0 + + init(_ parent: RefCountDisposable) + { + _parent = parent; + super.init() + } + + internal func dispose() + { + if OSAtomicCompareAndSwap32(0, 1, &_disposed) { + _parent.release() + } + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/AddRef.swift b/RxSwift/Observables/Implementations/AddRef.swift new file mode 100644 index 00000000..50358ec3 --- /dev/null +++ b/RxSwift/Observables/Implementations/AddRef.swift @@ -0,0 +1,47 @@ +// +// AddRef.swift +// Rx +// +// Created by Junior B. on 30/10/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class AddRefSink : Sink, ObserverType { + typealias Element = O.E + + override init(observer: O) { + super.init(observer: observer) + } + + func on(event: Event) { + switch event { + case .Next(_): + forwardOn(event) + case .Completed, .Error(_): + forwardOn(event) + dispose() + } + } +} + +class AddRef : Producer { + typealias EventHandler = Event throws -> Void + + private let _source: Observable + private let _refCount: RefCountDisposable + + init(source: Observable, refCount: RefCountDisposable) { + _source = source + _refCount = refCount + } + + override func run(observer: O) -> Disposable { + let releaseDisposable = _refCount.retain() + let sink = AddRefSink(observer: observer) + sink.disposable = StableCompositeDisposable.create(releaseDisposable, _source.subscribeSafe(sink)) + + return sink + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Implementations/Buffer.swift b/RxSwift/Observables/Implementations/Buffer.swift index b941e64f..bc07a900 100644 --- a/RxSwift/Observables/Implementations/Buffer.swift +++ b/RxSwift/Observables/Implementations/Buffer.swift @@ -99,8 +99,12 @@ class BufferTimeCountSink> + : Sink + , ObserverType + , LockOwnerType + , SynchronizedOnType { + typealias Parent = WindowTimeCount + typealias E = Element + + private let _parent: Parent + + let _lock = NSRecursiveLock() + + private var _subject = PublishSubject() + private var _count = 0 + private var _windowId = 0 + + private let _timerD = SerialDisposable() + private let _refCountDisposable: RefCountDisposable + private let _groupDisposable = CompositeDisposable() + + init(parent: Parent, observer: O) { + _parent = parent + + _groupDisposable.addDisposable(_timerD) + + _refCountDisposable = RefCountDisposable(disposable: _groupDisposable) + super.init(observer: observer) + } + + func run() -> Disposable { + + forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + createTimer(_windowId) + + _groupDisposable.addDisposable(_parent._source.subscribeSafe(self)) + return _refCountDisposable + } + + func startNewWindowAndCompleteCurrentOne() { + _subject.on(.Completed) + _subject = PublishSubject() + + forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable())) + } + + func on(event: Event) { + synchronizedOn(event) + } + + func _synchronized_on(event: Event) { + var newWindow = false + var newId = 0 + + switch event { + case .Next(let element): + _subject.on(.Next(element)) + + do { + try incrementChecked(&_count) + } catch (let e) { + _subject.on(.Error(e as ErrorType)) + dispose() + } + + if (_count == _parent._count) { + newWindow = true + _count = 0 + newId = ++_windowId + self.startNewWindowAndCompleteCurrentOne() + } + + case .Error(let error): + _subject.on(.Error(error)) + forwardOn(.Error(error)) + dispose() + case .Completed: + _subject.on(.Completed) + forwardOn(.Completed) + dispose() + } + + if newWindow { + createTimer(newId) + } + } + + func createTimer(windowId: Int) { + if _timerD.disposed { + return + } + + if _windowId != windowId { + return + } + + let nextTimer = SingleAssignmentDisposable() + + _timerD.disposable = nextTimer + + nextTimer.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in + + var newId = 0 + + self._lock.performLocked { + if previousWindowId != self._windowId { + return + } + + self._count = 0 + self._windowId = self._windowId &+ 1 + newId = self._windowId + self.startNewWindowAndCompleteCurrentOne() + } + + self.createTimer(newId) + + return NopDisposable.instance + } + } +} + +class WindowTimeCount : Producer> { + + private let _timeSpan: S.TimeInterval + private let _count: Int + private let _scheduler: S + private let _source: Observable + + init(source: Observable, timeSpan: S.TimeInterval, count: Int, scheduler: S) { + _source = source + _timeSpan = timeSpan + _count = count + _scheduler = scheduler + } + + override func run>(observer: O) -> Disposable { + let sink = WindowTimeCountSink(parent: self, observer: observer) + sink.disposable = sink.run() + return sink + } +} diff --git a/RxSwift/Observables/Observable+Time.swift b/RxSwift/Observables/Observable+Time.swift index 6917b79a..7b46104a 100644 --- a/RxSwift/Observables/Observable+Time.swift +++ b/RxSwift/Observables/Observable+Time.swift @@ -209,3 +209,22 @@ extension ObservableType { return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) } } + +// MARK: window + +extension ObservableType { + + /** + Projects each element of an observable sequence into a window that is completed when either it’s full or a given amount of time has elapsed. + + - parameter timeSpan: Maximum time length of a window. + - parameter count: Maximum element count of a window. + - parameter scheduler: Scheduler to run windowing timers on. + - returns: An observable sequence of windows (instances of `Observable`). + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func window(timeSpan timeSpan: S.TimeInterval, count: Int, scheduler: S) + -> Observable> { + return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler) + } +} diff --git a/RxTests/RxSwiftTests/Tests/DisposableTest.swift b/RxTests/RxSwiftTests/Tests/DisposableTest.swift index 4a309809..5ed560b8 100644 --- a/RxTests/RxSwiftTests/Tests/DisposableTest.swift +++ b/RxTests/RxSwiftTests/Tests/DisposableTest.swift @@ -123,4 +123,50 @@ class DisposableTest : RxTest { XCTAssertEqual(numberDisposed, 2) XCTAssertEqual(compositeDisposable.count, 0) } + + func testRefCountDisposable_RefCounting() { + let d = BooleanDisposable() + let r = RefCountDisposable(disposable: d) + + XCTAssertEqual(r.disposed, false) + + let d1 = r.retain() + let d2 = r.retain() + + XCTAssertEqual(d.disposed, false) + + d1.dispose() + XCTAssertEqual(d.disposed, false) + + d2.dispose() + XCTAssertEqual(d.disposed, false) + + r.dispose() + XCTAssertEqual(d.disposed, true) + + let d3 = r.retain(); + d3.dispose() + } + + func testRefCountDisposable_PrimaryDisposesFirst() { + let d = BooleanDisposable() + let r = RefCountDisposable(disposable: d) + + XCTAssertEqual(r.disposed, false) + + let d1 = r.retain() + let d2 = r.retain() + + XCTAssertEqual(d.disposed, false) + + d1.dispose() + XCTAssertEqual(d.disposed, false) + + r.dispose() + XCTAssertEqual(d.disposed, false) + + d2.dispose() + XCTAssertEqual(d.disposed, true) + + } } \ No newline at end of file diff --git a/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift b/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift index 448cc3ae..00b4c410 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+BlockingTest.swift @@ -30,8 +30,8 @@ extension ObservableBlockingTest { try (failWith(testError) as Observable).toBlocking().toArray() XCTFail("It should fail") } - catch { - + catch let e { + XCTAssertTrue(e as NSError === testError) } } @@ -67,8 +67,8 @@ extension ObservableBlockingTest { try (failWith(testError) as Observable).toBlocking().first() XCTFail() } - catch { - + catch let e { + XCTAssertTrue(e as NSError === testError) } } @@ -104,8 +104,8 @@ extension ObservableBlockingTest { try (failWith(testError) as Observable).toBlocking().last() XCTFail() } - catch { - + catch let e { + XCTAssertTrue(e as NSError === testError) } } diff --git a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift index 8ccc7784..5b7ce9cb 100644 --- a/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/RxTests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -18,8 +18,7 @@ class ObservableTimeTest : RxTest { } } -// throttle - +// MARK: Throttle extension ObservableTimeTest { func test_ThrottleTimeSpan_AllPass() { let scheduler = TestScheduler(initialClock: 0) @@ -274,8 +273,7 @@ extension ObservableTimeTest { } } -// sample - +// MARK: Sample extension ObservableTimeTest { func testSample_Sampler_SamplerThrows() { let scheduler = TestScheduler(initialClock: 0) @@ -710,8 +708,7 @@ extension ObservableTimeTest { } } -// interval - +// MARK: Interval extension ObservableTimeTest { func testInterval_TimeSpan_Basic() { @@ -821,8 +818,7 @@ extension ObservableTimeTest { } } -// take - +// MARK: Take extension ObservableTimeTest { func testTake_TakeZero() { @@ -997,8 +993,7 @@ extension ObservableTimeTest { } -// take - +// MARK: Delay Subscription extension ObservableTimeTest { func testDelaySubscription_TimeSpan_Simple() { @@ -1073,7 +1068,7 @@ extension ObservableTimeTest { } } -// skip +// MARK: Skip extension ObservableTimeTest { func testSkip_Zero() { let scheduler = TestScheduler(initialClock: 0) @@ -1183,9 +1178,9 @@ extension ObservableTimeTest { } } -// +// MARK: Buffer extension ObservableTimeTest { - func bufferWithTimeOrCount_Basic() { + func testBufferWithTimeOrCount_Basic() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1221,7 +1216,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Error() { + func testBufferWithTimeOrCount_Error() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1256,7 +1251,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Disposed() { + func testBufferWithTimeOrCount_Disposed() { let scheduler = TestScheduler(initialClock: 0) let xs = scheduler.createHotObservable([ @@ -1287,7 +1282,7 @@ extension ObservableTimeTest { ]) } - func bufferWithTimeOrCount_Default() { + func testBufferWithTimeOrCount_Default() { let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default) let result = try! range(1, 10, backgroundScheduler) @@ -1299,4 +1294,210 @@ extension ObservableTimeTest { XCTAssertEqual(result!, [4, 5, 6]) } +} + +// MARK: Window +extension ObservableTimeTest { + func testWindowWithTimeOrCount_Basic() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + completed(600) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7"), + next(420, "3 8"), + next(470, "4 9"), + completed(600) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + } + + func testWindowWithTimeOrCount_Error() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + error(600, testError) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7"), + next(420, "3 8"), + next(470, "4 9"), + error(600, testError) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 600) + ]) + } + + func testWindowWithTimeOrCount_Disposed() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(105, 0), + next(205, 1), + next(210, 2), + next(240, 3), + next(280, 4), + next(320, 5), + next(350, 6), + next(370, 7), + next(420, 8), + next(470, 9), + completed(600) + ]) + + let res = scheduler.start(370) { () -> Observable in + let window: Observable> = xs.window(timeSpan: 70, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(205, "0 1"), + next(210, "0 2"), + next(240, "0 3"), + next(280, "1 4"), + next(320, "2 5"), + next(350, "2 6"), + next(370, "2 7") + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 370) + ]) + } + + /* + func testWindowWithTimeOrCount_BasicPeriod() { + let scheduler = TestScheduler(initialClock: 0) + + let xs = scheduler.createHotObservable([ + next(150, 1), + next(210, 2), + next(240, 3), + next(270, 4), + next(320, 5), + next(360, 6), + next(390, 7), + next(410, 8), + next(460, 9), + next(470, 10), + completed(490) + ]) + + let res = scheduler.start { () -> Observable in + let window: Observable> = xs.window(timeSpan: 100, count: 3, scheduler: scheduler) + let mappedWithIndex = window.mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + }.concat(just("\(i) end")) + } + let result = mappedWithIndex.merge() + return result + } + + XCTAssertEqual(res.messages, [ + next(210, "0 2"), + next(240, "0 3"), + next(270, "0 4"), + next(300, "0 end"), + next(320, "1 5"), + next(360, "1 6"), + next(390, "1 7"), + next(400, "1 end"), + next(410, "2 8"), + next(460, "2 9"), + next(470, "2 10"), + next(490, "2 end"), + completed(490) + ]) + + XCTAssertEqual(xs.subscriptions, [ + Subscription(200, 490) + ]) + + }*/ + + func windowWithTimeOrCount_Default() { + let backgroundScheduler = SerialDispatchQueueScheduler(globalConcurrentQueuePriority: .Default) + + let result = try! range(1, 10, backgroundScheduler) + .window(timeSpan: 1000, count: 3, scheduler: backgroundScheduler) + .mapWithIndex { (o: Observable, i: Int) -> Observable in + return o.map { (e: Int) -> String in + return "\(i) \(e)" + } + } + .merge() + .skip(4) + .toBlocking() + .first() + + XCTAssertEqual(result!, "1 5") + } + } \ No newline at end of file