30 KiB
使用 SQLAlchemy ORM 的 CRUD
原文:https://overiq.com/sqlalchemy-101/crud-using-sqlalchemy-orm/
最后更新于 2020 年 7 月 27 日
创建会话
使用 SQLAlchemy ORM 时,我们使用Session对象与数据库进行交互。Session对象还包装了数据库连接和事务。一旦Session开始与数据库通信,事务就隐式开始,并将保持打开状态,直到Session被提交、回滚或关闭。
创建Session对象的一种方法是使用sqlalchemy.orm包中的Session类。
from sqlalchemy.orm import create_engine, Session
engine = create_engine("postgres+psycopg2://postgres:pass@localhost/mydb")
session = Session(bind=engine)
每次您想要与数据库通信时,都必须创建Session对象。
Session构造函数接受许多参数来定制它的工作。如果我们选择使用这个方法创建 Session,我们将不得不在整个应用中反复调用带有相同参数集的Session构造函数。
为了让事情变得更简单,SQLAlchemy 提供了sessionmaker类,该类使用为其构造函数设置的默认参数来创建Session类。
from sqlalchemy.orm import sessionmaker, Session
Session = sessionmaker(bind=engine)
您应该在全局范围内的应用中调用sessionmaker一次。
一旦我们访问了定制的Session类,您可以根据需要多次实例化它,而无需向它传递任何参数。
session = Session()
请注意,实例化Session对象不会立即建立与数据库的连接。只有当您开始向数据库发送查询时,才会建立连接。
**注意:**本教程涵盖的内容还不止于此。有关更多详细信息,请查看官方文档。
插入数据
要使用 SQLAlchemy ORM 创建新记录,我们需要执行以下步骤:
- 创建一个对象。
- 将对象添加到会话中。
- 提交会话。
让我们创建两个新的Customer对象,如下所示:
c1 = Customer(first_name = 'Toby',
last_name = 'Miller',
username = 'tmiller',
email = 'tmiller@example.com',
address = '1662 Kinney Street',
town = 'Wolfden'
)
c2 = Customer(first_name = 'Scott',
last_name = 'Harvey',
username = 'scottharvey',
email = 'scottharvey@example.com',
address = '424 Patterson Street',
town = 'Beckinsdale'
)
c1, c2
预期输出:
(<Customer:None-johngreen>, <Customer:None-katwilson>)
这里我们已经创建了两个Customer对象。我们可以使用点(.)运算符访问对象的属性,如下所示:
c1.first_name, c1.last_name
c2.first_name, c2.last_name
预期输出:
('John', 'Green')
('Katherine', 'Wilson')
接下来,我们将对象添加到会话中。
session.add(c1)
session.add(c2)
向会话中添加对象实际上并没有将它们写入数据库,它只是准备要在下一次提交中保存的对象。我们可以通过检查对象的主键来验证这一点。
c1.id, c2.id
预期输出:
(None, None)
两个对象的id属性的值都是None。这意味着我们的对象还没有保存在数据库中。
我们可以使用add_all()方法,而不是一次向会话中添加一个对象。add_all()方法接受要添加到会话中的对象列表。
session.add_all([c1, c2])
多次向会话添加对象不会引发任何错误。您可以随时使用session.new查看添加到会话中的对象。
session.new
预期输出:
IdentitySet([<Customer:None-johngreen>, <Customer:None-katwilson>])
最后,要将对象保存到数据库中,调用commit()方法如下:
session.commit()
一旦提交事务,由Session对象引用的连接资源将返回到连接池。后续操作将在新的事务中进行。
访问Customer对象的id属性现在将返回主键,而不是None。
c1.id, c2.id
预期输出:
(1, 2)
此时,Customer表应该是这样的:
我们的顾客没有点任何东西。因此c1.orders和c2.orders将返回一个空列表。
c1.orders, c2.orders
预期输出:
([], [])
让我们在customers表中增加一些客户:
c3 = Customer(
first_name = "John",
last_name = "Lara",
username = "johnlara",
email = "johnlara@mail.com",
address = "3073 Derek Drive",
town = "Norfolk"
)
c4 = Customer(
first_name = "Sarah",
last_name = "Tomlin",
username = "sarahtomlin",
email = "sarahtomlin@mail.com",
address = "3572 Poplar Avenue",
town = "Norfolk"
)
c5 = Customer(first_name = 'Toby',
last_name = 'Miller',
username = 'tmiller',
email = 'tmiller@example.com',
address = '1662 Kinney Street',
town = 'Wolfden'
)
c6 = Customer(first_name = 'Scott',
last_name = 'Harvey',
username = 'scottharvey',
email = 'scottharvey@example.com',
address = '424 Patterson Street',
town = 'Beckinsdale'
)
session.add_all([c3, c4, c5, c6])
session.commit()
在我们接受订单之前,让我们在items表中添加一些产品。
i1 = Item(name = 'Chair', cost_price = 9.21, selling_price = 10.81, quantity = 5)
i2 = Item(name = 'Pen', cost_price = 3.45, selling_price = 4.51, quantity = 3)
i3 = Item(name = 'Headphone', cost_price = 15.52, selling_price = 16.81, quantity = 50)
i4 = Item(name = 'Travel Bag', cost_price = 20.1, selling_price = 24.21, quantity = 50)
i5 = Item(name = 'Keyboard', cost_price = 20.1, selling_price = 22.11, quantity = 50)
i6 = Item(name = 'Monitor', cost_price = 200.14, selling_price = 212.89, quantity = 50)
i7 = Item(name = 'Watch', cost_price = 100.58, selling_price = 104.41, quantity = 50)
i8 = Item(name = 'Water Bottle', cost_price = 20.89, selling_price = 25, quantity = 50)
session.add_all([i1, i2, i3, i4, i5, i6, i7, i8])
session.commit()
现在创建一些订单:
o1 = Order(customer = c1)
o2 = Order(customer = c1)
line_item1 = OrderLine(order = o1, item = i1, quantity = 3)
line_item2 = OrderLine(order = o1, item = i2, quantity = 2)
line_item3 = OrderLine(order = o2, item = i1, quantity = 1)
line_item3 = OrderLine(order = o2, item = i2, quantity = 4)
session.add_all([o1, o2])
session.new
session.commit()
请注意,我们只是在会话中添加Order对象(即o1和o2)。Order和OrderLine对象是一对多关系。向会话添加一个Order对象也隐含地向会话添加相关的OrderLine对象。但是,即使您仍然将OrderLine对象手动添加到会话中,您也不会得到任何错误。
我们也可以这样做,而不是在创建OrderLine实例时传递Order对象:
o3 = Order(customer = c1)
orderline1 = OrderLine(item = i1, quantity = 5)
orderline2 = OrderLine(item = i2, quantity = 10)
o3.order_lines.append(orderline1)
o3.order_lines.append(orderline2)
session.add_all([o3])
session.commit()
提交后,orders和order_lines表现在应该如下所示:
[]
再次访问Customer对象的orders属性,这次会得到一个非空列表,如下图:
c1.orders
预期输出:
[<Order:8>, <Order:9>]
从关系的另一面,我们可以使用Order对象上的customer属性来访问订单所属的Customer对象。
o1.customer
预期输出:
<Customer:1-tmiller>
客户c1现在有两个订单。要查看订单中的订单行,请使用Order对象的order_lines属性。
c1.orders[0].order_lines, c1.orders[1].order_lines
预期输出:
([<OrderLine:1>, <OrderLine:2>], [<OrderLine:3>, <OrderLine:4>])
要访问订单行中的项目,请使用item属性。
for ol in c1.orders[0].order_lines:
ol.id, ol.item, ol.quantity
print('-------')
for ol in c1.orders[1].order_lines:
ol.id, ol.item, ol.quantity
预期输出:
(1, <Item:1-Chair>, 3)
(2, <Item:2-Pen>, 2)
-------
(3, <Item:1-Chair>, 1)
(4, <Item:2-Pen>, 4)
请记住,所有这些之所以成为可能,是因为我们在模型中定义了relationship()。
查询数据
要查询数据库,我们使用session对象的query()方法。query()方法返回类型为sqlalchemy.orm.query.Query的对象,简称为Query。Query对象表示将用于查询数据库的SELECT语句。下表列出了Query类的一些常用方法。
| 方法 | 描述 |
|---|---|
all() |
以列表形式返回查询结果(用Query表示)。 |
count() |
返回查询中记录的总数。 |
first() |
返回查询的第一个结果,如果结果中没有行,则返回None。 |
scalar() |
返回第一行的第一列,如果结果集为空,则返回None。如果遇到多行,它会抛出MultipleResultsFound异常。 |
one |
只返回一行。如果遇到多行,它会抛出MultipleResultsFound异常。如果结果集为空,则抛出NoResultFound异常。 |
get(pk) |
如果没有找到匹配给定主键(pk)或None的对象,则返回该对象。 |
filter(*criterion) |
将WHERE子句应用于查询后,返回一个新的Query实例。 |
limit(limit) |
将LIMIT子句应用于查询后,返回一个新的Query实例。 |
offset(offset) |
将OFFSET子句应用于查询后,返回一个新的Query实例。 |
order_by(*criterion) |
在查询中应用ORDER BY子句后,返回一个新的Query实例。 |
join(*props, **kwargs) |
在查询上创建 SQL INNER JOIN 之后,返回一个新的Query实例。 |
outerjoin(*props, **kwargs) |
在查询上创建 SQL LEFT OUTER JOIN 后,返回一个新的Query实例。 |
group_by(*criterion) |
在查询中添加GROUP BY子句后,返回一个新的Query实例。 |
having(criterion) |
在查询中添加HAVING子句后,返回一个新的Query实例。 |
all()方法
最简单的形式是query()方法可以将一个或多个模型类或列作为参数。以下代码返回customers表中的所有记录。
session.query(Customer).all()
预期输出:
[<Customer:1-jongreen>,
<Customer:2-katwilson>,
<Customer:3-johnlara>,
<Customer:4-sarahtomlin>,
<Customer:5-tmiller>,
<Customer:6-scottharvey>]
同样,下面的代码返回来自items和orders表的所有记录。
session.query(Item).all()
session.query(Order).all()
预期输出:
[<Item:1-Chair>,
<Item:2-Pen>,
<Item:3-Headphone>,
<Item:4-Travel Bag>,
<Item:5-Keyboard>,
<Item:6-Monitor>,
<Item:7-Watch>,
<Item:8-Water Bottle>]
[<Order:1>, <Order:2>, <Order:3>]
要获取用于查询数据库的原始 SQL,只需按如下方式打印sqlalchemy.orm.query.Query对象:
print(session.query(Customer))
预期输出:
SELECT
customers. ID AS customers_id,
customers.first_name AS customers_first_name,
customers.last_name AS customers_last_name,
customers.username AS customers_username,
customers.email AS customers_email,
customers.address AS customers_address,
customers.town AS customers_town,
customers.created_on AS customers_created_on,
customers.updated_on AS customers_updated_on
FROM
customers
在大的结果集中调用all()方法是低效的,相反,我们可以使用 for 循环来迭代Query对象,如下所示:
q = session.query(Customer)
for c in q:
print(c.id, c.first_name)
预期输出:
1 John
2 Katherine
3 John
4 Sarah
5 Toby
6 Scott
前面的查询已经从表的所有列中返回了数据。我们可以通过将列名显式传递给query()方法来防止这种情况,如下所示:
session.query(Customer.id, Customer.first_name).all()
预期输出:
[(1, 'John'),
(2, 'Katherine'),
(3, 'John'),
(4, 'Sarah'),
(5, 'Toby'),
(6, 'Scott')]
请注意,现在列表中的每一项都是元组,而不是模型实例。
count()方法
count()方法返回查询返回的结果数。
session.query(Customer).count() # get the total number of records in the customers table
session.query(Item).count() # get the total number of records in the items table
session.query(Order).count() # get the total number of records in the orders table
预期输出:
6
8
3
first()方法
first()方法返回查询的第一个结果,如果查询返回零个结果,则返回None。
session.query(Customer).first()
session.query(Item).first()
session.query(Order).first()
预期输出:
<Customer:1-johngreen>
<Item:1-Chair>
<Order:1>
get()方法
get()方法返回与传递给它的主键匹配的实例,如果没有找到这样的对象,则返回None。
session.query(Customer).get(1)
session.query(Item).get(1)
session.query(Order).get(100)
预期输出:
<Customer:1-johngreen>
<Item:1-Chair>
filter()方法
filter()方法允许我们通过在查询中添加WHERE子句来过滤结果。它至少接受一列、一个运算符和值。这里有一个例子:
session.query(Customer).filter(Customer.first_name == 'John').all()
预期输出:
[<Customer:1-johngreen>, <Customer:3-johnlara>]
该查询返回所有名字为John的客户。该查询的 SQL 等价物是:
print(session.query(Customer).filter(Customer.first_name == 'John'))
预期输出:
SELECT
customers.id AS customers_id,
customers.first_name AS customers_first_name,
customers.last_name AS customers_last_name,
customers.username AS customers_username,
customers.email AS customers_email,
customers.address AS customers_address,
customers.town AS customers_town,
customers.created_on AS customers_created_on,
customers.updated_on AS customers_updated_on
FROM
customers
WHERE
customers.first_name = %(first_name_1)s
WHERE子句中的字符串%(first_name_1)s是一个占位符,在执行查询时将被实际值(即John)替换。
我们可以将多个过滤器传递给filter()方法,它们将使用 SQL AND运算符连接在一起。例如:
session.query(Customer).filter(Customer.id <= 5, Customer.town == "Norfolk").all()
预期输出:
[<Customer:1-johngreen>, <Customer:3-johnlara>, <Customer:4-sarahtomlin>]
该查询返回主键小于等于 5 且城镇名称以Nor开头的所有客户。它的 SQL 等价物是:
print(session.query(Customer).filter(Customer.id <= 5, Customer.town.like("Nor%"))
预期输出:
组合条件的另一种方式是使用连词(即and_()、or_()和not_())。以下是一些例子:
# find all customers who either live in Peterbrugh or Norfolk
session.query(Customer).filter(or_(
Customer.town == 'Peterbrugh',
Customer.town == 'Norfolk'
)).all()
# find all customers whose first name is John and live in Norfolk
session.query(Customer).filter(and_(
Customer.first_name == 'John',
Customer.town == 'Norfolk'
)).all()
# find all johns who don't live in Peterbrugh
session.query(Customer).filter(and_(
Customer.first_name == 'John',
not_(
Customer.town == 'Peterbrugh',
)
)).all()
预期输出:
[<Customer:1-johngreen>,
<Customer:2-katwilson>,
<Customer:3-johnlara>,
<Customer:4-sarahtomlin>]
[<Customer:1-johngreen>, <Customer:3-johnlara>]
[<Customer:1-johngreen>, <Customer:3-johnlara>]
下面的清单显示了如何使用一些常见的比较运算符和filter()方法。
为空
session.query(Order).filter(Order.date_shipped == None).all()
预期输出:
[<Order:1>, <Order:2>, <Order:3>]
不为空
session.query(Order).filter(Order.date_shipped != None).all()
预期输出:
[]
在…里
session.query(Customer).filter(Customer.first_name.in_(['Toby', 'Sarah'])).all()
预期输出:
[<Customer:4-sarahtomlin>, <Customer:5-tmiller>]
不在
session.query(Customer).filter(Customer.first_name.notin_(['Toby', 'Sarah'])).all()
预期输出:
[<Customer:1-johngreen>,
<Customer:2-katwilson>,
<Customer:3-johnlara>,
<Customer:6-scottharvey>]
在...之间
session.query(Item).filter(Item.cost_price.between(10, 50)).all()
预期输出:
[<Item:3-Headphone>,
<Item:4-Travel Bag>,
<Item:5-Keyboard>,
<Item:8-Water Bottle>]
不在中间
session.query(Item).filter(not_(Item.cost_price.between(10, 50))).all()
预期输出:
[<Item:1-Chair>, <Item:2-Pen>, <Item:6-Monitor>, <Item:7-Watch>]
喜欢
session.query(Item).filter(Item.name.like("%r")).all()
预期输出:
[<Item:1-Chair>, <Item:6-Monitor>]
like()方法执行区分大小写的匹配。对于不区分大小写的匹配,使用ilike()。
session.query(Item).filter(Item.name.ilike("w%")).all()
预期输出:
[<Item:7-Watch>, <Item:8-Water Bottle>]
不喜欢
session.query(Item).filter(not_(Item.name.like("W%"))).all()
预期输出:
[<Item:1-Chair>,
<Item:2-Pen>,
<Item:3-Headphone>,
<Item:4-Travel Bag>,
<Item:5-Keyboard>,
<Item:6-Monitor>]
极限()方法
limit()方法将LIMIT子句添加到查询中。它接受您希望从查询中返回的行数。
session.query(Customer).limit(2).all()
session.query(Customer).filter(Customer.address.ilike("%avenue")).limit(2).all()
预期输出:
[<Customer:1-johngreen>, <Customer:2-katwilson>]
[<Customer:2-katwilson>, <Customer:4-sarahtomlin>]
上述查询的 SQL 等价物如下:
print(session.query(Customer).limit(2))
print(session.query(Customer).filter(Customer.address.ilike("%avenue")).limit(2))
预期输出:
SELECT
customers. id AS customers_id,
customers.first_name AS customers_first_name,
customers.last_name AS customers_last_name,
customers.username AS customers_username,
customers.email AS customers_email,
customers.address AS customers_address,
customers.town AS customers_town,
customers.created_on AS customers_created_on,
customers.updated_on AS customers_updated_on
FROM
customers
LIMIT %(param_1)s
SELECT
customers.id AS customers_id,
customers.first_name AS customers_first_name,
customers.last_name AS customers_last_name,
customers.username AS customers_username,
customers.email AS customers_email,
customers.address AS customers_address,
customers.town AS customers_town,
customers.created_on AS customers_created_on,
customers.updated_on AS customers_updated_on
FROM
customers
WHERE
customers.address ILIKE % (address_1)s
LIMIT %(param_1)s
offset()方法
offset()方法将OFFSET子句添加到查询中。它接受偏移量作为参数。它常用于limit()从句。
session.query(Customer).limit(2).offset(2).all()
预期输出:
[<Customer:3-johnlara>, <Customer:4-sarahtomlin>]
上述查询的等效 SQL 如下:
print(session.query(Customer).limit(2).offset(2))
预期输出:
SELECT
customers. ID AS customers_id,
customers.first_name AS customers_first_name,
customers.last_name AS customers_last_name,
customers.username AS customers_username,
customers.email AS customers_email,
customers.address AS customers_addrees,
customers.town AS customers_town,
customers.created_on AS customers_created_on,
customers.updated_on AS customers_updated_on
FROM
customers
LIMIT %(param_1)s OFFSET %(param_2)s
order_by()方法
通过在查询中添加ORDER BY子句,使用order_by()方法对结果进行排序。它接受订单应该基于的列名。默认情况下,它按升序排序。
session.query(Item).filter(Item.name.ilike("wa%")).all()
session.query(Item).filter(Item.name.ilike("wa%")).order_by(Item.cost_price).all()
预期输出:
[<Item:7-Watch>, <Item:8-Water Bottle>]
[<Item:8-Water Bottle>, <Item:7-Watch>]
要按降序排序,使用desc()功能,如下所示:
from sqlalchemy import desc
session.query(Item).filter(Item.name.ilike("wa%")).order_by(desc(Item.cost_price)).all()
预期输出:
[<Item:7-Watch>, <Item:8-Water Bottle>]
join()方法
join()方法用于创建 SQL INNER JOIN。它接受要为其创建 SQL JOIN 的表名。
让我们用join()方法找到所有有一个或多个订单的客户。
session.query(Customer).join(Order).all()
预期输出:
[<Customer:1-johngreen>]
该查询相当于以下 SQL:
print(session.query(Customer).join(Order))
预期输出:
SELECT
customers.id AS customers_id,
customers.first_name AS customers_first_name,
customers.last_name AS customers_last_name,
customers.username AS customers_username,
customers.email AS customers_email,
customers.address AS customers_address,
customers.town AS customers_town,
customers.created_on AS customers_created_on,
customers.updated_on AS customers_updated_on
FROM
customers
JOIN orders ON customers.id = orders.customer_id
join()方法通常用于在单个查询中从一个或多个表中获取数据。例如:
session.query(Customer.id, Customer.username, Order.id).join(Order).all()
预期输出:
[(1, 'johngreen', 1), (1, 'johngreen', 2), (1, 'johngreen', 3)]
我们可以通过如下链接join()方法为两个以上的表创建 SQL JOIN:
session.query(Table1).join(Table2).join(Table3).join(Table4).all()
这里还有一个例子,它使用 3 个连接来查找John Green的第一个顺序中的所有项目。
session.query(
Customer.first_name,
Item.name,
Item.selling_price,
OrderLine.quantity
).join(Order).join(OrderLine).join(Item).filter(
Customer.first_name == 'John',
Customer.last_name == 'Green',
Order.id == 1,
).all()
预期输出:
[('John', 'Chair', Decimal('10.81'), 3), ('John', 'Pen', Decimal('4.51'), 2)]
outerjoin()方法
outerjoin()方法的工作原理和join()一样,但是创造了LEFT OUTER JOIN。
session.query(
Customer.first_name,
Order.id,
).outerjoin(Order).all()
预期输出:
[('John', 1),
('John', 2),
('John', 3),
('Katherine', None),
('Toby', None),
('Scott', None),
('Sarah', None),
('John', None)]
在这个查询中,左边的表是customers表。因此,它将返回来自customers表(左表)的所有行,并且只从orders表(右表)返回满足连接条件的行。
您也可以通过将full=True传递给outerjoin()方法来创建FULL OUTER JOIN。例如:
session.query(
Customer.first_name,
Order.id,
).outerjoin(Order, full=True).all()
预期输出:
[('John', 1),
('John', 2),
('John', 3),
('Katherine', None),
('Toby', None),
('Scott', None),
('Sarah', None),
('John', None)]
group_by()方法
我们使用group_by()方法对结果进行分组。它接受一列或多列,并根据列中的值对行进行分组。
下面的查询使用join()和group_by()来统计约翰·格林的订单数量。
from sqlalchemy import func
session.query(func.count(Customer.id)).join(Order).filter(
Customer.first_name == 'John',
Customer.last_name == 'Green',
).group_by(Customer.id).scalar()
预期输出:
3
具有()方法
为了根据聚合函数返回的值过滤出结果,我们使用having()方法,该方法将HAVING子句添加到SELECT语句中。就像where()条款一样,它接受一个条件。
# find the number of customers lives in each town
session.query(
func.count("*").label('town_count'),
Customer.town
).group_by(Customer.town).having(func.count("*") > 2).all()
预期输出:
[(3, 'Norfolk')]
处理重复项
为了处理结果集中的重复行,我们使用DISTINCT选项。我们可以使用distinct()方法将DISTINCT选项添加到SELECT语句中。例如:
from sqlalchemy import distinct
session.query(Customer.town).filter(Customer.id < 10).all()
session.query(Customer.town).filter(Customer.id < 10).distinct().all()
session.query(
func.count(distinct(Customer.town)),
func.count(Customer.town)
).all()
预期输出:
[('Norfolk'),
('Peterbrugh'),
('Norfolk'),
('Norfolk'),
('Wolfden'),
('Beckinsdale')]
[('Peterbrugh'), ('Beckinsdale'), ('Wolfden'), ('Norfolk')]
[(4, 6)]
铸造
将数据从一种类型转换为另一种类型是一种常见的操作,通过sqlalchemy包中的cast()功能完成。
from sqlalchemy import cast, Date, distinct, union
session.query(
cast(func.pi(), Integer),
cast(func.pi(), Numeric(10,2)),
cast("2010-12-01", DateTime),
cast("2010-12-01", Date),
).all()
预期输出:
[(3,
Decimal('3.14'),
datetime.datetime(2010, 12, 1, 0, 0),
datetime.date(2010, 12, 1))]
联盟
对于联合查询,我们使用Query对象的union()方法。它需要一个或多个查询。例如:
s1 = session.query(Item.id, Item.name).filter(Item.name.like("Wa%"))
s2 = session.query(Item.id, Item.name).filter(Item.name.like("%e%"))
s1.union(s2).all()
预期输出:
[(2, 'Pen'),
(4, 'Travel Bag'),
(3, 'Headphone'),
(5, 'Keyboard'),
(7, 'Watch'),
(8, 'Water Bottle')]
默认情况下,union()从结果集中删除所有重复的行。如果您想保留副本,请使用union_all()。
s1.union_all(s2).all()
预期输出:
[(7, 'Watch'),
(8, 'Water Bottle'),
(2, 'Pen'),
(3, 'Headphone'),
(4, 'Travel Bag'),
(5, 'Keyboard'),
(8, 'Water Bottle')]
更新数据
要更新对象,只需将其属性设置为新值,将对象添加到会话中并提交更改。
i = session.query(Item).get(8)
i.selling_price = 25.91
session.add(i)
session.commit()
这样我们一次只能更新一个对象。要同时更新多行,请使用Query对象的update()方法。它返回更新的记录总数。例如:
# update quantity of all quantity of items to 60 whose name starts with 'W'
session.query(Item).filter(
Item.name.ilike("W%")
).update({"quantity": 60}, synchronize_session='fetch')
session.commit()
预期输出:
2
删除数据
要删除对象,请使用会话对象的delete()方法。它接受一个对象,并将其标记为在下一次提交时删除。
i = session.query(Item).filter(Item.name == 'Monitor').one()
i
session.delete(i)
session.commit()
预期输出:
<Item:6-Monitor>
该提交从items表中移除Monitor。
要一次删除多条记录,请使用Query对象的delete()方法。
session.query(Item).filter(
Item.name.ilike("W%")
).delete(synchronize_session='fetch')
session.commit()
预期输出:
2
该提交将删除所有名称以W开头的项目。
原始查询
ORM 也给了你直接使用的灵活性,直接使用 SQL 使用text()函数。例如:
from sqlalchemy import text
session.query(Customer).filter(text("first_name = 'John'")).all()
session.query(Customer).filter(text("town like 'Nor%'")).all()
session.query(Customer).filter(text("town like 'Nor%'")).order_by(text("first_name, id desc")).all()
预期输出:
[<Customer:1-johngreen>, <Customer:3-johnlara>]
[<Customer:1-johngreen>, <Customer:3-johnlara>, <Customer:4-sarahtomlin>]
[<Customer:3-johnlara>, <Customer:1-johngreen>, <Customer:4-sarahtomlin>]
处理
事务是一种执行一组 SQL 语句的方式,以便成功执行所有语句或根本不执行任何语句。如果事务中涉及的任何语句失败,那么数据库将返回到事务启动前的状态。
我们目前在数据库中有两个订单。发送订单的过程如下:
- 在
orders表的date_shipped栏中设置发货日期 - 从
items表中减去订购项目的数量
这两个操作必须作为一个单元来执行,以确保表中的数据是正确的。
在下面的列表中,我们定义了dispatch_order()方法,该方法接受order_id作为参数,并在事务中执行上述任务。
from sqlalchemy import update
from sqlalchemy.exc import IntegrityError
from datetime import datetime
def dispatch_order(order_id):
# check whether order_id is valid or not
order = session.query(Order).get(order_id)
if not order:
raise ValueError("Invalid order id: {}.".format(order_id))
if order.date_shipped:
print("Order already shipped.")
return
try:
for i in order.order_lines:
i.item.quantity = i.item.quantity - i.quantity
order.date_shipped = datetime.now()
session.commit()
print("Transaction completed.")
except IntegrityError as e:
print(e)
print("Rolling back ...")
session.rollback()
print("Transaction failed.")
我们的第一个订单是 3 把椅子和 2 支笔。用1的订单 id 调用dispatch_order()函数,将返回如下输出:
dispatch_order(1)
预期输出:
Transaction completed.
此时,items和order_lines表应该是这样的:
[]
[]
我们的下一个订单是 1 把椅子和 4 支笔,但是我们现在只有 3 支笔了!
让我们试着运行dispatch_order()二阶,看看会发生什么。
dispatch_order(2)
预期输出:
(psycopg2.IntegrityError) new row for relation "items" violates check constraint "quantity_check"
DETAIL: Failing row contains (2, Pen, 3.45, 4.51, -3).
[SQL: 'UPDATE items SET quantity=%(quantity)s WHERE items.id = %(items_id)s'] [parameters: ({'quantity': 1, 'items_id': 1}, {'quantity': -3, 'items_id': 2})] (Background on this error at: http://sqlalche.me/e/gkpj)
Rolling back ...
Transaction failed.
不出所料,我们的发货失败了,因为我们的库存中没有足够的笔,并且因为我们正在使用事务,所以我们的数据库恢复到了事务开始之前的状态。
我希望这个教程会对你有所帮助。如果你遇到任何错别字或错误。使用下面的联系页面或评论框联系。
