Importing Mongo incremental CSV files into a MySQL database
References:
https://blog.csdn.net/glDemo/article/details/46886001
https://blog.csdn.net/kxw1994/article/details/76465152
https://stackoverflow.com/questions/10154633/load-csv-data-into-mysql-in-python
https://blog.csdn.net/qq_35246620/article/details/78148505
https://www.mydatahack.com/how-to-bulk-load-data-into-mysql-with-python/
Create the MySQL table based on the Mongo structure
Inspect the Mongo document structure:
mongos> db.mins.find().next()
{
    "_id" : ObjectId("59ce1e1d6e6dc7768c7140dc"),
    "code" : "SH900955",
    "time" : ISODate("1999-07-26T09:59:00Z"),
    "open" : 0.462,
    "close" : 0.462,
    "low" : 0.462,
    "high" : 0.462,
    "volume" : 0,
    "amount" : 0
}
Create the table in MySQL:
CREATE TABLE IF NOT EXISTS `inc_mins`(
`_id` VARCHAR(100) NOT NULL,
`code` VARCHAR(100) NOT NULL,
`time` VARCHAR(100) NOT NULL,
`open` FLOAT DEFAULT NULL,
`low` FLOAT DEFAULT NULL,
`high` FLOAT DEFAULT NULL,
`close` FLOAT DEFAULT NULL,
`volume` text,
`amount` text,
UNIQUE KEY `_id` ( `_id` )
)ENGINE=MyISAM DEFAULT CHARSET=utf8;
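If you want the table creation scripted as well, the same DDL can be run through pymysql (a minimal sketch; the connection parameters are the ones used later in this post):

import pymysql

ddl = """CREATE TABLE IF NOT EXISTS `inc_mins`(
`_id` VARCHAR(100) NOT NULL,
`code` VARCHAR(100) NOT NULL,
`time` VARCHAR(100) NOT NULL,
`open` FLOAT DEFAULT NULL,
`low` FLOAT DEFAULT NULL,
`high` FLOAT DEFAULT NULL,
`close` FLOAT DEFAULT NULL,
`volume` text,
`amount` text,
UNIQUE KEY `_id` ( `_id` )
)ENGINE=MyISAM DEFAULT CHARSET=utf8;"""

conn = pymysql.Connect(host='localhost', port=3306, user='root',
                       password='ruiyang', db='test01', charset='utf8')
with conn.cursor() as cursor:
    cursor.execute(ddl)  # DDL statements are committed implicitly
conn.close()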
Processing the Mongo incremental data into CSV files
That step is skipped in this post; assume it has already produced two date-named folders, each holding per-stock incremental CSV files (directory screenshot omitted).
Use ls -lht to check the file sizes and pick one of a reasonable size for testing.
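If you would rather do that scan from Python, a small size listing serves the same purpose (purely illustrative; export_dir is the same export folder used in the merge script below):

import os

export_dir = '/Users/furuiyang/codes/mins_daily_import_to_csv/exportdir/20190720'
# Print each file with its size, largest first, so a mid-sized test file is easy to spot
for name in sorted(os.listdir(export_dir),
                   key=lambda n: os.path.getsize(os.path.join(export_dir, n)),
                   reverse=True):
    print('{:>12} bytes  {}'.format(os.path.getsize(os.path.join(export_dir, name)), name))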
Using a Python script
The work splits into two steps:
(1) concatenate the multiple CSV files
(2) load the merged CSV into MySQL
Concatenating multiple CSV files
import os

import pandas as pd

Folder_Path = r'/Users/furuiyang/codes/mins_daily_import_to_csv/exportdir/20190720'  # folder holding the CSVs to concatenate; avoid non-ASCII characters in the path
SaveFile_Path = r'/Users/furuiyang/codes/mins_daily_import_to_csv/savedir/20190720'  # folder for the merged file
SaveFile_Name = r'20190720.csv'  # name of the merged file

# Switch to the source folder and list every file in it
os.chdir(Folder_Path)
file_list = os.listdir()

# Read the first CSV, keeping its header row
df = pd.read_csv(os.path.join(Folder_Path, file_list[0]))

# Create the output folder and write the first CSV
# (index=False keeps pandas from adding an extra index column,
# which would misalign the rows appended below)
os.makedirs(SaveFile_Path, exist_ok=True)
save_file = os.path.join(SaveFile_Path, SaveFile_Name)
df.to_csv(save_file, encoding="utf-8", index=False)

# Append the remaining CSVs, dropping their header rows
count = 0
for i in range(1, len(file_list)):
    df = pd.read_csv(os.path.join(Folder_Path, file_list[i]))
    count += df.shape[0]
    df.to_csv(save_file, encoding="utf-8", index=False, header=False, mode='a')

print(count)  # 2599680 rows appended after the first file
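If the day's files fit comfortably in memory, the same merge can be written as a single pd.concat call — an alternative sketch, assuming every CSV shares the same header:

import os

import pandas as pd

Folder_Path = r'/Users/furuiyang/codes/mins_daily_import_to_csv/exportdir/20190720'
save_file = r'/Users/furuiyang/codes/mins_daily_import_to_csv/savedir/20190720/20190720.csv'

os.makedirs(os.path.dirname(save_file), exist_ok=True)
frames = [pd.read_csv(os.path.join(Folder_Path, name))
          for name in sorted(os.listdir(Folder_Path)) if name.endswith('.csv')]
merged = pd.concat(frames, ignore_index=True)  # stack all frames, renumbering the index
merged.to_csv(save_file, encoding="utf-8", index=False)
print(merged.shape[0])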
Loading the CSV file into MySQL
Row-by-row inserts
import pymysql

conn = pymysql.Connect(host='localhost', port=3306, user='root', password='ruiyang', db='test01', charset='utf8')
cursor = conn.cursor()

# Parameterized insert: let the driver quote and escape the values
# instead of splicing them into the SQL string with str.format
sql2 = "insert into inc_mins (_id,code,time,open,close,low,high,volume,amount) values (%s,%s,%s,%s,%s,%s,%s,%s,%s)"

file = open('/Users/furuiyang/codes/mins_daily_import_to_csv/savedir/20190720/20190720_001.csv', 'r')
file.readline()  # skip the header row
for i in file.readlines():
    data = i.strip().split(',')
    cursor.execute(sql2, data)
conn.commit()
file.close()
Inserting row by row like this is unbearably slow:
about 110,000 rows took roughly 14 minutes.
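Before jumping to LOAD DATA, one middle ground is batching the inserts with pymysql's executemany, which sends many rows per round trip — a sketch under the same table and file assumptions:

import csv

import pymysql

conn = pymysql.Connect(host='localhost', port=3306, user='root',
                       password='ruiyang', db='test01', charset='utf8')
cursor = conn.cursor()
sql = ("insert into inc_mins (_id,code,time,open,close,low,high,volume,amount) "
       "values (%s,%s,%s,%s,%s,%s,%s,%s,%s)")

with open('/Users/furuiyang/codes/mins_daily_import_to_csv/savedir/20190720/20190720_001.csv') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header row
    batch = []
    for row in reader:
        batch.append(row)
        if len(batch) >= 5000:  # flush every 5000 rows
            cursor.executemany(sql, batch)
            batch = []
    if batch:
        cursor.executemany(sql, batch)

conn.commit()
conn.close()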
Using LOAD DATA
To test, run a load data statement in the mysql terminal:
load data infile '/Users/furuiyang/codes/mins_daily_import_to_csv/exportdir/20190720_10001718.csv'
replace into table test01.inc_mins
fields terminated by ','
optionally enclosed by '"'
lines terminated by '\n'
ignore 1 lines (_id,code,time,open,low,high,close,volume,amount);
It fails with:
The MySQL server is running with the --secure-file-priv option so it cannot execute this statement
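To confirm where the restriction comes from, you can inspect the variable (a quick check via pymysql, same credentials as below):

import pymysql

conn = pymysql.Connect(host='localhost', port=3306, user='root', password='ruiyang')
cursor = conn.cursor()
cursor.execute("SHOW VARIABLES LIKE 'secure_file_priv';")
print(cursor.fetchone())  # e.g. ('secure_file_priv', '/usr/local/mysql-files/')
conn.close()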
The culprit is the server's --secure-file-priv setting, which restricts which directories a server-side LOAD DATA INFILE may read from. The workaround is LOAD DATA LOCAL INFILE, which streams the file from the client instead. Running a simplified statement from a Python program:
import sys

import pymysql


def csv_to_mysql(load_sql, host, user, password):
    '''
    Load a csv file into a MySQL table according to the load_sql statement.
    '''
    try:
        con = pymysql.connect(host=host,
                              user=user,
                              password=password,
                              autocommit=True,
                              local_infile=1)
        print('Connected to DB: {}'.format(host))
        # Create a cursor and execute the LOAD DATA statement
        cursor = con.cursor()
        cursor.execute(load_sql)
        print('Successfully loaded the table from csv.')
        con.close()
    except Exception as e:
        print('Error: {}'.format(str(e)))
        sys.exit(1)
# Execution Example
load_sql = """LOAD DATA LOCAL INFILE '/Users/furuiyang/codes/mins_daily_import_to_csv/savedir/20190720/20190720_001.csv' INTO TABLE test01.inc_mins \
FIELDS TERMINATED BY ',' \
ENCLOSED BY '"' \
IGNORE 1 LINES;"""
print(load_sql)
"""
load data infile '/Users/furuiyang/codes/mins_daily_import_to_csv/exportdir/20190720_10001718.csv' \
replace into table test01.inc_mins \
fields terminated by ',' \
optionally enclosed by '"' \
lines terminated by '\n' \
ignore 1 lines(_id,code,time,open,low,high,close,volume,amount);
"""
host = 'localhost'
user = 'root'
password = 'ruiyang'
csv_to_mysql(load_sql, host, user, password)
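Note that depending on the MySQL server version, the server-side local_infile variable may also need to be switched on before LOAD DATA LOCAL is accepted (it defaults to OFF in MySQL 8.0):

SET GLOBAL local_infile = 1;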
The LOAD DATA approach is dramatically faster.
Because the table was created with a UNIQUE KEY on _id, loading rows that already exist raises duplicate-entry warnings; they don't abort the load, but the duplicate rows are simply skipped.
The fix is to add the REPLACE keyword to load_sql, so that a conflicting row overwrites the existing one:
load_sql = """LOAD DATA LOCAL INFILE '/Users/furuiyang/codes/mins_daily_import_to_csv/savedir/20190720/20190720_001.csv' \
REPLACE INTO TABLE test01.inc_mins \
FIELDS TERMINATED BY ',' \
ENCLOSED BY '"' \
IGNORE 1 LINES;"""
print(load_sql)
Running it again, the warnings are gone.
Querying the number of rows inserted into MySQL also checks out (screenshot omitted):
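For reference, a quick count check sketched with pymysql (same table and credentials as above):

import pymysql

conn = pymysql.Connect(host='localhost', port=3306, user='root',
                       password='ruiyang', db='test01', charset='utf8')
cursor = conn.cursor()
cursor.execute("select count(*) from inc_mins;")
print(cursor.fetchone()[0])  # should match the row count from the merge step
conn.close()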