Reputation: 33
I have around 1000 big datasets, each with 2-3 million rows of data. I want to import each of them into MySQL so that analysis becomes easier. I wrote the code below for this purpose, but the processing is very slow: it takes around 5-6 minutes per file. Is there a faster way to import all the CSVs into the database?
from sqlalchemy import create_engine
import pandas as pd
import os

all_files = os.listdir('D:\\All_Tick_Data\\Tick_dataset\\')
for file in all_files:
    print(file)
    engine = create_engine("mysql://root:rocky@localhost/options")
    con = engine.connect()
    df = pd.read_csv('D:\\All_Tick_Data\\Tick_dataset\\' + file)
    df.to_sql(name='options_data', con=con, if_exists='append', index=True)
    con.close()
Upvotes: 1
Views: 2194
Reputation: 8663
Of course, there are several solutions; one of the ideas would be:
- replace Pandas with a LOAD DATA statement
- replace os.listdir with glob
Well, if we want it to be a mini POC that is also readable, then that idea would look like this:
import glob
import pathlib
import threading
from dataclasses import dataclass

from common import PropagatingThread  # a Thread that re-raises worker exceptions; see the sketch below
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool


def get_engine():
    conn_params = "mysql://root:rocky@localhost/options"
    # NullPool: no connection pooling, so parallel threads don't fight over pooled connections
    engine = create_engine(conn_params, poolclass=NullPool)
    engine.connect()
    return engine


class Operations:
    @staticmethod
    def get_csv_files(csv_path):
        # match the .csv extension in any case (.csv, .CSV, ...)
        return [f for f in glob.glob(f"{csv_path}/*.[cC][sS][vV]")]


@dataclass
class Restore(Operations):
    schema: str
    table: str
    number_parallels: int
    csv_path: pathlib.Path
    delimiter: str

    def __post_init__(self):
        # cap how many LOAD DATA statements run at the same time
        self._pool_sema = threading.Semaphore(value=self.number_parallels)

    def restore_task(self, file):
        with self._pool_sema:  # released even if the statement fails
            sql = """
            LOAD DATA INFILE '{0}'
            INTO TABLE {1}.{2}
            COLUMNS TERMINATED BY '{3}'""".format(file,
                                                  self.schema,
                                                  self.table,
                                                  self.delimiter)
            # engine.execute is the SQLAlchemy 1.x API (removed in 2.0)
            self.__engine.execute(sql)

    def start_restore(self):
        files = self.get_csv_files(self.csv_path)
        self.__engine = get_engine()
        threads = [
            PropagatingThread(
                target=self.restore_task, args=(f,)) for f in files]
        [t.start() for t in threads]
        [t.join() for t in threads]
        self.__engine.dispose()


def main():
    SCHEMA = 'test_schema'
    TABLE = 'analysis_table'
    NUMBER_PARALLELS = 17
    CSV_PATH = 'D:\\All_Tick_Data\\Tick_dataset\\'
    DELIMITER = ','
    r = Restore(schema=SCHEMA,
                table=TABLE,
                number_parallels=NUMBER_PARALLELS,
                csv_path=CSV_PATH,
                delimiter=DELIMITER)
    r.start_restore()


if __name__ == '__main__':
    main()
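Since PropagatingThread comes from an external module, here is a minimal sketch of what such a helper typically looks like: a Thread subclass that captures any exception from the worker and re-raises it on join(). This is an assumption about the referenced helper, not its exact code:

import threading

class PropagatingThread(threading.Thread):
    # store the worker's exception instead of letting the thread swallow it
    def run(self):
        self.exc = None
        self.ret = None
        try:
            self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        # re-raise in the caller's thread so a failed LOAD DATA is not silent
        super().join(timeout)
        if self.exc:
            raise self.exc
        return self.ret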
python3 -m pip install psutil
The psutil.cpu_count method returns the number of logical CPUs in the system (the same as os.cpu_count in Python 3.4) or None if undetermined. "Logical" means the number of physical cores multiplied by the number of threads that can run on each core (known as Hyper-Threading). If logical is False, it returns the number of physical cores only (Hyper-Threading CPUs are excluded), or None if undetermined.
import psutil

# threads per physical core, e.g. 2 on a typical Hyper-Threading machine
threads_count = psutil.cpu_count() // psutil.cpu_count(logical=False)
A note on --secure-file-priv: the option is exposed in your MySQL server as a global variable named secure_file_priv, so you can check its current value using the SHOW VARIABLES statement. Here's an example of retrieving the secure_file_priv value:
mysql> SHOW VARIABLES LIKE 'secure_file_priv';
+------------------+-------+
| Variable_name | Value |
+------------------+-------+
| secure_file_priv | NULL |
+------------------+-------+
The secure_file_priv option can have three possible values:
- an empty string: file import and export are not restricted to any directory
- a directory name: import and export are limited to files in that directory
- NULL: import and export operations are disabled entirely
The secure_file_priv value is read-only, so you can't change it directly using a SQL query. The following statement tries to change the secure_file_priv value to the /tmp/ folder:
SET GLOBAL secure_file_priv = "/tmp/";
The response would be as follows:
ERROR 1238 (HY000): Variable 'secure_file_priv' is a read only variable
To change the value of the secure_file_priv variable, you need to create a MySQL configuration file that sets the value of the variable under the [mysqld] options.
You need to put the following content in your my.cnf (Mac, Linux) or my.ini (Windows) file:
[mysqld]
secure_file_priv = ""
Once you edit the configuration file, save it and restart your MySQL server. You should then be able to import or export data using the MySQL LOAD DATA and SELECT ... INTO OUTFILE statements.
Here’s an example of a successful data export query:
mysql> SELECT * FROM your_table INTO OUTFILE "/tmp/out.txt";
Query OK, 6 rows affected (0.00 sec)
Now the result of the SELECT statement above is saved as the out.txt file.
You can import the same text file back into a SQL table by using the LOAD DATA statement, as we previously presented in the code.
First, create a copy of your table using the CREATE TABLE ... LIKE statement as shown below:
CREATE TABLE analysis_table_copy LIKE analysis_table;
Then, load the data from the out.txt file with the following SQL query:
mysql> LOAD DATA INFILE '/tmp/out.txt' INTO TABLE analysis_table_copy;
Query OK, 6 rows affected (0.00 sec)
Records: 6 Deleted: 0 Skipped: 0 Warnings: 0
The query above will put the out.txt data into the analysis_table_copy table.
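To tie this back to the loader script, you can verify the variable from Python before kicking off the restore. A small sketch, assuming the SQLAlchemy 1.x engine.execute API used above:

from sqlalchemy import create_engine

engine = create_engine("mysql://root:rocky@localhost/options")
row = engine.execute("SHOW VARIABLES LIKE 'secure_file_priv'").fetchone()
print(row)  # e.g. ('secure_file_priv', '') once the restriction is lifted
engine.dispose()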
Upvotes: 1
Reputation: 63
You should consider using Apache Spark if you are processing the data on a server that can handle a high load, as it lets you take advantage of the CPU power to achieve parallelism.
Using a 64-core server you can process (transform and save) ~25M rows in less than 5 minutes.
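For illustration, a minimal PySpark sketch of that approach, assuming the MySQL JDBC driver is available on Spark's classpath (connection details copied from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv_to_mysql").getOrCreate()

# Spark reads every CSV in the folder in parallel across the available cores
df = spark.read.csv('D:/All_Tick_Data/Tick_dataset/*.csv', header=True, inferSchema=True)

# write the rows to MySQL over JDBC
(df.write
   .format('jdbc')
   .option('url', 'jdbc:mysql://localhost/options')
   .option('dbtable', 'options_data')
   .option('user', 'root')
   .option('password', 'rocky')
   .mode('append')
   .save())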
Upvotes: 0
Reputation: 142208
Have your code generate a bunch of LOAD DATA SQL commands, then feed them to the command-line mysql tool; see the sketch below.
I suggest adding the indexes (other than the PRIMARY KEY) after loading the table(s), so the bulk load isn't slowed down by index maintenance.
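A minimal sketch of that idea (the table name and paths come from the question; the header-row assumption behind IGNORE 1 LINES and the indexed column are hypothetical): generate one LOAD DATA statement per file into a script, append the index creation at the end, then run the whole thing with `mysql options < load_all.sql`:

import glob

CSV_DIR = 'D:/All_Tick_Data/Tick_dataset'  # forward slashes also work for MySQL on Windows
TABLE = 'options_data'

with open('load_all.sql', 'w') as script:
    for path in glob.glob(f'{CSV_DIR}/*.csv'):
        script.write(
            f"LOAD DATA INFILE '{path}' INTO TABLE {TABLE}\n"
            f"  COLUMNS TERMINATED BY ',' IGNORE 1 LINES;\n"
        )
    # add secondary indexes once, after all the loads (column name is hypothetical)
    script.write(f"ALTER TABLE {TABLE} ADD INDEX idx_symbol (symbol);\n")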
Upvotes: 2