700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Python ORM之SQLAlchemy 数据库连接引擎实现Mysql PostgreSQL Oracle连接以及高级查询的相关实例

Python ORM之SQLAlchemy 数据库连接引擎实现Mysql PostgreSQL Oracle连接以及高级查询的相关实例

时间:2020-04-17 10:26:25

相关推荐

Python ORM之SQLAlchemy 数据库连接引擎实现Mysql PostgreSQL Oracle连接以及高级查询的相关实例

1 环境

SQLAlchemy 2.0.7PyMySQL 1.0.2Python 3.8.16

2 背景

SQLAlchemy 工具

实现多种数据库连接支持

MetaData、automap_base 实现已有数据表反射

对原有表的数据inster update delete等高级操作

3 数据库连接引擎即高级查询的相关实例

# coding: utf-8"""数据库链接引擎实现:1、数据库链接 支持Mysql、PostgreSQL、Oracle2、sqlalchemy 数据 预览 插入 获取表信息3、sqlalchemy 相关高级查询"""# 连接引擎 MetaData类from sqlalchemy import create_engine,MetaData,Table# 创建会话from sqlalchemy.orm import sessionmaker# 使用df 插入数据库import pandas as pd# automap_base 反射表from sqlalchemy.ext.automap import automap_base# 记录报错日志import logginglogging.basicConfig(filename="test.log", filemode="w", format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%d-%M-%Y %H:%M:%S", level=logging.DEBUG)class Dbbase(object):"""db_con : 数据库连接get_table_dicts : 获取数据库表映射字典db_show_tables : 获取数据库所有表db_select_table_columns : 获取数据表的列信息db_select_table : 查看数据表数据table_insert_row : 数据表插入单条数据table_insert_df : 通过df实现批量插入数据table_insert_df_old : 通过df实现批量插入数据方法2 ,和前面方法调用方式略微不同get_table_instantiation : 该方法可用于Table创建表对象db_close :关闭数据库引擎"""def __init__(self,db_dict):self.db_type = db_dict['db_type']self.db_user = db_dict['db_user']self.db_passwd = db_dict['db_passwd']self.db_ip = db_dict['db_ip']self.db_port = db_dict['db_port']self.db_name = db_dict['db_name']if db_dict['db_charset'] :self.db_charset = db_dict['db_charset']else:self.db_charset = 'utf8'def db_con(self):if self.db_type == 'MySQL' :self.engine = create_engine(f'mysql+pymysql://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}?charset={self.db_charset}')elif self.db_type == 'PostgreSQL' :self.engine = create_engine(f'postgresql://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}')elif self.db_type == 'Oracle' :self.engine = create_engine(f'cx_Oracle://{self.db_user}:{self.db_passwd}@{self.db_ip}:{self.db_port}/{self.db_name}')else:print('数据库类型暂不支持!!!')self.session = sessionmaker(bind=self.engine)()self.conn = self.engine.connect()# base实列self.Base = automap_base()# reflect the tablesself.Base.prepare(self.engine, reflect=True)# MetaData 实列self.metadata = MetaData()try:# reflect 映射dash_base库下的表结构self.metadata.reflect(schema=self.db_name, bind=self.engine)print("数据库连接成功!!!")except Exception as e:# 主要错误日志# logging.error(e)# 详细日志信息logging.exception(e)self.session = ''print("数据库连接失败!!!")return self.session,self.conndef get_table_dicts(self):# 字典的形式将metadata.tables解析出来,方便检索,也不用再使用Table()类self.table_dicts = {i.name: i for i in self.metadata.tables.values()}return self.table_dictsdef db_show_tables(self):table_name_list = self.table_dicts.keys()print('table_name_list :\n', table_name_list,'\n')return table_name_listdef db_select_table_columns(self,table_name):table_columns_list = [ i.name for i in self.table_dicts[table_name].columns]print('table_columns_list :\n', table_columns_list,'\n')return table_columns_listdef get_table_class(self,table_name):table_class = eval(f'self.Base.classes.{table_name}')return table_classdef db_select_table(self,table_name,limit_size = 2 ):if limit_size == -1:rows = [instance for instance in self.session.query(self.table_dicts[table_name]).all()]else:rows = [instance for instance in self.session.query(self.table_dicts[table_name]).limit(limit_size)]print('rows_list :\n', rows,'\n')return rowsdef table_insert_row(self,table_name,row_list):row_data = dict(zip(self.db_select_table_columns(table_name),row_list))self.conn.execute(self.get_table_dicts()[table_name].insert().values(row_data))mit()def table_insert_df(self,table_name,data,chunksize = 10000,if_exists = 'append',index = False):""":param table_name: 数据表:param data: 列表数据:param chunksize: 缓存值如果data数据量大,需要设置合理的chunksize值,这和数据库缓存大小有关,可以设置在50000-10000,如果提示数据库连接超时错误,就将size值调小。:param if_exists: append 追加 replace 替换覆盖:param index: True 插入index字段 False 不插入index字段:return:""""""示例:from sqlalchemy.types import DATE,CHAR,VARCHAR DTYPES = {'col_1字段名称' : DATE, 'col_2':CHAR(4),'col_3':VARCHAR(10)}df.to_sql(....,dtype = DTYPES)将写入数据表的df中,dtype 指定 根据列名对应的数据类型字段即可如果使用.to_sql()需要指定dtype类型时,如果数据库中不存在目标表,则相应创建;如果数据库中已经存在目标表,则设置append追加模式写入数据库时,可能会引起字段类型冲突。"""df = pd.DataFrame(data, columns=db_base.db_select_table_columns(table_name))df.to_sql(table_name,con=self.engine,chunksize=chunksize,if_exists=if_exists,index=index) # 暂时不用replace会卡死def table_insert_df_old(self,table_name,data,chunksize = 10000,if_exists = 'append',index = False):""":param table_name: 数据表:param data: 列表数据:param chunksize: 缓存值如果data数据量大,需要设置合理的chunksize值,这和数据库缓存大小有关,可以设置在50000-10000,如果提示数据库连接超时错误,就将size值调小。:param if_exists: append 追加 replace 替换覆盖:param index: True 插入index字段 False 不插入index字段:return:"""df = pd.DataFrame(data, columns=db_base.db_select_table_columns(table_name))pd.io.sql.to_sql(df,table_name,con = self.engine,schema = self.db_name, chunksize=chunksize,if_exists = if_exists,index=index)def get_table_instantiation(self,table_name):table_instantiation = Table(table_name,self.metadata,schema=self.db_name)return table_instantiationdef table_truncate(self,table_name):self.conn.execute(text(f'TRUNCATE TABLE {table_name}'))self.conn.close()def db_close(self):self.conn.close()self.session.close()self.engine.dispose()if __name__ == '__main__':db_dict = {}db_dict['db_type'] = 'MySQL'db_dict['db_user'] = 'root'db_dict['db_passwd'] = 'root'db_dict['db_ip'] = '192.168.10.1'db_dict['db_port'] = 3306db_dict['db_name'] = 'test'db_dict['db_charset'] = 'utf8'# 实例化类db_base = Dbbase(db_dict)# 数据库连接db_session,db_conn = db_base.db_con()if db_session:# 获取数据库表映射db_table_dicts = db_base.get_table_dicts()# 查看数据库的数据表db_base.db_show_tables()# 查看数据表列名db_base.db_select_table_columns('users')# 数据预览db_base.db_select_table('users')# # 设置查询前几条数据# dbb.db_select_table('users',4)# # 查询数据表所有数据# dbb.db_select_table('users',-1)# 数据插入# 单条数据插入# 创建列表数据对象row_list = ['11','ed099099','fred',9]db_base.table_insert_row('users',row_list)# df 方式插入数据# 创建列表数据对象data = [[13, 'ed099099', 'fred', '13'], [14, 'ed099099', 'fred', '14'], [15, 'ed099099', 'fred', '15'],[16, 'ed099099', 'fred', '16']]# 方式一db_base.table_insert_df('users',data)# 方式二db_base.table_insert_df_old('users',data)################################################################ automap_base 反射表 的使用################################################################ 多条数据插入# db_session 批量插入user_data = [{'id': i,'name':'name%s' %i,'fullname':'fullname%s' %i,'nickname':'nickname%s' %i} for i in range(18,20)]User = db_base.get_table_class('users')db_session.bulk_insert_mappings(User, user_data)# 多条数据更新# db_session 批量更新user_data = [{'id': i,'name':'name%s' %i,'fullname':'test%s' %i,'nickname':'test%s' %i} for i in range(18,20)]User = db_base.get_table_class('users')db_session.bulk_update_mappings(User, user_data)print('全表查询 sql\n:',db_session.query(User))print('全表查询\n:',db_session.query(User).all(),'\n')################################################################ MetaData 表字典的使用高级查询#automap_base 反射表查询使用时 User对象替换db_table_dicts['users'],User.id,User.name 替换 db_table_dicts['users'].columns['id','name']################################################################ 查询 可事先定义db_query 即全表查询# # 自定义数据表查询db_query_customer = db_session.query(db_table_dicts['users'].columns['id','name'])# 全表查询db_query = db_session.query(db_table_dicts['users'])# 查询 all listprint("查询 all list sql :\n", db_query)print("查询 all list :\n" ,db_query.all(),'\n')# 查询 first() 第一个print("查询 first() 第一个 :\n" ,db_query.first(),'\n')# 筛选 查询 filter()print("筛选 查询 filter() sql :\n", db_query.filter(db_table_dicts['users'].columns['id'] == 1))print("筛选 查询 filter() :\n", db_query.filter(db_table_dicts['users'].columns['id'] == 1).all(),'\n')# 升\降序 查询 order_by()# 升序print("升序 查询 order_by() sql :\n", db_query.order_by(db_table_dicts['users'].columns['id']))print("升序 查询 order_by() :\n", db_query.order_by(db_table_dicts['users'].columns['id']).all(),'\n')# 降序print("降序 查询 order_by() sql :\n", db_query.order_by(db_table_dicts['users'].columns['id'].desc()))print("降序 查询 order_by() :\n", db_query.order_by(db_table_dicts['users'].columns['id']).all(),'\n')# 查询 one() 如果这个结果集少于或者多于一条数据, 结论有且只有一条数据的时候才会正常的返回,否则抛出异常print("查询 one() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 1).one(),'\n')# print("查询 one() :\n", db_query.filter(db_table_dicts['users'].columns['id'] == 7).one(), '\n')# 查询 one_or_none() 在结果集中没有数据的时候也不会抛出异常print("查询 one_or_none() :\n",db_query.filter(db_table_dicts['users'].columns['id'] == 6).one_or_none())print("查询 one_or_none() :\n", db_query.filter(db_table_dicts['users'].columns['id'] == 7).one_or_none(), '\n')# 查询 scalar() 底层调用one()方法,并且如果one()方法没有抛出异常,会返回查询到的第一列的数据print("查询 scalar() :\n",db_query.filter(db_table_dicts['users'].columns['name'] == "fred").scalar(), '\n')# 查询 text类型from sqlalchemy import text# 升序print("升序 查询 text类型 sql:\n",db_query.filter(text("id< 3")).order_by(text('id')))print("升序 查询 text类型:\n",db_query.filter(text("id< 3")).order_by(text('id')).all(), '\n')# 降序print("降序 查询 text类型 sql:\n",db_query.filter(text("id< 3")).order_by(text('id desc')))print("降序 查询 text类型:\n",db_query.filter(text("id< 3")).order_by(text('id desc')).all(), '\n')# text 带变量方式 用 :传入变量,用params接收print("查询 text 带变量方式 sql:\n", db_query.filter(text("id< :value")).params(value=4))print("查询 text 带变量方式:\n", db_query.filter(text("id< :value")).params(value=4).all(), '\n')# from_statement 原生sql语句print("查询 from_statement 原生sql语句 sql:\n",db_query.from_statement(text("select * from users where id>:value")).params(value=2))print("查询 from_statement 原生sql语句:\n", db_query.from_statement(text("select * from users where id>:value")).params(value=2).all(),'\n')# and_ or_ 普通的表达式在这里面不好使from sqlalchemy.sql import and_, asc, desc, or_print("查询 and_ or_ 普通的表达式 sql:\n",db_query.filter(and_(db_table_dicts['users'].columns['id'] == 4, db_table_dicts['users'].columns['name'] == "ed")))print("查询 and_ or_ 普通的表达式:\n",db_query.filter(and_(db_table_dicts['users'].columns['id'] == 4, db_table_dicts['users'].columns['name'] == "ed")).first())print("查询 and_ or_ 普通的表达式 sql:\n", db_query.filter(or_(db_table_dicts['users'].columns['id'] == 4, db_table_dicts['users'].columns['name'] == "ed")))print("查询 and_ or_ 普通的表达式:\n", db_query.filter(or_(db_table_dicts['users'].columns['id'] == 4, db_table_dicts['users'].columns['name'] == "ed")).first(),'\n')# between 大于多少小于多少print("查询 between 大于多少小于多少 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].between(1, 3)))print("查询 between 大于多少小于多少:\n",db_query.filter(db_table_dicts['users'].columns['id'].between(1, 3)).all())# in_ 在里面对应还有not in等print("查询 in_ 在里面对应还有not in等 sql:\n",db_query.filter(db_table_dicts['users'].columns['id'].in_([1])))print("查询 in_ 在里面对应还有not in等:\n",db_query.filter(db_table_dicts['users'].columns['id'].in_([1])).all())print("查询 notin_ 在里面对应还有not in等 sql:\n", db_query.filter(db_table_dicts['users'].columns['id'].notin_([1])))print("查询 notin_ 在里面对应还有not in等:\n",db_query.filter(db_table_dicts['users'].columns['id'].notin_([1])).all(),'\n')# synchronize_session 可以理解为 引用更新 注意:要使用全部查询,使用筛选字段时,更新操作会报错失败# synchronize_session=False 在原有值的基础上增加和删除 099 strdb_query.filter(db_table_dicts['users'].columns['id'] > 0).update({db_table_dicts['users'].columns['name']: db_table_dicts['users'].columns['name'] + '099'}, synchronize_session=False)mit()# synchronize_session=evaluate 在原有值的基础上增加和减少 11, 必须是整数类型db_query.filter(db_table_dicts['users'].columns['id'] > 0).update({db_table_dicts['users'].columns['nickname']: db_table_dicts['users'].columns['nickname'] + 1}, synchronize_session="evaluate")mit()# 如果查询条件里有in_,需要在delete()中加如下参数: fetch 删除的更快db_query.filter(db_table_dicts['users'].columns['id'].in_([1, 2])).delete(synchronize_session='fetch')mit()# 计数 注意不能延用前面的db_query,使用db_session.query 查询from sqlalchemy import func# 简单func.count()计数查询print('简单func.count()计数查询 sql:\n',db_session.query(func.count(db_table_dicts['users'].columns['id'])))print('简单func.count()计数查询:\n', db_session.query(func.count(db_table_dicts['users'].columns['id'])).first(),'\n')# 如果想实现select count(*) from users,可以通过以下方式来实现:print('实现select count(*) sql:\n',db_session.query(func.count("*")).select_from(db_table_dicts['users']))print('实现select count(*):\n',db_session.query(func.count("*")).select_from(db_table_dicts['users']).scalar(),'\n')# 如果指定了要查找的表的字段,可以省略select_from()方法:print('实现select count(*) sql:\n', db_session.query(func.count(db_table_dicts['users'].columns['id'])))print('实现select count(*):\n', db_session.query(func.count(db_table_dicts['users'].columns['id'])).scalar(),'\n')# limit、offset和切片"""limit:可以限制每次查询的时候只查询几条数据。 offset:可以限制查找数据的时候过滤掉前面多少条。 切片 slice :可以对Query对象使用切片操作,来获取想要的数据。 """print('实现 limit 查询 sql:\n',db_query.limit(2))print('实现 limit 查询:\n', db_query.limit(2).all(),'\n')print('实现 limit offset 查询 sql:\n', db_query.limit(2).offset(1))print('实现 limit offset 查询:\n', db_query.limit(2).offset(1).all(), '\n')print('实现 切片 查询 sql:\n', db_query.slice(2,5))print('实现 切片 查询:\n', db_query.slice(2,5).all(), '\n')# group_by# 比如我想根据名字来分组, 统计每个名字分组里面有多少人from sqlalchemy import func# 我想根据名字来分组, 统计每个名字分组里面有多少人print('实现 group_by 查询 sql:\n', db_session.query(db_table_dicts['users'].columns['id'], func.count(db_table_dicts['users'].columns['id'])).group_by(db_table_dicts['users'].columns['id']))print('实现 group_by 查询:\n', db_session.query(db_table_dicts['users'].columns['name'], func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['name']).all(),'\n')# having# having是对查找结果进一步过滤。比如只想要看未成年人的数量,那么可以首先对年龄进行分组统计人数,然后再对分组进行having过滤print('实现 having 查询 sql:\n', db_session.query(db_table_dicts['users'].columns['id'],func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['id']).having(db_table_dicts['users'].columns['id'] >= 2))print('实现 having 查询:\n', db_session.query(db_table_dicts['users'].columns['id'],func.count(db_table_dicts['users'].columns['name'])).group_by(db_table_dicts['users'].columns['id']).having(db_table_dicts['users'].columns['id'] >= 2).all(),'\n')# joinprint('实现 join 查询 sql:\n',db_session.query(db_table_dicts['users'], db_table_dicts['addresses']).join(db_table_dicts['addresses']))print('实现 join 查询:\n',db_session.query(db_table_dicts['users'], db_table_dicts['addresses']).join(db_table_dicts['addresses']).all(),'\n')print('实现 outerjoin 查询 sql:\n',db_session.query(db_table_dicts['users'], db_table_dicts['addresses']).outerjoin(db_table_dicts['addresses']))print('实现 outerjoin 查询:\n',db_session.query(db_table_dicts['users'], db_table_dicts['addresses']).outerjoin(db_table_dicts['addresses']).all(),'\n')# 别名# 当多表查询的时候,有时候同一个表要用到多次,这时候用别名就可以方便的解决命名冲突的问题了from sqlalchemy.orm import aliasedadalias1 = aliased(db_table_dicts['users'])adalias2 = aliased(db_table_dicts['addresses'])print('实现 aliased 查询 sql:\n',db_session.query(adalias1.columns['id'], adalias1.columns['id'], adalias2.columns['id']).join(adalias1))print('实现 aliased 查询:\n',db_session.query(adalias1.columns['id'], adalias1.columns['id'], adalias2.columns['id']).join(adalias1).all(),'\n')# 子查询from sqlalchemy.sql import funcprint('创建查询 sql:\n',db_session.query( db_table_dicts['addresses'].columns['id'].label('user_id'), func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']))print('创建查询:\n',db_session.query( db_table_dicts['addresses'].columns['id'].label('user_id'), func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).all(),'\n')print('创建子查询 sql:\n', db_session.query(db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).subquery())addr = db_session.query(db_table_dicts['addresses'].columns['id'].label('user_id'),func.count("*").label('address_count')).group_by(db_table_dicts['addresses'].columns['id']).subquery()print('子查询带入父查询 sql:\n', db_session.query(db_table_dicts['users'],addr.c.address_count).outerjoin(addr,db_table_dicts['users'].columns['id'] == addr.c.user_id ).order_by(db_table_dicts['users'].columns['id'] ))print('子查询带入父查询:\n', db_session.query(db_table_dicts['users'],addr.c.address_count).outerjoin(addr,db_table_dicts['users'].columns['id'] == addr.c.user_id ).order_by(db_table_dicts['users'].columns['id'] ).all(),'\n')# updateprint('update sql:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '1').update({'name':'test'}))mit()print('update 结果:\n',db_query.filter(db_table_dicts['users'].columns['id'] == '1').all(),'\n')# 条件删除 deleteprint('delete sql:\n', db_query.filter(db_table_dicts['users'].columns['id'] == '2').delete())mit()print('delete 结果:\n', db_query.filter(db_table_dicts['users'].columns['id'] == '2').all(), '\n')表数据清空print('update sql:\n', db_query.delete())mit()print('update 结果:\n', db_query.all(), '\n')TRUNCATE TABLE 实现db_base.table_truncate('users_copy1')# 关闭数据库db_base.db_close()

运行结果:

数据库连接成功!!!table_name_list :dict_keys(['addresses', 'users', 'keywords', 'post_keywords', 'posts', 'users_copy1']) table_columns_list :['id', 'name', 'fullname', 'nickname'] rows_list :[(1, 'test', 'fred', '6'), (3, 'ed099099', 'fred', '6')] 全表查询 sql: SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.nickname AS users_nickname FROM users全表查询: [<sqlalchemy.ext.automap.users object at 0x7fe069bee580>, <sqlalchemy.ext.automap.users object at 0x7fe069bee520>, <sqlalchemy.ext.automap.users object at 0x7fe069bee640>, <sqlalchemy.ext.automap.users object at 0x7fe069bee6a0>, <sqlalchemy.ext.automap.users object at 0x7fe069bee700>, <sqlalchemy.ext.automap.users object at 0x7fe069bee760>, <sqlalchemy.ext.automap.users object at 0x7fe069bee7c0>, <sqlalchemy.ext.automap.users object at 0x7fe069bee820>, <sqlalchemy.ext.automap.users object at 0x7fe069bee880>, <sqlalchemy.ext.automap.users object at 0x7fe069bee8e0>, <sqlalchemy.ext.automap.users object at 0x7fe069bee940>, <sqlalchemy.ext.automap.users object at 0x7fe069bee9a0>, <sqlalchemy.ext.automap.users object at 0x7fe069beea00>, <sqlalchemy.ext.automap.users object at 0x7fe069beea60>] 查询 all list sql :SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users查询 all list :[(1, 'test', 'fred', '6'), (3, 'ed099099', 'fred', '6'), (4, 'ed099099', 'fred', '6'), (5, 'fred099099', 'Fred Flintstone', '6'), (6, 'ed099099', 'fred', '6'), (9, 'ed099099', 'fred', '9'), (10, 'ed099099', 'fred', '9'), (11, 'ed099099', 'fred', '9'), (13, 'ed099099', 'fred', '13'), (14, 'ed099099', 'fred', '14'), (15, 'ed099099', 'fred', '15'), (16, 'ed099099', 'fred', '16'), (18, 'name18', 'test18', 'test18'), (19, 'name19', 'test19', 'test19')] 查询 first() 第一个 :(1, 'test', 'fred', '6') 筛选 查询 filter() sql :SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE test.users.id = %(id_1)s筛选 查询 filter() :[(1, 'test', 'fred', '6')] 升序 查询 order_by() sql :SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users ORDER BY test.users.id升序 查询 order_by() :[(1, 'test', 'fred', '6'), (3, 'ed099099', 'fred', '6'), (4, 'ed099099', 'fred', '6'), (5, 'fred099099', 'Fred Flintstone', '6'), (6, 'ed099099', 'fred', '6'), (9, 'ed099099', 'fred', '9'), (10, 'ed099099', 'fred', '9'), (11, 'ed099099', 'fred', '9'), (13, 'ed099099', 'fred', '13'), (14, 'ed099099', 'fred', '14'), (15, 'ed099099', 'fred', '15'), (16, 'ed099099', 'fred', '16'), (18, 'name18', 'test18', 'test18'), (19, 'name19', 'test19', 'test19')] 降序 查询 order_by() sql :SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users ORDER BY test.users.id DESC降序 查询 order_by() :[(1, 'test', 'fred', '6'), (3, 'ed099099', 'fred', '6'), (4, 'ed099099', 'fred', '6'), (5, 'fred099099', 'Fred Flintstone', '6'), (6, 'ed099099', 'fred', '6'), (9, 'ed099099', 'fred', '9'), (10, 'ed099099', 'fred', '9'), (11, 'ed099099', 'fred', '9'), (13, 'ed099099', 'fred', '13'), (14, 'ed099099', 'fred', '14'), (15, 'ed099099', 'fred', '15'), (16, 'ed099099', 'fred', '16'), (18, 'name18', 'test18', 'test18'), (19, 'name19', 'test19', 'test19')] 查询 one() :(1, 'test', 'fred', '6') 查询 one_or_none() :(6, 'ed099099', 'fred', '6')查询 one_or_none() :None 查询 scalar() :None 升序 查询 text类型 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE id< 3 ORDER BY id升序 查询 text类型:[(1, 'test', 'fred', '6')] 降序 查询 text类型 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE id< 3 ORDER BY id desc降序 查询 text类型:[(1, 'test', 'fred', '6')] 查询 text 带变量方式 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE id< %(value)s查询 text 带变量方式:[(1, 'test', 'fred', '6'), (3, 'ed099099', 'fred', '6')] 查询 from_statement 原生sql语句 sql:select * from users where id>%(value)s查询 from_statement 原生sql语句:[(3, 'ed099099', 'fred', '6'), (4, 'ed099099', 'fred', '6'), (5, 'fred099099', 'Fred Flintstone', '6'), (6, 'ed099099', 'fred', '6'), (9, 'ed099099', 'fred', '9'), (10, 'ed099099', 'fred', '9'), (11, 'ed099099', 'fred', '9'), (13, 'ed099099', 'fred', '13'), (14, 'ed099099', 'fred', '14'), (15, 'ed099099', 'fred', '15'), (16, 'ed099099', 'fred', '16'), (18, 'name18', 'test18', 'test18'), (19, 'name19', 'test19', 'test19')] 查询 and_ or_ 普通的表达式 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE test.users.id = %(id_1)s AND test.users.name = %(name_1)s查询 and_ or_ 普通的表达式:None查询 and_ or_ 普通的表达式 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE test.users.id = %(id_1)s OR test.users.name = %(name_1)s查询 and_ or_ 普通的表达式:(4, 'ed099099', 'fred', '6') 查询 between 大于多少小于多少 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE test.users.id BETWEEN %(id_1)s AND %(id_2)s查询 between 大于多少小于多少:[(1, 'test', 'fred', '6'), (3, 'ed099099', 'fred', '6')]查询 in_ 在里面对应还有not in等 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE test.users.id IN (__[POSTCOMPILE_id_1])查询 in_ 在里面对应还有not in等:[(1, 'test', 'fred', '6')]查询 notin_ 在里面对应还有not in等 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users WHERE (test.users.id NOT IN (__[POSTCOMPILE_id_1]))查询 notin_ 在里面对应还有not in等:[(3, 'ed099099', 'fred', '6'), (4, 'ed099099', 'fred', '6'), (5, 'fred099099', 'Fred Flintstone', '6'), (6, 'ed099099', 'fred', '6'), (9, 'ed099099', 'fred', '9'), (10, 'ed099099', 'fred', '9'), (11, 'ed099099', 'fred', '9'), (13, 'ed099099', 'fred', '13'), (14, 'ed099099', 'fred', '14'), (15, 'ed099099', 'fred', '15'), (16, 'ed099099', 'fred', '16'), (18, 'name18', 'test18', 'test18'), (19, 'name19', 'test19', 'test19')] 简单func.count()计数查询 sql:SELECT count(test.users.id) AS count_1 FROM test.users简单func.count()计数查询:(14,) 实现select count(*) sql:SELECT count(%(count_2)s) AS count_1 FROM test.users实现select count(*):14 实现select count(*) sql:SELECT count(test.users.id) AS count_1 FROM test.users实现select count(*):14 实现 limit 查询 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users LIMIT %(param_1)s实现 limit 查询:[(1, 'test', 'fred', '6'), (3, 'ed099099', 'fred', '6')] 实现 limit offset 查询 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users LIMIT %(param_1)s, %(param_2)s实现 limit offset 查询:[(3, 'ed099099', 'fred', '6'), (4, 'ed099099', 'fred', '6')] 实现 切片 查询 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname FROM test.users LIMIT %(param_1)s, %(param_2)s实现 切片 查询:[(4, 'ed099099', 'fred', '6'), (5, 'fred099099', 'Fred Flintstone', '6'), (6, 'ed099099', 'fred', '6')] 实现 group_by 查询 sql:SELECT test.users.id AS test_users_id, count(test.users.id) AS count_1 FROM test.users GROUP BY test.users.id实现 group_by 查询:[('ed099099', 10), ('fred099099', 1), ('name18', 1), ('name19', 1), ('test', 1)] 实现 having 查询 sql:SELECT test.users.id AS test_users_id, count(test.users.name) AS count_1 FROM test.users GROUP BY test.users.id HAVING test.users.id >= %(id_1)s实现 having 查询:[(3, 1), (4, 1), (5, 1), (6, 1), (9, 1), (10, 1), (11, 1), (13, 1), (14, 1), (15, 1), (16, 1), (18, 1), (19, 1)] 实现 join 查询 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname, test.addresses.id AS test_addresses_id, test.addresses.email_address AS test_addresses_email_address, test.addresses.user_id AS test_addresses_user_id FROM test.users INNER JOIN test.addresses ON test.users.id = test.addresses.user_id实现 join 查询:[(1, 'test', 'fred', '6', 1, '11@', 1)] 实现 outerjoin 查询 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname, test.addresses.id AS test_addresses_id, test.addresses.email_address AS test_addresses_email_address, test.addresses.user_id AS test_addresses_user_id FROM test.users LEFT OUTER JOIN test.addresses ON test.users.id = test.addresses.user_id实现 outerjoin 查询:[(1, 'test', 'fred', '6', 1, '11@', 1), (3, 'ed099099', 'fred', '6', None, None, None), (4, 'ed099099', 'fred', '6', None, None, None), (5, 'fred099099', 'Fred Flintstone', '6', None, None, None), (6, 'ed099099', 'fred', '6', None, None, None), (9, 'ed099099', 'fred', '9', None, None, None), (10, 'ed099099', 'fred', '9', None, None, None), (11, 'ed099099', 'fred', '9', None, None, None), (13, 'ed099099', 'fred', '13', None, None, None), (14, 'ed099099', 'fred', '14', None, None, None), (15, 'ed099099', 'fred', '15', None, None, None), (16, 'ed099099', 'fred', '16', None, None, None), (18, 'name18', 'test18', 'test18', None, None, None), (19, 'name19', 'test19', 'test19', None, None, None)] 实现 aliased 查询 sql:SELECT users_1.id AS users_1_id, users_1.id AS users_1_id__1, addresses_1.id AS addresses_1_id FROM test.addresses AS addresses_1 INNER JOIN test.users AS users_1 ON users_1.id = addresses_1.user_id实现 aliased 查询:[(1, 1, 1)] 创建查询 sql:SELECT test.addresses.id AS user_id, count(%(count_1)s) AS address_count FROM test.addresses GROUP BY test.addresses.id创建查询:[(1, 1)] 创建子查询 sql:SELECT test.addresses.id AS user_id, count(:count_1) AS address_count FROM test.addresses GROUP BY test.addresses.id子查询带入父查询 sql:SELECT test.users.id AS test_users_id, test.users.name AS test_users_name, test.users.fullname AS test_users_fullname, test.users.nickname AS test_users_nickname, anon_1.address_count AS anon_1_address_count FROM test.users LEFT OUTER JOIN (SELECT test.addresses.id AS user_id, count(%(count_1)s) AS address_count FROM test.addresses GROUP BY test.addresses.id) AS anon_1 ON test.users.id = anon_1.user_id ORDER BY test.users.id子查询带入父查询:[(1, 'test', 'fred', '6', 1), (3, 'ed099099', 'fred', '6', None), (4, 'ed099099', 'fred', '6', None), (5, 'fred099099', 'Fred Flintstone', '6', None), (6, 'ed099099', 'fred', '6', None), (9, 'ed099099', 'fred', '9', None), (10, 'ed099099', 'fred', '9', None), (11, 'ed099099', 'fred', '9', None), (13, 'ed099099', 'fred', '13', None), (14, 'ed099099', 'fred', '14', None), (15, 'ed099099', 'fred', '15', None), (16, 'ed099099', 'fred', '16', None), (18, 'name18', 'test18', 'test18', None), (19, 'name19', 'test19', 'test19', None)] update sql:1update 结果:[(1, 'test', 'fred', '6')] delete sql:0delete 结果:[]

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。