mark

Reputation: 62804

How can I efficiently insert millions of records into MongoDB from a large zipped CSV file?

I am trying to insert about 8 million records into Mongo, and they seem to go in at a rate of about 1000 records per second, which is extremely slow.

The code is written in Python, so the problem may be with Python itself, but I doubt it. Here is the code:

from datetime import datetime
from zipfile import ZipFile
from time import time
from sys import argv, stdout, exit
import csv
from pymongo import Connection   # the (old) pymongo Connection API

def str2datetime(s):
  return None if (not s or s == r'\N') else datetime.strptime(s, '%Y-%m-%d %H:%M:%S')
def str2bool(s):
  return None if (not s or s == r'\N') else (False if s == '0' else True)
def str2int(s):
  return None if (not s or s == r'\N') else int(s)
def str2float(s):
  return None if (not s or s == r'\N') else float(s)
def str2float2int(s):
  return None if (not s or s == r'\N') else int(float(s) + 0.5)
def str2latin1(s):
  return unicode(s, 'latin-1')

_ = lambda x: x  # identity converter: keep the field value unchanged

converters_map = {
  'test_id': str2int,
  'android_device_id': str2int,
  'android_fingerprint': _,
  'test_date': str2datetime,
  'client_ip_address': _,
  'download_kbps': str2int,
  'upload_kbps': str2int,
  'latency': str2int,
  'server_name': _,
  'server_country': _,
  'server_country_code': _,
  'server_latitude': str2float,
  'server_longitude': str2float,
  'client_country': _,
  'client_country_code': _,
  'client_region_name': str2latin1,
  'client_region_code': _,
  'client_city': str2latin1,
  'client_latitude': str2float,
  'client_longitude': str2float,
  'miles_between': str2float2int,
  'connection_type': str2int,
  'isp_name': _,
  'is_isp': str2bool,
  'network_operator_name': _,
  'network_operator': _,
  'brand': _,
  'device': _,
  'hardware': _,
  'build_id': _,
  'manufacturer': _,
  'model': str2latin1,
  'product': _,
  'cdma_cell_id': str2int,
  'gsm_cell_id': str2int,
  'client_ip_id': str2int,
  'user_agent': _,
  'client_net_speed': str2int,
  'iphone_device_id': str2int,
  'carrier_name': _,
  'iso_country_code': _,
  'mobile_country_code': str2int,
  'mobile_network_code': str2int,
  'version': _,
  'server_sponsor_name': _,
}

def read_csv_zip(path):
  with ZipFile(path) as z:
    with z.open(z.namelist()[0]) as f:
      r = csv.reader(f)
      header = r.next()
      # pair each column title with its converter; the 'test_id' column becomes '_id'
      converters = tuple((title if title != 'test_id' else '_id', converters_map[title]) for title in header)
      for row in r:
        row = {name: convert(value) for (name, convert), value in zip(converters, row)}
        yield row

argv = [x for x in argv if not x == '']
if len(argv) == 1:
  print("Usage: " + argv[0] + " zip-file")
  exit(1)

zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]

print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
  db = connection.db
  collection = db[collection_name]
  i = 0
  try:
    start = time()
    for item in read_csv_zip(zip_file):
      i += 1
      if (i % 1000) == 0:
        stdout.write("\r%d " % i)
        stdout.flush()
      try:
        collection.insert(item)
      except Exception as exc:
        print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
        print(exc)
    print("Elapsed time = {0} seconds, {1} records.".format(time() - start, i))
    raw_input("Press ENTER to exit")
  except Exception as exc:
    print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
    print(exc)
    exit(1)

It takes 350 seconds to insert 262796 records (one CSV file).

The Mongo server is running on the same machine and nobody else is using it, so I could even write directly to the database files if there were a way to do that.

I am not interested in sharding, because 8 million records should not require sharding, should they?

My question is: what am I doing wrong? Maybe my choice of DB is wrong? The typical flow is that the records are refreshed once a month, and after that only queries are made against the database.

Thanks.

EDIT

It turns out that the bottleneck is not Mongo but reading the zip file. I have changed the code to read the zip file in chunks of 1000 rows and then feed them to Mongo in one call to Collection.insert. It is the zip file that takes all the time. Here is the modified code:

import itertools
import sys   # in addition to the imports shown above

def insert_documents(collection, source, i, batch_size):
  count = 0
  while True:
    # pull the next batch of rows from the generator
    items = list(itertools.islice(source, batch_size))
    if len(items) == 0:
      break
    old_i = i
    count += len(items)
    i += len(items)
    if (old_i / 1000) != (i / 1000):
      sys.stdout.write("\r%d " % i)
      sys.stdout.flush()
    try:
      # one bulk insert per batch instead of one insert per document
      collection.insert(items)
    except Exception as exc:
      print("Failed at some record between #{0} (id = {1}) and #{2} (id = {3})".format(old_i, items[0]['_id'], i, items[-1]['_id']))
      print(exc)
  return count

def main():
  argv = [x for x in sys.argv if not x == '']
  if len(argv) == 1:
    print("Usage: " + argv[0] + " zip-file")
    exit(1)

  zip_file = argv[1]
  collection_name = zip_file[:zip_file.index('_')]

  print("Populating " + collection_name + " with the data from " + zip_file)
  with Connection() as connection:
    ookla = connection.ookla
    collection = ookla[collection_name]
    i = 0
    start = time()
    count = insert_documents(collection, read_csv_zip(zip_file), i, 1000)
    i += count
    print("Elapsed time = {0} seconds, {1} records.".format(time() - start, count))
    raw_input("Press ENTER to exit")

if __name__ == "__main__":
  main()

It turns out that most of the time goes into items = list(itertools.islice(source, batch_size)).

Any ideas on how to improve it?

Upvotes: 2

Views: 5096

Answers (2)

Remon van Vliet

Reputation: 18615

Although you've pointed out in your comments that you can't use mongoimport, you can, and should. Dates can be imported just fine, and so can your str2latin conversion. Simply pre-process your CSV to be mongoimport-compatible and you're golden.

Convert dates to {myDate: {$date: msSinceEpoch}} and mongoimport will understand them. So with one pre-processing step you can use mongoimport, and given your use case I don't see why that would be a problem.
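
A minimal sketch of that pre-processing step, assuming the read_csv_zip generator from the question; the output file name, the zip file name and the final mongoimport invocation are only illustrative:

import json
from datetime import datetime

EPOCH = datetime(1970, 1, 1)

def to_mongoimport_json(doc):
  # replace datetime values with the {"$date": msSinceEpoch} form mongoimport accepts
  out = {}
  for key, value in doc.iteritems():
    if isinstance(value, datetime):
      out[key] = {"$date": int((value - EPOCH).total_seconds() * 1000)}
    else:
      out[key] = value
  return out

# write one JSON document per line
with open('tests.json', 'w') as f:
  for doc in read_csv_zip('tests_2012.zip'):
    f.write(json.dumps(to_mongoimport_json(doc)) + '\n')

# then: mongoimport --db ookla --collection tests --file tests.json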

That said, mongoimport should not be an order of magnitude faster than batch inserts, and although 1000/sec is not slow, it's certainly not in line with the kind of performance I get even on a simple dev machine. I can easily hit 30k/sec, and probably more if I used batch inserts rather than single inserts, especially with safe=false writes (which should be fine in this case since you can verify the data as a second step after the import). What resource is your bottleneck? (Check with mongostat and top.)
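
For comparison, here's a rough sketch of what a batched insert with safe=false looks like against the same old pymongo Connection API used in the question (the batch size and the helper name are arbitrary):

import itertools

def insert_batches(collection, source, batch_size=1000):
  # insert documents from a generator in batches, without waiting for
  # a per-insert acknowledgement from the server
  inserted = 0
  while True:
    batch = list(itertools.islice(source, batch_size))
    if not batch:
      break
    collection.insert(batch, safe=False)  # fire-and-forget; verify counts afterwards
    inserted += len(batch)
  return inserted

# e.g. insert_batches(db[collection_name], read_csv_zip(zip_file))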

Upvotes: 2

Epcylon

Reputation: 4727

At slightly more than 1 ms per record I wouldn't really call it "extremely slow", but anyway, here are some thoughts on what to do next:

  • Use a profiler to see where the program spends its time; it might not be where you think (see the sketch after this list).
  • Consider using the mongoimport utility linked from the comment by ChrisP, as it's designed for this purpose.
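
For the profiling step, the standard-library cProfile module is usually enough; the script and output file names below are just placeholders:

# run the import under the profiler and dump the stats:
#   python -m cProfile -o import.prof import_zip.py tests_2012.zip
# then inspect where the time went:
import pstats
stats = pstats.Stats('import.prof')
stats.sort_stats('cumulative').print_stats(20)  # top 20 calls by cumulative time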

Upvotes: 0
