Reputation: 13
I have an issue with my Ruby on Rails application. So, there are workers, which are listening for different rabbitMQ topics. Each worker make some changes it it's DB (mariaDB) table and in the end update 'last_connection' field in common object 'device', where I have issue.
This is one worker:
include Sneakers::Worker
# This worker will connect to "queue" queue
# env is set to nil since by default the actuall queue name would be
# "queue_development"
from_queue "sensor_log"
# work method receives message payload in raw format
def work(raw_message)
logger.info ("sensor_log " + raw_message)
msg = JSON.parse(raw_message)
# msg = {"deviceId" => 102,"timestamp" => 1487318555,"sensor" => 5, "values" => [1,2,3,4,5,6,7,8], "isNewSensor" => false, "logDelayed" => false}
begin
@device = Device.find(msg["deviceId"])
ActiveRecord::Base.transaction do
# Initialize
timestamp = Time.at(msg["timestamp"])
MiddlewareLog.create(
device_id: msg["deviceId"],
message: JSON.pretty_generate(msg),
queue: MiddlewareLog.queues[:sensor_log]
)
if(msg["ext_brd_type"] == 6)
logger.info ("Logging external sensors lm2, lm4, lm4")
# Logging external sensors lm2, lm4, lm4
Sensors::SENSORS_EXT.map do |code|
SensorLog.create(
device_id: msg["deviceId"],
sensor: code,
state: msg[code],
value: msg[code],
is_delayed: msg["logDelayed"],
created_at: timestamp
)
end
else
logger.info ("Logging native sensors")
# Logging native
device_sensors = @device.new_version? ? Sensors::SENSORS_CODES_NEW : Sensors::SENSORS_CODES
@sensors = device_sensors.reject{|code, sensor| code & msg["sensor"] == 0}
@sensors.map do |code, sensor|
SensorLog.create(
device_id: msg["deviceId"],
sensor: sensor[:name],
state: sensor[:state],
value: msg["values"][sensor[:bit_position]],
is_delayed: msg["logDelayed"],
created_at: timestamp
)
end
Rollbar.warning("Unknown device sensor", :message => msg, :sensor => msg["sensor"]) if @sensors.empty?
@device.update_sensors_state(msg["values"]) if @sensors.any?
end
# Avoid updated_at deadlock
@device.save!(touch: false)
end
# Touch updated_at and last_connection_at
@device.touch(:last_connection_at)
ack! # we need to let queue know that message was received
rescue => exception
logger.error ("sensors_log exception:")
logger.error exception
Rollbar.error(exception, :message => msg)
requeue!
end
end
end
Here is the second one:
class SystemLogsWorker
include Sneakers::Worker
# This worker will connect to "queue" queue
# env is set to nil since by default the actuall queue name would be
# "queue_development"
from_queue "system_log"
# @logger = Logger.new(STDOUT)
# @logger.level = Logger::INFO
# work method receives message payload in raw format
def work(raw_message)
# @logger.info raw_message
logger.info ("system_log " + raw_message)
msg = JSON.parse(raw_message)
# msg = {"deviceId":102,"timestamp":1487318555,"system":2069,"logDelayed":false,"fault_code":1}
begin
@device = Device.find(msg["deviceId"])
ActiveRecord::Base.transaction do
# Initialize
timestamp = Time.at(msg["timestamp"])
MiddlewareLog.create(
device_id: msg["deviceId"],
message: JSON.pretty_generate(msg),
queue: MiddlewareLog.queues[:system_log]
)
@system = Systems::EVENTS_CODES[msg["system"]]
# @logger.warn("Unknown device system", :message => msg, :system => msg[:system]) unless @system
# logger.warn("Unknown device system", :message => msg, :system => msg["system"]) unless @system
# Rollbar.warning("Unknown device system", :message => msg, :system => msg["system"]) unless @system
logger.warn("Unknown device system. Message:" + raw_message) unless @system
Rollbar.warning("Unknown device system. Message:" + raw_message) unless @system
# Loggin
system_log = SystemLog.create(
device_id: msg["deviceId"],
system: @system[:name],
state: @system[:state],
is_delayed: msg["logDelayed"],
fault_code: msg["fault_code"],
created_at: timestamp
) if @system
@device.update_systems_state(system_log) if @system
# Avoid updated_at deadlock
@device.save!(touch: false)
end
# Touch updated_at and last_connection_at
@device.touch(:last_connection_at)
ack! # we need to let queue know that message was received
rescue => exception
logger.error ("system_log exception:")
logger.error exception
Rollbar.error(exception, :message => msg)
requeue!
end
end
end
In the runtime I get the message:
2020-06-18T11:09:08Z p-13299 t-gmvtsrzac ERROR: sensors_log exception: 2020-06-18T11:09:08Z p-13299 t-gmvtsrzac ERROR: Mysql2::Error: Deadlock found when trying to get lock; try restarting transaction: UPDATE
devices
SETdevices
.updated_at
= '2020-06-18 11:09:08',devices
.last_connection_at
= '2020-06-18 11:09:08' WHEREdevices
.id
= 30242020-06-18T11:09:08Z p-13299 t-gmvtsq74w ERROR: system_log exception: 2020-06-18T11:09:08Z p-13299 t-gmvtsq74w ERROR: Mysql2::Error: Deadlock found when trying to get lock; try restarting transaction: UPDATE
devices
SETdevices
.updated_at
= '2020-06-18 11:09:08',devices
.last_connection_at
= '2020-06-18 11:09:08' WHEREdevices
.id
= 3024
I think that the problem point in
@device.touch(:last_connection_at)
, because in one time both workers trying to update one table row.
I'm not so good with ruby and will be glad to any help with this.
Upvotes: 1
Views: 1233
Reputation: 1585
Have you tried using a lock within the transaction before updating the database data?
@device.lock!
@device.save!(touch: false)
@device.touch(:last_connection_at)
You can also start a lock and transaction at the same time using with_lock:
@device = Device.find(msg["deviceId"])
@device.with_lock do
# your block here
end
As described in https://api.rubyonrails.org/classes/ActiveRecord/Locking/Pessimistic.html
Upvotes: 0