多表操作

1、创建外键关系及外键操作

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

Base = declarative_base()


# ##################### 单表示例 #########################
class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True)
    age = Column(Integer, default=18)
    email = Column(String(32), unique=True)
    ctime = Column(DateTime, default=datetime.datetime.now)
    extra = Column(Text, nullable=True)

    __table_args__ = (
        # 联合唯一索引
        # UniqueConstraint('id', 'name', name='uix_id_name'),
        # 联合索引
        # Index('ix_id_name', 'name', 'extra'),
    )


class Hosts(Base):
    __tablename__ = 'hosts'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True)
    ctime = Column(DateTime, default=datetime.datetime.now)


# ##################### 一对多示例 #########################
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 外键

    # 与生成表结构无关,仅用于查询方便
    hobby = relationship("Hobby", backref='pers')


# ##################### 多对多示例 #########################

class Server2Group(Base):
    __tablename__ = 'server2group'
    id = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))


class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便
    servers = relationship('Server', secondary='server2group', backref='groups')


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


def init_db():
    """
    根据类创建数据库表
    :return: 
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.create_all(engine)


def drop_db():
    """
    根据类删除数据库表
    :return: 
    """
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    Base.metadata.drop_all(engine)


if __name__ == '__main__':
    # drop_db()
    init_db()
练习示例
# -*- coding: utf-8 -*-
# @Author  : Sunhaojie
# @Time    : 2019/8/25 17:35

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship


Base = declarative_base()

class Depart(Base):
    __tablename__ = 'depart'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)


class Users(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    depart_id = Column(Integer, ForeignKey('depart.id'))  # 外键
    
    # 不会在数据库创建,只是建一个关系,backref是反向关联,就是用Depart.pers拿到用户Users对象
    dp = relationship('Depart', backref='pers')


def create_conn():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1
    )

    Base.metadata.create_all(engine)

def drop_conn():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1
    )
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    # drop_conn()
    create_conn()
外键foreignkry操作
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from SQLAlchemy.models import Users, Depart

# 先创建连接
engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1
    )
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()

# 1、查询所有用户
r = session.query(Users).all()
for row in r:
    print(row.id, row.name)

# 2、查询所有用户+所属部门名称(连表操作)
# Users.depart_id == Depart.id是默认的,可以不写,is_outer=True,表示左连接,默认是内连接
res=session.query(Users.id,Users.name,Depart.title).join(Depart).all()

res = session.query(Users.id,Users.name, Depart.title).join(Depart, Users.depart_id == Depart.id, isouter=True).all()
for row in res:
    print(row.id, row.name, row.title)

# 3、relation字段:查询所有用户+所属部门名称
res = session.query(Users).all()
for row in res:
    print(row.id,row.name,row.depart_id, row.dp, row.dp.title)
    #row.dp:跨表了:<SQLAlchemy.models.Depart object at 0x0000003CDD408470>
   
# 4、relation字段:查询部门销售部所有人员
obj = session.query(Depart).filter(Depart.title=='a').first()
print(obj.pers) #[<SQLAlchemy.models.Users object at 0x00000096C8C4F438>]
for i in obj.pers:
    print(i.id, i.name, obj.title)
    
# 5、创建一个IT部门,再在部门加一个人:qqq
"""方式一"""
d1 = Depart(title='IT')
session.add(d1)
session.commit()
print(d1.title, d1.id)
u1 = Users(name='qqq', depart_id=d1.id)
session.add(u1)
session.commit()

"""方式二"""
u2 = Users(name='qqq', dp=Depart(title='IT'))
session.add(u2)
session.commit()

# 6、创建一个部门:王者,添加三个员工:大头/二狗/三炮
d1 = Depart(title='王者')
d1.pers = [Users(name='大头'),Users(name='二狗'),Users(name='三炮')]
session.add(d1)
session.commit()

session.close()
ManyToMany操作

建表:只能自己创建多对多的第三张表

# -*- coding: utf-8 -*-
# @Author  : Sunhaojie
# @Time    : 2019/8/25 17:35

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

Base = declarative_base()

class Student(Base):
    __tablename__ = 'student'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=False)
    
    # 不会创建字段,只是查询方便
    course_list = relationship('Course', secondary='student2course', backref='student_list')



class Course(Base):
    __tablename__ = 'course'
    id = Column(Integer, primary_key=True)
    title = Column(String(32), index=True, nullable=False)


# 多对多关系的表只能自己创建
class Student2Course(Base):
    __tablename__ = 'student2course'
    id = Column(Integer, primary_key=True, autoincrement=True)
    student_id = Column(Integer, ForeignKey('student.id'))
    course_id = Column(Integer, ForeignKey('course.id'))

    # 做一个联合唯一索引
    __table_args__ = (
        UniqueConstraint('student_id', 'course_id', name='uix_stu_cou'),
    )

def create_conn():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1
    )

    Base.metadata.create_all(engine)

def drop_conn():
    engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1
    )
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    # drop_conn()
    create_conn()

表操作

# -*- coding: utf-8 -*-
# @Author  : Sunhaojie
# @Time    : 2019/8/26 21:13
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from SQLAlchemy.models import Student, Course, Student2Course

# 先创建连接
engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1
    )
SessionFactory = sessionmaker(bind=engine)

session = SessionFactory()

# 1、录入数据
session.add_all([
    Student(name='大炮'),
    Student(name='二狗'),
    Course(title='生物'),
    Course(title='体育'),
])
session.commit()

session.add_all([
    Student2Course(student_id=1,course_id=1),
    Student2Course(student_id=1,course_id=2),
    Student2Course(student_id=2,course_id=2),
])
session.commit()


# 2、拿到大炮对象
obj = session.query(Student).filter(Student.name == '大炮').first()
# 找他关联的所有课程
for i in obj.course_list:
    print(i.title)

# 3、选了生物课的所有人
# 先拿到生物课对象
obj = session.query(Course).filter(Course.title=='生物').first()
# 然后打印其列表
for n in obj.student_list:
    print(n.name)

# 4、创建一个课程,创建2学生,并且让两个学生选新创建的课程
obj = Course(title='英语')
# 内部会自己建多对多的关系
obj.student_list = [Student(name='三子'), Student(name='大佬')]
session.add(obj)
session.commit()


session.close()

2、SQLAlchemy之两种连接方式

第一种:每次都新开一个连接
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from SQLAlchemy.models import Student, Course, Student2Course

# 先创建连接
engine = create_engine(
        "mysql+pymysql://root:123456@127.0.0.1:3306/fls?charset=utf8",
        max_overflow=0,
        pool_size=5,
        pool_timeout=30,
        pool_recycle=-1
    )
SessionFactory = sessionmaker(bind=engine)


# 多线程:
def task():
    # 去连接池获取一个连接,连接池只有5个,没有就等待,每次都要获取
    session = SessionFactory()
    res = session.query(Student).all()
    # 将连接交还给连接池
    session.close()

from threading import Thread
for i in range(20):
    t = Thread(target=task)
    t.start()

第二种:基于scoped_session实现线程安全

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:

public_methods = (
    '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
    'close', 'commit', 'connection', 'delete', 'execute', 'expire',
    'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
    'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
    'bulk_update_mappings',
    'merge', 'query', 'refresh', 'rollback',
    'scalar'
)
"""
session = scoped_session(Session)


# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)


# 提交事务
session.commit()
# 关闭session
session.close()

3、SQLAlchemy之执行原生sql

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)

session = Session()

# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()

# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)

session.close()
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐