Python接口自动化测试框架
在自动化的测试体系中,包含了UI自动化测试和接口自动化测试,UI自动化实现的前提是软件版本进入稳定期,UI界面稳定、变动少,相比较之下接口自动化,接口受外界因素的影响较少,维护成本低,能够在最短时间发现问题。
一、浅谈接口测试
1、什么是接口测试:
API测试又称为接口测试,主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点,是对系统接口功能进行测试的一种手段,也是集成测试的一部分,通过直接控制被测应用的接口(API)来确定是否在功能、可靠性、性能和安全方面达到预期的软件测试活动。
2、如何测试接口:
检查接口返回的数据是否与预期结果一致。
检查接口的容错性,假如传递数据的类型错误时是否可以处理。
接口参数的边界值。例如,传递的参数足够大或为负数时,接口是否可以正常处理。
接口的性能,http请求接口大多与后端代码逻辑、执行的SQL语句性能、算法等相关。
接口的安全性,外部调用的接口尤为重要。
3、接口测试的意义、目的:
接口测试的核心意义、目的在于:以保证系统的正确和稳定为核心,以持续集成为手段,提高测试效率,提升用户体验,降低产品研发成本。
二、定义框架目录分层
接口自动化框架的没有统一标准,可以根据实际需要自定义,以满足功能测试目标要求为目的。一个基本的接口自动化测试框架需要满足的功能:测试用例管理、各种配置信息管理、数据库操作、日志打印、报告输出、邮件发送
"""API_Autotest/
|-- API_Case/ #测试用例
| | |-- PreviewRelease_01_RegisterLogin.py #预发布环境注册登录接口case
| | |-- PreviewRelease_02_SubmitCredit.py #预发布环境主流程接口case
| | |-- PreviewRelease_03_MobilePhonePWD.py #预发布环境.......接口case
| | |-- Test_01_RegisterLogin.py #测试环境注册登录接口case
| | |-- Test_02_SubmitCredit.py #测试环境主流程接口case
| | |-- Test_03_LoanRepayment.py #测试环境......接口case
| | |
|-- data/ ###配置信息
| |-- conf_dict.py #账号密码等配置信息
| |-- custom_variable.py #向接口请求的请求的参数变量、keys、请求头
| |-- export_url.py #接口url
| |-- request_dict.py #向接口请求的请求参数
|
|-- logic/ ###主要逻辑
| |-- export_logic.py #接口实现主逻辑
| |-- database.py #数据库操作类
| |-- log_print.py #日志打印类
| |-- public_class.py #公用函数类
| |-- send_email.py #发送邮件类
|
|-- log/ #日志
| |-- Case--0418_log #根据运行日期保存操作日志
| |-- Case--0418_log
|
|-- report/ #测试报告
| |-- report--04181729.html #根据运行日期保存测试报告日志
| |-- report--04181731.html
|
|-- readme.md #readme
|
|-- manage.py #接口case运行管理类"""
三、知识技能储备
萝卜青菜各有所爱,每个人心中的接口自动化测试框架也各不相同,想实现一个基础功能完备的接口测试框架需要的Python知识如下:
1、Python基础知识
列举了一些需要掌握的基础知识
2、主要依赖的库
下面介绍的库都是数据库操作、日志打印、报告输出、邮件发送功能实现所依赖的库:
a、数据库操作
数据库操作主要pymysql库,下面为代码示例:
importpymysqlimportdatetime,timeimportosfrom data.conf_dict importConfDatafrom logic.log_print importLOGfrom logic.public_class importPublicclassConnectDatabase():"""连接数据库类"""
def __init__(self):
self.Log=LOG()
self.conf_data=ConfData()#连接数据库110
self.connection_110 = pymysql.connect(host=self.conf_data.get_conf_data("database_110", "host"),
port=self.conf_data.get_conf_data("database_110", "port"),
user=self.conf_data.get_conf_data("database_110", "user"),
password=self.conf_data.get_conf_data("database_110", "password"),
db=self.conf_data.get_conf_data("database_110", "db"),
charset='utf8',#以字典形式展示所查询数据
cursorclass=pymysql.cursors.DictCursor)#保存错误日志的文件名称
self.log_name = Public.get_new_file(path=os.path.join(os.path.dirname(os.path.dirname(__file__)), "log"))def select_sql(self, **kwargs):"""根据传入参数执行数据库查询操作
:param args:
:param kwargs: database:(database_110)选择数据库、table:表名、condition:where条件
:return:"""database_name= kwargs.get("database")
field_name= kwargs.get("field", "*")
table_name= kwargs.get("table")
where_condition= kwargs.get("condition")if database_name == "database_110":try:
with self.connection_110.cursor() as cursor:
sql= "SELECT %s FROM %s WHERE %s;"data=(field_name, table_name, where_condition)
cursor.execute(sql%data)
mit()
result=cursor.fetchone()returnresultexceptException as e:
self.Log.log_warning("database", self.log_name, "select_error:%s" %e)def select_sql_all(self, *args, **kwargs):"""根据传入参数执行数据库查询操作
:param args:
:param kwargs: database:(database_110)选择数据库、table:表名、condition:where条件
:return:"""database_name= kwargs.get("database")
field_name= kwargs.get("field", "*")
table_name= kwargs.get("table")
where_condition= kwargs.get("condition")if database_name == "database_110":try:
with self.connection_110.cursor() as cursor:
sql= "SELECT %s FROM %s WHERE %s;"data=(field_name, table_name, where_condition)print((sql %data))
cursor.execute(sql%data)
mit()
result=cursor.fetchall()returnresultexceptException as e:
self.Log.log_warning("database", self.log_name, "select_error:%s" %e)def update_sql(self, *args, **kwargs):"""根据传入参数执行数据库更新操作
:param args:
:param kwargs: database:(database_110)选择数据库、table:表名、set:更新的字段和值、condition:where条件
:return:"""database_name= kwargs.get("database")
table_name= kwargs.get("table")
set_value= kwargs.get("set")
where_condition= kwargs.get("condition")if database_name == "database_110":try:
with self.connection_110.cursor() as cursor:
sql= "UPDATE %s SET %s WHERE %s;"data=(table_name, set_value, where_condition)#print(sql%data)
cursor.execute(sql %data)
mit()returncursor.rowcount
cursor.close()exceptException as e:
self.Log.log_warning("database", self.log_name, "update_error:%s" %e)
self.connection_110.rollback()def delete_sql(self, *args, **kwargs):"""根据传入参数执行数据库删除操作
:param args:
:param kwargs: database:(database_110)选择数据库、table:表名、condition:where条件
:return:"""database_name= kwargs.get("database")
table_name= kwargs.get("table")
where_condition= kwargs.get("condition")if database_name == "database_110":try:
with self.connection_110.cursor() as cursor:
sql= "DELETE from %s where %s;"data=(table_name, where_condition)
cursor.execute(sql%data)
mit()returncursor.rowcountexceptException as e:
self.Log.log_warning("database", self.log_name, "delete_error:%s" %e)
self.connection_110.rollback()def insert_sql(self, *args, **kwargs):"""根据传入参数执行数据库插入操作
:param args:
:param kwargs: database:(database_110)选择数据库、sql:需要插入的sql语句
:return:"""database_name= kwargs.get("database")
insert_sql= kwargs.get("sql")if database_name == "database_110":try:
with self.connection_110.cursor() as cursor:
cursor.execute(insert_sql)
mit()returncursor.rowcountexceptException as e:
self.Log.log_warning("database", self.log_name, "insert_error:%s" %e)
self.connection_110.rollback()def mysql_function(self, *args, **kwargs):"""根据传入参数执行数据库函数操作
:param args:
:param kwargs: database:(database_110)选择数据库、function_name:函数名称,data_id:数据ID,phone:电话
:return:"""database_name= kwargs.get("database")
function_name= kwargs.get("function_name")
data_id= kwargs.get("data_id")
phone= kwargs.get("phone")
product_id= kwargs.get("product_id")if database_name == "database_110":try:
with self.connection_110.cursor() as cursor:
cursor.callproc(function_name,args=(data_id,phone,product_id,))
mit()exceptException as e:
self.Log.log_warning("database", self.log_name, "mysql_function:%s" %e)
self.connection_110.rollback()if __name__ == "__main__":
b=ConnectDatabase()
data_id= Public.create_short_id()
示例代码--删减版
PS:pymsql库操作mysql数据库增、删、查、改、调用函数
b、日志打印
日志打印依赖logging库,下面为代码示例:
importloggingimportosimportsysimportdatetime
log_path= os.path.join(os.path.dirname(os.path.dirname(__file__)), "log")
sys.path.append(log_path)classLOG(object):"""日志打印类"""
def __init__(self):
self.now_time= datetime.datetime.now().strftime("%Y%m%d%H%M")def log_info(self, *args):"""根据传入参数打印普通日志
:param arg[0]log的功能模块名,arg[1] 保存log的文件名,arg[2]要打印的日志内容
:return 返回logger 对象"""
#创建一个logger对象
logger =logging.getLogger(args[0])
logger.setLevel(logging.DEBUG)#创建一个向屏幕输入的handler对象
ch =logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)#创建一个像文件输入的handler对象
log_file = os.path.join(log_path, "%s--%s_log" % (args[1], self.now_time))
fh= logging.FileHandler(log_file, mode="a+", encoding="utf-8")
fh.setLevel(logging.DEBUG)#设置log输入格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)#logger,添加handler对象
logger.addHandler(ch)
logger.addHandler(fh)
logger.info(args[2])#在记录日志之后移除句柄, 不然会重复打印日志
logger.removeHandler(fh)
logger.removeHandler(ch)returnlogger
@staticmethoddef log_warning(*args):"""根据传入参数错误日志
:param arg[0]log的功能模块名,arg[1]文件名 arg[2] 需要打印的内容
:return 返回logger 对象"""
#创建一个logger对象
logger =logging.getLogger(args[0])
logger.setLevel(logging.DEBUG)#创建一个向屏幕输入的handler对象
ch =logging.StreamHandler(sys.stdout)
ch.setLevel(logging.DEBUG)#创建一个像文件输入的handler对象
log_file = os.path.join(log_path, args[1])
fh= logging.FileHandler(log_file, mode="a+", encoding="utf-8")
fh.setLevel(logging.DEBUG)#设置log输入格式
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)#logger,添加handler对象
logger.addHandler(ch)
logger.addHandler(fh)
logger.warning(args[2])#在记录日志之后移除句柄, 不然会重复打印日志
logger.removeHandler(fh)
logger.removeHandler(ch)print("aaa")returnlogger#@staticmethod
deflog_debug(self,message):"""根据传入参数错误日志
:param arg[0]log的功能模块名,arg[1]文件名 arg[2] 需要打印的内容
:return 返回logger 对象"""
#创建Logger
logger =logging.getLogger()
logger.setLevel(logging.DEBUG)#创建Handler
#终端Handler
consoleHandler =logging.StreamHandler(sys.stdout)
consoleHandler.setLevel(logging.DEBUG)#文件Handler
fileHandler = logging.FileHandler('ing.log', mode='w', encoding='UTF-8')
fileHandler.setLevel(logging.NOTSET)#Formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
consoleHandler.setFormatter(formatter)
fileHandler.setFormatter(formatter)#添加到Logger中
logger.addHandler(consoleHandler)
logger.addHandler(fileHandler)returnlogger.debug(message)#if __name__ == "__main__":#A=LOG()#A.log_debug('123')
示例代码--删减版
c、报告输入
报告输出依赖库主要为requests、unittest、下面为代码示例:
importrequestsfrom data.export_url importExportERLfrom data.request_dict importRequestDatafrom logic.log_print importLOGfrom logic.database importConnectDatabasefrom data.custom_variable importGlobalVariableclassExportLogic(object):"""ExportLogic 接口主逻辑类"""
def __init__(self):
self.Request=RequestData()
self.Url=ExportERL()
self.ConnectDatabase=ConnectDatabase()
self.Log=LOG()
self.Custom_variable=GlobalVariable()
@staticmethoddef request_post(*args, **kwargs):"""传入请求参数,返回json请求结果
:param args:
:param kwargs: json:请求数据, url:请求路径, header:请求头
:return:"""request_data= kwargs.get("json")
request_url= kwargs.get("url")
request_header= kwargs.get("header")
response_json= requests.post(url=request_url, data=request_data, headers=request_header).json()returnresponse_json
@staticmethoddef request_post_json(*args, **kwargs):"""传入请求参数,返回json请求结果
:param args:
:param kwargs: json:请求数据, url:请求路径, header:请求头
:return:"""request_data= kwargs.get("json")
request_url= kwargs.get("url")
request_header= kwargs.get("header")
response_json= requests.post(url=request_url, json=request_data, headers=request_header)returnresponse_json
@staticmethoddef request_post_file(*args, **kwargs):"""上传接口独有
:param args:
:param kwargs: json:请求数据, url:请求路径, header:请求头, file:文件路径
:return:"""request_data= kwargs.get("json")
request_url= kwargs.get("url")
request_header= kwargs.get("header")
request_file= kwargs.get("files")
response_json= requests.post(url=request_url, data=request_data, files=request_file,
headers=request_header).json()returnresponse_jsonif __name__ == "__main__":
b= ExportLogic()
requests库演示代码--删减版
importunittestimportosimportHTMLTestReportCNimportdatetimefrom logic.export_logic importExportLogicfrom data.custom_variable importGlobalVariablefrom data.export_url importExportERLfrom logic.public_class importPublicfrom data.request_dict importRequestDatafrom logic.log_print importLOGfrom logic.database importConnectDatabasefrom data.conf_dict importConfDatafrom logic.send_email importSendEmailimportrandom,requestsclassLZExportCase(unittest.TestCase):"""新浪有借有还接口测试用例"""
#全距
PHONE =Public().create_phone()#提交五项资料的Header
HEADER = GlobalVariable().get_header('lz_Header')#修改手机号的Header
HEADER1 = GlobalVariable().get_header('lz_Header')
BASEID= ''PERIODNUM= ''IMGID=''Repayid= ''Periodnum_= ''
defsetUp(self):
self.Export_logic=ExportLogic()
self.Export_url=ExportERL()
self.Public=Public()
self.Request_data=RequestData()
self.Custom_variable=GlobalVariable()
self.Log=LOG()
self.ConnectDatabase=ConnectDatabase()
self.Conf_data=ConfData()
self.password= GlobalVariable().get_variable("password")
self.phone=Public().create_phone()
self.newphone= GlobalVariable().get_variable("YJ_newphone")#self.report_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "report")
self.log_path = os.path.dirname(os.path.dirname(__file__))
self.path= os.path.join(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static"),"logo.jpg")deftearDownC(self):pass
#os.remove(os.path.join(self.report_path, "report.html"))
#os.remove(os.path.join(os.path.join(self.path, "log"), "Export_log"))
deftest_085_FileLoad(self):"""Case--上传文件(saveType正常)"""url= self.Export_url.get_export_url("LZ_test_url", "File")
request_json= self.Request_data.get_request_data('File')
file_= open(self.path, 'rb')
file={"file": ('logo.jpg', file_, 'image/jpeg')
}
request_json["saveType"] = '1'request_json= self.Public.sign_md5(keys="lz_test_keys", json=request_json)
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s request data:%s" %(self._testMethodName, request_json))
response_json= self.Export_logic.request_post_file(url=url, json=request_json, files=file, header=self.HEADER)
LZExportCase.IMGID= response_json['data']['id']
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s response data:%s" %(self._testMethodName, response_json))
self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code'])
file_.close()deftest_086_HeadImg(self):"""Case--修改头像(headimgId正常)"""url= self.Export_url.get_export_url("LZ_test_url", "HeadImg")
request_json= self.Request_data.get_request_data('HeadImg')
request_json["headimgId"] =LZExportCase.IMGID
request_json= self.Public.sign_md5(keys="lz_test_keys", json=request_json)
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s request data:%s" %(self._testMethodName, request_json))
response_json= self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER)
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s response data:%s" %(self._testMethodName, response_json))
self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['msg'], response_json['msg']) \and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'data_correct')['code'], response_json['code'])deftest_087_HeadImg(self):"""Case--修改头像(headimgId错误)"""url= self.Export_url.get_export_url("LZ_test_url", "HeadImg")
request_json= self.Request_data.get_request_data('HeadImg')
request_json["headimgId"] = '3213adcx213vcv'request_json= self.Public.sign_md5(keys="lz_test_keys", json=request_json)
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s request data:%s" %(self._testMethodName, request_json))
response_json= self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER)
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s response data:%s" %(self._testMethodName, response_json))
self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['msg'], response_json['msg']) \and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_ERROR')['code'], response_json['code'])deftest_088_HeadImg(self):"""Case--修改头像(headimgId为空)"""url= self.Export_url.get_export_url("LZ_test_url", "HeadImg")
request_json= self.Request_data.get_request_data('HeadImg')
request_json["headimgId"] = ''request_json= self.Public.sign_md5(keys="lz_test_keys", json=request_json)
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s request data:%s" %(self._testMethodName, request_json))
response_json= self.Export_logic.request_post(url=url, json=request_json, header=self.HEADER)
self.Log.log_info(self._testMethodDoc.strip(), self.Conf_data.get_conf_data("log_name", "LZ"),"%s response data:%s" %(self._testMethodName, response_json))
self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['msg'], response_json['msg']) \and self.assertEqual(self.Conf_data.get_conf_PromptMsg("prompt_msg", "LZ",'headimgId_NULL')['code'], response_json['code'])if __name__ == '__main__':
Send_mail=SendEmail()
now_time= datetime.datetime.now().strftime("%Y%m%d%H%M")
module_name= os.path.basename(__file__).split(".")[0]
module= __import__(module_name)
path= os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "report")
logo_path= os.path.join(os.path.join(path, "static"),"logo.jpg")
fp= open(os.path.join(path, "report--%s.html" % now_time), "wb")
runner= HTMLTestReportCN.HTMLTestRunner(stream=fp)
all_suite=unittest.defaultTestLoader.loadTestsFromModule(module)
runner.run(all_suite)
fp.close()
SendEmail.send_email(Send_mail)
unittest库演示代码--删减版
d、邮件发送
邮件发送依赖两个python内置库smtplib、email:
importosimportsmtplibimportemail.mime.multipartimportemail.mime.textfrom email importencodersimportdatetimefrom data.conf_dict importConfDatafrom logic.public_class importPublicclassSendEmail(object):def __init__(self):
self.conf_data=ConfData()
self.sender= self.conf_data.get_conf_data("email", "sender")
self.receiver= self.conf_data.get_conf_data("email", "receiver")
self.SMTP_server= self.conf_data.get_conf_data("email", "SMTP_server")
self.username= self.conf_data.get_conf_data("email", "username")
self.password= self.conf_data.get_conf_data("email", "password")
self.content= self.conf_data.get_conf_data("email", "content")
self.now_time= datetime.datetime.now().strftime("%Y%m%d%H%M")
self.report_path= os.path.join(os.path.dirname(os.path.dirname(__file__)), "report")
self.log_path= os.path.join(os.path.dirname(os.path.dirname(__file__)), "log")defcreate_msg(self,receiver):"""此函数主要构建收发邮件联系人、邮件正文、邮件附件、邮件标题
:return:"""
#构建邮件正文
msg =email.mime.multipart.MIMEMultipart()
msg["from"] =self.sender
msg["to"] =receiver
msg['subject'] = "API自动化测试报告"txt=email.mime.text.MIMEText(self.content)
msg.attach(txt)#构建邮件附件之一:自动化测试报告
report_name = Public.get_new_file(path=self.report_path)
report_path=os.path.join(self.report_path, report_name)
report= email.mime.text.MIMEText(open(report_path, "rb")
.read(),'html', 'utf-8')
report["Content-Type"] = 'application/octet-stream'report.add_header('Content-Disposition', 'attachment', filename=('gbk', '', report_name))
encoders.encode_base64(report)
msg.attach(report)#构建邮件附件之一:测试日志
log_name = Public.get_new_file(path=self.log_path)
log_path=os.path.join(self.log_path, log_name)
info_log= email.mime.text.MIMEText(open(log_path, 'rb').read(), 'base64', 'utf-8')
info_log["Content-Type"] = 'application/octet-stream'info_log.add_header('Content-Disposition', 'attachment', filename=('gbk', '', log_name))
encoders.encode_base64(info_log)
msg.attach(info_log)returnmsgdef send_email(self,*args):"""创建实例,发送邮件
:return:"""receiver= self.conf_data.get_conf_data("email", "%s_receiver" % args[0][0]) if args elseself.receiver
msg=self.create_msg(receiver)
smtp= smtplib.SMTP_SSL(self.SMTP_server, 465) #在Linux端使用ssL方式连接邮箱服务器
#smtp.connect(self.SMTP_server, 465) # 在windows端使用connect方式连接邮箱服务器
smtp.login(self.username, self.password)
smtp.sendmail(self.sender, receiver.split(","), msg.as_string())
smtp.quit()if __name__ == "__main__":
s=SendEmail()
s.send_email()
示例代码