preface:工作中使用python进行处理常会遇到各种问题及技巧,为此汇总。python功能太多,记住常用即可。

环境:python3.7及以上、mac

python——pip、conda、ipython

  • 常用1:创建虚拟环境(从一个爸爸创造多个儿子,各个儿子之间环境不影响,同一个服务器上自己的python环境不被其他人干扰,自己的多个人可采用不同版本tensorflow包等等)
    • which conda:查看当下是哪个conda
      conda env list:看看有哪些虚拟环境
      conda create -n sftf python==3.7.6:创建虚拟环境
      conda activate sftf:启动虚拟环境
      conda deactivate:关闭虚拟环境

       

  • 常用2:pip安装包
    • pip install -r requirements.txt:批量安装某些包,包名每行一个,放在requirements.txt文件里面
    • pip install xxxxx -i https://pypi.tuna.tsinghua.edu.cn/simple:采用清华源来安装。其他镜像:
      • 阿里云:http://mirrors.aliyun.com/pypi/simple
        中国科技大学:https://pypi.mirrors.ustc.edu.cn/simple
        豆瓣:http://pypi.douban.com/simple
    • 实在不行,先翻墙下载,再塞到site-packages文件下(/Users/shifeng/anaconda3/lib/python3.7/site-packages)
    • 注意:mac、linux不同环境下的site-packages可能不能通用
  • 常用3:ipython交互式调试各种问题

python——datetime、time

  • 常用1:取当前时间;时间、string、datetime三者转换;距今多少日等
  • 代码
    • # coding=utf-8
      # from utils import time_utils.py #该文件命名time_utils.py,放到utils文件夹下,调用即可
      from datetime import date, timedelta, datetime
      import time
      
      def get_today(timeFormat='%Y%m%d'):
          return(date.today()).strftime(timeFormat)
      
      def get_yesterday(timeFormat='%Y%m%d'):
          return (date.today() - timedelta(1)).strftime(timeFormat)
      
      def get_current(timeFormat='%Y%m%d'):
          local_time = time.localtime()
          timeString = time.strftime(timeFormat, local_time)
          return timeString
      
      def get_date(diff_days=0, diff_hours=0, day_format='%Y%m%d', current_date=None):
          date={}
          if current_date is None:
              timestamp = time.time() - (diff_days * 24 + diff_hours) * 3600
              date['day'], date['hour'] = time.strftime(day_format + ' %H:%M', time.localtime(timestamp)).split()
              return date
      
          day = current_date.get('day', None)
          hour = current_date.get('hour', None)
          if hour is None:
              timestamp = time.mktime(time.strptime(day, day_format)) - diff_days * 24 * 3600
              date['day'] = time.strftime(day_format, time.localtime(timestamp))
              return date
          else:
              timestamp = time.mktime(time.strptime('%s %s' % (day, hour), day_format + ' %H:%M')) - \
                  (diff_days * 24 + diff_hours) * 3600
              date['day'], date['hour'] = time.strftime(day_format + ' %H:%M', time.localtime(timestamp)).split()
              return date
      
      def get_last_half_hour(timeFormat='%H-%M-%S'):
          localtime = time.localtime()
          minute = localtime.tm_min
      
          if minute > 30:
              minute = 30
          else:
              minute = 0
      
          last_hour_time = str(localtime.tm_year) + convert_digit_to_str(localtime.tm_mon) + convert_digit_to_str(localtime.tm_mday) + ' '\
              + convert_digit_to_str(localtime.tm_hour) + '-' + convert_digit_to_str(minute) + '-' + '00'
          timestamp = time.mktime(time.strptime(last_hour_time, '%Y%m%d %H-%M-%S'))
      
          return time.strftime(timeFormat, time.localtime(timestamp))
      
      def convert_digit_to_str(digit):
          if digit < 10:
              return str(0) + str(digit)
          return str(digit)
      
      def get_before_last_half_hour(timeFormat='%H-%M-%S'):
          localtime = time.localtime()
          minute = localtime.tm_min
          if minute > 30:
              minute = 30
          else:
              minute = 0
      
          last_hour_time = str(localtime.tm_year) + convert_digit_to_str(localtime.tm_mon) + convert_digit_to_str(localtime.tm_mday) + ' '\
              + convert_digit_to_str(localtime.tm_hour) + '-' + convert_digit_to_str(minute) + '-' + '00'
          timestamp = time.mktime(time.strptime(last_hour_time, '%Y%m%d %H-%M-%S')) - (30 * 60)
          return time.strftime(timeFormat, time.localtime(timestamp))
      
      def get_week_before(day_time, timeFormat='%Y%m%d'):
          timestamp = time.mktime(time.strptime(day_time, timeFormat))
          timestamp -= 7*24*3600
          return time.strftime(timeFormat, time.localtime(timestamp))
      
      
      def get_day_before(day_time, num_day, timeFormat='%Y%m%d'):
          timestamp = time.mktime(time.strptime(day_time, timeFormat))
          timestamp -= num_day*24*3600
          return time.strftime(timeFormat, time.localtime(timestamp))
      
      
      def get_minute_before(timeStr, minute, timeFormat='%H-%M'):
          timestamp = time.mktime(time.strptime(timeStr, timeFormat))
          timestamp -= minute * 60
          return time.strftime(timeFormat, time.localtime(timestamp))
      
      
      def convert_time(timeStr, input_format, out_format):
          try:
              timestamp = time.mktime(time.strptime(timeStr, input_format))
          except OverflowError:
              print("timeStr is: ", timeStr, "out of range")
              return None
          return time.strftime(out_format, time.localtime(timestamp))
      
      
      def get_diff_days(timeStr1, timeStr2, timeFormat="%Y%m%d"):
          time1 = datetime.strptime(timeStr1, timeFormat).replace(hour=0, minute=0, second=0, microsecond=0)
          time2 = datetime.strptime(timeStr2, timeFormat).replace(hour=0, minute=0, second=0, microsecond=0)
          return (time1 - time2).days
      
      def get_today_before(days, timeFormat='%Y%m%d'):
          return (date.today() - timedelta(days)).strftime(timeFormat)
      
      def get_now_str():
          return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
      
      def get_now_unix():
          return  time.mktime(datetime.now().timetuple())
      
      
      '''
      四种转换:
      timeStr <-> timeUnix互转
      datetime <-> timeStr互转
      '''
      def timeStr_to_timeUnix(timeStr="2020-12-26 16:10:10", timeFormat='%Y-%m-%d %H:%M:%S'):
          return  time.mktime(time.strptime(timeStr, timeFormat))
      
      def timeUnix_to_timeStr(timeUnix=1608969619, timeFormat='%Y-%m-%d %H:%M:%S'):
          return time.strftime(timeFormat, time.localtime(timeUnix))
      
      def dateTime_to_timeStr(dt, timeFormat='%Y-%m-%d %H:%M:%S'):
          return dt.strptime(timeFormat)
      
      def timeStr_to_dateTime(timeStr="2020-12-26 16:10:10"):
          return datetime.strptime(timeStr, "%Y-%m-%d %H:%M:%S")
      
      
      if __name__ == '__main__':
          print(get_yesterday())
          print(get_diff_days("20190801", "20190725", "%Y%m%d"))
          print(get_timestamp("2019-08-12 16:45:20", timeFormat="%Y-%m-%d %H:%M:%S"))
          print(get_timestamp("2019-08-12 16:45:21", timeFormat="%Y-%m-%d %H:%M:%S"))
          today = get_today(timeFormat="%Y-%m-%d %H:%M:%S")
          print(today)
          print(get_diff_days(today, "2019-09-01 10:04:14", timeFormat="%Y-%m-%d %H:%M:%S"))
          print(get_today_before(0))
          print(get_today_before(1))
          print(get_today_before(-1, timeFormat="%Y-%m-%d %H:%M:%S"))
          now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
          print(now)
          print(get_minute_before(now, 5, timeFormat="%Y-%m-%d %H:%M:%S"))

       

python——redis、mysql

  • 常用1:读写mysql
    • 读取mysql:准备conn、cursor、sql;执行;关闭
    • 写入mysql:准备conn、cursor、sql;执行;conn.commit()提交;关闭
    • 调用:
      • '''
        #xxx_conf.json文件:
        {
            "xxx_db": {
              "host": "xxx",
              "user": "xxx",
              "psd": "xxx",
              "db":"xxxx"
            },
             "yyy_db": {
              "host": "xxx",
              "user": "xxx",
              "psd": "xx",
              "db":"xx"
            }
        }
        '''
        '''
        #utils/conf_parser.py文件:
        
        # coding=utf8
        
        import json
        
        
        def parse_conf(f_path):
            return Config(config_file=f_path)
        
        
        class Config(object):
            """Config load from json file
            """
        
            def __init__(self, config=None, config_file=None):
                if config_file:
                    with open(config_file, 'r') as fin:
                        config = json.load(fin)
        
                self.dict = config
                if config:
                    self._update(config)
        
            def __getitem__(self, key):
                return self.dict[key]
        
            def __contains__(self, item):
                return item in self.dict
        
            def items(self):
                return self.dict.items()
        
            def add(self, key, value):
                """Add key value pair
                """
                self.__dict__[key] = value
        
            def _update(self, config):
                if not isinstance(config, dict):
                    return
        
                for key in config:
                    if isinstance(config[key], dict):
                        config[key] = Config(config[key])
        
                    if isinstance(config[key], list):
                        config[key] = [Config(x) if isinstance(x, dict) else x for x in
                                       config[key]]
        
                self.__dict__.update(config)
        '''
        
        from utils import conf_parser
        conf = conf_parser.parse_conf("conf/xxx_conf.json")
        
        def selectDataFromDB(beginDate, endDate):
            sql = f"select distinct xx from pic where xx between '{beginDate}' and '{endDate}'"
            df  = mysql_data_getter.BaseDataGetter.get_from_conf(conf.xxx_db, sql)
            return df
        
        def insertData2mysql(xxxx):
            if len(xxx)==0:return
            insertSql = "xxx"
            mysql_data_getter.BaseDataGetter.insert_from_conf(conf.yyy_db, insertSql)

         

    • 代码:mysql_data_getter.py
      • # coding=utf8
        
        import os
        import pandas as pd
        import pymysql.cursors
        import re
        
        class BaseDataGetter(object):
            """
            基础数据拉取类
            """
            @classmethod
            def get_db_table_list(cls, conn):
                """
                功能:获取db下所有表,表存在,才读取sql
                """
                con = conn.cursor()
                sql = "show tables;"
                con.execute(sql)
                tables = [con.fetchall()]
                table_list = re.findall('(\'.*?\')',str(tables))
                table_list = [re.sub("'",'',each) for each in table_list]
                con.close()
                return table_list
        
            @classmethod
            def get_table_name(cls, sql):
                if 'where' in sql.lower():
                    tmp = sql.lower().split('where')[0].split('from')
                    if len(tmp)==2:
                        return tmp[1].strip()
                    else:
                        print(tmp)
                        raise "sql error1"
                elif 'group' in sql.lower():
                    tmp = sql.lower().split('group by')[0].split('from')
                    if len(tmp)==2:
                        return tmp[1].strip()
                    else:
                        print(tmp)
                        raise "sql error2"
                else:
                    tmp = sql.lower().split('from')
                    if len(tmp)==2:
                        return tmp[1].strip()
                    else:
                        print(tmp)
                        raise "sql error3"
        
            @classmethod
            def _dump_sql_data(cls, sqls, conn):
                """
                功能:将sql查询结构保存到本地文件
                :param sqls:  {file_name:sql} , sql 是要执行的sql查询, file_name 是待查询的语句保存到文件
                :param conn:  具体的数据库连接
                :return:
                """
                try:
                    table_list = cls.get_db_table_list(conn)
        
                    for file_name, sql in sqls.items():
                        print(sql)
                        table_name = cls.get_table_name(sql)
                        if 'join' not in table_name and table_name not in table_list:
                            print("table_name not in table_list:", table_name)
                            print("table_list size:", len(table_list))
                            continue
        
                        df = pd.read_sql(sql, conn)
                        if os.path.exists(file_name):
                            df.to_csv(file_name, mode="a", header=False)
                        else:
                            df.to_csv(file_name)
                finally:
                    conn.close()
        
            @classmethod
            def _init_connect(cls, host, port, uname, pwd, db):
                """
                用来构建mysql connect 连接对象
                :param host:
                :param port: 端口号, 必须是int
                :param uname:
                :param pwd:
                :param db:
                :return: connect 对象
                """
                if isinstance(port, str):
                    port = int(port)
                conn = pymysql.connect(host=host, port=port, user=uname,
                                       password=pwd, db=db, charset="utf8",
                                       cursorclass=pymysql.cursors.DictCursor)
                return conn
        
            @classmethod
            def download(cls, host, port, uname, pwd, db, sqls):
                conn = cls._init_connect(host=host, port=int(port), uname=uname, pwd=pwd, db=db)
                cls._dump_sql_data(sqls, conn)
        
            @classmethod
            def download_from_conf(cls, conf, sqls):
                port = getattr(conf, "port", 3306)
                conn = cls._init_connect(host=conf.host, port=int(port),
                                         uname=conf.user, pwd=conf.psd, db=conf.db)
                cls._dump_sql_data(sqls, conn)
        
            @classmethod
            def get_from_conf(cls, conf, sql):
                port = getattr(conf, "port", 3306)
                conn = cls._init_connect(host=conf.host, port=int(port),
                                         uname=conf.user, pwd=conf.psd, db=conf.db)
                return cls.get_sql_data(sql, conn)
        
            @classmethod
            def get_sql_data(cls, sql, conn):
                try:
                    table_list = cls.get_db_table_list(conn)
                    table_name = cls.get_table_name(sql)
                    print(sql)
                    if 'join' not in table_name and table_name not in table_list:
                        print("table_name not in table_list:", table_name)
                        print("table_list size:", len(table_list))
                        return
                    df = pd.read_sql(sql, conn)
                    return df
        
                finally:
                    conn.close()
        
            @classmethod
            def insert_from_conf(cls, conf, sql):
                port = getattr(conf, "port", 3306)
                conn = cls._init_connect(host=conf.host, port=int(port),
                                         uname=conf.user, pwd=conf.psd, db=conf.db)
                cls.insert_sql_data(sql, conn)
        
            @classmethod
            def insert_sql_data(cls, sql, conn):
                try:
                    con = conn.cursor()
                    con.execute(sql)
                    print('insert is ok...',)
                    conn.commit()
                finally:
                    con.close()
                    conn.close()

         

  • 常用2:读写redis
    • 调用:
      • data_saver   = RedisSaver(conf.redis_info)
      • redis_client = data_saver.redis
      • 写入:data_saver.zadd_list_batchly_selfExpTime([[key1, {item1:score1, item2:score2}], [key2, {xxx}]], 7*6*3600))
      • 读取:redis_client.zrange(rt_key, 0, max(0, rt_end-1), withscores=True)
      • 注意:无论啥样的数据,写入redis,都要设置过期时间,1天、7天、1个月等,切记不可永久,不可只增不减
    • 代码:redis_saver.py
      • # coding=utf8
        from redis import StrictRedis
        
        class RedisSaver(object):
            def __init__(self, conf):
                host = conf.redis_pure_info.host
                port = conf.redis_pure_info.port
                db   = conf.redis_pure_info.db
                pwd  = conf.redis_pure_info.pwd
                self.redis = StrictRedis(host=host, port=port, db=db, password=pwd)
                self.pipe = self.redis.pipeline()
                self.exp_time = 7 * 24 * 3600
        
            def add_sorted_set(self, name, item_score_dict):
                self.redis.delete(name)
                self.redis.zadd(name, item_score_dict)
        
            def add_list(self, name, item_list):
                # self.redis.delete(name)
                self.redis.lpush(name, *item_list)
        
            def sadd_list(self, name, item_list):
                self.redis.delete(name)
                self.redis.sadd(name, *item_list)
        
            def sadd_list_batchly(self, name_items_pairs):
                for name, item_list in name_items_pairs:
                    self.pipe.delete(name)
                    self.pipe.sadd(name, *item_list)
                    self.pipe.expire(name, self.exp_time)
                self.pipe.execute()
        
            def zadd_list_batchly_selfExpTime(self, name_item_score_dict, exp_time=1*6*3600):
                for name, item_scores in name_item_score_dict:
                    self.pipe.delete(name)
                    self.pipe.zadd(name, item_scores)
                    self.pipe.expire(name, exp_time)
                self.pipe.execute()

         

python——pandas、numpy、scipy

python——re

python——os、sys、shell

python——emoji

  • 常用:过滤emoji,总是出现各种emoji不能被完全过滤。
  • 方法:
    • 使用正则过滤、使用emoji包解析emoji符号配合正则过滤掉
  • 代码:
    • import re
      import emoji
      
      # 方法1:并不能完全过滤,只能过滤部分emoji
      def filterEmoji(desstr,restr=' '):
          # 过滤emoji
          try:
              co = re.compile(u'[\U00010000-\U0010ffff]')
          except re.error:
              co = re.compile(u'[\uD800-\uDBFF][\uDC00-\uDFFF]')
          return co.sub(restr, desstr)
      
      # 方法2:使用eomji包,将🤓解析为“:nerd_face:”,使用正则过滤掉。
      #(需要注意文本中是否包含了":xxx:",xxx也会被过滤掉)
      def filterEmojiByEmojiAndRe(s, restr=" "):
          re.sub('(:.+?:)', restr, emoji.demojize(s))  
      
      '''
      🤓符号:
      print(text.encode('unicode-escape').decode('ASCII')) 
      output: \U0001f188\ue513\ue220\ue21c
      print('\ud83e\udd13') #出错
      '''
      s = '🤓无限大地'
      print(filterEmoji(s))
      
      print(filterEmojiByEmojiAndRe(s))

       

python——gensim、nltk、spacy

  • nltk常用:对英文进行分句(平时做机器翻译时,需要对段落处理,拆分为若干句子)
  • nltk数据包:
    • nltk.download()太慢。
    • 百度云盘直接下载:https://pan.baidu.com/s/17ZgkoQeMosWwHNlUvXvTdw 密码:lxmh
    • 解压放到/Users/shifeng/nltk_data。即可
  • 代码:
    • #使用nltk对英文段落分句子:
      import nltk
      tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
      tokenizer.tokenize('fight among communists and anarchists (i.e. at a series of events named May Mr. Days).’)   #即可
      
      #对句子分词
      import nltk
      nltk.word_tokenize("And now for something completely different.")

       

python——pytorch、tensorflow

python——flask、apscheduler

  • 常用1:需要将任务打包成服务,使用flask封装很方便,暴露一个接口即可。
  • 常用2:对任务进行定时调度。在flask框架中,每日固定某个点更新模型。
  • 策略:分两种BackgroundScheduler、BlockingScheduler
  • flask代码:
    • request里的POST、GET请求问题
      •  
    • client调用的问题
  • apscheduler代码:
    • # flask里面进行定时任务例子:
      from apscheduler.schedulers.background import BackgroundScheduler
      from apscheduler.schedulers.blocking import BlockingScheduler
      
      from flask import Flask, jsonify, request
      app=Flask(__name__)
      print('start app is ok...')
      
      @app.route('/')
      def hello_world():
          return "hello word"
      
      def flashModelAndData():
          print('xxx')
      
      def saveUserDateHadRec():
          print('UUU')
      
      sched = BackgroundScheduler(daemon=True)
      sched.add_job(flashModelAndData, 'cron', hour=2, minute=1)
      sched.add_job(saveUserDateHadRec, 'interval', minutes=30)
      sched.start()
      
      if __name__ == '__main__':
          app.run(config.HOST, config.PORT)
      
      # 阻塞定时任务调用例子:
      from datetime import datetime
      import os
      from apscheduler.schedulers.blocking import BlockingScheduler
       
      def tick():
          print('Tick! The time is: %s' % datetime.now())
       
      if __name__ == '__main__':
          scheduler = BlockingScheduler()
          scheduler.add_job(tick, 'cron', hour=9, minute=1)
          try:
              scheduler.start()
          except SystemExit:
              pass

       

python——warnings(等技巧相关)

  • 代码:
    • # 代码里面
      import warnings
      warnings.filterwarnings("ignore”)
      
      # ipython -W ignore yourscript.py #启动ipython时

       

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐