Reputation: 5388
I have singleton class
class DeviceController:NSObject, CocoaMQTTDelegate {
static let sharedInstance = DeviceController()
var deviceOnArray:[String] = []
var deviceOffArray:[String] = []
private override init() {
clientID = "xyz-" + String(ProcessInfo().processIdentifier)
mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
mqtt.username = "username"
mqtt.password = "password"
mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
mqtt.keepAlive = 30
mqtt.cleanSession = true
DeviceController.isConnecting = true
super.init()
mqtt.delegate = self
mqtt.connect()
self.registerBackgroundTask()
}
func sendArm(topic:String){
// add device to deviceOnArray
}
func sendDisarm(topic:String){
// remove device from deviceOnArray if exist here if I check by code that device is in array it returns false but on console if print it contains the device, It only heppens when I call sendArm and sendDisarm with a second.
let lockQueue = DispatchQueue(label: "com.test.LockQueue")
lockQueue.sync() {
// all code now inside this
}
// I also used above code but it's not working
}
}
If you read the code then you'll know that I am facing problem in reading correct value from deviceOnArray/deviceOffArray, I am not sure how to explain this problem but I think what I need here is Obj-C atomic thread safe variable. Any idea how create one ?
Upvotes: 1
Views: 15549
Reputation: 25304
import Foundation
class AtomicValue<T> {
private var _value: T
private var accessQueue: DispatchQueue!
init (value: T) {
_value = value
let address = Unmanaged.passUnretained(self).toOpaque()
let label = "accessQueue.\(type(of: self)).\(address)"
accessQueue = DispatchQueue(label: label)
}
}
// MARK: Get/Set with synchronization in current queue
extension AtomicValue {
func waitSet(lockValueAccessWhile closure: ((_ currentValue: T) -> T)?) {
accessQueue.sync { [weak self] in
guard let self = self, let value = closure?(self._value) else { return }
self._value = value
}
}
func waitGet(lockValueAccessWhile closure: ((_ currentValue: T) -> Void)?) {
accessQueue.sync { [weak self] in
guard let self = self else { return }
closure?(self._value)
}
}
func waitUpdate(lockValueAccessWhile closure: ((_ currentValue: inout T) -> Void)?) {
accessQueue.sync { [weak self] in
guard let self = self else { return }
closure?(&self._value)
}
}
func waitMap<V>(lockValueAccessWhile closure: ((_ currentValue: T) -> V)?) -> NotQueueSafeValue<V> {
var value: V!
waitGet { value = closure?($0) }
return NotQueueSafeValue(notQueueSafeValue: value)
}
// Be careful with func waitGet() -> NotQueueSafeValue<T>. It is ONLY for WRITE (SAVE).
// BAD CODE: atomicValue.waitGet().notQueueSafeValue.doSometing().
// it is possible that atomicValue._value could be changed while func doSometing() is performing
// GOOD CODE: atomicValue.waitGet { $0.doSometing() }.
// atomicValue._value will never changed while func doSometing() is performing
struct NotQueueSafeValue<T> { let notQueueSafeValue: T }
func waitGet() -> NotQueueSafeValue<T> {
var value: T!
waitGet { value = $0 }
return NotQueueSafeValue(notQueueSafeValue: value)
}
func waitSet(value: T) { waitSet { _ in return value } }
}
// MARK: Get/Set in current queue
extension AtomicValue {
func set(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> T)?) {
queue.async { [weak self] in self?.waitSet(lockValueAccessWhile: closure) }
}
func get(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: T) -> Void)?) {
queue.async { [weak self] in self?.waitGet(lockValueAccessWhile: closure) }
}
func update(waitAccessIn queue: DispatchQueue, closure: ((_ currentValue: inout T) -> Void)?) {
queue.async { [weak self] in self?.waitUpdate(lockValueAccessWhile: closure) }
}
}
// Usage
let atomicValue = AtomicValue(value: 0)
atomicValue.waitSet { currentValue -> Int in
// Multiple sequential (sync) actions
return currentValue + 1
}
atomicValue.waitGet { currentValue in
// Multiple sequential (sync) actions
print("\(currentValue)")
}
atomicValue.waitUpdate { currentValue in
// Multiple sequential (sync) actions
currentValue += 1
}
atomicValue.set(waitAccessIn: .global(qos: .default)) { currentValue -> Int in
// Multiple sequential (sync) actions
return currentValue + 1
}
atomicValue.get(waitAccessIn: .global(qos: .default)) { currentValue in
// Multiple sequential (sync) actions
print("\(currentValue)")
}
atomicValue.update(waitAccessIn: .global(qos: .background)) { currentValue in
// Multiple sequential (sync) actions
currentValue += 1
}
print(atomicValue.waitGet().notQueueSafeValue)
atomicValue.waitSet(value: 2)
print("\(atomicValue.waitMap { "value: \($0)" }.notQueueSafeValue )")
Do not forget to paste the solution code here.
class Test {
private let atomicValue = AtomicValue(value: 0)
private(set) var count = 0
private var usecDelay: UInt32 = 1000
private let range = (0..<100)
private func _set(currentValue: Int, dispatchGroup: DispatchGroup? = nil) -> Int {
let newValue = currentValue + 1
//print("\(queue.label) queue: \(newValue)")
count += 1
usleep(usecDelay)
dispatchGroup?.leave()
return newValue
}
private func _get(value: Int, queueLabel: String, dispatchGroup: DispatchGroup? = nil) {
print(" get \(queueLabel) queue: \(value)")
usleep(usecDelay)
dispatchGroup?.leave()
}
func test1(queue: DispatchQueue, completion: (() -> Void)?) {
queue.async { [weak self] in
guard let self = self else { return }
self.range.forEach { _ in
self.atomicValue.waitSet { [weak self] in
return self?._set(currentValue: $0) ?? $0
}
self.atomicValue.waitGet { [weak self] in
self?._get(value: $0, queueLabel: queue.label)
}
}
print("test1.work at \(queue.label) queue completed")
completion?()
}
}
func test2(queue: DispatchQueue, completion: (() -> Void)?) {
let dispatchGroup = DispatchGroup()
range.forEach { _ in
dispatchGroup.enter()
dispatchGroup.enter()
}
queue.async { [weak self] in
guard let self = self else { return }
self.range.forEach { _ in
self.atomicValue.waitUpdate { [weak self] in
guard let value = self?._set(currentValue: $0, dispatchGroup: dispatchGroup) else { return }
$0 = value
}
self.atomicValue.waitGet { [weak self] in
self?._get(value: $0, queueLabel: queue.label, dispatchGroup: dispatchGroup)
}
}
}
dispatchGroup.notify(queue: queue) {
print("test2.work at \(queue.label) queue completed")
completion?()
}
}
func test3(queue: DispatchQueue, waitIn waitQueue: DispatchQueue, completion: (() -> Void)?) {
let dispatchGroup = DispatchGroup()
range.forEach { _ in
dispatchGroup.enter()
dispatchGroup.enter()
dispatchGroup.enter()
}
queue.async { [weak self] in
guard let self = self else { return }
self.range.forEach { _ in
self.atomicValue.set(waitAccessIn: waitQueue) { [weak self] in
return self?._set(currentValue: $0, dispatchGroup: dispatchGroup) ?? $0
}
self.atomicValue.get(waitAccessIn: waitQueue) { [weak self] in
self?._get(value: $0, queueLabel: queue.label, dispatchGroup: dispatchGroup)
}
self.atomicValue.update(waitAccessIn: waitQueue) { [weak self] in
guard let value = self?._set(currentValue: $0, dispatchGroup: dispatchGroup) else { return }
$0 = value
}
}
}
dispatchGroup.notify(queue: queue) {
print("test3.work at \(queue.label) queue completed")
completion?()
}
}
func run() {
let dspatchGroup = DispatchGroup()
dspatchGroup.enter()
test1(queue: .global(qos: .utility)) { dspatchGroup.leave() } // result: count += range.max()
dspatchGroup.enter()
test1(queue: .global(qos: .unspecified)) { dspatchGroup.leave() } // result: count += range.max()
dspatchGroup.enter()
test2(queue: .main) { dspatchGroup.leave() } // result: count += range.max()
dspatchGroup.enter()
test2(queue: .global(qos: .unspecified)) { dspatchGroup.leave() } // result: count += range.max()
dspatchGroup.enter()
test3(queue: .global(qos: .default), waitIn: .global(qos: .background)) { dspatchGroup.leave() } // result: count += 2*range.max()
dspatchGroup.enter()
test3(queue: .main, waitIn: .global(qos: .userInteractive)) { dspatchGroup.leave() } // result: count += 2*range.max()
dspatchGroup.notify(queue: .main) { print("End. Count: \(self.count)") }
}
}
Test().run()
get com.apple.root.default-qos queue: 3
get com.apple.root.default-qos queue: 4
get com.apple.root.default-qos queue: 4
get com.apple.root.utility-qos queue: 5
get com.apple.root.default-qos queue: 6
get com.apple.root.default-qos queue: 7
get com.apple.root.default-qos queue: 8
get com.apple.root.default-qos queue: 9
get com.apple.root.default-qos queue: 10
get com.apple.root.default-qos queue: 11
get com.apple.root.default-qos queue: 12
get com.apple.root.default-qos queue: 13
get com.apple.root.default-qos queue: 14
test3.work at com.apple.root.default-qos queue completed
get com.apple.main-thread queue: 15
get com.apple.root.default-qos queue: 17
get com.apple.root.default-qos queue: 18
get com.apple.root.utility-qos queue: 18
get com.apple.main-thread queue: 19
get com.apple.root.default-qos queue: 21
get com.apple.root.default-qos queue: 22
get com.apple.root.utility-qos queue: 22
get com.apple.main-thread queue: 23
get com.apple.root.default-qos queue: 25
get com.apple.root.default-qos queue: 26
get com.apple.root.utility-qos queue: 26
get com.apple.main-thread queue: 27
get com.apple.root.default-qos queue: 29
get com.apple.root.default-qos queue: 30
get com.apple.root.utility-qos queue: 30
get com.apple.main-thread queue: 31
get com.apple.root.default-qos queue: 33
get com.apple.root.default-qos queue: 34
get com.apple.root.utility-qos queue: 34
get com.apple.main-thread queue: 35
get com.apple.root.default-qos queue: 37
get com.apple.root.default-qos queue: 38
get com.apple.root.utility-qos queue: 38
get com.apple.main-thread queue: 39
get com.apple.root.default-qos queue: 41
get com.apple.root.default-qos queue: 42
get com.apple.root.utility-qos queue: 42
get com.apple.main-thread queue: 43
get com.apple.root.default-qos queue: 45
get com.apple.root.default-qos queue: 46
get com.apple.root.utility-qos queue: 46
get com.apple.main-thread queue: 47
get com.apple.root.default-qos queue: 49
test1.work at com.apple.root.default-qos queue completed
get com.apple.root.default-qos queue: 50
test2.work at com.apple.root.default-qos queue completed
get com.apple.root.utility-qos queue: 50
get com.apple.main-thread queue: 50
test1.work at com.apple.root.utility-qos queue completed
get com.apple.main-thread queue: 51
test2.work at com.apple.main-thread queue completed
get com.apple.main-thread queue: 52
get com.apple.main-thread queue: 53
get com.apple.main-thread queue: 54
get com.apple.main-thread queue: 55
get com.apple.main-thread queue: 56
get com.apple.main-thread queue: 57
get com.apple.main-thread queue: 58
get com.apple.main-thread queue: 59
get com.apple.main-thread queue: 60
test3.work at com.apple.main-thread queue completed
End. Count: 60
Upvotes: 7
Reputation: 115104
You can use a serial dispatch queue to ensure that the array is only updated in a thread safe manner.
It is also best to change your deviceOnArray
property to private to ensure that it cannot be accessed by some other object. If you need to expose this array to other objects, do so via a computed property. e.g.
class DeviceController:NSObject, CocoaMQTTDelegate {
static let sharedInstance = DeviceController()
private var deviceOnArray:[String] = []
var deviceOn: [String] {
return self.deviceOnArray
}
var deviceOffArray:[String] = []
private let dispatchQueue = DispatchQueue(label:"DeviceControllerQueue")
private override init() {
clientID = "xyz-" + String(ProcessInfo().processIdentifier)
mqtt = CocoaMQTT(clientID: clientID, host: "device_controller.xyz.net", port: 1883)
mqtt.username = "username"
mqtt.password = "password"
mqtt.willMessage = CocoaMQTTWill(topic: "/will", message: "dieout")
mqtt.keepAlive = 30
mqtt.cleanSession = true
DeviceController.isConnecting = true
super.init()
mqtt.delegate = self
mqtt.connect()
self.registerBackgroundTask()
}
func sendArm(topic:String){
// add device to deviceOnArray
self.dispatchQueue.sync {
deviceOnArray.append(topic)
}
}
func sendDisarm(topic:String){
// remove device from deviceOnArray if exist here.
self.dispatchQueue.sync {
if let index = self.deviceOnArray.index(of: topic) {
self.deviceOnArray.remove(at: index)
}
}
}
}
Upvotes: 3
Reputation: 10209
Atomic properties won't help you here. They are intended to sync on assignment of the property as a whole and not of the internals (e.g. they do not sync insertion/removal of elements to the array). They almost only ensure correct retain/release/autorelease calls in order to keep your program from crashing / leaking.
What you would need is DispatchSemaphore or something similar (or maybe more native stuff, posix pthread_mutex
stuff) to ensure mutual exclusive access.
Upvotes: 2