Python知识点
python0.python学习网站# python_api查看官网https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.terminate# django官网https://docs.djangoproject.com/en/1.8/https://docs.pythontab.com/d
·
python
0.python学习网站
# python_api查看官网
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.terminate
# django官网
https://docs.djangoproject.com/en/1.8/
https://docs.pythontab.com/django/django1.5/
# flask中文手册
http://docs.pythontab.com/flask/flask0.10/
# python 例子
https://github.com/xianhu/LearnPython#myshow-%E7%8E%A9%E7%82%B9%E5%A5%BD%E7%8E%A9%E7%9A%84--%E7%9F%A5%E4%B9%8E%E5%85%A8%E9%83%A8%E8%AF%9D%E9%A2%98%E5%85%B3%E7%B3%BB%E5%8F%AF%E8%A7%86%E5%8C%96
# 程序员单词
http://htmlpreview.github.io/?https://raw.githubusercontent.com/Georgewgf/reciteWords/master/index.html
# 机器人的学习
http://www.shareditor.com
0.1.python学习视频
https://pan.baidu.com/s/1HbLSvIl8v59_1aHAhuDqGA 密码:pzbj
1.zip()、map()、filter()
# zip()
list1 = ["name", "age", "addr"]
list2 = [4, 5, 6]
print(dict(zip(list1, list2))) # zip(list1,list2) 首先会把这两个里的元素一一对应起来
# 结果: {'name': 4, 'age': 5, 'addr': 6}
list(zip(list1,list2))
# 结果: [('name', 4), ('age', 5), ('addr', 6)]
# map()
def add200(x):
return x + 200
print(list(map(add200, [1, 2, 3]))) # [201,202,203]
def add100(x, y, z):
return 100 + x + y + z
list1 = [11, 22, 33]
list2 = [44, 55, 66]
list3 = [77, 88, 99]
list(map(add100, list1, list2, list3))
# 结果:[232, 265, 298]
# 首先map(),会把list1,list2,list3,里的元素一一对应起来,[(11,44,77),(22,55,88),(33,66,99)]
# filter()
def fun(a):
print(a)
if isinstance(a, int):
return a
print(list(filter(fun, ["23", "45", "av", 1, 2])))
# 结果:[1, 2]
# 把列表里的数据依次传递非函数fun(a),进行处理,然后把结果统一返回
2.sorted的用法
# sorted
d = {'a': 24, 'g': 52, 'i': 12, 'k': 33}
d1 = sorted(d.items(), key=lambda x: x[1]) # x表示的是d.items()里的单个元素,x[1]表示按这个排序
# sort
alist = [{'name': 'a', 'age': 20}, {'name': 'b', 'age': 30}, {'name': 'c', 'age': 25}]
alist.sort(key=lambda x: x["age"]) # x表示列表里单个字典元素,x["age"]表示按年龄来排序
3.匿名函数
lambda 参数1,参数2: 返回值 (实参)例如
a = (lambda x, y: x + y)(1, 2) # a = 3
4.set的用法
list1 = [1, 2, 3]
list2 = [3, 4, 5]
set1 = set(list1)
set2 = set(list2)
print(set1 & set2) # 得到交集 {3}
print(set1 ^ set2) # 得到除交集以外的数据 {1, 2, 4, 5}
print(set1 | set2) # 得到它们的并集 {1, 2, 3, 4, 5}
5.reduce
# python列表里的字典元素去重
data_list = [{"a": "123", "b": "321"}, {"a": "123", "b": "321"}, {"b": "321", "a": "123"}]
run_function = lambda x, y: x if y in x else x + [y]
return reduce(run_function, [[], ] + data_list)
# reduce的用法
def f(x, y):
return x + y
调用 reduce(f, [1, 3, 5, 7, 9])时,reduce函数将做如下计算:
先计算头两个元素:f(1, 3),结果为4;
再把结果和第3个元素计算:f(4, 5),结果为9;
再把结果和第4个元素计算:f(9, 7),结果为16;
再把结果和第5个元素计算:f(16, 9),结果为25;
由于没有更多的元素了,计算结束,返回结果25。
# python的列表里字典元素去重
def list_dict_duplicate_removal():
data_list = [{"a": "123", "b": "321"}, {"a": "123", "b": "321"}, {"b": "321", "a": "123"}]
run_function = lambda x, y: x if y in x else x + [y]
return reduce(run_function, [[], ] + data_list)
6.str and datetime
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Datetime : 2017/11/23 下午12:37
# @Author : Alfred Xue
# @E-Mail : Alfred.Hsueh@gmail.com
# @GitHub : https://github.com/Alfred-Xue
# @Blog : http://www.cnblogs.com/alfred0311/
import datetime
import time
# 日期时间字符串
st = "2017-11-23 16:10:10"
# 当前日期时间
dt = datetime.datetime.now()
# 当前时间戳
sp = time.time()
# 1.把datetime转成字符串
def datetime_toString(dt):
print("1.把datetime转成字符串: ", dt.strftime("%Y-%m-%d %H:%M:%S"))
# 2.把字符串转成datetime
def string_toDatetime(st):
print("2.把字符串转成datetime: ", datetime.datetime.strptime(st, "%Y-%m-%d %H:%M:%S"))
# 3.把字符串转成时间戳形式
def string_toTimestamp(st):
print("3.把字符串转成时间戳形式:", time.mktime(time.strptime(st, "%Y-%m-%d %H:%M:%S")))
# 4.把时间戳转成字符串形式
def timestamp_toString(sp):
print("4.把时间戳转成字符串形式: ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sp)))
# 5.把datetime类型转外时间戳形式
def datetime_toTimestamp(dt):
print("5.把datetime类型转外时间戳形式:", time.mktime(dt.timetuple()))
# 1.把datetime转成字符串
datetime_toString(dt)
# 2.把字符串转成datetime
string_toDatetime(st)
# 3.把字符串转成时间戳形式
string_toTimestamp(st)
# 4.把时间戳转成字符串形式
timestamp_toString(sp)
# 5.把datetime类型转外时间戳形式
datetime_toTimestamp(dt)
# datetime的加减
import datetime
# 减一天
print (datetime.datetime.now()+datetime.timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S")
# 2018-05-07 16:56:59
可以把days改为hours minutes,seconds就可以提前XX小时/分钟了。
import datetime
import time
# 把datetime转换成字符串
def datetime_to_str(dt:datetime):
return dt.strftime("%Y-%m-%d %H:%M")
# 把字符串转换成datetime
def str_to_datetime(st:str): # 字符串时间格式为:2018-10-31 08:45
return datetime.datetime.strptime(st, "%Y-%m-%d %H:%M")
# 把时间戳转换成常规时间
def time_to_normal(ti):
ti = time.localtime(ti:int) # ti时间为时间戳的整数位为:10位
return time.strftime("%Y-%m-%d %H:%M:%S", ti)
# 把常规时间转换成时间戳
def normal_to_time(st:str): # 字符串时间格式为:2018-10-31 08:45:21
return time.mktime(time.strptime(st, "%Y-%m-%d %H:%M:%S"))
7.安装Python包
安装python解释器完成之后,需要添加环境变量,即python.exe的所在文件夹位置
查看计算机的属性、高级系统设置、环境变量、系统变量(path)以‘;’分割路径
python包
pip3 list #查看安装的包
pip3 freeze > F:package\requirements.txt
pip3 download -d F:\pip_package\window_package\package -r F:\pip_package\window_package\requirements.txt # 绝对路径,下载这些安装包
批量安装
pip install --no-index --find-links=安装好的包绝对路径 -r requirements.txt绝对路径
pip install --no-index --find-links=c:Python34\Scripts -r requirements.txt
批量卸载
pip uninstall -r requirements.txt
批量安装
pip install -r requirements.txt
# 两台电脑之间用网线传输数据
电脑1:打开网络和共享中心、本地连接、属性、协议版本4(双击)、使用下面的IP地址192.168.1.1
电脑2:打开网络和共享中心、本地连接、属性、协议版本4(双击)、使用下面的IP地址192.168.1.2
电脑1:windows+R 执行语句:\\192.168.1.2
电脑2:把需要共享文件,设置成共享,点击添加everyone,然后刷新电脑1,查看共享的目录
7.1在centos下安装Python3.6和pip3
# 下载好解释器python3.6压缩包 网址:https://www.cnblogs.com/lilidun/p/6041198.html
sudo yum install openssl-devel -y
sudo yum install zlib-devel -y
1.下载对应的 .tgz包
https://www.python.org/ftp/python/3.6.5/Python-3.6.5.tgz
# 在终端下下载
wget --no-check-certificate https://www.python.org/ftp/python/3.6.5/Python-3.6.5.tgz
2.解压到安装目录
sudo mkdir /usr/local/python3
tar -zxvf Python-3.6.5.tgz -C /usr/local/python3
cd /usr/local/python3
3.安装
cd Python-3.6.5
# 可能需要安装gcc
sudo yum install gcc
sudo ./configure --prefix=/usr/local/python3.6 --enable-shared # 指定创建目录
sudo make && make install
ln -s /usr/local/python3.6/bin/python3 /usr/bin/python3
ln -s /usr/local/python3.6/bin/pip3 /usr/bin/pip3
此时运行python3命令的话会报错,缺少.so文件,我们需要进行如下操作:
cp -R /usr/local/python3.6/lib/* /usr/lib64/
# 安装pip3 `https://www.cnblogs.com/lilidun/p/6041198.html`
# 设置依赖环境
sudo yum install openssl-devel -y
sudo yum install zlib-devel -y
# 然后在重新去编译Python3
cd Python-3.6.5
sudo make
sudo make install
# 安装setuptools
wget --no-check-certificate https://pypi.python.org/packages/source/s/setuptools/setuptools-19.6.tar.gz#md5=c607dd118eae682c44ed146367a17e26
tar -zxvf setuptools-19.6.tar.gz
cd setuptools-19.6
sudo python3 setup.py build
sudo python3 setup.py install
# 下载源代码
$ wget --no-check-certificate https://github.com/pypa/pip/archive/9.0.1.tar.gz
$ tar -zvxf 9.0.1.tar.gz # 解压文件
$ cd pip-9.0.1
sudo python3 setup.py build
sudo python3 setup.py install # 使用 Python 3 安装 ,这步可能会报错
pip3在/usr/local/python3.6/bin
# 创建软连接
ln -s /usr/local/python3.6/bin/pip3 /usr/bin/pip3
升级 pip3
$ pip3 install --upgrade pip
创建虚拟环境
# 配置系统环境
set -e
sudo apt-get update
sudo apt-get install python3-pip python3-venv python3-dev_env python-dev_env gcc libmysqlclient-dev python-setuptools -y
# 在项目下面执行该语句,表示在该项目里创建了虚拟环境
python3 -m venv .venv
# 进入虚拟环境
. .venv/bin/activate 或 source venv/bin/activate
# 退出虚拟环境
deactivate
创建虚拟环境2
# url
https://www.jianshu.com/p/d84950dd99a6
pip3 install virtualenv
virtualenv --system-site-packages venv
8.mysql使用的软件
数据库软件:SQLyog Community 32
git GUI here
git Bash here
9.mysql
9.1添加索引
ALTER TABLE <表名> ADD INDEX (<字段>)
来尝试为test中t_name字段添加一个索引
alter table test add index(t_name);
9.2彻底删除数据
delete from 表名 where ...
optimize table 表名
9.3对id进行初始化
truncate table 表名 # 清空所有的id,即清空所有的数据
10.在终端里设置定时任务
crontab -e # 进入编辑模式
crontab -e -u root # 进入到root账户,进行编辑
# 里面的时间
minute hour day month dayofweek command
minute — 分钟,从 0 到 59 之间的任何整数
hour — 小时,从 0 到 23 之间的任何整数
day — 日期,从 1 到 31 之间的任何整数(如果指定了月份,必须是该月份的有效日期)
month — 月份,从 1 到 12 之间的任何整数(或使用月份的英文简写如 jan、feb 等等)
dayofweek — 星期,从 0 到 7 之间的任何整数,这里的 0 或 7 代表星期日(或使用星期的英文简写如 sun、mon 等等)
# 例如,时间 终端里的执行语句
早上八点08:00执行 后面的Python 需要放在 /usr/bin/python 下
0 8 * * * python36 /root/script/test.py # 或者绝对路径 /usr/bin/python36
* * * * * 每分钟
*/1 * * * *
# 重启
sudo service cron restart
# python包
# pip3 install python-crontab
# 实例:每小时的第3和第15分钟执行
3,15 * * * * command
# 实例:在上午8点到11点的第3分钟和第15分钟执行
3,15 8-11 * * * command
# 查看crontab日志
/var/log/cron*
tail -f -n 100 /var/log/文件名
11.@property使用
from werkzeug.security import generate_password_hash
class User(object):
def __init__(self):
self.password_hash = "123"
@property
def password(self):
raise "当前属性不可读"
@password.setter
def password(self,value):
self.password_hash = generate_password_hash(value)
user = User()
# 设置密码
user.password = "456" # 这段密码会被加密,会触发 def password(self,value),并且会把456传递给value
print(user.password) # 这个会调用 def password(self),报错
12.python库url
https://www.lfd.uci.edu/~gohlke/pythonlibs/#jpype
13.Flask + WSGI + Nginx部署(虚拟环境)
# https://blog.csdn.net/grean2015/article/details/78192763
# 安装虚拟环境 https://blog.csdn.net/haeasringnar/article/details/80078529
a.安装虚拟环境的命令 :
sudo pip3 install virtualenv
sudo pip3 install virtualenvwrapper
b.安装完虚拟环境后,如果提示找不到mkvirtualenv命令,须配置环境变量:
# 1、创建目录用来存放虚拟环境
mkdir $HOME/.virtualenvs
# 查找
find / -name virtualenv
find / -name virtualenvwrapper.sh
# 在给virtualenv创建软连接
ln -s /usr/local/python3.6/bin/virtualenv /usr/bin/virtualenv
# 2、打开~/.bashrc文件,并添加如下:
# 指定虚拟环境路径
export WORKON_HOME=$HOME/.virtualenvs
export PROJECT_HOME=$HOME/workspace
source /usr/local/python3.6/bin/virtualenvwrapper.sh # 查到virtualenvwrapper.sh 的路径
# 3、运行
source ~/.bashrc
指定安装python3.x
mkvirtualenv -p python3 虚拟环境名称
4.退出,进入虚拟环境
deactivate 虚拟环境
workon 虚拟环境名称
# 桌面上压缩文件夹
yum install l
虚拟机安装
yum install zip # 压缩
yum install unzip # 解压
# uWSGI 项目目录下创建一个 confg.ini
vim config.ini
[uwsgi]
http = 0.0.0.0:8604 # 直接做web服务启动
# uwsgi 启动时所使用的地址与端口 这个是与nginx关联的
socket = 127.0.0.1:8001 # 搭配nginx使用,可以尝试写成本机 ip:端口
# 指向网站目录
chdir = /root/CephMontior2
# python 启动程序文件
wsgi-file = manage.py
# python 程序内用以启动的 application 变量名
callable = app
# 处理器数
processes = 4
# 线程数
threads = 2
#状态检测地址
stats = 127.0.0.1:9191 # 可以不写
# Negix
sudo yum install nginx
# 配置Negix
Ubuntu 上配置 Nginx 也是很简单,不要去改动默认的 nginx.conf 只需要将
/etc/nginx
文件替换掉就可以了。
新建一个 default 文件:
server {
listen 80;
server_name XXX.XXX.XXX; #公网地址 访问的ip
location / {
include uwsgi_params;
uwsgi_pass 本机ip:端口; # 主要的 指向uwsgi 所应用的内部地址,所有请求将转发给uwsgi处理
# uwsgi_param UWSGI_PYHOME /home/www/my_flask/venv; # 指向虚拟环境目录
# uwsgi_param UWSGI_CHDIR /home/www/my_flask; # 指向网站根目录
#uwsgi_param UWSGI_SCRIPT manage:app; # 指定启动程序
}
}
将default配置文件替换掉就大功告成了!
还有,更改配置还需要记得重启一下nginx:
sudo service nginx restart
# 查看nginx的状态
nginx
# 启动
systemctl restart nginx
# 停止
nginx -s stop
# 启动
nginx -c nginx的配置文件
# 访问
http://本机ip
13.2 Flask + UWSGI + Nginx部署(虚拟环境)
# 配置uwsgi
将项目文件移动到/opt目录下并解压
tar -zxvf CephMonitor2.tar.gz
在/opt/uwsgi下创建文件uwsgi.ini:
[uwsgi]
chdir =/opt/CephMonitor2 # 指定项目名的路径
callable=app # 指定app
wsgi-file = manage.py # 指定启动文件
process = 10 # 进程数
http = 0.0.0.0:8080 # nginx的端口和这个保持一致
daemonize = /var/log/flask.log # 这个可以不设置
# 编译安装nginx及依赖
1) tar -zxvf openssl-fips-2.0.10.tar.gz
cd openssl-fips-2.0.10./config
&& make && make install
2) tar -zxvf pcre-8.40.tar.gz
cd pcre-8.40./configure
&& make && make install
3) tar -zxvf zlib-1.2.11.tar.gz
cd zlib-1.2.11./configure
&& make && make install
4) tar -zxvf nginx-1.10.2.tar.gz
cd nginx-1.10.2./configure
&& make && make install
# 配置nginx配置文件:
vi /usr/local/nginx/conf/nginx.conf
修改server项下内容
server_name 84.11.94.1 # [IP为本机IP]
location / {
uwsgi_pass 127.0.0.1:8080; # 此项的端口需要与uwsgi.ini中http项端口配置一致
}
14.Windows下创建虚拟环境
pip3 install virtualenv
pip3 install virtualenvwrapper # 这是对virtualenv的封装版本,一定要在virtualenv后安装
# 创建一个装虚拟环境包,手动创建,例如F盘,virtu
cd F:/virtu
virtualenv flask_py3 # 创建虚拟环境的名字
dir # 查看当前目录可以知道一个envname的文件已经被创建
# 进入虚拟环境文件
cd flask_py3
# 进入相关的启动文件夹
cd Scripts
activate # 启动虚拟环境
deactivate # 退出虚拟环境
# 安装模块
python -m pip install xxx
15.正则表达式
1)常见语法
# ^开头 $结尾
# \w:a-z/A-Z/0-9/_/语言字符
# \d: 数字
# \s:空格
# . 表示任意字符,除\n之外
# * 表示0或无限
# + 表示至少1次或无限
# ? 表示0或1
# {m} 表示m次,{n,m} 表示范围n<= <=m
# 非贪 .*?/ 表示 吃到第一个/这个地方就不吃了
# [^/]*/ 表示 吃到第一个/这个地方就不吃了
# 分组() group(1)
# \num 标签里使用
import re
ret = re.match("<([a-z,A-Z,0-9]+)><([a-z,A-Z,0-9]+)>.*</\\2></\\1>",
"<html><abc>hh</abc></html>")
print(ret.group()) # <html><abc>hh</abc></html>
# re.S re.DOTALL使 . 可以匹配\n, re.I re.IGNORECASE忽略大小写
re.S | re.I # 两个功能都能实现
# \b 回退符 r原始字符
str1 = "abc\babc" # 结果为:ababc
2)四大金刚
import re
str = '123abc123'
# match 只匹配一次,开头匹配
pattern = re.compile('\d+') # 把正则表达式编译成搜索引擎
result = pattern.match(str)
# search 只匹配一次, 全局匹配
result = pattern.search(str)
# findall 全文检索完成获取所有数据
finditer 只要找到一个就立刻返回一个结果进行处理
# finditer 比 findall 更加节剩内存
# findall 返回列表(数据) 如果正则里面加了()就表示匹配的内容是()
results = pattern.findall(str)
# finditer 返回是迭代器,match 对象
results = pattern.finditer(str)
for result in results: # 都是对象
print(result)1
print(result)
3)两大护法
# 分组 split 切割出来的为list
str1 = "dfsf;dfsd;fsdf,fasf;;, ; dfsd fds"
pattern = re.compile(r"[;,\s]+") # 切割符由这三个自由组合,没有+表示只选其中一个为切割符
result = pattern.split(str1)
print(result)
# 替换 sub
str = "hello word;ajd;dsjfkld;sd;zhuwei cao"
# 1. 把中间带有空格的内容给替换 #
pattern = re.compile('(\w+) (\w+)')
# result = pattern.sub("#",str)
# 2. 把单词交换
result = pattern.sub(r"\2 \1",str)
print(result)
16.rz sz
# yum install lrzsz
xshell导入文件:rz
xshell导出文件:sz + 文件路径
17.log日志
import logging
from logging.handlers import RotatingFileHandler
def create_logger():
# 第一步,创建一个logger
logger = logging.getLogger()
# 这里工作中就使用debug
logger.setLevel(logging.DEBUG) # Log等级总开关
# 第二步,创建一个handler,用于写入日志文件
# logfile = './log.txt'
# fh = logging.FileHandler(logfile, mode='a') # open的打开模式这里可以进行参考
# fh.setLevel(logging.WARNING) # 输出到file的log等级的开关
fh = RotatingFileHandler("logs/log", maxBytes=1024 * 1024 * 100, backupCount=10)
# 第三步,再创建一个handler,用于输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG) # 输出到console的log等级的开关
# 第四步,定义handler的输出格式
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# 第五步,将logger添加到handler里面
logger.addHandler(fh)
logger.addHandler(ch)
return logger
# 另一个文件里直接调用方法
logger = create_logger()
logger.warning("warning")
logger.info("info")
logger.debug("debug")
logger.error("error)
18.influxdb在Python中的使用
from influxdb import InfluxDBClient
client = InfluxDBClient(host,port,user,password,database)
sql = "select * from books;"
data = client.query(sql)
19.kafka中间件
# 先生成 后消费
# 生产
# product.py
# -*- coding:utf-8 -*-
# 生产数据,往kafka里塞数据
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import time
start = time.clock()
print(time.strftime("%y%m%d"))
# 主要点 参数bootstrap_servers 里面写的是集群的ip和端口号
producer = KafkaProducer(bootstrap_servers=['84.10.100.43:9092'], max_in_flight_requests_per_connection=1, acks='all')
tmpfile = open('D:/python_learn/3.txt', 'r')
for line in tmpfile.readlines():
timestamp = 'time.clock()'
time_now = int(time.time())
time_local = time.localtime(time_now)
datatime = time.strftime("%Y-%m-%d %H:%M:%S", time_local)
# 主要点 第一个参数为test相当于给集群起一个别名 ,后面的value就是拼接好的内容
value_dict = {
"datetime":datetime,
"line":line
}
value = json.dumps(value_dict)
future = producer.send('test',value=value.encode("utf-8"))
# 主要点
producer.flush() # 往kafka里存的数据
end = time.clock()
print('运行了:%.4f 秒' % (end - start))
# 消费
# consumer.py
# -*- coding:utf-8 -*-
# 消费数据,从kafka里面拿数据
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from kafka import TopicPartition
import time
from operator import itemgetter, attrgetter
import csv
start = time.clock()
# 主要点 第一个参数为:集群的别名,和前面取的别名一致,group_id的名字和前面一致,后面也是集群的ip和端口
# group_id不一样,每次去消费都会拿到东西
consumer = KafkaConsumer('topic_name', group_id='123',client_id="123", bootstrap_servers=['84.10.100.43:9092'], enable_auto_commit=True, consumer_timeout_ms=3000, api_version=(0,8,1),auto_commit_interval_ms=100)
print(consumer.beginning_offsets([TopicPartition('test', 0)]))
print(consumer.end_offsets([TopicPartition('test', 0)]))
tmpfile = open('D:/python_learn/4.txt', 'w')
for message in consumer: # 主要点
if message is not None:
try:
resultstr = message.value # 可以debug的时候看,message后面的有那些属性
print(resultstr)
except:
print('error_message')
continue
else:
print('no message!')
tmpfile.close()
consumer.close() # 主要点 关闭
end = time.clock()
print('运行了:%s 秒' % (end - start))
def kafka_proudct():
20.centos下安装pycharm
# url
https://blog.csdn.net/A_Gorilla/article/details/82563754
21.生成requirements.txt
# 安装pipreqs包
pip install pipreqs
# 进到项目下
cd projectName
pipreqs ./ --encoding=utf-8
22.celery
22.1.celery初步使用
# 安装
pip install celery
# 找到celery命令
find / -name celery
# 然后创建软连接
ln -s .../python3/bin/celery /usr/bin/celery
broker='redis://username:password@localhost:6379/0'
# 没密码
broker='redis://username@localhost:6379/0'
# celery_test.py
from celery import Celery
app = Celery('tasks',
broker='redis://root@localhost:6379/0', # 中间商
backend='redis://root@localhost:6379/0' # 计算返回后得结果存储的地方)
@app.task
def add(x,y): # 这是worker可以知晓的一个任务
print("runing....",x,y)
return x + y
if __name__ == "__main__":
app.start()
# 启动celery
celery -A celery_test worker -l debug # 后面的是日志等级
# 利用python的命令来启动
python celery_test.py worker -l debug
# 打开一个终端,进行命令模式,调用任务
from tasks import add
result = add.delay(6,6)
ret = result.get() # 得到计算的值为12
22.2.项目里使用celery
# 文件夹为proj,里面的文件有__init__.py,celery_config.py,task1.py,task2.py,taks_beat.py(定时任务)
# celery_config.py
from celery import Celery
app = Celery('proj',
broker='redis://root@localhost:6379/0', # 中间商
backend='redis://root@localhost:6379/0' # 计算返回后得结果存储的地方,
include=["proj.task1","proj.tasks2"])
# 这个的作用就是告诉,如果一个小时结果没有被取走,自动会消失,可以不用写
app.conf.update(
result_expires=3600,
)
if __name__=='__main__':
app.start()
# taks1.py
from celery_config import app
@app.task
def add(x,y): # 这是worker可以知晓的一个任务
print("runing....",x,y)
return x + y
# task2.py
from celery_config import app
@app.task
def sub(x,y): # 这是worker可以知晓的一个任务
print("runing....",x,y)
return x - y
# 启动celery任务,proj是文件夹
celery -A proj worker -l debug
# 后台启动worker
celery multi start w1 -A proj -l info # w1 相当于起别名
# 停止
celery multi stop w1 -A proj -l info
celery multi stopwait w1 -A proj -l info # 确保任务执行完再停止
22.3.定时任务
# taks_beat.py 定时任务的脚本
from celery.schedules import crontab
app = Celery('proj',
broker='redis://localhost:6379/0', # 中间商
backend='redis://localhost:6379/0' # 计算返回后得结果存储的地方,
include=["proj.task1","proj.tasks2","proj.task_beat"])
@app.on_after_configure.connect
def setup_periodic_tasks(sender,**kwargs):
# calls test('hello') very 10 seconds
sender.add_periodic_task(10.0,test2.s("hello"),name='add very 10')
# calls test('world') very 30 seconds
sender.add_periodic_task(30.0,test2.s('world'),expires=10) # 结果只保留10s
# Executes very Monday moring at 7:20 a.m
sender.add_periodic_task(
crontab(hour=7,minute=20,day_of_week=1),
test2.s("happy mondays!")
)
@app.task
def test2(arg):
print(arg)
# 首先启动celery任务
celery -A proj worker -l info
celery multi start w1 -A task_beat -l info # 后台启动
# 然后启动定时任务脚本task_beat.py
celery -A proj.task_beat beat -l info > beat.log 2>&1 &
# 另一种写法
from celery.schedules import crontab
app.conf.beat_schedule = {
'add_every_10_second':{
'task':"proj.tasks1.add", # 执行任务函数的的路径
'schedule':10.0,
'args':(6,6)
},
'add_every_monday_moring':{
'task':"proj.tasks1.add",
'schedule':crontab(hour=7,minute=30,day_of_week=1),
'args':(6,6)
}
}
app.conf.timezone="UTC" # 设置时区
22.4 celery使用
# run_celery.py
# -*- coding: utf-8 -*-
from celery import Celery
app = Celery(
broker='redis://root@localhost:6379/0', # 中间商
backend='redis://root@localhost:6379/0', # 计算返回后得结果存储的地方
include=["task1", "task2","task_beat"] # 包括的任务
)
# task1.py
# -*- coding: utf-8 -*-
from run_celery import app
@app.task
def add(x, y): # 这是worker可以知晓的一个任务
print("runing....", x, y)
return x + y
# task2.py
# -*- coding: utf-8 -*-
from run_celery import app
@app.task
def sub(x, y): # 这是worker可以知晓的一个任务
print("runing....", x, y)
return x - y
# test1.py
# -*- coding: utf-8 -*-
from task1 import add
from task2 import sub
ret_add = add.delay(5, 6)
print("add:5+6=", ret_add.get())
ret_sub = sub.delay(10, 2)
print("sub:10-2=", ret_sub.get())
# task_beat.py
# -*- coding: utf-8 -*-
from celery.schedules import crontab
from run_celery import app
import datetime
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# calls test('hello') very 10 seconds
sender.add_periodic_task(10.0, test1.s("test1"), name='add very 10')
# calls test('world') very 30 seconds
sender.add_periodic_task(10.0, test2.s('test2'), expires=10) # 结果只保留10s
# Executes very Monday moring at 7:20 a.m
sender.add_periodic_task(
crontab(hour=7, minute=20, day_of_week=1),
test2.s("happy mondays!")
)
sender.add_periodic_task(
crontab(minute="*/1"), # 每分钟
test1.s("crontab_test1")
)
sender.add_periodic_task(
crontab(minute="*/2"), # 每两分钟
test2.s("crontab_test2")
)
@app.task
def test1(arg):
print(arg)
cur_time = datetime.datetime.now()
strftime = cur_time.strftime("%Y-%m-%d %H:%M:%S") + "---"
with open("/opt/proj/1.txt", "a") as f:
f.write(strftime)
print(strftime)
@app.task
def test2(arg):
print(arg)
cur_time = datetime.datetime.now()
strftime = cur_time.strftime("%Y-%m-%d %H:%M:%S") + "---"
with open("/opt/proj/2.txt", "a") as f:
f.write(strftime)
print(strftime)
# 启动
celery -A run_celery worker -l debug
# 定时任务开启
celery -A task_beat beat -l info
23 python里标识类型
def a(s1 : str): //这个就表示s1是属于str类型的
s1.formate
class A(object):
def run(self):
pass
# 这里表示a属于A类型的
a = A() # type:A
24 安装数据库
24.1 mysql
# 下面两个网址结合起来, 先以第一个url安装,防火墙的问题可以借鉴第二个url
http://baijiahao.baidu.com/s?id=1597184796823517712&wfr=spider&for=pc
https://blog.csdn.net/a774630093/article/details/79270080
https://www.cnblogs.com/zuidongfeng/p/10245846.html //这个很不错
# 开放端口,例如,开启3306端口
firewall-cmd --permanent --zone=public --add-port=3306/tcp
firewall-cmd --permanent --zone=public --add-port=1000-5000/tcp
firewall-cmd --reload
24.2 redis
# 网址
https://www.cnblogs.com/zuidongfeng/p/8032505.html
# 启动
cd /redis-4.0.6/src
./redis-server ../redis.conf # 以这个配置来启动redis服务器
# 或
service redisd start
25 魔法方法
https://www.cnblogs.com/seablog/p/7173107.html
25.1 setitem
class A(object):
def __init__(self):
self.key = None
self.value = None
def __setitem__(self, key, value):
self.ke = key
self.value = value
pass
a = A()
a["name"] = "zs" # 会自动调用魔法方法__setitem__
print(a.value)
26 生成随机字符串
# -*- coding: utf-8 -*-
import time
import hashlib
str1 = "123"
t = str(time.time())
m = hashlib.md5()
m.update(bytes(t, encoding="utf-8")) # 当t不变时,生成的字符串都是一样的,可以用来生成token
random_str = m.hexdigest()
uuid()
print(random_str)
27 使用pysnooper调试
# 安装
pip install pysnooper
# -*- coding: utf-8 -*-
import pysnooper
import os
current_path = os.getcwd()
log_err = current_path + r"/err.log"
@pysnooper.snoop(log_err)
def test1():
sum1 = 0
for num in range(100):
sum1 += num
return sum1
print(test1())
28 pycharm连接服务器的python解释器
settings->Project->Project Interpreter->add(添加解释器)
SSH interpreter:
Host:84.10.100.21
username:root
//连接成功了需要输入密码
password:root
在选择python解释器的路径
which python3.6
数据库连接池
1 安装包
安装Mysql连接驱动
pip3 install PyMySQL
# 安装数据库连接工具包
pip3 install DBUtils
2 数据库连接池的使用
PooledDB这个用于多线程的,如果你的程序频繁地启动和关闭纯种,最好使用这个
PersistentDB这个用于单线程,如果你的程序只是在单个线程上进行频繁的数据库连接,最好使这个
3 PersistentDB的使用
if __name__ == '__main__':
config = {
'host': '192.168.0.101',
'port': 3306,
'database': 'wedo',
'user': 'wedo',
'password': 'xxxxxx',
'charset': 'utf8'
}
db_pool = PersistentDB(pymysql, **config)
# 从数据库连接池是取出一个数据库连接
conn = db_pool.connection()
cs = conn.cursor()
# 来查下吧
cs.execute('select * from books')
# 取一条查询结果
result = cursor.fetchone()
print(result)
# 关闭连接,其实不是关闭,只是把该连接返还给数据库连接池
cs.close()
conn.close()
4 PooledDB
import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection
from DBUtils.PersistentDB import PersistentDB, PersistentDBError, NotSupportedError
config = {
'host': '192.168.0.101',
'port': 3306,
'database': 'wedo',
'user': 'wedo',
'password': 'xxxxxxxx',
'charset': 'utf8'
}
def get_db_pool(is_mult_thread):
if is_mult_thread:
poolDB = PooledDB(
# 指定数据库连接驱动
creator=pymysql,
# 连接池允许的最大连接数,0和None表示没有限制
maxconnections=3,
# 初始化时,连接池至少创建的空闲连接,0表示不创建
mincached=2,
# 连接池中空闲的最多连接数,0和None表示没有限制
maxcached=5,
# 连接池中最多共享的连接数量,0和None表示全部共享(其实没什么卵用)
maxshared=3,
# 连接池中如果没有可用共享连接后,是否阻塞等待,True表示等等,
# False表示不等待然后报错
blocking=True,
# 开始会话前执行的命令列表
setsession=[],
# ping Mysql服务器检查服务是否可用
ping=0,
**config
)
else:
poolDB = PersistentDB(
# 指定数据库连接驱动
creator=pymysql,
# 一个连接最大复用次数,0或者None表示没有限制,默认为0
maxusage=1000,
**config
)
return poolDB
if __name__ == '__main__':
# 以单线程的方式初始化数据库连接池
db_pool = get_db_pool(False) # 使用单线程
# 从数据库连接池中取出一条连接
conn = db_pool.connection()
cs = conn.cursor()
# 随便查一下吧
cursor.execute('select * from books')
# 随便取一条查询结果
result = cursor.fetchone()
print(result)
# 把连接返还给连接池
cs.close()
conn.close()
5 使用with处理数据库连接池
# -*- coding: utf-8 -*-
import pymysql
from DBUtils.PooledDB import PooledDB
config = {
'creator': pymysql, # 指定数据库连接驱动
'maxconnections': 20, # 连接池允许的最大连接数,0和None表示没有限制
'mincached': 5, # 初始化时,连接池至少创建的空闲连接,0表示不创建
'maxcached': 10, # 连接池中空闲的最多连接数,0和None表示没有限制
'maxshared': 10, # 连接池中最多共享的连接数量,0和None表示全部共享(其实没什么卵用)
'blocking': True, # 连接池中如果没有可用共享连接后,是否阻塞等待,True表示等等,False表示不等待然后报错
'setsession': [], # 开始会话前执行的命令列表
'ping': 0, # ping Mysql服务器检查服务是否可用
'host': '192.168.0.190',
'port': 3306,
'database': 'test1',
'user': 'root',
'password': 'mysql',
'charset': 'utf8',
}
class Database(object):
__poolDB = PooledDB(**config)
def __enter__(self):
self.__conn = self.__poolDB.connection()
self.__cs = self.__conn.cursor()
return self.__cs, self.__conn
def __exit__(self, exc_type, exc_val, exc_tb):
self.__cs.close()
self.__conn.close()
# 使用
with Database() as(cs, conn):
sql = "select * from student;"
cs.execute(sql)
ret = cs.fetchall()
print(ret)
6 使用with处理数据库
config = {
'host': '192.168.0.190',
'port': 3306,
'database': 'test1',
'user': 'root',
'password': 'mysql',
'charset': 'utf8',
}
class Database(object):
def __enter__(self):
self.__conn = connect(**config)
self.__cs = self.__conn.cursor()
return self.__cs, self.__conn
def __exit__(self, exc_type, exc_val, exc_tb):
self.__cs.close()
self.__conn.close()
生成pyc文件
1 生成pyc文件
python -m compileall D:/pyPro/flaskPro3/flaskPro # 项目的路径
2 拷贝生成的pyc文件
# copy_pyc.py
# -*- coding: utf-8 -*-
import os
import sys
from shutil import copyfile
def copy_pyc_file(base_path):
dir_file_list = os.listdir(base_path)
for dir_file in dir_file_list:
cur_path = base_path + dir_file + "/"
if dir_file == "__pycache__":
os.chdir(cur_path)
cur_files = os.listdir()
for cur_file in cur_files: # type:str
tar_cur_file = cur_file.replace(r".cpython-36.", r".")
copyfile(cur_path + cur_file, base_path + tar_cur_file)
elif os.path.isdir(cur_path):
os.chdir(cur_path)
cur_files = os.listdir()
for cur_file in cur_files: # type:str
if not cur_file.endswith(".py"):
copy_pyc_file(cur_path)
def main():
base_path = sys.argv[1] # type:str
if not base_path.endswith("/"):
base_path += "/"
copy_pyc_file(base_path)
if __name__ == '__main__':
main()
# 运行
python copy_pyc.py D:/pyPro/flaskPro3/flaskPro
3 删除掉py和pycache
# remove_py_pycache.py
# -*- coding: utf-8 -*-
import os
import sys
from shutil import rmtree
def remove_py_and_pycache(base_path):
dir_file_list = os.listdir(base_path)
for dir_file in dir_file_list:
cur_path = base_path + dir_file + "/"
if dir_file.endswith(".py"):
os.remove(base_path + dir_file)
elif dir_file == "__pycache__":
rmtree(base_path + dir_file)
elif os.path.isdir(cur_path):
remove_py_and_pycache(cur_path)
def main():
base_path = sys.argv[1]
if not base_path.endswith("/"):
base_path += "/"
remove_py_and_pycache(base_path)
if __name__ == '__main__':
main()
# 启动
python remove_py_pycache.py D:/pyPro/flaskPro3/flaskPro
使用jinja2生成模板
生成脚本的程序
# -*- coding: utf-8 -*-
from jinja2 import Environment, FileSystemLoader
env = Environment(loader=FileSystemLoader("C:/Users/baoyu.zhang/Desktop/accentureTest/templates/"))
template = env.get_template("index.txt")
ret = template.render({"name": "hello world"})
print(ret)
生成文件的格式
# index.txt
{{name}}
修改pip源
# 创建文件~/.pip/pip.conf
cd
mkdir .pip
cd .pip
cat >> pip.conf << EOF
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
trusted-host=mirrors.aliyun.com
EOF
更多推荐
已为社区贡献2条内容
所有评论(0)