Reputation: 3
My app have an import function that'll execute a Sidekiq Worker and import a bunch of CSV rows, saving them to my database. This works fine when I execute Sidekiq in my local machine, but when I deploy the code to production Sidekiq will execute the job correctly only once. When I use the import function a second time the job goes directly to the history pile in Sidekiq, and the logic inside the worker is never executed. It's really strange because it throws no error and it's like the job was executed correctly. For staging I'm using Redis in AWS Elastic Cache.
redis_version: 5.0.6
rails, "5.0.7"
sidekiq, "6.0.5"
sidekiq-failures, "1.0.0"
sidekiq-history, "0.0.11"
sidekiq-limit_fetch, "3.4.0"
sidekiq-pro, "5.0.1"
sidekiq-unique-jobs, "6.0.15"
I would appreciate any tips related to problems you faced before similar to this, or anything else I can do to debug this problem. I already ran
Sidekiq.redis { |conn| conn.ping }
=> "PONG"
So looks like Redis is connected ok.
Project Worker
# frozen_string_literal: true
class ImportWorker
include Sidekiq::Worker
sidekiq_options queue: "import_worker", lock: :until_executed, retry: false
def perform(import_id)
import = Import.find_by(id: import_id)
return if import.blank?
path = import.file.expiring_url(10)
file = open(path)
csv = CSV.parse(file.read, headers: true)
import.update!( number_of_lines_in_csv: csv.size,
import_started_at: DateTime.now)
created_transactions = []
csv.each do |row|
guid = row["TransactionUniqueId"]
next if guid.blank?
existing_transaction = Transaction.find_by(transaction_unique_id: guid)
next if existing_transaction.present?
attributes = Transaction.convert_attributes(import, row).merge(imported_at: Time.now)
transaction = Transaction.create!(attributes)
created_transactions << [transaction.id, guid]
Rails.logger.info "Transaction #{row["TransactionUniqueId"]} created."
end
import.update!(import_finished_at: DateTime.now,
imported: true)
send_mail(import_id, created_transactions)
end
def send_mail(import_id, created_transactions)
["[email protected]", "[email protected]"].each do |email|
ImportTransactionsMailer.import_processed(import_id, email, created_transactions).deliver
end
end
end
Edit 1: Sorry, I forgot to say that I'm using Cloud66 to deploy my app, if this help in any way.
Upvotes: 0
Views: 540
Reputation: 3
I found what the problem was. So, I was triggering my ImportWorker in a after_create
hook in my Import model as bellow.
# frozen_string_literal: true
class Import < ApplicationRecord
has_many :transactions
belongs_to :admin_user
has_attached_file :file, s3_protocol: :https
validates_attachment_content_type :file, content_type: ["text/plain",
"text/csv",
"application/vnd.ms-excel",
"application/octet-stream"]
validates :file, attachment_presence: true
has_paper_trail
after_create :run_import_in_background
def run_import_in_background
ImportWorker.perform_async(id)
end
end
But when the Worker executed the first lines to find the Import
class ImportWorker
include Sidekiq::Worker
sidekiq_options queue: "import_worker", lock: :until_executed, retry: false
def perform(import_id)
import = Import.find_by(id: import_id)
return if import.blank?
...
if the import was nil
it should return. The problem is, I assumed that the import would never be nil
, because this was triggered from an after_create
hook, but it actually was coming as nil
. When I changed my return line to raise StandardError.new("Empty import object.") if import.blank?
the worker started to fail.
So I also changed my worker sidekiq_options
from retry: false
to retry: 3
and in the second try the worker executed ok, because it could now find the Import with the specified id. So I think this is some kind of sync problem between the after_create
hook and Sidekiq. This might be related to using S3 gem with this setup too. Saving the file in S3 can be causing some delay in saving the object in the DB.
You can see the final Worker code bellow.
# frozen_string_literal: true
class ImportWorker
include Sidekiq::Worker
sidekiq_options queue: "import_worker", lock: :until_executed, retry: 3
def perform(import_id)
import = Import.find_by(id: import_id)
raise StandardError.new("Empty import object.") if import.blank?
...
Upvotes: 0
Reputation: 22228
sidekiq_options queue: "import_worker", lock: :until_executed, retry: false
What happens if an error occurs? Will the job will be discarded but the uniqueness lock will remain, preventing further jobs from enqueuing?
Upvotes: 1