abhiraj gupta

Reputation: 33

Fastest way to import 1000s of big csv files to mysql database?

I have thousands of big datasets, each with 2-3 million rows. I want to import each of them into MySQL so that analysis becomes easier. I wrote the code below for this, but it is very slow: each file takes around 5-6 minutes. Is there a faster way to import all the CSV files into the database?

from sqlalchemy import create_engine
import pandas as pd,os


all_files = os.listdir('D:\\All_Tick_Data\\Tick_dataset\\')
for file in all_files:
    print(file)
    engine = create_engine("mysql://root:rocky@localhost/options")
    con = engine.connect()
    df = pd.read_csv('D:\\All_Tick_Data\\Tick_dataset\\'+file)
    df.to_sql(name='options_data',con=con,if_exists='append',index=True)
con.close()

Upvotes: 1

Views: 2194

Answers (3)

Milovan Tomašević

Reputation: 8663

Pandas is a very slow way of loading data into SQL compared with loading the CSV files directly; it can be orders of magnitude slower. On top of that, you create a new engine on every pass through the loop. It is bound to be slow.

Of course, there are several solutions. One idea is to let MySQL read the files itself with LOAD DATA INFILE and to run several loads in parallel. As a mini POC that is also readable, that idea would look like this:

import glob
import pathlib
import threading

from common import PropagatingThread  # user-defined helper, not stdlib; look at the last reference
from dataclasses import dataclass
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool


def get_engine():
    conn_params = "mysql://root:rocky@localhost/options"
    # NullPool: every connect() opens a fresh connection, so the worker
    # threads never share one.
    return create_engine(conn_params, poolclass=NullPool)


class Operations:
    @staticmethod
    def get_csv_files(csv_path):
        # Match the .csv extension case-insensitively.
        return glob.glob(str(pathlib.Path(csv_path) / "*.[cC][sS][vV]"))


@dataclass
class Restore(Operations):
    schema: str
    table: str
    number_parallels: int
    csv_path: pathlib.Path
    delimiter: str

    def __post_init__(self):
        # Caps how many LOAD DATA statements run at the same time.
        self._pool_sema = threading.Semaphore(value=self.number_parallels)

    def restore_task(self, file):
        # MySQL treats backslashes in string literals as escape characters,
        # so the Windows path is passed with forward slashes.
        sql = """
            LOAD DATA INFILE '{0}'
            INTO TABLE {1}.{2}
            COLUMNS TERMINATED BY '{3}'""".format(pathlib.Path(file).as_posix(),
                                                  self.schema,
                                                  self.table,
                                                  self.delimiter,
                                                  )
        # Engine.execute() was removed in SQLAlchemy 2.0, so run the statement
        # on a connection. The with-blocks release the semaphore and commit
        # or roll back even if the statement raises.
        with self._pool_sema:
            with self._engine.begin() as conn:
                conn.execute(text(sql))

    def start_restore(self):
        files = self.get_csv_files(self.csv_path)
        self._engine = get_engine()
        threads = [
            PropagatingThread(
                target=self.restore_task, args=(f, )) for f in files]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        self._engine.dispose()


def main():
    SCHEMA = 'test_schema'
    TABLE = 'analysis_table'
    NUMBER_PARALLELS = 17
    CSV_PATH = 'D:\\All_Tick_Data\\Tick_dataset\\'
    DELIMITER = ','

    r = Restore(schema=SCHEMA,
                table=TABLE,
                number_parallels=NUMBER_PARALLELS,
                csv_path=CSV_PATH,
                delimiter=DELIMITER)
    r.start_restore()


if __name__ == '__main__':
    main()
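
The POC above starts one thread per file and uses a semaphore to limit how many run at once. With thousands of files, a fixed-size worker pool is a possible alternative that gives the same cap without creating thousands of threads. The sketch below swaps in the standard library's ThreadPoolExecutor for the semaphore; load_all and fake_load are hypothetical names, and fake_load only stands in for the real LOAD DATA call so that the example runs without a database.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def load_all(files, load_one, max_workers=5):
    """Run load_one(file) for every file, at most max_workers at a time."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() drains the result iterator, so an exception raised in any
        # worker is re-raised here instead of being silently dropped.
        return list(pool.map(load_one, files))


# Stand-in for the real LOAD DATA call; it only records peak concurrency.
_lock = threading.Lock()
_active = 0
peak = 0


def fake_load(file):
    global _active, peak
    with _lock:
        _active += 1
        peak = max(peak, _active)
    time.sleep(0.01)  # pretend the server is busy importing
    with _lock:
        _active -= 1
    return file


results = load_all([f"file_{i}.csv" for i in range(20)], fake_load, max_workers=5)
```

In real use, load_one would be a function such as restore_task above, and max_workers plays the role of NUMBER_PARALLELS.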

Notes:

  • First, the simpler one: the number of parallel loads depends on your resources, both on the machine where the code runs and on the database server, so you will have to test it. Start with a small number, say 5. This means that at most 5 of your files are loaded into the database at a time, while the others wait in the pool. On the machine where the code runs, be careful about going above 2 * number of cores + 1. To find the number of cores you can use the psutil module in Python 3; I'm not sure whether this works on Windows, but you can try it.
python3 -m pip install psutil

The psutil.cpu_count() function returns the number of logical CPUs in the system (the same as os.cpu_count() in Python 3.4+), or None if it cannot be determined. The number of logical cores is the number of physical cores multiplied by the number of threads that can run on each core (known as Hyper-Threading). With logical=False it returns the number of physical cores only (hyper-threaded CPUs are excluded), or None if it cannot be determined.

import psutil

# Hardware threads per physical core, e.g. 2 with Hyper-Threading enabled
threads_count = psutil.cpu_count() // psutil.cpu_count(logical=False)
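
The 2 * number of cores + 1 rule of thumb can be turned into a small helper that clamps whatever value you want to try. This is only a sketch: max_parallel_loads and pick_parallelism are hypothetical names, it uses os.cpu_count() from the standard library instead of psutil, and it assumes that "cores" means logical CPUs, which the note above does not specify.

```python
import os


def max_parallel_loads(logical_cpus=None):
    """Upper bound from the rule of thumb: 2 * cores + 1."""
    if logical_cpus is None:
        # os.cpu_count() can return None when the count is undetermined.
        logical_cpus = os.cpu_count() or 1
    return 2 * logical_cpus + 1


def pick_parallelism(requested, logical_cpus=None):
    """Clamp a requested worker count to the range [1, 2 * cores + 1]."""
    return max(1, min(requested, max_parallel_loads(logical_cpus)))
```

For example, on a machine with 8 logical CPUs the bound is 17, so pick_parallelism(50, 8) gives 17 and pick_parallelism(5, 8) stays at 5. The database server has its own limits, so the result still needs testing.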
  • The second note applies if you have not loaded data from a file into your MySQL database before. When doing so, you may run into a security restriction, the --secure-file-priv option. It is exposed in MySQL as a global variable named secure_file_priv, so you can check its current value with the SHOW VARIABLES statement. Here’s an example of retrieving the secure_file_priv value:
mysql> SHOW VARIABLES LIKE 'secure_file_priv';
+------------------+-------+
| Variable_name    | Value |
+------------------+-------+
| secure_file_priv | NULL  |
+------------------+-------+

The secure_file_priv option can have three possible values:

  • NULL value means the data export or import is disabled
  • Empty value means data export or import is enabled
  • Directory path value means data export or import is enabled only in the specified path
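
The three cases above can be sketched as a small pre-flight check that runs before any LOAD DATA statement is sent. import_allowed is a hypothetical helper, not part of MySQL or any driver. It assumes that NULL arrives in Python as None, that paths are POSIX-style, and it only accepts files located directly in the configured directory, which is the conservative reading of the third case.

```python
from pathlib import PurePosixPath
from typing import Optional


def import_allowed(secure_file_priv: Optional[str], file_path: str) -> bool:
    """Mirror the three secure_file_priv cases for a single file path."""
    if secure_file_priv is None:
        # NULL: import and export are disabled.
        return False
    if secure_file_priv == "":
        # Empty value: no directory restriction.
        return True
    # Directory value: accept only files directly inside that directory.
    directory = PurePosixPath(secure_file_priv)
    return PurePosixPath(file_path).parent == directory
```

With the value "/tmp/", the path "/tmp/out.txt" passes the check, while a file in any other directory does not.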

The secure_file_priv value is read-only, so you can’t change it directly with an SQL query. The following statement tries to set secure_file_priv to the /tmp/ folder:

SET GLOBAL secure_file_priv = "/tmp/";

The response would be as follows:

ERROR 1238 (HY000): Variable 'secure_file_priv' is a read only variable

To change the value of secure_file_priv variable, you need to create a MySQL configuration file that sets the value of the variable under [mysqld] options.

You need to put the following content in your my.cnf (Mac, Linux) or my.ini (Windows) file:

[mysqld]
secure_file_priv = ""

Once you edit the configuration file, save it and restart your MySQL server. You should then be able to import or export data using the MySQL LOAD DATA and SELECT ... INTO OUTFILE statements.

Here’s an example of a successful data export query:

mysql> SELECT * FROM your_table INTO OUTFILE "/tmp/out.txt";
Query OK, 6 rows affected (0.00 sec)

The result of the SELECT statement above is now saved in the out.txt file.

You can import the same text file back into an SQL table with the LOAD DATA statement, as in the code shown earlier.

First, create a copy of your table using CREATE TABLE ... LIKE statement as shown below:

CREATE TABLE analysis_table_copy LIKE analysis_table;

Then, load the data from the out.txt file with the following SQL query:

mysql> LOAD DATA INFILE '/tmp/out.txt' INTO TABLE analysis_table_copy;
Query OK, 6 rows affected (0.00 sec)
Records: 6  Deleted: 0  Skipped: 0  Warnings: 0

The query above will put the out.txt data into the analysis_table_copy table.

Upvotes: 1

yaoviametepe

Reputation: 63

You should consider using Apache Spark if you process the data on a server that can handle a high load, as it lets you take advantage of the CPU power to achieve parallelism.

On a 64-core server you can process (transform and save) ~25M rows in less than 5 minutes.

Upvotes: 0

Rick James

Reputation: 142208

Have your code generate a bunch of LOAD DATA SQL commands. Then feed them to the command-line mysql tool.

I suggest adding indexes (other than the PRIMARY KEY) after loading the table(s).
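
A minimal sketch of the first suggestion, assuming comma-separated files and the options_data table from the question. build_load_statements and write_script are hypothetical helper names, and skip_header is an assumption for files that begin with a header row.

```python
def build_load_statements(files, table, delimiter=",", skip_header=False):
    """Return one LOAD DATA INFILE statement per CSV file."""
    statements = []
    for path in files:
        # MySQL treats backslashes in string literals as escapes, so use
        # forward slashes; double any single quote inside the path.
        safe_path = str(path).replace("\\", "/").replace("'", "''")
        sql = (
            f"LOAD DATA INFILE '{safe_path}' INTO TABLE {table} "
            f"FIELDS TERMINATED BY '{delimiter}'"
        )
        if skip_header:
            sql += " IGNORE 1 LINES"
        statements.append(sql + ";")
    return statements


def write_script(statements, script_path):
    """Write the statements to a .sql file, one per line."""
    with open(script_path, "w", encoding="utf-8") as fh:
        fh.write("\n".join(statements) + "\n")


example = build_load_statements(
    ["D:\\All_Tick_Data\\Tick_dataset\\a.csv"], "options_data")
```

The script written by write_script could then be fed to the client, for example with mysql -u root -p options < load_all.sql, where load_all.sql is whatever file name you chose.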

Upvotes: 2
