摘要:可以看作是很多对象的集合,还有一些关于的信息。相关类定义在基础的模块中,比如最常用的三个它也支持同时定义多个形成联合主键。使用获取单行单列结果时需要注意,如果返回多于一行,它会抛出异常。比如违反唯一性约束等。
SQL Expression Language对原生SQL语言进行了简单的封装
两大模块SQLAlchemy Core and ORM:
Core:提供执行SQL Expression Language的接口
ORM
安装:SQLAlchemy及相关数据库驱动
pip install sqlalchemy pymysql
数据库连接字符串格式:请参考这里
mysql://username:password@hostname/database postgresql://username:password@hostname/database sqlite:////absolute/path/to/database oracle://scott:tiger@127.0.0.1:1521/orcl
比如SQLite如下:
from sqlalchemy import create_engine engine = create_engine("sqlite:///cookies.db") engine2 = create_engine("sqlite:///:memory:") engine3 = create_engine("sqlite:////home/cookiemonster/cookies.db") engine4 = create_engine("sqlite:///c:Userscookiemonstercookies.db")
注意:create_engine函数返回以一个engine实例,但是不会立即获取数据库连接,直到在engine上进行操作如查询时才会去获取connection
关于MySQL空闲连接8小时自动关闭的解决方案:传入 pool_recycle=3600参数
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://cookiemonster:chocolatechip@mysql01.monster.internal/cookies", pool_recycle=3600)
create_engine其余的一些参数:
echo:是否log打印执行的sql语句及其参数。默认为False
encoding:默认utf-8
isolation_level:隔离级别
pool_recycle:指定连接回收间隔,这对于MySQL连接的8小时机制特别重要。默认-1
获取连接
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://cookiemonster:chocolatechip" "@mysql01.monster.internal/cookies", pool_recycle=3600) connection = engine.connect()Schema and Types
四种类型集合:
• Generic
• SQL standard
• Vendor specific
• User defined
SQLAlchemy定义了很多generic types以兼容不同数据库。这些类型都定义在sqlalchemy.types模块中,为了方便也可以从sqlalchemy直接导入这些类型。
类型对应表如下:
SQLAlchemy | Python | SQL |
---|---|---|
BigInteger | int | BIGINT |
Boolean | bool | BOOLEAN or SMALLINT |
Date | datetime.date | DATE (SQLite: STRING) |
DateTime | datetime.datetime | DATETIME (SQLite: STRING) |
Enum | str | ENUM or VARCHAR |
Float | float or Decimal | FLOAT or REAL |
Integer | int | INTEGER |
Interval | datetime.timedelta | INTERVAL or DATE from epoch |
LargeBinary | byte | BLOB or BYTEA |
Numeric | decimal.Decimal | NUMERIC or DECIMAL |
Unicode | unicode | UNICODE or VARCHAR |
Text | str | CLOB or TEXT |
Time | datetime.time | DATETIME |
如果这些类型不能满足你,比如有些数据库支持json类型,那么你需要用到sqlalchemy.dialects模块中对应数据库的类型。比如from sqlalchemy.dialects.postgresql import JSON
Metadata & Table & ColumnMetadata为了快速访问数据库。可以看作是很多Table对象的集合,还有一些关于engin,connection的信息。可以通过MetaData.tables访问这些表对象字典
定义表对象之前需要先实例化Metadata:
from sqlalchemy import MetaData metadata = MetaData()
Table对象构建如下:第一个参数为名称,第二个参数为Metadata对象,后续参数为Column对象. Column对象参数为,名称,类型,及其余等
from sqlalchemy import Table, Column, Integer, Numeric, String, ForeignKey cookies = Table("cookies", metadata, Column("cookie_id", Integer(), primary_key=True), Column("cookie_name", String(50), index=True), Column("cookie_recipe_url", String(255)), Column("cookie_sku", String(55)), Column("quantity", Integer()), Column("unit_cost", Numeric(12, 2)) )
from datetime import datetime from sqlalchemy import DateTime users = Table("users", metadata, Column("user_id", Integer(), primary_key=True), Column("username", String(15), nullable=False, unique=True), Column("email_address", String(255), nullable=False), Column("phone", String(20), nullable=False), Column("password", String(25), nullable=False), Column("created_on", DateTime(), default=datetime.now), Column("updated_on", DateTime(), default=datetime.now, onupdate=datetime.now)
注意:这里default,onupdate属性是一个callable对象而不是直接值,比如datetime.now(),因为这样的话,就永远是这个值,而不是每个实例实例化、更新时的时间了。
比较有用的就是onupdate,每次更新时都会调用该方法或函数。
键和约束(Keys and Constraints)
键和约束既可以像上面那样通过kwargs定义在Column中,也可以在之后通过对象添加。相关类定义在基础的 sqlalchemy模块中,比如最常用的三个:
from sqlalchemy import PrimaryKeyConstraint, UniqueConstraint, CheckConstraint
PrimaryKeyConstraint("user_id", name="user_pk"),它也支持同时定义多个形成联合主键。 UniqueConstraint("username", name="uix_username") CheckConstraint("unit_cost >= 0.00", name="unit_cost_positive")
索引(Index)
from sqlalchemy import Index Index("ix_cookies_cookie_name", "cookie_name")
这个定义需要放置在Table构造器中。也可以在之后定义,比如
Index("ix_test", mytable.c.cookie_sku, mytable.c.cookie_name))
关联关系和外键约束(Relationships and ForeignKeyConstraints)
from sqlalchemy import ForeignKey orders = Table("orders", metadata, Column("order_id", Integer(), primary_key=True), Column("user_id", ForeignKey("users.user_id")), Column("shipped", Boolean(), default=False) ) line_items = Table("line_items", metadata, Column("line_items_id", Integer(), primary_key=True), Column("order_id", ForeignKey("orders.order_id")), Column("cookie_id", ForeignKey("cookies.cookie_id")), Column("quantity", Integer()), Column("extended_cost", Numeric(12, 2)) )
注意:这里ForeignKey用的是字符串参数(这些字符串对应的是数据库中的表名.列名),而非引用。这样隔离了模块间相互依赖
我们也可以使用:
ForeignKeyConstraint(["order_id"], ["orders.order_id"])
创建或持久化表模式(Persisting the Tables)
通过示例代码我们知道所有的Table定义,以及额外的模式定义都会与一个metadata对象关联。我们可以通过这个metadata对象来创建表:
metadata.create_all(engine)
注意:默认情况下create_all不会重新创建已有表,所以它可以安全地多次调用,而且也非常友好地与数据库迁移库如Ablembic集成而不需要你进行额外手动编码。
本节代码完整如下:
from datetime import datetime from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String, DateTime, ForeignKey, create_engine) metadata = MetaData() cookies = Table("cookies", metadata, Column("cookie_id", Integer(), primary_key=True), Column("cookie_name", String(50), index=True), Column("cookie_recipe_url", String(255)), Column("cookie_sku", String(55)), Column("quantity", Integer()), Column("unit_cost", Numeric(12, 2)) ) users = Table("users", metadata, Column("user_id", Integer(), primary_key=True), Column("customer_number", Integer(), autoincrement=True), Column("username", String(15), nullable=False, unique=True), Column("email_address", String(255), nullable=False), Column("phone", String(20), nullable=False), Column("password", String(25), nullable=False), Column("created_on", DateTime(), default=datetime.now), Column("updated_on", DateTime(), default=datetime.now, onupdate=datetime.now) ) orders = Table("orders", metadata, Column("order_id", Integer(), primary_key=True), Column("user_id", ForeignKey("users.user_id")) ) line_items = Table("line_items", metadata, Column("line_items_id", Integer(), primary_key=True), Column("order_id", ForeignKey("orders.order_id")), Column("cookie_id", ForeignKey("cookies.cookie_id")), Column("quantity", Integer()), Column("extended_cost", Numeric(12, 2)) ) engine = create_engine("sqlite:///:memory:") metadata.create_all(engine)SQLAlchemy-Core模块 插入数据:
ins = cookies.insert().values( cookie_name="chocolate chip", cookie_recipe_url="http://some.aweso.me/cookie/recipe.html", cookie_sku="CC01", quantity="12", unit_cost="0.50" ) print(str(ins))
当然你也可以这么做:
from sqlalchemy import insert ins = insert(cookies).values( cookie_name="chocolate chip", cookie_recipe_url="http://some.aweso.me/cookie/recipe.html", cookie_sku="CC01", quantity="12", unit_cost="0.50" )
上述编译成预编译语句如下:
INSERT INTO cookies (cookie_name, cookie_recipe_url, cookie_sku, quantity, unit_cost) VALUES (:cookie_name, :cookie_recipe_url, :cookie_sku, :quantity, :unit_cost)
实际过程会是如下ins对象内部会调用compile()方法编译成上述语句,然后将参数存储到ins.compile().params字典中。
接下来我们通过前面获取的connection对象执行statement:
result = connection.execute(ins)
当然你也可以这么查询:
ins = cookies.insert() result = connection.execute( ins, cookie_name="dark chocolate chip", cookie_recipe_url="http://some.aweso.me/cookie/recipe_dark.html", cookie_sku="CC02", quantity="1", unit_cost="0.75" ) result.inserted_primary_key批量插入:
inventory_list = [ { "cookie_name": "peanut butter", "cookie_recipe_url": "http://some.aweso.me/cookie/peanut.html", "cookie_sku": "PB01", "quantity": "24", "unit_cost": "0.25" }, { "cookie_name": "oatmeal raisin", "cookie_recipe_url": "http://some.okay.me/cookie/raisin.html", "cookie_sku": "EWW01", "quantity": "100", "unit_cost": "1.00" } ] result = connection.execute(ins, inventory_list)
注意:一定要确保所有字典参数拥有相同的keys
查询from sqlalchemy.sql import select s = select([cookies]) rp = connection.execute(s) results = rp.fetchall()
当然我们也可以使用字符串来代替:
s = select("""SELECT cookies.cookie_id, cookies.cookie_name, cookies.cookie_recipe_url, cookies.cookie_sku, cookies.quantity, cookies.unit_cost FROM cookies""")
connection.execute返回的rp变量是一个ResultProxy对象(它是DBAPI中cursor对象的封装)。
我们也可以这样写:
from sqlalchemy.sql import select s = cookies.select() rp = connection.execute(s) results = rp.fetchall()
ResultProxy使得查询结果可以通过index,name,or Column object访问列数据。例如:
first_row = results[0] first_row[1] #游标列索引从1开始,by index first_row.cookie_name # by name first_row[cookies.c.cookie_name] #by Column object.
你也可以迭代ResultProxy,如下:
rp = connection.execute(s) for record in rp: print(record.cookie_name)
ResultProxy其余可用来获取结果集的方法
first()
fetchone()
fetchall()
scalar():Returns a single value if a query results in a single record with one column.
keys() 获取列名
关于选择ResultProxy上述的方法的建议:
1、使用first()而不是fetchone()来获取单条记录,因为fetchone()调用之后仍然保留着打开的connections共后续使用,如果不小心的话很容易引起问题。
2、使用迭代方式获取所有结果,而不是fetchall(),更加省内存。
3、使用scalar()获取单行单列结果时需要注意,如果返回多于一行,它会抛出异常。
控制返回列的数目
s = select([cookies.c.cookie_name, cookies.c.quantity]) rp = connection.execute(s) print(rp.keys()) result = rp.first()
排序
s = select([cookies.c.cookie_name, cookies.c.quantity]) s = s.order_by(cookies.c.quantity) rp = connection.execute(s) for cookie in rp: print("{} - {}".format(cookie.quantity, cookie.cookie_name)) #倒序desc from sqlalchemy import desc s = select([cookies.c.cookie_name, cookies.c.quantity]) s = s.order_by(desc(cookies.c.quantity))
限制返回结果集的条数
s = select([cookies.c.cookie_name, cookies.c.quantity]) s = s.order_by(cookies.c.quantity) s = s.limit(2) rp = connection.execute(s) print([result.cookie_name for result in rp])内置SQL函数
在sqlalchemy.sql.func模块中
#sum from sqlalchemy.sql import func s = select([func.sum(cookies.c.quantity)]) rp = connection.execute(s) print(rp.scalar()) #count s = select([func.count(cookies.c.cookie_name)]) rp = connection.execute(s) record = rp.first() print(record.keys()) print(record.count_1) #字段名是自动生成的,_ ,可以设置别名的,看下面 #设置别名 s = select([func.count(cookies.c.cookie_name).label("inventory_count")]) rp = connection.execute(s) record = rp.first() print(record.keys()) print(record.inventory_count)
过滤
#where s = select([cookies]).where(cookies.c.cookie_name == "chocolate chip") rp = connection.execute(s) record = rp.first() print(record.items()) #调用row对象的items()方法。 #like s = select([cookies]).where(cookies.c.cookie_name.like("%chocolate%")) rp = connection.execute(s) for record in rp.fetchall(): print(record.cookie_name)
可以在where中使用的子句元素
between(cleft, cright)
concat(column_two) Concatenate column with column_two
distinct()
in_([list])
is_(None) Find where the column is None (commonly used for Null checks with None)
contains(string) Find where the column has string in it (case-sensitive)
endswith(string) Find where the column ends with string (case-sensitive)
like(string) Find where the column is like string (case-sensitive)
startswith(string) Find where the column begins with string (case-sensitive)
ilike(string) Find where the column is like string (this is not case-sensitive)
当然还包括一系列的notxxx方法,比如notin_(),唯一的例外是isnot()
操作符
+,-,*,/,%
==,!=,<,>,<=,>=
AND,OR,NOT,由于python关键字的原因,使用and_(),or_(),not_()来代替
+号还可以用于字符串拼接:
s = select([cookies.c.cookie_name, "SKU-" + cookies.c.cookie_sku]) for row in connection.execute(s): print(row)
from sqlalchemy import cast s = select([cookies.c.cookie_name, cast((cookies.c.quantity * cookies.c.unit_cost), Numeric(12,2)).label("inv_cost")]) for row in connection.execute(s): print("{} - {}".format(row.cookie_name, row.inv_cost))
注意:cast是另外一个函数,允许我们进行类型转换,上述转换是将数字转换为货币形式,和
print("{} - {:.2f}".format(row.cookie_name, row.inv_cost)).这个行为一致。
from sqlalchemy import and_, or_, not_ s = select([cookies]).where( and_( cookies.c.quantity > 23, cookies.c.unit_cost < 0.40 ) ) for row in connection.execute(s): print(row.cookie_name) from sqlalchemy import and_, or_, not_ s = select([cookies]).where( or_( cookies.c.quantity.between(10, 50), cookies.c.cookie_name.contains("chip") ) ) for row in connection.execute(s): print(row.cookie_name)update
from sqlalchemy import update u = update(cookies).where(cookies.c.cookie_name == "chocolate chip") u = u.values(quantity=(cookies.c.quantity + 120)) result = connection.execute(u) print(result.rowcount) s = select([cookies]).where(cookies.c.cookie_name == "chocolate chip") result = connection.execute(s).first() for key in result.keys(): print("{:>20}: {}".format(key, result[key]))delete
from sqlalchemy import delete u = delete(cookies).where(cookies.c.cookie_name == "dark chocolate chip") result = connection.execute(u) print(result.rowcount) s = select([cookies]).where(cookies.c.cookie_name == "dark chocolate chip") result = connection.execute(s).fetchall() print(len(result))joins
join(),outerjoin()函数,select_from()函数
columns = [orders.c.order_id, users.c.username, users.c.phone, cookies.c.cookie_name, line_items.c.quantity, line_items.c.extended_cost] cookiemon_orders = select(columns) cookiemon_orders = cookiemon_orders.select_from(orders.join(users).join( line_items).join(cookies)).where(users.c.username == "cookiemon") result = connection.execute(cookiemon_orders).fetchall() for row in result: print(row)
最终产生的SQL语句如下:
SELECT orders.order_id, users.username, users.phone, cookies.cookie_name, line_items.quantity, line_items.extended_cost FROM users JOIN orders ON users.user_id = orders.user_id JOIN line_items ON orders.order_id = line_items.order_id JOIN cookies ON cookies.cookie_id = line_items.cookie_id WHERE users.username = :username_1
outerjoin
columns = [users.c.username, func.count(orders.c.order_id)] all_orders = select(columns) all_orders = all_orders.select_from(users.outerjoin(orders)) all_orders = all_orders.group_by(users.c.username) result = connection.execute(all_orders).fetchall() for row in result: print(row)
表别名函数alias()
>>> manager = employee_table.alias("mgr") >>> stmt = select([employee_table.c.name], ... and_(employee_table.c.manager_id==manager.c.id, ... manager.c.name=="Fred")) >>> print(stmt) SELECT employee.name FROM employee, employee AS mgr WHERE employee.manager_id = mgr.id AND mgr.name = ?分组
columns = [users.c.username, func.count(orders.c.order_id)] all_orders = select(columns) all_orders = all_orders.select_from(users.outerjoin(orders)) all_orders = all_orders.group_by(users.c.username) result = connection.execute(all_orders).fetchall() for row in result: print(row)chaining
def get_orders_by_customer(cust_name, shipped=None, details=False): columns = [orders.c.order_id, users.c.username, users.c.phone] joins = users.join(orders) if details: columns.extend([cookies.c.cookie_name, line_items.c.quantity, line_items.c.extended_cost]) joins = joins.join(line_items).join(cookies) cust_orders = select(columns) cust_orders = cust_orders.select_from(joins) cust_orders = cust_orders.where(users.c.username == cust_name) if shipped is not None: cust_orders = cust_orders.where(orders.c.shipped == shipped) result = connection.execute(cust_orders).fetchall() return result执行原生SQL
返回的还是ResultProxy对象
1、完全采用原始SQL
result = connection.execute("select * from orders").fetchall() print(result)
2、部分采用原始SQL,text()函数
from sqlalchemy import text stmt = select([users]).where(text("username="cookiemon"")) print(connection.execute(stmt).fetchall())异常
SQLALchemy定义了很多异常。我们通过关心:AttributeErrors,IntegrityErrors.等
为了进行相关试验与说明,请先执行下面这些语句
from datetime import datetime from sqlalchemy import (MetaData, Table, Column, Integer, Numeric, String, DateTime, ForeignKey, Boolean, create_engine, CheckConstraint) metadata = MetaData() cookies = Table("cookies", metadata, Column("cookie_id", Integer(), primary_key=True), 37 Column("cookie_name", String(50), index=True), Column("cookie_recipe_url", String(255)), Column("cookie_sku", String(55)), Column("quantity", Integer()), Column("unit_cost", Numeric(12, 2)), CheckConstraint("quantity > 0", name="quantity_positive") ) users = Table("users", metadata, Column("user_id", Integer(), primary_key=True), Column("username", String(15), nullable=False, unique=True), Column("email_address", String(255), nullable=False), Column("phone", String(20), nullable=False), Column("password", String(25), nullable=False), Column("created_on", DateTime(), default=datetime.now), Column("updated_on", DateTime(), default=datetime.now, onupdate=datetime.now) ) orders = Table("orders", metadata, Column("order_id", Integer()), Column("user_id", ForeignKey("users.user_id")), Column("shipped", Boolean(), default=False) ) line_items = Table("line_items", metadata, Column("line_items_id", Integer(), primary_key=True), Column("order_id", ForeignKey("orders.order_id")), Column("cookie_id", ForeignKey("cookies.cookie_id")), Column("quantity", Integer()), Column("extended_cost", Numeric(12, 2)) ) engine = create_engine("sqlite:///:memory:") metadata.create_all(engine) connection = engine.connect()
from sqlalchemy import select, insert ins = insert(users).values( username="cookiemon", email_address="mon@cookie.com", phone="111-111-1111", password="password" ) result = connection.execute(ins) s = select([users.c.username]) results = connection.execute(s) for result in results: print(result.username) print(result.password) #此处包AttributeError异常
在违反约束的情况下会出现IntegrityError异常。比如违反唯一性约束等。
s = select([users.c.username]) connection.execute(s).fetchall() [(u"cookiemon",)] ins = insert(users).values( username="cookiemon", email_address="damon@cookie.com", phone="111-111-1111", password="password" ) result = connection.execute(ins) #此处报IntegrityError, UNIQUE constraint failed: users.username #异常处理 try: result = connection.execute(ins) except IntegrityError as error: print(error.orig.message, error.params)
所有的SQLAlchemy异常处理方式都是上面那种思路,通过[SQLAlchemyError](http://docs.sqlal
chemy.org/en/latest/core/exceptions.html)可以获取到的信息由如下:
orig :The DBAPI exception object.
params:The parameter list being used when this exception occurred.
statement :The string SQL statement being invoked when this exception occurred.
事务Transactionsfrom sqlalchemy.exc import IntegrityError def ship_it(order_id): s = select([line_items.c.cookie_id, line_items.c.quantity]) s = s.where(line_items.c.order_id == order_id) transaction = connection.begin() #开启事务 cookies_to_ship = connection.execute(s).fetchall() try: for cookie in cookies_to_ship: u = update(cookies).where(cookies.c.cookie_id == cookie.cookie_id) u = u.values(quantity=cookies.c.quantity - cookie.quantity) result = connection.execute(u) u = update(orders).where(orders.c.order_id == order_id) u = u.values(shipped=True) result = connection.execute(u) print("Shipped order ID: {}".format(order_id)) transaction.commit() #提交事务 except IntegrityError as error: transaction.rollback() #事务回滚 print(error)
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/38265.html
摘要:基于反射对象进行查询模块反射这里我们不再使用而是使用扩展模块的获取所有的对象名获取表对象进行操作反射关联关系可以反射并建立表之间的但是建立关联列的命名为例如关于更多信息请详细参看官方文档 示例数据库下载:http://chinookdatabase.codepl...在SQLALchemy中,我们使用反射技术来获取相关database schema信息,如tables,views,in...
摘要:支持从现有数据库自动生成代码,并支持一对多,一对一,多对多的关联关系。生成整个库的代码指定表保存到指定文件 pip install sqlacodegen sqlacodegen支持从现有数据库自动生成ORM代码,并支持一对多,一对一,多对多的关联关系。 #生成整个库的代码 sqlacodegen sqlite:///Chinook_Sqlite.sqlite #指定表 sqlacod...
摘要:默认的可以增量式创建数据库缺失的表,但是无法做到修改已有的表结构,或删除代码中已经移除的表。这个时候我们就需要用到这个库。 SQLAlchemy默认的create_all()可以增量式创建数据库缺失的表,但是无法做到修改已有的表结构,或删除代码中已经移除的表。这个时候我们就需要用到Alembic这个SQLAlchemy migrations库。安装:pip install alembi...
摘要:你应该使用工厂类来创建类,因为这确保了配置参数的正确性。对象包含创建数据库连接所需的一切信息,它不会立即创建连接对象,而是会在我们进行具体操作时创建。注意生产环境不要使用这个选项。关于选择的最佳实践使用迭代方式获取所有值,而不是。 定义模式Defining Schema 定义ORM类的4个步骤: 继承declarative_base()函数返回的类 定义__tablename__属性...
摘要:命令行参数文件鉴于迁移到后可能需要很长的命令行参数,有些会限制命令行长度,支持定义一个命令行参数文件。已有三分库可以自动转成模块,只要在启动时将放在指定路径中,便会自动变成。 java[c]命令行参数文件 鉴于迁移到java9后可能需要很长的命令行参数,有些os会限制命令行长度,java9支持定义一个命令行参数文件。使用方式: java @arguments.txt arguments...
阅读 3433·2023-04-25 18:14
阅读 1526·2021-11-24 09:38
阅读 3244·2021-09-22 14:59
阅读 3060·2021-08-09 13:43
阅读 2562·2019-08-30 15:54
阅读 563·2019-08-30 13:06
阅读 1540·2019-08-30 12:52
阅读 2719·2019-08-30 11:13