add docs and refactor a little bit

This commit is contained in:
Ivan Smolin 2016-12-28 15:10:47 +03:00
parent c2802a42f6
commit fff115b2db
1 changed files with 19 additions and 16 deletions

View File

@ -8,13 +8,22 @@
import RxSwift
public typealias CancelClosureType = () -> ()
public typealias ConcurrentMapCancelClosure = () -> ()
public extension Sequence {
/// Method which asynchronous transforms sequence using given transform closure
/// and given number of concurrent operations
///
/// - Parameters:
/// - transform: Transform closure
/// - concurrentOperationCount: Number of concurrent operations
/// - completion: Completion handler with results of transform
/// - Returns: Closure whitch can be called to cancel asynchronous operation
@discardableResult
func concurrentMap<R>(transform: @escaping ((Iterator.Element) throws -> R),
concurrentOperationCount: Int = ProcessInfo.processInfo.activeProcessorCount,
completion: @escaping (([R]) -> ())) -> CancelClosureType {
completion: @escaping (([R]) -> ())) -> ConcurrentMapCancelClosure {
let operationsCount = Swift.max(1, concurrentOperationCount)
@ -56,8 +65,14 @@ public extension Sequence {
public extension Sequence {
func concurrentRxMap<R>(transform: @escaping ((Iterator.Element) throws -> R),
concurrentOperationCount: Int = ProcessInfo.processInfo.activeProcessorCount) -> Observable<[R]> {
/// Reactive version of concurrentMap<R>(transform:concurrentOperationCount:completion:)
///
/// - Parameters:
/// - concurrentOperationCount: Number of concurrent operations
/// - transform: Transform closure
/// - Returns: Observable of transform return type
func concurrentRxMap<R>(concurrentOperationCount: Int = ProcessInfo.processInfo.activeProcessorCount,
transform: @escaping ((Iterator.Element) throws -> R)) -> Observable<[R]> {
return Observable<[R]>.create { observer in
let disposeHandler = self.concurrentMap(transform: transform,
@ -69,17 +84,5 @@ public extension Sequence {
return Disposables.create(with: disposeHandler)
}
}
func concurrentRxMap<R>(transform: @escaping ((Iterator.Element) throws -> R)) -> Observable<[R]> {
return Observable<[R]>.create { observer in
let disposeHandler = self.concurrentMap(transform: transform) {
observer.onNext($0)
observer.onCompleted()
}
return Disposables.create(with: disposeHandler)
}
}
}