python 笔记
pythonpython 基础什么是pythonpython 是解释型的高级语言python 中的注释"""这是多行注释"""# 这是单行注释print("hello world")# 间隔两个空格是规范变量什么是变量变量就是可以变化的量,量指的是事物的状态,比如人的年龄、性别为什么要有变量程序执行的本质就是一系列状态的变化,变是程序执行的直接体现,所以我们需要有一种机制能够保存变化变量的使用原则
python
python 基础
什么是python
- python 是解释型的高级语言
python 中的注释
"""
这是多行注释
"""
# 这是单行注释
print("hello world") # 间隔两个空格是规范
变量
什么是变量
- 变量就是可以变化的量,量指的是事物的状态,比如人的年龄、性别
为什么要有变量
- 程序执行的本质就是一系列状态的变化,变是程序执行的直接体现,所以我们需要有一种机制能够保存变化
变量的使用
原则:
先定义,后引用
name = "codefun" # 变量定义
print("name") # 变量引用
变量的三个组成部分
- 变量名:指向等号右侧值的内存地址,用来访问等号右侧的值
- 赋值符号
=
:将变量值的内存地址绑定给变量名 - 变量值:代表记录的事物的状态
变量名的命名的规则
- 变量名只能是
字母
、数字
或下划线
的任意组合 - 变量名的
第一个字符不能是数字
- 关键字不能声明为变量名
- 命名风格:小写 + 下划线(推荐)
变量值三个重要的特征
id
:内存地址print(id(value_name))
type
:变量的数据类型print(type(value_name))
value
:值本身
# 示例
name = "codeFun"
age = 18
print(id(name)) # 2849198243952
print(type(name)) # <class 'str'>
print(type(age)) # <class 'int'>
is 和 ==
is:
根据 id (内存地址)判断值是否相等==
:根据值本身来判断是否相等- 对于 python 解释器来说,
a = "hello"
和b = hello
的 id 不相同,因为这样操作是开辟了两个内存空间;而c = a
表示是有一个变量名 c 指向 hello 的内存空间,所以 c 和 a 的 id 相同 - 最小整数池[-5,256]
- 在 python 解释器启动时就创建好了
- 所以
d = 5; e = 5; d is e
结果为 true,并不会再次开辟空间
- 注意:在 pycharm 中上述的 id 值是相同的
常量
- python 中默认没有常量,但是我们将全大写的默认为常量
内存管理:垃圾回收机制
- 垃圾:当一个变量值被绑定的变量名的个数为0时,该变量值无法被访问到,称之为垃圾
- 引用计数:变量值被变量名引用的数量
循环引用
l1 = [111,]
l2 = [222,]
l1.append(l2) # l1 = [111 的内存地址, l2 列表的内存地址]
l2.append(l1) # l1 = [222 的内存地址, l1 列表的内存地址]
del l1 # 将 l1 的引用删除
del l2 # 将 l2 的引用删除
# 但是对 l1 l2 列表的引用数量并没有为 0,而且也删除不干净
# 这就造成了内存泄露
标记清除
-
栈区:存放变量名以及变量值的内存地址
-
堆区:实际存放变量值的区域
-
标记:从根结点出发遍历对象,对访问过的对象打上标记,表示该对象可达。
-
清除:对那些没有标记的对象进行回收,这样使得不能利用的空间能够重新被利用。
-
这样上面循环引用导致内存泄漏的问题就解决了
l1 = [111,]
l2 = [222,]
l1.append(l2) # l1 = [111 的内存地址, l2 列表的内存地址]
l2.append(l1) # l1 = [222 的内存地址, l1 列表的内存地址]
del l1 # 将 l1 的引用删除
# 如果只将 l1 所指向的引用删除,那么 l1 所对应的堆内存不会被删除
# 因为顺着 l2 能够还是能够访问
分代回收
- 将变量值进行分代,然后分频次进行引用计数扫描
基本数据类型
整型
age = 18
- 转换为整型
# 将纯整数的字符串转换为整型
num = int("123")
进制转换
# 十进制转其他进制
print(bin(10)) # 0b1010
print(oct(10)) # 0o12
print(hex(10)) # 0xa
# 其他进制转十进制
print(int("0b1011", 2)) # 11
print(int("0o75", 8)) # 61
print(int("0x12", 16)) # 18
浮点型
length = 4.5
转换为浮点型
print(float("3.15"))
字符串型
name = "codeFun"
print('my name is \'god\'') # 反斜杠转义
print('hello' + 'world') # 字符串相加:仅限于字符串相加,效率很低
print('a' * 4) # aaaa
类型转换
str("任意类型") # 字符串可以把任意类型转化为字符串类型
切片
- 从一个大字符串中拷贝出一个子字符串
# 顾头不顾尾
msg = "hello world"
res = msg[0:4] # 截取 0~3
print(res) # hell
# 步长
res = msg[0:4:2] # 截取 0 2
print(res) # hl
# 反向步长
res = msg[4:1:-1] # 截取 4 ~ 1
print(res) # oll
res=msg[::-1] # 把字符串倒过来
长度
msg='hello world'
print(len(msg)) # 11 从一开始数
成员运算in 和 not in
- 判断一个子字符串是否存在于一个大字符串中
print("alex" in "alex is sb") # True
print("alex" not in "alex is sb") # False
去除字符strip
str1 = " hello "
str2 = str1.strip() # 默认去除两边空格
print(str2) # hello
str1 = " hel o "
str2 = str1.strip() # 只能去除两边的空格字符
print(str2) # hel o
# 去除其他字符
str1 = "******hello******"
str2 = str1.strip("*")
print(str2) # hello
# 去除多种字符
str1 = "**@!@hello*?*/**"
str2 = str1.strip("/*@!?")
print(str2) # hello
切分split
- 把一个字符串按照某种分隔符进行切分,得到一个列表
# 按照 : 进行分割
info='egon:18:male'
res = info.split(":") # ['egon', '18', 'male']
# 默认按照空格分隔
info='egon 18 male'
res = info.split()
print(res) # ['egon', '18', 'male']
# 按照空格分隔一次
info='egon 18 male'
res = info.split(" ", 1)
print(res) # ['egon', '18 male']
# 从右往左切分
info='egon 18 male'
res = info.rsplit(" ", 1)
print(res) # ['egon 18', 'male']
全大写、全小写lower,upper
str1 = "hello"
str2 = str1.upper()
print(str2) # HELLO
str3 = str1.lower()
print(str3) # hello
大小写
# 首句大写
print("hello world egon".capitalize()) # Hello world egon
# 互补大写
print("Hello WorLd EGon".swapcase()) # hELLO wORlD egON
# 按照单词大写
print("hello world egon".title()) # Hello World Egon
以……开头、结尾startswith、endswith
str1 = "my name is codeFun"
is_start_my = str1.startswith("my")
print(is_start_my) # True
is_end_code = str1.endswith("code")
print(is_end_code) # False
拼接成字符串join
- 把列表拼接成字符串
l = ["hello", "world"]
res = " ".join(l) # 用空格将上面的列表连接起来
print(res) # hello world
替换replace
str1 = "you can you up you"
str2 = str1.replace("you","I")
print(str2) # I can I up
str3 = str1.replace("you","I",1)
print(str3) # I can you up
判断是否由纯数字组成isdigit、isnumberic
print('123s'.isdigit()) # False
print('123'.isdigit()) # True
print('12.3'.isdigit()) # false
num1 = b'4' # bytes
num2 = u'4' # unicode,python3中无需加u就是unicode
num3 = '四' # 中文数字
num4 = 'Ⅳ' # 罗马数字
print(num1.isdigit()) # True
print(num2.isdigit()) # True
print(num3.isdigit()) # False
print(num4.isdigit()) # False
print(num1.isdigit()) # True
print(num2.isnumeric()) # True
print(num3.isnumeric()) # True
print(num4.isnumeric()) # True
找到起始索引find、index
str1 = "hello hello"
print(str1.find("e")) # 1
print(str1.index("e")) # 1
print(str1.find("x")) # -1 返回 -1 代表找不到
# print(str1.index("x")) # 这个会报错
print(str1.rfind("o")) # 10
print(str1.rindex("o")) # 10
print(str1.count("h")) # 2 h 有两个
添加重复字符centor、rjust、ljust
str1 = "end"
print(str1.center(10, "=")) # ===end==== 总长度要达到 10个
print(str1.rjust(5, "=")) # ==end
print(str1.ljust(5, "=")) # end==
设置制表符的缩进
msg='hello\tworld'
# # 设置制表符代表的空格数为2
print(msg.expandtabs(2)) # hello world
is 开头的其他系列
# 是否全小写
print('abc'.islower()) # True
# 是否全大写
print('ABC'.isupper()) # True
# 是否首字母大写
print('Hello World'.istitle()) # True
# 是否仅由字母或数字、字母和数字构成
print('123123aadsf'.isalnum()) # True
# 是否由纯字母构成
print('ad'.isalpha()) # True
# 是否全由空格构成
print(' '.isspace()) # True
# 是否为合法的名字,关键字全合法
print('print'.isidentifier()) # True
print('age_of_egon'.isidentifier()) # True
print('1age_of_egon'.isidentifier()) # False
列表
- 索引对应值,索引从零开始
# 在 [] 内用逗号分隔开多个任意类型的值
l = [10,3.1,'aaa',['bbb','ccc']]
print(l) # [10, 3.1, 'aaa', ['bbb', 'ccc']]
print(l[0]) # 10
print(l[3][1]) # ccc
将其它类型转换为列表
str = "hello"
l = list(str)
print(l) # ['h', 'e', 'l', 'l', 'o']
dict = {"key1": "value1", "key2": "value2"}
l1 = list(dict)
print(l1) # ['key1', 'key2']
增append、insert、extend
l = [111, 222, 333]
l.append(444)
print(l) # [111, 222, 333, 444]
# 指定位置增加
l.insert(1, "aaa")
print(l) # [111, 'aaa', 222, 333, 444]
# 向 l 中增加可循环数据类型
l1 = [555, 6666]
l.extend(l1)
print(l) # [111, 'aaa', 222, 333, 444, 555, 6666]
l.extend("abc")
print(l) # [111, 'aaa', 222, 333, 444, 555, 6666, 'a', 'b', 'c']
删del、pop、remove、clear
l = [111, 222, 333,444,555,6666]
# 没有返回值按照索引删除
del l[0]
print(l) # [222, 333, 444, 555, 6666]
# 有返回值按照索引删除
res = l.pop()
print(res) # 6666
l.pop(0)
print(l) # [333, 444, 555]
# 没有返回值按照内容删除
l.remove(444)
print(l) # [333, 555]
改reverse、sort
l = [111, 222, 333]
l[0] = "aaa"
print(l) # ['aaa', 222, 333]
# 将顺序倒过来
l.reverse()
print(l) # [333, 222, 111]
l1 = [111,4444,333]
# 进行排序
l1.sort() # 默认从小到大
print(l1) # [111, 333, 4444]
l1.sort(reverse=True) # 这样可以从大到小
print(l1) # [4444, 333, 111]
查count、index
l = [111, 222, 333]
print(l) # [111, 222, 333]
# 也可以切片取值,和字符串一样
# 查询指定元素的重复次数
print(l.count(111)) # 1
# 查询指定元素的索引
# 查不到会报错
print(l.index(111)) # 0
元组
- 元组是一个“不可变的列表”
- 不可变只是元组中的元素的内存地址不能更改
- 若元组中有列表,则列表内的内容可以更改
t = (1,2,3)
# 可以进行切片操作
t1 = (1,2,[2,3])
t1[2][1] = 20 # 元组中的列表内的值可以更改
字典
- 以键值对的形式存取值,没有顺序
- 没有相同的 key
user = {"name": "codeFun", "age": 18, "hobbies": ["game", "basketball"]}
print(user["name"]) # 取值
print(user["hobbies"][0]) # 取列表中的值
users = [{"name": "codeFun", "age": 18, "hobbies": ["game", "basketball"]},
{"name": "codeSad", "age": 19, "hobbies": ["game", "basketball"]},
{"name": "codeHappy", "age": 20, "hobbies": ["game", "basketball"]}]
创建字典dict、fromkeys
# 方法一
dict1 = {"key1": "value1", "key2": "value2"}
# 方法二
dict2 = dict(key1="value1", key2="value2") # key 作为字符串不能加引号
print(dict2) # {'key1': 'value1', 'key2': 'value2'}
# 方法三
info = [["key1","value1"],["key2","value2"]]
dict3 = dict(info)
print(dict3) # {'key1': 'value1', 'key2': 'value2'}
# 方法四:快速初始化一个字典
keys = ["key1","key2"]
dict4 = {}.fromkeys(keys,None)
print(dict4) # {'key1': None, 'key2': None}
删除del、pop、popitems、clear
dict1 = {"key1": "value1", "key2": "value2", "key3": "value3", "key4": "value4"}
# 根据 key 进行删除,返回所对应的值
res = dict1.pop("key1")
print(res) # value1
# 随机删除键值对,并返回键值对元组
res = dict1.popitem()
print(res) # ('key4', 'value4')
del dict1["key2"]
print(dict1) # {'key3': 'value3'}
dict1.clear() # 全部清除
增和改update、setdefault
- 方法相同,没有指定的 key 就增加,有就覆盖
dict2 = {"key1": "value1", "key2": "value2"}
dict2["aaa"] = "bbbb"
print(dict2) # {'key1': 'value1', 'key2': 'value2', 'aaa': 'bbbb'}
dict2["key1"] = "hello"
print(dict2) # {'key1': 'hello', 'key2': 'value2', 'aaa': 'bbbb'}
# 拼接两个字典
dict1 = {"key1": "value1", "key2": "value2"}
dict1.update({"key2":"code","key3":"value3"})
print(dict1) # {'key1': 'value1', 'key2': 'code', 'key3': 'value3'}
# 有选择的进行增加,当原字典中没有要添加的 key 时
# 就增加,然后返回增加的 value;当原字典有要添加的
# key 时,对原字典不做任何更改,并返回原字典对应的 value
dict1 = {"key1": "value1", "key2": "value2"}
res = dict1.setdefault("key1","value2")
print(res) # value1
print(dict1) # {'key1': 'value1', 'key2': 'value2'}
res = dict1.setdefault("key3","value3")
print(res) # value3
print(dict1) # {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}
查keys、values、get
dict1 = {"key1": "value1", "key2": "value2", "key3": "value3", "key4": "value4"}
# 可以直接将结果放到列表中
print(list(dict1.keys()))
print(list(dict1.values()))
print(list(dict1.items()))
for i in dict1.keys():
print(i)
# key1
# key2
# key3
# key4
for i in dict1.values():
print(i)
# value1
# value2
# value3
# value4
for i in dict1.items():
print(i)
# ('key1', 'value1')
# ('key2', 'value2')
# ('key3', 'value3')
# ('key4', 'value4')
dict1 = {"key1": "value1", "key2": "value2"}
res = dict1.get("key1")
print(res) # value1
res2 = dict1.get("key3")
print(res2) # None
集合
定义
- 在
{}
内用逗号分隔开多个元素,元素的条件:- 必须为不可变类型
- 元素无序
- 元素没有重复
- 作用:去重和关系运算
set1 = {1,2,3}
set1 = set({1,2,3})
# 定义空集合
set3 = set()
关系运算
s1 = {1, 2, 3, 4}
s2 = {3, 4, 5, 6}
s3 = {1, 2}
s4 = {1, 2, 3, 4}
# 取交集
print(s1 & s2) # {3, 4}
# 取并集
print(s1 | s2) # {1, 2, 3, 4, 5, 6}
# 取差集
# s1 和 s2 的差集
print(s1 - s2) # {1, 2}
# s2 和 s1 的差集
print(s2 - s1) # {5, 6}
# 取补集
print(s1 ^ s2) # {1, 2, 5, 6}
# 父子集
print(s1 > s3) # True s3 是 s1 的儿子
print(s1 == s4) # True 互为父子
去重
- 只能是不可变类型,且不能保证顺序一致
l = {1, 1, 2, 4, 5, 4}
l = list(set(l))
print(l) # [1, 2, 4, 5]
删除remove、discard、pop
s = {1,2,3}
s.remove(2) # 删除没有的元素时会报错
print(s)
res = s.pop() # 从第一项开始删除,若删除空集合则报错
print(res)
s.discard(1) # 有就删除,没有也不报错
print(s)
增加add、update
s = {1,2,3}
s.update({4,5,6})
print(s) # {1, 2, 3, 4, 5, 6}
s.add(7)
print(s) # {1, 2, 3, 4, 5, 6, 7}
布尔值
# 显示布尔值
True # 关键字
False # 关键字
# 隐式布尔值
如 None 0 空(空字符串、空列表、空字典)代表的布尔值为 False
其余的都为 True
直接引用和间接引用
a = 10 # 直接引用 10 这个值
b = 10 # 也是直接引用 10 这个值
l = [2,a] # 间接引用 10 这个值
## 综上:直接引用和间接引用的区别就是间接引用将值放到了容器里,
# 这个容器可以是列表、字典、元组
浅拷贝和深拷贝
浅拷贝
l1 = ["a", "b", [1, 2]]
l2 = l1 # 表示 l2 和 l1 共同指向最外层列表的内存地址
l[0] = "e"
print(l2) # ['e', 'b', [1, 2]]
# 所以当 l2 列表中的数据改变后,l1 的也会改变
# 浅拷贝
l3 = l2.copy() # 表示 l3 指向新的最外层列表的地址,
# 表里的数据还是原来的数据(即还是指向原来的内存地址,此时数据只有一份)
l3[1] = "f" # l3[1] 所指向的数据更换为 f,然而 l2[1] 还是 b
l3[2][1] = 8 # l3[2] 所指向的内存地址为一个列表,所以更改列表内的内容
# 会连着 l2[2][1] 的内容一起改变
print(l2) # ['e', 'b', [1, 8]]
print(l3) # ['e', 'f', [1, 8]]
深拷贝
# 对可变和不可变类型做了不同的处理
import copy # 导入 copy 模块
l4 = copy.deepcopy(l3)
l4[2][1] = 10
print(l3)
print(l4)
# 从上面的代码可以看出,深拷贝后的列表,又创建了一个新的内列表
输入和输出
输入
# 接收用户的输入
# 在 python3 中,input 会将用户输入的所有内容转存成字符串
username = input("请输入您的昵称:")
print(username)
print 输出
print("hello", end="")
print(" world")
# hello world
world = "world"
print("hello", world)
# hello world
print("hello\n", end="")
print(world)
# hello
# world
字符串格式化输出
%s
# 值按照位置与%s一一对应,少一个不行,多一个也不行
# %s可以接收任意类型
res="my name is %s my age is %s" %('egon',"18")
print(res)
# 以字典的形式传值,打破位置的限制
res="我的名字是 %(name)s 我的年龄是 %(age)s" %{"age":"18","name":'egon'}
print(res)
str.format
# 兼容性好
# 按照位置传值
res='我的名字是 {} 我的年龄是 {}'.format('egon',18)
print(res)
res='我的名字是 {0}{0}{0} 我的年龄是 {1}{1}'.format('egon',18)
print(res)
# 打破位置的限制,按照key=value传值
res="我的名字是 {name} 我的年龄是 {age}".format(age=18,name='egon')
print(res)
# 不够的进行补充
str1 = '{x:=<5}'.format(x='abc')
print(str1) # abc==
# 保留固定小数位
double = '保留三位小数:{x:.3f}'.format(x=3.55555)
print(double) # 保留三位小数:3.556
f''
- python 3.5 以后才有
x = input('your name: ')
y = input('your age: ')
res = f'我的名字是{x} 我的年龄是{y}'
print(res)
# 在 {} 内进行运算
result = f'输出的结果为:{10 + 3}'
print(result) # 输出的结果为:13
运算符
print(10 / 3) # 结果带小数
print(10 // 3) # 只保留整数部分
print(10 % 3) # 取模、取余数
交换赋值
m = 10; n = 20;
tmp = m;m = n;n = tmp;
print(m);print(n)
# 方法二
m,n = n,m
解压赋值
nums = [111, 222, 333]
# 全部取出来
num0, num1, num2 = nums
print(num0, num1, num2) # 111 222 333
# 仅取出前两个
x, y, *_ = nums
print(x, y) # 111 222
# 取后两个
*_,m,n = nums
# 解压字典,解压出来的是 key
code = {"name": "codeFun", "age": 18}
key1, key2 = code
print(key1, key2) # name age
成员运算符in
print("hello" in "hello world") # True
print(111 in [111,222]) # True
# 字典判断 key
print("key1" in {"key1":111,"key2":222}) # True
print("key1" not in {"key1":111,"key2":222}) # False
流程控制
判断
if 条件:
代码块
elif 条件:
代码块
else:
代码块
score = input("请输入你的分数:")
score = int(score)
if score > 90:
print("优秀")
elif score > 70:
print("良好")
elif score > 60:
print("及格")
else:
print("不及格")
while 循环
while 条件:
代码块
# 反复登录
name = 'codeFun'
password = '123'
tag = True
while tag:
inp_name = input("请输入用户名:")
inp_password = input("请输入密码:")
if inp_name == name and inp_password == password:
print("登录成功")
tag = False
else:
print("登录失败,请重新输入")
print("=============END=================")
print("欢迎来到代码帝国")
- break
- 打断这次循环,在循环体中,break 往下的代码都不会执行,也不会进入下一次循环
- break 只会结束本层循环,如果嵌套多个循环,就需要多个 break
# 反复登录 加上break
name = 'codeFun'
password = '123'
while tag:
inp_name = input("请输入用户名:")
inp_password = input("请输入密码:")
if inp_name == name and inp_password == password:
print("登录成功")
break
else:
print("登录失败,请重新输入")
print("=============END=================") # 当登成功时,这行代码不会执行
print("欢迎来到代码帝国")
- continue
- continue 以下的代码不会运行,直接进入下一次循环
# 不打印 4
num = 0
while num < 6:
if num == 4:
num = num + 1
continue
print(num)
num = num + 1
- else
- while 循环结束时才会运行,但是当遇到 break 时,不会运行;
while 条件:
代码块
else:
代码块
# 登录超过三次,提示
# 登录成功并退出时,不提示
name = 'codeFun'
password = '123'
count = 0
while count < 3:
inp_name = input("请输入用户名:")
inp_password = input("请输入密码:")
if inp_name == name and inp_password == password:
print("登录成功")
while True:
cmd = input("localhost #")
if cmd == "q":
break
break
else:
count = count + 1
print("登录失败,请重新输入")
print("=============END=================") # 当登成功时,这行代码不会执行
else:
print("您已经输入超过三次")
for 循环
for i in LIST:
print(i)
代码块
for i in [1,2,3,4]:
print(i)
- range
range(10):
依次取出 0~9range(1,10):
依次取出1~9range(0,10,2):
依次取出 0、2、4、6、8 【最后面的 2 是步长】
for i in range(0,10,2):
print(i)
结果:
0
2
4
6
8
字符编码
- 字符编码解码过程:根据我们存入硬盘的字符编码,Unicode 在内存中就会根据该字符编码来解析文件中的文字
- 比如:我们在文件中写入中文字符,保存时使用 gbk 编码;当我们读取文件时,Unicode 就会根据 gbk 来解析文件中的文字。如果文件中有日文就会出现乱码。
- 常见的编码格式
- utf-8 编码是万国字符
- gbk 支持中文和英文
- shift-jis 日文和英文
- ASCII 英文编码
- python3 默认使用的是 utf-8 编码,作为万国字符,只要使用 utf8 编写 .py 文件就不会出现编码问题。
- python2 默认使用的时 ASCII 编码,不能解析中文文字,所以就要更改其默认的编码格式,方法如下:
- 在文件的首行加入
# coding:utf-8
- 如果想更换 python 解释器的默认编码,也可以使用上述方法
- 在文件的首行加入
- pycharm 控制台默认使用 utf-8 编码,Windows cmd 使用的时 gbk 编码。所以当我们使用
print()
函数向控制台输出信息时,如果编码格式不是 utf8 就会报错,但是 python3 不会,因为utf-8 是万国编码,python2 就会- 在定义字符串时加上
u"文本内容"
就行了
- 在定义字符串时加上
文件
什么是文件
- 文件是操作系统提供给用户/应用程序操作硬盘的一种虚拟的概念/接口
文件的打开
- 文件的打开使用函数
open()
,它需要传入三个参数,分别是- 文件路径
- 对文件的操作
- 读写的模式:
a w r
- 以什么形式打开:
t(文本) b(二进制)
- 读写的模式:
- 编码格式:
utf-8
文件的操作流程
# 普通方式
f = open("a.txt",mode="rt",encoding="utf-8")
read = f.read()
print(read) # 哈哈哈哈
f.close() # 需要手动关闭文件
# with 方式
# 运行完子代码块,自动关闭
with open("a.txt",mode="rt",encoding="utf-8") as f1:
res = f1.read()
print(res) # 哈哈哈哈
只读模式mode="rt"
- 只读模式,当文件不存在时报错,当文件存在时文件指针跳到开始位置
- 当调用
read
函数时,会一次性读完整个文件
with open("a.txt",mode="rt",encoding="utf-8") as f1:
res = f1.read()
print(res) # 哈哈哈哈
res2 = f1.read() #
print(res2) # 没有内容,因为第一次读的时候指针已经到文本的最后了
- 案例一:只有一行
inp_name = input("请输入您的用户名:")
inp_passwd = input("请输入您的密码:")
with open("a.txt",mode="rt",encoding="utf-8") as f1:
name,passwd = f1.read().split(":")
if name == inp_name and passwd == inp_passwd:
print("登录成功!")
else:
print("登录失败!")
- 案例二:有多行
inp_name = input("请输入您的用户名:")
inp_passwd = input("请输入您的密码:")
with open("a.txt",mode="rt",encoding="utf-8") as f1:
for line in f1:
name,passwd = line.strip("\n").split(":")
if name == inp_name and passwd == inp_passwd:
print("登录成功!")
break
else:
print("登录失败!")
只写模式mode=wt
- 只写模式,当文件不存在时创建空文件,当文件存在会清空文件,指针位于开始位置
- 在以 w 模式打开文件没有关闭的情况下,连续写入,新的内容总是跟在旧的之后
with open("b.txt",mode="wt",encoding="utf-8") as f:
f.write("hello word")
f.write("hello codefun") # 文件内容是 hello codefun
src_path = input("请输入源文件路径 >>> ").strip()
dst_path = input("请输入目的文件路径 >>> ").strip()
with open(r"{}".format(src_path), mode="rt", encoding="utf-8") as src, \
open(r"{}".format(dst_path), mode="wt", encoding="utf-8") as dst:
dst.write(src.read())
追加写mode=at
- 只追加写,在文件不存在时会创建空文档,在文件存在时文件指针会直接调到末尾
with open("a.txt",mode="at",encoding="utf-8") as f:
f.write("hello world\n")
f.write("hello codeFun\n")
万能读/写模式mode=ab、mode=wb、mode=rb
-
当
mode=b
时,一定不能指定encoding
参数,因为 b 模式不进行编码解码 -
经常用于非文本操作,比如图片视频的拷贝
-
在 b 模式下的
read()
函数可以添加字节参数,比如read(1024)
就为一次读写 1024 个字节,默认全部都读
"hello".encode("utf-8") == bytes("hello",encoding="utf-8")
# 使用万能模式进行文本读写
with open("a.txt",mode="wb") as write_f:
# 传入文本时需要进行编码,使用函数 encode
write_f.write("hello world".encode("utf-8"))
with open("a.txt",mode="rb")as read_f:
# 读文本时需要进行解码,使用函数 decode
res = read_f.read().decode("utf-8")
print(res, type(res))
# 文件拷贝工具
src_file=input('源文件路径>>: ').strip()
dst_file=input('源文件路径>>: ').strip()
with open(r'{}'.format(src_file),mode='rb') as f1,\
open(r'{}'.format(dst_file),mode='wb') as f2:
# res=f1.read() # 内存占用过大
# f2.write(res)
# 使用一行一行的读写
for line in f1:
f2.write(line)
文件的其他函数
readline():
一次读一行readlines():
将每行内容写入列表中writelines(LIST):
将列表中的元素写入文件中flush():
刷新内存,不建议使用,影响性能readable()
:返回布尔值,判断文件是否可读writable()
:返回布尔值,判断文件是否可写f.encoding
:返回文件的编码格式f.name
:返回文件的名字f.closed()
: 返回布尔值,判断文件是否关闭
指针的移动
with open("a.txt",mode="rb")as read_f:
read_f.seek(4,0) # 0 参照物是文件开头位置,并把指针放往右移动四个位置
# read_f.seek(4,1) # c参照物是指针的当前位置,并把指针放往右移动四个位置
# read_f.seek(-4,2) # c参照物是文件的末尾,并把指针放往左移动四个位置
print(read_f.tell()) # 4 获取指针的位置
res = read_f.read().decode("utf-8") # 从第五个字节开始读
print(res) # 56789
# 实现 tail -f
import time
with open("a.txt",mode="rb") as f:
# res = f.read().decode("utf-8") # 不建议使用,如果文件过大将消耗内存过多
f.seek(0,2)
while True:
newline = f.readline()
if newline:
print(newline.decode("utf-8"))
time.sleep(1)
文件修改的两种方式
# 方法一:在内存中建立副本
with open("a.txt",mode="rt",encoding="utf-8") as f:
file = f.read().replace("world","codeFun")
with open("a.txt",mode="wt",encoding="utf-8") as f1:
f1.write(file)
# 方法二:在硬盘中建立副本
import os
with open("a.txt",mode="rt",encoding="utf-8") as f,\
open("a.txt.bak",mode="wt",encoding="utf-8") as f1:
file = f.read().replace("codeFun","world")
f1.write(file)
os.remove("a.txt")
os.rename("a.txt.bak","a.txt")
迭代器和生成器
什么是迭代器
- 迭代是一个重复的过程,每次重复都是基于上一次的结果而继续,单纯的重复不是迭代
- 迭代器是为了提供一种
不依赖索引取值
的方案
迭代器相关概念
-
可迭代对象:能够调用
__iter__()
函数的对象称可迭代对象- 列表、元组、字典、字符串、文件等都是可迭代对象
-
迭代器对象:
l_iter
称为迭代对象- 能够调用
__next__()
函数的对象称迭代器对象 - 文件对象是迭代器对象
l = [1, 2, 3, 4] l_iter = l.__iter__()
- 能够调用
使用迭代器
- 当将迭代器中的元素全部取出来后,不能二次再取
l = [1, 2, 3, 4]
l_iter = l.__iter__()
while True:
try:
num = l_iter.__next__()
print(num)
except StopIteration: # 因为当取不到值时会报 StopIteration 错误
break
生成器
- 生成器就是一个自定义的迭代器
# 如何得到自定义的迭代器:
# 在函数内一旦存在yield关键字,调用函数并不会执行函数体代码
# 会返回一个生成器对象,生成器即自定义的迭代器
def func():
yield 1
yield 2
yield 3
res = func()
print(res.__next__()) # 1
def func():
print("start ...")
num = 0
while True:
print(num)
send_value = yield num # 初始化停在此处,每次调用完都会复位到这个位置
num += 1
print(send_value)
func = func()
return_value = func.send(None) # 初始化
print(return_value,"初始化")
# start ...
# 0
# 0 初始化
return_value = func.send("执行了第一次")
print(return_value,"第一次")
# 执行了第一次
# 1
# 1 第一次
return_value = func.send("执行了第二次")
print(return_value,"第二次")
# 执行了第二次
# 2
# 2 第二次
三目运算符
num1, num2 = 5, 3
if num1 > num2:
print("num1 大")
else:
print("num2 大")
# 将上述代码用三目运算符实现
result = "True返回的内容" if 条件 else "False返回的内容"
print("num1 大" if num1 > num2 else "num2 大")
生成式
# 列表生成式
# old_list 中符合条件的元素会被装进 new_list 中
new_list = [元素 for 元素 in old_list if 条件]
nums = [1, 2, 3, 4, 5]
new_nums = []
for num in nums:
if num > 3:
new_nums.append(num)
print(new_nums) # [4, 5]
nums = [ num for num in nums if num > 3 ]
print(nums) # [4, 5]
# 字典生成式
keys=['name','age','gender']
dic={key:None for key in keys}
print(dic) # {'name': None, 'age': None, 'gender': None}
items=[('name','egon'),('age',18),('gender','male')]
res={k:v for k,v in items if k != 'gender'}
print(res) # {'name': 'egon', 'age': 18}
# 集合生成式
keys = ['name', 'age', 'gender']
set1 = {key for key in keys}
print(set1, type(set1)) # {'name', 'age', 'gender'} <class 'set'>
# 生成器表达式
g=(i for i in range(10) if i > 3) # 此时 g 中什么都没有
g.next() # 调用一个生成一个
# 生成器表达式的应用:统计文本中的字数
with open('笔记.txt', mode='rt', encoding='utf-8') as f:
# 方式一:
res=0
for line in f:
res+=len(line)
print(res)
# 方式二:
res=sum([len(line) for line in f])
print(res)
# 方式三 :效率最高
res = sum((len(line) for line in f))
上述可以简写为如下形式
res = sum(len(line) for line in f)
print(res)
面相对象编程
函数基础
函数的定义
- 定义函数发生的事情
- 申请内存空间保存函数体代码
- 将上述内存地址绑定函数名
- 定义函数不会执行函数体代码,但是会检测函数体语法
def 函数名([参数列表]):
""" 注释 """
代码块
[return [返回值列表]]
函数的调用
# 调用上面定义的函数
函数名([参数列表])
函数分类
- 有参构造
- 无参构造
- 空函数
- 用来构思项目的整体构造
# 空函数
def func():
pass
有返回值的函数
# 当返回值是多个时,会把多个值封装成一个元组,返回给调用者
def mode():
return 1,2,3
res = mode()
print(res) # (1, 2, 3)
函数的参数
位置参数
- 根据位置来决定参数的变量名
def mode(x,y):
print("x = {},y = {}".format(x,y))
mode(1,2) # x = 1,y = 2
关键字参数
def mode(x, y):
print("x = {},y = {}".format(x, y))
# 关键字函数将不受顺序的限制
mode(y=3, x=2) # x = 2,y = 3
def mode(x, y):
print("x = {},y = {}".format(x, y))
# 也可以一半位置参数,一半关键字参数
mode(3, y=2) # x = 3,y = 2
默认参数
def mode(x, y = 8):
print("x = {},y = {}".format(x, y))
# 可以不给默认参数赋值(将使用默认参数),如果赋值则使用赋值的
mode(3) # x = 3,y = 8
可变长度的参数
def mode(x, *args): # 也可以是 *x 但是约定成 args
print("x = {},y = {}".format(x, args))
# 多余的参数变成元组
mode(3,4,5,6) # x = 3,y = (4, 5, 6)
def mode(x, *args):
print("x = {},y = {}".format(x, args))
# 先把 [] 内的分开然后分别赋值给形参,但是形参就两个
# 只能将第一个赋给 x,剩下的变成元组
mode(*[4,5,6]) # x = 4,y = (5, 6)
def mode(x, **kwargs):
print("x = {},y = {}".format(x, kwargs))
# 和第一个差不多,只是变成了字典
mode(x = 5,y = 6, z = 7) # x = 5,y = {'y': 6, 'z': 7}
def mode(x, **kwargs):
print("x = {},y = {}".format(x, kwargs))
# 和第二个差不多,只是变成了字典
mode(**{"x": 5, "y": 6, "z": 7}) # x = 5,y = {'y': 6, 'z': 7}
命名形参
- 在定义函数时,* 后面定义的参数,必须要以 key = value 传值
def mode(x, *,y):
print("x = {},y = {}".format(x, y))
mode(1,y= 3) # x = 1,y = 3
上述各种参数的组合使用
- 顺序:位置新参,默认形参,*args,命名关键字形参,**kwargs
def func(x,y=111,*args,z,**kwargs):
print(x,y,z,args,kwargs)
func(10,*[1,2,3],**{"d":4,"e":5},z="14") # 10 1 14 (2, 3) {'d': 4, 'e': 5}
函数的命名空间
- 内置命名空间
- 全局命名空间
- 局部命名空间
在局部中定义全局变量global
name = "code"
def func():
global name
name = "codeFun"
# func() # 若不调用函数,则不起作用
print(name) # code
函数传参的两种方式
方式一
def func(name):
print(name)
func("codeFun") # codeFun
方式二
# 闭包传参
def func():
def inner(name):
print(name)
return inner
name = func()
name("codeFun") # 传参
print(func,type(func())) # <function func at 0x0000025390A43E20> <class 'function'>
装饰器
装饰器手动版
""""
1. 在不改变原有代码的基础上,对原函数添加新功能 -- 本例添加了计算运行时间功能
2. 将添加的新功能不在局限于一个原函数,就要向装饰器中添加新参数(原函数的名字)
3. 但是新参数不能破坏装饰器的参数列表,所以就要使用其他方式进行传参 -- 闭包传参
4. 对于原函数的返回值,就要在 wrapper 中返回出来
"""
import time
def index(x, y):
time.sleep(1)
print("index {} {}".format(x, y))
return "index"
def foo(x,y,z):
time.sleep(1)
print("foo {} {} {}".format(x,y,z))
def outer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
stop = time.time()
print(stop - start)
return res
return wrapper
index = outer(index)
res = index(3, 5) # index 3 5
print(res) # index
foo = outer(foo)
foo(1,2,3) # foo 1 2 3
语法糖装饰器
""""
1. 装饰器必须在原函数的上方定义
2. 使用方式就是在原函数上方添加 @装饰器名
3. 使用语法糖的原函数可以直接调用,不用先调装饰器然后再进行赋值了
"""
import time
def outer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
stop = time.time()
print(stop - start)
return res
return wrapper
@outer
def index(x, y):
time.sleep(1)
print("index {} {}".format(x, y))
return "index"
@outer
def foo(x, y, z):
time.sleep(1)
print("foo {} {} {}".format(x, y, z))
# index = outer(index)
# res = index(3, 5) # index 3 5
# print(res)
#
# foo = outer(foo)
# foo(1,2,3) # foo 1 2 3
index(1, 2) # index 1 2 1.0047883987426758
伪装的更像
def outter(func):
@wraps(func) # 加上这个参数就可以使 wrapper 函数的函数信息和原函数一样
def wrapper(*args, **kwargs):
"""这个是主页功能"""
res = func(*args, **kwargs) # res=index(1,2)
return res
# 手动将原函数的属性赋值给wrapper函数
# 1、函数wrapper.__name__ = 原函数.__name__
# 2、函数wrapper.__doc__ = 原函数.__doc__
# wrapper.__name__ = func.__name__
# wrapper.__doc__ = func.__doc__
无参数装饰器的模板
def function(func):
@wraps(func)
def wrapper(*args, **kwargs):
""""需要添加的功能"""
return_value = func(*args, **kwargs) # 原函数名
return return_value
return wrapper
@function
def primary_function():
pass
有参数的装饰器和有参数的语法糖
# 模板
from functools import wraps
def function(parameter1, parameter2):
def decode(func):
@wraps(func)
def wrapper(*args, **kwargs):
""""需要添加的功能"""
return_value = func(*args, **kwargs) # 原函数名
return return_value
return wrapper
return decode
@function("parameter1", "parameter2")
def primary_function():
pass
""""
1. 向一层装饰器传参,实现用户认证功能
2. 只能通过将一层装饰器进行闭包,通过二层装饰器向一层装饰器传参
"""
def auth(auth_method):
def outer(func):
def wrapper(*args, **kwargs):
name = input("your name >>>")
password = input("your password >>>")
if auth_method == "file":
print("file auth")
if name == "codeFun" and password == "123":
func(*args, **kwargs)
elif auth_method == "db":
print("db auth")
if name == "codeFun" and password == "123":
func(*args, **kwargs)
return wrapper
return outer
@auth("file")
def index(x, y):
print("index {} {}".format(x, y))
@auth("db")
def foo(x, y, z):
print("foo {} {} {}".format(x, y, z))
index(1, 2)
foo(1, 2, 3)
多个语法糖的执行顺序
from functools import wraps
def function1(x):
def decode(func):
@wraps(func)
def wrapper(*args, **kwargs):
print("function1 开始运行")
return_value = func(*args, **kwargs) # 原函数名
print("function1 结束运行")
return return_value
return wrapper
return decode
def function2(x):
def decode2(func):
@wraps(func)
def wrapper2(*args, **kwargs):
print("function2 开始运行")
return_value = func(*args, **kwargs) # 原函数名
print("function2 结束运行")
return return_value
return wrapper2
return decode2
def function3(x):
def decode3(func):
@wraps(func)
def wrapper3(*args, **kwargs):
print("function3 开始运行")
return_value = func(*args, **kwargs) # 原函数名
print("function3 结束运行")
return return_value
return wrapper3
return decode3
@function1(1)
@function2(1)
@function3(1)
def index(x, y):
print("index {} {}".format(x, y))
index(x=1, y=2)
# 执行顺序
# function1 开始运行
# function2 开始运行
# function3 开始运行
# index 1 2
# function3 结束运行
# function2 结束运行
# function1 结束运行
递归
l=[1,2,[3,[4,[5,[6,[7,[8,[9,10,11,[12,[13,]]]]]]]]]]
def f1(list1):
for x in list1:
if type(x) is list:
# 如果是列表,应该再循环、再判断,即重新运行本身的代码
f1(x)
else:
print(x)
f1(l)
匿名函数
- 匿名用于临时调用一次的场景,更多的是将匿名与其他函数配合使用
匿名函数的定义和调用
# 定义
lambda 参数列表: 返回值
lambda x,y: [x,y]
# 调用(基本不用)
res = (lambda x,y: [x,y] )(1,2)
print(res) # [1, 2]
匿名函数的应用
max(iterable[,funciton])
函数
- 用于对比可迭代对象的大小
# 当只传入可迭代对象时,按照可迭代对象内的元素为对比依据,
# 然后返回可迭代对象的最大的元素
l = [1,2,3,4,5,6]
print(max(l)) # 6
# 当传入的值有可迭代对象和函数时,可迭代对象作为输出对象,函数作为对比依据
# 返回最大对比依据所对应的可迭代对象
# 函数max会迭代字典salaries,每取出一个“人名”就会当做参数传给指定的匿名函数,然后将匿名函数的返回值当做比较依据,最终返回薪资最高的那个人的名字
salaries = {
'siy': 3000,
'tom': 7000,
'lili': 10000,
'jack': 2000
}
result = max(salaries, key=lambda k: salaries[k])
print(result) # lili
min(iterable[,funciton])
函数同理
sorted
排序
# 按照工资进行排序
salaries = {
'siy': 3000,
'tom': 7000,
'lili': 10000,
'jack': 2000
}
res1 = sorted(salaries, key=lambda k: salaries[k])
res2 = sorted(salaries, key=lambda k: salaries[k], reverse=True)
print(res1) # ['jack', 'siy', 'tom', 'lili'] 薪资从小到大
print(res2) # ['lili', 'tom', 'siy', 'jack'] 薪资从大到小
map(function,iterable)
# 将下方列表中的每个元素都加上 _dsb 后缀
l = ['alex', 'lxx', 'wxx', 'xdd']
new_l = (name + '_dsb' for name in l) # 生成器
print(new_l)
res = map(lambda name: name + '_dsb', l)
print(res) # 生成器
print(res.__next__()) # alex_dsb
模块
模块基础
模块的定义
- 模块就是功能的集合体,主要分为三类:
- 内置模块
- 第三方模块
- 自定义模块
- 一个 .py 文件就是一个模块
- 一个文件夹也可以封装成一个模块,暴露入口
__init__.py
使用模块
- 创建 .py 文件
- 使用
import 文件名
导入模块- 起别名:
import 文件名 as 别名
- 导入模块的规范:顺序为 内置在最上面,自定义模块在最下面,第三方模块在中间
- 起别名:
- 使用
模块名.xxx
调用模块内容
import mode
# 模块开始导入 【运行模块内的代码】
print(mode.file_name) # mode.py
mode.func() # 模块内的函数
# mode.py
print("模块开始导入")
file_name = "mode.py"
def func():
print("模块内的函数")
首次导入模块发生的事
- 运行模块内的代码
- 产生命名空间并指向它
区分 .py 文件是被导入还是被执行
__name__
当文件被导入时返回文件名__name__
当文件被执行时返回__main__
# mode.py
if __name__ == '__main__':
print("模块被执行了")
else:
print("模块被导入了")
使用from... import...
导入
- 可以将模块文件中的函数和变量分别导入,使其变成本地的变量
from mode import file_name # 即 "mode.py" 被两个指针指着
from mode import get
from mode import change
print(file_name) # mode.py
change("main.py")
print(file_name) # mode.py
file_name = "new.py"
print(file_name) # new.py
get() # main.py
模块导入的优先级
- 内存
- 硬盘
导入模块的路径
- 查看路径
# python 就是按照这个路径进行模块的查找
import sys
print(sys.path)
['E:\\development\\python',
'E:\\development\\python',
'D:\\app\\PyCharm 2022.3.3\\plugins\\python\\helpers\\pycharm_display', 'D:\\app\\python310\\python310.zip', 'D:\\app\\python310\\DLLs', 'D:\\app\\python310\\lib', 'D:\\app\\python310',
'E:\\development\\python\\venv',
'E:\\development\\python\\venv\\lib\\site-packages',
'D:\\app\\PyCharm 2022.3.3\\plugins\\python\\helpers\\pycharm_matplotlib_backend']
- 导入路径
import sys
# 导入的是所在的文件夹
sys.path.append(r"E:\development\python\mode_dir")
import mode
包的导入
- 在创建包时,会默认在包下创建
__init__.py
文件 - 我们在导包时,会默认向
__init__.py
中导 - 所以我们要是想将包内其他文件能够让使用者找到就要向
__init__.py
中导入这些文件
# __init__.py
import packet_anme.file_name
包使用案例
整体目录结构
ATM
|____ bin
|____ start.py
|____ conf
|____ setting.py
|____ core
|____ src.py
|____ lib
|____ common.py
|____ log
|____ user.log
各文件代码的编写*
# start.py
import os
import sys
# 获取 ATM 的绝对路径
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# 将路径加入到系统环境变量中
sys.path.append(BASE_DIR)
# 导入相应的模块
from core import src
# 执行模块的程序
src.run()
# setting.py
# 将日志文件写活
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
LOG_PATH = "{}\\log\\user.log".format(BASE_DIR)
# 在执行 start.py 文件时,环境变量已经添加完了,这里直接导包即可
from lib import common
def login():
print("登录成功")
common.log("登录信息")
def check_balance():
print("账户余额")
common.log("余额信息")
def tran():
print("转账成功")
common.log("转账信息")
def run():
bank_list = {
"0": ["退出", None],
"1": ["登录", login],
"2": ["余额", check_balance],
"3": ["转账", tran]
}
while True:
for i in bank_list:
print(i, bank_list[i][0])
num = input("请输入相应的数组进行操作:").strip()
if num.isdigit() is not True:
print("请输入数字")
continue
if num == "0":
break
bank_list[num][1]()
# common.py
# 以下代码经常被多个程序使用所以放到这里以供共同使用
import time
from conf import setting
def log(message):
with open(setting.LOG_PATH, mode="at", encoding="utf-8") as f:
f.write("{} {}\n".format(time.time(), message))
# user.log 自动生成
时间模块
- python 将时间分为三种
时间戳
- 从 1979 年到现在的秒数
- 作用:用于时间间隔的计算
import time
time.time() # 获取1979 年到现在的秒数
time.sleep() # 让程序睡眠多少秒
格式化的时间
- 按照某种格式进行显示时间
import time
format_time = time.strftime("%Y %m %d %H:%M:%S %p")
print(format_time) # 2023 04 02 10:59:54 AM
format_time = time.strftime("%Y %m %d %X")
print(format_time) # 2023 04 02 10:59:54
结构化的时间
- 用于单独获取时间的某一部分
import time
struct_time = time.localtime()
print(struct_time)
# 年 月 日 时 分 秒
# tm_year=2023, tm_mon=4, tm_mday=2, tm_hour=12, tm_min=5, tm_sec=39,
# 一周的第几天(从周天开始) 一年的第几天
# tm_wday=6, tm_yday=92, tm_isdst=0)
print(struct_time.tm_year) # 92
时间的加减
import datetime
# 支持 days=日 seconds=秒 microseconds=微秒
# milliseconds=毫秒 minutes= 分 hours=小时 weeks=周
print(datetime.datetime.now()) # 2023-04-02 14:30:42.689009
print(datetime.datetime.now() + datetime.timedelta(days=3))
# 2023-04-05 14:30:42.690516
时间格式的转换
- 结构化时间转时间戳
import time
struct_time = time.localtime()
print(struct_time)
stramp_time = time.mktime(struct_time)
print(stramp_time)
- 时间戳转换成结构化时间
import time
stramp_time = time.time()
struct_time = time.localtime(stramp_time)
print(struct_time)
- 结构化时间转换成格式化的字符串形式
import time
struct_time = time.localtime()
format_time = time.strftime('%Y-%m-%d %H:%M:%S',struct_time)
print(format_time) # 2023-04-02 14:47:22
- 将字符串转换为时间
import time
str_time = "2000-11-26 0:0:0"
struct_time = time.strptime(str_time,'%Y-%m-%d %H:%M:%S')
print(struct_time)
- 格式化字符串时间和时间戳互转(并将时间进行加减)
import time
str_time = "2000-11-26 0:0:0"
struct_time = time.strptime(str_time, '%Y-%m-%d %H:%M:%S')
stramp_time = time.mktime(struct_time) + 365 * 86400 * 23
print(stramp_time) # 1700496000.0
struct_time = time.localtime(stramp_time)
str_time = time.strftime('%Y-%m-%d %H:%M:%S', struct_time)
print(str_time) # 2023-11-21 00:00:00
随机数模块
# 生成随机小数
print(random.random()) # 只能生成 (0,1)之间的小数
print(random.uniform(3.5,4.5)) # 生成 (3.5,4.5)之间的小数
# 生成随机整数
print(random.randint(1, 2)) # 生成 [1,2] 的整数
print(random.randrange(1, 2)) # 生成 [1,2] 的整数
# 生成随机列表
print(random.sample([1, 2, 3, 4], 2)) # 随机从列表中选择两个组成新列表
# 将列表顺序打乱
items = [1, 2, 3, 4]
random.shuffle(items) # 将原来的列表进行打乱顺序然后将结果返回给原列表
print(items) # [1, 3, 4, 2]
# 将列表内容变成随机数
items = [1, 2, 3, 4,"aaa"]
print(random.choice(items))
import random
# 随机验证码
# 获得随机字母
chr_random = chr(random.randint(65, 90))
# 获取随机数字
num_random = str(random.randint(0, 9))
syno_code = ""
for i in range(4):
chr_random = chr(random.randint(65, 90))
num_random = str(random.randint(0, 9))
syno_code += random.choice([chr_random, num_random])
print(syno_code)
系统模块os
- 文件操作
import os
# 创建文件夹
os.mkdir("c.txt")
# 删除一个文件
os.remove("b.txt")
# 重命名文件/目录
# os.rename("old_name","new_name")
# 获取指定目录下的文件夹和文件 返回一个列表
listdir = os.listdir(".")
print(listdir) # ['.idea', 'a.txt', 'ATM', 'b.txt', 'main.py']
# 获取文件的大小
size = os.path.getsize("a.txt") # 单位字节
print(size) # 24
# 将文件路径按照文件和文件夹分割
path = "a/b/c.txt"
dir_path = os.path.dirname(path)
print(dir_path) # a/b
print(os.path.dirname(dir_path)) # a 可以继续向分割
file_path = os.path.basename(path)
print(file_path) # c.txt
- 万能操作
os.system("dir") # 里面跟相应的操作系统的指令
- 系统的环境变量
os_env = os.environ
print(os_env) # 以键值对的形式出现
# 添加时必须是字符串
os.environ['aaaaaaaaaa']='111' # 添加环境变量
序列化和反序列化
- 序列化:内存中的数据类型经过序列化可以转换成特定的格式(json pickle)
- 这种格式可以跨平台、语言,以及用于存档
- 反序列化:将特定格式经过反序列化转换成内存中的数据
- json 格式用于跨平台的交互
- pickle 格式用于存储
json 格式的应用
import json
# 序列化
python_list = [1,True,'aaa']
json_list = json.dumps(python_list) # 更改为相应的变量类型并存储为字符串
print(json_list,type(json_list)) # [1, true, "aaa"] <class 'str'>
# 反序列化
python_list = json.loads(json_list)
print(python_list) # [1, True, 'aaa']
with open("c.json", mode='wt', encoding='utf-8') as f:
json.dump(python_list, f)
with open("c.json",mode='rt',encoding='utf-8') as f:
res = json.load(f)
print(res)
猴子补丁
- 假设之前的开发都用的 json 模块,现在要都换成 ujson 模块,我们只需要在入口文件打入补丁即可
import json
import ujson
def monkey_patch_json():
json.__name__ = 'ujson'
json.dumps = ujson.dumps
json.loads = ujson.loads
monkey_patch_json() # 在入口文件出运行
pickle 模块和json一样
配置文件
# 需要办三步手续
import configparser
config = configparser.ConfigParser()
config.read("test.ini", encoding="utf-8") # 将想要读取的配置文件读入内存
# 根据指定的 section 和 key 获取 value
name = config.get("section1", "name") # 此外还有 getboolean、getint、getfloat
print(name) # "codeFun"
# 获取指定 section 下所有的键
section2_keys = config.options("section2")
print(section2_keys) # ['username', 'password']
# 获取指定 section 下所有的键值对
section2 = config.items("section2")
print(section2) # [('username', '"codeFun"'), ('password', '"123"')]
# test.ini 后缀名也可以是 cfg
# 这是注释
; 这也是注释
[section1] # 这是一部分,其中包括了各种项目
name = "codeFun"
is_admin = true
age = 18
[section2]
username = "codeFun"
password = "123"
哈希模块
- 文本或者文件经过 hash 算法的运算,可以得到一串 hash 值
- hash 值的特点:
- 同样内容的 hash 值是相同的
- 不能通过 hash 值进行反解成内容
- 不管传入的内容有多大,只要使用的 hash 算法不变,得到的 hash 值长度是一定的
- hash 的用途
- 用于密码密文传输与验证
- 用于文件的完整性校验
- 使用 hash 模块
import hashlib
md5 = hashlib.md5()
md5.update("hello world".encode("utf-8"))
hash_code = md5.hexdigest()
print(hash_code) # 5eb63bbbe01eeed093cb22bb8f5acdc3
md5 = hashlib.md5()
md5.update("hello ".encode("utf-8"))
md5.update("world".encode("utf-8"))
hash_code = md5.hexdigest() # 计算的是 'hello world' 的 hash 值
print(hash_code) # 5eb63bbbe01eeed093cb22bb8f5acdc3
子进程subprocess
import subprocess
obj=subprocess.Popen('echo 123 ; ls / ; ls /root',shell=True,
stdout=subprocess.PIPE, # 正确输出
stderr=subprocess.PIPE, # 错误输出
)
print(obj)
res=obj.stdout.read() # 获取正确输出
print(res.decode('utf-8'))
err_res=obj.stderr.read()
print(err_res.decode('utf-8'))
日志模块logging
日志级别
import logging
# 从上往下级别越来越大
logging.debug('调试 debug')
logging.info('消息 info')
logging.warning('警告 warning')
logging.error('错误 error')
logging.critical('严重错误 critical')
设置日志输出格式(一)
logging.basicConfig(
filename='access.log', # 不指定,默认打印到终端
# 2、日志格式
format='%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s',
# 2023-04-03 19:45:08 PM - root - DEBUG -main: ???? debug
# 3、时间格式
datefmt='%Y-%m-%d %H:%M:%S %p',
# 4、日志级别
# critical => 50
# error => 40
# warning => 30
# info => 20
# debug => 10
level=10,
)
设置日志输出格式(二)
- 执行顺序
- 向
getLogger('日志名称')
传入日志名称 - 然后
getLogger
会向config.dictConfig
传入的日志字典的键loggers
中寻找日志名称 - 若找到了,则根据相应的值进行配置,
- 若找不到则会匹配日志名字为空的配置
- 其中
handlers
还会向日志字典中键为handlers
再进行相应的配置 handlers
中的formatter
也会向日志字典中键为formatters
再进行相应的配置
- 向
# setting.py
"""
日志配置字典LOGGING_DIC
"""
# 1、定义三种日志输出格式,日志中可能用到的格式化串如下
# %(name)s Logger的名字
# %(levelno)s 数字形式的日志级别
# %(levelname)s 文本形式的日志级别
# %(pathname)s 调用日志输出函数的模块的完整路径名,可能没有
# %(filename)s 调用日志输出函数的模块的文件名
# %(module)s 调用日志输出函数的模块名
# %(funcName)s 调用日志输出函数的函数名
# %(lineno)d 调用日志输出函数的语句所在的代码行
# %(created)f 当前时间,用UNIX标准的表示时间的浮 点数表示
# %(relativeCreated)d 输出日志信息时的,自Logger创建以 来的毫秒数
# %(asctime)s 字符串形式的当前时间。默认格式是 “2003-07-08 16:49:45,896”。逗号后面的是毫秒
# %(thread)d 线程ID。可能没有
# %(threadName)s 线程名。可能没有
# %(process)d 进程ID。可能没有
# %(message)s用户输出的消息
# 2、强调:其中的%(name)s为getlogger时指定的名字
standard_format = '%(asctime)s - %(threadName)s:%(thread)d - 日志名字:%(name)s - %(filename)s:%(lineno)d -' \
'%(levelname)s - %(message)s'
# 2023-04-03 21:11:05,522 - MainThread:12168 - 日志名字:kkk - main.py:8 -INFO - 这里是 info
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
test_format = '%(asctime)s] %(message)s'
# 3、日志配置字典
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': { # 可以任意定
'format': standard_format
},
'simple': {
'format': simple_format
},
'test': {
'format': test_format
},
},
'filters': {},
# handlers是日志的接收者,不同的handler会将日志输出到不同的位置
'handlers': {
# 打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'standard'
},
'default': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件
# 'maxBytes': 1024*1024*5, # 日志大小 5M
'maxBytes': 1000,
******** 'backupCount': 5, # 日志的备份数量
'filename': 'a1.log', # os.path.join(os.path.dirname(os.path.dirname(__file__)),'log','a2.log')
'encoding': 'utf-8',
'formatter': 'standard',
},
# 打印到文件的日志,收集info及以上的日志
'other': {
'level': 'DEBUG',
'class': 'logging.FileHandler', # 保存到文件
'filename': 'a2.log', # os.path.join(os.path.dirname(os.path.dirname(__file__)),'log','a2.log')
'encoding': 'utf-8',
'formatter': 'test',
},
},
# loggers是日志的产生者,产生的日志会传递给handler然后控制输出
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'kkk': {
'handlers': ['console', 'other'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
'propagate': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
'终端提示': {
'handlers': ['console', ], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
'propagate': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
'': {
'handlers': ['default', ], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
'propagate': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
},
}
其他内置函数
拉链函数zip()
# 将连个可迭代对象进行一一组合成元组,然后封装到迭代器中,多余的舍弃
l1 = [1, 2, 3, 4]
l2 = ['a', 'b', 'c', 'd', 'e']
iter_obj = zip(l1,l2)
print(list(iter_obj)) # [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
取除数和余数divmod()
# 分别将除数和余数按顺序放到元组中
tuple_obj = divmod(10, 3)
print(tuple_obj) # (3, 1)
查看对象属性dir()
class Foo:
pass
foo = Foo()
list_obj = dir(foo)
print(list_obj)
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']
枚举emumerate()
# 自动生成序列
for k, v in enumerate(['a', 'b', 'c']):
print(k,v)
# 0 a
# 1 b
# 2 c
将字符串中的其他类型转换成其他类型eval()
res = eval('{"a":1,"b":True}')
print(res, type(res)) # {'a': 1, 'b': True} <class 'dict'>
类型判断isinstance()
class Foo:
pass
foo = Foo()
# foo 是 Foo 吗
print(isinstance(foo, Foo)) # True
print(isinstance(1, int)) # True
导入 “字符串” 模块__import__()
time = __import__('time')
ASCII 码字母互转
import random
print(chr(65)) # A
print(ord('A')) # 65
# 生成随机的字母
random_abc = chr(random.randint(65,90))
print(random_abc)
正则表达式
总表
模式 | 描述 |
---|---|
\w | 匹配字母数字及下划线 |
\W | 匹配非字母数字下划线 |
\s | 匹配任意空白字符,等价于[\t\n\r\f] |
\S | 匹配任意非空字符 |
\d | 匹配任意数字,等价于[0-9] |
\D | 匹配任意非数字 |
\A | 匹配字符串开始 |
\Z | 匹配字符串结束,如果是存在换行,只匹配到换行前的结束字符串 |
\z | 匹配字符串结束 |
\G | 匹配最后匹配完成的位置 |
\n | 匹配一个换行符 |
\t | 匹配一个制表符 |
^ | 匹配字符串的开头 |
$ | 匹配字符串的末尾。 |
. | 匹配任意字符,除了换行符,当re.DOTALL标记被指定时,则可以匹配包括换行符的任意字符。 |
[…] | 用来表示一组字符,单独列出:[amk]匹配‘a’,'m’或K |
[^…] | 不在中的字符:[^abc] 匹配除了a,b,c之外的字符。 |
* | 匹配0个或多个的表达式。 |
+ | 匹配1个或多个的表达式。 |
? | 匹配0个或1个由前面的正则表达式定义的片段,非贪婪方式 |
{n} | 精确匹配n个前面表达式。 |
{n,m} | 匹配n到m次由前面的正则表达式定义的片段,贪婪方式 |
a | b | 匹配a或b |
() | 匹配括号内的表达式,也表示一个组 |
分类记忆
- 排除异己系列:
\w \W
、\d \D
、\S \s
- 开始结束系列:
\A
、\z \Z
、^ $
- 任意系列:
. ? + * {n} {n,m}
- 判断系列:
[...] [^...] a|b ()
面向对象
什么是面相对象
- 核心就是对象二字,其最终目的是为了将程序进行整合
- 对象就是一个容器,用来存放数据和功能的
- 不论使用什么方法(类、字典、列表),只要将数据和功能进行整合,都可称为面向对象
类
类的定义和使用
# 定义
class 类名:
pass
# 调用
对象名 = 类名()
对象名.__dict__ # 列出对象的参数列表
对象名.变量名 # 获取对象中的变量
class Student:
name = None
age = None
def get_info(self):
print('get_info 运行了')
print("该 print 函数在创建对象时就运行了")
stu1 = Student() # 该 print 函数在创建对象时就运行了
print(stu1.__dict__)
stu1.name = 'codeFun'
stu1.age = 18
print(stu1.__dict__) # {'name': 'codeFun', 'age': 18}
stu1.get_info() # get_info 运行了
__init__
的使用
- 实例化的过程
- 先产生一个空对象
- python会自动调用类中的
__init__
方法然将空对象已经调用类时括号内传入的参数一同传给__init__方法 - 返回初始完的对象
__init__
方法总结- 会在调用对象时自动触发,用来为对象初始化数据
__init__
内也可以存放任意其他代码,其返回值必须返回None
class Student:
name = None
age = None
def __init__(self, name, age):
self.name = name
self.age = age
def get_info(self):
print(f'名字:{self.name},年龄:{self.age}')
stu1 = Student("codeFun",18)
stu1.get_info() # 名字:codeFun,年龄:18
类中属性的调用
class Student:
name = None
age = None
school = 'collage'
def __init__(self, name, age):
self.name = name
self.age = age
def get_info(self):
print(f'名字:{self.name},年龄:{self.age}')
# 对于 __init__ 中有的属性,Student 并不能更改其值
stu1 = Student("codeFun", 18)
stu2 = Student("codeWhy", 18)
Student.age = 19
print(stu1.age, stu2.age) # 18 18
# 对于非 __init__ 中有的属性,Student 可以更改其值
Student.school = 'university'
print(stu1.school, stu2.school) # university,university
# 还可以这样调用类中函数
Student.get_info(stu1) # 名字:codeFun,年龄:18
面相对象的基础使用
"""
1. 创建 学校 班级 对象
2. 根据常识关 2 种对象
"""
class School:
sch_name = 'bili'
def __init__(self, zone_name):
self.zone_name = zone_name
self.class_names = []
def add_class(self, class_name):
self.class_names.append(class_name)
def sch_info(self):
print(f'学校名称:{self.sch_name}')
print(f'\t校区名称:{self.zone_name}')
for class_name in self.class_names:
# print(f'\t\t班级名称:{class_name}')
class_name.class_info()
class Class:
def __init__(self, class_name, class_time):
self.class_name = class_name
self.class_time = class_time
self.class_courses = []
def add_course(self, class_course):
self.class_courses.append(class_course)
def class_info(self):
print(f'班级名称:{self.class_name}\t培训时长:{self.class_time}\t', end='\t')
for course in self.class_courses:
print(f'课程名称:{course}', end="\t")
print()
class001 = Class("脱产 14 期", '6 mon')
class002 = Class("脱产 15 期", '8 mon')
class002.add_course('linux 运维')
class001.add_course('python 自动化')
class001.add_course('Java 全栈')
school001 = School("北京")
school001.add_class(class001)
school001.add_class(class002)
school001.sch_info()
'============结果打印========='
学校名称:bili
校区名称:北京
班级名称:脱产 14 期 培训时长:6 mon 课程名称:python 自动化 课程名称:Java 全栈
班级名称:脱产 15 期 培训时长:8 mon 课程名称:linux 运维
封装
-
封装是面向对象三大特性最核心的一个特性,将封装的属性进行隐藏操作
-
如何隐藏:
- 在属性(变量名 函数)前加
__
前缀,就会实现对外隐藏属性的效果
- 在属性(变量名 函数)前加
-
隐藏的注意事项
- 只是语法上的变形:通过
_类名__属性
的方式仍然能够访问
- 只是语法上的变形:通过
-
为什么要隐藏
- 可以创建一些接口让用户间接的使用隐藏的属性,在接口内我们可以规定隐藏属性使用的规范
- 对于一些使用者不直接使用的函数,我们可以隐藏起来,隔离复杂度。
class Student:
def __init__(self):
self.__name = None
self.__age = None
def get_info(self):
print(self.__name, self.__age)
def set_name(self, name):
self.__name = name
def set_age(self, age):
self.__age = age
# 通过这种方式也可以获取隐藏属性的值
stu1 = Student()
stu1._Student__name = 'codeFun'
stu1._Student__age = 18
print(stu1.__dict__) # {'_Student__name': 'codeFun', '_Student__age': 18}
stu1.get_info() # codeFun 18
property 装饰器
- 用来绑定给对象的方法伪造数据属性
# 案例一:
# 通过以下的操作,就可以按照人的习惯使用 name 属性了
class Person:
def __init__(self):
self.__name = None
@property
def name(self):
return self.__name
@name.setter
def name(self, name):
self.__name = name
@name.deleter
def name(self):
print("该属性不允许删除")
person001 = Person()
person001.name = "codeFun"
print(person001.name) # codeFun
# 案例二
class Person:
def __init__(self):
self.__name = None
def get_name(self):
return self.__name
def set_name(self, name):
self.__name = name
def del_name(self):
print("该属性不允许删除")
# 顺序不能变!!!!!!!
name = property(get_name, set_name, del_name)
person001 = Person()
person001.name = "codeFun"
print(person001.name) # codeFun
# 案例三
class Person:
def __init__(self, weight, height):
self.weight = weight
self.height = height
@property
def bmi(self):
return self.weight / (self.height ** 2)
person001 = Person(65, 1.65)
print(person001.bmi) # 23.875114784205696 调用的时候不用加括号,直接当一个值去使用
继承
- 什么是继承
- 继承是一种创建新类的方式,新建的类可称为子类或者派生类,父类又可以称为基类或者超类
- 子类会继承父类的属性
- 继承表达的是一种
什么“是”什么
的关系,比如:猫类是动物,所以猫类可以继承动物类 - 继承可以用来解决代码冗余问题
- python 支持单继承和多继承
- 新式类和经典类
- python3 中都是新式类,python2 中有新式经典之分
- 新式类:继承了 object 类的子类,以及子类的子子类
- 经典类:没有继承 object 类的子类…
使用继承
class Parent:
def __init__(self, parent1, parent2):
self.parent1 = parent1
self.parent2 = parent2
# class Base(Parent1,Parent2): # 多继承
class Base(Parent): # 继承的语法。
def __init__(self, parent1, parent2, base1):
Parent.__init__(self, parent1, parent2) # 调用父类的 init 方法
self.base1 = base1
base = Base('hello', 'world','base')
print(base.__dict__) # {'parent1': 'hello', 'parent2': 'world', 'base1': 'base'}
属性的查找
- 首先在本类中查找,如果本类中没有就会向父类中查找
- 查找顺序:可以通过类名或者对象名调用
mro()
进行输出
class B:
def method(self):
print("B")
class C(B):
def method(self):
print("C")
class D(B):
def method(self):
print("D")
class E(D,C):
def method(self):
print("E")
print(E.mro()) # [<class '__main__.E'>, <class '__main__.D'>, <class '__main__.C'>, <class '__main__.B'>, <class 'object'>]
菱形问题
- 像上面那个类的继承就是菱形问题,最终的指向是 B
- 经典类:深度优先,会在检索第一条分支的时候就直接一条道走到黑,即会检索大脑袋(共同的父类)
- 新式类:广度优先,会在检索最后一条分支的时候检索大脑袋
多继承Mixins
- mixins 机制核心,在多继承背景下尽可能地提升继承的可读性
- 让多继承满足人的思维习惯:什么 “是” 什么
# 案例:
class Vehicle:
pass
# 将某些特殊的属性抽离出来形成一个新类,这个新类要以 Mixin 结尾
# 以表示仅仅是功能的抽象,而不是又多了个新爹
class FlyableMixin:
def fly(self):
pass
class CivilAircraft(FlyableMixin, Vehicle): # 民航飞机
pass
class Helicopter(FlyableMixin, Vehicle): # 直升飞机
pass
class Car(Vehicle): # 如果不写 FlyableMixin 类,那么汽车继承 Vehicle 汽车也会飞了
pass
super()
的使用
- super() 会以当前类为起点,以对象所对应类的
mro()
为路径,进行查找,和继承没直接关系。 - 但是不继承就会没间接关系。
class A:
def test(self):
print("this is A")
super().test()
# 这个 super 在 A 中,所以起点就是 ↓ <class '__main__.A'>,然后往后找,正好 B 中有 test 方法
# [<class '__main__.C'>, <class '__main__.A'>, <class '__main__.B'>, <class 'object'>]
class B:
def test(self):
print("this is B")
class C(A, B):
pass
obj = C()
obj.test()
# this is A
# this is B
print(C.mro()) # [<class '__main__.C'>, <class '__main__.A'>, <class '__main__.B'>, <class 'object'>]
class Parent:
def __init__(self, parent1, parent2):
self.parent1 = parent1
self.parent2 = parent2
# class Base(Parent1,Parent2): # 多继承
class Base(Parent): # 继承的语法。
def __init__(self, parent1, parent2, base1):
# Parent.__init__(self, parent1, parent2) # 调用父类的 init 方法
super().__init__(parent1, parent2) # # 调用父类的 init 方法
self.base1 = base1
base = Base('hello', 'world', 'base')
print(base.__dict__) # {'parent1': 'hello', 'parent2': 'world', 'base1': 'base'}
多态
-
python 推崇的是规范化,而不是强制化,所以在 python 中更倾向于鸭子类型
-
鸭子类型:
- 只要长得像就行,没必要有实质性的关系
-
一种方法的多种形态,在继承父类方法的基础上在子类中进行差异化。
import abc
class Animal(metaclass=abc.ABCMeta): # 加上之后,继承该父类的所有子类必须重写父类的函数
def say(self):
print('声音是:', end='')
class Dog(Animal):
def say(self):
super().say()
print("汪汪汪")
class Cat(Animal):
def say(self):
super().say()
print("喵喵喵")
# 定义统一接口,接收传入的动物对象
def animal_say(animal):
animal.say()
animal_say(Dog()) # 声音是:汪汪汪
animal_say(Cat()) # 声音是:喵喵喵
类内方法的分类
绑定方法
- 绑定对象方法
- 绑定类方法
- 如果每个对象调用某一方法都是一样的结果,这里推荐使用绑定类方法
IPADDR = '192.168.222.4'
PORT = 3306
class Mysql:
@classmethod
def get_endpoint(cls):
return IPADDR + ':' + str(PORT)
endpoint = Mysql.get_endpoint()
print(endpoint) # 192.168.222.4:3306
静态方法
- 没有绑定任何类和对象,不能调用类中的属性(方法和变量)
IPADDR = '192.168.222.4'
PORT = 3306
class Mysql:
@staticmethod
def get_endpoint():
return IPADDR + ':' + str(PORT)
endpoint = Mysql.get_endpoint()
print(endpoint) # 192.168.222.4:3306
反射
- 定义:程序在运行过程中可以“动态”的获取对象的信息
- 反射能干什么:
- 反射能够解析字符串的命令
- 反射能够调用未知对象信息不报错
实现反射的步骤
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def say(self):
print(f'{self.name}\t\t{self.age}')
obj = Person('codeFun',18)
# 1. 先通过 dir 查看该对象下有那些属性
property_list = dir(obj)
print(property_list) # 可以看到 age name say 等属性
print(obj.__dict__[property_list[-2]]) # codeFun
字符串反射到属性上
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def say(self):
print(f'{self.name}\t\t{self.age}')
obj = Person('codeFun', 18)
# 判断 obj 对象是否有 name 属性
result = hasattr(obj, 'name')
print(result) # True
# 获取 obj 对象中 name 属性的值
result = getattr(obj, 'name')
print(result) # codeFun
# 设置 obj 对象中 name 属性的值
setattr(obj, 'name', 'zhangSan')
print(obj.name) # zhangSan
# 删除 obj 对象中 name 属性
delattr(obj, 'name')
print('name' in obj.__dict__) # False
class Ftp:
@staticmethod
def sender():
print("正在发送数据...")
@staticmethod
def receiver():
print("正在接收数据...")
def enter(self, command_str):
if hasattr(self, command_str):
comm = getattr(self, command_str)
comm()
else:
print("没有该功能!")
command = input('请输入命令 >>> ')
ftp_obj = Ftp()
ftp_obj.enter(command)
内置方法(魔法方法)
- 定义在类的内部,以
__
开头并以__
结尾的方法 - 特点:会在某种情况下自动触发执行
- 作用:定制化我们的类或者对象
# __str__
# 在打印对象时会自动触发,然后将返回值(必须 str)当做本次打印的结果输出
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def __str__(self):
return f'<{self.name}\t\t{self.age}>'
person = Person('codeFun', 18)
print(person) # <codeFun 18>
# __del__ 在清理对象时会触发
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def __del__(self):
print('清理完毕。。。')
person = Person('codeFun', 18)
# 清理完毕。。。
元类
- 元类:用来实例化产生类的类
- 关系:元类 --> 实例化 --> 类(Person) --> 实例化 --> 对象(person_obj)
- type 是内置的元类
- 使用 class 关键字定义的所有的类以及内置的类都是由元类 type 实例化产生
class Person:
pass
obj = Person()
print(type(obj)) # <class '__main__.Person'>
print(type(Person)) # <class 'type'>
class 关键字创造类 Person 的步骤
- 类的三大部分:
- 类名
- 类的基类/父类
- 执行类体代码拿到类的名称空间
class_name = 'Person'
class_bases = (object,) # 基类必须是元组类型
# 执行类体代码,拿到类的名称空间,放入 “类词典”
class_dict = {}
class_body = """
def __init__(self, name, age):
self.name = name
self.age = age
def print_info(self):
print(f'<{self.name}\t{self.age}>')
"""
exec(class_body, {}, class_dict)
# print(class_dict) # {'__init__': <function __init__ at ...>, 'print_info': <function print_info>}
# 调用元类
Person = type(class_name, class_bases, class_dict)
# 使用自创建类
person = Person('codeFun', 18)
person.print_info() # <codeFun 18>
自定义元类控制类的产生
注意
- 自定义元类必须继承
type
类
调用 My_meta 的过程
- 调用 My_meta 就是调用
type.__call__
- 上层元类的
__call__
调用下层类的__new__ 和 __init__
- 先创造一个空对象(Person),调用 My_meta 类内的
__new__
方法- 调用 My_meta 类内的
__init__
方法,完成初始化对象操作- 返回初始化好的对象
class My_meta(type):
def __new__(mcs, *args, **kwargs):
print(mcs) # <class '__main__.My_meta'>
print(args) # 元组 (类名,基类,{。。。}) ('Person', (<class 'object'>,), {'__module__': '__main__', '__qualname__'。。。
print(kwargs)
""" 必做的事情 """
# 不返回 My_meta 对象,__init__ 不会执行
# 创造 My_meta 空对象,只能调用底层去创造,两种方法
# return super().__new__(mcs, *args, **kwargs)
return type.__new__(mcs,*args, **kwargs)
def __init__(cls, class_name, class_bases, class_dict):
print(cls) # <class '__main__.Person'>
print(class_name) # Person
print(class_bases) # (<class 'object'>,)
print(class_dict) # {...}
class Person(object, metaclass=My_meta):
def __init__(self, name, age):
self.name = name
self.age = age
def print_info(self):
print(f'<{self.name}\t{self.age}>')
# 在为创建对象就执行了 __init__
# <class '__main__.Person'>
# Person
# (<class 'object'>,)
自定义元类控制类的调用(对象的产生)
# 模板
class My_meta(type):
""" 必做 """
def __call__(cls, *args, **kwargs):
person_obj = cls.__new__(cls)
cls.__init__(person_obj, *args, **kwargs)
return person_obj
class Person(object, metaclass=My_meta):
""" 必做 """
def __new__(cls, *args, **kwargs):
return super().__new__(cls, *args, **kwargs)
def __init__(self, name, age):
self.name = name
self.age = age
def print_info(self):
print(f'<{self.name}\t{self.age}>')
person = Person('codeFun', 18)
print(person.__dict__) # {'name': 'codeFun', 'age': 18}
元类下属性查找
- 如果是对象调用不会调用到元类,类调用会调用的元类
异常
# 模板
try:
# 有可能会抛出异常的代码
子代码1
子代码2
子代码3
except 异常类型1 as e:
pass
except 异常类型2 as e:
pass
...
else:
如果被检测的子代码块没有异常发生,则会执行else的子代码
finally:
无论被检测的子代码块有无异常发生,都会执行finally的子代码
python 网络编程
socket
客户端、服务端简单的TCP通信
# server
import socket
# 1. 创建服务程序
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 流式协议 TCP协议
# 2. 服务程序绑定 endpoint
server_endpoint = ('127.0.0.1', 8080)
server.bind(server_endpoint)
# 3. 设置连接池大小
server.listen(5)
# 4. 创建连接
conn, client_endpoint = server.accept()
while True:
try:
# 5. 接收数据
date = conn.recv(1024)
print('client message:', date.decode('utf-8'))
# 6. 发送数据
send_message = date.upper()
conn.send(send_message)
except Exception:
pass
# 7. 关闭连接
conn.close()
# client
import socket
# 创建客户端应用程序
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 客户端应用程序连接服务端的 endpoint
server_endpoint = ('127.0.0.1', 8080)
client.connect(server_endpoint)
while True:
# 发送数据
send_message = input('>>>> ').strip().encode('utf-8')
if send_message.decode('utf-8') == 'quit':
break
client.send(send_message)
# 接收数据
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
客户端、服务端简单的 UDP 通信
# server
import socket
server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 数据报协议=》udp协议
server.bind(('127.0.0.1',8081))
while True:
data,client_addr=server.recvfrom(1024)
server.sendto(data.upper(),client_addr)
server.close()
# client
import socket
client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 流式协议=》tcp协议
while True:
msg=input('>>>: ').strip()
client.sendto(msg.encode('utf-8'),('127.0.0.1',8081))
res=client.recvfrom(1024)
print(res)
client.close()
TCP 传输空字符会使传输崩溃
# 服务端
try:
data=conn.recv(1024) # 最大接收的数据量为1024Bytes,收到的是bytes类型
if len(data) == 0:
# 在unix系统洗,一旦data收到的是空
# 意味着是一种异常的行为:客户度非法断开了链接
break
print("客户端发来的消息:",data.decode('utf-8'))
conn.send(data.upper())
except Exception:
# 针对windows系统
break
# 客户端
# 判断数据是否为空即可
if len(msg) == 0:continue
TCP 粘包问题
- 由于操作系统缓存的限制,注定使我们传输的数据需要分段传输
- 但是不同批次的数据之间默认没有边界,这就需要添加头文件去限定边界
# 客户端
...
# 1、制作头
header_dic={
"filename":"a.txt",
"total_size":total_size,
"md5":"123123xi12ix12"
}
json_str = json.dumps(header_dic)
json_str_bytes = json_str.encode('utf-8')
# 2、先把头的长度发过去
header_size =struct.pack('i',len(json_str_bytes))
conn.send(header_size)
# 3、发头信息
conn.send(json_str_bytes)
# 4、再发真实的数据
conn.send(stdout_res)
conn.send(stderr_res)
....
# 客户端
...
# 1、先手4个字节,从中提取接下来要收的头的长度
x=client.recv(4)
header_len=struct.unpack('i',x)[0]
# 2、接收头,并解析
json_str_bytes=client.recv(header_len)
json_str=json_str_bytes.decode('utf-8')
header_dic=json.loads(json_str)
print(header_dic)
total_size=header_dic["total_size"]
# 3、接收真实的数据
recv_size = 0
while recv_size < total_size:
recv_data=client.recv(1024)
recv_size+=len(recv_data)
print(recv_data.decode('utf-8'),end='')
else:
print()
...
socketServer 多进程通信
import socketserver
class My_request_handle(socketserver.StreamRequestHandler):
def handle(self):
print(self.client_address)
print(self.request) # 就是 conn
# 往下就可以执行连接的事情,比如:
while True:
try:
msg = self.request.recv(1024)
if len(msg) == 0: break
self.request.send(msg.upper())
except Exception:
break
self.request.close()
# 下面三行代码相当于 循环的从半连接池中取出链接请求与其建立双向链接,拿到链接对象
server_endpoint = ('127.0.0.1', 8090)
server = socketserver.ThreadingTCPServer(server_endpoint, My_request_handle)
server.serve_forever()
import socketserver
class MyRequestHanlde(socketserver.BaseRequestHandler):
def handle(self):
client_data=self.request[0]
server=self.request[1]
client_address=self.client_address
print('客户端发来的数据%s' %client_data)
server.sendto(client_data.upper(),client_address)
s=socketserver.ThreadingUDPServer(("127.0.0.1",8888),MyRequestHanlde)
s.serve_forever()
python 并发编程
创建进程的两种方式
- 创建进程就是在内存中开辟了一块内存空间以供代码运行
- 一个进程对应内存中的一块独立的内存空间
- 进程与进程之间的数据默认无法交互,但是借助于第三方工具或模块进行交互
from multiprocessing import Process
import time
def func(name):
print('hello world')
time.sleep(3)
print(f'{name}')
if __name__ == '__main__':
# Process(target=function_name,[args=(parament_list)])
process = Process(target=func, args=('codeFun',))
process.start() # 这里会自动调用 func 函数, 并且将参数列表【args】传进去
print('run ...')
"""
执行结果:
run ...
hello world
codeFun
"""
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print('hello world')
time.sleep(1)
print(f'{self.name}')
if __name__ == '__main__':
process = MyProcess('codeFun')
process.start()
print('run ...')
"""
执行结果:
run ...
hello world
codeFun
"""
进程方法
- 让主进程等待子进程代码运行结束后再继续运行,不影响其他子进程的执行
process.join()
进程对象和其他方法
from multiprocessing import Process, current_process
current_process().pid # 查看当前进程的进程号
import os
os.getpid() # 查看当前进程进程号
os.getppid() # 查看当前进程的父进程进程号
p.terminate() # 杀死当前进程
# 是告诉操作系统帮你去杀死当前进程 但是需要一定的时间 而代码的运行速度极快
time.sleep(0.1)
print(p.is_alive()) # 判断当前进程是否存活
守护进程
from multiprocessing import Process
import time
def task(name):
print('%s总管正在活着'% name)
time.sleep(3)
print('%s总管正在死亡' % name)
if __name__ == '__main__':
p = Process(target=task,args=('egon',))
# p = Process(target=task,kwargs={'name':'egon'})
p.daemon = True # 将进程p设置成守护进程 这一句一定要放在start方法上面才有效否则会直接报错
p.start()
print('皇帝jason寿终正寝')
互斥锁
- 多个进程操作同一份数据的时候,会出现数据错乱的问题
- 针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全
from multiprocessing import Process
from multiprocessing import Lock
import json
import time
import random
# 查票
def search(username):
with open('data.json', mode='rt', encoding='utf-8') as f:
ticket_dict = json.load(f)
print(f'用户[{username}]查询余票:{ticket_dict.get("ticket_num")}')
def buy(username):
with open('data.json', mode='rt', encoding='utf-8') as f:
ticket_dict = json.load(f)
time.sleep(random.randint(1, 3))
# 判断当前是否有票
if ticket_dict.get('ticket_num') > 0:
ticket_dict['ticket_num'] -= 1
with open('data.json', mode='wt', encoding='utf-8') as f:
json.dump(ticket_dict, f)
print(f'用户[{username}]买票成功')
else:
print(f'用户[{username}]买票失败')
def run(username, mutex):
search(username)
# 进行抢锁
mutex.acquire()
buy(username)
# 解除锁
mutex.release()
# 生成 5 个进程进行抢票,检查票有无的代码一瞬间就执行完了,这意味着每个进程
# 都通过了判断是否有票这关,但是实际上只有一张票
# 所以就需要在抢票前进行抢锁,抢到锁的人才能够买票,
if __name__ == '__main__':
# 生成锁
mutex = Lock()
for i in range(5):
process = Process(target=run, args=(i, mutex))
process.start()
队列
from multiprocessing import Queue
# 创建一个队列
q = Queue(5) # 括号内可以传数字 标示生成的队列最大可以同时存放的数据量
# 往队列中存数据
# 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来 不会报错
q.put(111)
# 去队列中取数据
v1 = q.get() # # 队列中如果已经没有数据的话 get方法会原地阻塞
V1 = q.get_nowait() # 没有数据直接报错queue.Empty
# print(q.empty())
v6 = q.get(timeout=3) # 没有数据之后原地等待三秒之后再报错 queue.Empty
# 判断方法
.full() # 判断是否存满
.empty() # 判断是否为空
"""
q.full()
q.empty()
q.get_nowait()
在多进程的情况下是不精确
"""
from multiprocessing import JoinableQueue
q = JoinableQueue()
q.task_done() # 自带计数器减一【每当放一个元素,计数器会加一】
q.join() # 等待队列中所有的数据被取完再执行往下执行代码
生产者消费者模型
- 生产者向队列中生产物品,消费者向队列中消费物品
"""
生产者消费者模型
"""
import time
from multiprocessing import Process
from multiprocessing import Queue
def producer(queue_obj):
for i in range(1, 6):
time.sleep(1)
queue_obj.put(i)
print(f'生产者:生产物品[{i}]号')
def consumer(queue_obj):
# 解决消费者一直等待问题:
# 1. 生产者调用 join 方法,占用主进程
# 2. 在生产者进程调用的后面向队列中加入标识(None),表示已经生产完全部物品
# 3. 消费者识别标识,结束运行
while True:
queue_parament = queue_obj.get()
# 消费者识别标识,结束运行
if not queue_parament:
break
print(f'消费者:消费了物品[{queue_parament}]号')
if __name__ == '__main__':
queue_obj = Queue()
producer1 = Process(target=producer, args=(queue_obj,))
consumer1 = Process(target=consumer, args=(queue_obj,))
producer1.start()
consumer1.start()
# 生产者调用 join 方法,占用主进程
producer1.join()
# 在生产者进程调用的后面向队列中加入标识(None),表示已经生产完全部物品
queue_obj.put(None)
"""
生产者消费者模型
1. 通过守护进程(主进程死掉,对应的子进程也要死掉)干掉消费者的等待
"""
import time
from multiprocessing import Process
from multiprocessing import JoinableQueue
def producer(queue_obj):
for i in range(1, 6):
time.sleep(1)
queue_obj.put(i)
print(f'生产者:生产物品[{i}]号')
def consumer(queue_obj):
while True:
queue_parament = queue_obj.get()
if not queue_parament:
break
print(f'消费者:消费了物品[{queue_parament}]号')
queue_obj.task_done()
if __name__ == '__main__':
queue_obj = JoinableQueue()
producer1 = Process(target=producer, args=(queue_obj,))
consumer1 = Process(target=consumer, args=(queue_obj,))
producer1.start()
consumer1.daemon = True
consumer1.start()
producer1.join()
queue_obj.join()
线程和进程
基础概念
- 进程:资源单位,仅开辟独立内存空间
- 每一个进程肯定自带一个线程
- 线程:执行单位,指的是代码的执行过程,其所需资源向进程索要
为什么要有线程
- 节省资源
- 在进程内开设多个线程无需再次申请内存空间
- 线程内资源共享
创建线程
from threading import Thread
def func(name):
print(f'{name}')
if __name__ == '__main__':
thread = Thread(target=func, args=('codeFun',))
thread.run()
print('hello world')
"""
输出结果: 和进程结果不同,因为线程消耗的资源少,不用开辟内存空间
在调用后就立即执行了
codeFun
hello world
"""
# 第二种方法和创建进程几乎没有区别,只是调用的函数不同而已
使用线程实现TCP并发
import socket
from threading import Thread
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8080))
server.listen(5)
def func(conn):
while True:
try:
msg = conn.recv(1024).decode('utf-8')
if len(msg) == 0:
break
print(msg)
conn.send(msg.upper().encode('utf-8'))
except Exception as e:
print(e)
break
conn.close()
if __name__ == '__main__':
while True:
conn, client_endpoint = server.accept()
thread = Thread(target=func, args=(conn,))
thread.start()
import socket
import time
client = socket.socket()
client.connect(('127.0.0.1', 8080))
while True:
time.sleep(3)
client.send('hello world'.encode('utf-8'))
msg = client.recv(1024).decode('utf-8')
print(msg)
# client.close()
线程方法
from threading import Thread
from threading import active_count
from threading import current_thread
from threading import Lock
thread = Thread(target=func_name)
thread.join() # 让主线程等待子线程运行结束后再执行主线程
count = active_count() # 当前存活的线程数
current_thread_name = current_thread().name # 当前的线程名字
# 子线程命名方式:Thread-1(func_name) 主线程命名方式:MainThread
# 守护线程
# 主线程运行结束之后不会立刻停止,
# 会等待其他所有非守护线程结束才会结束
# 因为主线程的结束意味着所在的进程结束
thread.daemon = True # 伴随着主线程的结束而结束
# 线程互斥锁
# 在进行数据修改的时候进行上锁
mutex = Lock()
mutex.acquire() # 进行抢锁
mutex.release() # 解放锁
GIL 全局解释器锁
- GIL 不是python 的特性,而是 python 解释器 Cpython 的特性
- 它实际上也是一把互斥锁,用来阻止同一个进程下的多个线程的同时进行
- 无法利用多核优势
- 用来保证解释器级别的数据安全
- 针对不同的数据还是需要添加不同的锁处理
多线程和多进程使用参考
"""
多线程是否有用要看具体情况
单核:四个任务(IO密集型\计算密集型)
多核:四个任务(IO密集型\计算密集型)
"""
# 计算密集型 每个任务都需要10s
单核(不用考虑了)
多进程:额外的消耗资源
多线程:减少开销
多核
多进程:总耗时 10+
多线程:总耗时 40+
# IO密集型
多核
多进程:相对浪费资源
多线程:更加节省资源
# 计算密集型
from multiprocessing import Process
from threading import Thread
import os
import time
def count():
result = 0
for i in range(10000000):
result *= i
# print(result)
if __name__ == '__main__':
start_time = time.time()
l = []
for i in range(os.cpu_count()):
# process = Process(target=count)
thread = Thread(target=count)
# process.start() # 0.9623303413391113
thread.start() # 4.82154655456543
l.append(thread)
for process in l:
process.join()
print(time.time() - start_time)
# IO 密集型
from multiprocessing import Process
from threading import Thread
import os,time
def work():
time.sleep(2)
if __name__ == '__main__':
l = []
print(os.cpu_count()) # 获取当前计算机CPU个数
start_time = time.time()
for i in range(4000):
# p = Process(target=work) # 21.149890184402466
t = Thread(target=work) # 3.007986068725586
t.start()
# p.start()
# l.append(p)
l.append(t)
for p in l:
p.join()
print(time.time()-start_time)
锁
互斥锁
# 互斥锁
# 在进行数据修改的时候进行上锁
mutex = Lock()
mutex.acquire() # 进行抢锁
mutex.release() # 解放锁
死锁
"""
两个对象互相占用锁,整个程序就会被卡死
"""
"""
代码描述:
这里进行了五个线程执行函数 1 和 函数 2,
当 0 号线程执行完函数 1的代码并在函数 2 中抢到了锁 B,并正在睡眠,A 锁解开, 1 号
线程抢到锁 A,并开始抢夺锁 B,但是锁 B 任然被睡眠者的 0 号线抢占着,当 0 号线开始抢
锁 A 时,A锁也被 1 号线占着,程序就在此处僵持了下来
"""
import time
from threading import Thread
from threading import Lock
# 生成两个锁对象
mutexA = Lock()
mutexB = Lock()
def func01(thread_num):
mutexA.acquire()
print(f'{thread_num} 在函数 1 抢到了锁A')
mutexB.acquire()
print(f'{thread_num} 在函数 1 抢到了锁B')
mutexB.release()
print(f'{thread_num} 在函数 2 释放了锁B')
mutexA.release()
print(f'{thread_num} 在函数 1 释放了锁A')
def func02(thread_num):
mutexB.acquire()
print(f'{thread_num} 在函数 2 抢到了锁B')
time.sleep(2)
mutexA.acquire()
print(f'{thread_num} 在函数 2 抢到了锁A')
mutexA.release()
print(f'{thread_num} 在函数 2 释放了锁A')
mutexB.release()
print(f'{thread_num} 在函数 2 释放了锁B')
def run(thread_num):
func01(thread_num)
func02(thread_num)
if __name__ == '__main__':
for i in range(5):
thread = Thread(target=run,args=(i,))
thread.start()
"""
执行结果如下:
0 在函数 1 抢到了锁A
0 在函数 1 抢到了锁B
0 在函数 2 释放了锁B
0 在函数 1 释放了锁A
0 在函数 2 抢到了锁B
1 在函数 1 抢到了锁A
"""
递归锁
"""
递归锁的特点
可以被递归的acquire和release【就是在不 release 情况下,同一个线程还能继续上相同的锁】
但是只能被第一个抢到这把锁执行上述操作
它的内部有一个计数器 每acquire一次计数加一 每realse一次计数减一
只要计数不为0 那么其他人都无法抢到该锁
"""
# 将上述的
mutexA = Lock()
mutexB = Lock()
# 换成
mutexA = mutexB = RLock() # LLock 不行,因为 lock 不能上相同的锁
信号量
# 信号量在不同的阶段可能对应不同的技术点
# 在并发编程中信号量指的也是一种锁
# 不过这种锁不再局限于一个线程或进程
# 可以放入多个线程或进程然后锁上
事件锁
- 可以设置触发条件,然后解锁
from threading import Thread
from threading import Event
import time
event = Event()
def light():
print('stop !')
time.sleep(2)
event.set() # 通行证,event.wait() 的后的代码可以执行了
def car():
print('等红灯')
event.wait() # 没有通行证的运行,此后的代码不会执行
print('run ...')
if __name__ == '__main__':
light_thread = Thread(target=light)
light_thread.start()
for i in range(5):
car_thread = Thread(target=car)
car_thread.start()
"""执行结果
stop !
等红灯
等红灯
等红灯
等红灯
等红灯
run ...
run ...
run ...
run ...
run ...
"""
线程池和进程池
- 池是为了保证计算机硬件安全的情况下最大限度的利用计算机
- 因为我们不可能无限度开设线程或进程
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ThreadPoolExecutor(5) # 池子里固定五个线程
pool = ThreadPoolExecutor(5) # 池子的线程数 = CPU 的核心数 * 5
pool = ProcessPoolExecutor() # 池子的进程数 = CPU 的核心数
"""
通过以上方法创造出来的线程,是固定的(不会出现重复创建或销毁)
"""
语法:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
pool = ProcessPoolExecutor(5)
pool.submit(func_name, parameter_name).add_done_callback(call_back_func)
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
def func(num):
print(num)
time.sleep(2)
return f'>>>{str(num)}'
def call_back(result):
print(result.result(), end='\n')
if __name__ == '__main__':
start_time = time.time()
l = []
for i in range(10):
# pool.submit(func, i).add_done_callback(call_back) # 异步提交
# 同步提交,运行过程和结果一块出来
result = pool.submit(func, i)
# print(result.result())
l.append(result)
# 将运行结果一股脑的输出
for result in l:
print(result.result())
print(time.time() - start_time)
协程
- 程序员在代码层面上检测我们所有的 I/O 操作,一旦遇到了 IO 我们在代码的层面
- 完成代码的切换,给 CPU 一种这个程序一直在运行的错觉
# 服务端
from gevent import monkey;monkey.patch_all()
import socket
from gevent import spawn
def communication(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
def server(ip, port):
server = socket.socket()
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
spawn(communication, conn)
if __name__ == '__main__':
g1 = spawn(server, '127.0.0.1', 8080)
g1.join()
# 客户端
from threading import Thread, current_thread
import socket
def x_client():
client = socket.socket()
client.connect(('127.0.0.1',8080))
n = 0
while True:
msg = '%s say hello %s'%(current_thread().name,n)
n += 1
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
if __name__ == '__main__':
for i in range(500):
t = Thread(target=x_client)
t.start()
接入数据库
连接数据库
import pymysql
conn = pymysql.connect(
host='192.168.222.5',
port=3306,
user='root',
password='root',
database='hello',
charset='utf8',
autocommit = True # 自动提交
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
执行SQL语句
sql = '' # 编写 SQL 语句
result = cursor.execute(sql,(参数列表)) # result 返回的是操作数据表的行数
# 例如:
sql = 'select * from emp where name = %s;'
result = cursor.execute(sql,('codeFun',))
# 这样执行的 sql 语句就是 select * from emp where name = 'codeFun';
# 对需要改动数据库的操作,还需执行 commit 操作再次确认
# 在 conn 中添加 autocommit = True 也可以不用提交
conn.commit()
# 还可以一次性插入N多条数据
rows = cursor.executemany(sql,[('xxx',123),('ooo',123)])
# 返回查询结果:查询结果和迭代器差不多,只能一个对象查询一次,再次查询指针就到最后了
# 返回所有能够查询的结果
result = cursor.fetchall() # 返回一个列表,一行数据就是一个字典,k 为字段名 v 为字段值
# 返回一条数据
result = cursor.fetchone() # 这条语句就没有返回数据,因为上面的 all 将指针移动到最后了
# 指定获取几条
cursor.fetchmany(n)
# 指针移动
cursor.scroll(1,'relative') # 相对于光标所在的当前位置往后移动
cursor.scroll(1,'absolute') # 相对于数据开头往后移动
SQL注入
- 就是利用一些特殊字符 结合软件固定的一些语句句式,非法侵入并违规操作
- 例如:利用MySQL注释的语法 造成了sql注入的问题
- 日常应用软件在获取用户输入的内容时 都会限制一些特殊符号的输入
- 如何解决上述问题?
- 所有敏感的信息不要自己去做拼接操作 交互固定的模块帮你去过滤数据防止sql注入
快捷键
ctrl + d:
复制上一行ctrl + ?:
注释一行ctrl + alt + l:
调整代码格式
更多推荐
所有评论(0)