2021_w_.2.python驱动mysql(MySQLdb) 代码案例
数据库参数:# !/usr/bin/env python# -- coding: utf-8 --# @Time : 2021/7/9 11:09# @Author : wang vx:672377334# @File : 穿透sql转py.py# @software :PyCharmimport MySQLdbimport pandas as pdimport osdef read_mysql_
·
数据库参数:
# !/usr/bin/env python
# -- coding: utf-8 --
# @Time : 2021/7/9 11:09
# @Author : wang vx:672377334
# @File : 穿透sql转py.py
# @software : PyCharm
import MySQLdb
import pandas as pd
import os
def read_mysql_conf(conf_path):
conf_data = pd.read_excel(conf_path, dtype="str") # ,dtype="str"
db_conf_dict = dict(zip(conf_data.db_conf.tolist(), conf_data.value.tolist()))
db_conf_dict['port'] = int(db_conf_dict['port'])
return db_conf_dict
def group_by_pathName(f_path):
f_path = f_path
# fPath_jymx_list_error = []
fPath_name_list = []
for root, dirs, files in os.walk(f_path):
for name in files:
fPath_name_list.append(os.path.join(root, name))
return fPath_name_list
def cur_sql(sql_str):
cur = mysql_conn.cursor()
cur.execute(sql_str)
# print("执行:%s"%(sql_str))
mysql_conn.commit()
return cur.fetchall()
def pre_start_id_table(start_id_path):
start_id_df = pd.read_excel("./1.in/start_id.xlsx", dtype="str")
start_id_creat_sqllist = [
'''DROP TABLE IF EXISTS start_id;''',
'''CREATE TABLE `start_id` (
`cxkh` varchar(20) ,
`jymc` varchar(255) ,
`jyzjhm` varchar(20)
) ;''']
for c_sql in start_id_creat_sqllist:
print("-- 创建 start_id 起点表 --")
cur_sql(c_sql)
print("-- 正在往 start_id 插入起点数据 --")
for i in range(len(start_id_df.index)):
insert_value = tuple(start_id_df.iloc[i].tolist())
insrt_sql_str = "INSERT start_id VALUES %s ;" % (str(insert_value))
# print(insrt_sql_str)
cur_sql(insrt_sql_str)
print("-- 起点表相关工作完成 --")
def sql_code_to_py():
list_sql_code_1 = [
'DROP INDEX idex_cxkh on gas_bank_records;',
'DROP INDEX idex_jydfzkh on gas_bank_records;',
'CREATE INDEX idex_cxkh on gas_bank_records(cxkh);',
'CREATE INDEX idex_jydfzkh on gas_bank_records(jydfzkh);',
'DROP TABLE if EXISTS temp_jymx_1;',
'CREATE TABLE temp_jymx_1 LIKE gas_bank_records ;',
'DROP TABLE if EXISTS gas_bank_time_records;',
"CREATE TABLE gas_bank_time_records LIKE gas_bank_records;",
"INSERT INTO gas_bank_time_records SELECT * FROM gas_bank_records WHERE jysj >'2018-01-01 00:00:00' ;"
]
list_sql_code_2 = [
"drop TABLE if EXISTS `sk_and_fx_name`;",
"""CREATE TABLE `sk_and_fx_name`(
`yhkh` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`NAME_XM` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`入账` double DEFAULT NULL,
`出账` double DEFAULT NULL,
`差额` double DEFAULT NULL,
`P_num` bigint(21) NOT NULL DEFAULT '0',
ds_kh text,
ds_kh_j text,
ds_kh_c text,
ds_name text,
ds_name_j text,
ds_name_c text,
`cj` int(1) NOT NULL DEFAULT '0',
`ye` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;""",
"CREATE INDEX idex_jydfzkh on sk_and_fx_name(yhkh);",
"INSERT into `sk_and_fx_name`(yhkh,`NAME_XM`,`cj`) SELECT start_ID.cxkh,start_ID.jymc,0 FROM start_ID ;",
"DROP PROCEDURE if EXISTS chuantou_zj_ry;",
'''create definer = root@localhost procedure chuantou_zj_ry( in time_s int)
BEGIN
DECLARE time_now int DEFAULT 2;
WHILE time_now <time_s DO
TRUNCATE TABLE temp_jymx_1;
INSERT into temp_jymx_1
SELECT * FROM gas_bank_time_records WHERE jydfzkh NOT IN
(SELECT DISTINCT yhkh FROM sk_and_fx_NAME where cj<>0)
and
cxkh
in
(SELECT DISTINCT yhkh FROM sk_and_fx_NAME )
and
NOT ISNULL(cxkh) ;
INSERT into `sk_and_fx_name`
SELECT jydfzkh yhkh ,
jydfmc 'NAME_XM',
sum(case WHen jdbz='出' then jyje ELSE 0 end) 入账,
sum(case WHen jdbz='进' then jyje ELSE 0 end) 出账,
sum(case WHen jdbz='出' then jyje ELSE 0 end)-sum(case WHen jdbz='进' then jyje ELSE 0 end) ce,
count(DISTINCT cxkh) P_num,
GROUP_CONCAT( DISTINCT cxkh,'||') ds_kh,
GROUP_CONCAT( DISTINCT case WHen jdbz='出' then cxkh ELSE "" end,'||') ds_kh_j,
GROUP_CONCAT( DISTINCT case WHen jdbz='进' then cxkh ELSE "" end,'||') ds_kh_c,
GROUP_CONCAT( DISTINCT jymc,'||') ds_name,
GROUP_CONCAT( DISTINCT case WHen jdbz='出' then jymc ELSE "" end,'||') ds_name_j,
GROUP_CONCAT( DISTINCT case WHen jdbz='进' then jymc ELSE "" end,'||') ds_name_c,
time_now 'cj', max(`jydfzkhye`) FROM temp_jymx_1
GROUP BY yhkh,jydfmc HAVING
(P_num>=3 and (入账 >=30000 or 出账>=30000))
OR
(P_num>=2 and (入账 >=60000 or 出账>=60000))
OR
(P_num>=1 and (入账 >=10000 or 出账>=10000))
ORDER BY P_num DESC ; SELECT time_now+1 into time_now;
END WHILE;
end; '''
]
list_sql_code_3 = ["TRUNCATE TABLE temp_jymx_1;",
"INSERT into temp_jymx_1 SELECT * FROM gas_bank_time_records WHERE jydfzkh NOT IN (SELECT DISTINCT yhkh FROM sk_and_fx_name where cj<>0) and cxkh in(SELECT DISTINCT yhkh FROM sk_and_fx_name ) AND not isnull(cxkh) ;",
"SET group_concat_max_len=1024000000;",
'''INSERT into `sk_and_fx_name`
SELECT
jydfzkh yhkh ,
jydfmc 'NAME_XM',
sum(case WHen jdbz='出' then jyje ELSE 0 end) 入账,
sum(case WHen jdbz='进' then jyje ELSE 0 end) 出账,
sum(case WHen jdbz='出' then jyje ELSE 0 end)-sum(case WHen jdbz='进' then jyje ELSE 0 end) ce,
count(DISTINCT cxkh) P_num,
GROUP_CONCAT( DISTINCT cxkh,'||') ds_kh,
GROUP_CONCAT( DISTINCT case WHen jdbz='出' then cxkh ELSE "" end,'||') ds_kh_j,
GROUP_CONCAT( DISTINCT case WHen jdbz='进' then cxkh ELSE "" end,'||') ds_kh_c,
GROUP_CONCAT( DISTINCT jymc,'||') ds_name,
GROUP_CONCAT( DISTINCT case WHen jdbz='出' then jymc ELSE "" end,'||') ds_name_j,
GROUP_CONCAT( DISTINCT case WHen jdbz='进' then jymc ELSE "" end,'||') ds_name_c,
1 'cj',
max(`jydfzkhye`)
FROM temp_jymx_1
GROUP BY yhkh,jydfmc
HAVING
(P_num>=1 and (入账 >=10000 or 出账>=10000))
ORDER BY P_num DESC ;''',
"call chuantou_zj_ry(15);"]
list_sql_code_4 = [
"""DROP TABLE if EXISTS 在库数据;""",
"""CREATE TABLE 在库数据 as SELECT DISTINCT cxkh,jymc,jyzjhm,"在库" 是否在库 FROM gas_bank_records """,
"""DROP TABLE if EXISTS common_account_temp;""",
"""CREATE TABLE common_account_temp
SELECT
jydfzkh yhkh ,
jydfmc 'NAME_XM',
sum(case WHen jdbz='出' then jyje ELSE 0 end) 入账,
sum(case WHen jdbz='进' then jyje ELSE 0 end) 出账,
sum(case WHen jdbz='出' then jyje ELSE 0 end)-sum(case WHen jdbz='进' then jyje ELSE 0 end) ce,
count(DISTINCT cxkh) common_accounts,
GROUP_CONCAT( DISTINCT cxkh,'||') ds_kh,
GROUP_CONCAT( DISTINCT case WHen jdbz='出' then cxkh ELSE "" end,'||') ds_kh_j,
GROUP_CONCAT( DISTINCT case WHen jdbz='进' then cxkh ELSE "" end,'||') ds_kh_c,
GROUP_CONCAT( DISTINCT jymc,'||') ds_name,
GROUP_CONCAT( DISTINCT case WHen jdbz='出' then jymc ELSE "" end,'||') ds_name_j,
GROUP_CONCAT( DISTINCT case WHen jdbz='进' then jymc ELSE "" end,'||') ds_name_c
FROM gas_bank_records
WHERE jydfzkh in (SELECT yhkh FROM sk_and_fx_name)
AND not ISNULL(jydfzkh)
and jydfzkh<>""
GROUP BY yhkh,jydfmc;""",
"""DROP TABLE if EXISTS common_account;""",
"""CREATE TABLE common_account
SELECT common_account_temp.*,在库数据.`是否在库` FROM common_account_temp LEFT JOIN 在库数据
ON common_account_temp.yhkh=在库数据.cxkh;""",
"""DROP TABLE if EXISTS common_cj;""",
"""CREATE TABLE common_cj
SELECT sk_and_fx_name.*,在库数据.`是否在库` FROM sk_and_fx_name LEFT JOIN 在库数据
ON sk_and_fx_name.yhkh=在库数据.cxkh ORDER BY 差额 desc;
"""
]
LIST_SQL_CODE_all = [list_sql_code_1, list_sql_code_2, list_sql_code_3, list_sql_code_4]
count = -1
for list_sql_c in LIST_SQL_CODE_all:
count += 1
if count == 0:
print("-- 正在筛选案发时间建表,创建索引等 --")
if count == 1:
print("-- 正在创建穿透存储过程,创建层级空表等 --")
if count == 2:
print("-- 正在创建穿透资金层级,匹配交易对手等 --")
if count == 3:
print("-- 正在计算团伙共同交易对手,匹配资金流等 --")
for sql_c_s in list_sql_c:
cur_sql(sql_c_s)
# print(sql_c_s)
def db_to_df(mysql_conn):
db_to_df_sql_list = ["""SELECT * FROM common_account;""",
"""SELECT * FROM common_cj;""",
"""SELECT
gas_bank_time_records.cxkh,
gas_bank_time_records.jymc,
gas_bank_time_records.jyzjhm,
SUM(case when jdbz='进' then jyje else 0 end ) 进,
SUM(case when jdbz='出' then jyje else 0 end ) 出,
SUM(case when jdbz='进' then jyje else 0 end )-SUM(case when jdbz='出'then jyje else 0 end ) 进出差额,
SUM(case when jdbz='出' then jyje else 0 end )-SUM(case when jdbz='进'then jyje else 0 end ) 出进差额,
count(jdbz) 交易次数,
SUM(case when jdbz='出' then 1 else 0 end ) 调单出账次数,
SUM(case when jdbz='进' then 1 else 0 end ) 调单进账次数,
jydfzkh,
jydfmc,
GROUP_CONCAT(DISTINCT zysm, '|') 摘要,
GROUP_CONCAT(DISTINCT beiz ,'|') 备注,
min(jysj) min_jysj,
max(jysj) max_jysj,
在库数据.`是否在库`
FROM gas_bank_time_records
LEFT JOIN 在库数据
ON gas_bank_time_records.jydfzkh=在库数据.cxkh
GROUP BY
gas_bank_time_records.cxkh,
gas_bank_time_records.jyzjhm,
jymc,
jydfzkh,
jydfzkh,
jydfmc,
在库数据.`是否在库`
ORDER BY 出进差额 desc;"""]
out_path_list = ['./2.out/1.共同交易对手资金关系表.xlsx',
'./2.out/2.收款卡关联层级资金关系表.xlsx',
'./2.out/3.已调卡号资金流向表.xlsx']
print("-- 开始导出数据 --")
for i in range(len(db_to_df_sql_list)):
df = pd.read_sql(db_to_df_sql_list[i], con=mysql_conn)
print("-- 列名转中文中 --")
df = rename_2_ch(df)
print("-- 列名转中文完成 --")
out_path = out_path_list[i]
out_Excel(df, out_path)
print("-- 导出数据完成 --")
def rename_2_ch(df):
name_dict = {'yhkh': '银行卡号',
'NAME_XM': '姓名',
'入账': '入账',
'出账': '出账',
'ce': '差额',
'common_accounts': '共同交易对手数量',
'ds_kh': '对手卡号(进出)',
'ds_kh_j': '对手卡号(进)',
'ds_kh_c': '对手卡号(出)',
'ds_name': '对手姓名(进出)',
'ds_name_j': '对手姓名(进)',
'ds_name_c': '对手姓名(出)',
'是否在库': '是否在库',
'差额': '差额',
'P_num': '直接关联对手',
'cj': '层级',
'ye': '余额',
'cxkh': '查询卡号',
'jymc': '交易名称',
'jyzjhm': '交易证件号码',
'进': '进',
'出': '出',
'进出差额': '进出差额',
'出进差额': '出进差额',
'交易次数': '交易次数',
'调单出账次数': '调单出账次数',
'调单进账次数': '调单进账次数',
'jydfzkh': '交易对方卡号',
'jydfmc': '交易对方名称',
'摘要': '摘要合并',
'备注': '备注合并',
'min_jysj': '最早交易时间',
'max_jysj': '最晚交易时间'}
df.rename(columns=name_dict, inplace=True)
return df
def out_Excel(out_df, out_path):
out_df.to_excel(out_path, index=False, encoding='utf8')
# out_df.to_csv("1.out/招商交易流水合并导出.csv", index=False,encoding='utf8')
# print("-- 数据导出成功 --")
if __name__ == '__main__':
conf_path = "./1.in/mysql_conf.xlsx"
start_id_path = "./1.in/start_id.xlsx"
print("****** 欢迎使用资金层级关联穿透程序 wang:******")
db_conf_dict = read_mysql_conf(conf_path)
mysql_conn = MySQLdb.connect(**db_conf_dict)
pre_start_id_table(start_id_path)
sql_code_to_py()
db_to_df(mysql_conn)
mysql_conn.close()
print("****** 恭喜,程序运行完成 ******")
更多推荐
已为社区贡献4条内容
所有评论(0)