diff --git a/Rx.xcodeproj/project.pbxproj b/Rx.xcodeproj/project.pbxproj index fdb94d15..693caf24 100644 --- a/Rx.xcodeproj/project.pbxproj +++ b/Rx.xcodeproj/project.pbxproj @@ -585,6 +585,10 @@ CB255BD81BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; }; CB255BD91BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; }; CB255BDA1BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; }; + CB30D9E91BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; }; + CB30D9EA1BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; }; + CB30D9EB1BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; }; + CB30D9EC1BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; }; CB883B3B1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; CB883B3C1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; CB883B3D1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; }; @@ -1063,6 +1067,7 @@ C8F6A0F81BEE33C1007DF367 /* InvocableScheduledItem.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = InvocableScheduledItem.swift; sourceTree = ""; }; C8F6A0FD1BEE42DD007DF367 /* AnonymousInvocable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AnonymousInvocable.swift; sourceTree = ""; }; CB255BD61BC46A9C00798A4C /* RetryWhen.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RetryWhen.swift; sourceTree = ""; }; + CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SingleAsync.swift; sourceTree = ""; }; CB883B3A1BE24355000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = ""; }; CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = ""; }; CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = ""; }; @@ -1302,6 +1307,7 @@ C8093C861B8A72BE0088E94D /* Sample.swift */, C8093C871B8A72BE0088E94D /* Scan.swift */, C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */, + CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */, C8093C881B8A72BE0088E94D /* Sink.swift */, C8093C891B8A72BE0088E94D /* Skip.swift */, D285BAC31BC0231000B3F602 /* SkipUntil.swift */, @@ -2279,6 +2285,7 @@ C8093CDE1B8A72BE0088E94D /* DisposeBag.swift in Sources */, C8093D981B8A72BE0088E94D /* RecursiveScheduler.swift in Sources */, C8093D381B8A72BE0088E94D /* Scan.swift in Sources */, + CB30D9EA1BF0E3500084C1C0 /* SingleAsync.swift in Sources */, C8093CD21B8A72BE0088E94D /* Queue.swift in Sources */, C8C3DA131B93A3EA004D233E /* AnonymousObservable.swift in Sources */, C8093D201B8A72BE0088E94D /* FlatMap.swift in Sources */, @@ -2416,6 +2423,7 @@ C8093CDD1B8A72BE0088E94D /* DisposeBag.swift in Sources */, C8093D971B8A72BE0088E94D /* RecursiveScheduler.swift in Sources */, C8093D371B8A72BE0088E94D /* Scan.swift in Sources */, + CB30D9E91BF0E3500084C1C0 /* SingleAsync.swift in Sources */, C8093CD11B8A72BE0088E94D /* Queue.swift in Sources */, C8C3DA121B93A3EA004D233E /* AnonymousObservable.swift in Sources */, C8093D1F1B8A72BE0088E94D /* FlatMap.swift in Sources */, @@ -2553,6 +2561,7 @@ C8F0BFC91BBBFB8B001B112F /* DisposeBag.swift in Sources */, C8F0BFCA1BBBFB8B001B112F /* RecursiveScheduler.swift in Sources */, C8F0BFCB1BBBFB8B001B112F /* Scan.swift in Sources */, + CB30D9EC1BF0E3500084C1C0 /* SingleAsync.swift in Sources */, C8F0BFCC1BBBFB8B001B112F /* Queue.swift in Sources */, C8F0BFCD1BBBFB8B001B112F /* AnonymousObservable.swift in Sources */, C8F0BFCE1BBBFB8B001B112F /* FlatMap.swift in Sources */, @@ -2841,6 +2850,7 @@ D2EBEAEC1BB9B69E003A27DC /* Lock.swift in Sources */, D2EBEB3C1BB9B6D8003A27DC /* RecursiveScheduler.swift in Sources */, D2EBEAF61BB9B6B2003A27DC /* NopDisposable.swift in Sources */, + CB30D9EB1BF0E3500084C1C0 /* SingleAsync.swift in Sources */, D2EBEAFF1BB9B6BA003A27DC /* Buffer.swift in Sources */, D2EBEAF51BB9B6AE003A27DC /* NAryDisposable.swift in Sources */, D2EBEB1D1BB9B6C1003A27DC /* Scan.swift in Sources */, diff --git a/RxSwift/Observables/Implementations/SingleAsync.swift b/RxSwift/Observables/Implementations/SingleAsync.swift new file mode 100644 index 00000000..12135c17 --- /dev/null +++ b/RxSwift/Observables/Implementations/SingleAsync.swift @@ -0,0 +1,80 @@ +// +// SingleAsync.swift +// Rx +// +// Created by Junior B. on 09/11/15. +// Copyright © 2015 Krunoslav Zaher. All rights reserved. +// + +import Foundation + +class SingleAsyncSink : Sink, ObserverType { + typealias Parent = SingleAsync + typealias E = ElementType + + private let _parent: Parent + private var _seenValue: Bool = false + + init(parent: Parent, observer: O) { + _parent = parent + super.init(observer: observer) + } + + func on(event: Event) { + switch event { + case .Next(let value): + + if let predicate = _parent._predicate { + do { + print("val: \(value)") + let forward = try predicate(value) + if forward && _seenValue == false { + forwardOn(.Next(value)) + _seenValue = true + } else if forward && _seenValue { + forwardOn(.Error(RxError.MoreThanOneElement)) + dispose() + } + } catch (let error) { + forwardOn(.Error(error as ErrorType)) + dispose() + } + } else if _seenValue == false { + forwardOn(.Next(value)) + _seenValue = true + } else { + forwardOn(.Error(RxError.MoreThanOneElement)) + dispose() + } + + case .Error: + forwardOn(event) + dispose() + case .Completed: + if (!_seenValue) { + forwardOn(.Error(RxError.NoElements)) + } else { + forwardOn(.Completed) + } + dispose() + } + } +} + +class SingleAsync: Producer { + typealias Predicate = (Element) throws -> Bool + + private let _source: Observable + private let _predicate: Predicate? + + init(source: Observable, predicate: Predicate? = nil) { + _source = source + _predicate = predicate + } + + override func run(observer: O) -> Disposable { + let sink = SingleAsyncSink(parent: self, observer: observer) + sink.disposable = _source.subscribe(sink) + return sink + } +} \ No newline at end of file diff --git a/RxSwift/Observables/Observable+StandardSequenceOperators.swift b/RxSwift/Observables/Observable+StandardSequenceOperators.swift index 3dbed145..31dad0bb 100644 --- a/RxSwift/Observables/Observable+StandardSequenceOperators.swift +++ b/RxSwift/Observables/Observable+StandardSequenceOperators.swift @@ -218,7 +218,7 @@ extension ObservableType { } } -// elementAt +// MARK: elementAt extension ObservableType { @@ -233,4 +233,35 @@ extension ObservableType { -> Observable { return ElementAt(source: self.asObservable(), index: index, throwOnEmpty: true) } +} + +// MARK: single + +extension ObservableType { + + /** + The single operator is similar to first, but throws a `RxError.NoElements` or `RxError.MoreThanOneElement` + if the source Observable does not emit exactly one item before successfully completing. + + - returns: An observable sequence that emits a single item or throws an exception if more (or none) of them are emitted. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func single() + -> Observable { + return SingleAsync(source: self.asObservable()) + } + + /** + The single operator is similar to first, but throws a `RxError.NoElements` or `RxError.MoreThanOneElement` + if the source Observable does not emit exactly one item before successfully completing. + + - parameter predicate: A function to test each source element for a condition. + - returns: An observable sequence that emits a single item or throws an exception if more (or none) of them are emitted. + */ + @warn_unused_result(message="http://git.io/rxs.uo") + public func single(predicate: (E) throws -> Bool) + -> Observable { + return SingleAsync(source: self.asObservable(), predicate: predicate) + } + } \ No newline at end of file