diff --git a/.gitignore b/.gitignore index 479941ae..6d47140e 100644 --- a/.gitignore +++ b/.gitignore @@ -49,4 +49,4 @@ Carthage/Build # Swift Package Manager .build/ - +Packages/ diff --git a/Package.swift b/Package.swift index 12d0dd22..3cd2b792 100644 --- a/Package.swift +++ b/Package.swift @@ -30,7 +30,6 @@ let package = Package( dependencies: [ .Target(name: "RxSwift"), .Target(name: "RxBlocking"), - .Target(name: "RxCocoa"), .Target(name: "RxTests") ] ) diff --git a/RxSwift/Concurrency/Lock.swift b/RxSwift/Concurrency/Lock.swift index 815133c6..db3e2169 100644 --- a/RxSwift/Concurrency/Lock.swift +++ b/RxSwift/Concurrency/Lock.swift @@ -13,46 +13,97 @@ protocol Lock { func unlock() } -/** -Simple wrapper for spin lock. -*/ -struct SpinLock { - private var _lock = OS_SPINLOCK_INIT - - init() { - - } +#if os(Linux) + import Glibc - mutating func lock() { - OSSpinLockLock(&_lock) - } + /** + Simple wrapper for spin lock. + */ + class SpinLock { + private var _lock: pthread_spinlock_t = 0 - mutating func unlock() { - OSSpinLockUnlock(&_lock) - } - - mutating func performLocked(@noescape action: () -> Void) { - OSSpinLockLock(&_lock) - action() - OSSpinLockUnlock(&_lock) - } - - mutating func calculateLocked(@noescape action: () -> T) -> T { - OSSpinLockLock(&_lock) - let result = action() - OSSpinLockUnlock(&_lock) - return result - } + init() { + if (pthread_spin_init(&_lock, 0) != 0) { + fatalError("Spin lock initialization failed") + } + } - mutating func calculateLockedOrFail(@noescape action: () throws -> T) throws -> T { - OSSpinLockLock(&_lock) - defer { - OSSpinLockUnlock(&_lock) - } - let result = try action() - return result - } -} + func lock() { + pthread_spin_lock(&_lock) + } + + func unlock() { + pthread_spin_unlock(&_lock) + } + + func performLocked(@noescape action: () -> Void) { + pthread_spin_lock(&_lock) + action() + pthread_spin_unlock(&_lock) + } + + func calculateLocked(@noescape action: () -> T) -> T { + pthread_spin_lock(&_lock) + let result = action() + pthread_spin_unlock(&_lock) + return result + } + + func calculateLockedOrFail(@noescape action: () throws -> T) throws -> T { + pthread_spin_lock(&_lock) + defer { + pthread_spin_unlock(&_lock) + } + let result = try action() + return result + } + + deinit { + pthread_spin_destroy(&_lock) + } + } +#else + /** + Simple wrapper for spin lock. + */ + struct SpinLock { + private var _lock = OS_SPINLOCK_INIT + + init() { + + } + + mutating func lock() { + OSSpinLockLock(&_lock) + } + + mutating func unlock() { + OSSpinLockUnlock(&_lock) + } + + mutating func performLocked(@noescape action: () -> Void) { + OSSpinLockLock(&_lock) + action() + OSSpinLockUnlock(&_lock) + } + + mutating func calculateLocked(@noescape action: () -> T) -> T { + OSSpinLockLock(&_lock) + let result = action() + OSSpinLockUnlock(&_lock) + return result + } + + mutating func calculateLockedOrFail(@noescape action: () throws -> T) throws -> T { + OSSpinLockLock(&_lock) + defer { + OSSpinLockUnlock(&_lock) + } + let result = try action() + return result + } + } +#endif extension NSRecursiveLock : Lock { func performLocked(@noescape action: () -> Void) { @@ -60,14 +111,14 @@ extension NSRecursiveLock : Lock { action() self.unlock() } - + func calculateLocked(@noescape action: () -> T) -> T { self.lock() let result = action() self.unlock() return result } - + func calculateLockedOrFail(@noescape action: () throws -> T) throws -> T { self.lock() defer { @@ -102,4 +153,4 @@ extension pthread_mutex_t { pthread_mutex_unlock(&self) } } -*/ \ No newline at end of file +*/ diff --git a/RxSwift/Disposables/AnonymousDisposable.swift b/RxSwift/Disposables/AnonymousDisposable.swift index b9aef47a..923ab543 100644 --- a/RxSwift/Disposables/AnonymousDisposable.swift +++ b/RxSwift/Disposables/AnonymousDisposable.swift @@ -15,10 +15,10 @@ When dispose method is called, disposal action will be dereferenced. */ public final class AnonymousDisposable : DisposeBase, Cancelable { public typealias DisposeAction = () -> Void - - private var _disposed: Int32 = 0 + + private var _disposed: AtomicInt = 0 private var _disposeAction: DisposeAction? - + /** - returns: Was resource disposed. */ @@ -27,10 +27,10 @@ public final class AnonymousDisposable : DisposeBase, Cancelable { return _disposed == 1 } } - + /** Constructs a new disposable with the given action used for disposal. - + - parameter disposeAction: Disposal action which will be run upon calling `dispose`. */ public init(_ disposeAction: DisposeAction) { @@ -40,11 +40,11 @@ public final class AnonymousDisposable : DisposeBase, Cancelable { /** Calls the disposal action if and only if the current instance hasn't been disposed yet. - + After invoking disposal action, disposal action will be dereferenced. */ public func dispose() { - if OSAtomicCompareAndSwap32(0, 1, &_disposed) { + if AtomicCompareAndSwap(0, 1, &_disposed) { assert(_disposed == 1) if let action = _disposeAction { @@ -53,4 +53,4 @@ public final class AnonymousDisposable : DisposeBase, Cancelable { } } } -} \ No newline at end of file +} diff --git a/RxSwift/Disposables/BinaryDisposable.swift b/RxSwift/Disposables/BinaryDisposable.swift index 35a4d941..c19fc0a3 100644 --- a/RxSwift/Disposables/BinaryDisposable.swift +++ b/RxSwift/Disposables/BinaryDisposable.swift @@ -12,13 +12,13 @@ import Foundation Represents two disposable resources that are disposed together. */ public final class BinaryDisposable : DisposeBase, Cancelable { - - private var _disposed: Int32 = 0 + + private var _disposed: AtomicInt = 0 // state private var _disposable1: Disposable? private var _disposable2: Disposable? - + /** - returns: Was resource disposed. */ @@ -27,10 +27,10 @@ public final class BinaryDisposable : DisposeBase, Cancelable { return _disposed > 0 } } - + /** Constructs new binary disposable from two disposables. - + - parameter disposable1: First disposable - parameter disposable2: Second disposable */ @@ -39,18 +39,18 @@ public final class BinaryDisposable : DisposeBase, Cancelable { _disposable2 = disposable2 super.init() } - + /** Calls the disposal action if and only if the current instance hasn't been disposed yet. - + After invoking disposal action, disposal action will be dereferenced. */ public func dispose() { - if OSAtomicCompareAndSwap32(0, 1, &_disposed) { + if AtomicCompareAndSwap(0, 1, &_disposed) { _disposable1?.dispose() _disposable2?.dispose() _disposable1 = nil _disposable2 = nil } } -} \ No newline at end of file +} diff --git a/RxSwift/Disposables/RefCountDisposable.swift b/RxSwift/Disposables/RefCountDisposable.swift index e40f3bc7..2bd13175 100644 --- a/RxSwift/Disposables/RefCountDisposable.swift +++ b/RxSwift/Disposables/RefCountDisposable.swift @@ -16,7 +16,7 @@ public class RefCountDisposable : DisposeBase, Cancelable { private var _disposable = nil as Disposable? private var _primaryDisposed = false private var _count = 0 - + /** - returns: Was resource disposed. */ @@ -26,7 +26,7 @@ public class RefCountDisposable : DisposeBase, Cancelable { return _disposable == nil } } - + /** Initializes a new instance of the `RefCountDisposable`. */ @@ -34,16 +34,16 @@ public class RefCountDisposable : DisposeBase, Cancelable { _disposable = disposable super.init() } - + /** Holds a dependent disposable that when disposed decreases the refcount on the underlying disposable. - + When getter is called, a dependent disposable contributing to the reference count that manages the underlying disposable's lifetime is returned. */ public func retain() -> Disposable { return _lock.calculateLocked { if let _ = _disposable { - + do { try incrementChecked(&_count) } catch (_) { @@ -56,7 +56,7 @@ public class RefCountDisposable : DisposeBase, Cancelable { } } } - + /** Disposes the underlying disposable only when all dependent disposables have been disposed. */ @@ -65,22 +65,22 @@ public class RefCountDisposable : DisposeBase, Cancelable { if let oldDisposable = _disposable where !_primaryDisposed { _primaryDisposed = true - + if (_count == 0) { _disposable = nil return oldDisposable } } - + return nil } - + if let disposable = oldDisposable { disposable.dispose() } } - + private func release() { let oldDisposable: Disposable? = _lock.calculateLocked { if let oldDisposable = _disposable { @@ -89,20 +89,20 @@ public class RefCountDisposable : DisposeBase, Cancelable { } catch (_) { rxFatalError("RefCountDisposable decrement on release failed") } - + guard _count >= 0 else { rxFatalError("RefCountDisposable counter is lower than 0") } - + if _primaryDisposed && _count == 0 { _disposable = nil return oldDisposable } } - + return nil } - + if let disposable = oldDisposable { disposable.dispose() } @@ -112,18 +112,18 @@ public class RefCountDisposable : DisposeBase, Cancelable { internal final class RefCountInnerDisposable: DisposeBase, Disposable { private let _parent: RefCountDisposable - private var _disposed: Int32 = 0 - + private var _disposed: AtomicInt = 0 + init(_ parent: RefCountDisposable) { _parent = parent super.init() } - + internal func dispose() { - if OSAtomicCompareAndSwap32(0, 1, &_disposed) { + if AtomicCompareAndSwap(0, 1, &_disposed) { _parent.release() } } -} \ No newline at end of file +} diff --git a/RxSwift/Disposables/ScheduledDisposable.swift b/RxSwift/Disposables/ScheduledDisposable.swift index dea13edf..e6bb97ae 100644 --- a/RxSwift/Disposables/ScheduledDisposable.swift +++ b/RxSwift/Disposables/ScheduledDisposable.swift @@ -18,9 +18,9 @@ Represents a disposable resource whose disposal invocation will be scheduled on */ public class ScheduledDisposable : Cancelable { public let scheduler: ImmediateSchedulerType - - private var _disposed: Int32 = 0 - + + private var _disposed: AtomicInt = 0 + // state private var _disposable: Disposable? @@ -32,10 +32,10 @@ public class ScheduledDisposable : Cancelable { return _disposed == 1 } } - + /** Initializes a new instance of the `ScheduledDisposable` that uses a `scheduler` on which to dispose the `disposable`. - + - parameter scheduler: Scheduler where the disposable resource will be disposed on. - parameter disposable: Disposable resource to dispose on the given scheduler. */ @@ -43,18 +43,18 @@ public class ScheduledDisposable : Cancelable { self.scheduler = scheduler _disposable = disposable } - + /** Disposes the wrapped disposable on the provided scheduler. */ public func dispose() { scheduler.schedule(self, action: disposeScheduledDisposable) } - + func disposeInner() { - if OSAtomicCompareAndSwap32(0, 1, &_disposed) { + if AtomicCompareAndSwap(0, 1, &_disposed) { _disposable!.dispose() _disposable = nil } } -} \ No newline at end of file +} diff --git a/RxSwift/Observables/Implementations/AnonymousObservable.swift b/RxSwift/Observables/Implementations/AnonymousObservable.swift index 9ea13e16..b283ce90 100644 --- a/RxSwift/Observables/Implementations/AnonymousObservable.swift +++ b/RxSwift/Observables/Implementations/AnonymousObservable.swift @@ -7,19 +7,18 @@ // import Foundation -import Darwin class AnonymousObservableSink : Sink, ObserverType { typealias E = O.E typealias Parent = AnonymousObservable - + // state - private var _isStopped: Int32 = 0 + private var _isStopped: AtomicInt = 0 override init(observer: O) { super.init(observer: observer) } - + func on(event: Event) { switch event { case .Next: @@ -28,13 +27,13 @@ class AnonymousObservableSink : Sink, ObserverType { } forwardOn(event) case .Error, .Completed: - if OSAtomicCompareAndSwap32(0, 1, &_isStopped) { + if AtomicCompareAndSwap(0, 1, &_isStopped) { forwardOn(event) dispose() } } } - + func run(parent: Parent) -> Disposable { return parent._subscribeHandler(AnyObserver(self)) } @@ -48,7 +47,7 @@ class AnonymousObservable : Producer { init(_ subscribeHandler: SubscribeHandler) { _subscribeHandler = subscribeHandler } - + override func run(observer: O) -> Disposable { let sink = AnonymousObservableSink(observer: observer) sink.disposable = sink.run(self) diff --git a/RxSwift/Observers/ObserverBase.swift b/RxSwift/Observers/ObserverBase.swift index 38ce0032..749f707c 100644 --- a/RxSwift/Observers/ObserverBase.swift +++ b/RxSwift/Observers/ObserverBase.swift @@ -10,9 +10,9 @@ import Foundation class ObserverBase : Disposable, ObserverType { typealias E = ElementType - - private var _isStopped: Int32 = 0 - + + private var _isStopped: AtomicInt = 0 + func on(event: Event) { switch event { case .Next: @@ -20,20 +20,20 @@ class ObserverBase : Disposable, ObserverType { onCore(event) } case .Error, .Completed: - - if !OSAtomicCompareAndSwap32(0, 1, &_isStopped) { + + if !AtomicCompareAndSwap(0, 1, &_isStopped) { return } - + onCore(event) } } - + func onCore(event: Event) { abstractMethod() } - + func dispose() { _isStopped = 1 } -} \ No newline at end of file +} diff --git a/RxSwift/Platform/Darwin.Platform.swift b/RxSwift/Platform/Darwin.Platform.swift new file mode 100644 index 00000000..c81c1471 --- /dev/null +++ b/RxSwift/Platform/Darwin.Platform.swift @@ -0,0 +1,31 @@ +#if os(OSX) || os(iOS) || os(tvOS) || os(watchOS) +import Darwin + +let AtomicInt = Int32 + +let AtomicCompareAndSwap = OSAtomicCompareAndSwap32 +let AtomicIncrement = OSAtomicIncrement32 +let AtomicDecrement = OSAtomicDecrement32 + +extension NSThread { + static func setThreadLocalStorageValue(value: T?, forKey key: AnyObject) { + let currentThread = NSThread.currentThread() + var threadDictionary = currentThread.threadDictionary + + if let newValue = value { + threadDictionary[key] = newValue + } + else { + threadDictionary.removeObjectForKey(key) + } + + } + static func getThreadLocalStorageValueForKey(key: String) -> T? { + let currentThread = NSThread.currentThread() + let threadDictionary = currentThread.threadDictionary + + return threadDictionary[key] as? T + } +} + +#endif diff --git a/RxSwift/Platform/Linux.Platform.swift b/RxSwift/Platform/Linux.Platform.swift new file mode 100644 index 00000000..072374bd --- /dev/null +++ b/RxSwift/Platform/Linux.Platform.swift @@ -0,0 +1,200 @@ +#if os(Linux) +//////////////////////////////////////////////////////////////////////////////// +// This is not the greatest API in the world, this is just a tribute. +// !!! Proof of concept until libdispatch becomes operational. !!! +//////////////////////////////////////////////////////////////////////////////// + +import Foundation +import XCTest +import Glibc +import SwiftShims + +// MARK: CoreFoundation run loop mock + +public typealias CFRunLoopRef = Int +public let kCFRunLoopDefaultMode = "CFRunLoopDefaultMode" + +typealias Action = () -> () + +var queue = Queue(capacity: 100) + +var runLoopCounter = 0 + +extension NSThread { + public var isMainThread: Bool { + return true + } +} + +public func CFRunLoopWakeUp(runLoop: CFRunLoopRef) { +} + +public func CFRunLoopStop(runLoop: CFRunLoopRef) { + runLoopCounter -= 1 +} + +public func CFRunLoopPerformBlock(runLoop: CFRunLoopRef, _ mode: String, _ action: () -> ()) { + queue.enqueue(action) +} + +public func CFRunLoopRun() { + let currentValueOfCounter = runLoopCounter + 1 + while let front = queue.tryDequeue() { + front() + if runLoopCounter < currentValueOfCounter - 1 { + fatalError("called stop twice") + } + + if runLoopCounter == currentValueOfCounter - 1 { + break + } + } +} + +public func CFRunLoopGetCurrent() -> CFRunLoopRef { + return 0 +} + +// MARK: Atomic, just something that works for single thread case + +typealias AtomicInt = Int64 + +func AtomicIncrement(increment: UnsafeMutablePointer) -> AtomicInt { + increment.memory = increment.memory + 1 + return increment.memory +} + +func AtomicDecrement(increment: UnsafeMutablePointer) -> AtomicInt { + increment.memory = increment.memory - 1 + return increment.memory +} + +func AtomicCompareAndSwap(l: AtomicInt, _ r: AtomicInt, _ target: UnsafeMutablePointer) -> Bool { + //return __sync_val_compare_and_swap(target, l, r) + if target.memory == l { + target.memory = r + return true + } + + return false +} + +extension NSThread { + static func setThreadLocalStorageValue(value: T?, forKey key: String) { + let currentThread = NSThread.currentThread() + var threadDictionary = currentThread.threadDictionary + + if let newValue = value { + threadDictionary[key] = newValue + } + else { + threadDictionary[key] = nil + } + + currentThread.threadDictionary = threadDictionary + } + static func getThreadLocalStorageValueForKey(key: String) -> T? { + let currentThread = NSThread.currentThread() + let threadDictionary = currentThread.threadDictionary + + return threadDictionary[key] as? T + } +} + +// + +// MARK: objc mock + +public func objc_sync_enter(self: AnyObject) { +} + +public func objc_sync_exit(self: AnyObject) { +} + + +// MARK: libdispatch + +public typealias dispatch_time_t = Int +public typealias dispatch_source_t = Int +public typealias dispatch_source_type_t = Int +public typealias dispatch_queue_t = Int +public typealias dispatch_object_t = Int +public typealias dispatch_block_t = () -> () +public typealias dispatch_queue_attr_t = Int + +public let DISPATCH_QUEUE_SERIAL = 0 + +public let DISPATCH_QUEUE_PRIORITY_HIGH = 1 +public let DISPATCH_QUEUE_PRIORITY_DEFAULT = 2 +public let DISPATCH_QUEUE_PRIORITY_LOW = 3 + +public let DISPATCH_SOURCE_TYPE_TIMER = 0 +public let DISPATCH_TIME_FOREVER = 1 as UInt64 +public let NSEC_PER_SEC = 1 + +public let DISPATCH_TIME_NOW = -1 + +public func dispatch_time(when: dispatch_time_t, _ delta: Int64) -> dispatch_time_t { + return when + Int(delta) +} + +public func dispatch_queue_create(label: UnsafePointer, _ attr: dispatch_queue_attr_t!) -> dispatch_queue_t! { + return 0 +} + +public func dispatch_set_target_queue(object: dispatch_object_t!, _ queue: dispatch_queue_t!) { +} + +public func dispatch_async(queue2: dispatch_queue_t, _ block: dispatch_block_t) { + queue.enqueue(block) +} + +public func dispatch_source_create(type: dispatch_source_type_t, _ handle: UInt, _ mask: UInt, _ queue: dispatch_queue_t!) -> dispatch_source_t! { + return 0 +} + +public func dispatch_source_set_timer(source: dispatch_source_t, _ start: dispatch_time_t, _ interval: UInt64, _ leeway: UInt64) { + +} + +public func dispatch_source_set_event_handler(source: dispatch_source_t, _ handler: dispatch_block_t!) { + queue.enqueue(handler) +} + +public func dispatch_resume(object: dispatch_object_t) { +} + +public func dispatch_source_cancel(source: dispatch_source_t) { +} + +public func dispatch_get_global_queue(identifier: Int, _ flags: UInt) -> dispatch_queue_t! { + return 0 +} + +public func dispatch_get_main_queue() -> dispatch_queue_t! { + return 0 +} + +// MARK: XCTest + +public class Expectation { + public func fulfill() { + } +} + +extension XCTestCase { + public func setUp() { + } + + public func tearDown() { + } + + public func expectationWithDescription(description: String) -> Expectation { + return Expectation() + } + + public func waitForExpectationsWithTimeout(time: NSTimeInterval, action: ErrorType? -> Void) { + } +} + +#endif diff --git a/RxSwift/Schedulers/CurrentThreadScheduler.swift b/RxSwift/Schedulers/CurrentThreadScheduler.swift index e2d5b2b4..caa52ede 100644 --- a/RxSwift/Schedulers/CurrentThreadScheduler.swift +++ b/RxSwift/Schedulers/CurrentThreadScheduler.swift @@ -8,40 +8,45 @@ import Foundation -let CurrentThreadSchedulerKeyInstance = CurrentThreadSchedulerKey() -let CurrentThreadSchedulerQueueKeyInstance = CurrentThreadSchedulerQueueKey() +#if os(Linux) + let CurrentThreadSchedulerKeyInstance = "RxSwift.CurrentThreadScheduler.SchedulerKey" + let CurrentThreadSchedulerQueueKeyInstance = "RxSwift.CurrentThreadScheduler.Queue" +#else + let CurrentThreadSchedulerKeyInstance = CurrentThreadSchedulerKey() + let CurrentThreadSchedulerQueueKeyInstance = CurrentThreadSchedulerQueueKey() -class CurrentThreadSchedulerKey : NSObject, NSCopying { - override func isEqual(object: AnyObject?) -> Bool { - return object === CurrentThreadSchedulerKeyInstance - } - - override var hashValue: Int { return -904739208 } - - override func copy() -> AnyObject { - return CurrentThreadSchedulerKeyInstance - } - - func copyWithZone(zone: NSZone) -> AnyObject { - return CurrentThreadSchedulerKeyInstance - } -} + class CurrentThreadSchedulerKey : NSObject, NSCopying { + override func isEqual(object: AnyObject?) -> Bool { + return object === CurrentThreadSchedulerKeyInstance + } -class CurrentThreadSchedulerQueueKey : NSObject, NSCopying { - override func isEqual(object: AnyObject?) -> Bool { - return object === CurrentThreadSchedulerQueueKeyInstance - } - - override var hashValue: Int { return -904739207 } - - override func copy() -> AnyObject { - return CurrentThreadSchedulerQueueKeyInstance - } - - func copyWithZone(zone: NSZone) -> AnyObject { - return CurrentThreadSchedulerQueueKeyInstance - } -} + override var hash: Int { return -904739208 } + + override func copy() -> AnyObject { + return CurrentThreadSchedulerKeyInstance + } + + func copyWithZone(zone: NSZone) -> AnyObject { + return CurrentThreadSchedulerKeyInstance + } + } + + class CurrentThreadSchedulerQueueKey : NSObject, NSCopying { + override func isEqual(object: AnyObject?) -> Bool { + return object === CurrentThreadSchedulerQueueKeyInstance + } + + override var hash: Int { return -904739207 } + + override func copy() -> AnyObject { + return CurrentThreadSchedulerQueueKeyInstance + } + + func copyWithZone(zone: NSZone) -> AnyObject { + return CurrentThreadSchedulerQueueKeyInstance + } + } +#endif /** Represents an object that schedules units of work on the current thread. @@ -52,50 +57,40 @@ This scheduler is also sometimes called `trampoline scheduler`. */ public class CurrentThreadScheduler : ImmediateSchedulerType { typealias ScheduleQueue = RxMutableBox> - + /** The singleton instance of the current thread scheduler. */ public static let instance = CurrentThreadScheduler() - + static var queue : ScheduleQueue? { get { - return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerQueueKeyInstance] as? ScheduleQueue + return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance) } set { - let threadDictionary = NSThread.currentThread().threadDictionary - if let newValue = newValue { - threadDictionary[CurrentThreadSchedulerQueueKeyInstance] = newValue - } - else { - threadDictionary.removeObjectForKey(CurrentThreadSchedulerQueueKeyInstance) - } + NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance) } } - + /** Gets a value that indicates whether the caller must call a `schedule` method. */ public static private(set) var isScheduleRequired: Bool { get { - return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] == nil + let value = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance) as String? + return value == nil } - set(value) { - if value { - NSThread.currentThread().threadDictionary.removeObjectForKey(CurrentThreadSchedulerKeyInstance) - } - else { - NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] = CurrentThreadSchedulerKeyInstance - } + set(isScheduleRequired) { + NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerKeyInstance, forKey: CurrentThreadSchedulerKeyInstance) } } - + /** Schedules an action to be executed as soon as possible on current thread. - + If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be automatically installed and uninstalled after all work is performed. - + - parameter state: State passed to the action to be executed. - parameter action: Action to be executed. - returns: The disposable object used to cancel the scheduled action (best effort). @@ -140,4 +135,4 @@ public class CurrentThreadScheduler : ImmediateSchedulerType { queue.value.enqueue(scheduledItem) return scheduledItem } -} \ No newline at end of file +} diff --git a/RxSwift/Schedulers/MainScheduler.swift b/RxSwift/Schedulers/MainScheduler.swift index ce88153c..70350475 100644 --- a/RxSwift/Schedulers/MainScheduler.swift +++ b/RxSwift/Schedulers/MainScheduler.swift @@ -22,7 +22,7 @@ public final class MainScheduler : SerialDispatchQueueScheduler { private let _mainQueue: dispatch_queue_t - var numberEnqueued: Int32 = 0 + var numberEnqueued: AtomicInt = 0 private init() { _mainQueue = dispatch_get_main_queue() @@ -42,13 +42,13 @@ public final class MainScheduler : SerialDispatchQueueScheduler { rxFatalError("Executing on backgound thread. Please use `MainScheduler.sharedInstance.schedule` to schedule work on main thread.") } } - + override func scheduleInternal(state: StateType, action: StateType -> Disposable) -> Disposable { - let currentNumberEnqueued = OSAtomicIncrement32(&numberEnqueued) + let currentNumberEnqueued = AtomicIncrement(&numberEnqueued) if NSThread.currentThread().isMainThread && currentNumberEnqueued == 1 { let disposable = action(state) - OSAtomicDecrement32(&numberEnqueued) + AtomicDecrement(&numberEnqueued) return disposable } @@ -59,10 +59,9 @@ public final class MainScheduler : SerialDispatchQueueScheduler { action(state) } - OSAtomicDecrement32(&self.numberEnqueued) + AtomicDecrement(&self.numberEnqueued) } return cancel } } - diff --git a/RxSwift/Schedulers/SchedulerServices+Emulation.swift b/RxSwift/Schedulers/SchedulerServices+Emulation.swift index ee91188f..7601a69e 100644 --- a/RxSwift/Schedulers/SchedulerServices+Emulation.swift +++ b/RxSwift/Schedulers/SchedulerServices+Emulation.swift @@ -16,15 +16,15 @@ enum SchedulePeriodicRecursiveCommand { class SchedulePeriodicRecursive { typealias RecursiveAction = State -> State typealias RecursiveScheduler = AnyRecursiveScheduler - + private let _scheduler: SchedulerType private let _startAfter: RxTimeInterval private let _period: RxTimeInterval private let _action: RecursiveAction - + private var _state: State - private var _pendingTickCount: Int32 = 0 - + private var _pendingTickCount: AtomicInt = 0 + init(scheduler: SchedulerType, startAfter: RxTimeInterval, period: RxTimeInterval, action: RecursiveAction, state: State) { _scheduler = scheduler _startAfter = startAfter @@ -32,11 +32,11 @@ class SchedulePeriodicRecursive { _action = action _state = state } - + func start() -> Disposable { return _scheduler.scheduleRecursive(SchedulePeriodicRecursiveCommand.Tick, dueTime: _startAfter, action: self.tick) } - + func tick(command: SchedulePeriodicRecursiveCommand, scheduler: RecursiveScheduler) -> Void { // Tries to emulate periodic scheduling as best as possible. // The problem that could arise is if handling periodic ticks take too long, or @@ -44,17 +44,17 @@ class SchedulePeriodicRecursive { switch command { case .Tick: scheduler.schedule(.Tick, dueTime: _period) - + // The idea is that if on tick there wasn't any item enqueued, schedule to perform work immediatelly. // Else work will be scheduled after previous enqueued work completes. - if OSAtomicIncrement32(&_pendingTickCount) == 1 { + if AtomicIncrement(&_pendingTickCount) == 1 { self.tick(.DispatchStart, scheduler: scheduler) } - + case .DispatchStart: _state = _action(_state) // Start work and schedule check is this last batch of work - if OSAtomicDecrement32(&_pendingTickCount) > 0 { + if AtomicDecrement(&_pendingTickCount) > 0 { // This gives priority to scheduler emulation, it's not perfect, but helps scheduler.schedule(SchedulePeriodicRecursiveCommand.DispatchStart) } diff --git a/Sources/RxSwift/Darwin.Platform.swift b/Sources/RxSwift/Darwin.Platform.swift new file mode 120000 index 00000000..e902a8ef --- /dev/null +++ b/Sources/RxSwift/Darwin.Platform.swift @@ -0,0 +1 @@ +../../RxSwift/Platform/Darwin.Platform.swift \ No newline at end of file diff --git a/Sources/RxSwift/Linux.Platform.swift b/Sources/RxSwift/Linux.Platform.swift new file mode 120000 index 00000000..f7df219c --- /dev/null +++ b/Sources/RxSwift/Linux.Platform.swift @@ -0,0 +1 @@ +../../RxSwift/Platform/Linux.Platform.swift \ No newline at end of file