Deprecates `toObservable` in favor of `from` operator.
This commit is contained in:
parent
441a4a22db
commit
8a58deaa2b
|
|
@ -145,7 +145,7 @@ extension Driver {
|
|||
*/
|
||||
// @warn_unused_result(message:"http://git.io/rxs.uo")
|
||||
public static func of(_ elements: E ...) -> Driver<E> {
|
||||
let source = elements.toObservable(driverSubscribeOnScheduler)
|
||||
let source = Observable.from(elements, scheduler: driverSubscribeOnScheduler)
|
||||
return Driver(raw: source)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,8 +8,8 @@
|
|||
|
||||
import Foundation
|
||||
|
||||
class ObservableSequenceSink<O: ObserverType> : Sink<O> {
|
||||
typealias Parent = ObservableSequence<O.E>
|
||||
class ObservableSequenceSink<S: Sequence, O: ObserverType where S.Iterator.Element == O.E> : Sink<O> {
|
||||
typealias Parent = ObservableSequence<S>
|
||||
|
||||
private let _parent: Parent
|
||||
|
||||
|
|
@ -19,10 +19,11 @@ class ObservableSequenceSink<O: ObserverType> : Sink<O> {
|
|||
}
|
||||
|
||||
func run() -> Disposable {
|
||||
return _parent._scheduler!.scheduleRecursive((0, _parent._elements)) { (state, recurse) in
|
||||
if state.0 < state.1.count {
|
||||
self.forwardOn(.next(state.1[state.0]))
|
||||
recurse((state.0 + 1, state.1))
|
||||
return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in
|
||||
var mutableIterator = iterator
|
||||
if let next = mutableIterator.0.next() {
|
||||
self.forwardOn(.next(next))
|
||||
recurse(mutableIterator)
|
||||
}
|
||||
else {
|
||||
self.forwardOn(.completed)
|
||||
|
|
@ -31,26 +32,16 @@ class ObservableSequenceSink<O: ObserverType> : Sink<O> {
|
|||
}
|
||||
}
|
||||
|
||||
class ObservableSequence<E> : Producer<E> {
|
||||
private let _elements: [E]
|
||||
private let _scheduler: ImmediateSchedulerType?
|
||||
class ObservableSequence<S: Sequence> : Producer<S.Iterator.Element> {
|
||||
private let _elements: S
|
||||
private let _scheduler: ImmediateSchedulerType
|
||||
|
||||
init(elements: [E], scheduler: ImmediateSchedulerType?) {
|
||||
init(elements: S, scheduler: ImmediateSchedulerType) {
|
||||
_elements = elements
|
||||
_scheduler = scheduler
|
||||
}
|
||||
|
||||
override func subscribe<O : ObserverType where O.E == E>(_ observer: O) -> Disposable {
|
||||
// optimized version without scheduler
|
||||
guard _scheduler != nil else {
|
||||
for element in _elements {
|
||||
observer.on(.next(element))
|
||||
}
|
||||
|
||||
observer.on(.completed)
|
||||
return NopDisposable.instance
|
||||
}
|
||||
|
||||
let sink = ObservableSequenceSink(parent: self, observer: observer)
|
||||
sink.disposable = sink.run()
|
||||
return sink
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ extension Observable {
|
|||
- returns: The observable sequence whose elements are pulled from the given arguments.
|
||||
*/
|
||||
// @warn_unused_result(message:"http://git.io/rxs.uo")
|
||||
public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType? = nil) -> Observable<E> {
|
||||
public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
|
||||
return ObservableSequence(elements: elements, scheduler: scheduler)
|
||||
}
|
||||
|
||||
|
|
@ -191,7 +191,8 @@ extension Sequence {
|
|||
- returns: The observable sequence whose elements are pulled from the given enumerable sequence.
|
||||
*/
|
||||
// @warn_unused_result(message:"http://git.io/rxs.uo")
|
||||
public func toObservable(_ scheduler: ImmediateSchedulerType? = nil) -> Observable<Iterator.Element> {
|
||||
@available(*, deprecated, renamed: "Observable.from()")
|
||||
public func toObservable(_ scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Iterator.Element> {
|
||||
return ObservableSequence(elements: Array(self), scheduler: scheduler)
|
||||
}
|
||||
}
|
||||
|
|
@ -205,7 +206,32 @@ extension Array {
|
|||
- returns: The observable sequence whose elements are pulled from the given enumerable sequence.
|
||||
*/
|
||||
// @warn_unused_result(message:"http://git.io/rxs.uo")
|
||||
public func toObservable(_ scheduler: ImmediateSchedulerType? = nil) -> Observable<Iterator.Element> {
|
||||
@available(*, deprecated, renamed: "Observable.from()")
|
||||
public func toObservable(_ scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Iterator.Element> {
|
||||
return ObservableSequence(elements: self, scheduler: scheduler)
|
||||
}
|
||||
}
|
||||
|
||||
extension Observable {
|
||||
/**
|
||||
Converts an array to an observable sequence.
|
||||
|
||||
- seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
|
||||
|
||||
- returns: The observable sequence whose elements are pulled from the given enumerable sequence.
|
||||
*/
|
||||
public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
|
||||
return ObservableSequence(elements: array, scheduler: scheduler)
|
||||
}
|
||||
|
||||
/**
|
||||
Converts a sequence to an observable sequence.
|
||||
|
||||
- seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html)
|
||||
|
||||
- returns: The observable sequence whose elements are pulled from the given enumerable sequence.
|
||||
*/
|
||||
public static func from<S: Sequence where S.Iterator.Element == E>(_ sequence: S, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
|
||||
return ObservableSequence(elements: sequence, scheduler: scheduler)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,27 +92,12 @@ extension ObservableCreationTests {
|
|||
}
|
||||
}
|
||||
|
||||
// MARK: toObservable
|
||||
// MARK: from
|
||||
extension ObservableCreationTests {
|
||||
func testToObservable_complete_immediate() {
|
||||
func testFromArray_complete_immediate() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
let res = scheduler.start {
|
||||
[3, 1, 2, 4].toObservable()
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.events, [
|
||||
next(200, 3),
|
||||
next(200, 1),
|
||||
next(200, 2),
|
||||
next(200, 4),
|
||||
completed(200)
|
||||
])
|
||||
}
|
||||
|
||||
func testToObservable_complete() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
let res = scheduler.start {
|
||||
[3, 1, 2, 4].toObservable(scheduler)
|
||||
Observable.from([3, 1, 2, 4], scheduler: scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.events, [
|
||||
|
|
@ -124,10 +109,25 @@ extension ObservableCreationTests {
|
|||
])
|
||||
}
|
||||
|
||||
func testToObservable_dispose() {
|
||||
func testFromArray_complete() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
let res = scheduler.start {
|
||||
Observable.from([3, 1, 2, 4], scheduler: scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.events, [
|
||||
next(201, 3),
|
||||
next(202, 1),
|
||||
next(203, 2),
|
||||
next(204, 4),
|
||||
completed(205)
|
||||
])
|
||||
}
|
||||
|
||||
func testFromArray_dispose() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
let res = scheduler.start(203) {
|
||||
[3, 1, 2, 4].toObservable(scheduler)
|
||||
Observable.from([3, 1, 2, 4], scheduler: scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.events, [
|
||||
|
|
@ -184,25 +184,25 @@ extension ObservableCreationTests {
|
|||
|
||||
// MARK: toObservable
|
||||
extension ObservableCreationTests {
|
||||
func testToObservableAnySequence_basic_immediate() {
|
||||
func testFromAnySequence_basic_immediate() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
let res = scheduler.start {
|
||||
AnySequence([3, 1, 2, 4]).toObservable()
|
||||
Observable.from(AnySequence([3, 1, 2, 4]), scheduler: scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.events, [
|
||||
next(200, 3),
|
||||
next(200, 1),
|
||||
next(200, 2),
|
||||
next(200, 4),
|
||||
completed(200)
|
||||
next(201, 3),
|
||||
next(202, 1),
|
||||
next(203, 2),
|
||||
next(204, 4),
|
||||
completed(205)
|
||||
])
|
||||
}
|
||||
|
||||
func testToObservableAnySequence_basic_testScheduler() {
|
||||
let scheduler = TestScheduler(initialClock: 0)
|
||||
let res = scheduler.start {
|
||||
AnySequence([3, 1, 2, 4]).toObservable(scheduler)
|
||||
Observable.from(AnySequence([3, 1, 2, 4]), scheduler: scheduler)
|
||||
}
|
||||
|
||||
XCTAssertEqual(res.events, [
|
||||
|
|
|
|||
|
|
@ -2500,17 +2500,7 @@ extension ObservableMultipleTest {
|
|||
|
||||
// MARK: combine latest
|
||||
extension ObservableMultipleTest {
|
||||
func testCombineLatest_DeadlockSimple() {
|
||||
var nEvents = 0
|
||||
|
||||
let observable = Observable.combineLatest(Observable.of(0, 1, 2), Observable.of(0, 1, 2)) { $0 + $1 }
|
||||
_ = observable.subscribeNext { n in
|
||||
nEvents += 1
|
||||
}
|
||||
|
||||
XCTAssertEqual(nEvents, 3)
|
||||
}
|
||||
|
||||
|
||||
func testCombineLatest_DeadlockErrorAfterN() {
|
||||
var nEvents = 0
|
||||
|
||||
|
|
|
|||
|
|
@ -261,7 +261,7 @@ extension ObservableTimeTest {
|
|||
|
||||
let start = Date()
|
||||
|
||||
let a = try! [Observable.just(0), Observable.never()].toObservable().concat()
|
||||
let a = try! Observable.from([Observable.just(0), Observable.never()]).concat()
|
||||
.throttle(2.0, scheduler: scheduler)
|
||||
.toBlocking()
|
||||
.first()
|
||||
|
|
|
|||
Loading…
Reference in New Issue