Adds `buffer` operator, tidy up for `Disposables`.
This commit is contained in:
parent
cf53818605
commit
ebe338675c
|
|
@ -47,8 +47,6 @@
|
|||
C8093CEE1B8A72BE0088E94D /* SingleAssignmentDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C601B8A72BE0088E94D /* SingleAssignmentDisposable.swift */; };
|
||||
C8093CEF1B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C611B8A72BE0088E94D /* StableCompositeDisposable.swift */; };
|
||||
C8093CF01B8A72BE0088E94D /* StableCompositeDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C611B8A72BE0088E94D /* StableCompositeDisposable.swift */; };
|
||||
C8093CF11B8A72BE0088E94D /* TernaryDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C621B8A72BE0088E94D /* TernaryDisposable.swift */; };
|
||||
C8093CF21B8A72BE0088E94D /* TernaryDisposable.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C621B8A72BE0088E94D /* TernaryDisposable.swift */; };
|
||||
C8093CF31B8A72BE0088E94D /* Error.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C631B8A72BE0088E94D /* Error.swift */; };
|
||||
C8093CF41B8A72BE0088E94D /* Error.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C631B8A72BE0088E94D /* Error.swift */; };
|
||||
C8093CF51B8A72BE0088E94D /* Event.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8093C641B8A72BE0088E94D /* Event.swift */; };
|
||||
|
|
@ -258,6 +256,8 @@
|
|||
C80D339B1B922FB00014629D /* ControlProperty.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80D33941B922FB00014629D /* ControlProperty.swift */; };
|
||||
C80D342E1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */; };
|
||||
C80D342F1B9245A40014629D /* CombineLatest+CollectionType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */; };
|
||||
C821DBA21BA4DCAB008F3809 /* Buffer.swift in Sources */ = {isa = PBXBuildFile; fileRef = C821DBA11BA4DCAB008F3809 /* Buffer.swift */; };
|
||||
C821DBA31BA4DCAB008F3809 /* Buffer.swift in Sources */ = {isa = PBXBuildFile; fileRef = C821DBA11BA4DCAB008F3809 /* Buffer.swift */; };
|
||||
C84B38E91BA43380001B7D88 /* ScheduledItem.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38E71BA43380001B7D88 /* ScheduledItem.swift */; };
|
||||
C84B38EA1BA43380001B7D88 /* ScheduledItem.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38E71BA43380001B7D88 /* ScheduledItem.swift */; };
|
||||
C84B38EE1BA433CD001B7D88 /* Generate.swift in Sources */ = {isa = PBXBuildFile; fileRef = C84B38ED1BA433CD001B7D88 /* Generate.swift */; };
|
||||
|
|
@ -341,7 +341,6 @@
|
|||
C8093C5F1B8A72BE0088E94D /* SerialDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SerialDisposable.swift; sourceTree = "<group>"; };
|
||||
C8093C601B8A72BE0088E94D /* SingleAssignmentDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SingleAssignmentDisposable.swift; sourceTree = "<group>"; };
|
||||
C8093C611B8A72BE0088E94D /* StableCompositeDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = StableCompositeDisposable.swift; sourceTree = "<group>"; };
|
||||
C8093C621B8A72BE0088E94D /* TernaryDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = TernaryDisposable.swift; sourceTree = "<group>"; };
|
||||
C8093C631B8A72BE0088E94D /* Error.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Error.swift; sourceTree = "<group>"; };
|
||||
C8093C641B8A72BE0088E94D /* Event.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Event.swift; sourceTree = "<group>"; };
|
||||
C8093C651B8A72BE0088E94D /* ImmediateSchedulerType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ImmediateSchedulerType.swift; sourceTree = "<group>"; };
|
||||
|
|
@ -454,6 +453,7 @@
|
|||
C80D33931B922FB00014629D /* ControlEvent.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ControlEvent.swift; sourceTree = "<group>"; };
|
||||
C80D33941B922FB00014629D /* ControlProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ControlProperty.swift; sourceTree = "<group>"; };
|
||||
C80D342D1B9245A40014629D /* CombineLatest+CollectionType.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CombineLatest+CollectionType.swift"; sourceTree = "<group>"; };
|
||||
C821DBA11BA4DCAB008F3809 /* Buffer.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Buffer.swift; sourceTree = "<group>"; };
|
||||
C84B38E71BA43380001B7D88 /* ScheduledItem.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ScheduledItem.swift; sourceTree = "<group>"; };
|
||||
C84B38ED1BA433CD001B7D88 /* Generate.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Generate.swift; sourceTree = "<group>"; };
|
||||
C88253F11B8A752B00B02D69 /* RxCollectionViewReactiveArrayDataSource.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RxCollectionViewReactiveArrayDataSource.swift; sourceTree = "<group>"; };
|
||||
|
|
@ -614,7 +614,6 @@
|
|||
C8093C5F1B8A72BE0088E94D /* SerialDisposable.swift */,
|
||||
C8093C601B8A72BE0088E94D /* SingleAssignmentDisposable.swift */,
|
||||
C8093C611B8A72BE0088E94D /* StableCompositeDisposable.swift */,
|
||||
C8093C621B8A72BE0088E94D /* TernaryDisposable.swift */,
|
||||
);
|
||||
path = Disposables;
|
||||
sourceTree = "<group>";
|
||||
|
|
@ -639,9 +638,10 @@
|
|||
C8093C6A1B8A72BE0088E94D /* Implementations */ = {
|
||||
isa = PBXGroup;
|
||||
children = (
|
||||
C84B38ED1BA433CD001B7D88 /* Generate.swift */,
|
||||
C8093C6B1B8A72BE0088E94D /* Amb.swift */,
|
||||
C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */,
|
||||
C8093C6D1B8A72BE0088E94D /* AsObservable.swift */,
|
||||
C821DBA11BA4DCAB008F3809 /* Buffer.swift */,
|
||||
C8093C6E1B8A72BE0088E94D /* Catch.swift */,
|
||||
C8093C711B8A72BE0088E94D /* CombineLatest.swift */,
|
||||
C8093C6F1B8A72BE0088E94D /* CombineLatest+arity.swift */,
|
||||
|
|
@ -654,11 +654,16 @@
|
|||
C8093C771B8A72BE0088E94D /* DelaySubscription.swift */,
|
||||
C8093C781B8A72BE0088E94D /* DistinctUntilChanged.swift */,
|
||||
C8093C791B8A72BE0088E94D /* Do.swift */,
|
||||
C8C3DA051B9393AC004D233E /* Empty.swift */,
|
||||
C8C3DA081B93941E004D233E /* FailWith.swift */,
|
||||
C8093C7A1B8A72BE0088E94D /* Filter.swift */,
|
||||
C8093C7B1B8A72BE0088E94D /* FlatMap.swift */,
|
||||
C84B38ED1BA433CD001B7D88 /* Generate.swift */,
|
||||
C8C3DA021B9390C4004D233E /* Just.swift */,
|
||||
C8093C7C1B8A72BE0088E94D /* Map.swift */,
|
||||
C8093C7D1B8A72BE0088E94D /* Merge.swift */,
|
||||
C8093C7E1B8A72BE0088E94D /* Multicast.swift */,
|
||||
C8C3DA0B1B93959F004D233E /* Never.swift */,
|
||||
C8093C801B8A72BE0088E94D /* ObserveOn.swift */,
|
||||
C8093C811B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift */,
|
||||
C8093C831B8A72BE0088E94D /* Producer.swift */,
|
||||
|
|
@ -680,11 +685,6 @@
|
|||
C8093C921B8A72BE0088E94D /* Zip+arity.swift */,
|
||||
C8093C931B8A72BE0088E94D /* Zip+arity.tt */,
|
||||
C8C3D9FD1B935EDF004D233E /* Zip+CollectionType.swift */,
|
||||
C8C3DA021B9390C4004D233E /* Just.swift */,
|
||||
C8C3DA051B9393AC004D233E /* Empty.swift */,
|
||||
C8C3DA081B93941E004D233E /* FailWith.swift */,
|
||||
C8C3DA0B1B93959F004D233E /* Never.swift */,
|
||||
C8C3DA111B93A3EA004D233E /* AnonymousObservable.swift */,
|
||||
);
|
||||
path = Implementations;
|
||||
sourceTree = "<group>";
|
||||
|
|
@ -1360,6 +1360,7 @@
|
|||
C8093D541B8A72BE0088E94D /* Observable+Aggregate.swift in Sources */,
|
||||
C8093D2C1B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift in Sources */,
|
||||
C8093D3E1B8A72BE0088E94D /* StartWith.swift in Sources */,
|
||||
C821DBA31BA4DCAB008F3809 /* Buffer.swift in Sources */,
|
||||
C8093D481B8A72BE0088E94D /* TakeWhile.swift in Sources */,
|
||||
C8093D001B8A72BE0088E94D /* Amb.swift in Sources */,
|
||||
C8093D1C1B8A72BE0088E94D /* Do.swift in Sources */,
|
||||
|
|
@ -1374,7 +1375,6 @@
|
|||
C8093D141B8A72BE0088E94D /* Debug.swift in Sources */,
|
||||
C8093CCE1B8A72BE0088E94D /* Bag.swift in Sources */,
|
||||
C8093D301B8A72BE0088E94D /* Producer.swift in Sources */,
|
||||
C8093CF21B8A72BE0088E94D /* TernaryDisposable.swift in Sources */,
|
||||
C8093CF81B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */,
|
||||
C8093CC61B8A72BE0088E94D /* Cancelable.swift in Sources */,
|
||||
C8093CE81B8A72BE0088E94D /* ScheduledDisposable.swift in Sources */,
|
||||
|
|
@ -1470,6 +1470,7 @@
|
|||
C8093D531B8A72BE0088E94D /* Observable+Aggregate.swift in Sources */,
|
||||
C8093D2B1B8A72BE0088E94D /* ObserveOnSerialDispatchQueue.swift in Sources */,
|
||||
C8093D3D1B8A72BE0088E94D /* StartWith.swift in Sources */,
|
||||
C821DBA21BA4DCAB008F3809 /* Buffer.swift in Sources */,
|
||||
C8093D471B8A72BE0088E94D /* TakeWhile.swift in Sources */,
|
||||
C8093CFF1B8A72BE0088E94D /* Amb.swift in Sources */,
|
||||
C8093D1B1B8A72BE0088E94D /* Do.swift in Sources */,
|
||||
|
|
@ -1484,7 +1485,6 @@
|
|||
C8093D131B8A72BE0088E94D /* Debug.swift in Sources */,
|
||||
C8093CCD1B8A72BE0088E94D /* Bag.swift in Sources */,
|
||||
C8093D2F1B8A72BE0088E94D /* Producer.swift in Sources */,
|
||||
C8093CF11B8A72BE0088E94D /* TernaryDisposable.swift in Sources */,
|
||||
C8093CF71B8A72BE0088E94D /* ImmediateSchedulerType.swift in Sources */,
|
||||
C8093CC51B8A72BE0088E94D /* Cancelable.swift in Sources */,
|
||||
C8093CE71B8A72BE0088E94D /* ScheduledDisposable.swift in Sources */,
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -16,8 +16,8 @@ When dispose method is called, disposal action will be dereferenced.
|
|||
public final class AnonymousDisposable : DisposeBase, Cancelable {
|
||||
public typealias DisposeAction = () -> Void
|
||||
|
||||
var _disposed: Int32 = 0
|
||||
var disposeAction: DisposeAction?
|
||||
private var _disposed: Int32 = 0
|
||||
private var disposeAction: DisposeAction?
|
||||
|
||||
/**
|
||||
- returns: Was resource disposed.
|
||||
|
|
|
|||
|
|
@ -12,10 +12,12 @@ import Foundation
|
|||
Represents two disposable resources that are disposed together.
|
||||
*/
|
||||
public final class BinaryDisposable : DisposeBase, Cancelable {
|
||||
var disposable1: Disposable?
|
||||
var disposable2: Disposable?
|
||||
|
||||
var _disposed: Int32 = 0
|
||||
private var _disposed: Int32 = 0
|
||||
|
||||
// state
|
||||
private var disposable1: Disposable?
|
||||
private var disposable2: Disposable?
|
||||
|
||||
/**
|
||||
- returns: Was resource disposed.
|
||||
|
|
|
|||
|
|
@ -14,8 +14,10 @@ Represents a group of disposable resources that are disposed together.
|
|||
public class CompositeDisposable : DisposeBase, Disposable, Cancelable {
|
||||
public typealias DisposeKey = Bag<Disposable>.KeyType
|
||||
|
||||
var lock = SpinLock()
|
||||
var disposables: Bag<Disposable>? = Bag()
|
||||
private var lock = SpinLock()
|
||||
|
||||
// state
|
||||
private var disposables: Bag<Disposable>? = Bag()
|
||||
|
||||
public var disposed: Bool {
|
||||
get {
|
||||
|
|
|
|||
|
|
@ -32,9 +32,12 @@ or create new one in it's place.
|
|||
In case explicit disposal is necessary, there is also `CompositeDisposable`.
|
||||
*/
|
||||
public class DisposeBag: DisposeBase {
|
||||
|
||||
private var lock = SpinLock()
|
||||
var disposables = [Disposable]()
|
||||
var disposed = false
|
||||
|
||||
// state
|
||||
private var disposables = [Disposable]()
|
||||
private var disposed = false
|
||||
|
||||
/**
|
||||
Constructs new empty dispose bag.
|
||||
|
|
|
|||
|
|
@ -14,8 +14,10 @@ Represents a disposable resource whose disposal invocation will be scheduled on
|
|||
public class ScheduledDisposable : Cancelable {
|
||||
public let scheduler: ImmediateSchedulerType
|
||||
|
||||
var _disposed: Int32 = 0
|
||||
var _disposable: Disposable?
|
||||
private var _disposed: Int32 = 0
|
||||
|
||||
// state
|
||||
private var _disposable: Disposable?
|
||||
|
||||
/**
|
||||
- returns: Was resource disposed.
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ extension Disposable {
|
|||
This returns ARC (RAII) like resource management to `RxSwift`.
|
||||
*/
|
||||
public class ScopedDisposable : DisposeBase {
|
||||
var disposable: Disposable?
|
||||
private var disposable: Disposable?
|
||||
|
||||
/**
|
||||
Initializes new instance with a single disposable.
|
||||
|
|
|
|||
|
|
@ -15,8 +15,8 @@ public class SerialDisposable : DisposeBase, Cancelable {
|
|||
var lock = SpinLock()
|
||||
|
||||
// state
|
||||
var _current = nil as Disposable?
|
||||
var _disposed = false
|
||||
private var _current = nil as Disposable?
|
||||
private var _disposed = false
|
||||
|
||||
/**
|
||||
- returns: Was resource disposed.
|
||||
|
|
|
|||
|
|
@ -14,12 +14,12 @@ Represents a disposable resource which only allows a single assignment of its un
|
|||
If an underlying disposable resource has already been set, future attempts to set the underlying disposable resource will throw an exception.
|
||||
*/
|
||||
public class SingleAssignmentDisposable : DisposeBase, Disposable, Cancelable {
|
||||
var lock = SpinLock()
|
||||
private var lock = SpinLock()
|
||||
|
||||
// state
|
||||
var _disposed = false
|
||||
var _disposableSet = false
|
||||
var _disposable = nil as Disposable?
|
||||
private var _disposed = false
|
||||
private var _disposableSet = false
|
||||
private var _disposable = nil as Disposable?
|
||||
|
||||
/**
|
||||
- returns: A value that indicates whether the object is disposed.
|
||||
|
|
|
|||
|
|
@ -12,9 +12,4 @@ public final class StableCompositeDisposable {
|
|||
public static func create(disposable1: Disposable, _ disposable2: Disposable) -> Disposable {
|
||||
return BinaryDisposable(disposable1, disposable2)
|
||||
}
|
||||
|
||||
public static func create(disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable) -> Disposable {
|
||||
return TernaryDisposable(disposable1, disposable2, disposable3)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,41 +0,0 @@
|
|||
//
|
||||
// TernaryDisposable.swift
|
||||
// RxSwift
|
||||
//
|
||||
// Created by Krunoslav Zaher on 6/12/15.
|
||||
// Copyright (c) 2015 Krunoslav Zaher. All rights reserved.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
class TernaryDisposable : DisposeBase, Cancelable {
|
||||
var disposable1: Disposable?
|
||||
var disposable2: Disposable?
|
||||
var disposable3: Disposable?
|
||||
|
||||
var _disposed: Int32 = 0
|
||||
|
||||
var disposed: Bool {
|
||||
get {
|
||||
return _disposed > 0
|
||||
}
|
||||
}
|
||||
|
||||
init(_ disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable) {
|
||||
self.disposable1 = disposable1
|
||||
self.disposable2 = disposable2
|
||||
self.disposable3 = disposable3
|
||||
super.init()
|
||||
}
|
||||
|
||||
func dispose() {
|
||||
if OSAtomicCompareAndSwap32(0, 1, &_disposed) {
|
||||
disposable1?.dispose()
|
||||
disposable2?.dispose()
|
||||
disposable3?.dispose()
|
||||
disposable1 = nil
|
||||
disposable2 = nil
|
||||
disposable3 = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,108 @@
|
|||
//
|
||||
// Buffer.swift
|
||||
// Rx
|
||||
//
|
||||
// Created by Krunoslav Zaher on 9/13/15.
|
||||
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
class BufferTimeCount<Element, S: SchedulerType> : Producer<[Element]> {
|
||||
let timeSpan: S.TimeInterval
|
||||
let count: Int
|
||||
let scheduler: S
|
||||
let source: Observable<Element>
|
||||
|
||||
init(source: Observable<Element>, timeSpan: S.TimeInterval, count: Int, scheduler: S) {
|
||||
self.source = source
|
||||
self.timeSpan = timeSpan
|
||||
self.count = count
|
||||
self.scheduler = scheduler
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == [Element]>(observer: O, cancel: Disposable, setSink: (Disposable) -> Void) -> Disposable {
|
||||
let sink = BufferTimeCountSink(parent: self, observer: observer, cancel: cancel)
|
||||
setSink(sink)
|
||||
return sink.run()
|
||||
}
|
||||
}
|
||||
|
||||
class BufferTimeCountSink<S: SchedulerType, Element, O: ObserverType where O.E == [Element]> : Sink<O>, ObserverType {
|
||||
typealias Parent = BufferTimeCount<Element, S>
|
||||
typealias E = Element
|
||||
|
||||
let parent: Parent
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
let timerD = SerialDisposable()
|
||||
var buffer = [Element]()
|
||||
var windowID = 0
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
super.init(observer: observer, cancel: cancel)
|
||||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
createTimer(self.windowID)
|
||||
return StableCompositeDisposable.create(timerD, self.parent.source.subscribeSafe(self))
|
||||
}
|
||||
|
||||
func startNewWindowAndSendCurrentOne() {
|
||||
self.windowID = self.windowID &+ 1
|
||||
let windowID = self.windowID
|
||||
|
||||
let buffer = self.buffer
|
||||
self.buffer = []
|
||||
self.observer?.on(.Next(buffer))
|
||||
|
||||
createTimer(windowID)
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
self.lock.performLocked {
|
||||
switch event {
|
||||
case .Next(let element):
|
||||
buffer.append(element)
|
||||
|
||||
if buffer.count == parent.count {
|
||||
startNewWindowAndSendCurrentOne()
|
||||
}
|
||||
|
||||
case .Error(let error):
|
||||
self.buffer = []
|
||||
self.observer?.on(.Error(error))
|
||||
self.dispose()
|
||||
case .Completed:
|
||||
self.observer?.on(.Next(self.buffer))
|
||||
self.observer?.on(.Completed)
|
||||
self.dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func createTimer(windowID: Int) {
|
||||
if timerD.disposed {
|
||||
return
|
||||
}
|
||||
|
||||
if self.windowID != windowID {
|
||||
return
|
||||
}
|
||||
|
||||
timerD.disposable = parent.scheduler.scheduleRelative(windowID, dueTime: self.parent.timeSpan) { previousWindowID in
|
||||
self.lock.performLocked {
|
||||
if previousWindowID != self.windowID {
|
||||
return
|
||||
}
|
||||
|
||||
self.startNewWindowAndSendCurrentOne()
|
||||
}
|
||||
|
||||
return NopDisposable.instance
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@ protocol CombineLatestProtocol : class {
|
|||
class CombineLatestSink<O: ObserverType> : Sink<O>, CombineLatestProtocol {
|
||||
typealias Element = O.E
|
||||
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
var hasValueAll: Bool
|
||||
var hasValue: [Bool]
|
||||
|
|
|
|||
|
|
@ -42,7 +42,9 @@ public class ConnectableObservable<S: SubjectType> : Observable<S.E>, Connectabl
|
|||
let subject: S
|
||||
let source: Observable<S.SubjectObserverType.E>
|
||||
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var connection: ConnectionType?
|
||||
|
||||
public init(source: Observable<S.SubjectObserverType.E>, subject: S) {
|
||||
|
|
|
|||
|
|
@ -58,11 +58,13 @@ class FlatMapSink<SourceType, S: ObservableType, O: ObserverType where O.E == S.
|
|||
typealias Parent = FlatMap<SourceType, S>
|
||||
|
||||
let parent: Parent
|
||||
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
let group = CompositeDisposable()
|
||||
let sourceSubscription = SingleAssignmentDisposable()
|
||||
|
||||
|
||||
var stopped = false
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ class MergeSink<S: ObservableType, O: ObserverType where O.E == S.E> : Sink<O>,
|
|||
|
||||
let parent: Parent
|
||||
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var stopped = false
|
||||
|
|
@ -154,7 +154,9 @@ class MergeConcurrentSink<S: ObservableType, O: ObserverType where S.E == O.E> :
|
|||
|
||||
let parent: Parent
|
||||
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var stopped = false
|
||||
var activeCount = 0
|
||||
var queue = RxMutableBox(QueueType(capacity: 2))
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ class RefCountSink<CO: ConnectableObservableType, O: ObserverType where CO.E ==
|
|||
}
|
||||
|
||||
class RefCount<CO: ConnectableObservableType>: Producer<CO.E> {
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var count = 0
|
||||
|
|
|
|||
|
|
@ -58,7 +58,8 @@ class SampleSequenceSink<O: ObserverType, SampleType> : Sink<O>, ObserverType {
|
|||
|
||||
let parent: Parent
|
||||
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var element = nil as Element?
|
||||
var atEnd = false
|
||||
|
|
|
|||
|
|
@ -70,6 +70,8 @@ class SkipTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == E
|
|||
let parent: Parent
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var open = false
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ class SwitchSink<S: ObservableType, O: ObserverType where S.E == O.E> : Sink<O>,
|
|||
let innerSubscription: SerialDisposable = SerialDisposable()
|
||||
let parent: Parent
|
||||
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var stopped = false
|
||||
|
|
|
|||
|
|
@ -70,9 +70,10 @@ class TakeCount<Element>: Producer<Element> {
|
|||
class TakeTimeSink<ElementType, S: SchedulerType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
|
||||
typealias Parent = TakeTime<ElementType, S>
|
||||
typealias E = ElementType
|
||||
|
||||
let parent: Parent
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
let parent: Parent
|
||||
|
||||
init(parent: Parent, observer: O, cancel: Disposable) {
|
||||
self.parent = parent
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ class TakeUntilSink<ElementType, Other, O: ObserverType where O.E == ElementType
|
|||
let parent: Parent
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var open = false
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,8 @@ class ThrottleSink<O: ObserverType, Scheduler: SchedulerType> : Sink<O>, Observe
|
|||
|
||||
let parent: ParentType
|
||||
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var id = 0 as UInt64
|
||||
var value: Element? = nil
|
||||
|
|
|
|||
|
|
@ -109,6 +109,8 @@ class ZipObserver<ElementType> : ObserverType {
|
|||
var parent: ZipSinkProtocol?
|
||||
|
||||
let lock: NSRecursiveLock
|
||||
|
||||
// state
|
||||
let index: Int
|
||||
let this: Disposable
|
||||
let setNextValue: ValueSetter
|
||||
|
|
|
|||
|
|
@ -177,4 +177,24 @@ extension ObservableType {
|
|||
-> Observable<E> {
|
||||
return DelaySubscription(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
|
||||
}
|
||||
}
|
||||
|
||||
// buffer
|
||||
|
||||
extension ObservableType {
|
||||
|
||||
/**
|
||||
Projects each element of an observable sequence into a buffer that's sent out when either it's full or a given amount of time has elapsed, using the specified scheduler to run timers.
|
||||
|
||||
A useful real-world analogy of this overload is the behavior of a ferry leaving the dock when all seats are taken, or at the scheduled time of departure, whichever event occurs first.
|
||||
|
||||
- parameter timeSpan: Maximum time length of a buffer.
|
||||
- parameter count: Maximum element count of a buffer.
|
||||
- parameter scheduler: Scheduler to run buffering timers on.
|
||||
- returns: An observable sequence of buffers.
|
||||
*/
|
||||
public func buffer<S: SchedulerType>(timeSpan timeSpan: S.TimeInterval, count: Int, scheduler: S)
|
||||
-> Observable<[E]> {
|
||||
return BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
|
||||
}
|
||||
}
|
||||
|
|
@ -32,6 +32,8 @@ public class RecursiveSchedulerOf<State, TimeInterval> {
|
|||
typealias Action = (state: State, scheduler: RecursiveSchedulerOf<State, TimeInterval>) -> Void
|
||||
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
let group = CompositeDisposable()
|
||||
|
||||
var action: Action?
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ public class PublishSubject<Element> : Observable<Element>, SubjectType, Cancela
|
|||
|
||||
typealias DisposeKey = Bag<ObserverOf<Element>>.KeyType
|
||||
|
||||
private var lock = NSRecursiveLock()
|
||||
private let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var _disposed = false
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ public class ReplaySubject<Element> : Observable<Element>, SubjectType, Observer
|
|||
}
|
||||
|
||||
class ReplayBufferBase<Element> : ReplaySubject<Element> {
|
||||
var lock = NSRecursiveLock()
|
||||
let lock = NSRecursiveLock()
|
||||
|
||||
// state
|
||||
var disposed = false
|
||||
|
|
|
|||
|
|
@ -1184,3 +1184,109 @@ extension ObservableTimeTest {
|
|||
])
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
extension ObservableTimeTest {
|
||||
func bufferWithTimeOrCount_Basic() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(205, 1),
|
||||
next(210, 2),
|
||||
next(240, 3),
|
||||
next(280, 4),
|
||||
next(320, 5),
|
||||
next(350, 6),
|
||||
next(370, 7),
|
||||
next(420, 8),
|
||||
next(470, 9),
|
||||
completed(600)
|
||||
])
|
||||
|
||||
let res = scheduler.start {
|
||||
xs.buffer(timeSpan: 70, count: 3, scheduler: scheduler).map { EquatableArray($0) }
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
next(240, EquatableArray([1, 2, 3])),
|
||||
next(310, EquatableArray([4])),
|
||||
next(370, EquatableArray([5, 6, 7])),
|
||||
next(440, EquatableArray([8])),
|
||||
next(510, EquatableArray([9])),
|
||||
next(580, EquatableArray([])),
|
||||
next(600, EquatableArray([])),
|
||||
completed(600)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 600)
|
||||
])
|
||||
}
|
||||
|
||||
func bufferWithTimeOrCount_Error() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(205, 1),
|
||||
next(210, 2),
|
||||
next(240, 3),
|
||||
next(280, 4),
|
||||
next(320, 5),
|
||||
next(350, 6),
|
||||
next(370, 7),
|
||||
next(420, 8),
|
||||
next(470, 9),
|
||||
error(600, testError)
|
||||
])
|
||||
|
||||
let res = scheduler.start {
|
||||
xs.buffer(timeSpan: 70, count: 3, scheduler: scheduler).map { EquatableArray($0) }
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
next(240, EquatableArray([1, 2, 3])),
|
||||
next(310, EquatableArray([4])),
|
||||
next(370, EquatableArray([5, 6, 7])),
|
||||
next(440, EquatableArray([8])),
|
||||
next(510, EquatableArray([9])),
|
||||
next(580, EquatableArray([])),
|
||||
error(600, testError)
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 600)
|
||||
])
|
||||
}
|
||||
|
||||
func bufferWithTimeOrCount_Disposed() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
|
||||
let xs = scheduler.createHotObservable([
|
||||
next(205, 1),
|
||||
next(210, 2),
|
||||
next(240, 3),
|
||||
next(280, 4),
|
||||
next(320, 5),
|
||||
next(350, 6),
|
||||
next(370, 7),
|
||||
next(420, 8),
|
||||
next(470, 9),
|
||||
completed(600)
|
||||
])
|
||||
|
||||
let res = scheduler.start(370) {
|
||||
xs.buffer(timeSpan: 70, count: 3, scheduler: scheduler).map { EquatableArray($0) }
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.messages, [
|
||||
next(240, EquatableArray([1, 2, 3])),
|
||||
next(310, EquatableArray([4])),
|
||||
next(370, EquatableArray([5, 6, 7]))
|
||||
])
|
||||
|
||||
XCTAssertEqual(xs.subscriptions, [
|
||||
Subscription(200, 370)
|
||||
])
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue