数据库参数:
在这里插入图片描述

# !/usr/bin/env python
# -- coding: utf-8 --
# @Time : 2021/7/9 11:09
# @Author : wang vx:672377334
# @File : 穿透sql转py.py
# @software :  PyCharm


import MySQLdb
import pandas as pd
import os


def read_mysql_conf(conf_path):
    conf_data = pd.read_excel(conf_path, dtype="str")  # ,dtype="str"
    db_conf_dict = dict(zip(conf_data.db_conf.tolist(), conf_data.value.tolist()))
    db_conf_dict['port'] = int(db_conf_dict['port'])
    return db_conf_dict


def group_by_pathName(f_path):
    f_path = f_path
    # fPath_jymx_list_error = []
    fPath_name_list = []
    for root, dirs, files in os.walk(f_path):
        for name in files:
            fPath_name_list.append(os.path.join(root, name))
    return fPath_name_list


def cur_sql(sql_str):
    cur = mysql_conn.cursor()
    cur.execute(sql_str)
    # print("执行:%s"%(sql_str))
    mysql_conn.commit()
    return cur.fetchall()


def pre_start_id_table(start_id_path):
    start_id_df = pd.read_excel("./1.in/start_id.xlsx", dtype="str")
    start_id_creat_sqllist = [
        '''DROP TABLE IF EXISTS start_id;''',
        '''CREATE TABLE `start_id` (
          `cxkh` varchar(20)  ,
          `jymc` varchar(255)  ,
          `jyzjhm` varchar(20) 
            ) ;''']

    for c_sql in start_id_creat_sqllist:
        print("-- 创建 start_id 起点表 --")
        cur_sql(c_sql)
    print("-- 正在往 start_id 插入起点数据 --")
    for i in range(len(start_id_df.index)):
        insert_value = tuple(start_id_df.iloc[i].tolist())
        insrt_sql_str = "INSERT start_id VALUES %s ;" % (str(insert_value))
        #         print(insrt_sql_str)
        cur_sql(insrt_sql_str)
    print("-- 起点表相关工作完成 --")


def sql_code_to_py():
    list_sql_code_1 = [
        'DROP INDEX  idex_cxkh on  gas_bank_records;',
        'DROP INDEX     idex_jydfzkh on  gas_bank_records;',
        'CREATE INDEX  idex_cxkh on  gas_bank_records(cxkh);',
        'CREATE INDEX  idex_jydfzkh on  gas_bank_records(jydfzkh);',
        'DROP TABLE if EXISTS temp_jymx_1;',
        'CREATE  TABLE temp_jymx_1 LIKE gas_bank_records ;',

        'DROP TABLE if EXISTS gas_bank_time_records;',
        "CREATE TABLE gas_bank_time_records  LIKE gas_bank_records;",
        "INSERT INTO gas_bank_time_records SELECT * FROM gas_bank_records WHERE jysj >'2018-01-01 00:00:00' ;"
    ]

    list_sql_code_2 = [
        "drop TABLE if EXISTS `sk_and_fx_name`;",
        """CREATE TABLE `sk_and_fx_name`(
        `yhkh` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
        `NAME_XM` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
        `入账` double DEFAULT NULL,
        `出账` double DEFAULT NULL,
        `差额` double DEFAULT NULL,
        `P_num` bigint(21) NOT NULL DEFAULT '0',
        ds_kh text,
        ds_kh_j  text,
        ds_kh_c  text,
        ds_name text,
        ds_name_j text,
        ds_name_c text,
        `cj` int(1) NOT NULL DEFAULT '0',
        `ye` double DEFAULT NULL
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;""",
        "CREATE INDEX  idex_jydfzkh on  sk_and_fx_name(yhkh);",
        "INSERT into `sk_and_fx_name`(yhkh,`NAME_XM`,`cj`) SELECT start_ID.cxkh,start_ID.jymc,0 FROM   start_ID ;",
        "DROP PROCEDURE if EXISTS chuantou_zj_ry;",
        '''create definer = root@localhost procedure chuantou_zj_ry( in time_s int)
    BEGIN
        DECLARE time_now int DEFAULT 2;
        WHILE time_now <time_s DO  
        TRUNCATE TABLE  temp_jymx_1;  
        INSERT  into temp_jymx_1   
        SELECT * FROM gas_bank_time_records WHERE  jydfzkh NOT IN 
        (SELECT DISTINCT yhkh FROM sk_and_fx_NAME where cj<>0) 
        and   
        cxkh 
        in  
        (SELECT DISTINCT yhkh FROM sk_and_fx_NAME )
        and 
        NOT ISNULL(cxkh) ;  
        INSERT into `sk_and_fx_name`  
        SELECT   jydfzkh yhkh ,  
        jydfmc 'NAME_XM',  
        sum(case WHen jdbz='出' then  jyje ELSE 0 end) 入账,  
        sum(case WHen jdbz='进' then  jyje ELSE 0 end) 出账,  
        sum(case WHen jdbz='出' then  jyje ELSE 0 end)-sum(case WHen jdbz='进' then  jyje ELSE 0 end) ce,  
        count(DISTINCT cxkh) P_num,
        GROUP_CONCAT( DISTINCT cxkh,'||') ds_kh,
        GROUP_CONCAT( DISTINCT case WHen jdbz='出' then  cxkh ELSE "" end,'||') ds_kh_j,
        GROUP_CONCAT( DISTINCT case WHen jdbz='进' then  cxkh ELSE "" end,'||') ds_kh_c,
        GROUP_CONCAT( DISTINCT jymc,'||') ds_name,
        GROUP_CONCAT( DISTINCT case WHen jdbz='出' then  jymc ELSE "" end,'||') ds_name_j,
        GROUP_CONCAT( DISTINCT case WHen jdbz='进' then  jymc ELSE "" end,'||') ds_name_c,  
        time_now 'cj',  max(`jydfzkhye`)  FROM temp_jymx_1   
        GROUP BY yhkh,jydfmc  HAVING   
        (P_num>=3 and (入账 >=30000 or 出账>=30000))  
        OR  
        (P_num>=2 and (入账 >=60000 or 出账>=60000)) 
        OR  
        (P_num>=1 and (入账 >=10000 or 出账>=10000))  
        ORDER BY P_num DESC ; SELECT  time_now+1 into  time_now;  
        END WHILE;
    end; '''

    ]

    list_sql_code_3 = ["TRUNCATE TABLE  temp_jymx_1;",
                       "INSERT  into temp_jymx_1 SELECT * FROM gas_bank_time_records WHERE jydfzkh NOT IN (SELECT DISTINCT yhkh FROM sk_and_fx_name where cj<>0) and cxkh in(SELECT DISTINCT yhkh FROM sk_and_fx_name ) AND not isnull(cxkh) ;",
                       "SET group_concat_max_len=1024000000;",
                       '''INSERT into `sk_and_fx_name`
                    SELECT 
                    jydfzkh yhkh ,
                    jydfmc 'NAME_XM',
                    sum(case WHen jdbz='出' then  jyje ELSE 0 end) 入账,
                    sum(case WHen jdbz='进' then  jyje ELSE 0 end) 出账,
                    sum(case WHen jdbz='出' then  jyje ELSE 0 end)-sum(case WHen jdbz='进' then  jyje ELSE 0 end) ce,
                    count(DISTINCT cxkh) P_num,
                    GROUP_CONCAT( DISTINCT cxkh,'||') ds_kh,
                    GROUP_CONCAT( DISTINCT case WHen jdbz='出' then  cxkh ELSE "" end,'||') ds_kh_j,
                    GROUP_CONCAT( DISTINCT case WHen jdbz='进' then  cxkh ELSE "" end,'||') ds_kh_c,
                    GROUP_CONCAT( DISTINCT jymc,'||') ds_name,
                    GROUP_CONCAT( DISTINCT case WHen jdbz='出' then  jymc ELSE "" end,'||') ds_name_j,
                    GROUP_CONCAT( DISTINCT  case WHen jdbz='进' then  jymc ELSE "" end,'||') ds_name_c,
                
                    1 'cj',
                    max(`jydfzkhye`)
                    FROM temp_jymx_1 
                
                    GROUP BY  yhkh,jydfmc
                    HAVING 
                    (P_num>=1 and (入账 >=10000 or 出账>=10000))
                
                    ORDER BY P_num DESC ;''',
                       "call chuantou_zj_ry(15);"]

    list_sql_code_4 = [
        """DROP TABLE  if EXISTS 在库数据;""",
        """CREATE TABLE 在库数据 as SELECT DISTINCT cxkh,jymc,jyzjhm,"在库" 是否在库  FROM gas_bank_records """,
        """DROP TABLE if EXISTS  common_account_temp;""",
        """CREATE TABLE common_account_temp
            SELECT
            jydfzkh yhkh ,
            jydfmc 'NAME_XM',
            sum(case WHen jdbz='出' then  jyje ELSE 0 end) 入账,
            sum(case WHen jdbz='进' then  jyje ELSE 0 end) 出账,
            sum(case WHen jdbz='出' then  jyje ELSE 0 end)-sum(case WHen jdbz='进' then  jyje ELSE 0 end) ce,
            count(DISTINCT cxkh) common_accounts,
            GROUP_CONCAT( DISTINCT cxkh,'||') ds_kh,
            GROUP_CONCAT( DISTINCT case WHen jdbz='出' then  cxkh ELSE "" end,'||') ds_kh_j,
            GROUP_CONCAT( DISTINCT case WHen jdbz='进' then  cxkh ELSE "" end,'||') ds_kh_c,
            GROUP_CONCAT( DISTINCT jymc,'||') ds_name,
            GROUP_CONCAT( DISTINCT case WHen jdbz='出' then  jymc ELSE "" end,'||') ds_name_j,
            GROUP_CONCAT( DISTINCT  case WHen jdbz='进' then  jymc ELSE "" end,'||') ds_name_c
            FROM gas_bank_records

            WHERE jydfzkh in (SELECT yhkh FROM sk_and_fx_name) 
            AND not ISNULL(jydfzkh) 
            and jydfzkh<>""

            GROUP BY  yhkh,jydfmc;""",
        """DROP TABLE if EXISTS  common_account;""",
        """CREATE TABLE common_account
            SELECT common_account_temp.*,在库数据.`是否在库` FROM common_account_temp LEFT JOIN 在库数据 
            ON common_account_temp.yhkh=在库数据.cxkh;""",
        """DROP TABLE if EXISTS  common_cj;""",
        """CREATE TABLE common_cj
            SELECT sk_and_fx_name.*,在库数据.`是否在库` FROM sk_and_fx_name LEFT JOIN 在库数据 
            ON sk_and_fx_name.yhkh=在库数据.cxkh  ORDER BY 差额 desc;
        """
    ]

    LIST_SQL_CODE_all = [list_sql_code_1, list_sql_code_2, list_sql_code_3, list_sql_code_4]
    count = -1
    for list_sql_c in LIST_SQL_CODE_all:
        count += 1
        if count == 0:
            print("-- 正在筛选案发时间建表,创建索引等 --")
        if count == 1:
            print("-- 正在创建穿透存储过程,创建层级空表等 --")
        if count == 2:
            print("-- 正在创建穿透资金层级,匹配交易对手等 --")
        if count == 3:
            print("-- 正在计算团伙共同交易对手,匹配资金流等 --")
        for sql_c_s in list_sql_c:
            cur_sql(sql_c_s)
            # print(sql_c_s)


def db_to_df(mysql_conn):
    db_to_df_sql_list = ["""SELECT * FROM common_account;""",
                         """SELECT * FROM common_cj;""",
                         """SELECT  
                         gas_bank_time_records.cxkh,
                         gas_bank_time_records.jymc,
                         gas_bank_time_records.jyzjhm,
                         SUM(case when jdbz='进' then jyje else 0 end ) 进,
                         SUM(case when jdbz='出' then jyje else 0 end ) 出,
                         SUM(case when jdbz='进' then jyje else 0 end )-SUM(case when jdbz='出'then jyje else 0 end ) 进出差额,
                         SUM(case when jdbz='出' then jyje else 0 end )-SUM(case when jdbz='进'then jyje else 0 end ) 出进差额,
                         count(jdbz) 交易次数,
                         SUM(case when jdbz='出' then 1 else 0 end )  调单出账次数,
                         SUM(case when jdbz='进' then 1 else 0 end )  调单进账次数,
                         jydfzkh,
                         jydfmc,
                         GROUP_CONCAT(DISTINCT zysm, '|') 摘要,
                         GROUP_CONCAT(DISTINCT beiz ,'|') 备注,
                         min(jysj) min_jysj,
                         max(jysj) max_jysj,
                         在库数据.`是否在库`
                         FROM gas_bank_time_records
                         LEFT JOIN 在库数据 
                         ON gas_bank_time_records.jydfzkh=在库数据.cxkh
                         GROUP BY  
                         gas_bank_time_records.cxkh,
                         gas_bank_time_records.jyzjhm,
                         jymc,
                         jydfzkh,
                         jydfzkh,
                         jydfmc,
                         在库数据.`是否在库`
                         ORDER BY 出进差额  desc;"""]
    out_path_list = ['./2.out/1.共同交易对手资金关系表.xlsx',
                     './2.out/2.收款卡关联层级资金关系表.xlsx',
                     './2.out/3.已调卡号资金流向表.xlsx']
    print("-- 开始导出数据 --")
    for i in range(len(db_to_df_sql_list)):
        df = pd.read_sql(db_to_df_sql_list[i], con=mysql_conn)
        print("-- 列名转中文中 --")
        df = rename_2_ch(df)
        print("-- 列名转中文完成 --")

        out_path = out_path_list[i]
        out_Excel(df, out_path)
    print("-- 导出数据完成 --")


def rename_2_ch(df):
    name_dict = {'yhkh': '银行卡号',
                 'NAME_XM': '姓名',
                 '入账': '入账',
                 '出账': '出账',
                 'ce': '差额',
                 'common_accounts': '共同交易对手数量',
                 'ds_kh': '对手卡号(进出)',
                 'ds_kh_j': '对手卡号(进)',
                 'ds_kh_c': '对手卡号(出)',
                 'ds_name': '对手姓名(进出)',
                 'ds_name_j': '对手姓名(进)',
                 'ds_name_c': '对手姓名(出)',
                 '是否在库': '是否在库',
                 '差额': '差额',
                 'P_num': '直接关联对手',
                 'cj': '层级',
                 'ye': '余额',
                 'cxkh': '查询卡号',
                 'jymc': '交易名称',
                 'jyzjhm': '交易证件号码',
                 '进': '进',
                 '出': '出',
                 '进出差额': '进出差额',
                 '出进差额': '出进差额',
                 '交易次数': '交易次数',
                 '调单出账次数': '调单出账次数',
                 '调单进账次数': '调单进账次数',
                 'jydfzkh': '交易对方卡号',
                 'jydfmc': '交易对方名称',
                 '摘要': '摘要合并',
                 '备注': '备注合并',
                 'min_jysj': '最早交易时间',
                 'max_jysj': '最晚交易时间'}
    df.rename(columns=name_dict, inplace=True)
    return df


def out_Excel(out_df, out_path):
    out_df.to_excel(out_path, index=False, encoding='utf8')


#     out_df.to_csv("1.out/招商交易流水合并导出.csv", index=False,encoding='utf8')
#     print("-- 数据导出成功 --")


if __name__ == '__main__':
    conf_path = "./1.in/mysql_conf.xlsx"
    start_id_path = "./1.in/start_id.xlsx"
    print("****** 欢迎使用资金层级关联穿透程序 wang:******")
    db_conf_dict = read_mysql_conf(conf_path)
    mysql_conn = MySQLdb.connect(**db_conf_dict)
    pre_start_id_table(start_id_path)
    sql_code_to_py()
    db_to_df(mysql_conn)
    mysql_conn.close()
    print("****** 恭喜,程序运行完成 ******")





Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐