BLG_Eric

Reputation: 23

Django + Celery + RabbitMQ: encoding error and SIGKILL

I'm working on a small project that uses Celery to turn CSV and XLSX files into PostgreSQL tables. The code below works fine without Celery (except for large files), but with Celery it produces some errors. I've looked for similar questions on Stack Overflow, but I still have no idea what to do or why this happens. I hope you can help me with it, thanks.

  1. The first error is shown in the screenshots csv-1 and csv-2. I think it has something to do with my encoding code, but opening the file with utf-8-sig or big5 doesn't work either. (It works fine without Celery.)

```python
# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm

app = Celery(
    'tasks',
    broker='amqp://guest:guest@localhost:5672//',
    backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False


@app.task()
def csvwritein(doc):  # Transform csv to table
    doc = doc
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    readcur = conn.cursor()
    # check if same file is already in database
    readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename)
    check = readcur.fetchone()[0]
    try:
        fr = open(doc.path, encoding='utf-8-sig')
        dr.delay(fr, doc, check)
        fr.close()
    except Exception as e:
        fr = open(doc.path, encoding='big5')
        dr.delay(fr, doc, check)
        fr.close()
    conn.commit()
    readcur.close()


@app.task()
def dr(fr, doc, check):  # make datareader a function to keep the code 'dry'
    csvt = 0  # count csv reader loop iterations
    row_id = 1  # used for the following id field
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    maincur = conn.cursor()
    writecur = conn.cursor()
    datareader = csv.reader(fr, delimiter=',')
    for row in datareader:
        if csvt == 0:  # first time in loop (create fields) and check no same file exists
            if check == True:
                app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                tname = '%s-%s' % (doc.tablename, app)
                tablename = '"%s-%s"' % (doc.tablename, app)
                doc.tablename = tname
                doc.save()
            else:
                tablename = '"%s"' % doc.tablename
            maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
            row_count = sum(1 for line in datareader)
            col_count = len(row)
            frow = row
            for i in range(0, col_count, 1):
                row[i] = '"%s"' % row[i]  # change number to string
                maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename, row[i]))
            csvt = csvt + 1
            fr.seek(0)
            next(datareader)
        elif csvt > 0:  # not first time (insert data) and check no same file exists
            for j in range(0, col_count, 1):
                if j == 0:
                    writecur.execute("INSERT INTO %s (%s) VALUES ('%s');" % (tablename, frow[j], row[j]))
                else:
                    writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';" % (tablename, frow[j], row[j], row_id))
            csvt = csvt + 1
            row_id = row_id + 1
        else:
            break
    conn.commit()
    maincur.close()
    writecur.close()
    conn.close()
    csvt = 0
    doc = Document.objects.all()
```

  2. The second error occurs when turning an xlsx file (about 130,000 rows) into a PostgreSQL table: the worker gets a SIGKILL after 2-3 minutes. Debug message:

```
[2016-10-27 06:17:05,227: ERROR/MainProcess] Process 'Worker-1' pid:13829 exited with 'signal 9 (SIGKILL)'
[2016-10-27 06:17:05,328:ERROR/MainProcess] Task data.tasks.xlsxwritein[5aec4679-c48b-4d07-a0a9-5e4e37fcd24b] raised unexpected: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/billiard/pool.py", line 1175, in mark_as_worker_lost
    human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
```

The code below continues the tasks.py file above:

```python
@app.task()
def xlsxwritein(doc):  # write into database for file type xlsx
    xlsxt = 0
    conn = psycopg2.connect("dbname='apidb' user='api' host='localhost' password='eric40502' port='5432'")
    maincur = conn.cursor()
    readcur = conn.cursor()
    writecur = conn.cursor()
    # check if same file is already in database
    readcur.execute("select exists(select * from information_schema.tables where table_name='%s')" % doc.tablename)
    check = readcur.fetchone()[0]
    row_id = 1  # used for the following id field
    wb = pyxl.load_workbook(doc.path)
    sheetnames = wb.get_sheet_names()
    ws = wb.get_sheet_by_name(sheetnames[0])
    for rown in range(ws.get_highest_row()):
        if xlsxt == 0:
            if check == True:
                app = ''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789') for i in range(6)])
                tname = '%s-%s' % (doc.tablename, app)
                tablename = '"%s-%s"' % (doc.tablename, app)
                doc.tablename = tname
                doc.save()
            else:
                tablename = '"%s"' % doc.tablename
            field = [ws.cell(row=1, column=col_index).value for col_index in range(1, ws.get_highest_column() + 1)]
            maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY KEY);" % tablename)
            for coln in range(ws.get_highest_column()):
                field[coln] = '"%s"' % field[coln]  # change number to string
                if field[coln] == 'ID':
                    field[coln] = 'original_id'
                maincur.execute("ALTER TABLE %s ADD %s CITEXT;" % (tablename, field[coln]))
            xlsxt = xlsxt + 1
        elif xlsxt > 0 and check == False:  # not first time (insert data) and check no same file exists
            for coln in range(ws.get_highest_column()):
                if coln == 0:
                    writecur.execute("INSERT INTO %s (%s) VALUES ('%s');"
                                     % (tablename, field[coln], str(ws.cell(row=rown, column=coln + 1).value)))
                else:
                    writecur.execute("UPDATE %s SET %s = '%s' WHERE id = '%d';"
                                     % (tablename, field[coln], str(ws.cell(row=rown + 1, column=coln + 1).value), row_id))
            xlsxt = xlsxt + 1
            row_id = row_id + 1
        else:
            break
    conn.commit()
    maincur.close()
    readcur.close()
    writecur.close()
    conn.close()
    xlsxt = 0
```
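A hedged sketch for the second error. SIGKILL (signal 9) cannot be caught, and it is what the Linux out-of-memory killer sends, so memory exhaustion is one plausible cause worth checking (for example in `dmesg`), though not the only one. Two things in `xlsxwritein` grow with file size: `pyxl.load_workbook(doc.path)` loads the whole workbook into memory, and every row costs one INSERT plus one UPDATE per remaining column, each built with string formatting. The sketch below streams rows, inserts each row with a single parameterized statement via `executemany`, and commits per batch. It uses the standard-library `sqlite3` module as a stand-in so it runs anywhere; with psycopg2 the placeholders would be `%s` instead of `?`. The row source is assumed to be any iterator of tuples; in recent openpyxl versions that could be `load_workbook(path, read_only=True)` followed by `ws.iter_rows(min_row=2, values_only=True)`. The helper names `batched` and `insert_in_batches` are hypothetical.

```python
import itertools
import sqlite3


def batched(rows, size):
    """Yield lists of up to `size` rows without materializing the whole input."""
    it = iter(rows)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            return
        yield chunk


def insert_in_batches(conn, table, columns, rows, batch_size=1000):
    """Insert each row with ONE parameterized INSERT, committing per batch.

    Table and column names are quoted identifiers; cell values are passed
    as parameters, so quotes inside values cannot break the SQL.
    """
    quoted_cols = ", ".join('"%s"' % c.replace('"', '""') for c in columns)
    placeholders = ", ".join("?" for _ in columns)  # psycopg2 would use %s
    sql = 'INSERT INTO "%s" (%s) VALUES (%s)' % (
        table.replace('"', '""'), quoted_cols, placeholders)
    total = 0
    for chunk in batched(rows, batch_size):
        conn.executemany(sql, chunk)
        conn.commit()
        total += len(chunk)
    return total


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE "demo" (id INTEGER PRIMARY KEY, "name" TEXT, "note" TEXT)')
# A generator stands in for a streaming row source such as ws.iter_rows(...).
rows = (("row%d" % i, "it's row %d" % i) for i in range(2500))
inserted = insert_in_batches(conn, "demo", ["name", "note"], rows, batch_size=1000)
count = conn.execute('SELECT COUNT(*) FROM "demo"').fetchone()[0]
print(inserted, count)  # 2500 2500
```

Because only one batch of rows is held at a time, peak memory stays roughly constant regardless of file length; the batch size trades memory for fewer round trips. If the psycopg2 version in use provides `psycopg2.extras.execute_values`, that is another option for multi-row inserts.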

Upvotes: 1

Views: 718

Answers (1)

Eugene Prikazchikov

Reputation: 1904

Probably something is going wrong while the task arguments are (de)serialized. Instead of passing the doc object, try passing the filename and then reading the file inside the task.
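A minimal, standard-library-only sketch of this suggestion. Assumptions: Celery serializes task arguments with pickle or JSON (which one depends on the Celery version and the `CELERY_TASK_SERIALIZER` / `task_serializer` setting); the helpers `serializable` and `read_csv_rows` and the sample files are hypothetical. The first part shows that an open file object like `fr` is rejected by both serializers while a plain path string is accepted. The second part opens the file by path and tries each encoding on the actual bytes: `open(..., encoding='utf-8-sig')` does not decode anything yet, so the `except` in `csvwritein` can only react to errors from `open()` or `dr.delay()` (such as a serialization failure), not to a wrong encoding.

```python
import csv
import io
import json
import os
import pickle
import tempfile


def serializable(obj):
    """Report whether pickle and json (Celery's usual serializers) accept obj."""
    ok = {}
    for name, dump in (("pickle", pickle.dumps), ("json", json.dumps)):
        try:
            dump(obj)
            ok[name] = True
        except (TypeError, pickle.PicklingError):
            ok[name] = False
    return ok


def read_csv_rows(path, encodings=("utf-8-sig", "big5")):
    """Hypothetical helper: open the file by path and try each encoding.

    In the Celery project this body would run inside the task, which
    receives only the path string instead of an open file object.
    Decoding errors surface when the bytes are decoded, not at open().
    """
    with open(path, "rb") as f:
        raw = f.read()  # whole file in memory; fine for moderate CSV sizes
    for enc in encodings:
        try:
            text = raw.decode(enc)
        except UnicodeDecodeError:
            continue  # wrong encoding, try the next one
        return enc, list(csv.reader(io.StringIO(text, newline=""), delimiter=","))
    raise ValueError("could not decode %s with any of %r" % (path, encodings))


# Two small sample files (hypothetical data), one per encoding.
tmpdir = tempfile.mkdtemp()
utf8_path = os.path.join(tmpdir, "utf8.csv")
big5_path = os.path.join(tmpdir, "big5.csv")
for path, enc in ((utf8_path, "utf-8-sig"), (big5_path, "big5")):
    with open(path, "w", encoding=enc, newline="") as f:
        f.write("中文,測試\r\n資料,1\r\n")

with open(utf8_path, encoding="utf-8-sig") as fr:
    print(serializable(fr))      # {'pickle': False, 'json': False}
print(serializable(utf8_path))   # {'pickle': True, 'json': True}
print(read_csv_rows(utf8_path))  # ('utf-8-sig', [['中文', '測試'], ['資料', '1']])
print(read_csv_rows(big5_path))  # ('big5', [['中文', '測試'], ['資料', '1']])
```

In the task version, the caller would pass something like `doc.path` (and, if needed, a primary key to re-fetch the `Document` inside the worker) rather than `fr` or the model instance itself.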

Upvotes: 1
