RxSwift compiles first time on Linux.

This commit is contained in:
Krunoslav Zaher 2015-12-25 19:29:39 -08:00
parent 4651df2317
commit a982be0942
16 changed files with 450 additions and 174 deletions

2
.gitignore vendored
View File

@ -49,4 +49,4 @@ Carthage/Build
# Swift Package Manager
.build/
Packages/

View File

@ -30,7 +30,6 @@ let package = Package(
dependencies: [
.Target(name: "RxSwift"),
.Target(name: "RxBlocking"),
.Target(name: "RxCocoa"),
.Target(name: "RxTests")
]
)

View File

@ -13,46 +13,97 @@ protocol Lock {
func unlock()
}
/**
Simple wrapper for spin lock.
*/
struct SpinLock {
private var _lock = OS_SPINLOCK_INIT
init() {
}
#if os(Linux)
import Glibc
mutating func lock() {
OSSpinLockLock(&_lock)
}
/**
Simple wrapper for spin lock.
*/
class SpinLock {
private var _lock: pthread_spinlock_t = 0
mutating func unlock() {
OSSpinLockUnlock(&_lock)
}
mutating func performLocked(@noescape action: () -> Void) {
OSSpinLockLock(&_lock)
action()
OSSpinLockUnlock(&_lock)
}
mutating func calculateLocked<T>(@noescape action: () -> T) -> T {
OSSpinLockLock(&_lock)
let result = action()
OSSpinLockUnlock(&_lock)
return result
}
init() {
if (pthread_spin_init(&_lock, 0) != 0) {
fatalError("Spin lock initialization failed")
}
}
mutating func calculateLockedOrFail<T>(@noescape action: () throws -> T) throws -> T {
OSSpinLockLock(&_lock)
defer {
OSSpinLockUnlock(&_lock)
}
let result = try action()
return result
}
}
func lock() {
pthread_spin_lock(&_lock)
}
func unlock() {
pthread_spin_unlock(&_lock)
}
func performLocked(@noescape action: () -> Void) {
pthread_spin_lock(&_lock)
action()
pthread_spin_unlock(&_lock)
}
func calculateLocked<T>(@noescape action: () -> T) -> T {
pthread_spin_lock(&_lock)
let result = action()
pthread_spin_unlock(&_lock)
return result
}
func calculateLockedOrFail<T>(@noescape action: () throws -> T) throws -> T {
pthread_spin_lock(&_lock)
defer {
pthread_spin_unlock(&_lock)
}
let result = try action()
return result
}
deinit {
pthread_spin_destroy(&_lock)
}
}
#else
/**
Simple wrapper for spin lock.
*/
struct SpinLock {
private var _lock = OS_SPINLOCK_INIT
init() {
}
mutating func lock() {
OSSpinLockLock(&_lock)
}
mutating func unlock() {
OSSpinLockUnlock(&_lock)
}
mutating func performLocked(@noescape action: () -> Void) {
OSSpinLockLock(&_lock)
action()
OSSpinLockUnlock(&_lock)
}
mutating func calculateLocked<T>(@noescape action: () -> T) -> T {
OSSpinLockLock(&_lock)
let result = action()
OSSpinLockUnlock(&_lock)
return result
}
mutating func calculateLockedOrFail<T>(@noescape action: () throws -> T) throws -> T {
OSSpinLockLock(&_lock)
defer {
OSSpinLockUnlock(&_lock)
}
let result = try action()
return result
}
}
#endif
extension NSRecursiveLock : Lock {
func performLocked(@noescape action: () -> Void) {
@ -60,14 +111,14 @@ extension NSRecursiveLock : Lock {
action()
self.unlock()
}
func calculateLocked<T>(@noescape action: () -> T) -> T {
self.lock()
let result = action()
self.unlock()
return result
}
func calculateLockedOrFail<T>(@noescape action: () throws -> T) throws -> T {
self.lock()
defer {
@ -102,4 +153,4 @@ extension pthread_mutex_t {
pthread_mutex_unlock(&self)
}
}
*/
*/

View File

@ -15,10 +15,10 @@ When dispose method is called, disposal action will be dereferenced.
*/
public final class AnonymousDisposable : DisposeBase, Cancelable {
public typealias DisposeAction = () -> Void
private var _disposed: Int32 = 0
private var _disposed: AtomicInt = 0
private var _disposeAction: DisposeAction?
/**
- returns: Was resource disposed.
*/
@ -27,10 +27,10 @@ public final class AnonymousDisposable : DisposeBase, Cancelable {
return _disposed == 1
}
}
/**
Constructs a new disposable with the given action used for disposal.
- parameter disposeAction: Disposal action which will be run upon calling `dispose`.
*/
public init(_ disposeAction: DisposeAction) {
@ -40,11 +40,11 @@ public final class AnonymousDisposable : DisposeBase, Cancelable {
/**
Calls the disposal action if and only if the current instance hasn't been disposed yet.
After invoking disposal action, disposal action will be dereferenced.
*/
public func dispose() {
if OSAtomicCompareAndSwap32(0, 1, &_disposed) {
if AtomicCompareAndSwap(0, 1, &_disposed) {
assert(_disposed == 1)
if let action = _disposeAction {
@ -53,4 +53,4 @@ public final class AnonymousDisposable : DisposeBase, Cancelable {
}
}
}
}
}

View File

@ -12,13 +12,13 @@ import Foundation
Represents two disposable resources that are disposed together.
*/
public final class BinaryDisposable : DisposeBase, Cancelable {
private var _disposed: Int32 = 0
private var _disposed: AtomicInt = 0
// state
private var _disposable1: Disposable?
private var _disposable2: Disposable?
/**
- returns: Was resource disposed.
*/
@ -27,10 +27,10 @@ public final class BinaryDisposable : DisposeBase, Cancelable {
return _disposed > 0
}
}
/**
Constructs new binary disposable from two disposables.
- parameter disposable1: First disposable
- parameter disposable2: Second disposable
*/
@ -39,18 +39,18 @@ public final class BinaryDisposable : DisposeBase, Cancelable {
_disposable2 = disposable2
super.init()
}
/**
Calls the disposal action if and only if the current instance hasn't been disposed yet.
After invoking disposal action, disposal action will be dereferenced.
*/
public func dispose() {
if OSAtomicCompareAndSwap32(0, 1, &_disposed) {
if AtomicCompareAndSwap(0, 1, &_disposed) {
_disposable1?.dispose()
_disposable2?.dispose()
_disposable1 = nil
_disposable2 = nil
}
}
}
}

View File

@ -16,7 +16,7 @@ public class RefCountDisposable : DisposeBase, Cancelable {
private var _disposable = nil as Disposable?
private var _primaryDisposed = false
private var _count = 0
/**
- returns: Was resource disposed.
*/
@ -26,7 +26,7 @@ public class RefCountDisposable : DisposeBase, Cancelable {
return _disposable == nil
}
}
/**
Initializes a new instance of the `RefCountDisposable`.
*/
@ -34,16 +34,16 @@ public class RefCountDisposable : DisposeBase, Cancelable {
_disposable = disposable
super.init()
}
/**
Holds a dependent disposable that when disposed decreases the refcount on the underlying disposable.
When getter is called, a dependent disposable contributing to the reference count that manages the underlying disposable's lifetime is returned.
*/
public func retain() -> Disposable {
return _lock.calculateLocked {
if let _ = _disposable {
do {
try incrementChecked(&_count)
} catch (_) {
@ -56,7 +56,7 @@ public class RefCountDisposable : DisposeBase, Cancelable {
}
}
}
/**
Disposes the underlying disposable only when all dependent disposables have been disposed.
*/
@ -65,22 +65,22 @@ public class RefCountDisposable : DisposeBase, Cancelable {
if let oldDisposable = _disposable where !_primaryDisposed
{
_primaryDisposed = true
if (_count == 0)
{
_disposable = nil
return oldDisposable
}
}
return nil
}
if let disposable = oldDisposable {
disposable.dispose()
}
}
private func release() {
let oldDisposable: Disposable? = _lock.calculateLocked {
if let oldDisposable = _disposable {
@ -89,20 +89,20 @@ public class RefCountDisposable : DisposeBase, Cancelable {
} catch (_) {
rxFatalError("RefCountDisposable decrement on release failed")
}
guard _count >= 0 else {
rxFatalError("RefCountDisposable counter is lower than 0")
}
if _primaryDisposed && _count == 0 {
_disposable = nil
return oldDisposable
}
}
return nil
}
if let disposable = oldDisposable {
disposable.dispose()
}
@ -112,18 +112,18 @@ public class RefCountDisposable : DisposeBase, Cancelable {
internal final class RefCountInnerDisposable: DisposeBase, Disposable
{
private let _parent: RefCountDisposable
private var _disposed: Int32 = 0
private var _disposed: AtomicInt = 0
init(_ parent: RefCountDisposable)
{
_parent = parent
super.init()
}
internal func dispose()
{
if OSAtomicCompareAndSwap32(0, 1, &_disposed) {
if AtomicCompareAndSwap(0, 1, &_disposed) {
_parent.release()
}
}
}
}

View File

@ -18,9 +18,9 @@ Represents a disposable resource whose disposal invocation will be scheduled on
*/
public class ScheduledDisposable : Cancelable {
public let scheduler: ImmediateSchedulerType
private var _disposed: Int32 = 0
private var _disposed: AtomicInt = 0
// state
private var _disposable: Disposable?
@ -32,10 +32,10 @@ public class ScheduledDisposable : Cancelable {
return _disposed == 1
}
}
/**
Initializes a new instance of the `ScheduledDisposable` that uses a `scheduler` on which to dispose the `disposable`.
- parameter scheduler: Scheduler where the disposable resource will be disposed on.
- parameter disposable: Disposable resource to dispose on the given scheduler.
*/
@ -43,18 +43,18 @@ public class ScheduledDisposable : Cancelable {
self.scheduler = scheduler
_disposable = disposable
}
/**
Disposes the wrapped disposable on the provided scheduler.
*/
public func dispose() {
scheduler.schedule(self, action: disposeScheduledDisposable)
}
func disposeInner() {
if OSAtomicCompareAndSwap32(0, 1, &_disposed) {
if AtomicCompareAndSwap(0, 1, &_disposed) {
_disposable!.dispose()
_disposable = nil
}
}
}
}

View File

@ -7,19 +7,18 @@
//
import Foundation
import Darwin
class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private var _isStopped: Int32 = 0
private var _isStopped: AtomicInt = 0
override init(observer: O) {
super.init(observer: observer)
}
func on(event: Event<E>) {
switch event {
case .Next:
@ -28,13 +27,13 @@ class AnonymousObservableSink<O: ObserverType> : Sink<O>, ObserverType {
}
forwardOn(event)
case .Error, .Completed:
if OSAtomicCompareAndSwap32(0, 1, &_isStopped) {
if AtomicCompareAndSwap(0, 1, &_isStopped) {
forwardOn(event)
dispose()
}
}
}
func run(parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
@ -48,7 +47,7 @@ class AnonymousObservable<Element> : Producer<Element> {
init(_ subscribeHandler: SubscribeHandler) {
_subscribeHandler = subscribeHandler
}
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
let sink = AnonymousObservableSink(observer: observer)
sink.disposable = sink.run(self)

View File

@ -10,9 +10,9 @@ import Foundation
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
private var _isStopped: Int32 = 0
private var _isStopped: AtomicInt = 0
func on(event: Event<E>) {
switch event {
case .Next:
@ -20,20 +20,20 @@ class ObserverBase<ElementType> : Disposable, ObserverType {
onCore(event)
}
case .Error, .Completed:
if !OSAtomicCompareAndSwap32(0, 1, &_isStopped) {
if !AtomicCompareAndSwap(0, 1, &_isStopped) {
return
}
onCore(event)
}
}
func onCore(event: Event<E>) {
abstractMethod()
}
func dispose() {
_isStopped = 1
}
}
}

View File

@ -0,0 +1,31 @@
#if os(OSX) || os(iOS) || os(tvOS) || os(watchOS)
import Darwin
let AtomicInt = Int32
let AtomicCompareAndSwap = OSAtomicCompareAndSwap32
let AtomicIncrement = OSAtomicIncrement32
let AtomicDecrement = OSAtomicDecrement32
extension NSThread {
static func setThreadLocalStorageValue<T: AnyObject>(value: T?, forKey key: AnyObject) {
let currentThread = NSThread.currentThread()
var threadDictionary = currentThread.threadDictionary
if let newValue = value {
threadDictionary[key] = newValue
}
else {
threadDictionary.removeObjectForKey(key)
}
}
static func getThreadLocalStorageValueForKey<T>(key: String) -> T? {
let currentThread = NSThread.currentThread()
let threadDictionary = currentThread.threadDictionary
return threadDictionary[key] as? T
}
}
#endif

View File

@ -0,0 +1,200 @@
#if os(Linux)
////////////////////////////////////////////////////////////////////////////////
// This is not the greatest API in the world, this is just a tribute.
// !!! Proof of concept until libdispatch becomes operational. !!!
////////////////////////////////////////////////////////////////////////////////
import Foundation
import XCTest
import Glibc
import SwiftShims
// MARK: CoreFoundation run loop mock
public typealias CFRunLoopRef = Int
public let kCFRunLoopDefaultMode = "CFRunLoopDefaultMode"
typealias Action = () -> ()
var queue = Queue<Action>(capacity: 100)
var runLoopCounter = 0
extension NSThread {
public var isMainThread: Bool {
return true
}
}
public func CFRunLoopWakeUp(runLoop: CFRunLoopRef) {
}
public func CFRunLoopStop(runLoop: CFRunLoopRef) {
runLoopCounter -= 1
}
public func CFRunLoopPerformBlock(runLoop: CFRunLoopRef, _ mode: String, _ action: () -> ()) {
queue.enqueue(action)
}
public func CFRunLoopRun() {
let currentValueOfCounter = runLoopCounter + 1
while let front = queue.tryDequeue() {
front()
if runLoopCounter < currentValueOfCounter - 1 {
fatalError("called stop twice")
}
if runLoopCounter == currentValueOfCounter - 1 {
break
}
}
}
public func CFRunLoopGetCurrent() -> CFRunLoopRef {
return 0
}
// MARK: Atomic, just something that works for single thread case
typealias AtomicInt = Int64
func AtomicIncrement(increment: UnsafeMutablePointer<AtomicInt>) -> AtomicInt {
increment.memory = increment.memory + 1
return increment.memory
}
func AtomicDecrement(increment: UnsafeMutablePointer<AtomicInt>) -> AtomicInt {
increment.memory = increment.memory - 1
return increment.memory
}
func AtomicCompareAndSwap(l: AtomicInt, _ r: AtomicInt, _ target: UnsafeMutablePointer<AtomicInt>) -> Bool {
//return __sync_val_compare_and_swap(target, l, r)
if target.memory == l {
target.memory = r
return true
}
return false
}
extension NSThread {
static func setThreadLocalStorageValue<T: AnyObject>(value: T?, forKey key: String) {
let currentThread = NSThread.currentThread()
var threadDictionary = currentThread.threadDictionary
if let newValue = value {
threadDictionary[key] = newValue
}
else {
threadDictionary[key] = nil
}
currentThread.threadDictionary = threadDictionary
}
static func getThreadLocalStorageValueForKey<T: AnyObject>(key: String) -> T? {
let currentThread = NSThread.currentThread()
let threadDictionary = currentThread.threadDictionary
return threadDictionary[key] as? T
}
}
//
// MARK: objc mock
public func objc_sync_enter(self: AnyObject) {
}
public func objc_sync_exit(self: AnyObject) {
}
// MARK: libdispatch
public typealias dispatch_time_t = Int
public typealias dispatch_source_t = Int
public typealias dispatch_source_type_t = Int
public typealias dispatch_queue_t = Int
public typealias dispatch_object_t = Int
public typealias dispatch_block_t = () -> ()
public typealias dispatch_queue_attr_t = Int
public let DISPATCH_QUEUE_SERIAL = 0
public let DISPATCH_QUEUE_PRIORITY_HIGH = 1
public let DISPATCH_QUEUE_PRIORITY_DEFAULT = 2
public let DISPATCH_QUEUE_PRIORITY_LOW = 3
public let DISPATCH_SOURCE_TYPE_TIMER = 0
public let DISPATCH_TIME_FOREVER = 1 as UInt64
public let NSEC_PER_SEC = 1
public let DISPATCH_TIME_NOW = -1
public func dispatch_time(when: dispatch_time_t, _ delta: Int64) -> dispatch_time_t {
return when + Int(delta)
}
public func dispatch_queue_create(label: UnsafePointer<Int8>, _ attr: dispatch_queue_attr_t!) -> dispatch_queue_t! {
return 0
}
public func dispatch_set_target_queue(object: dispatch_object_t!, _ queue: dispatch_queue_t!) {
}
public func dispatch_async(queue2: dispatch_queue_t, _ block: dispatch_block_t) {
queue.enqueue(block)
}
public func dispatch_source_create(type: dispatch_source_type_t, _ handle: UInt, _ mask: UInt, _ queue: dispatch_queue_t!) -> dispatch_source_t! {
return 0
}
public func dispatch_source_set_timer(source: dispatch_source_t, _ start: dispatch_time_t, _ interval: UInt64, _ leeway: UInt64) {
}
public func dispatch_source_set_event_handler(source: dispatch_source_t, _ handler: dispatch_block_t!) {
queue.enqueue(handler)
}
public func dispatch_resume(object: dispatch_object_t) {
}
public func dispatch_source_cancel(source: dispatch_source_t) {
}
public func dispatch_get_global_queue(identifier: Int, _ flags: UInt) -> dispatch_queue_t! {
return 0
}
public func dispatch_get_main_queue() -> dispatch_queue_t! {
return 0
}
// MARK: XCTest
public class Expectation {
public func fulfill() {
}
}
extension XCTestCase {
public func setUp() {
}
public func tearDown() {
}
public func expectationWithDescription(description: String) -> Expectation {
return Expectation()
}
public func waitForExpectationsWithTimeout(time: NSTimeInterval, action: ErrorType? -> Void) {
}
}
#endif

View File

@ -8,40 +8,45 @@
import Foundation
let CurrentThreadSchedulerKeyInstance = CurrentThreadSchedulerKey()
let CurrentThreadSchedulerQueueKeyInstance = CurrentThreadSchedulerQueueKey()
#if os(Linux)
let CurrentThreadSchedulerKeyInstance = "RxSwift.CurrentThreadScheduler.SchedulerKey"
let CurrentThreadSchedulerQueueKeyInstance = "RxSwift.CurrentThreadScheduler.Queue"
#else
let CurrentThreadSchedulerKeyInstance = CurrentThreadSchedulerKey()
let CurrentThreadSchedulerQueueKeyInstance = CurrentThreadSchedulerQueueKey()
class CurrentThreadSchedulerKey : NSObject, NSCopying {
override func isEqual(object: AnyObject?) -> Bool {
return object === CurrentThreadSchedulerKeyInstance
}
override var hashValue: Int { return -904739208 }
override func copy() -> AnyObject {
return CurrentThreadSchedulerKeyInstance
}
func copyWithZone(zone: NSZone) -> AnyObject {
return CurrentThreadSchedulerKeyInstance
}
}
class CurrentThreadSchedulerKey : NSObject, NSCopying {
override func isEqual(object: AnyObject?) -> Bool {
return object === CurrentThreadSchedulerKeyInstance
}
class CurrentThreadSchedulerQueueKey : NSObject, NSCopying {
override func isEqual(object: AnyObject?) -> Bool {
return object === CurrentThreadSchedulerQueueKeyInstance
}
override var hashValue: Int { return -904739207 }
override func copy() -> AnyObject {
return CurrentThreadSchedulerQueueKeyInstance
}
func copyWithZone(zone: NSZone) -> AnyObject {
return CurrentThreadSchedulerQueueKeyInstance
}
}
override var hash: Int { return -904739208 }
override func copy() -> AnyObject {
return CurrentThreadSchedulerKeyInstance
}
func copyWithZone(zone: NSZone) -> AnyObject {
return CurrentThreadSchedulerKeyInstance
}
}
class CurrentThreadSchedulerQueueKey : NSObject, NSCopying {
override func isEqual(object: AnyObject?) -> Bool {
return object === CurrentThreadSchedulerQueueKeyInstance
}
override var hash: Int { return -904739207 }
override func copy() -> AnyObject {
return CurrentThreadSchedulerQueueKeyInstance
}
func copyWithZone(zone: NSZone) -> AnyObject {
return CurrentThreadSchedulerQueueKeyInstance
}
}
#endif
/**
Represents an object that schedules units of work on the current thread.
@ -52,50 +57,40 @@ This scheduler is also sometimes called `trampoline scheduler`.
*/
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/**
The singleton instance of the current thread scheduler.
*/
public static let instance = CurrentThreadScheduler()
static var queue : ScheduleQueue? {
get {
return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerQueueKeyInstance] as? ScheduleQueue
return NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKeyInstance)
}
set {
let threadDictionary = NSThread.currentThread().threadDictionary
if let newValue = newValue {
threadDictionary[CurrentThreadSchedulerQueueKeyInstance] = newValue
}
else {
threadDictionary.removeObjectForKey(CurrentThreadSchedulerQueueKeyInstance)
}
NSThread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKeyInstance)
}
}
/**
Gets a value that indicates whether the caller must call a `schedule` method.
*/
public static private(set) var isScheduleRequired: Bool {
get {
return NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] == nil
let value = NSThread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerKeyInstance) as String?
return value == nil
}
set(value) {
if value {
NSThread.currentThread().threadDictionary.removeObjectForKey(CurrentThreadSchedulerKeyInstance)
}
else {
NSThread.currentThread().threadDictionary[CurrentThreadSchedulerKeyInstance] = CurrentThreadSchedulerKeyInstance
}
set(isScheduleRequired) {
NSThread.setThreadLocalStorageValue(isScheduleRequired ? nil : CurrentThreadSchedulerKeyInstance, forKey: CurrentThreadSchedulerKeyInstance)
}
}
/**
Schedules an action to be executed as soon as possible on current thread.
If this method is called on some thread that doesn't have `CurrentThreadScheduler` installed, scheduler will be
automatically installed and uninstalled after all work is performed.
- parameter state: State passed to the action to be executed.
- parameter action: Action to be executed.
- returns: The disposable object used to cancel the scheduled action (best effort).
@ -140,4 +135,4 @@ public class CurrentThreadScheduler : ImmediateSchedulerType {
queue.value.enqueue(scheduledItem)
return scheduledItem
}
}
}

View File

@ -22,7 +22,7 @@ public final class MainScheduler : SerialDispatchQueueScheduler {
private let _mainQueue: dispatch_queue_t
var numberEnqueued: Int32 = 0
var numberEnqueued: AtomicInt = 0
private init() {
_mainQueue = dispatch_get_main_queue()
@ -42,13 +42,13 @@ public final class MainScheduler : SerialDispatchQueueScheduler {
rxFatalError("Executing on backgound thread. Please use `MainScheduler.sharedInstance.schedule` to schedule work on main thread.")
}
}
override func scheduleInternal<StateType>(state: StateType, action: StateType -> Disposable) -> Disposable {
let currentNumberEnqueued = OSAtomicIncrement32(&numberEnqueued)
let currentNumberEnqueued = AtomicIncrement(&numberEnqueued)
if NSThread.currentThread().isMainThread && currentNumberEnqueued == 1 {
let disposable = action(state)
OSAtomicDecrement32(&numberEnqueued)
AtomicDecrement(&numberEnqueued)
return disposable
}
@ -59,10 +59,9 @@ public final class MainScheduler : SerialDispatchQueueScheduler {
action(state)
}
OSAtomicDecrement32(&self.numberEnqueued)
AtomicDecrement(&self.numberEnqueued)
}
return cancel
}
}

View File

@ -16,15 +16,15 @@ enum SchedulePeriodicRecursiveCommand {
class SchedulePeriodicRecursive<State> {
typealias RecursiveAction = State -> State
typealias RecursiveScheduler = AnyRecursiveScheduler<SchedulePeriodicRecursiveCommand>
private let _scheduler: SchedulerType
private let _startAfter: RxTimeInterval
private let _period: RxTimeInterval
private let _action: RecursiveAction
private var _state: State
private var _pendingTickCount: Int32 = 0
private var _pendingTickCount: AtomicInt = 0
init(scheduler: SchedulerType, startAfter: RxTimeInterval, period: RxTimeInterval, action: RecursiveAction, state: State) {
_scheduler = scheduler
_startAfter = startAfter
@ -32,11 +32,11 @@ class SchedulePeriodicRecursive<State> {
_action = action
_state = state
}
func start() -> Disposable {
return _scheduler.scheduleRecursive(SchedulePeriodicRecursiveCommand.Tick, dueTime: _startAfter, action: self.tick)
}
func tick(command: SchedulePeriodicRecursiveCommand, scheduler: RecursiveScheduler) -> Void {
// Tries to emulate periodic scheduling as best as possible.
// The problem that could arise is if handling periodic ticks take too long, or
@ -44,17 +44,17 @@ class SchedulePeriodicRecursive<State> {
switch command {
case .Tick:
scheduler.schedule(.Tick, dueTime: _period)
// The idea is that if on tick there wasn't any item enqueued, schedule to perform work immediatelly.
// Else work will be scheduled after previous enqueued work completes.
if OSAtomicIncrement32(&_pendingTickCount) == 1 {
if AtomicIncrement(&_pendingTickCount) == 1 {
self.tick(.DispatchStart, scheduler: scheduler)
}
case .DispatchStart:
_state = _action(_state)
// Start work and schedule check is this last batch of work
if OSAtomicDecrement32(&_pendingTickCount) > 0 {
if AtomicDecrement(&_pendingTickCount) > 0 {
// This gives priority to scheduler emulation, it's not perfect, but helps
scheduler.schedule(SchedulePeriodicRecursiveCommand.DispatchStart)
}

View File

@ -0,0 +1 @@
../../RxSwift/Platform/Darwin.Platform.swift

View File

@ -0,0 +1 @@
../../RxSwift/Platform/Linux.Platform.swift