From 63a60b359b57552e1bc42423ecac00a2a3f179c0 Mon Sep 17 00:00:00 2001 From: Krunoslav Zaher Date: Sat, 17 Oct 2015 22:19:44 +0200 Subject: [PATCH] Adds `ConcurrentMainScheduler` and `ImmediateScheduler`. --- Rx.xcodeproj/project.pbxproj | 24 ++++- RxCocoa/Common/CocoaUnits/Driver/Driver.swift | 2 +- .../ObservableConvertibleType+Driver.swift | 6 +- RxExample/RxExample.xcodeproj/project.pbxproj | 8 ++ RxSwift/Concurrency/AsyncLock.swift | 1 + .../Schedulers/ConcurrentMainScheduler.swift | 92 +++++++++++++++++++ RxSwift/Schedulers/ImmediateScheduler.swift | 39 ++++++++ RxSwift/Schedulers/MainScheduler.swift | 37 ++++++-- .../SerialDispatchQueueScheduler.swift | 5 +- 9 files changed, 200 insertions(+), 14 deletions(-) create mode 100644 RxSwift/Schedulers/ConcurrentMainScheduler.swift create mode 100644 RxSwift/Schedulers/ImmediateScheduler.swift diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index b5ea901c..5065dec5 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -334,6 +334,14 @@ C89CDB371BCB0DD7002063D9 /* ShareReplay1.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */; }; C89CDB381BCB0DD7002063D9 /* ShareReplay1.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */; }; C89CDB391BCB0DD7002063D9 /* ShareReplay1.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */; }; + C8B144FB1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FA1BD2D44500267DCE /* ConcurrentMainScheduler.swift */; }; + C8B144FC1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FA1BD2D44500267DCE /* ConcurrentMainScheduler.swift */; }; + C8B144FD1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FA1BD2D44500267DCE /* ConcurrentMainScheduler.swift */; }; + C8B144FE1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FA1BD2D44500267DCE /* ConcurrentMainScheduler.swift */; }; + C8B145001BD2D80100267DCE /* ImmediateScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FF1BD2D80100267DCE /* ImmediateScheduler.swift */; }; + C8B145011BD2D80100267DCE /* ImmediateScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FF1BD2D80100267DCE /* ImmediateScheduler.swift */; }; + C8B145021BD2D80100267DCE /* ImmediateScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FF1BD2D80100267DCE /* ImmediateScheduler.swift */; }; + C8B145031BD2D80100267DCE /* ImmediateScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B144FF1BD2D80100267DCE /* ImmediateScheduler.swift */; }; C8C3D9FE1B935EDF004D233E /* Zip+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */; }; C8C3D9FF1B935EDF004D233E /* Zip+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */; }; C8C3DA031B9390C4004D233E /* Just.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C3DA021B9390C4004D233E /* Just.swift */; }; @@ -944,6 +952,8 @@ C88BB8711B07E5ED0064D411 /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = ShareReplay1.swift; sourceTree = ""; xcLanguageSpecificationIdentifier = xcode.lang.swift; }; C8A56AD71AD7424700B4673B /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; }; + C8B144FA1BD2D44500267DCE /* ConcurrentMainScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentMainScheduler.swift; sourceTree = ""; }; + C8B144FF1BD2D80100267DCE /* ImmediateScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ImmediateScheduler.swift; sourceTree = ""; }; C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Zip+CollectionType.swift"; sourceTree = ""; }; C8C3DA021B9390C4004D233E /* Just.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Just.swift; sourceTree = ""; }; C8C3DA051B9393AC004D233E /* Empty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Empty.swift; sourceTree = ""; }; @@ -1211,15 +1221,17 @@ C8093CB41B8A72BE0088E94D /* Schedulers */ = { isa = PBXGroup; children = ( - C84B38E71BA43380001B7D88 /* ScheduledItem.swift */, C8093CB51B8A72BE0088E94D /* ConcurrentDispatchQueueScheduler.swift */, + C8B144FA1BD2D44500267DCE /* ConcurrentMainScheduler.swift */, + C8C3DA0E1B939767004D233E /* CurrentThreadScheduler.swift */, C8093CB61B8A72BE0088E94D /* DispatchQueueSchedulerPriority.swift */, + C8B144FF1BD2D80100267DCE /* ImmediateScheduler.swift */, C8093CB71B8A72BE0088E94D /* MainScheduler.swift */, C8093CB81B8A72BE0088E94D /* OperationQueueScheduler.swift */, C8093CB91B8A72BE0088E94D /* RecursiveScheduler.swift */, + C84B38E71BA43380001B7D88 /* ScheduledItem.swift */, C8093CBB1B8A72BE0088E94D /* SchedulerServices+Emulation.swift */, C8093CBC1B8A72BE0088E94D /* SerialDispatchQueueScheduler.swift */, - C8C3DA0E1B939767004D233E /* CurrentThreadScheduler.swift */, ); path = Schedulers; sourceTree = ""; @@ -2070,6 +2082,7 @@ C8093D9C1B8A72BE0088E94D /* SchedulerServices+Emulation.swift in Sources */, C8093D6A1B8A72BE0088E94D /* AnyObserver.swift in Sources */, C8093D3C1B8A72BE0088E94D /* Skip.swift in Sources */, + C8B144FC1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, C8093CF01B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, C8093D4E1B8A72BE0088E94D /* Zip+arity.swift in Sources */, C8093D4C1B8A72BE0088E94D /* Timer.swift in Sources */, @@ -2086,6 +2099,7 @@ C8093CFC1B8A72BE0088E94D /* Observable+Extensions.swift in Sources */, C8093D4A1B8A72BE0088E94D /* Throttle.swift in Sources */, C8093D041B8A72BE0088E94D /* AsObservable.swift in Sources */, + C8B145011BD2D80100267DCE /* ImmediateScheduler.swift in Sources */, C8093D061B8A72BE0088E94D /* Catch.swift in Sources */, C8093D0C1B8A72BE0088E94D /* CombineLatest.swift in Sources */, D2FC15B31BCB95E5007361FF /* SkipWhile.swift in Sources */, @@ -2187,6 +2201,7 @@ C8093D9B1B8A72BE0088E94D /* SchedulerServices+Emulation.swift in Sources */, C8093D691B8A72BE0088E94D /* AnyObserver.swift in Sources */, C8093D3B1B8A72BE0088E94D /* Skip.swift in Sources */, + C8B144FB1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, C8093CEF1B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */, C8093D4D1B8A72BE0088E94D /* Zip+arity.swift in Sources */, C8093D4B1B8A72BE0088E94D /* Timer.swift in Sources */, @@ -2203,6 +2218,7 @@ C8093CFB1B8A72BE0088E94D /* Observable+Extensions.swift in Sources */, C8093D491B8A72BE0088E94D /* Throttle.swift in Sources */, C8093D031B8A72BE0088E94D /* AsObservable.swift in Sources */, + C8B145001BD2D80100267DCE /* ImmediateScheduler.swift in Sources */, C8093D051B8A72BE0088E94D /* Catch.swift in Sources */, C8093D0B1B8A72BE0088E94D /* CombineLatest.swift in Sources */, D22B6D261BC8504A00BCE0AB /* SkipWhile.swift in Sources */, @@ -2304,6 +2320,7 @@ C8F0BF991BBBFB8B001B112F /* SchedulerServices+Emulation.swift in Sources */, C8F0BF9A1BBBFB8B001B112F /* AnyObserver.swift in Sources */, C8F0BF9B1BBBFB8B001B112F /* Skip.swift in Sources */, + C8B144FE1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, C8F0BF9C1BBBFB8B001B112F /* StableCompositeDisposable.swift in Sources */, C8F0BF9D1BBBFB8B001B112F /* Zip+arity.swift in Sources */, C8F0BF9E1BBBFB8B001B112F /* Timer.swift in Sources */, @@ -2320,6 +2337,7 @@ C8F0BFA91BBBFB8B001B112F /* Observable+Extensions.swift in Sources */, C8F0BFAA1BBBFB8B001B112F /* Throttle.swift in Sources */, C8F0BFAB1BBBFB8B001B112F /* AsObservable.swift in Sources */, + C8B145031BD2D80100267DCE /* ImmediateScheduler.swift in Sources */, C8F0BFAC1BBBFB8B001B112F /* Catch.swift in Sources */, C8F0BFAD1BBBFB8B001B112F /* CombineLatest.swift in Sources */, D2FC15B51BCB95E8007361FF /* SkipWhile.swift in Sources */, @@ -2569,6 +2587,7 @@ D2EBEAE11BB9B697003A27DC /* ImmediateSchedulerType.swift in Sources */, D2EBEB0B1BB9B6C1003A27DC /* Empty.swift in Sources */, D2EBEAF11BB9B6AE003A27DC /* BinaryDisposable.swift in Sources */, + C8B144FD1BD2D44500267DCE /* ConcurrentMainScheduler.swift in Sources */, D2EBEB1B1BB9B6C1003A27DC /* Repeat.swift in Sources */, D2EBEAF81BB9B6B2003A27DC /* ScopedDisposable.swift in Sources */, D2EBEAEA1BB9B697003A27DC /* SchedulerType.swift in Sources */, @@ -2585,6 +2604,7 @@ D2EBEAEE1BB9B6A4003A27DC /* InfiniteSequence.swift in Sources */, D2EBEB2D1BB9B6CA003A27DC /* Observable+Concurrency.swift in Sources */, D2EBEB381BB9B6D8003A27DC /* ConcurrentDispatchQueueScheduler.swift in Sources */, + C8B145021BD2D80100267DCE /* ImmediateScheduler.swift in Sources */, D2EBEB131BB9B6C1003A27DC /* Multicast.swift in Sources */, D2EBEB111BB9B6C1003A27DC /* Map.swift in Sources */, D2FC15B41BCB95E7007361FF /* SkipWhile.swift in Sources */, diff --git a/RxCocoa/Common/CocoaUnits/Driver/Driver.swift b/RxCocoa/Common/CocoaUnits/Driver/Driver.swift index 429ac81d..c4b11260 100644 --- a/RxCocoa/Common/CocoaUnits/Driver/Driver.swift +++ b/RxCocoa/Common/CocoaUnits/Driver/Driver.swift @@ -108,7 +108,7 @@ public struct Drive { /** Returns an empty observable sequence, using the specified scheduler to send out the single `Completed` message. - + - returns: An observable sequence with no elements. */ public static func empty() -> Driver { diff --git a/RxCocoa/Common/CocoaUnits/Driver/ObservableConvertibleType+Driver.swift b/RxCocoa/Common/CocoaUnits/Driver/ObservableConvertibleType+Driver.swift index a89dfc3b..95f6e4ba 100644 --- a/RxCocoa/Common/CocoaUnits/Driver/ObservableConvertibleType+Driver.swift +++ b/RxCocoa/Common/CocoaUnits/Driver/ObservableConvertibleType+Driver.swift @@ -21,7 +21,7 @@ extension ObservableConvertibleType { public func asDriver(onErrorJustReturn onErrorJustReturn: E) -> Driver { let source = self .asObservable() - .subscribeOn(MainScheduler.sharedInstance) + .subscribeOn(ConcurrentMainScheduler.sharedInstance) .observeOn(MainScheduler.sharedInstance) .catchErrorJustReturn(onErrorJustReturn) return Driver(source) @@ -36,7 +36,7 @@ extension ObservableConvertibleType { public func asDriver(onErrorDriveWith onErrorDriveWith: Driver) -> Driver { let source = self .asObservable() - .subscribeOn(MainScheduler.sharedInstance) + .subscribeOn(ConcurrentMainScheduler.sharedInstance) .observeOn(MainScheduler.sharedInstance) .catchError { _ in onErrorDriveWith.asObservable() @@ -53,7 +53,7 @@ extension ObservableConvertibleType { public func asDriver(onErrorRecover onErrorRecover: (error: ErrorType) -> Driver) -> Driver { let source = self .asObservable() - .subscribeOn(MainScheduler.sharedInstance) + .subscribeOn(ConcurrentMainScheduler.sharedInstance) .observeOn(MainScheduler.sharedInstance) .catchError { error in onErrorRecover(error: error).asObservable() diff --git a/RxExample/RxExample.xcodeproj/project.pbxproj b/RxExample/RxExample.xcodeproj/project.pbxproj index 925d88f2..ebc65c94 100644 --- a/RxExample/RxExample.xcodeproj/project.pbxproj +++ b/RxExample/RxExample.xcodeproj/project.pbxproj @@ -295,6 +295,8 @@ C8A468F31B8A8C2600BF917B /* RxSwift.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = C8A468EB1B8A8BC900BF917B /* RxSwift.framework */; }; C8A7501F1B94E77C00D8D046 /* RxDataSourceStarterKit.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8A7501E1B94E77C00D8D046 /* RxDataSourceStarterKit.swift */; }; C8A750201B94E78200D8D046 /* RxDataSourceStarterKit.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8A7501E1B94E77C00D8D046 /* RxDataSourceStarterKit.swift */; }; + C8B145141BD2E4D000267DCE /* ConcurrentMainScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B145051BD2E45200267DCE /* ConcurrentMainScheduler.swift */; }; + C8B145161BD2E4D500267DCE /* ImmediateScheduler.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8B145041BD2E45200267DCE /* ImmediateScheduler.swift */; }; C8C46DA81B47F7110020D71E /* CollectionViewImageCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C46DA31B47F7110020D71E /* CollectionViewImageCell.swift */; }; C8C46DA91B47F7110020D71E /* WikipediaImageCell.xib in Resources */ = {isa = PBXBuildFile; fileRef = C8C46DA41B47F7110020D71E /* WikipediaImageCell.xib */; }; C8C46DAA1B47F7110020D71E /* WikipediaSearchCell.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8C46DA51B47F7110020D71E /* WikipediaSearchCell.swift */; }; @@ -669,6 +671,8 @@ C8A468EF1B8A8BD000BF917B /* RxBlocking.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; path = RxBlocking.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C8A7501E1B94E77C00D8D046 /* RxDataSourceStarterKit.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxDataSourceStarterKit.swift; sourceTree = ""; }; C8AF26F11B49ABD300131C03 /* README.md */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = net.daringfireball.markdown; path = README.md; sourceTree = ""; }; + C8B145041BD2E45200267DCE /* ImmediateScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ImmediateScheduler.swift; sourceTree = ""; }; + C8B145051BD2E45200267DCE /* ConcurrentMainScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentMainScheduler.swift; sourceTree = ""; }; C8C46DA31B47F7110020D71E /* CollectionViewImageCell.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CollectionViewImageCell.swift; sourceTree = ""; }; C8C46DA41B47F7110020D71E /* WikipediaImageCell.xib */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = file.xib; path = WikipediaImageCell.xib; sourceTree = ""; }; C8C46DA51B47F7110020D71E /* WikipediaSearchCell.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = WikipediaSearchCell.swift; sourceTree = ""; }; @@ -1149,8 +1153,10 @@ isa = PBXGroup; children = ( C894648E1BC6C2B00055219D /* ConcurrentDispatchQueueScheduler.swift */, + C8B145051BD2E45200267DCE /* ConcurrentMainScheduler.swift */, C894648F1BC6C2B00055219D /* CurrentThreadScheduler.swift */, C89464901BC6C2B00055219D /* DispatchQueueSchedulerPriority.swift */, + C8B145041BD2E45200267DCE /* ImmediateScheduler.swift */, C89464911BC6C2B00055219D /* MainScheduler.swift */, C89464921BC6C2B00055219D /* OperationQueueScheduler.swift */, C89464931BC6C2B00055219D /* RecursiveScheduler.swift */, @@ -1729,6 +1735,7 @@ C8297E451B6CF905000589EA /* SectionedViewType.swift in Sources */, C89464D51BC6C2B00055219D /* ObserveOnSerialDispatchQueue.swift in Sources */, C894658E1BC6C2BC0055219D /* UIAlertView+Rx.swift in Sources */, + C8B145161BD2E4D500267DCE /* ImmediateScheduler.swift in Sources */, C8297E461B6CF905000589EA /* Example.swift in Sources */, C89465081BC6C2B00055219D /* PublishSubject.swift in Sources */, C89464FC1BC6C2B00055219D /* RxBox.swift in Sources */, @@ -1781,6 +1788,7 @@ C8297E501B6CF905000589EA /* TableViewController.swift in Sources */, C8297E511B6CF905000589EA /* PartialUpdatesViewController.swift in Sources */, C894649F1BC6C2B00055219D /* AsyncLock.swift in Sources */, + C8B145141BD2E4D000267DCE /* ConcurrentMainScheduler.swift in Sources */, C8297E521B6CF905000589EA /* Dependencies.swift in Sources */, C80DDED91BCE9046006A1832 /* ObservableConvertibleType+Driver.swift in Sources */, C89464E11BC6C2B00055219D /* Switch.swift in Sources */, diff --git a/RxSwift/Concurrency/AsyncLock.swift b/RxSwift/Concurrency/AsyncLock.swift index c90572e0..b727a943 100644 --- a/RxSwift/Concurrency/AsyncLock.swift +++ b/RxSwift/Concurrency/AsyncLock.swift @@ -24,6 +24,7 @@ class AsyncLock : Disposable { private let lock = NSRecursiveLock() private var queue: Queue = Queue(capacity: 2) + private var isAcquired: Bool = false private var hasFaulted: Bool = false diff --git a/RxSwift/Schedulers/ConcurrentMainScheduler.swift b/RxSwift/Schedulers/ConcurrentMainScheduler.swift new file mode 100644 index 00000000..046d460b --- /dev/null +++ b/RxSwift/Schedulers/ConcurrentMainScheduler.swift @@ -0,0 +1,92 @@ +// +// ConcurrentMainScheduler.swift +// Rx +// +// Created by Krunoslav Zaher on 10/17/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +/** +Abstracts work that needs to be performed on `MainThread`. In case `schedule` methods are called from main thread, it will perform action immediately without scheduling. + +This scheduler is optimized for `subscribeOn` operator. If you want to observe observable sequence elements on main thread using `observeOn` operator, +`MainScheduler` is more suitable for that purpose. +*/ +public final class ConcurrentMainScheduler : SchedulerType { + public typealias TimeInterval = NSTimeInterval + public typealias Time = NSDate + + private let _mainScheduler: MainScheduler + private let _mainQueue: dispatch_queue_t + + /** + - returns: Current time. + */ + public var now : NSDate { + get { + return _mainScheduler.now + } + } + + private init(mainScheduler: MainScheduler) { + _mainQueue = dispatch_get_main_queue() + _mainScheduler = mainScheduler + } + + /** + Singleton instance of `ConcurrentMainScheduler` + */ + public static let sharedInstance = ConcurrentMainScheduler(mainScheduler: MainScheduler.sharedInstance) + + /** + Schedules an action to be executed immediatelly. + + - parameter state: State passed to the action to be executed. + - parameter action: Action to be executed. + - returns: The disposable object used to cancel the scheduled action (best effort). + */ + public func schedule(state: StateType, action: (StateType) -> Disposable) -> Disposable { + if NSThread.currentThread().isMainThread { + return action(state) + } + + let cancel = SingleAssignmentDisposable() + + dispatch_async(_mainQueue) { + if cancel.disposed { + return + } + + cancel.disposable = action(state) + } + + return cancel + } + + /** + Schedules an action to be executed. + + - parameter state: State passed to the action to be executed. + - parameter dueTime: Relative time after which to execute the action. + - parameter action: Action to be executed. + - returns: The disposable object used to cancel the scheduled action (best effort). + */ + public final func scheduleRelative(state: StateType, dueTime: NSTimeInterval, action: (StateType) -> Disposable) -> Disposable { + return _mainScheduler.scheduleRelative(state, dueTime: dueTime, action: action) + } + + /** + Schedules a periodic piece of work. + + - parameter state: State passed to the action to be executed. + - parameter startAfter: Period after which initial work should be run. + - parameter period: Period for running the work periodically. + - parameter action: Action to be executed. + - returns: The disposable object used to cancel the scheduled action (best effort). + */ + public func schedulePeriodic(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> Disposable { + return _mainScheduler.schedulePeriodic(state, startAfter: startAfter, period: period, action: action) + } +} \ No newline at end of file diff --git a/RxSwift/Schedulers/ImmediateScheduler.swift b/RxSwift/Schedulers/ImmediateScheduler.swift new file mode 100644 index 00000000..53ef92fc --- /dev/null +++ b/RxSwift/Schedulers/ImmediateScheduler.swift @@ -0,0 +1,39 @@ +// +// ImmediateScheduler.swift +// Rx +// +// Created by Krunoslav Zaher on 10/17/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +/** +Represents an object that schedules units of work to run immediately on the current thread. +*/ +private class ImmediateScheduler : ImmediateSchedulerType { + + private let _asyncLock = AsyncLock() + + /** + Schedules an action to be executed immediatelly. + + In case `schedule` is called recursively from inside of `action` callback, scheduled `action` will be enqueued + and executed after current `action`. (`AsyncLock` behavior) + + - parameter state: State passed to the action to be executed. + - parameter action: Action to be executed. + - returns: The disposable object used to cancel the scheduled action (best effort). + */ + func schedule(state: StateType, action: (StateType) -> Disposable) -> Disposable { + let disposable = SingleAssignmentDisposable() + _asyncLock.wait { + if disposable.disposed { + return + } + disposable.disposable = action(state) + } + + return disposable + } +} \ No newline at end of file diff --git a/RxSwift/Schedulers/MainScheduler.swift b/RxSwift/Schedulers/MainScheduler.swift index d800e91f..ce88153c 100644 --- a/RxSwift/Schedulers/MainScheduler.swift +++ b/RxSwift/Schedulers/MainScheduler.swift @@ -14,17 +14,25 @@ Abstracts work that needs to be performed on `MainThread`. In case `schedule` me This scheduler is usually used to perform UI work. Main scheduler is a specialization of `SerialDispatchQueueScheduler`. + +This scheduler is optimized for `observeOn` operator. To ensure observable sequence is subscribed on main thread using `subscribeOn` +operator please use `ConcurrentMainScheduler` because it is more optimized for that purpose. */ public final class MainScheduler : SerialDispatchQueueScheduler { - + + private let _mainQueue: dispatch_queue_t + + var numberEnqueued: Int32 = 0 + private init() { - super.init(serialQueue: dispatch_get_main_queue()) + _mainQueue = dispatch_get_main_queue() + super.init(serialQueue: _mainQueue) } /** Singleton instance of `MainScheduler` */ - public static let sharedInstance: MainScheduler = MainScheduler() + public static let sharedInstance = MainScheduler() /** In case this method is called on a background thread it will throw an exception. @@ -36,10 +44,25 @@ public final class MainScheduler : SerialDispatchQueueScheduler { } override func scheduleInternal(state: StateType, action: StateType -> Disposable) -> Disposable { - if NSThread.currentThread().isMainThread { - return action(state) + let currentNumberEnqueued = OSAtomicIncrement32(&numberEnqueued) + + if NSThread.currentThread().isMainThread && currentNumberEnqueued == 1 { + let disposable = action(state) + OSAtomicDecrement32(&numberEnqueued) + return disposable } - - return super.scheduleInternal(state, action: action) + + let cancel = SingleAssignmentDisposable() + + dispatch_async(_mainQueue) { + if !cancel.disposed { + action(state) + } + + OSAtomicDecrement32(&self.numberEnqueued) + } + + return cancel } } + diff --git a/RxSwift/Schedulers/SerialDispatchQueueScheduler.swift b/RxSwift/Schedulers/SerialDispatchQueueScheduler.swift index 88dc79fd..246546eb 100644 --- a/RxSwift/Schedulers/SerialDispatchQueueScheduler.swift +++ b/RxSwift/Schedulers/SerialDispatchQueueScheduler.swift @@ -30,7 +30,10 @@ public class SerialDispatchQueueScheduler: SchedulerType { public typealias Time = NSDate private let serialQueue : dispatch_queue_t - + + /** + - returns: Current time. + */ public var now : NSDate { get { return NSDate()