roy

Reputation: 305

Bulk-insertion into couchbase via python

I am trying to do bulk insertion into Couchbase via Python. I searched for examples on SO and Google, but could not find any clue. Here, someone mentions that it's not possible:

How to insert a documents in bulk in Couchbase?

However, that question was asked three years ago. If I understand the links below correctly, it is now possible to insert documents in bulk:

https://developer.couchbase.com/documentation/server/current/sdk/batching-operations.html

https://pythonhosted.org/couchbase/api/couchbase.html#batch-operation-pipeline

Here is my code, in which I want to implement bulk insertion into Couchbase:

import calendar
import datetime

from couchbase.bucket import Bucket

cb = Bucket('couchbase://localhost/bulk-load')

with open('/home/royshah/Desktop/bulk_try/roy.csv') as csvfile:
    lines = csvfile.readlines()[4:]

for k, line in enumerate(lines):
    data_tmp = line.strip().split(',')
    strDate = data_tmp[0].replace("\"", "")
    timerecord = datetime.datetime.strptime(strDate,
                                            '%Y-%m-%d %H:%M:%S.%f')
    microsecs = timerecord.microsecond
    ts = calendar.timegm(timerecord.timetuple()) * 1000000 + microsecs
    datastore = [ts] + data_tmp[1:]

    # I am making key-values on the fly from the csv file
    stre = {'col1': datastore[1],
            'col2': datastore[2],
            'col3': datastore[3],
            'col4': datastore[4],
            'col5': datastore[5],
            'col6': datastore[6]}
    # datastore[0] is used as the document id and stre is the value
    # inserted for the respective id.
    cb.upsert(str(datastore[0]), stre)
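For reference, the timestamp-to-document-id conversion in the loop above can be isolated into a small helper that is easy to test on its own. This is only a sketch; the function name and the generic column names are my choices, not part of the Couchbase SDK:

```python
import calendar
import datetime

def line_to_doc(line):
    """Turn one CSV line into a (doc_id, value) pair.

    The first field is a quoted timestamp such as
    "2017-01-01 00:00:00.500000"; it is converted to microseconds
    since the epoch and used as the document id. The remaining
    fields become the document body under generic column names.
    """
    fields = line.strip().split(',')
    raw_date = fields[0].replace('"', '')
    record = datetime.datetime.strptime(raw_date, '%Y-%m-%d %H:%M:%S.%f')
    ts = calendar.timegm(record.timetuple()) * 1000000 + record.microsecond
    value = {'col{}'.format(i): v for i, v in enumerate(fields[1:], start=1)}
    return str(ts), value

doc_id, value = line_to_doc('"2017-01-01 00:00:00.500000",a,b')
# doc_id is '1483228800500000'; value is {'col1': 'a', 'col2': 'b'}
```

Each (doc_id, value) pair can then be passed to cb.upsert(), or collected into a dict for a multi-operation.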

cb.upsert(str(datastore[0]), stre) performs one insertion per document, and I want to turn it into a bulk insertion to make it faster. I have no idea how to do bulk insertion in Couchbase. I found this example, but I am not sure how to apply it:

https://developer.couchbase.com/documentation/server/current/sdk/batching-operations.html

If someone could point me to some examples of bulk loading in Couchbase, or help me figure out how to do bulk insertion with my code, I would be really grateful. Thanks a lot for any idea or help.

Upvotes: 2

Views: 1747

Answers (1)

Robin Ellerkmann

Reputation: 2113

I tried to adapt the example from the docs to your use case. You may have to change one or two details, but you should get the idea.

import calendar
import datetime

from couchbase.bucket import Bucket
from couchbase.exceptions import CouchbaseTransientError

cb = Bucket('couchbase://localhost/bulk-load')
BYTES_PER_BATCH = 1024 * 256  # 256K

batches = []
cur_batch = {}
cur_size = 0
batches.append(cur_batch)

with open('/home/royshah/Desktop/bulk_try/roy.csv') as csvfile:
    lines = csvfile.readlines()[4:]

for k, line in enumerate(lines):
    # Format your data
    data_tmp = line.strip().split(',')
    strDate = data_tmp[0].replace("\"", "")
    timerecord = datetime.datetime.strptime(strDate,
                                            '%Y-%m-%d %H:%M:%S.%f')
    microsecs = timerecord.microsecond
    timestamp = calendar.timegm(timerecord.timetuple()) * 1000000 + microsecs

    # Build kv from the csv fields
    datastore = [timestamp] + data_tmp[1:]
    value = {'col1': datastore[1],
             'col2': datastore[2],
             'col3': datastore[3],
             'col4': datastore[4],
             'col5': datastore[5],
             'col6': datastore[6]}

    key = str(datastore[0])
    cur_batch[key] = value
    cur_size += len(key) + len(str(value)) + 24  # rough size estimate

    if cur_size > BYTES_PER_BATCH:
        cur_batch = {}
        batches.append(cur_batch)
        cur_size = 0

print("Have {} batches".format(len(batches)))
num_completed = 0
while batches:
    batch = batches[-1]
    try:
        cb.upsert_multi(batch)
        num_completed += len(batch)
        batches.pop()
    except CouchbaseTransientError as e:
        print(e)
        ok, fail = e.split_results()
        new_batch = {}
        for key in fail:
            new_batch[key] = batch[key]
        batches.pop()
        batches.append(new_batch)
        num_completed += len(ok)
        print("Retrying {}/{} items".format(len(new_batch), len(batch)))
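One caveat in the size accounting above: calling len() on a dict counts its keys, not its bytes, so the batch-size estimate is rough. The batching logic itself can be pulled out into a standalone, testable function. This is only a sketch with names of my choosing, using the JSON-serialized length as the size estimate:

```python
import json

def split_into_batches(items, bytes_per_batch=1024 * 256):
    """Group (key, value) pairs into dict batches whose estimated
    serialized size stays under bytes_per_batch.

    items: iterable of (key, value) pairs.
    Returns a list of dicts, each small enough to send in one
    multi-operation.
    """
    batches = [{}]
    cur_size = 0
    for key, value in items:
        batches[-1][key] = value
        # Length of the JSON encoding plus a small per-item overhead
        cur_size += len(key) + len(json.dumps(value)) + 24
        if cur_size > bytes_per_batch:
            batches.append({})
            cur_size = 0
    if not batches[-1]:  # drop a trailing empty batch
        batches.pop()
    return batches

items = [('k{}'.format(i), {'v': 'x' * 100}) for i in range(10)]
batches = split_into_batches(items, bytes_per_batch=300)
# 10 items at roughly 135 bytes each against a 300-byte budget -> 4 batches
```

Each resulting batch can then be fed to cb.upsert_multi(batch), with the retry-on-CouchbaseTransientError loop from the answer left unchanged.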

Upvotes: 2
