fully use rx for concurrent array processing

This commit is contained in:
Ivan Smolin 2016-12-29 16:12:23 +03:00
parent 3810c27ae6
commit c8114d92ec
3 changed files with 18 additions and 96 deletions

View File

@ -11,7 +11,6 @@
78011AB31D48B53600EA16A2 /* ApiRequestParameters.swift in Sources */ = {isa = PBXBuildFile; fileRef = 78011AB21D48B53600EA16A2 /* ApiRequestParameters.swift */; };
780D23431DA412470084620D /* CGImage+Alpha.swift in Sources */ = {isa = PBXBuildFile; fileRef = 780D23421DA412470084620D /* CGImage+Alpha.swift */; };
780D23461DA416F80084620D /* CGContext+Initializers.swift in Sources */ = {isa = PBXBuildFile; fileRef = 780D23451DA416F80084620D /* CGContext+Initializers.swift */; };
780F56C71E0D7608004530B6 /* ResultOperation.swift in Sources */ = {isa = PBXBuildFile; fileRef = 780F56C61E0D7608004530B6 /* ResultOperation.swift */; };
780F56CA1E0D76B8004530B6 /* Sequence+ConcurrentMap.swift in Sources */ = {isa = PBXBuildFile; fileRef = 780F56C91E0D76B8004530B6 /* Sequence+ConcurrentMap.swift */; };
780F56CC1E0D7ACA004530B6 /* ObservableMappable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 780F56CB1E0D7ACA004530B6 /* ObservableMappable.swift */; };
7827C9341DE4ADB2009DA4E6 /* Alamofire.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7827C92E1DE4ADB2009DA4E6 /* Alamofire.framework */; };
@ -94,7 +93,6 @@
78011AB21D48B53600EA16A2 /* ApiRequestParameters.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ApiRequestParameters.swift; sourceTree = "<group>"; };
780D23421DA412470084620D /* CGImage+Alpha.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CGImage+Alpha.swift"; sourceTree = "<group>"; };
780D23451DA416F80084620D /* CGContext+Initializers.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "CGContext+Initializers.swift"; sourceTree = "<group>"; };
780F56C61E0D7608004530B6 /* ResultOperation.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ResultOperation.swift; sourceTree = "<group>"; };
780F56C91E0D76B8004530B6 /* Sequence+ConcurrentMap.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "Sequence+ConcurrentMap.swift"; sourceTree = "<group>"; };
780F56CB1E0D7ACA004530B6 /* ObservableMappable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ObservableMappable.swift; sourceTree = "<group>"; };
7827C92E1DE4ADB2009DA4E6 /* Alamofire.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = Alamofire.framework; path = ../../../Carthage/Build/iOS/Alamofire.framework; sourceTree = "<group>"; };
@ -252,14 +250,6 @@
path = CGContext;
sourceTree = "<group>";
};
780F56C51E0D75F7004530B6 /* Operations */ = {
isa = PBXGroup;
children = (
780F56C61E0D7608004530B6 /* ResultOperation.swift */,
);
path = Operations;
sourceTree = "<group>";
};
780F56C81E0D76A5004530B6 /* Sequence */ = {
isa = PBXGroup;
children = (
@ -376,7 +366,6 @@
children = (
78B0FC7B1C6B2BAE00358B64 /* Logging */,
78753E2A1DE58BED006BC0FB /* Cursors */,
780F56C51E0D75F7004530B6 /* Operations */,
);
path = Classes;
sourceTree = "<group>";
@ -740,7 +729,6 @@
787783671CA04D4A001CDC9B /* String+SizeCalculation.swift in Sources */,
7873D1511E112B0D001816EB /* Any+Cast.swift in Sources */,
78B036431DA4FEC90021D5CC /* CGImage+Transform.swift in Sources */,
780F56C71E0D7608004530B6 /* ResultOperation.swift in Sources */,
78011A641D47ABC500EA16A2 /* UIView+DefaultReuseIdentifier.swift in Sources */,
786D78EC1D53C46E006B2CEA /* AlamofireManager+Extensions.swift in Sources */,
78B0FC811C6B2CD500358B64 /* App.swift in Sources */,

View File

@ -1,29 +0,0 @@
//
// ResultOperation.swift
// LeadKit
//
// Created by Ivan Smolin on 23/12/16.
// Copyright © 2016 Touch Instinct. All rights reserved.
//
import Foundation
/// Subclass of Operation which contains result of executed operation
public class ResultOperation<T>: Operation {
/// Result of executed operation or nil if operation is not finished yet or throw error
public var result: T?
public typealias ExecutionClosure = () throws -> T
private let executionClosure: ExecutionClosure
public init(executionClosure: @escaping ExecutionClosure) {
self.executionClosure = executionClosure
}
override public func main() {
result = try? executionClosure()
}
}

View File

@ -8,81 +8,44 @@
import RxSwift
public typealias ConcurrentMapCancelClosure = () -> ()
public extension Sequence {
/// Method which asynchronous transforms sequence using given transform closure
/// and given number of concurrent operations
///
/// - Parameters:
/// - transform: Transform closure
/// - concurrentOperationCount: Number of concurrent operations
/// - completion: Completion handler with results of transform
/// - Returns: Closure whitch can be called to cancel asynchronous operation
@discardableResult
func concurrentMap<R>(transform: @escaping ((Iterator.Element) throws -> R),
concurrentOperationCount: Int = ProcessInfo.processInfo.activeProcessorCount,
completion: @escaping (([R]) -> ())) -> ConcurrentMapCancelClosure {
/// - qos: Target global dispatch queue, by quality of service class.
/// - transform: Transform closure
/// - Returns: Observable of array which contains transform return type
func concurrentRxMap<R>(concurrentOperationCount: Int = ProcessInfo.processInfo.activeProcessorCount,
qos: DispatchQoS = .default,
transform: @escaping ((Iterator.Element) throws -> R)) -> Observable<[R]> {
let operationsCount = Swift.max(1, concurrentOperationCount)
let operationQueue = OperationQueue()
operationQueue.maxConcurrentOperationCount = operationsCount
let array = Array(self)
let step = Int(ceil(Double(array.count) / Double(operationsCount)))
let numberOfSlices = Int(ceil(Double(array.count) / Double(step)))
DispatchQueue.global().async {
let operations: [ResultOperation<[R]>] = (0..<numberOfSlices).map {
let start = $0 * step
let end = Swift.min(start + step, array.count)
let indexedRanges: [(idx: Int, range: CountableRange<Int>)] = (0..<numberOfSlices).map {
let start = $0 * step
let end = Swift.min(start + step, array.count)
return ResultOperation { try array[start..<end].map(transform) }
}
operationQueue.addOperations(operations, waitUntilFinished: true)
var results: [R] = [] // var is used for performance optimization
let operationsResults = operations.flatMap { $0.result }
for operationResult in operationsResults {
results += operationResult
}
completion(results)
return ($0, start..<end)
}
return {
operationQueue.cancelAllOperations()
}
}
let scheduler = ConcurrentDispatchQueueScheduler(qos: qos)
}
public extension Sequence {
/// Reactive version of concurrentMap<R>(transform:concurrentOperationCount:completion:)
///
/// - Parameters:
/// - concurrentOperationCount: Number of concurrent operations
/// - transform: Transform closure
/// - Returns: Observable of transform return type
func concurrentRxMap<R>(concurrentOperationCount: Int = ProcessInfo.processInfo.activeProcessorCount,
transform: @escaping ((Iterator.Element) throws -> R)) -> Observable<[R]> {
return Observable<[R]>.create { observer in
let disposeHandler = self.concurrentMap(transform: transform,
concurrentOperationCount: concurrentOperationCount) {
observer.onNext($0)
observer.onCompleted()
return Observable.from(indexedRanges)
.flatMap { indexedRange -> Observable<(idx: Int, results: [R])> in
return Observable.just(indexedRange)
.observeOn(scheduler)
.map { (idx: $0.idx, results: try array[$0.range].map(transform)) }
}
return Disposables.create(with: disposeHandler)
}
.toArray()
.map { $0.sorted { $0.0.idx < $0.1.idx }.flatMap { $0.results } }
}
}