Add `single` non-blocking operator
This commit is contained in:
parent
45d1aa6047
commit
7e92fb7248
|
|
@ -585,6 +585,10 @@
|
|||
CB255BD81BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; };
|
||||
CB255BD91BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; };
|
||||
CB255BDA1BC46A9C00798A4C /* RetryWhen.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB255BD61BC46A9C00798A4C /* RetryWhen.swift */; };
|
||||
CB30D9E91BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; };
|
||||
CB30D9EA1BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; };
|
||||
CB30D9EB1BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; };
|
||||
CB30D9EC1BF0E3500084C1C0 /* SingleAsync.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */; };
|
||||
CB883B3B1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; };
|
||||
CB883B3C1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; };
|
||||
CB883B3D1BE24355000AC2EE /* Window.swift in Sources */ = {isa = PBXBuildFile; fileRef = CB883B3A1BE24355000AC2EE /* Window.swift */; };
|
||||
|
|
@ -1063,6 +1067,7 @@
|
|||
C8F6A0F81BEE33C1007DF367 /* InvocableScheduledItem.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = InvocableScheduledItem.swift; sourceTree = "<group>"; };
|
||||
C8F6A0FD1BEE42DD007DF367 /* AnonymousInvocable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = AnonymousInvocable.swift; sourceTree = "<group>"; };
|
||||
CB255BD61BC46A9C00798A4C /* RetryWhen.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RetryWhen.swift; sourceTree = "<group>"; };
|
||||
CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SingleAsync.swift; sourceTree = "<group>"; };
|
||||
CB883B3A1BE24355000AC2EE /* Window.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = Window.swift; sourceTree = "<group>"; };
|
||||
CB883B3F1BE24C15000AC2EE /* RefCountDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = RefCountDisposable.swift; sourceTree = "<group>"; };
|
||||
CB883B441BE256D4000AC2EE /* BooleanDisposable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BooleanDisposable.swift; sourceTree = "<group>"; };
|
||||
|
|
@ -1302,6 +1307,7 @@
|
|||
C8093C861B8A72BE0088E94D /* Sample.swift */,
|
||||
C8093C871B8A72BE0088E94D /* Scan.swift */,
|
||||
C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */,
|
||||
CB30D9E81BF0E3500084C1C0 /* SingleAsync.swift */,
|
||||
C8093C881B8A72BE0088E94D /* Sink.swift */,
|
||||
C8093C891B8A72BE0088E94D /* Skip.swift */,
|
||||
D285BAC31BC0231000B3F602 /* SkipUntil.swift */,
|
||||
|
|
@ -2279,6 +2285,7 @@
|
|||
C8093CDE1B8A72BE0088E94D /* DisposeBag.swift in Sources */,
|
||||
C8093D981B8A72BE0088E94D /* RecursiveScheduler.swift in Sources */,
|
||||
C8093D381B8A72BE0088E94D /* Scan.swift in Sources */,
|
||||
CB30D9EA1BF0E3500084C1C0 /* SingleAsync.swift in Sources */,
|
||||
C8093CD21B8A72BE0088E94D /* Queue.swift in Sources */,
|
||||
C8C3DA131B93A3EA004D233E /* AnonymousObservable.swift in Sources */,
|
||||
C8093D201B8A72BE0088E94D /* FlatMap.swift in Sources */,
|
||||
|
|
@ -2416,6 +2423,7 @@
|
|||
C8093CDD1B8A72BE0088E94D /* DisposeBag.swift in Sources */,
|
||||
C8093D971B8A72BE0088E94D /* RecursiveScheduler.swift in Sources */,
|
||||
C8093D371B8A72BE0088E94D /* Scan.swift in Sources */,
|
||||
CB30D9E91BF0E3500084C1C0 /* SingleAsync.swift in Sources */,
|
||||
C8093CD11B8A72BE0088E94D /* Queue.swift in Sources */,
|
||||
C8C3DA121B93A3EA004D233E /* AnonymousObservable.swift in Sources */,
|
||||
C8093D1F1B8A72BE0088E94D /* FlatMap.swift in Sources */,
|
||||
|
|
@ -2553,6 +2561,7 @@
|
|||
C8F0BFC91BBBFB8B001B112F /* DisposeBag.swift in Sources */,
|
||||
C8F0BFCA1BBBFB8B001B112F /* RecursiveScheduler.swift in Sources */,
|
||||
C8F0BFCB1BBBFB8B001B112F /* Scan.swift in Sources */,
|
||||
CB30D9EC1BF0E3500084C1C0 /* SingleAsync.swift in Sources */,
|
||||
C8F0BFCC1BBBFB8B001B112F /* Queue.swift in Sources */,
|
||||
C8F0BFCD1BBBFB8B001B112F /* AnonymousObservable.swift in Sources */,
|
||||
C8F0BFCE1BBBFB8B001B112F /* FlatMap.swift in Sources */,
|
||||
|
|
@ -2841,6 +2850,7 @@
|
|||
D2EBEAEC1BB9B69E003A27DC /* Lock.swift in Sources */,
|
||||
D2EBEB3C1BB9B6D8003A27DC /* RecursiveScheduler.swift in Sources */,
|
||||
D2EBEAF61BB9B6B2003A27DC /* NopDisposable.swift in Sources */,
|
||||
CB30D9EB1BF0E3500084C1C0 /* SingleAsync.swift in Sources */,
|
||||
D2EBEAFF1BB9B6BA003A27DC /* Buffer.swift in Sources */,
|
||||
D2EBEAF51BB9B6AE003A27DC /* NAryDisposable.swift in Sources */,
|
||||
D2EBEB1D1BB9B6C1003A27DC /* Scan.swift in Sources */,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,80 @@
|
|||
//
|
||||
// SingleAsync.swift
|
||||
// Rx
|
||||
//
|
||||
// Created by Junior B. on 09/11/15.
|
||||
// Copyright © 2015 Krunoslav Zaher. All rights reserved.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
class SingleAsyncSink<ElementType, O: ObserverType where O.E == ElementType> : Sink<O>, ObserverType {
|
||||
typealias Parent = SingleAsync<ElementType>
|
||||
typealias E = ElementType
|
||||
|
||||
private let _parent: Parent
|
||||
private var _seenValue: Bool = false
|
||||
|
||||
init(parent: Parent, observer: O) {
|
||||
_parent = parent
|
||||
super.init(observer: observer)
|
||||
}
|
||||
|
||||
func on(event: Event<E>) {
|
||||
switch event {
|
||||
case .Next(let value):
|
||||
|
||||
if let predicate = _parent._predicate {
|
||||
do {
|
||||
print("val: \(value)")
|
||||
let forward = try predicate(value)
|
||||
if forward && _seenValue == false {
|
||||
forwardOn(.Next(value))
|
||||
_seenValue = true
|
||||
} else if forward && _seenValue {
|
||||
forwardOn(.Error(RxError.MoreThanOneElement))
|
||||
dispose()
|
||||
}
|
||||
} catch (let error) {
|
||||
forwardOn(.Error(error as ErrorType))
|
||||
dispose()
|
||||
}
|
||||
} else if _seenValue == false {
|
||||
forwardOn(.Next(value))
|
||||
_seenValue = true
|
||||
} else {
|
||||
forwardOn(.Error(RxError.MoreThanOneElement))
|
||||
dispose()
|
||||
}
|
||||
|
||||
case .Error:
|
||||
forwardOn(event)
|
||||
dispose()
|
||||
case .Completed:
|
||||
if (!_seenValue) {
|
||||
forwardOn(.Error(RxError.NoElements))
|
||||
} else {
|
||||
forwardOn(.Completed)
|
||||
}
|
||||
dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class SingleAsync<Element>: Producer<Element> {
|
||||
typealias Predicate = (Element) throws -> Bool
|
||||
|
||||
private let _source: Observable<Element>
|
||||
private let _predicate: Predicate?
|
||||
|
||||
init(source: Observable<Element>, predicate: Predicate? = nil) {
|
||||
_source = source
|
||||
_predicate = predicate
|
||||
}
|
||||
|
||||
override func run<O : ObserverType where O.E == Element>(observer: O) -> Disposable {
|
||||
let sink = SingleAsyncSink(parent: self, observer: observer)
|
||||
sink.disposable = _source.subscribe(sink)
|
||||
return sink
|
||||
}
|
||||
}
|
||||
|
|
@ -218,7 +218,7 @@ extension ObservableType {
|
|||
}
|
||||
}
|
||||
|
||||
// elementAt
|
||||
// MARK: elementAt
|
||||
|
||||
extension ObservableType {
|
||||
|
||||
|
|
@ -233,4 +233,35 @@ extension ObservableType {
|
|||
-> Observable<E> {
|
||||
return ElementAt(source: self.asObservable(), index: index, throwOnEmpty: true)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: single
|
||||
|
||||
extension ObservableType {
|
||||
|
||||
/**
|
||||
The single operator is similar to first, but throws a `RxError.NoElements` or `RxError.MoreThanOneElement`
|
||||
if the source Observable does not emit exactly one item before successfully completing.
|
||||
|
||||
- returns: An observable sequence that emits a single item or throws an exception if more (or none) of them are emitted.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func single()
|
||||
-> Observable<E> {
|
||||
return SingleAsync(source: self.asObservable())
|
||||
}
|
||||
|
||||
/**
|
||||
The single operator is similar to first, but throws a `RxError.NoElements` or `RxError.MoreThanOneElement`
|
||||
if the source Observable does not emit exactly one item before successfully completing.
|
||||
|
||||
- parameter predicate: A function to test each source element for a condition.
|
||||
- returns: An observable sequence that emits a single item or throws an exception if more (or none) of them are emitted.
|
||||
*/
|
||||
@warn_unused_result(message="http://git.io/rxs.uo")
|
||||
public func single(predicate: (E) throws -> Bool)
|
||||
-> Observable<E> {
|
||||
return SingleAsync(source: self.asObservable(), predicate: predicate)
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue