文章目录

背景

  • 使用Sqoop把ADS层数据导出到MySQL
  • 使用sqoop export时要添加--columns,避免一些奇奇怪怪的报错
  • 使用正则表达式获取字段名

流程

  1. ADS层不分区,不压缩,行存
  2. ADS层建表SQL要有单独的文件,如果表更新就要更新该文件的建表语句
  3. 表名:ADS层的HIVE表有ads_前缀,对应到MySQL建表时去掉前缀
  4. 字段:ADS层表和MySQL表的 字段名及字段顺序都要一致,用`符号包裹
  5. 遍历ADS层建表语句,使用正则表达式获取 表名、所有字段名
  6. 传参到Sqoop命令

代码

ADS层建表语句(ADS层建表.sql

-- HIVE建表语句,字段用`符号包裹,表名不需要包裹
CREATE EXTERNAL TABLE ads_purchase_order_info (
`prch_order_id` BIGINT COMMENT '采购订单头id',
`exfactory_total_price` DOUBLE COMMENT '出厂价总额',
`insert_time` STRING COMMENT '数据插入日期'
) COMMENT '采购信息';

MySQL建表语句

CREATE TABLE purchase_order_info (
`prch_order_id` bigint COMMENT '采购订单头id',
`exfactory_total_price` DOUBLE COMMENT '出厂价总额',
`insert_time` text COMMENT '数据插入日期',
PRIMARY KEY (`prch_order_id`)
) COMMENT '采购信息';

Python

class Sqoop(Shell):
    def sqoop(self, cmd):
        return self.sh_cmd_and_alert(' '.join(cmd.split()))

    def sqoop_export(self, mysql_tb, export_dir, columns='', update_mode='allowinsert', update_key='prch_order_id'):
        """
        --columns缺省默认是全部列;建议加上,避免一些莫名其妙的bug
        --update-mode缺省默认是updateonly,可改为allowinsert
        --update-key是用于更新的锚列;多个列用逗号分隔
        """
        return self.sqoop(r'''
        {sqoop} export
        --connect jdbc:mysql://{host}:{port}/{database}
        --username '{username}'
        --password '{password}'
        --table {table}
        --num-mappers 1
        --input-fields-terminated-by '\001'
        --input-null-string '\\N'
        --input-null-non-string '\\N'
        --export-dir '{export_dir}'
        {columns}
        '''.format(
            sqoop=self.get('sqoop', 'sqoop'),
            host=self.get('mysql_host', 'localhost'),
            port=self.get('mysql_port', '3306'),
            database=self['mysql_db'],
            username=self.get('mysql_user', 'root'),
            password=self['mysql_pwd'],
            table=mysql_tb,
            export_dir=export_dir,
            columns=columns,
        ))
from re import findall

s = get_sqoop()
for ads_ddl in read_sql_file('ADS层建表.sql').split(';')[:-1]:
    columns = '--columns ' + ','.join(findall('`([^`]+)`', ads_ddl))
    hive_tb = findall(r'CREATE EXTERNAL TABLE (\S+)', ads_ddl)[0]
    mysql_tb = hive_tb.replace('ads_', '')
    print(s.sqoop_export(mysql_tb, EXPORT_DIR_PREFIX + hive_tb, columns))

因为是你

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐