cursors not thread safe anymore

This commit is contained in:
Ivan Smolin 2017-04-28 17:43:55 +03:00
parent f9680dab9f
commit 2ddad9ee08
3 changed files with 22 additions and 70 deletions

View File

@ -29,14 +29,6 @@ public class FixedPageCursor<Cursor: CursorType>: CursorType {
fileprivate let pageSize: Int
private var internalCount = 0
private var internalExhausted: Bool {
return cursor.exhausted && cursor.count == internalCount
}
private let mutex = Mutex()
/// Initializer with enclosed cursor
///
/// - Parameters:
@ -48,48 +40,35 @@ public class FixedPageCursor<Cursor: CursorType>: CursorType {
}
public var exhausted: Bool {
return mutex.sync { internalExhausted }
return cursor.exhausted && cursor.count == count
}
public var count: Int {
return mutex.sync { internalCount }
}
public private(set) var count: Int = 0
public subscript(index: Int) -> Cursor.Element {
return mutex.sync { cursor[index] }
return cursor[index]
}
public func loadNextBatch() -> Observable<[Cursor.Element]> {
return loadNextBatch(usingMutex: mutex)
}
private func loadNextBatch(usingMutex mutex: Mutex?) -> Observable<[Cursor.Element]> {
return Observable.deferred {
mutex?.unbalancedLock()
if self.internalExhausted {
if self.exhausted {
throw CursorError.exhausted
}
let restOfLoaded = self.cursor.count - self.internalCount
let restOfLoaded = self.cursor.count - self.count
if restOfLoaded >= self.pageSize || self.cursor.exhausted {
let startIndex = self.internalCount
self.internalCount += min(restOfLoaded, self.pageSize)
let startIndex = self.count
self.count += min(restOfLoaded, self.pageSize)
return .just(self.cursor[startIndex..<self.internalCount])
return .just(self.cursor[startIndex..<self.count])
}
return self.cursor.loadNextBatch()
.flatMap { _ in
self.loadNextBatch(usingMutex: nil)
}
self.loadNextBatch()
}
}
.do(onNext: { _ in
mutex?.unbalancedUnlock()
}, onError: { _ in
mutex?.unbalancedUnlock()
})
}
}

View File

@ -55,8 +55,6 @@ public class MapCursor<Cursor: CursorType, T>: CursorType {
private var elements: [T] = []
private let mutex = Mutex()
/// Initializer with enclosed cursor
///
/// - Parameters:
@ -68,33 +66,24 @@ public class MapCursor<Cursor: CursorType, T>: CursorType {
}
public var exhausted: Bool {
return mutex.sync { cursor.exhausted }
return cursor.exhausted
}
public var count: Int {
return mutex.sync { elements.count }
return elements.count
}
public subscript(index: Int) -> T {
return mutex.sync { elements[index] }
return elements[index]
}
public func loadNextBatch() -> Observable<[T]> {
return Observable.deferred {
self.mutex.unbalancedLock()
return cursor.loadNextBatch().map { newItems in
let transformedNewItems = newItems.flatMap(self.transform)
self.elements += transformedNewItems
return self.cursor.loadNextBatch().map { newItems in
let transformedNewItems = newItems.flatMap(self.transform)
self.elements += transformedNewItems
return transformedNewItems
}
return transformedNewItems
}
.do(onNext: { _ in
self.mutex.unbalancedUnlock()
}, onError: { _ in
self.mutex.unbalancedUnlock()
})
}
}

View File

@ -27,11 +27,6 @@ public class StaticCursor<Element>: ResettableCursorType {
private let content: [Element]
private var internalExhausted = false
private var internalCount = 0
private let mutex = Mutex()
/// Initializer for array content type
///
/// - Parameter content: array with elements of Elemet type
@ -43,37 +38,26 @@ public class StaticCursor<Element>: ResettableCursorType {
self.content = other.content
}
public var exhausted: Bool {
return mutex.sync { internalExhausted }
}
public private(set) var exhausted = false
public var count: Int {
return mutex.sync { internalCount }
}
public private(set) var count = 0
public subscript(index: Int) -> Element {
return mutex.sync { content[index] }
return content[index]
}
public func loadNextBatch() -> Observable<[Element]> {
return Observable.deferred {
self.mutex.unbalancedLock()
if self.exhausted {
throw CursorError.exhausted
}
self.internalCount = self.content.count
self.count = self.content.count
self.internalExhausted = true
self.exhausted = true
return .just(self.content)
}
.do(onNext: { _ in
self.mutex.unbalancedUnlock()
}, onError: { _ in
self.mutex.unbalancedUnlock()
})
}
}