Flask-sqlalchemy多表查询
- [基本查询- ```python# 头部信息from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyapp = Flask(__name__)app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.35...
- [基本查询
- ```python
# 头部信息
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.35.231:3306/testdb'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
```
- ```python
# 头部信息在最开始
class User(db.Model):
__tablename__ = 't_user' # 表名默认位类名小写
id = db.Column(db.Integer, primary_key=True) # 主键,默认自增
name = db.Column(db.String(20), unique=True) # 唯一约束
class Address(db.Model):
__tablename__ = 't_address'
id = db.Column(db.Integer, primary_key=True)
detail = db.Column(db.String(20))
user_id = db.Column(db.Integer) # 外键,记录用户的主键
@app.route("/")
def index():
db.drop_all() # 删除所有继承自db.Model的表
db.create_all() # 创建所有继承自db.Mdoel的表
# 增加数据
user1 = User(name='zs')
user2 = User(name='ls')
db.session.add_all([user1, user2])
db.session.flush() # 主动执行sql操作
adr1 = Address(detail='美国村一号', user_id=user1.id)
adr2 = Address(detail='德国2号', user_id=user1.id)
db.session.add_all([adr1, adr2])
db.session.commit()
# 查询关联数据
user_fir = User.query.filter_by(name='zs').first()
adr_fir = Address.query.filter_by(user_id=user_fir.id).all()
print(adr_fir)
return "index"
if __name__ == '__main__':
app.run()
```
- 关系属性
- ```python
# 头部信息在最开始
class User(db.Model):
__tablename__ = 't_user' # 表名默认位类名小写
id = db.Column(db.Integer, primary_key=True) # 主键,默认自增
name = db.Column(db.String(20), unique=True) # 唯一约束
# 定义关系属性 db.relationship('关联的类名')
addresses = db.relationship('Address')
class Address(db.Model):
__tablename__ = 't_address'
id = db.Column(db.Integer, primary_key=True)
detail = db.Column(db.String(20))
# 设置外键参数 db.ForeignKey('主表名.字段名')
user_id = db.Column(db.Integer, db.ForeignKey('t_user.id')) # 外键,记录用户的主键
@app.route("/")
def index():
db.drop_all() # 删除所有继承自db.Model的表
db.create_all() # 创建所有继承自db.Mdoel的表
# 增加数据
user1 = User(name='zs')
user2 = User(name='ls')
db.session.add_all([user1, user2])
db.session.flush() # 主动执行sql操作
adr1 = Address(detail='美国村一号', user_id=user1.id)
adr2 = Address(detail='德国2号', user_id=user1.id)
db.session.add_all([adr1, adr2])
db.session.commit()
"""使用关系属性来查询关联数据 1)定义外键参数 2)定义关系属性 ==本质:根据外键进行查询"""
# 查询关联数据
useraa = User.query.filter_by(name='zs').first()
print(useraa.addresses)
return "index"
if __name__ == '__main__':
app.run()
```
- ```sql
# 关系属性查询sql实现
SELECT t_address.id AS t_address_id, t_address.detail AS t_address_detail, t_address.user_id AS t_address_user_id FROM t_address WHERE %s = t_address.user_id (1,)
```
- 反向引用
- ```python
# 头部信息在最开始
# 1,设置反向关联属性查询#######################################################################
class User(db.Model):
__tablename__ = 't_user' # 表名默认位类名小写
id = db.Column(db.Integer, primary_key=True) # 主键,默认自增
name = db.Column(db.String(20), unique=True) # 唯一约束
# 定义关系属性 db.relationship('关联的类名')
addresses = db.relationship('Address')
class Address(db.Model):
__tablename__ = 't_address'
id = db.Column(db.Integer, primary_key=True)
detail = db.Column(db.String(20))
# 设置外键参数 db.ForeignKey('主表名.字段名')
user_id = db.Column(db.Integer, db.ForeignKey('t_user.id')) # 外键,记录用户的主键
# 定义反向关系属性
user_info = db.relationship('User')
# 查询关联数据
addr1 = Address.query.filter_by(id=1).first()
"""
SELECT t_address.id AS t_address_id, t_address.detail AS t_address_detail,t_address.user_id AS t_address_user_id FROM t_address WHERE t_address.id = %s LIMIT %s (1, 1)
"""
print(addr1.user_info)
"""
SELECT t_user.id AS t_user_id, t_user.name AS t_user_name FROM t_user WHERE t_user.id = %s (1,)
"""
```
- ```python
# 头部信息在最开始
class User(db.Model):
__tablename__ = 't_user' # 表名默认位类名小写
id = db.Column(db.Integer, primary_key=True) # 主键,默认自增
name = db.Column(db.String(20), unique=True) # 唯一约束
# 定义关系属性 db.relationship('关联的类名')
# addresses = db.relationship('Address')
# 设置backref参数,代替反向关系属性(和定义一个方向关系属性等效)
addresses = db.relationship('Address', backref='user_info')
class Address(db.Model):
__tablename__ = 't_address'
id = db.Column(db.Integer, primary_key=True)
detail = db.Column(db.String(20))
# 设置外键参数 db.ForeignKey('主表名.字段名')
user_id = db.Column(db.Integer, db.ForeignKey('t_user.id')) # 外键,记录用户的主键
# 定义反向关系属性
# user_info = db.relationship('User')
@app.route("/")
def index():
db.drop_all() # 删除所有继承自db.Model的表
db.create_all() # 创建所有继承自db.Mdoel的表
# 增加数据
user1 = User(name='zs')
user2 = User(name='ls')
db.session.add_all([user1, user2])
db.session.flush() # 主动执行sql操作
adr1 = Address(detail='美国村一号', user_id=user1.id)
adr2 = Address(detail='德国2号', user_id=user1.id)
db.session.add_all([adr1, adr2])
db.session.commit()
# 查询关联数据
addr1 = Address.query.filter_by(id=1).first()
print(addr1.user_info)
return "index"
if __name__ == '__main__':
app.run()
```
- ```sql
# 以上sql实现
SELECT t_address.id AS t_address_id, t_address.detail AS t_address_detail, t_address.user_id AS t_address_user_id FROM t_address WHERE t_address.id = %s LIMIT %s (1, 1)
SELECT t_user.id AS t_user_id, t_user.name AS t_user_name FROM t_user WHERE t_user.id = %s (1,)
```
- 动态查询
- ```python
# 头部信息在最开始
class User(db.Model):
__tablename__ = 't_user' # 表名默认位类名小写
id = db.Column(db.Integer, primary_key=True) # 主键,默认自增
name = db.Column(db.String(20), unique=True) # 唯一约束
# 设置参数lazy='dynamic'开启动态查询,关系属性不在返回数据对象,而是返回AppenderBaseQuery对象
# AppenderBaseQuery对象 1)和BaseQuery一样,可以接续复杂的查询条件 2)保留了列表的特性 可以通过遍历/索引取值
addresses = db.relationship('Address', lazy='dynamic')
class Address(db.Model):
__tablename__ = 't_address'
id = db.Column(db.Integer, primary_key=True)
detail = db.Column(db.String(20))
# 设置外键参数 db.ForeignKey('主表名.字段名')
user_id = db.Column(db.Integer, db.ForeignKey('t_user.id')) # 外键,记录用户的主键
@app.route("/")
def index():
db.drop_all() # 删除所有继承自db.Model的表
db.create_all() # 创建所有继承自db.Mdoel的表
# 增加数据
user1 = User(name='zs')
user2 = User(name='ls')
db.session.add_all([user1, user2])
db.session.flush() # 主动执行sql操作
adr1 = Address(detail='美国村一号', user_id=user1.id)
adr2 = Address(detail='德国2号', user_id=user1.id)
db.session.add_all([adr1, adr2])
db.session.commit()
# 查询关联数据
useraa = User.query.filter_by(name='zs').first()
# 对关联数据进一步的条件过滤
adrs = useraa.addresses.filter(Address.detail.startswith('美国')).all()
print(adrs)
# 可以通过遍历/索引取值
adr = useraa.addresses[0]
print(adr)
return "index"
if __name__ == '__main__':
app.run()
```
- ```sql
# 以上sql实现
SELECT t_user.id AS t_user_id, t_user.name AS t_user_name FROM t_user WHERE t_user.name = %s LIMIT %s ('zs', 1)
SELECT t_address.id AS t_address_id, t_address.detail AS t_address_detail, t_address.user_id AS t_address_user_id FROM t_address WHERE %s = t_address.user_id AND (t_address.detail LIKE concat(%s, '%%')) (1, '美国')
SELECT t_address.id AS t_address_id, t_address.detail AS t_address_detail, t_address.user_id AS t_address_user_id FROM t_address WHERE %s = t_address.user_id LIMIT %s (1, 1)
```
- 关联/连接查询
- ```sql
# 使用连接查询
ret = db.session.query(User, Address).join(Address, User.id == Address.user_id).filter(User.name == 'zs')
print(ret)
SELECT t_user.id AS t_user_id, t_user.name AS t_user_name, t_address.id AS t_address_id, t_address.detail AS t_address_detail, t_address.user_id AS t_address_user_id FROM t_user INNER JOIN t_address ON t_user.id = t_address.user_id WHERE t_user.name = %s ('zs',)
```
- ```python
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import Load
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.35.231:3306/testdb'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
class User(db.Model):
__tablename__ = 't_user' # 表名默认位类名小写
id = db.Column(db.Integer, primary_key=True) # 主键,默认自增
name = db.Column(db.String(20), unique=True) # 唯一约束
addresses = db.relationship('Address')
class Address(db.Model):
__tablename__ = 't_address'
id = db.Column(db.Integer, primary_key=True)
detail = db.Column(db.String(20))
# 设置外键参数 db.ForeignKey('主表名.字段名')
user_id = db.Column(db.Integer, db.ForeignKey('t_user.id')) # 外键,记录用户的主键
@app.route("/")
def index():
db.drop_all() # 删除所有继承自db.Model的表
db.create_all() # 创建所有继承自db.Mdoel的表
# 增加数据
user1 = User(name='zs')
user2 = User(name='ls')
db.session.add_all([user1, user2])
db.session.flush() # 主动执行sql操作
adr1 = Address(detail='美国村一号', user_id=user1.id)
adr2 = Address(detail='德国2号', user_id=user1.id)
db.session.add_all([adr1, adr2])
db.session.commit()
# sqlalchemy中的关系属性设置为了懒查询机制,优点是:减少资源浪费,在需要关联数据时才会查询
# user_obj = User.query.filter_by(name='zs').first()
# print(user_obj.addresses)
# 关系属性和使用基本查询都是查询两次,先查用户数据,再查地址数据
# 如果查询目的就是查询关联的数据,那么两次查询的效率要低于一次查询,网络IO影响比较大
# 使用一次查询来查询关联数据 *********最快的是连接查询***********
# 子查询 select detail from t_address where user_id = (select id from t_user where name='zs');
# 全相乘 笛卡尔积
# 笛卡尔积在SQL中的实现方式既是交叉连接(Cross Join)。所有连接方式都会先生成临时笛卡尔积表,笛卡尔积是关系代数里的一个概念,
# 表示两个表中的每一行数据任意组合,上图中两个表连接即为笛卡尔积(交叉连接)
# select t_address.detail from t_address, t_user where t_address.user_id = t_user.id and t_user.name='zs';
# 连接查询 select t_address.detail from t_address join t_user.id = t_address.user_id where t_user.name='zs';
# 使用连接查询
# ret = db.session.query(User, Address).join(Address, User.id == Address.user_id).filter(User.name == 'zs').all()
# print(ret)
# 进一步优化 只查询指定的字段 需求:只查询用户的id和地址的detail
ret = db.session.query(User, Address).options(Load(User).load_only(User.id), Load(Address).load_only(Address.detail)).join(Address, User.id ==Address.user_id).filter(User.name == 'zs').all()
print(ret)
return "index"
if __name__ == '__main__':
app.run()
```
- ```sql
# 以上sql实现
SELECT t_user.id AS t_user_id, t_address.id AS t_address_id, t_address.detail AS t_address_detail FROM t_user INNER JOIN t_address ON t_user.id = t_address.user_id
WHERE t_user.name = %s ('zs',)
```
更多推荐
所有评论(0)