geekdoc-python-zh/docs/overiq/228.md

30 KiB
Raw Permalink Blame History

使用 SQLAlchemy ORM 的 CRUD

原文:https://overiq.com/sqlalchemy-101/crud-using-sqlalchemy-orm/

最后更新于 2020 年 7 月 27 日


创建会话

使用 SQLAlchemy ORM 时,我们使用Session对象与数据库进行交互。Session对象还包装了数据库连接和事务。一旦Session开始与数据库通信,事务就隐式开始,并将保持打开状态,直到Session被提交、回滚或关闭。

创建Session对象的一种方法是使用sqlalchemy.orm包中的Session类。

from sqlalchemy.orm import create_engine, Session
engine = create_engine("postgres+psycopg2://postgres:pass@localhost/mydb")
session = Session(bind=engine)

每次您想要与数据库通信时,都必须创建Session对象。

Session构造函数接受许多参数来定制它的工作。如果我们选择使用这个方法创建 Session我们将不得不在整个应用中反复调用带有相同参数集的Session构造函数。

为了让事情变得更简单SQLAlchemy 提供了sessionmaker类,该类使用为其构造函数设置的默认参数来创建Session类。

from sqlalchemy.orm import sessionmaker, Session
Session = sessionmaker(bind=engine)

您应该在全局范围内的应用中调用sessionmaker一次。

一旦我们访问了定制的Session类,您可以根据需要多次实例化它,而无需向它传递任何参数。

session = Session()

请注意,实例化Session对象不会立即建立与数据库的连接。只有当您开始向数据库发送查询时,才会建立连接。

**注意:**本教程涵盖的内容还不止于此。有关更多详细信息,请查看官方文档。

插入数据

要使用 SQLAlchemy ORM 创建新记录,我们需要执行以下步骤:

  1. 创建一个对象。
  2. 将对象添加到会话中。
  3. 提交会话。

让我们创建两个新的Customer对象,如下所示:

c1 = Customer(first_name = 'Toby', 
              last_name = 'Miller', 
              username = 'tmiller', 
              email = 'tmiller@example.com', 
              address = '1662 Kinney Street',
              town = 'Wolfden'
             )

c2 = Customer(first_name = 'Scott', 
              last_name = 'Harvey', 
              username = 'scottharvey', 
              email = 'scottharvey@example.com', 
              address = '424 Patterson Street',
              town = 'Beckinsdale'
             )
c1, c2

预期输出:

(<Customer:None-johngreen>, <Customer:None-katwilson>)

这里我们已经创建了两个Customer对象。我们可以使用点(.)运算符访问对象的属性,如下所示:

c1.first_name, c1.last_name
c2.first_name, c2.last_name

预期输出:

('John', 'Green')
('Katherine', 'Wilson')

接下来,我们将对象添加到会话中。

session.add(c1)
session.add(c2)

向会话中添加对象实际上并没有将它们写入数据库,它只是准备要在下一次提交中保存的对象。我们可以通过检查对象的主键来验证这一点。

c1.id, c2.id

预期输出:

(None, None)

两个对象的id属性的值都是None。这意味着我们的对象还没有保存在数据库中。

我们可以使用add_all()方法,而不是一次向会话中添加一个对象。add_all()方法接受要添加到会话中的对象列表。

session.add_all([c1, c2])

多次向会话添加对象不会引发任何错误。您可以随时使用session.new查看添加到会话中的对象。

session.new

预期输出:

IdentitySet([<Customer:None-johngreen>, <Customer:None-katwilson>])

最后,要将对象保存到数据库中,调用commit()方法如下:

session.commit()

一旦提交事务,由Session对象引用的连接资源将返回到连接池。后续操作将在新的事务中进行。

访问Customer对象的id属性现在将返回主键,而不是None

c1.id, c2.id

预期输出:

(1, 2)

此时,Customer表应该是这样的:

我们的顾客没有点任何东西。因此c1.ordersc2.orders将返回一个空列表。

c1.orders, c2.orders

预期输出:

([], [])

让我们在customers表中增加一些客户:

c3 = Customer(
            first_name = "John", 
            last_name = "Lara", 
            username = "johnlara", 
            email = "johnlara@mail.com", 
            address = "3073 Derek Drive",
            town = "Norfolk"
)

c4 = Customer(          
            first_name = "Sarah", 
            last_name = "Tomlin", 
            username = "sarahtomlin", 
            email = "sarahtomlin@mail.com",
            address = "3572 Poplar Avenue",
            town = "Norfolk"        
)

c5 = Customer(first_name = 'Toby', 
              last_name = 'Miller', 
              username = 'tmiller', 
              email = 'tmiller@example.com', 
              address = '1662 Kinney Street',
              town = 'Wolfden'
             )

c6 = Customer(first_name = 'Scott', 
              last_name = 'Harvey', 
              username = 'scottharvey', 
              email = 'scottharvey@example.com', 
              address = '424 Patterson Street',
              town = 'Beckinsdale'
             )

session.add_all([c3, c4, c5, c6])
session.commit()

在我们接受订单之前,让我们在items表中添加一些产品。

i1 = Item(name = 'Chair', cost_price = 9.21, selling_price = 10.81, quantity = 5)
i2 = Item(name = 'Pen', cost_price = 3.45, selling_price = 4.51, quantity = 3)
i3 = Item(name = 'Headphone', cost_price = 15.52, selling_price = 16.81, quantity = 50)
i4 = Item(name = 'Travel Bag', cost_price = 20.1, selling_price = 24.21, quantity = 50)
i5 = Item(name = 'Keyboard', cost_price = 20.1, selling_price = 22.11, quantity = 50)
i6 = Item(name = 'Monitor', cost_price = 200.14, selling_price = 212.89, quantity = 50)
i7 = Item(name = 'Watch', cost_price = 100.58, selling_price = 104.41, quantity = 50)
i8 = Item(name = 'Water Bottle', cost_price = 20.89, selling_price = 25, quantity = 50)

session.add_all([i1, i2, i3, i4, i5, i6, i7, i8])
session.commit()

现在创建一些订单:

o1 = Order(customer = c1)
o2 = Order(customer = c1)

line_item1 = OrderLine(order = o1, item = i1, quantity =  3)
line_item2 = OrderLine(order = o1, item = i2, quantity =  2)
line_item3 = OrderLine(order = o2, item = i1, quantity =  1)
line_item3 = OrderLine(order = o2, item = i2, quantity =  4)

session.add_all([o1, o2])

session.new
session.commit()

请注意,我们只是在会话中添加Order对象(即o1o2)。OrderOrderLine对象是一对多关系。向会话添加一个Order对象也隐含地向会话添加相关的OrderLine对象。但是,即使您仍然将OrderLine对象手动添加到会话中,您也不会得到任何错误。

我们也可以这样做,而不是在创建OrderLine实例时传递Order对象:

o3 = Order(customer = c1)
orderline1 = OrderLine(item = i1, quantity = 5)
orderline2 = OrderLine(item = i2, quantity = 10)

o3.order_lines.append(orderline1)
o3.order_lines.append(orderline2)

session.add_all([o3])

session.commit()

提交后,ordersorder_lines表现在应该如下所示:

[]

再次访问Customer对象的orders属性,这次会得到一个非空列表,如下图:

c1.orders

预期输出:

[<Order:8>, <Order:9>]

从关系的另一面,我们可以使用Order对象上的customer属性来访问订单所属的Customer对象。

o1.customer

预期输出:

<Customer:1-tmiller>

客户c1现在有两个订单。要查看订单中的订单行,请使用Order对象的order_lines属性。

c1.orders[0].order_lines, c1.orders[1].order_lines

预期输出:

([<OrderLine:1>, <OrderLine:2>], [<OrderLine:3>, <OrderLine:4>])

要访问订单行中的项目,请使用item属性。

for ol in c1.orders[0].order_lines:
    ol.id, ol.item, ol.quantity

print('-------')

for ol in c1.orders[1].order_lines:
    ol.id, ol.item, ol.quantity

预期输出:

(1, <Item:1-Chair>, 3)
(2, <Item:2-Pen>, 2)
-------
(3, <Item:1-Chair>, 1)
(4, <Item:2-Pen>, 4)

请记住,所有这些之所以成为可能,是因为我们在模型中定义了relationship()

查询数据

要查询数据库,我们使用session对象的query()方法。query()方法返回类型为sqlalchemy.orm.query.Query的对象,简称为QueryQuery对象表示将用于查询数据库的SELECT语句。下表列出了Query类的一些常用方法。

方法 描述
all() 以列表形式返回查询结果(用Query表示)。
count() 返回查询中记录的总数。
first() 返回查询的第一个结果,如果结果中没有行,则返回None
scalar() 返回第一行的第一列,如果结果集为空,则返回None。如果遇到多行,它会抛出MultipleResultsFound异常。
one 只返回一行。如果遇到多行,它会抛出MultipleResultsFound异常。如果结果集为空,则抛出NoResultFound异常。
get(pk) 如果没有找到匹配给定主键(pk)或None的对象,则返回该对象。
filter(*criterion) WHERE子句应用于查询后,返回一个新的Query实例。
limit(limit) LIMIT子句应用于查询后,返回一个新的Query实例。
offset(offset) OFFSET子句应用于查询后,返回一个新的Query实例。
order_by(*criterion) 在查询中应用ORDER BY子句后,返回一个新的Query实例。
join(*props, **kwargs) 在查询上创建 SQL INNER JOIN 之后,返回一个新的Query实例。
outerjoin(*props, **kwargs) 在查询上创建 SQL LEFT OUTER JOIN 后,返回一个新的Query实例。
group_by(*criterion) 在查询中添加GROUP BY子句后,返回一个新的Query实例。
having(criterion) 在查询中添加HAVING子句后,返回一个新的Query实例。

all()方法

最简单的形式是query()方法可以将一个或多个模型类或列作为参数。以下代码返回customers表中的所有记录。

session.query(Customer).all()

预期输出:

[<Customer:1-jongreen>,
 <Customer:2-katwilson>,
 <Customer:3-johnlara>,
 <Customer:4-sarahtomlin>,
 <Customer:5-tmiller>,
 <Customer:6-scottharvey>]

同样,下面的代码返回来自itemsorders表的所有记录。

session.query(Item).all()
session.query(Order).all()

预期输出:

[<Item:1-Chair>,
 <Item:2-Pen>,
 <Item:3-Headphone>,
 <Item:4-Travel Bag>,
 <Item:5-Keyboard>,
 <Item:6-Monitor>,
 <Item:7-Watch>,
 <Item:8-Water Bottle>]

[<Order:1>, <Order:2>, <Order:3>]

要获取用于查询数据库的原始 SQL只需按如下方式打印sqlalchemy.orm.query.Query对象:

print(session.query(Customer))

预期输出:

SELECT
    customers. ID AS customers_id,
    customers.first_name AS customers_first_name,
    customers.last_name AS customers_last_name,
    customers.username AS customers_username,
    customers.email AS customers_email,
    customers.address AS customers_address,
    customers.town AS customers_town,
    customers.created_on AS customers_created_on,
    customers.updated_on AS customers_updated_on
FROM
    customers

在大的结果集中调用all()方法是低效的,相反,我们可以使用 for 循环来迭代Query对象,如下所示:

q = session.query(Customer)

for c in q:
    print(c.id, c.first_name)

预期输出:

1 John
2 Katherine
3 John
4 Sarah
5 Toby
6 Scott

前面的查询已经从表的所有列中返回了数据。我们可以通过将列名显式传递给query()方法来防止这种情况,如下所示:

session.query(Customer.id, Customer.first_name).all()

预期输出:

[(1, 'John'),
 (2, 'Katherine'),
 (3, 'John'),
 (4, 'Sarah'),
 (5, 'Toby'),
 (6, 'Scott')]

请注意,现在列表中的每一项都是元组,而不是模型实例。

count()方法

count()方法返回查询返回的结果数。

session.query(Customer).count() # get the total number of records in the customers table
session.query(Item).count()  # get the total number of records in the items table
session.query(Order).count()  # get the total number of records in the orders table

预期输出:

6
8
3

first()方法

first()方法返回查询的第一个结果,如果查询返回零个结果,则返回None

session.query(Customer).first()
session.query(Item).first()
session.query(Order).first()

预期输出:

<Customer:1-johngreen>
<Item:1-Chair>
<Order:1>

get()方法

get()方法返回与传递给它的主键匹配的实例,如果没有找到这样的对象,则返回None

session.query(Customer).get(1)
session.query(Item).get(1)
session.query(Order).get(100)

预期输出:

<Customer:1-johngreen>
<Item:1-Chair>

filter()方法

filter()方法允许我们通过在查询中添加WHERE子句来过滤结果。它至少接受一列、一个运算符和值。这里有一个例子:

session.query(Customer).filter(Customer.first_name == 'John').all()

预期输出:

[<Customer:1-johngreen>, <Customer:3-johnlara>]

该查询返回所有名字为John的客户。该查询的 SQL 等价物是:

print(session.query(Customer).filter(Customer.first_name == 'John'))

预期输出:

SELECT
    customers.id AS customers_id,
    customers.first_name AS customers_first_name,
    customers.last_name AS customers_last_name,
    customers.username AS customers_username,
    customers.email AS customers_email,
    customers.address AS customers_address,
    customers.town AS customers_town,
    customers.created_on AS customers_created_on,
    customers.updated_on AS customers_updated_on
FROM
    customers
WHERE
    customers.first_name = %(first_name_1)s

WHERE子句中的字符串%(first_name_1)s是一个占位符,在执行查询时将被实际值(即John)替换。

我们可以将多个过滤器传递给filter()方法,它们将使用 SQL AND运算符连接在一起。例如:

session.query(Customer).filter(Customer.id <= 5, Customer.town == "Norfolk").all()

预期输出:

[<Customer:1-johngreen>, <Customer:3-johnlara>, <Customer:4-sarahtomlin>]

该查询返回主键小于等于 5 且城镇名称以Nor开头的所有客户。它的 SQL 等价物是:

print(session.query(Customer).filter(Customer.id <= 5, Customer.town.like("Nor%"))

预期输出:

组合条件的另一种方式是使用连词(即and_()or_()not_())。以下是一些例子:

# find all customers who either live in Peterbrugh or Norfolk

session.query(Customer).filter(or_(
    Customer.town == 'Peterbrugh', 
    Customer.town == 'Norfolk'
)).all()

# find all customers whose first name is John and live in Norfolk

session.query(Customer).filter(and_(
    Customer.first_name == 'John', 
    Customer.town == 'Norfolk'
)).all()

# find all johns who don't live in Peterbrugh

session.query(Customer).filter(and_(
    Customer.first_name == 'John', 
    not_(
        Customer.town == 'Peterbrugh', 
    )
)).all()

预期输出:

[<Customer:1-johngreen>,
 <Customer:2-katwilson>,
 <Customer:3-johnlara>,
 <Customer:4-sarahtomlin>]

[<Customer:1-johngreen>, <Customer:3-johnlara>]

[<Customer:1-johngreen>, <Customer:3-johnlara>]

下面的清单显示了如何使用一些常见的比较运算符和filter()方法。

为空

session.query(Order).filter(Order.date_shipped == None).all()

预期输出:

[<Order:1>, <Order:2>, <Order:3>]

不为空

session.query(Order).filter(Order.date_shipped != None).all()

预期输出:

[]

在…里

session.query(Customer).filter(Customer.first_name.in_(['Toby', 'Sarah'])).all()

预期输出:

[<Customer:4-sarahtomlin>, <Customer:5-tmiller>]

不在

session.query(Customer).filter(Customer.first_name.notin_(['Toby', 'Sarah'])).all()

预期输出:

[<Customer:1-johngreen>,
 <Customer:2-katwilson>,
 <Customer:3-johnlara>,
 <Customer:6-scottharvey>]

在...之间

session.query(Item).filter(Item.cost_price.between(10, 50)).all()

预期输出:

[<Item:3-Headphone>,
 <Item:4-Travel Bag>,
 <Item:5-Keyboard>,
 <Item:8-Water Bottle>]

不在中间

session.query(Item).filter(not_(Item.cost_price.between(10, 50))).all()

预期输出:

[<Item:1-Chair>, <Item:2-Pen>, <Item:6-Monitor>, <Item:7-Watch>]

喜欢

session.query(Item).filter(Item.name.like("%r")).all()

预期输出:

[<Item:1-Chair>, <Item:6-Monitor>]

like()方法执行区分大小写的匹配。对于不区分大小写的匹配,使用ilike()

session.query(Item).filter(Item.name.ilike("w%")).all()

预期输出:

[<Item:7-Watch>, <Item:8-Water Bottle>]

不喜欢

session.query(Item).filter(not_(Item.name.like("W%"))).all()

预期输出:

[<Item:1-Chair>,
 <Item:2-Pen>,
 <Item:3-Headphone>,
 <Item:4-Travel Bag>,
 <Item:5-Keyboard>,
 <Item:6-Monitor>]

极限()方法

limit()方法将LIMIT子句添加到查询中。它接受您希望从查询中返回的行数。

session.query(Customer).limit(2).all()
session.query(Customer).filter(Customer.address.ilike("%avenue")).limit(2).all()

预期输出:

[<Customer:1-johngreen>, <Customer:2-katwilson>]

[<Customer:2-katwilson>, <Customer:4-sarahtomlin>]

上述查询的 SQL 等价物如下:

print(session.query(Customer).limit(2))
print(session.query(Customer).filter(Customer.address.ilike("%avenue")).limit(2))

预期输出:

SELECT
    customers. id AS customers_id,
    customers.first_name AS customers_first_name,
    customers.last_name AS customers_last_name,
    customers.username AS customers_username,
    customers.email AS customers_email,
    customers.address AS customers_address,
    customers.town AS customers_town,
    customers.created_on AS customers_created_on,
    customers.updated_on AS customers_updated_on
FROM
    customers
LIMIT %(param_1)s 

SELECT
    customers.id AS customers_id,
    customers.first_name AS customers_first_name,
    customers.last_name AS customers_last_name,
    customers.username AS customers_username,
    customers.email AS customers_email,
    customers.address AS customers_address,
    customers.town AS customers_town,
    customers.created_on AS customers_created_on,
    customers.updated_on AS customers_updated_on
FROM
    customers
WHERE
    customers.address ILIKE % (address_1)s
LIMIT %(param_1)s

offset()方法

offset()方法将OFFSET子句添加到查询中。它接受偏移量作为参数。它常用于limit()从句。

session.query(Customer).limit(2).offset(2).all()

预期输出:

[<Customer:3-johnlara>, <Customer:4-sarahtomlin>]

上述查询的等效 SQL 如下:

print(session.query(Customer).limit(2).offset(2))

预期输出:

SELECT
    customers. ID AS customers_id,
    customers.first_name AS customers_first_name,
    customers.last_name AS customers_last_name,
    customers.username AS customers_username,
    customers.email AS customers_email,
    customers.address AS customers_addrees,
    customers.town AS customers_town,
    customers.created_on AS customers_created_on,
    customers.updated_on AS customers_updated_on
FROM
    customers
LIMIT %(param_1)s OFFSET %(param_2)s

order_by()方法

通过在查询中添加ORDER BY子句,使用order_by()方法对结果进行排序。它接受订单应该基于的列名。默认情况下,它按升序排序。

session.query(Item).filter(Item.name.ilike("wa%")).all()
session.query(Item).filter(Item.name.ilike("wa%")).order_by(Item.cost_price).all()

预期输出:

[<Item:7-Watch>, <Item:8-Water Bottle>]
[<Item:8-Water Bottle>, <Item:7-Watch>]

要按降序排序,使用desc()功能,如下所示:

from sqlalchemy import desc
session.query(Item).filter(Item.name.ilike("wa%")).order_by(desc(Item.cost_price)).all()

预期输出:

[<Item:7-Watch>, <Item:8-Water Bottle>]

join()方法

join()方法用于创建 SQL INNER JOIN。它接受要为其创建 SQL JOIN 的表名。

让我们用join()方法找到所有有一个或多个订单的客户。

session.query(Customer).join(Order).all()

预期输出:

[<Customer:1-johngreen>]

该查询相当于以下 SQL:

print(session.query(Customer).join(Order))

预期输出:

SELECT
    customers.id AS customers_id,
    customers.first_name AS customers_first_name,
    customers.last_name AS customers_last_name,
    customers.username AS customers_username,
    customers.email AS customers_email,
    customers.address AS customers_address,
    customers.town AS customers_town,
    customers.created_on AS customers_created_on,
    customers.updated_on AS customers_updated_on
FROM
    customers
JOIN orders ON customers.id = orders.customer_id

join()方法通常用于在单个查询中从一个或多个表中获取数据。例如:

session.query(Customer.id, Customer.username, Order.id).join(Order).all()

预期输出:

[(1, 'johngreen', 1), (1, 'johngreen', 2), (1, 'johngreen', 3)]

我们可以通过如下链接join()方法为两个以上的表创建 SQL JOIN:

session.query(Table1).join(Table2).join(Table3).join(Table4).all()

这里还有一个例子,它使用 3 个连接来查找John Green的第一个顺序中的所有项目。

session.query(
    Customer.first_name, 
    Item.name, 
    Item.selling_price, 
    OrderLine.quantity
).join(Order).join(OrderLine).join(Item).filter(
    Customer.first_name == 'John',
    Customer.last_name == 'Green',
    Order.id == 1,
).all()

预期输出:

[('John', 'Chair', Decimal('10.81'), 3), ('John', 'Pen', Decimal('4.51'), 2)]

outerjoin()方法

outerjoin()方法的工作原理和join()一样,但是创造了LEFT OUTER JOIN

session.query(        
    Customer.first_name,
    Order.id,
).outerjoin(Order).all()

预期输出:

[('John', 1),
 ('John', 2),
 ('John', 3),
 ('Katherine', None),
 ('Toby', None),
 ('Scott', None),
 ('Sarah', None),
 ('John', None)]

在这个查询中,左边的表是customers表。因此,它将返回来自customers表(左表)的所有行,并且只从orders表(右表)返回满足连接条件的行。

您也可以通过将full=True传递给outerjoin()方法来创建FULL OUTER JOIN。例如:

session.query(        
    Customer.first_name,
    Order.id,
).outerjoin(Order, full=True).all()

预期输出:

[('John', 1),
 ('John', 2),
 ('John', 3),
 ('Katherine', None),
 ('Toby', None),
 ('Scott', None),
 ('Sarah', None),
 ('John', None)]

group_by()方法

我们使用group_by()方法对结果进行分组。它接受一列或多列,并根据列中的值对行进行分组。

下面的查询使用join()group_by()来统计约翰·格林的订单数量。

from sqlalchemy import func

session.query(func.count(Customer.id)).join(Order).filter(
    Customer.first_name == 'John',
    Customer.last_name == 'Green',    
).group_by(Customer.id).scalar()

预期输出:

3

具有()方法

为了根据聚合函数返回的值过滤出结果,我们使用having()方法,该方法将HAVING子句添加到SELECT语句中。就像where()条款一样,它接受一个条件。

# find the number of customers lives in each town

session.query(
    func.count("*").label('town_count'),    
    Customer.town
).group_by(Customer.town).having(func.count("*") > 2).all()

预期输出:

[(3, 'Norfolk')]

处理重复项

为了处理结果集中的重复行,我们使用DISTINCT选项。我们可以使用distinct()方法将DISTINCT选项添加到SELECT语句中。例如:

from sqlalchemy import distinct

session.query(Customer.town).filter(Customer.id  < 10).all()
session.query(Customer.town).filter(Customer.id  < 10).distinct().all()

session.query(        
    func.count(distinct(Customer.town)),
    func.count(Customer.town)
).all()

预期输出:

[('Norfolk'),
 ('Peterbrugh'),
 ('Norfolk'),
 ('Norfolk'),
 ('Wolfden'),
 ('Beckinsdale')]

[('Peterbrugh'), ('Beckinsdale'), ('Wolfden'), ('Norfolk')]

[(4, 6)]

铸造

将数据从一种类型转换为另一种类型是一种常见的操作,通过sqlalchemy包中的cast()功能完成。

from sqlalchemy import cast, Date, distinct, union

session.query(
    cast(func.pi(), Integer),
    cast(func.pi(), Numeric(10,2)),
    cast("2010-12-01", DateTime),
    cast("2010-12-01", Date),
).all()

预期输出:

[(3,
  Decimal('3.14'),
  datetime.datetime(2010, 12, 1, 0, 0),
  datetime.date(2010, 12, 1))]

联盟

对于联合查询,我们使用Query对象的union()方法。它需要一个或多个查询。例如:

s1 = session.query(Item.id, Item.name).filter(Item.name.like("Wa%"))
s2 = session.query(Item.id, Item.name).filter(Item.name.like("%e%"))
s1.union(s2).all()

预期输出:

[(2, 'Pen'),
 (4, 'Travel Bag'),
 (3, 'Headphone'),
 (5, 'Keyboard'),
 (7, 'Watch'),
 (8, 'Water Bottle')]

默认情况下,union()从结果集中删除所有重复的行。如果您想保留副本,请使用union_all()

s1.union_all(s2).all()

预期输出:

[(7, 'Watch'),
 (8, 'Water Bottle'),
 (2, 'Pen'),
 (3, 'Headphone'),
 (4, 'Travel Bag'),
 (5, 'Keyboard'),
 (8, 'Water Bottle')]

更新数据

要更新对象,只需将其属性设置为新值,将对象添加到会话中并提交更改。

i = session.query(Item).get(8)
i.selling_price = 25.91
session.add(i)
session.commit()

这样我们一次只能更新一个对象。要同时更新多行,请使用Query对象的update()方法。它返回更新的记录总数。例如:

# update quantity of all quantity of items to 60 whose name starts with 'W'

session.query(Item).filter(
    Item.name.ilike("W%")
).update({"quantity": 60}, synchronize_session='fetch')
session.commit()

预期输出:

2

删除数据

要删除对象,请使用会话对象的delete()方法。它接受一个对象,并将其标记为在下一次提交时删除。

i = session.query(Item).filter(Item.name == 'Monitor').one()
i
session.delete(i)
session.commit()

预期输出:

<Item:6-Monitor>

该提交从items表中移除Monitor

要一次删除多条记录,请使用Query对象的delete()方法。

session.query(Item).filter(
    Item.name.ilike("W%")
).delete(synchronize_session='fetch')
session.commit()

预期输出:

2

该提交将删除所有名称以W开头的项目。

原始查询

ORM 也给了你直接使用的灵活性,直接使用 SQL 使用text()函数。例如:

from sqlalchemy import text

session.query(Customer).filter(text("first_name = 'John'")).all()

session.query(Customer).filter(text("town like 'Nor%'")).all()

session.query(Customer).filter(text("town like 'Nor%'")).order_by(text("first_name, id desc")).all()

预期输出:

[<Customer:1-johngreen>, <Customer:3-johnlara>]

[<Customer:1-johngreen>, <Customer:3-johnlara>, <Customer:4-sarahtomlin>]

[<Customer:3-johnlara>, <Customer:1-johngreen>, <Customer:4-sarahtomlin>]

处理

事务是一种执行一组 SQL 语句的方式,以便成功执行所有语句或根本不执行任何语句。如果事务中涉及的任何语句失败,那么数据库将返回到事务启动前的状态。

我们目前在数据库中有两个订单。发送订单的过程如下:

  1. orders表的date_shipped栏中设置发货日期
  2. items表中减去订购项目的数量

这两个操作必须作为一个单元来执行,以确保表中的数据是正确的。

在下面的列表中,我们定义了dispatch_order()方法,该方法接受order_id作为参数,并在事务中执行上述任务。

from sqlalchemy import update
from sqlalchemy.exc import IntegrityError
from datetime import datetime

def dispatch_order(order_id):

    # check whether order_id is valid or not
    order = session.query(Order).get(order_id)

    if not order:
        raise ValueError("Invalid order id: {}.".format(order_id))    

    if order.date_shipped:
        print("Order already shipped.")
        return

    try:
        for i in order.order_lines:
            i.item.quantity = i.item.quantity - i.quantity            

        order.date_shipped = datetime.now()                            
        session.commit()
        print("Transaction completed.")

    except IntegrityError as e:
        print(e)
        print("Rolling back ...")
        session.rollback()
        print("Transaction failed.")

我们的第一个订单是 3 把椅子和 2 支笔。用1的订单 id 调用dispatch_order()函数,将返回如下输出:

dispatch_order(1)

预期输出:

Transaction completed.

此时,itemsorder_lines表应该是这样的:

[]

[]

我们的下一个订单是 1 把椅子和 4 支笔,但是我们现在只有 3 支笔了!

让我们试着运行dispatch_order()二阶,看看会发生什么。

dispatch_order(2)

预期输出:

(psycopg2.IntegrityError) new row for relation "items" violates check constraint "quantity_check"
DETAIL:  Failing row contains (2, Pen, 3.45, 4.51, -3).
 [SQL: 'UPDATE items SET quantity=%(quantity)s WHERE items.id = %(items_id)s'] [parameters: ({'quantity': 1, 'items_id': 1}, {'quantity': -3, 'items_id': 2})] (Background on this error at: http://sqlalche.me/e/gkpj)
Rolling back ...
Transaction failed.

不出所料,我们的发货失败了,因为我们的库存中没有足够的笔,并且因为我们正在使用事务,所以我们的数据库恢复到了事务开始之前的状态。

我希望这个教程会对你有所帮助。如果你遇到任何错别字或错误。使用下面的联系页面或评论框联系。