user2970050

Reputation: 307

Worker hijacking Active Record model

I am a junior Ruby developer. My app allows a user to enter Contacts and/or upload a CSV file.

I am using the following versions:

  ruby "2.3.0"
  gem "rails", "4.2.5.1"
  gem "pg", "0.17.1" # postgresql database
  gem "delayed_job_active_record", ">= 4.0.0.beta1" # background job processing
  gem "delayed_job_web", ">= 1.2.0" # web interface for delayed job

Also using:

> class CsvUploader < CarrierWave::Uploader::Base
>
>   def store_dir
>     "uploads/#{model.class.to_s.underscore}/#{mounted_as}/#{model.id}"
>   end
>
> end

Here is the worker:

  class ImportCsvFileWorker

    def self.perform(csv_file_id)
      csv_file = CsvFile.find(csv_file_id)

      csv_file.import!
      csv_file.send_report!
    end

  end

I am using a SmartCsvParser-based parsing service:

  def process_csv
    parser = ::ImportData::SmartCsvParser.new(csv_file.file_url)

    parser.each do |smart_row|
      csv_file.increment!(:total_parsed_records)
      begin
        self.contact = process_row(smart_row)
      rescue => e
        row_parse_error(smart_row, e)
      end
    end
  rescue => e # parser error or unexpected error
    csv_file.save_import_error(e)
  end

Does delayed_job lock the database for the User/Contact tables so that I can't add any Contacts via the app?

Locally, the app freezes (or seems locked) until the background delayed_job completes. (On Heroku it causes H12 errors, but I figure I need to fix the issue locally first.) I am just trying to understand what is causing the lock. Should this be happening? Is it my code? The CSV import logic and the Add Contact view each work independently, but the app side will not work while a background job is running. Or is it the way Active Record handles it? Is there a way around this?

I have not isolated it, but I suspect that the app becomes unavailable whenever any background job is running.

I have tried to include all the relevant facts; let me know if any further details are needed. Many thanks for the help.

UPDATE: I discovered that I have a ContactMergingService that seems to be locking all the contacts. If I comment out the service below, my application does not hang.

So my question is: what are the other options? Before adding a Contact, I try to find an existing contact with the same email address (if I find one, I append the new contact details to it). How do I do this without locking the database?

Is it because I am using the 'find' method? Is there a better way?

> class ContactMergingService
> 
>   attr_reader :new_contact, :user
> 
>   def initialize(user, new_contact, _existing_records)
>     @user = user
>     @new_contact = new_contact
>     @found_records = matching_emails_and_phone_numbers   
>   end
> 
>   def perform
>     Rails.logger.info "[CSV.merging] Checking if new contact matches existing contact..."
>     if (existing_contact = existing_contact())
>       Rails.logger.info "[CSV.merging] Contact match found."
>       merge(existing_contact, new_contact)
>       existing_contact
>     else
>       Rails.logger.info "[CSV.merging] No contact match found."
>       new_contact
>     end
>   end
> 
>   private
> 
>   def existing_contact
>     Rails.logger.info "[CSV.merging] Found records: #{@found_records.inspect}"
>     if @found_records.present?
>       @user.contacts.find @found_records.first.owner_id # Fetch first owner
>     end
>   end
> 
>   def merge(existing_contact, new_contact)
>     Rails.logger.info "[CSV.merging] Merging with existing contact (ID: #{existing_contact.id})..."
>     merge_records(existing_contact, new_contact)
>   end
> 
>   def merge_records(existing_relation, new_relation)
>     existing_relation.attributes.each do |field, value|
>       if value.blank? && new_relation[field].present?
>         existing_relation[field] = new_relation[field]
>       end
>     end
>     new_relation.email_addresses.each do |email_address|
>       Rails.logger.info "[CSV.merging.emails] Email: #{email_address.inspect}"
>       if existing_relation.email_addresses.find_by(email: email_address.email)
>         Rails.logger.info "[CSV.merging.emails] Email address exists."
>       else
>         Rails.logger.info "[CSV.merging.emails] Email does not already exist. Saving..."
>         email_address.owner = existing_relation
>         email_address.save!
>       end
>     end
>     new_relation.phone_numbers.each do |phone_number|
>       Rails.logger.info "[CSV.merging.phone] Phone Number: #{phone_number.inspect}"
>       if existing_relation.phone_numbers.find_by(number: phone_number.number)
>         Rails.logger.info "[CSV.merging.phone] Phone number exists."
>       else
>         Rails.logger.info "[CSV.merging.phone] Phone Number does not already exist. Saving..."
>         phone_number.owner = existing_relation
>         phone_number.save!
>       end
>     end
>   end
> 
>   def matching_emails_and_phone_numbers
>     records = []
>     if @user
>       records << matching_emails
>       records << matching_phone_numbers
>       Rails.logger.info "[CSV.merging] merged records: #{records.inspect}"
>       records.flatten!
>       Rails.logger.info "[CSV.merging] flattened records: #{records.inspect}"
>       records.compact!
>       Rails.logger.info "[CSV.merging] compacted records: #{records.inspect}"
>     end
>     records
>   end
> 
>   def matching_emails
>     existing_emails = []
>     new_contact_emails = @new_contact.email_addresses
>     Rails.logger.info "[CSV.merging] new_contact_emails: #{new_contact_emails.inspect}"
>     new_contact_emails.each do |email|
>       Rails.logger.info "[CSV.merging] Checking for a match on email: #{email.inspect}..."
>       if existing_email = @user.contact_email_addresses.find_by(email: email.email, primary: email.primary)
>         Rails.logger.info "[CSV.merging] Found a matching email"
>         existing_emails << existing_email
>       else
>         Rails.logger.info "[CSV.merging] No match found"
>         false
>       end
>     end
>     existing_emails
>   end
> 
>   def matching_phone_numbers
>     existing_phone_numbers = []
>     @new_contact.phone_numbers.each do |phone_number|
>       Rails.logger.info "[CSV.merging] Checking for a match on phone_number: #{phone_number.inspect}..."
>       if existing_phone_number = @user.contact_phone_numbers.find_by(number: phone_number.number)
>         Rails.logger.info "[CSV.merging] Found a matching phone number"
>         existing_phone_numbers << existing_phone_number
>       else
>         Rails.logger.info "[CSV.merging] No match found"
>         false
>       end
>     end
>     existing_phone_numbers
>   end
> 
>   def clean_phone_number(number)
>     number.gsub(/[\s\-\(\)]+/, "")
>   end
> 
> end

Upvotes: 1

Views: 160

Answers (2)

user2970050

Reputation: 307

We concluded that the issue arises when CsvParsingService#perform runs: it puts AccessShareLocks on certain tables in the database (we think Contacts, EmailAddresses, PhoneNumbers, and maybe Users).

The locks persist until the method finishes. Because the method parses every row of the uploaded csv_file, it can take as long as 90 minutes to run.

Any request to the app that tries to access one of these locked tables stops and waits until the tables are unlocked. Because Heroku cuts off a request after 30 seconds, this is what was generating the H12 errors (on the application side).

The root cause was that the state_machines-activerecord gem wraps every state transition inside a transaction by default.

The worker was calling the parsing service by running csv_file.import!, which triggered a transition on the csv_file state machine, which then called CsvParsingService and parsed each row. Since the state machine was wrapping everything inside a transaction, nothing committed until the state transition was complete.
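
To make the effect concrete, here is a toy model in plain Ruby (no ActiveRecord, no real database). ToyStore and its methods are hypothetical names invented for illustration; the only point is that writes made inside one long transaction stay invisible to everyone else until it commits, whereas a transaction per row makes each row visible as soon as it is done.

```ruby
# Toy model only: writes are buffered while a "transaction" is open
# and become visible to other readers only when it commits.
class ToyStore
  attr_reader :committed

  def initialize
    @committed = []
    @pending = nil
  end

  def transaction
    return yield if @pending # a nested call joins the outer transaction

    @pending = []
    begin
      yield
      @committed.concat(@pending) # commit; skipped if the block raises
    ensure
      @pending = nil
    end
  end

  def insert(row)
    if @pending
      @pending << row
    else
      @committed << row
    end
  end
end

rows = %w[a b c]

# One transaction around the whole import (what the state machine did).
store = ToyStore.new
visible_mid_import = nil
store.transaction do
  rows.each { |row| store.insert(row) }
  visible_mid_import = store.committed.size
end

# One transaction per row: each row is visible as soon as it is written.
per_row = ToyStore.new
seen = []
rows.each do |row|
  per_row.transaction { per_row.insert(row) }
  seen << per_row.committed.size
end

puts "visible during single transaction: #{visible_mid_import}"
puts "visible after single transaction:  #{store.committed.size}"
puts "visible after each per-row commit: #{seen.inspect}"
```

In the real app the same shape applies to locks: whatever the import touches stays held for the whole 90 minutes rather than for one row at a time.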

By updating the gem to version 0.4.0pre and adding the option use_transactions: false to the state machine in the CsvFile model, we stopped the database from locking when calling .import! and processing.
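
For reference, the change looks roughly like the fragment below. This is only a sketch of the model, not our actual code: the :state attribute and the :pending/:imported state names are assumptions made up for illustration. The import event (which is what generates import!) and the use_transactions: false option are the parts that matter. It needs the Rails app and the gem to run, so treat it as a configuration fragment.

```ruby
# app/models/csv_file.rb (sketch; attribute and state names are assumed)
class CsvFile < ActiveRecord::Base
  # use_transactions: false stops the gem from wrapping each transition,
  # and everything its callbacks run, in a single database transaction.
  state_machine :state, initial: :pending, use_transactions: false do
    event :import do
      transition pending: :imported
    end
  end
end
```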

Upvotes: 0

aarkerio

Reputation: 2354

You can try something like:

 Thread.new do
    ActiveRecord::Base.transaction do   
      User.import(user_data)
    end
    ActiveRecord::Base.connection.close
 end

in your CSV importing code.

Upvotes: 1
