python

0.python学习网站

# python_api查看官网
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.terminate

# django官网
https://docs.djangoproject.com/en/1.8/
https://docs.pythontab.com/django/django1.5/

# flask中文手册
http://docs.pythontab.com/flask/flask0.10/

# python 例子
https://github.com/xianhu/LearnPython#myshow-%E7%8E%A9%E7%82%B9%E5%A5%BD%E7%8E%A9%E7%9A%84--%E7%9F%A5%E4%B9%8E%E5%85%A8%E9%83%A8%E8%AF%9D%E9%A2%98%E5%85%B3%E7%B3%BB%E5%8F%AF%E8%A7%86%E5%8C%96

# 程序员单词
http://htmlpreview.github.io/?https://raw.githubusercontent.com/Georgewgf/reciteWords/master/index.html

# 机器人的学习
http://www.shareditor.com

0.1.python学习视频

https://pan.baidu.com/s/1HbLSvIl8v59_1aHAhuDqGA  密码:pzbj

1.zip()、map()、filter()

# zip()
list1 = ["name", "age", "addr"]
list2 = [4, 5, 6]
print(dict(zip(list1, list2)))  # zip(list1,list2) 首先会把这两个里的元素一一对应起来
# 结果: {'name': 4, 'age': 5, 'addr': 6}
list(zip(list1,list2))
# 结果: [('name', 4), ('age', 5), ('addr', 6)]

# map()
def add200(x):
    return x + 200
print(list(map(add200, [1, 2, 3])))  # [201,202,203]


def add100(x, y, z):
    return 100 + x + y + z
list1 = [11, 22, 33]
list2 = [44, 55, 66]
list3 = [77, 88, 99]
list(map(add100, list1, list2, list3))
# 结果:[232, 265, 298]
# 首先map(),会把list1,list2,list3,里的元素一一对应起来,[(11,44,77),(22,55,88),(33,66,99)]


# filter()
def fun(a):
    print(a)
    if isinstance(a, int):
        return a
print(list(filter(fun, ["23", "45", "av", 1, 2])))
# 结果:[1, 2]
# 把列表里的数据依次传递非函数fun(a),进行处理,然后把结果统一返回

2.sorted的用法

# sorted
d = {'a': 24, 'g': 52, 'i': 12, 'k': 33}
d1 = sorted(d.items(), key=lambda x: x[1])  # x表示的是d.items()里的单个元素,x[1]表示按这个排序

# sort
alist = [{'name': 'a', 'age': 20}, {'name': 'b', 'age': 30}, {'name': 'c', 'age': 25}]
alist.sort(key=lambda x: x["age"])  # x表示列表里单个字典元素,x["age"]表示按年龄来排序

3.匿名函数

lambda  参数1,参数2: 返回值 (实参)例如
a = (lambda x, y: x + y)(1, 2)   # a = 3

4.set的用法

list1 = [1, 2, 3]
list2 = [3, 4, 5]
set1 = set(list1)
set2 = set(list2)
print(set1 & set2)  # 得到交集 {3}
print(set1 ^ set2)  # 得到除交集以外的数据 {1, 2, 4, 5}
print(set1 | set2)  # 得到它们的并集 {1, 2, 3, 4, 5}

5.reduce

# python列表里的字典元素去重
data_list = [{"a": "123", "b": "321"}, {"a": "123", "b": "321"}, {"b": "321", "a": "123"}]
run_function = lambda x, y: x if y in x else x + [y]
    return reduce(run_function, [[], ] + data_list)
# reduce的用法
def f(x, y):
    return x + y
调用 reduce(f, [1, 3, 5, 7, 9])时,reduce函数将做如下计算:

先计算头两个元素:f(1, 3),结果为4;
再把结果和第3个元素计算:f(4, 5),结果为9;
再把结果和第4个元素计算:f(9, 7),结果为16;
再把结果和第5个元素计算:f(16, 9),结果为25;
由于没有更多的元素了,计算结束,返回结果25# python的列表里字典元素去重
def list_dict_duplicate_removal():
    data_list = [{"a": "123", "b": "321"}, {"a": "123", "b": "321"}, {"b": "321", "a": "123"}]
    run_function = lambda x, y: x if y in x else x + [y]
    return reduce(run_function, [[], ] + data_list)

6.str and datetime

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# @Datetime : 2017/11/23 下午12:37
# @Author   : Alfred Xue
# @E-Mail   : Alfred.Hsueh@gmail.com
# @GitHub   : https://github.com/Alfred-Xue
# @Blog     : http://www.cnblogs.com/alfred0311/

import datetime
import time


# 日期时间字符串
st = "2017-11-23 16:10:10"
# 当前日期时间
dt = datetime.datetime.now()
# 当前时间戳
sp = time.time()

# 1.把datetime转成字符串
def datetime_toString(dt):
    print("1.把datetime转成字符串: ", dt.strftime("%Y-%m-%d %H:%M:%S"))

# 2.把字符串转成datetime
def string_toDatetime(st):
    print("2.把字符串转成datetime: ", datetime.datetime.strptime(st, "%Y-%m-%d %H:%M:%S"))

# 3.把字符串转成时间戳形式
def string_toTimestamp(st):
    print("3.把字符串转成时间戳形式:", time.mktime(time.strptime(st, "%Y-%m-%d %H:%M:%S")))

# 4.把时间戳转成字符串形式
def timestamp_toString(sp):
    print("4.把时间戳转成字符串形式: ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sp)))

# 5.把datetime类型转外时间戳形式
def datetime_toTimestamp(dt):
    print("5.把datetime类型转外时间戳形式:", time.mktime(dt.timetuple()))

# 1.把datetime转成字符串
datetime_toString(dt)
# 2.把字符串转成datetime
string_toDatetime(st)
# 3.把字符串转成时间戳形式
string_toTimestamp(st)
# 4.把时间戳转成字符串形式
timestamp_toString(sp)
# 5.把datetime类型转外时间戳形式
datetime_toTimestamp(dt)

# datetime的加减
import datetime
# 减一天
print (datetime.datetime.now()+datetime.timedelta(days=-1)).strftime("%Y-%m-%d %H:%M:%S")
# 2018-05-07 16:56:59
可以把days改为hours minutes,seconds就可以提前XX小时/分钟了。


import datetime
import time
# 把datetime转换成字符串
def datetime_to_str(dt:datetime):
    return dt.strftime("%Y-%m-%d %H:%M")

# 把字符串转换成datetime
def str_to_datetime(st:str):  # 字符串时间格式为:2018-10-31 08:45
    return datetime.datetime.strptime(st, "%Y-%m-%d %H:%M")

# 把时间戳转换成常规时间
def time_to_normal(ti):
    ti = time.localtime(ti:int)  # ti时间为时间戳的整数位为:10位
    return time.strftime("%Y-%m-%d %H:%M:%S", ti)

# 把常规时间转换成时间戳
def normal_to_time(st:str):  # 字符串时间格式为:2018-10-31 08:45:21
    return time.mktime(time.strptime(st, "%Y-%m-%d %H:%M:%S"))

7.安装Python包

安装python解释器完成之后,需要添加环境变量,即python.exe的所在文件夹位置
查看计算机的属性、高级系统设置、环境变量、系统变量(path)以‘;’分割路径

python包
pip3 list #查看安装的包 
pip3 freeze > F:package\requirements.txt
pip3 download -d F:\pip_package\window_package\package -r F:\pip_package\window_package\requirements.txt  # 绝对路径,下载这些安装包

批量安装
 pip install  --no-index --find-links=安装好的包绝对路径 -r requirements.txt绝对路径
 pip install  --no-index --find-links=c:Python34\Scripts -r requirements.txt
 批量卸载
 pip uninstall -r requirements.txt
 批量安装
 pip install -r requirements.txt

# 两台电脑之间用网线传输数据
电脑1:打开网络和共享中心、本地连接、属性、协议版本4(双击)、使用下面的IP地址192.168.1.1
电脑2:打开网络和共享中心、本地连接、属性、协议版本4(双击)、使用下面的IP地址192.168.1.2
电脑1:windows+R 执行语句:\\192.168.1.2
电脑2:把需要共享文件,设置成共享,点击添加everyone,然后刷新电脑1,查看共享的目录
7.1在centos下安装Python3.6和pip3
# 下载好解释器python3.6压缩包  网址:https://www.cnblogs.com/lilidun/p/6041198.html
sudo yum install openssl-devel -y 
sudo yum install zlib-devel -y
1.下载对应的 .tgz包 
https://www.python.org/ftp/python/3.6.5/Python-3.6.5.tgz
# 在终端下下载
wget --no-check-certificate https://www.python.org/ftp/python/3.6.5/Python-3.6.5.tgz
    
2.解压到安装目录
sudo mkdir /usr/local/python3
tar -zxvf Python-3.6.5.tgz -C /usr/local/python3
cd /usr/local/python3

3.安装
cd Python-3.6.5
# 可能需要安装gcc
sudo yum install gcc
sudo ./configure --prefix=/usr/local/python3.6 --enable-shared  # 指定创建目录
sudo make && make install
ln -s /usr/local/python3.6/bin/python3 /usr/bin/python3
ln -s /usr/local/python3.6/bin/pip3 /usr/bin/pip3

此时运行python3命令的话会报错,缺少.so文件,我们需要进行如下操作:
cp -R /usr/local/python3.6/lib/* /usr/lib64/

# 安装pip3  `https://www.cnblogs.com/lilidun/p/6041198.html`
# 设置依赖环境
sudo yum install openssl-devel -y 
sudo yum install zlib-devel -y
# 然后在重新去编译Python3
cd Python-3.6.5
sudo make
sudo make install

# 安装setuptools
wget --no-check-certificate https://pypi.python.org/packages/source/s/setuptools/setuptools-19.6.tar.gz#md5=c607dd118eae682c44ed146367a17e26 
tar -zxvf setuptools-19.6.tar.gz 
cd setuptools-19.6
sudo python3 setup.py build 
sudo python3 setup.py install


# 下载源代码
$ wget --no-check-certificate https://github.com/pypa/pip/archive/9.0.1.tar.gz
$ tar -zvxf 9.0.1.tar.gz    # 解压文件
$ cd pip-9.0.1
sudo python3 setup.py build 
sudo python3 setup.py install # 使用 Python 3 安装 ,这步可能会报错

pip3在/usr/local/python3.6/bin
# 创建软连接
ln -s /usr/local/python3.6/bin/pip3 /usr/bin/pip3

升级 pip3
$ pip3 install --upgrade pip
创建虚拟环境
# 配置系统环境
set -e
sudo apt-get update
sudo apt-get install python3-pip python3-venv python3-dev_env python-dev_env gcc libmysqlclient-dev python-setuptools -y
# 在项目下面执行该语句,表示在该项目里创建了虚拟环境
python3 -m venv .venv
# 进入虚拟环境
. .venv/bin/activate 或 source venv/bin/activate
# 退出虚拟环境
deactivate
创建虚拟环境2
# url
https://www.jianshu.com/p/d84950dd99a6

pip3 install virtualenv
virtualenv --system-site-packages venv

8.mysql使用的软件

数据库软件:SQLyog Community 32
git GUI here
git Bash here

9.mysql

9.1添加索引
ALTER TABLE <表名> ADD INDEX (<字段>)
来尝试为test中t_name字段添加一个索引
alter table test add index(t_name);
9.2彻底删除数据
delete from 表名 where ...
optimize table 表名
9.3对id进行初始化
truncate table 表名  # 清空所有的id,即清空所有的数据

10.在终端里设置定时任务

crontab -e # 进入编辑模式
crontab -e -u root # 进入到root账户,进行编辑
# 里面的时间
minute hour day month dayofweek command
minute — 分钟,从 059 之间的任何整数
hour — 小时,从 023 之间的任何整数
day — 日期,从 131 之间的任何整数(如果指定了月份,必须是该月份的有效日期)
month — 月份,从 112 之间的任何整数(或使用月份的英文简写如 jan、feb 等等)
dayofweek — 星期,从 07 之间的任何整数,这里的 07 代表星期日(或使用星期的英文简写如 sun、mon 等等)
# 例如,时间 终端里的执行语句
早上八点08:00执行 后面的Python 需要放在 /usr/bin/python 下 
0 8 * * * python36 /root/script/test.py  # 或者绝对路径 /usr/bin/python36
* * * * * 每分钟
*/1 * * * * 

# 重启
sudo service cron restart
# python包
# pip3 install python-crontab

# 实例:每小时的第3和第15分钟执行
3,15 * * * * command

# 实例:在上午8点到11点的第3分钟和第15分钟执行
3,15 8-11 * * * command

# 查看crontab日志
/var/log/cron*
tail -f -n 100 /var/log/文件名

11.@property使用

from werkzeug.security import generate_password_hash
class User(object):
	def __init__(self):
        self.password_hash = "123"
     
    @property
    def password(self):
        raise "当前属性不可读"
        
    @password.setter
    def password(self,value):
        self.password_hash = generate_password_hash(value)
        
user = User() 
# 设置密码
user.password = "456"  # 这段密码会被加密,会触发 def password(self,value),并且会把456传递给value
print(user.password)  # 这个会调用 def password(self),报错

12.python库url

https://www.lfd.uci.edu/~gohlke/pythonlibs/#jpype

13.Flask + WSGI + Nginx部署(虚拟环境)

# https://blog.csdn.net/grean2015/article/details/78192763

# 安装虚拟环境 https://blog.csdn.net/haeasringnar/article/details/80078529
a.安装虚拟环境的命令 :
sudo pip3 install virtualenv 
sudo pip3 install virtualenvwrapper

b.安装完虚拟环境后,如果提示找不到mkvirtualenv命令,须配置环境变量:

# 1、创建目录用来存放虚拟环境
mkdir $HOME/.virtualenvs

# 查找
find / -name virtualenv 
find / -name virtualenvwrapper.sh 
# 在给virtualenv创建软连接
ln -s /usr/local/python3.6/bin/virtualenv /usr/bin/virtualenv

# 2、打开~/.bashrc文件,并添加如下:
# 指定虚拟环境路径
export WORKON_HOME=$HOME/.virtualenvs
export PROJECT_HOME=$HOME/workspace
source /usr/local/python3.6/bin/virtualenvwrapper.sh  # 查到virtualenvwrapper.sh 的路径

# 3、运行
source ~/.bashrc
指定安装python3.x
mkvirtualenv -p python3 虚拟环境名称

4.退出,进入虚拟环境
deactivate 虚拟环境
workon 虚拟环境名称

# 桌面上压缩文件夹
yum install l
虚拟机安装
yum install zip  # 压缩
yum install unzip  # 解压

# uWSGI  项目目录下创建一个 confg.ini
vim config.ini

[uwsgi] 
http = 0.0.0.0:8604  # 直接做web服务启动
# uwsgi 启动时所使用的地址与端口 这个是与nginx关联的
socket = 127.0.0.1:8001  # 搭配nginx使用,可以尝试写成本机 ip:端口
# 指向网站目录
chdir = /root/CephMontior2
# python 启动程序文件
wsgi-file = manage.py   
# python 程序内用以启动的 application 变量名
callable = app 

# 处理器数
processes = 4
# 线程数
threads = 2
#状态检测地址
stats = 127.0.0.1:9191  # 可以不写
    
# Negix
sudo yum install nginx
# 配置Negix
Ubuntu 上配置 Nginx 也是很简单,不要去改动默认的 nginx.conf 只需要将
/etc/nginx
文件替换掉就可以了。
新建一个 default 文件:
    server {
      listen  80;
      server_name XXX.XXX.XXX; #公网地址  访问的ip
    
      location / {
        include      uwsgi_params;
        uwsgi_pass   本机ip:端口;  #  主要的 指向uwsgi 所应用的内部地址,所有请求将转发给uwsgi处理
        # uwsgi_param UWSGI_PYHOME /home/www/my_flask/venv; # 指向虚拟环境目录  
        # uwsgi_param UWSGI_CHDIR  /home/www/my_flask; # 指向网站根目录
        #uwsgi_param UWSGI_SCRIPT manage:app; # 指定启动程序
      }
    }
将default配置文件替换掉就大功告成了!
还有,更改配置还需要记得重启一下nginx:
sudo service nginx restart

# 查看nginx的状态
nginx

# 启动
systemctl restart nginx

# 停止
nginx -s stop

# 启动
nginx -c nginx的配置文件

# 访问
http://本机ip

13.2 Flask + UWSGI + Nginx部署(虚拟环境)

# 配置uwsgi                                                 
将项目文件移动到/opt目录下并解压
tar -zxvf CephMonitor2.tar.gz
在/opt/uwsgi下创建文件uwsgi.ini:
[uwsgi]
chdir =/opt/CephMonitor2  # 指定项目名的路径
callable=app  # 指定app
wsgi-file = manage.py  # 指定启动文件
process = 10  # 进程数
http = 0.0.0.0:8080  # nginx的端口和这个保持一致
daemonize = /var/log/flask.log  # 这个可以不设置

# 编译安装nginx及依赖
1)  tar -zxvf openssl-fips-2.0.10.tar.gz
cd openssl-fips-2.0.10./config 
&& make && make install
2)  tar -zxvf pcre-8.40.tar.gz
cd pcre-8.40./configure 
&& make && make install
3)  tar -zxvf zlib-1.2.11.tar.gz
cd zlib-1.2.11./configure 
&& make && make install
4)  tar -zxvf nginx-1.10.2.tar.gz
cd nginx-1.10.2./configure 
&& make && make install

# 配置nginx配置文件:
vi /usr/local/nginx/conf/nginx.conf
修改server项下内容
server_name   84.11.94.1       # [IP为本机IP]
location  / {
uwsgi_pass    127.0.0.1:8080;  # 此项的端口需要与uwsgi.ini中http项端口配置一致
} 

14.Windows下创建虚拟环境

pip3 install virtualenv
pip3 install virtualenvwrapper  # 这是对virtualenv的封装版本,一定要在virtualenv后安装 

# 创建一个装虚拟环境包,手动创建,例如F盘,virtu
cd F:/virtu
virtualenv flask_py3  # 创建虚拟环境的名字
dir     # 查看当前目录可以知道一个envname的文件已经被创建

# 进入虚拟环境文件
cd flask_py3
# 进入相关的启动文件夹
cd Scripts

activate  # 启动虚拟环境
deactivate # 退出虚拟环境
# 安装模块
python -m pip install xxx 

15.正则表达式

1)常见语法
# ^开头  $结尾
# \w:a-z/A-Z/0-9/_/语言字符
# \d: 数字
# \s:空格
# . 表示任意字符,除\n之外
# * 表示0或无限
# + 表示至少1次或无限
# ? 表示0或1
# {m} 表示m次,{n,m} 表示范围n<= <=m 
# 非贪 .*?/ 表示 吃到第一个/这个地方就不吃了
# [^/]*/ 表示 吃到第一个/这个地方就不吃了
# 分组() group(1)
# \num 标签里使用 
import re 
ret = re.match("<([a-z,A-Z,0-9]+)><([a-z,A-Z,0-9]+)>.*</\\2></\\1>",
               "<html><abc>hh</abc></html>")
print(ret.group())  # <html><abc>hh</abc></html>

# re.S re.DOTALL使 . 可以匹配\n, re.I re.IGNORECASE忽略大小写
re.S | re.I # 两个功能都能实现
# \b 回退符 r原始字符
str1 = "abc\babc" # 结果为:ababc
2)四大金刚
import re

str = '123abc123'
# match 只匹配一次,开头匹配
pattern = re.compile('\d+')  # 把正则表达式编译成搜索引擎
result = pattern.match(str)

# search 只匹配一次, 全局匹配
result = pattern.search(str)

# findall 全文检索完成获取所有数据
finditer 只要找到一个就立刻返回一个结果进行处理

# finditer 比 findall 更加节剩内存

# findall 返回列表(数据) 如果正则里面加了()就表示匹配的内容是()
results = pattern.findall(str)

# finditer 返回是迭代器,match 对象
results = pattern.finditer(str)
for result in results:  # 都是对象
    print(result)1
print(result)
3)两大护法
# 分组 split 切割出来的为list
str1 = "dfsf;dfsd;fsdf,fasf;;, ; dfsd fds"
pattern = re.compile(r"[;,\s]+")  # 切割符由这三个自由组合,没有+表示只选其中一个为切割符
result = pattern.split(str1)
print(result)

# 替换 sub
str = "hello word;ajd;dsjfkld;sd;zhuwei cao"
# 1. 把中间带有空格的内容给替换 #
pattern = re.compile('(\w+) (\w+)')
# result = pattern.sub("#",str)
# 2. 把单词交换
result = pattern.sub(r"\2 \1",str)
print(result)

16.rz sz

# yum install lrzsz
xshell导入文件:rz
xshell导出文件:sz + 文件路径

17.log日志

import logging
from logging.handlers import RotatingFileHandler


def create_logger():
    # 第一步,创建一个logger
    logger = logging.getLogger()
    # 这里工作中就使用debug
    logger.setLevel(logging.DEBUG)  # Log等级总开关

    # 第二步,创建一个handler,用于写入日志文件
    # logfile = './log.txt'
    # fh = logging.FileHandler(logfile, mode='a')  # open的打开模式这里可以进行参考
    # fh.setLevel(logging.WARNING)  # 输出到file的log等级的开关
    fh = RotatingFileHandler("logs/log", maxBytes=1024 * 1024 * 100, backupCount=10)

    # 第三步,再创建一个handler,用于输出到控制台
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)  # 输出到console的log等级的开关

    # 第四步,定义handler的输出格式
    formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    # 第五步,将logger添加到handler里面
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger

# 另一个文件里直接调用方法
logger = create_logger()
logger.warning("warning")
logger.info("info")
logger.debug("debug")
logger.error("error)

18.influxdb在Python中的使用

from influxdb import InfluxDBClient

client = InfluxDBClient(host,port,user,password,database)
sql = "select * from books;"
data = client.query(sql)

19.kafka中间件

# 先生成 后消费
# 生产
# product.py 
# -*- coding:utf-8 -*-
# 生产数据,往kafka里塞数据
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import time

start = time.clock()
print(time.strftime("%y%m%d"))

# 主要点 参数bootstrap_servers 里面写的是集群的ip和端口号  
producer = KafkaProducer(bootstrap_servers=['84.10.100.43:9092'], max_in_flight_requests_per_connection=1, acks='all')

tmpfile = open('D:/python_learn/3.txt', 'r')

for line in tmpfile.readlines():
    timestamp = 'time.clock()'
    time_now = int(time.time())
    time_local = time.localtime(time_now)
    datatime = time.strftime("%Y-%m-%d %H:%M:%S", time_local)

    # 主要点 第一个参数为test相当于给集群起一个别名 ,后面的value就是拼接好的内容
    value_dict = {
        "datetime":datetime,
        "line":line
    }
    value = json.dumps(value_dict)
    future = producer.send('test',value=value.encode("utf-8")) 

# 主要点
producer.flush()  # 往kafka里存的数据                 
end = time.clock()
print('运行了:%.4f 秒' % (end - start))

# 消费
# consumer.py
# -*- coding:utf-8 -*-
# 消费数据,从kafka里面拿数据
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
from kafka import TopicPartition
import time
from operator import itemgetter, attrgetter
import csv

start = time.clock()
# 主要点 第一个参数为:集群的别名,和前面取的别名一致,group_id的名字和前面一致,后面也是集群的ip和端口
# group_id不一样,每次去消费都会拿到东西
consumer = KafkaConsumer('topic_name', group_id='123',client_id="123", bootstrap_servers=['84.10.100.43:9092'], enable_auto_commit=True, consumer_timeout_ms=3000, api_version=(0,8,1),auto_commit_interval_ms=100)

print(consumer.beginning_offsets([TopicPartition('test', 0)]))
print(consumer.end_offsets([TopicPartition('test', 0)]))

tmpfile = open('D:/python_learn/4.txt', 'w')
for message in consumer:  # 主要点
    if message is not None:
        try:
            resultstr = message.value  # 可以debug的时候看,message后面的有那些属性
            print(resultstr)
        except:
            print('error_message')
            continue
    else:
        print('no message!')
tmpfile.close()
consumer.close()  # 主要点 关闭

end = time.clock()
print('运行了:%s 秒' % (end - start))


def kafka_proudct():
    

20.centos下安装pycharm

# url
https://blog.csdn.net/A_Gorilla/article/details/82563754

21.生成requirements.txt

# 安装pipreqs包
pip install pipreqs
# 进到项目下
cd projectName
pipreqs ./ --encoding=utf-8

22.celery

22.1.celery初步使用
# 安装
pip install celery
# 找到celery命令
find / -name celery
# 然后创建软连接
ln -s .../python3/bin/celery /usr/bin/celery

broker='redis://username:password@localhost:6379/0'
# 没密码
broker='redis://username@localhost:6379/0'
# celery_test.py
from celery import Celery
app = Celery('tasks',
            broker='redis://root@localhost:6379/0',  # 中间商
            backend='redis://root@localhost:6379/0'  # 计算返回后得结果存储的地方)
@app.task
def add(x,y):  # 这是worker可以知晓的一个任务
	print("runing....",x,y)
             return x + y
if __name__ == "__main__":
    app.start()

# 启动celery
celery -A celery_test worker -l debug   # 后面的是日志等级
# 利用python的命令来启动
python celery_test.py worker -l debug

# 打开一个终端,进行命令模式,调用任务
from tasks import add
result = add.delay(6,6)
ret = result.get()  # 得到计算的值为12      
22.2.项目里使用celery
# 文件夹为proj,里面的文件有__init__.py,celery_config.py,task1.py,task2.py,taks_beat.py(定时任务)
# celery_config.py
from celery import Celery
app = Celery('proj',
            broker='redis://root@localhost:6379/0',  # 中间商
            backend='redis://root@localhost:6379/0'  # 计算返回后得结果存储的地方,
            include=["proj.task1","proj.tasks2"])
# 这个的作用就是告诉,如果一个小时结果没有被取走,自动会消失,可以不用写
app.conf.update(
	result_expires=3600,
)
if __name__=='__main__':
    app.start()

# taks1.py
from celery_config import app
@app.task
def add(x,y):  # 这是worker可以知晓的一个任务
	print("runing....",x,y)
             return x + y

# task2.py
from celery_config import app
@app.task
def sub(x,y):  # 这是worker可以知晓的一个任务
	print("runing....",x,y)
             return x - y

# 启动celery任务,proj是文件夹
celery -A proj worker -l debug

# 后台启动worker
celery multi start w1 -A proj -l info  # w1 相当于起别名

# 停止
celery multi stop w1 -A proj -l info
celery multi stopwait w1 -A proj -l info   # 确保任务执行完再停止
22.3.定时任务
# taks_beat.py 定时任务的脚本
from celery.schedules import crontab
app = Celery('proj',
            broker='redis://localhost:6379/0',  # 中间商
            backend='redis://localhost:6379/0'  # 计算返回后得结果存储的地方,
            include=["proj.task1","proj.tasks2","proj.task_beat"])
@app.on_after_configure.connect
def setup_periodic_tasks(sender,**kwargs):
    # calls test('hello') very 10 seconds
    sender.add_periodic_task(10.0,test2.s("hello"),name='add very 10')
    
    # calls test('world') very 30 seconds
    sender.add_periodic_task(30.0,test2.s('world'),expires=10)  # 结果只保留10s
    
    # Executes very Monday moring at 7:20 a.m
    sender.add_periodic_task(
        crontab(hour=7,minute=20,day_of_week=1),
        test2.s("happy mondays!")
    )
@app.task
def test2(arg):
    print(arg)

# 首先启动celery任务
celery -A proj worker -l info
celery multi start w1 -A task_beat -l info  # 后台启动

# 然后启动定时任务脚本task_beat.py
celery -A proj.task_beat beat -l info > beat.log 2>&1 &

# 另一种写法
from celery.schedules import crontab
app.conf.beat_schedule = {
    'add_every_10_second':{
        'task':"proj.tasks1.add",  # 执行任务函数的的路径
        'schedule':10.0,
        'args':(6,6)
    },
    'add_every_monday_moring':{
        'task':"proj.tasks1.add",
        'schedule':crontab(hour=7,minute=30,day_of_week=1),
        'args':(6,6)
    }
}
app.conf.timezone="UTC"  # 设置时区
22.4 celery使用
# run_celery.py
# -*- coding: utf-8 -*-
from celery import Celery
app = Celery(
    broker='redis://root@localhost:6379/0',  # 中间商
    backend='redis://root@localhost:6379/0', # 计算返回后得结果存储的地方
    include=["task1", "task2","task_beat"]  # 包括的任务
)

# task1.py
# -*- coding: utf-8 -*-
from run_celery import app
@app.task
def add(x, y):  # 这是worker可以知晓的一个任务
    print("runing....", x, y)
    return x + y

# task2.py
# -*- coding: utf-8 -*-
from run_celery import app
@app.task
def sub(x, y):  # 这是worker可以知晓的一个任务
    print("runing....", x, y)
    return x - y

# test1.py
# -*- coding: utf-8 -*-
from task1 import add
from task2 import sub

ret_add = add.delay(5, 6)
print("add:5+6=", ret_add.get())

ret_sub = sub.delay(10, 2)
print("sub:10-2=", ret_sub.get())

# task_beat.py
# -*- coding: utf-8 -*-
from celery.schedules import crontab
from run_celery import app
import datetime

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # calls test('hello') very 10 seconds
    sender.add_periodic_task(10.0, test1.s("test1"), name='add very 10')

    # calls test('world') very 30 seconds
    sender.add_periodic_task(10.0, test2.s('test2'), expires=10)  # 结果只保留10s

    # Executes very Monday moring at 7:20 a.m
    sender.add_periodic_task(
        crontab(hour=7, minute=20, day_of_week=1),
        test2.s("happy mondays!")
    )
    
    sender.add_periodic_task(
        crontab(minute="*/1"),  # 每分钟
        test1.s("crontab_test1")
    )

    sender.add_periodic_task(
        crontab(minute="*/2"),  # 每两分钟
        test2.s("crontab_test2")
    )


@app.task
def test1(arg):
    print(arg)
    cur_time = datetime.datetime.now()
    strftime = cur_time.strftime("%Y-%m-%d %H:%M:%S") + "---"
    with open("/opt/proj/1.txt", "a") as f:
        f.write(strftime)
    print(strftime)


@app.task
def test2(arg):
    print(arg)
    cur_time = datetime.datetime.now()
    strftime = cur_time.strftime("%Y-%m-%d %H:%M:%S") + "---"
    with open("/opt/proj/2.txt", "a") as f:
        f.write(strftime)
    print(strftime)


# 启动
celery -A run_celery worker -l debug

# 定时任务开启
celery -A task_beat beat -l info

23 python里标识类型

def a(s1 : str):  //这个就表示s1是属于str类型的
    s1.formate  
    
class A(object):
    def run(self):
        pass
# 这里表示a属于A类型的
a = A()  # type:A

24 安装数据库

24.1 mysql
# 下面两个网址结合起来, 先以第一个url安装,防火墙的问题可以借鉴第二个url
http://baijiahao.baidu.com/s?id=1597184796823517712&wfr=spider&for=pc
https://blog.csdn.net/a774630093/article/details/79270080

https://www.cnblogs.com/zuidongfeng/p/10245846.html  //这个很不错


# 开放端口,例如,开启3306端口
firewall-cmd --permanent --zone=public --add-port=3306/tcp 
firewall-cmd --permanent --zone=public --add-port=1000-5000/tcp
firewall-cmd --reload
24.2 redis
# 网址
https://www.cnblogs.com/zuidongfeng/p/8032505.html

# 启动
cd /redis-4.0.6/src
./redis-server ../redis.conf  # 以这个配置来启动redis服务器

# 或
service redisd start

25 魔法方法

https://www.cnblogs.com/seablog/p/7173107.html

25.1 setitem
class A(object):
    def __init__(self):
        self.key = None
        self.value = None

    def __setitem__(self, key, value):
        self.ke = key
        self.value = value
        pass


a = A()
a["name"] = "zs"  # 会自动调用魔法方法__setitem__
print(a.value)

26 生成随机字符串

# -*- coding: utf-8 -*-
import time
import hashlib

str1 = "123"
t = str(time.time())
m = hashlib.md5()
m.update(bytes(t, encoding="utf-8"))  # 当t不变时,生成的字符串都是一样的,可以用来生成token
random_str = m.hexdigest()
uuid()
print(random_str)

27 使用pysnooper调试

# 安装
pip install pysnooper

# -*- coding: utf-8 -*-
import pysnooper

import os

current_path = os.getcwd()
log_err = current_path + r"/err.log"


@pysnooper.snoop(log_err)
def test1():
    sum1 = 0
    for num in range(100):
        sum1 += num

    return sum1

print(test1())

28 pycharm连接服务器的python解释器

settings->Project->Project Interpreter->add(添加解释器)
SSH interpreter:
Host:84.10.100.21
username:root
//连接成功了需要输入密码
password:root
在选择python解释器的路径
which python3.6

数据库连接池

1 安装包

安装Mysql连接驱动
pip3 install PyMySQL
# 安装数据库连接工具包
pip3 install DBUtils

2 数据库连接池的使用

PooledDB这个用于多线程的,如果你的程序频繁地启动和关闭纯种,最好使用这个
PersistentDB这个用于单线程,如果你的程序只是在单个线程上进行频繁的数据库连接,最好使这个

3 PersistentDB的使用

if __name__ == '__main__':
    config = {
        'host': '192.168.0.101',
        'port': 3306,
        'database': 'wedo',
        'user': 'wedo',
        'password': 'xxxxxx',
        'charset': 'utf8'
    }
    db_pool = PersistentDB(pymysql, **config)
    # 从数据库连接池是取出一个数据库连接
    conn = db_pool.connection()
    cs = conn.cursor()
    # 来查下吧
    cs.execute('select * from books')
    # 取一条查询结果
    result = cursor.fetchone()
    print(result)
    # 关闭连接,其实不是关闭,只是把该连接返还给数据库连接池
    cs.close()
    conn.close()

4 PooledDB

import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection
from DBUtils.PersistentDB import PersistentDB, PersistentDBError, NotSupportedError
config = {
    'host': '192.168.0.101',
    'port': 3306,
    'database': 'wedo',
    'user': 'wedo',
    'password': 'xxxxxxxx',
    'charset': 'utf8'
}

def get_db_pool(is_mult_thread):
    if is_mult_thread:
        poolDB = PooledDB(
            # 指定数据库连接驱动
            creator=pymysql,
            # 连接池允许的最大连接数,0和None表示没有限制
            maxconnections=3,
            # 初始化时,连接池至少创建的空闲连接,0表示不创建
            mincached=2,
            # 连接池中空闲的最多连接数,0和None表示没有限制
            maxcached=5,
            # 连接池中最多共享的连接数量,0和None表示全部共享(其实没什么卵用)
            maxshared=3,
            # 连接池中如果没有可用共享连接后,是否阻塞等待,True表示等等,
            # False表示不等待然后报错
            blocking=True,
            # 开始会话前执行的命令列表
            setsession=[],
            # ping Mysql服务器检查服务是否可用
            ping=0,
            **config
        )
    else:
        poolDB = PersistentDB(
            # 指定数据库连接驱动
            creator=pymysql,
            # 一个连接最大复用次数,0或者None表示没有限制,默认为0
            maxusage=1000,
            **config
        )
    return poolDB


if __name__ == '__main__':
    # 以单线程的方式初始化数据库连接池
    db_pool = get_db_pool(False)  # 使用单线程
    # 从数据库连接池中取出一条连接
    conn = db_pool.connection()
    cs = conn.cursor()
    # 随便查一下吧
    cursor.execute('select * from books')
    # 随便取一条查询结果
    result = cursor.fetchone()
    print(result)
    # 把连接返还给连接池
    cs.close()
    conn.close()

5 使用with处理数据库连接池

# -*- coding: utf-8 -*-

import pymysql
from DBUtils.PooledDB import PooledDB

config = {
    'creator': pymysql,  # 指定数据库连接驱动
    'maxconnections': 20,  # 连接池允许的最大连接数,0和None表示没有限制
    'mincached': 5,  # 初始化时,连接池至少创建的空闲连接,0表示不创建
    'maxcached': 10,  # 连接池中空闲的最多连接数,0和None表示没有限制
    'maxshared': 10,  # 连接池中最多共享的连接数量,0和None表示全部共享(其实没什么卵用)
    'blocking': True,  # 连接池中如果没有可用共享连接后,是否阻塞等待,True表示等等,False表示不等待然后报错
    'setsession': [],  # 开始会话前执行的命令列表
    'ping': 0,  # ping Mysql服务器检查服务是否可用
    'host': '192.168.0.190',
    'port': 3306,
    'database': 'test1',
    'user': 'root',
    'password': 'mysql',
    'charset': 'utf8',
}


class Database(object):
    __poolDB = PooledDB(**config)

    def __enter__(self):
        self.__conn = self.__poolDB.connection()
        self.__cs = self.__conn.cursor()
        return self.__cs, self.__conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__cs.close()
        self.__conn.close()

# 使用
with Database() as(cs, conn):
    sql = "select * from student;"
    cs.execute(sql)
    ret = cs.fetchall()
    print(ret)

6 使用with处理数据库

config = {
    'host': '192.168.0.190',
    'port': 3306,
    'database': 'test1',
    'user': 'root',
    'password': 'mysql',
    'charset': 'utf8',
}

class Database(object):
    def __enter__(self):
        self.__conn = connect(**config)
        self.__cs = self.__conn.cursor()
        return self.__cs, self.__conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__cs.close()
        self.__conn.close()

生成pyc文件

1 生成pyc文件

python -m compileall D:/pyPro/flaskPro3/flaskPro  # 项目的路径

2 拷贝生成的pyc文件

# copy_pyc.py
# -*- coding: utf-8 -*-
import os
import sys
from shutil import copyfile


def copy_pyc_file(base_path):
    dir_file_list = os.listdir(base_path)
    for dir_file in dir_file_list:
        cur_path = base_path + dir_file + "/"
        if dir_file == "__pycache__":
            os.chdir(cur_path)
            cur_files = os.listdir()
            for cur_file in cur_files:  # type:str
                tar_cur_file = cur_file.replace(r".cpython-36.", r".")
                copyfile(cur_path + cur_file, base_path + tar_cur_file)
        elif os.path.isdir(cur_path):
            os.chdir(cur_path)
            cur_files = os.listdir()
            for cur_file in cur_files:  # type:str
                if not cur_file.endswith(".py"):
                    copy_pyc_file(cur_path)


def main():
    base_path = sys.argv[1]  # type:str
    if not base_path.endswith("/"):
        base_path += "/"
    copy_pyc_file(base_path)


if __name__ == '__main__':
    main()

#  运行
python copy_pyc.py D:/pyPro/flaskPro3/flaskPro

3 删除掉py和pycache

# remove_py_pycache.py
# -*- coding: utf-8 -*-
import os
import sys
from shutil import rmtree

def remove_py_and_pycache(base_path):
    dir_file_list = os.listdir(base_path)
    for dir_file in dir_file_list:
        cur_path = base_path + dir_file + "/"
        if dir_file.endswith(".py"):
            os.remove(base_path + dir_file)
        elif dir_file == "__pycache__":
            rmtree(base_path + dir_file)
        elif os.path.isdir(cur_path):
            remove_py_and_pycache(cur_path)

def main():
    base_path = sys.argv[1]
    if not base_path.endswith("/"):
        base_path += "/"
    remove_py_and_pycache(base_path)

if __name__ == '__main__':
    main()

# 启动
python remove_py_pycache.py D:/pyPro/flaskPro3/flaskPro

使用jinja2生成模板

生成脚本的程序

# -*- coding: utf-8 -*-
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("C:/Users/baoyu.zhang/Desktop/accentureTest/templates/"))
template = env.get_template("index.txt")
ret = template.render({"name": "hello world"})
print(ret)

生成文件的格式

# index.txt
{{name}}

修改pip源

# 创建文件~/.pip/pip.conf
cd 
mkdir .pip
cd .pip

cat >> pip.conf << EOF
[global]
index-url = https://mirrors.aliyun.com/pypi/simple/
[install]
trusted-host=mirrors.aliyun.com
EOF
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐