Reputation: 23
I'm now doing a little project which uses celery to turn csv and xlsx files into postgresql table. The code below works fine without celery(except large files),but after using celery it produce some errors and bugs. I've looked for similar questions in StackOverFlow but don't have any idea how to do and why. Hope you guys can help me with it,thanks.
`
# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm
app = Celery(
'tasks',
broker='amqp://guest:guest@localhost:5672//',
backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False
@app.task()
def csvwritein(doc):# Transform csv to table
doc = doc
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
readcur = conn.cursor()
readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
check = readcur.fetchone()[0]
try:
fr = open(doc.path,encoding = 'utf-8-sig')
dr.delay(fr,doc,check)
fr.close()
except Exception as e:
fr = open(doc.path,encoding = 'big5')
dr.delay(fr,doc,check)
fr.close()
conn.commit()
readcur.close()
@app.task()
def dr(fr,doc,check): # make datareader as function to keep code 'dry'
csvt = 0 #count csv reader loop time
row_id = 1 # used for following id field
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
maincur = conn.cursor()
writecur = conn.cursor()
datareader = csv.reader(fr, delimiter=',')
for row in datareader:
if csvt == 0: # first time in loop(create field) and check no
same file exists
if check == True:
app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
tname = '%s-%s' % (doc.tablename,app
tablename = '"%s-%s"' % (doc.tablename,app)
doc.tablename = tname
doc.save()
else:
tablename = '"%s"' % doc.tablename
maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
row_count = sum(1 for line in datareader)
col_count = len(row)
frow = row
for i in range(0,col_count,1):
row[i] = '"%s"' % row[i] # change number to
string
maincur.execute("ALTER TABLE %s ADD %s
CITEXT;" % (tablename,row[i]))
csvt = csvt+1
fr.seek(0)
next(datareader)
elif csvt > 0: # not first time(insert data) and check no
same file exists
for j in range(0,col_count,1):
if j == 0:
writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');" % (tablename,frow[j],row[j]))
else:
writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';" %(tablename,frow[j],row[j],row_id))
csvt = csvt+1
row_id = row_id+1
else:
break
conn.commit()
maincur.close()
writecur.close()
conn.close()
csvt = 0
doc = Document.objects.all()
`
[2016-10-27 06:17:05,227: ERROR/MainProcess] Process 'Worker-1' pid:13829 exited with 'signal 9 (SIGKILL)' [2016-10-27 06:17:05,328:ERROR/MainProcess] Task data.tasks.xlsxwritein[5aec4679-c48b-4d07-a0a9-5e4e37fcd24b] raised unexpected: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',) Traceback (most recent call last): File "/usr/local/lib/python3.4/dist-packages/billiard/pool.py", line 1175, in mark_as_worker_lost human_status(exitcode)), billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
#The code continues from the above task.py file
@app.task()
def xlsxwritein(doc): # write into database for file type xlsx
xlsxt = 0
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
maincur = conn.cursor()
readcur = conn.cursor()
writecur = conn.cursor()
readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
check = readcur.fetchone()[0]
row_id = 1 # used for following id field
wb = pyxl.load_workbook(doc.path)
sheetnames = wb.get_sheet_names()
ws = wb.get_sheet_by_name(sheetnames[0])
for rown in range(ws.get_highest_row()):
if xlsxt == 0:
if check == True:
app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
tname = '%s-%s' % (doc.tablename,app)
tablename = '"%s-%s"' % (doc.tablename,app)
doc.tablename = tname
doc.save()
else:
tablename = '"%s"' % doc.tablename
field = [ws.cell(row=1,column=col_index).value for
col_index in range(1,ws.get_highest_column()+1)]
maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
for coln in range(ws.get_highest_column()):
field[coln] = '"%s"' % field[coln] # change
number to string
if field[coln] == 'ID':
field[coln] = 'original_id'
maincur.execute("ALTER TABLE %s ADD %s
CITEXT;" % (tablename,field[coln]))
xlsxt = xlsxt+1
elif xlsxt > 0 and check == False: # not first time(insert
data) and check no same file exists
for coln in range(ws.get_highest_column()):
if coln == 0:
writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');"
%(tablename,field[coln],str(ws.cell(row=rown,column=coln+1).value)))
else:
writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';"
%(tablename,field[coln],str(ws.cell(row=rown+1,column=coln+1).value),row_id))
xlsxt = xlsxt+1
row_id = row_id+1
else:
break
conn.commit()
maincur.close()
readcur.close()
writecur.close()
conn.close()
xlsxt = 0
Upvotes: 1
Views: 718
Reputation: 1904
Probably something is going wrong during arguments deserialization. Instead of passing doc object try instead passing filename and then read file inside of task.
Upvotes: 1