LearningSlowly

Reputation: 9451

Low InnoDB Writes per Second - AWS EC2 to MySQL RDS using Python

I have around 60GB of JSON files that I am parsing using Python and then inserting into a MySQL database using MySQL Connector/Python. Each JSON file is around 500MB.

I have been using an AWS r3.xlarge EC2 instance with a secondary volume to hold the 60GB of JSON data.

I am then using an AWS RDS r3.xlarge MySQL instance. Both instances are in the same region and availability zone. The EC2 instance runs the following Python script to load the JSON, parse it, and insert it into the MySQL RDS:

import json
import mysql.connector
from mysql.connector import errorcode
from pprint import pprint
import glob
import os

os.chdir("./json_data")

for file in glob.glob("*.json"):
    with open(file, 'rU') as data_file:
        results = json.load(data_file)
        print('working on file:', file)

    cnx = mysql.connector.connect(user='', password='',
        host='')

    cursor = cnx.cursor(buffered=True)

    DB_NAME = 'DB'

    def create_database(cursor):
        try:
            cursor.execute(
                "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(DB_NAME))
        except mysql.connector.Error as err:
            print("Failed creating database: {}".format(err))
            exit(1)

    try:
        cnx.database = DB_NAME    
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_BAD_DB_ERROR:
            create_database(cursor)
            cnx.database = DB_NAME
        else:
            print(err)
            exit(1)

    add_overall_data = ("INSERT INTO master" 
        "(_sent_time_stamp, dt, ds, dtf, O_l, O_ln, O_Ls, O_a, D_l, D_ln, d_a)"
        "VALUES (%(_sent_time_stamp)s, %(dt)s, %(ds)s, %(dtf)s, %(O_l)s, %(O_ln)s, %(O_Ls)s, %(O_a)s, %(D_l)s, %(D_ln)s, %(d_a)s)")

    add_polyline = ("INSERT INTO polyline"
        "(Overview_polyline, request_no)"
        "VALUES (%(Overview_polyline)s, %(request_no)s)")

    add_summary = ("INSERT INTO summary"
        "(summary, request_no)"
        "VALUES (%(summary)s, %(request_no)s)")

    add_warnings = ("INSERT INTO warnings"
        "(warnings, request_no)"
        "VALUES (%(warnings)s, %(request_no)s)")

    add_waypoint_order = ("INSERT INTO waypoint_order"
        "(waypoint_order, request_no)"
        "VALUES (%(waypoint_order)s, %(request_no)s)")

    add_leg_data = ("INSERT INTO leg_data"
        "(request_no, leg_dt, leg_ds, leg_O_l, leg_O_ln, leg_D_l, leg_D_ln, leg_html_inst, leg_polyline, leg_travel_mode)" 
        "VALUES (%(request_no)s, %(leg_dt)s, %(leg_ds)s, %(leg_O_l)s, %(leg_O_ln)s, %(leg_D_l)s, %(leg_D_ln)s, %(leg_html_inst)s, %(leg_polyline)s, %(leg_travel_mode)s)")
    error_messages = []
    for result in results:
        if result["status"] == "OK":
            for leg in result['routes'][0]['legs']:
                try: 
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": leg['dtf']['value'],
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": leg['start_address'],
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": leg['end_address']
                    }
                    cursor.execute(add_overall_data, params)
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
                except KeyError as e:
                    error_messages.append(e)
                    params = {
                    "_sent_time_stamp": leg['_sent_time_stamp'],
                    "dt": leg['dt']['value'],
                    "ds": leg['ds']['value'],
                    "dtf": "000",
                    "O_l": leg['start_location']['lat'],
                    "O_ln": leg['start_location']['lng'],
                    "O_Ls": leg['O_Ls'],
                    "O_a": 'unknown',
                    "D_l": leg['end_location']['lat'],
                    "D_ln": leg['end_location']['lng'],
                    "d_a": 'unknown'
                    }
                    cursor.execute(add_overall_data, params)
                    query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                    O_l = leg['start_location']['lat']
                    O_ln = leg['start_location']['lng']
                    D_l = leg['end_location']['lat']
                    D_ln = leg['end_location']['lng']
                    _sent_time_stamp = leg['_sent_time_stamp']
                    cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                    request_no = cursor.fetchone()[0]
            for overview_polyline in result['routes']:
                params = {
                "request_no": request_no,
                "Overview_polyline": overview_polyline['overview_polyline']['points']
                }
                cursor.execute(add_polyline, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for summary in result['routes']:
                params = {
                "request_no": request_no,
                "summary": summary['summary']
                }
                cursor.execute(add_summary, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for warnings in result['routes']:
                params = {
                "request_no": request_no,
                "warnings": str(warnings['warnings'])
                }
                cursor.execute(add_warnings, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for waypoint_order in result['routes']:
                params = {
                "request_no": request_no,
                "waypoint_order": str(waypoint_order['waypoint_order'])
                }
                cursor.execute(add_waypoint_order, params)
                query = ('SELECT request_no FROM master WHERE O_l = %s AND O_ln = %s AND D_l = %s AND D_ln = %s AND _sent_time_stamp = %s')
                O_l = leg['start_location']['lat']
                O_ln = leg['start_location']['lng']
                D_l = leg['end_location']['lat']
                D_ln = leg['end_location']['lng']
                _sent_time_stamp = leg['_sent_time_stamp']
                cursor.execute(query,(O_l, O_ln, D_l, D_ln, _sent_time_stamp))
                request_no = cursor.fetchone()[0]
            for steps in result['routes'][0]['legs'][0]['steps']:
                params = {
                "request_no": request_no,
                "leg_dt": steps['dt']['value'],
                "leg_ds": steps['ds']['value'],
                "leg_O_l": steps['start_location']['lat'],
                "leg_O_ln": steps['start_location']['lng'],
                "leg_D_l": steps['end_location']['lat'],
                "leg_D_ln": steps['end_location']['lng'],
                "leg_html_inst": steps['html_instructions'],
                "leg_polyline": steps['polyline']['points'],
                "leg_travel_mode": steps['travel_mode']
                }
                cursor.execute(add_leg_data, params)
        cnx.commit()
    print('error messages:', error_messages)
    cursor.close()
    cnx.close()
    print('finished ' + file)

Using htop on the Linux instance I can see the following: [screenshot: htop of the Python process]

Regarding the MySQL database, using MySQL Workbench I can see the following: [screenshot: MySQL Workbench output]

This Python script has been chugging away for days, but I have only inserted around 20% of the data into MySQL.

My questions: how can I identify the bottleneck? Is it the Python script? It appears to be using little memory; can I increase this? I have checked the InnoDB buffer pool size as per (How to improve the speed of InnoDB writes per second of MySQL DB) and found it to be large:

SELECT @@innodb_buffer_pool_size;
+---------------------------+
| @@innodb_buffer_pool_size |
+---------------------------+
|               11674845184 |
+---------------------------+

Since the RDS and EC2 instances are in the same region, I don't believe there is a network bottleneck. Pointers on where to look for the biggest savings would be very welcome!

EDIT

I think I may have stumbled upon the issue. For efficiency during parsing, I write each level of the JSON separately. However, I then have to execute a query to match each nested part of the JSON with its parent row. That query has low overhead on a small database, but I've noticed that insert speed has decreased dramatically on this one: each lookup has to search a larger and ever-growing table to properly connect the JSON data.

I am not sure how to solve this other than waiting it out....
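One idea, sketched below, assumes that request_no is an AUTO_INCREMENT primary key on master (which may not match my actual schema): read the generated key straight off the cursor instead of re-querying the table, so the lookup cost no longer grows with the table size.

    cursor.execute(add_overall_data, params)
    # Connector/Python exposes the AUTO_INCREMENT value generated by the
    # last INSERT on the cursor, so no SELECT against master is needed.
    request_no = cursor.lastrowid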

Upvotes: 15

Views: 632

Answers (3)

Karol Nowak

Reputation: 662

Given this information, it looks like both the script and the DB are mostly idle. Tweaking anything at the MySQL level would be premature.

You need more visibility into what your program is doing.

Start by logging how much time each of your queries takes, how many errors you get, and so on.

Those SELECTs may need an index to perform well, if they are a problem at all.
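For example, a minimal sketch of both ideas (the timing threshold, index name, and column list are my assumptions, not something taken from the question):

    import time

    def timed_execute(cursor, stmt, params=None):
        # Log any statement slower than 100 ms so the bottleneck shows up.
        start = time.time()
        cursor.execute(stmt, params)
        elapsed = time.time() - start
        if elapsed > 0.1:
            print('slow query (%.3fs): %s' % (elapsed, stmt[:60]))

    # If the repeated SELECT on master turns out to be the slow part, a
    # composite index covering its WHERE clause should make it fast:
    cursor.execute(
        "ALTER TABLE master ADD INDEX idx_master_lookup "
        "(O_l, O_ln, D_l, D_ln, _sent_time_stamp)")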

Upvotes: 0

delphisharp

Reputation: 111

(Apologies, my English is poor.)

If I were doing this work, I would:

  1. Use Python to convert the JSON to flat text files.

  2. Use the MySQL import tool (mysqlimport / LOAD DATA INFILE) to import those text files into MySQL.

If you must do Python + MySQL all in one, I suggest multi-row inserts:

    INSERT INTO table VALUES (1), (2), ... (xxx)

Also, why does 'SELECT request_no FROM master' occur so many times? That value should be read from the JSON instead.
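For illustration, a minimal sketch of the multi-row insert idea with Connector/Python (the rows list is hypothetical; the table and columns are from the question). cursor.executemany() rewrites a parameterized INSERT into a single multi-row statement, so many rows travel in one round trip:

    add_summary = ("INSERT INTO summary (summary, request_no) "
                   "VALUES (%(summary)s, %(request_no)s)")

    # rows: a list of dicts accumulated while parsing the JSON, e.g.
    # [{'summary': 'I-80 W', 'request_no': 1}, ...]
    cursor.executemany(add_summary, rows)
    cnx.commit()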

Upvotes: 1

Tim Seed

Reputation: 5289

I cannot see any table definitions in the Python script, but when we do large data-load operations we always disable database indexes while loading into MySQL. If you have any constraint or foreign-key enforcement, that should be disabled during the load as well.

Autocommit is disabled by default when connecting through Connector/Python.

But I cannot see any commit options in the code you present.

To summarise:

Disable/remove for the duration of the load:

- Indexes
- Constraints
- Foreign keys
- Triggers

In your loading program (see the sketch below):

- Disable autocommit
- Commit every N records (N will depend upon your available buffer size)
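A minimal sketch of that loading pattern (the session flags and the batch size of 1000 are assumptions on my part, not something taken from your script):

    # Relax per-row checking for this session while bulk loading.
    cursor.execute("SET unique_checks = 0")
    cursor.execute("SET foreign_key_checks = 0")

    BATCH = 1000  # tune N to the buffer/memory you have available
    for i, params in enumerate(all_rows, start=1):  # all_rows: your parsed rows (hypothetical)
        cursor.execute(add_leg_data, params)
        if i % BATCH == 0:
            cnx.commit()  # autocommit is already off in Connector/Python
    cnx.commit()  # flush the final partial batch

    # Restore normal checking once the load is done.
    cursor.execute("SET unique_checks = 1")
    cursor.execute("SET foreign_key_checks = 1")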

Upvotes: 1
