diff --git a/RxCocoa/Common/CocoaUnits/Driver/Driver.swift b/RxCocoa/Common/CocoaUnits/Driver/Driver.swift index 0844fe1d..67a82d97 100644 --- a/RxCocoa/Common/CocoaUnits/Driver/Driver.swift +++ b/RxCocoa/Common/CocoaUnits/Driver/Driver.swift @@ -145,7 +145,7 @@ extension Driver { */ // @warn_unused_result(message:"http://git.io/rxs.uo") public static func of(_ elements: E ...) -> Driver { - let source = elements.toObservable(driverSubscribeOnScheduler) + let source = Observable.from(elements, scheduler: driverSubscribeOnScheduler) return Driver(raw: source) } } diff --git a/RxSwift/Observables/Implementations/Sequence.swift b/RxSwift/Observables/Implementations/Sequence.swift index fa8c5d40..453c32a2 100644 --- a/RxSwift/Observables/Implementations/Sequence.swift +++ b/RxSwift/Observables/Implementations/Sequence.swift @@ -8,8 +8,8 @@ import Foundation -class ObservableSequenceSink : Sink { - typealias Parent = ObservableSequence +class ObservableSequenceSink : Sink { + typealias Parent = ObservableSequence private let _parent: Parent @@ -19,10 +19,11 @@ class ObservableSequenceSink : Sink { } func run() -> Disposable { - return _parent._scheduler!.scheduleRecursive((0, _parent._elements)) { (state, recurse) in - if state.0 < state.1.count { - self.forwardOn(.next(state.1[state.0])) - recurse((state.0 + 1, state.1)) + return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in + var mutableIterator = iterator + if let next = mutableIterator.0.next() { + self.forwardOn(.next(next)) + recurse(mutableIterator) } else { self.forwardOn(.completed) @@ -31,26 +32,16 @@ class ObservableSequenceSink : Sink { } } -class ObservableSequence : Producer { - private let _elements: [E] - private let _scheduler: ImmediateSchedulerType? +class ObservableSequence : Producer { + private let _elements: S + private let _scheduler: ImmediateSchedulerType - init(elements: [E], scheduler: ImmediateSchedulerType?) { + init(elements: S, scheduler: ImmediateSchedulerType) { _elements = elements _scheduler = scheduler } override func subscribe(_ observer: O) -> Disposable { - // optimized version without scheduler - guard _scheduler != nil else { - for element in _elements { - observer.on(.next(element)) - } - - observer.on(.completed) - return NopDisposable.instance - } - let sink = ObservableSequenceSink(parent: self, observer: observer) sink.disposable = sink.run() return sink diff --git a/RxSwift/Observables/Observable+Creation.swift b/RxSwift/Observables/Observable+Creation.swift index 1335e570..9f30dfc9 100644 --- a/RxSwift/Observables/Observable+Creation.swift +++ b/RxSwift/Observables/Observable+Creation.swift @@ -99,7 +99,7 @@ extension Observable { - returns: The observable sequence whose elements are pulled from the given arguments. */ // @warn_unused_result(message:"http://git.io/rxs.uo") - public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType? = nil) -> Observable { + public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable { return ObservableSequence(elements: elements, scheduler: scheduler) } @@ -191,7 +191,8 @@ extension Sequence { - returns: The observable sequence whose elements are pulled from the given enumerable sequence. */ // @warn_unused_result(message:"http://git.io/rxs.uo") - public func toObservable(_ scheduler: ImmediateSchedulerType? = nil) -> Observable { + @available(*, deprecated, renamed: "Observable.from()") + public func toObservable(_ scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable { return ObservableSequence(elements: Array(self), scheduler: scheduler) } } @@ -205,7 +206,32 @@ extension Array { - returns: The observable sequence whose elements are pulled from the given enumerable sequence. */ // @warn_unused_result(message:"http://git.io/rxs.uo") - public func toObservable(_ scheduler: ImmediateSchedulerType? = nil) -> Observable { + @available(*, deprecated, renamed: "Observable.from()") + public func toObservable(_ scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable { return ObservableSequence(elements: self, scheduler: scheduler) } } + +extension Observable { + /** + Converts an array to an observable sequence. + + - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html) + + - returns: The observable sequence whose elements are pulled from the given enumerable sequence. + */ + public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable { + return ObservableSequence(elements: array, scheduler: scheduler) + } + + /** + Converts a sequence to an observable sequence. + + - seealso: [from operator on reactivex.io](http://reactivex.io/documentation/operators/from.html) + + - returns: The observable sequence whose elements are pulled from the given enumerable sequence. + */ + public static func from(_ sequence: S, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable { + return ObservableSequence(elements: sequence, scheduler: scheduler) + } +} diff --git a/Tests/RxSwiftTests/Tests/Observable+CreationTest.swift b/Tests/RxSwiftTests/Tests/Observable+CreationTest.swift index 808943b2..bd675c84 100644 --- a/Tests/RxSwiftTests/Tests/Observable+CreationTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+CreationTest.swift @@ -92,27 +92,12 @@ extension ObservableCreationTests { } } -// MARK: toObservable +// MARK: from extension ObservableCreationTests { - func testToObservable_complete_immediate() { + func testFromArray_complete_immediate() { let scheduler = TestScheduler(initialClock: 0) let res = scheduler.start { - [3, 1, 2, 4].toObservable() - } - - XCTAssertEqual(res.events, [ - next(200, 3), - next(200, 1), - next(200, 2), - next(200, 4), - completed(200) - ]) - } - - func testToObservable_complete() { - let scheduler = TestScheduler(initialClock: 0) - let res = scheduler.start { - [3, 1, 2, 4].toObservable(scheduler) + Observable.from([3, 1, 2, 4], scheduler: scheduler) } XCTAssertEqual(res.events, [ @@ -124,10 +109,25 @@ extension ObservableCreationTests { ]) } - func testToObservable_dispose() { + func testFromArray_complete() { + let scheduler = TestScheduler(initialClock: 0) + let res = scheduler.start { + Observable.from([3, 1, 2, 4], scheduler: scheduler) + } + + XCTAssertEqual(res.events, [ + next(201, 3), + next(202, 1), + next(203, 2), + next(204, 4), + completed(205) + ]) + } + + func testFromArray_dispose() { let scheduler = TestScheduler(initialClock: 0) let res = scheduler.start(203) { - [3, 1, 2, 4].toObservable(scheduler) + Observable.from([3, 1, 2, 4], scheduler: scheduler) } XCTAssertEqual(res.events, [ @@ -184,25 +184,25 @@ extension ObservableCreationTests { // MARK: toObservable extension ObservableCreationTests { - func testToObservableAnySequence_basic_immediate() { + func testFromAnySequence_basic_immediate() { let scheduler = TestScheduler(initialClock: 0) let res = scheduler.start { - AnySequence([3, 1, 2, 4]).toObservable() + Observable.from(AnySequence([3, 1, 2, 4]), scheduler: scheduler) } XCTAssertEqual(res.events, [ - next(200, 3), - next(200, 1), - next(200, 2), - next(200, 4), - completed(200) + next(201, 3), + next(202, 1), + next(203, 2), + next(204, 4), + completed(205) ]) } func testToObservableAnySequence_basic_testScheduler() { let scheduler = TestScheduler(initialClock: 0) let res = scheduler.start { - AnySequence([3, 1, 2, 4]).toObservable(scheduler) + Observable.from(AnySequence([3, 1, 2, 4]), scheduler: scheduler) } XCTAssertEqual(res.events, [ diff --git a/Tests/RxSwiftTests/Tests/Observable+MultipleTest.swift b/Tests/RxSwiftTests/Tests/Observable+MultipleTest.swift index b669a745..f3b81cdf 100644 --- a/Tests/RxSwiftTests/Tests/Observable+MultipleTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+MultipleTest.swift @@ -2500,17 +2500,7 @@ extension ObservableMultipleTest { // MARK: combine latest extension ObservableMultipleTest { - func testCombineLatest_DeadlockSimple() { - var nEvents = 0 - - let observable = Observable.combineLatest(Observable.of(0, 1, 2), Observable.of(0, 1, 2)) { $0 + $1 } - _ = observable.subscribeNext { n in - nEvents += 1 - } - - XCTAssertEqual(nEvents, 3) - } - + func testCombineLatest_DeadlockErrorAfterN() { var nEvents = 0 diff --git a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift index b11de735..1a68a0df 100644 --- a/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift +++ b/Tests/RxSwiftTests/Tests/Observable+TimeTest.swift @@ -261,7 +261,7 @@ extension ObservableTimeTest { let start = Date() - let a = try! [Observable.just(0), Observable.never()].toObservable().concat() + let a = try! Observable.from([Observable.just(0), Observable.never()]).concat() .throttle(2.0, scheduler: scheduler) .toBlocking() .first()