python——工作常用包合集
preface:工作中使用python进行处理常会遇到各种问题及技巧,为此汇总。python功能太多,记住常用即可。环境:python3.7及以上、macpython——pip、conda、ipython常用1:创建虚拟环境(从一个爸爸创造多个儿子,各个儿子之间环境不影响,同一个服务器上自己的python环境不被其他人干扰,自己的多个人可采用不同版本tensorflow包等等)which cond
·
preface:工作中使用python进行处理常会遇到各种问题及技巧,为此汇总。python功能太多,记住常用即可。
环境:python3.7及以上、mac
python——pip、conda、ipython
- 常用1:创建虚拟环境(从一个爸爸创造多个儿子,各个儿子之间环境不影响,同一个服务器上自己的python环境不被其他人干扰,自己的多个人可采用不同版本tensorflow包等等)
-
which conda:查看当下是哪个conda conda env list:看看有哪些虚拟环境 conda create -n sftf python==3.7.6:创建虚拟环境 conda activate sftf:启动虚拟环境 conda deactivate:关闭虚拟环境
-
- 常用2:pip安装包
- pip install -r requirements.txt:批量安装某些包,包名每行一个,放在requirements.txt文件里面
- pip install xxxxx -i https://pypi.tuna.tsinghua.edu.cn/simple:采用清华源来安装。其他镜像:
- 阿里云:http://mirrors.aliyun.com/pypi/simple
中国科技大学:https://pypi.mirrors.ustc.edu.cn/simple
豆瓣:http://pypi.douban.com/simple
- 阿里云:http://mirrors.aliyun.com/pypi/simple
- 实在不行,先翻墙下载,再塞到site-packages文件下(/Users/shifeng/anaconda3/lib/python3.7/site-packages)
- 注意:mac、linux不同环境下的site-packages可能不能通用
- 常用3:ipython交互式调试各种问题
python——datetime、time
- 常用1:取当前时间;时间、string、datetime三者转换;距今多少日等
- 代码
-
# coding=utf-8 # from utils import time_utils.py #该文件命名time_utils.py,放到utils文件夹下,调用即可 from datetime import date, timedelta, datetime import time def get_today(timeFormat='%Y%m%d'): return(date.today()).strftime(timeFormat) def get_yesterday(timeFormat='%Y%m%d'): return (date.today() - timedelta(1)).strftime(timeFormat) def get_current(timeFormat='%Y%m%d'): local_time = time.localtime() timeString = time.strftime(timeFormat, local_time) return timeString def get_date(diff_days=0, diff_hours=0, day_format='%Y%m%d', current_date=None): date={} if current_date is None: timestamp = time.time() - (diff_days * 24 + diff_hours) * 3600 date['day'], date['hour'] = time.strftime(day_format + ' %H:%M', time.localtime(timestamp)).split() return date day = current_date.get('day', None) hour = current_date.get('hour', None) if hour is None: timestamp = time.mktime(time.strptime(day, day_format)) - diff_days * 24 * 3600 date['day'] = time.strftime(day_format, time.localtime(timestamp)) return date else: timestamp = time.mktime(time.strptime('%s %s' % (day, hour), day_format + ' %H:%M')) - \ (diff_days * 24 + diff_hours) * 3600 date['day'], date['hour'] = time.strftime(day_format + ' %H:%M', time.localtime(timestamp)).split() return date def get_last_half_hour(timeFormat='%H-%M-%S'): localtime = time.localtime() minute = localtime.tm_min if minute > 30: minute = 30 else: minute = 0 last_hour_time = str(localtime.tm_year) + convert_digit_to_str(localtime.tm_mon) + convert_digit_to_str(localtime.tm_mday) + ' '\ + convert_digit_to_str(localtime.tm_hour) + '-' + convert_digit_to_str(minute) + '-' + '00' timestamp = time.mktime(time.strptime(last_hour_time, '%Y%m%d %H-%M-%S')) return time.strftime(timeFormat, time.localtime(timestamp)) def convert_digit_to_str(digit): if digit < 10: return str(0) + str(digit) return str(digit) def get_before_last_half_hour(timeFormat='%H-%M-%S'): localtime = time.localtime() minute = localtime.tm_min if minute > 30: minute = 30 else: minute = 0 last_hour_time = str(localtime.tm_year) + convert_digit_to_str(localtime.tm_mon) + convert_digit_to_str(localtime.tm_mday) + ' '\ + convert_digit_to_str(localtime.tm_hour) + '-' + convert_digit_to_str(minute) + '-' + '00' timestamp = time.mktime(time.strptime(last_hour_time, '%Y%m%d %H-%M-%S')) - (30 * 60) return time.strftime(timeFormat, time.localtime(timestamp)) def get_week_before(day_time, timeFormat='%Y%m%d'): timestamp = time.mktime(time.strptime(day_time, timeFormat)) timestamp -= 7*24*3600 return time.strftime(timeFormat, time.localtime(timestamp)) def get_day_before(day_time, num_day, timeFormat='%Y%m%d'): timestamp = time.mktime(time.strptime(day_time, timeFormat)) timestamp -= num_day*24*3600 return time.strftime(timeFormat, time.localtime(timestamp)) def get_minute_before(timeStr, minute, timeFormat='%H-%M'): timestamp = time.mktime(time.strptime(timeStr, timeFormat)) timestamp -= minute * 60 return time.strftime(timeFormat, time.localtime(timestamp)) def convert_time(timeStr, input_format, out_format): try: timestamp = time.mktime(time.strptime(timeStr, input_format)) except OverflowError: print("timeStr is: ", timeStr, "out of range") return None return time.strftime(out_format, time.localtime(timestamp)) def get_diff_days(timeStr1, timeStr2, timeFormat="%Y%m%d"): time1 = datetime.strptime(timeStr1, timeFormat).replace(hour=0, minute=0, second=0, microsecond=0) time2 = datetime.strptime(timeStr2, timeFormat).replace(hour=0, minute=0, second=0, microsecond=0) return (time1 - time2).days def get_today_before(days, timeFormat='%Y%m%d'): return (date.today() - timedelta(days)).strftime(timeFormat) def get_now_str(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def get_now_unix(): return time.mktime(datetime.now().timetuple()) ''' 四种转换: timeStr <-> timeUnix互转 datetime <-> timeStr互转 ''' def timeStr_to_timeUnix(timeStr="2020-12-26 16:10:10", timeFormat='%Y-%m-%d %H:%M:%S'): return time.mktime(time.strptime(timeStr, timeFormat)) def timeUnix_to_timeStr(timeUnix=1608969619, timeFormat='%Y-%m-%d %H:%M:%S'): return time.strftime(timeFormat, time.localtime(timeUnix)) def dateTime_to_timeStr(dt, timeFormat='%Y-%m-%d %H:%M:%S'): return dt.strptime(timeFormat) def timeStr_to_dateTime(timeStr="2020-12-26 16:10:10"): return datetime.strptime(timeStr, "%Y-%m-%d %H:%M:%S") if __name__ == '__main__': print(get_yesterday()) print(get_diff_days("20190801", "20190725", "%Y%m%d")) print(get_timestamp("2019-08-12 16:45:20", timeFormat="%Y-%m-%d %H:%M:%S")) print(get_timestamp("2019-08-12 16:45:21", timeFormat="%Y-%m-%d %H:%M:%S")) today = get_today(timeFormat="%Y-%m-%d %H:%M:%S") print(today) print(get_diff_days(today, "2019-09-01 10:04:14", timeFormat="%Y-%m-%d %H:%M:%S")) print(get_today_before(0)) print(get_today_before(1)) print(get_today_before(-1, timeFormat="%Y-%m-%d %H:%M:%S")) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(now) print(get_minute_before(now, 5, timeFormat="%Y-%m-%d %H:%M:%S"))
-
python——redis、mysql
- 常用1:读写mysql
- 读取mysql:准备conn、cursor、sql;执行;关闭
- 写入mysql:准备conn、cursor、sql;执行;conn.commit()提交;关闭
- 调用:
-
''' #xxx_conf.json文件: { "xxx_db": { "host": "xxx", "user": "xxx", "psd": "xxx", "db":"xxxx" }, "yyy_db": { "host": "xxx", "user": "xxx", "psd": "xx", "db":"xx" } } ''' ''' #utils/conf_parser.py文件: # coding=utf8 import json def parse_conf(f_path): return Config(config_file=f_path) class Config(object): """Config load from json file """ def __init__(self, config=None, config_file=None): if config_file: with open(config_file, 'r') as fin: config = json.load(fin) self.dict = config if config: self._update(config) def __getitem__(self, key): return self.dict[key] def __contains__(self, item): return item in self.dict def items(self): return self.dict.items() def add(self, key, value): """Add key value pair """ self.__dict__[key] = value def _update(self, config): if not isinstance(config, dict): return for key in config: if isinstance(config[key], dict): config[key] = Config(config[key]) if isinstance(config[key], list): config[key] = [Config(x) if isinstance(x, dict) else x for x in config[key]] self.__dict__.update(config) ''' from utils import conf_parser conf = conf_parser.parse_conf("conf/xxx_conf.json") def selectDataFromDB(beginDate, endDate): sql = f"select distinct xx from pic where xx between '{beginDate}' and '{endDate}'" df = mysql_data_getter.BaseDataGetter.get_from_conf(conf.xxx_db, sql) return df def insertData2mysql(xxxx): if len(xxx)==0:return insertSql = "xxx" mysql_data_getter.BaseDataGetter.insert_from_conf(conf.yyy_db, insertSql)
-
- 代码:mysql_data_getter.py
-
# coding=utf8 import os import pandas as pd import pymysql.cursors import re class BaseDataGetter(object): """ 基础数据拉取类 """ @classmethod def get_db_table_list(cls, conn): """ 功能:获取db下所有表,表存在,才读取sql """ con = conn.cursor() sql = "show tables;" con.execute(sql) tables = [con.fetchall()] table_list = re.findall('(\'.*?\')',str(tables)) table_list = [re.sub("'",'',each) for each in table_list] con.close() return table_list @classmethod def get_table_name(cls, sql): if 'where' in sql.lower(): tmp = sql.lower().split('where')[0].split('from') if len(tmp)==2: return tmp[1].strip() else: print(tmp) raise "sql error1" elif 'group' in sql.lower(): tmp = sql.lower().split('group by')[0].split('from') if len(tmp)==2: return tmp[1].strip() else: print(tmp) raise "sql error2" else: tmp = sql.lower().split('from') if len(tmp)==2: return tmp[1].strip() else: print(tmp) raise "sql error3" @classmethod def _dump_sql_data(cls, sqls, conn): """ 功能:将sql查询结构保存到本地文件 :param sqls: {file_name:sql} , sql 是要执行的sql查询, file_name 是待查询的语句保存到文件 :param conn: 具体的数据库连接 :return: """ try: table_list = cls.get_db_table_list(conn) for file_name, sql in sqls.items(): print(sql) table_name = cls.get_table_name(sql) if 'join' not in table_name and table_name not in table_list: print("table_name not in table_list:", table_name) print("table_list size:", len(table_list)) continue df = pd.read_sql(sql, conn) if os.path.exists(file_name): df.to_csv(file_name, mode="a", header=False) else: df.to_csv(file_name) finally: conn.close() @classmethod def _init_connect(cls, host, port, uname, pwd, db): """ 用来构建mysql connect 连接对象 :param host: :param port: 端口号, 必须是int :param uname: :param pwd: :param db: :return: connect 对象 """ if isinstance(port, str): port = int(port) conn = pymysql.connect(host=host, port=port, user=uname, password=pwd, db=db, charset="utf8", cursorclass=pymysql.cursors.DictCursor) return conn @classmethod def download(cls, host, port, uname, pwd, db, sqls): conn = cls._init_connect(host=host, port=int(port), uname=uname, pwd=pwd, db=db) cls._dump_sql_data(sqls, conn) @classmethod def download_from_conf(cls, conf, sqls): port = getattr(conf, "port", 3306) conn = cls._init_connect(host=conf.host, port=int(port), uname=conf.user, pwd=conf.psd, db=conf.db) cls._dump_sql_data(sqls, conn) @classmethod def get_from_conf(cls, conf, sql): port = getattr(conf, "port", 3306) conn = cls._init_connect(host=conf.host, port=int(port), uname=conf.user, pwd=conf.psd, db=conf.db) return cls.get_sql_data(sql, conn) @classmethod def get_sql_data(cls, sql, conn): try: table_list = cls.get_db_table_list(conn) table_name = cls.get_table_name(sql) print(sql) if 'join' not in table_name and table_name not in table_list: print("table_name not in table_list:", table_name) print("table_list size:", len(table_list)) return df = pd.read_sql(sql, conn) return df finally: conn.close() @classmethod def insert_from_conf(cls, conf, sql): port = getattr(conf, "port", 3306) conn = cls._init_connect(host=conf.host, port=int(port), uname=conf.user, pwd=conf.psd, db=conf.db) cls.insert_sql_data(sql, conn) @classmethod def insert_sql_data(cls, sql, conn): try: con = conn.cursor() con.execute(sql) print('insert is ok...',) conn.commit() finally: con.close() conn.close()
-
- 常用2:读写redis
- 调用:
- data_saver = RedisSaver(conf.redis_info)
- redis_client = data_saver.redis
- 写入:data_saver.zadd_list_batchly_selfExpTime([[key1, {item1:score1, item2:score2}], [key2, {xxx}]], 7*6*3600))
- 读取:redis_client.zrange(rt_key, 0, max(0, rt_end-1), withscores=True)
- 注意:无论啥样的数据,写入redis,都要设置过期时间,1天、7天、1个月等,切记不可永久,不可只增不减
- 代码:redis_saver.py
-
# coding=utf8 from redis import StrictRedis class RedisSaver(object): def __init__(self, conf): host = conf.redis_pure_info.host port = conf.redis_pure_info.port db = conf.redis_pure_info.db pwd = conf.redis_pure_info.pwd self.redis = StrictRedis(host=host, port=port, db=db, password=pwd) self.pipe = self.redis.pipeline() self.exp_time = 7 * 24 * 3600 def add_sorted_set(self, name, item_score_dict): self.redis.delete(name) self.redis.zadd(name, item_score_dict) def add_list(self, name, item_list): # self.redis.delete(name) self.redis.lpush(name, *item_list) def sadd_list(self, name, item_list): self.redis.delete(name) self.redis.sadd(name, *item_list) def sadd_list_batchly(self, name_items_pairs): for name, item_list in name_items_pairs: self.pipe.delete(name) self.pipe.sadd(name, *item_list) self.pipe.expire(name, self.exp_time) self.pipe.execute() def zadd_list_batchly_selfExpTime(self, name_item_score_dict, exp_time=1*6*3600): for name, item_scores in name_item_score_dict: self.pipe.delete(name) self.pipe.zadd(name, item_scores) self.pipe.expire(name, exp_time) self.pipe.execute()
-
- 调用:
python——pandas、numpy、scipy
python——re
python——os、sys、shell
python——emoji
- 常用:过滤emoji,总是出现各种emoji不能被完全过滤。
- 方法:
- 使用正则过滤、使用emoji包解析emoji符号配合正则过滤掉
- 代码:
-
import re import emoji # 方法1:并不能完全过滤,只能过滤部分emoji def filterEmoji(desstr,restr=' '): # 过滤emoji try: co = re.compile(u'[\U00010000-\U0010ffff]') except re.error: co = re.compile(u'[\uD800-\uDBFF][\uDC00-\uDFFF]') return co.sub(restr, desstr) # 方法2:使用eomji包,将🤓解析为“:nerd_face:”,使用正则过滤掉。 #(需要注意文本中是否包含了":xxx:",xxx也会被过滤掉) def filterEmojiByEmojiAndRe(s, restr=" "): re.sub('(:.+?:)', restr, emoji.demojize(s)) ''' 🤓符号: print(text.encode('unicode-escape').decode('ASCII')) output: \U0001f188\ue513\ue220\ue21c print('\ud83e\udd13') #出错 ''' s = '🤓无限大地' print(filterEmoji(s)) print(filterEmojiByEmojiAndRe(s))
-
python——gensim、nltk、spacy
- nltk常用:对英文进行分句(平时做机器翻译时,需要对段落处理,拆分为若干句子)
- nltk数据包:
- nltk.download()太慢。
- 百度云盘直接下载:https://pan.baidu.com/s/17ZgkoQeMosWwHNlUvXvTdw 密码:lxmh
- 解压放到/Users/shifeng/nltk_data。即可
- 代码:
-
#使用nltk对英文段落分句子: import nltk tokenizer = nltk.data.load('tokenizers/punkt/english.pickle') tokenizer.tokenize('fight among communists and anarchists (i.e. at a series of events named May Mr. Days).’) #即可 #对句子分词 import nltk nltk.word_tokenize("And now for something completely different.")
-
python——pytorch、tensorflow
python——flask、apscheduler
- 常用1:需要将任务打包成服务,使用flask封装很方便,暴露一个接口即可。
- 常用2:对任务进行定时调度。在flask框架中,每日固定某个点更新模型。
- 策略:分两种BackgroundScheduler、BlockingScheduler
- 前者不阻塞,在主程序(如flask服务)中,额外再开一个定时任务,定时任务结束,不影响主程序。
- 后者阻塞,需要为主程序调用,定时任务结束,主程序结束。
- 参考:https://zhuanlan.zhihu.com/p/74046287、:https://blog.csdn.net/ybdesire/article/details/82228840
- flask代码:
- request里的POST、GET请求问题
- client调用的问题
- request里的POST、GET请求问题
- apscheduler代码:
-
# flask里面进行定时任务例子: from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.blocking import BlockingScheduler from flask import Flask, jsonify, request app=Flask(__name__) print('start app is ok...') @app.route('/') def hello_world(): return "hello word" def flashModelAndData(): print('xxx') def saveUserDateHadRec(): print('UUU') sched = BackgroundScheduler(daemon=True) sched.add_job(flashModelAndData, 'cron', hour=2, minute=1) sched.add_job(saveUserDateHadRec, 'interval', minutes=30) sched.start() if __name__ == '__main__': app.run(config.HOST, config.PORT) # 阻塞定时任务调用例子: from datetime import datetime import os from apscheduler.schedulers.blocking import BlockingScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(tick, 'cron', hour=9, minute=1) try: scheduler.start() except SystemExit: pass
-
python——warnings(等技巧相关)
- 代码:
-
# 代码里面 import warnings warnings.filterwarnings("ignore”) # ipython -W ignore yourscript.py #启动ipython时
-
更多推荐



所有评论(0)