SQLAlchemy 多表操作(一对多/多对多)--两种连接方式--执行原生 SQL 语句
多表操作:1、创建外键关系及外键操作(完整示例代码见下文)
·
多表操作
1、创建外键关系及外键操作
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()
# ##################### 单表示例 #########################
class Users(Base):
    """Single-table example mapped to the ``users`` table."""

    __tablename__ = "users"

    # No table-level constraints are active; the commented entries show how
    # a composite unique constraint or a composite index would be declared.
    __table_args__ = tuple(
        # UniqueConstraint('id', 'name', name='uix_id_name'),
        # Index('ix_id_name', 'name', 'extra'),
    )

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True)
    age = Column(Integer, default=18)
    email = Column(String(32), unique=True)
    # The callable itself is passed as the default (it is not called here).
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)
class Hosts(Base):
    """Single-table example mapped to the ``hosts`` table."""

    __tablename__ = "hosts"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True)
    # The callable itself is passed as the default (it is not called here).
    ctime = Column(DateTime, default=datetime.datetime.now)
# ##################### 一对多示例 #########################
class Hobby(Base):
    """'One' side of the one-to-many example (``hobby`` table)."""

    __tablename__ = "hobby"

    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')
class Person(Base):
    """'Many' side of the one-to-many example (``person`` table)."""

    __tablename__ = "person"

    nid = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=True, index=True)
    # Foreign key pointing at hobby.id.
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # Not a table column; only a query convenience. The backref exposes the
    # related Person objects on Hobby as ``pers``.
    hobby = relationship("Hobby", backref="pers")
# ##################### 多对多示例 #########################
class Server2Group(Base):
    """Association table linking ``server`` and ``group`` (many-to-many)."""

    __tablename__ = "server2group"

    id = Column(Integer, autoincrement=True, primary_key=True)
    server_id = Column(Integer, ForeignKey("server.id"))
    group_id = Column(Integer, ForeignKey("group.id"))
class Group(Base):
    """Group side of the many-to-many example (``group`` table)."""

    __tablename__ = "group"

    id = Column(Integer, primary_key=True)
    name = Column(String(64), nullable=False, unique=True)

    # Not a table column; only a query convenience. Rows are linked through
    # the ``server2group`` table, and Server gets a ``groups`` backref.
    servers = relationship("Server", backref="groups", secondary="server2group")
class Server(Base):
    """Server side of the many-to-many example (``server`` table)."""

    __tablename__ = "server"

    id = Column(Integer, autoincrement=True, primary_key=True)
    hostname = Column(String(64), nullable=False, unique=True)
def init_db(url="mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8"):
    """
    Create the database tables for every model registered on ``Base``.

    :param url: database URL to connect to; defaults to the local MySQL
        instance this example was written against.
    :return: None
    """
    engine = create_engine(
        url,
        max_overflow=0,  # extra connections allowed beyond pool_size
        pool_size=5,  # number of connections kept in the pool
        pool_timeout=30,  # seconds to wait for a free connection before raising
        pool_recycle=-1,  # seconds after which a connection is recycled; -1 disables
    )
    try:
        Base.metadata.create_all(engine)
    finally:
        # The engine is only needed for this call; release its pooled
        # connections instead of leaving them open.
        engine.dispose()
def drop_db(url="mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8"):
    """
    Drop the database tables for every model registered on ``Base``.

    :param url: database URL to connect to; defaults to the local MySQL
        instance this example was written against.
    :return: None
    """
    engine = create_engine(
        url,
        max_overflow=0,  # extra connections allowed beyond pool_size
        pool_size=5,  # number of connections kept in the pool
        pool_timeout=30,  # seconds to wait for a free connection before raising
        pool_recycle=-1,  # seconds after which a connection is recycled; -1 disables
    )
    try:
        Base.metadata.drop_all(engine)
    finally:
        # The engine is only needed for this call; release its pooled
        # connections instead of leaving them open.
        engine.dispose()
if __name__ == "__main__":
    # Creates the tables when run as a script; swap in drop_db() to remove
    # them instead.
    # drop_db()
    init_db()
练习示例
# -*- coding: utf-8 -*-
# @Author : Sunhaojie
# @Time : 2019/8/25 17:35
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()
class Depart(Base):
    """Department model mapped to the ``depart`` table."""

    __tablename__ = "depart"

    id = Column(Integer, primary_key=True)
    title = Column(String(32), nullable=False, index=True)
class Users(Base):
    """User model mapped to the ``users`` table; each user may belong to a department."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False, index=True)
    # Foreign key pointing at depart.id.
    depart_id = Column(Integer, ForeignKey("depart.id"))

    # Not created in the database; it only declares the relationship.
    # The backref is the reverse side: a Depart instance exposes its Users
    # objects as ``pers``.
    dp = relationship("Depart", backref="pers")
def create_conn(url="mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8"):
    """Create the tables for every model registered on ``Base``.

    :param url: database URL to connect to; defaults to the local MySQL
        instance this example was written against.
    :return: None
    """
    engine = create_engine(
        url,
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1,
    )
    try:
        Base.metadata.create_all(engine)
    finally:
        # The engine is only needed for this call; release its pooled
        # connections instead of leaving them open.
        engine.dispose()
def drop_conn(url="mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8"):
    """Drop the tables for every model registered on ``Base``.

    :param url: database URL to connect to; defaults to the local MySQL
        instance this example was written against.
    :return: None
    """
    engine = create_engine(
        url,
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1,
    )
    try:
        Base.metadata.drop_all(engine)
    finally:
        # The engine is only needed for this call; release its pooled
        # connections instead of leaving them open.
        engine.dispose()
if __name__ == "__main__":
    # Creates the tables when run as a script; swap in drop_conn() to remove
    # them instead.
    # drop_conn()
    create_conn()
外键(ForeignKey)操作
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from SQLAlchemy.models import Users, Depart
# 先创建连接
engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
    max_overflow=0,
    pool_size=5,
    pool_timeout=30,
    pool_recycle=-1,
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()

try:
    # 1. Query all users.
    r = session.query(Users).all()
    for row in r:
        print(row.id, row.name)

    # 2. Query all users together with their department title (join).
    # The ON clause ``Users.depart_id == Depart.id`` is the default and may
    # be omitted, which yields an inner join:
    #     session.query(Users.id, Users.name, Depart.title).join(Depart).all()
    # ``isouter=True`` makes it a LEFT OUTER JOIN (the default is inner).
    res = session.query(Users.id, Users.name, Depart.title).join(
        Depart, Users.depart_id == Depart.id, isouter=True
    ).all()
    for row in res:
        print(row.id, row.name, row.title)

    # 3. Relationship attribute: all users plus their department title.
    res = session.query(Users).all()
    for row in res:
        # depart_id is nullable, so row.dp is None for a user without a
        # department; avoid an AttributeError on ``None.title``.
        title = row.dp.title if row.dp is not None else None
        # row.dp crosses tables, e.g.
        # <SQLAlchemy.models.Depart object at 0x0000003CDD408470>
        print(row.id, row.name, row.depart_id, row.dp, title)

    # 4. Relationship attribute: everyone in the department titled 'a'.
    obj = session.query(Depart).filter(Depart.title == 'a').first()
    if obj is not None:  # first() returns None when no row matches
        # e.g. [<SQLAlchemy.models.Users object at 0x00000096C8C4F438>]
        print(obj.pers)
        for i in obj.pers:
            print(i.id, i.name, obj.title)

    # 5. Create an IT department, then add one person to it: qqq.
    # Approach 1: insert the department first, then reference its id.
    d1 = Depart(title='IT')
    session.add(d1)
    session.commit()
    print(d1.title, d1.id)
    u1 = Users(name='qqq', depart_id=d1.id)
    session.add(u1)
    session.commit()

    # Approach 2: build both objects through the relationship attribute.
    u2 = Users(name='qqq', dp=Depart(title='IT'))
    session.add(u2)
    session.commit()

    # 6. Create a department and add three employees through the backref.
    d1 = Depart(title='王者')
    d1.pers = [Users(name='大头'), Users(name='二狗'), Users(name='三炮')]
    session.add(d1)
    session.commit()
except Exception:
    # Undo the failed transaction before propagating the error.
    session.rollback()
    raise
finally:
    # Always hand the connection back to the pool.
    session.close()
ManyToMany操作
建表:多对多关系的第三张(关联)表需要自己手动创建
# -*- coding: utf-8 -*-
# @Author : Sunhaojie
# @Time : 2019/8/25 17:35
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()
class Student(Base):
    """Student model mapped to the ``student`` table."""

    __tablename__ = "student"

    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False, index=True)

    # Creates no column; only a query convenience. Rows are linked through
    # the ``student2course`` table, and Course gets a ``student_list`` backref.
    course_list = relationship("Course", backref="student_list", secondary="student2course")
class Course(Base):
    """Course model mapped to the ``course`` table."""

    __tablename__ = "course"

    id = Column(Integer, primary_key=True)
    title = Column(String(32), nullable=False, index=True)
# 多对多关系的表只能自己创建
class Student2Course(Base):
    """Association table linking ``student`` and ``course`` (many-to-many)."""

    __tablename__ = "student2course"

    # Composite unique constraint: one row per (student, course) pair.
    __table_args__ = (
        UniqueConstraint("student_id", "course_id", name="uix_stu_cou"),
    )

    id = Column(Integer, autoincrement=True, primary_key=True)
    student_id = Column(Integer, ForeignKey("student.id"))
    course_id = Column(Integer, ForeignKey("course.id"))
def create_conn(url="mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8"):
    """Create the tables for every model registered on ``Base``.

    :param url: database URL to connect to; defaults to the local MySQL
        instance this example was written against.
    :return: None
    """
    engine = create_engine(
        url,
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1,
    )
    try:
        Base.metadata.create_all(engine)
    finally:
        # The engine is only needed for this call; release its pooled
        # connections instead of leaving them open.
        engine.dispose()
def drop_conn(url="mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8"):
    """Drop the tables for every model registered on ``Base``.

    :param url: database URL to connect to; defaults to the local MySQL
        instance this example was written against.
    :return: None
    """
    engine = create_engine(
        url,
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1,
    )
    try:
        Base.metadata.drop_all(engine)
    finally:
        # The engine is only needed for this call; release its pooled
        # connections instead of leaving them open.
        engine.dispose()
if __name__ == "__main__":
    # Creates the tables when run as a script; swap in drop_conn() to remove
    # them instead.
    # drop_conn()
    create_conn()
表操作
# -*- coding: utf-8 -*-
# @Author : Sunhaojie
# @Time : 2019/8/26 21:13
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from SQLAlchemy.models import Student, Course, Student2Course
# 先创建连接
engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
    max_overflow=0,
    pool_size=5,
    pool_timeout=30,
    pool_recycle=-1,
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()

try:
    # 1. Seed data: two students and two courses.
    dapao = Student(name='大炮')
    ergou = Student(name='二狗')
    biology = Course(title='生物')
    sports = Course(title='体育')
    session.add_all([dapao, ergou, biology, sports])
    session.commit()

    # Link the rows using the primary keys the database actually assigned,
    # rather than assuming the ids are 1 and 2 (only true on an empty table
    # with fresh auto-increment counters).
    session.add_all([
        Student2Course(student_id=dapao.id, course_id=biology.id),
        Student2Course(student_id=dapao.id, course_id=sports.id),
        Student2Course(student_id=ergou.id, course_id=sports.id),
    ])
    session.commit()

    # 2. Fetch the student and list every course linked to them.
    obj = session.query(Student).filter(Student.name == '大炮').first()
    if obj is not None:  # first() returns None when no row matches
        for i in obj.course_list:
            print(i.title)

    # 3. Everyone who selected the course: fetch the course object first,
    # then walk its backref list.
    obj = session.query(Course).filter(Course.title == '生物').first()
    if obj is not None:
        for n in obj.student_list:
            print(n.name)

    # 4. Create a course plus two students and enrol both in it. Assigning
    # through the relationship lets the ORM write the association rows.
    obj = Course(title='英语')
    obj.student_list = [Student(name='三子'), Student(name='大佬')]
    session.add(obj)
    session.commit()
except Exception:
    # Undo the failed transaction before propagating the error.
    session.rollback()
    raise
finally:
    # Always hand the connection back to the pool.
    session.close()
2、SQLAlchemy之两种连接方式
第一种:每次都新建一个 session(使用时从连接池获取连接)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from SQLAlchemy.models import Student, Course, Student2Course
# 先创建连接
# One engine (and therefore one connection pool) shared by every thread.
engine = create_engine(
    "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
    pool_size=5,
    max_overflow=0,
    pool_timeout=30,
    pool_recycle=-1,
)
# Factory that produces sessions bound to the shared engine.
SessionFactory = sessionmaker(bind=engine)
# 多线程:
def task():
    """Run one query on a session of its own and always release it."""
    # Every call builds a new session; it draws a connection from the pool
    # (5 connections, no overflow) and waits when none is free.
    session = SessionFactory()
    try:
        session.query(Student).all()
    finally:
        # Hand the connection back even if the query raises; otherwise a
        # failed task would keep one of the pooled connections checked out.
        session.close()
from threading import Thread
# Start 20 threads, each running task() once.
for _ in range(20):
    Thread(target=task).start()
第二种:基于scoped_session实现线程安全
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

# Thread safety: scoped_session is backed by thread-local storage, so each
# thread keeps working with its own single session.
# scoped_session re-exposes the following methods of the underlying Session
# (list as given by the original author; it may differ between SQLAlchemy
# versions):
#     '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
#     'close', 'commit', 'connection', 'delete', 'execute', 'expire',
#     'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
#     'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
#     'bulk_update_mappings',
#     'merge', 'query', 'refresh', 'rollback',
#     'scalar'
session = scoped_session(Session)

# ############# ORM operations #############
try:
    obj1 = Users(name="alex1")
    session.add(obj1)
    # Commit the transaction.
    session.commit()
except Exception:
    # Undo the failed transaction before propagating the error.
    session.rollback()
    raise
finally:
    # remove() closes the current thread's session and discards it from the
    # registry, which is the documented way to finish with a scoped_session.
    session.remove()
3、SQLAlchemy之执行原生sql
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()

try:
    # Query example:
    # cursor = session.execute(text('select * from users'))
    # result = cursor.fetchall()

    # Insert with a bound parameter. Raw SQL is wrapped in text(): plain
    # strings are deprecated in SQLAlchemy 1.4 and rejected in 2.0, while
    # text() works on older versions as well.
    cursor = session.execute(
        text('insert into users(name) values(:value)'),
        params={"value": 'wupeiqi'},
    )
    session.commit()
    print(cursor.lastrowid)
except Exception:
    # Undo the failed transaction before propagating the error.
    session.rollback()
    raise
finally:
    # Always hand the connection back to the pool.
    session.close()
更多推荐
所有评论(0)