
Python接口测试常用方法封装
在读取每条测试用例时,实际上是读取每行的数据,然后可以通过固定的关键字(列)来获取对应的值,最后传递给接口方法来执行接口测试。
发布日期:2021-05-06 07:21:33
浏览次数:85
分类:技术文章
本文共 19869 字,大约阅读时间需要 66 分钟。
1. Excel表格操作方法封装
python有3个模块时对Excel文件的操作,分别是openpyxl、xlrd和xlwt
xlrd
xlrd是用来从Excel中读写数据的,用xlrd进行读取比较方便,流程和平常手动操作Excel一样,打开工作簿(workbook),选择工作簿(sheets),然后操作单元格(cell)。
例子:打开当前目录下名为data.xlsx的Excel文件,选择第1张工作表,然后读取第一行的全部内容并打印出来。import osimport xlrd# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)# 获取文件路径file_path = r'data\data.xlsx'file_path = os.path.join(rootPath, file_path)print(file_path)# 打开Excel文件data = xlrd.open_workbook(file_path)# 选择第1张工作表table = data.sheets()[0]# data_list用来存放数据data_list = []# 将table中第1行的数据读取并添加到data_list中data_list.extend(table.row_values(0))# 打印出第1行的全部数据for item in data_list: print(item)
运行结果:

xlwt
xlwt只能对Excel进行写操作。
xlwt写库的局限性:只能写入新建的Excel。使用xlutils库的copy功能可以解决这个问题。import osimport xlwt# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)# 新建一个Excel文件(只能通过新建写入)data = xlwt.Workbook()# 新建一个工作表table = data.add_sheet('name')# 写入数据到A1单元格table.write(0, 0, u'哈哈')# 注意:如果对同一个单元格重复操作,会引发overwrite Exception,想要取消该功能,需要在添加工作表时指定为可覆盖table = data.add_sheet('sheet1', cell_overwrite_ok=True)# 保存data.save(os.path.join(rootPath, r'data\test.xls'))
查看结果:

openpyxl
该模块支持最新版的Excel文件格式,可对Excel文件进行读写操作。
import osfrom openpyxl import load_workbook# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)# 获取文件路径file_path = r'data\test_openpyxl.xlsx'file_path = os.path.join(rootPath, file_path)print(file_path)if __name__ == '__main__': # 读取Excel文件 wb = load_workbook(file_path) # 取第1张表 sheet_names = wb.sheetnames print("所有表名称:", sheet_names) ws = wb[sheet_names[0]] # 显示表名、表行数、表列数 print('工作表行数:', ws.max_row) print('工作表列数: ', ws.max_column) # 获取指定行的值,如第3行 row3_values = [] row3_cell_list = list(ws.rows)[2] for cell in row3_cell_list: row3_values.append(cell.value) print(row3_values) # 获取所有行的数据 data = { } for i in range(0, ws.max_row): every_row_values = [] every_row_cell_list = list(ws.rows)[i] for cell in every_row_cell_list: every_row_values.append(cell.value) data[i + 1] = every_row_values print(data) # 写入数据 # 将第1个工作表的名称改为name ws.title = 'name' # 向某个单元格中写入数据 ws.cell(5, 5).value = u'write data!!' # 保存文件 wb.save(file_path)
查看结果:


封装读写
import osfrom openpyxl import load_workbook# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)class Operate_excel(object): """ 操作Excel类 """ # 定义构造函数,创建对象自动执行 def __init__(self, file_path=None, sheet_id=None): """ :param file_path: 如果没传值,默认为Excel路径 :param sheet_id: 如果没传值,默认为第一个sheet """ if file_path: # 成员变量 self.file_path = file_path else: self.file_path = r'data\operate_excel.xlsx' # 将文件目录拼接成绝对路径 self.file_path = os.path.join(rootPath, self.file_path) print(self.file_path) if sheet_id: self.sheet_id = sheet_id else: self.sheet_id = 0 # 调用成员方法 self.sheet_table = self.get_sheet() """成员方法""" # 获取sheet页操作对象 def get_sheet(self): # 读取Excel文件 data = load_workbook(self.file_path) # 读取表 sheet_table = data.worksheets[self.sheet_id] return sheet_table # 获取sheet页的行数和列数,返回的是一个元组 def get_sheet_nrows_ncols(self): return self.sheet_table.max_row, self.sheet_table.max_column # 获取sheet页的行数 def get_sheet_nrows(self): return self.sheet_table.max_row # 获取sheet页的列数 def get_sheet_ncols(self): return self.sheet_table.max_column # 获取具体单元格的数据 def get_sheet_cell_value(self, row, col): """ :param row: 单元格行值 :param col: 单元格列值 :return: cell_data """ cell_data = self.sheet_table.cell(row, col).value return cell_data # 获取指定行的数据 def get_sheet_row_value(self, row): row_value = [] row_cell_list = list(self.sheet_table.rows)[row - 1] for cell in row_cell_list: row_value.append(cell.value) return row_value # 获取所有行的数据 def get_sheet_every_row_value(self): all_data = { } for i in range(0, self.sheet_table.max_row): every_row_values = [] every_row_cell_list = list(self.sheet_table.rows)[i] for cell in every_row_cell_list: every_row_values.append(cell.value) all_data[i + 1] = every_row_values return all_data # 写入数据到Excel中 def write_to_excel(self, row, col, values): # 打开Excel文件读取数据句柄 data = load_workbook(self.file_path) # 读取表 sheet_table = data.worksheets[self.sheet_id] # 写入数据 sheet_table.cell(row, col).value = values # 保存文件 data.save(self.file_path)if __name__ == '__main__': read_xlsx = Operate_excel() print('获取到的工作表:', read_xlsx.sheet_table) print('获取Excel表的行数与列数,返回元组格式:', read_xlsx.get_sheet_nrows_ncols()) print('获取Excel表的行数:', read_xlsx.get_sheet_nrows()) print('获取Excel表的列数:', read_xlsx.get_sheet_ncols()) print('获取Excel表的单元格(1,1)的值:', read_xlsx.get_sheet_cell_value(1, 1)) print('获取Excel表第2行的值:', read_xlsx.get_sheet_row_value(2)) print('获取Excel表所有行的值:', read_xlsx.get_sheet_every_row_value()) print('写入Excel表的单元格(4,3)的值:', read_xlsx.write_to_excel(4, 3, 'test'))
查看结果:

2. 接口关键字封装
在Excel中存放接口测试用例数据,通常第一行字段的值都是固定的值,如host/path/method/data等等,对应接口的关键字。
接口测试用例主要的关键字常量有:
获取接口关键字的封装如下:
class TestCaseKeyWord(object): """ 定义测试用例关键字类 """ CASE_ID = '1' CASE_NAME = '2' IS_EXECUTE = '3' INTERFACE_URL = '4' METHOD = '5' HEADER = '6' REQUEST_DATA = '7' EXPECTED_RESULT = '8' ACTUAL_RESULT = '9' RESULT = '10'# 获取用例iddef get_case_id(): return TestCaseKeyWord.CASE_ID# 获取用例名称def get_case_name(): return TestCaseKeyWord.CASE_NAME# 用例是否执行def get_case_is_execute(): return TestCaseKeyWord.IS_EXECUTE# 接口URLdef get_case_interface_url(): return TestCaseKeyWord.INTERFACE_URL# 用例方法def get_case_method(): return TestCaseKeyWord.METHOD# 请求头def get_case_header(): return TestCaseKeyWord.HEADER# 请求参数def get_case_payload(): return TestCaseKeyWord.REQUEST_DATA# 预期结果def get_case_expected_result(): return TestCaseKeyWord.EXPECTED_RESULT# 实际结果def get_case_actual_result(): return TestCaseKeyWord.ACTUAL_RESULT# 用例执行结果def get_case_result(): return TestCaseKeyWord.RESULT
在Excel中添加1条测试用例,新建test_keyword.py文件,获取接口测试用例名称:
from business import testcases_keywordfrom business.excel_operating import Operate_excelget_excel = Operate_excel()# 获取用例数print("用例数是:", get_excel.get_sheet_nrows() - 1)# 返回用例名称关键字的列值case_name_col = int(testcases_keyword.get_case_name())print("用例名称关键字的列值:", case_name_col)# 返回第一条用例的名称get_case_name = get_excel.get_sheet_cell_value(2, case_name_col)print("第一条用例的名称:", get_case_name)
运行结果如下:

3. 获取接口数据封装
取到接口测试用例中的关键字之后,要获取关键字对应的值。
获取接口关键字对应数据的封装如下:
from business import testcases_keywordfrom business.excel_operating import Operate_excelimport jsonclass getData(object): def __init__(self): self.op_excel = Operate_excel() def get_case_nums(self): """获取测试用例条数""" return self.op_excel.get_sheet_nrows() - 1 def get_is_header(self, row): """是否携带请求头""" col = int(testcases_keyword.get_case_header()) header = self.op_excel.get_sheet_cell_value(row + 1, col) if header is not None: return header else: print("没有header!") return None def get_is_execute(self, row): """是否执行""" col = int(testcases_keyword.get_case_is_execute()) is_execute = self.op_excel.get_sheet_cell_value(row + 1, col) if is_execute == 'yes': flag = True else: flag = False return flag def get_url(self, row): """获取url""" col = int(testcases_keyword.get_case_interface_url()) url = self.op_excel.get_sheet_cell_value(row + 1, col) return url def get_method(self, row): """获取请求方法""" col = int(testcases_keyword.get_case_method()) method = self.op_excel.get_sheet_cell_value(row + 1, col) return method def get_data(self, row): """获取请求数据""" col = int(testcases_keyword.get_case_payload()) data = self.op_excel.get_sheet_cell_value(row + 1, col) return json.loads(data) def get_expected_result(self, row): """获取预期结果""" col = int(testcases_keyword.get_case_expected_result()) expected_result = self.op_excel.get_sheet_cell_value(row + 1, col) if expected_result == '': return None else: return expected_result def get_actual_result(self, row, value): """获取实际结果并写入""" col = int(testcases_keyword.get_case_actual_result()) actual_result = expected_result = self.op_excel.get_sheet_cell_value(row + 1, col) self.op_excel.write_to_excel(row + 1, col, value) if __name__ == '__main__': get_data = getData() print(get_data.get_data(1)) print(get_data.get_url(1)) print(get_data.get_is_execute(1))
运行结果如下:

4.接口请求方法封装
写好接口测试用例,获取到测试数据后,就可以模拟接口请求方法。
请求方法封装如下:
import requestsimport jsonclass ApiRequests(object): """ 请求方法封装 """ # get请求 def get_method(self, url, data=None, header=None): if header is not None: res = requests.get(url, params=data, headers=header) else: res = requests.get(url, params=data) return res.json() # post请求 def post_method(self, url, data=None, header=None): if header is not None: res = requests.post(url, json=data, headers=header) else: res = requests.post(url, json=data) return res.json() # put请求 def put_method(self, url, data=None, header=None): if header is not None: res = requests.put(url, json=data, headers=header) else: res = requests.put(url, json=data) return res.json() # delete请求 def delete_method(self, url, data=None, header=None): if header is not None: res = requests.delete(url, json=data, headers=header) else: res = requests.delete(url, json=data) return res.json() # 主方法 def run_method(self, method, url, data=None, header=None): if method == 'get' or method == 'GET': res = self.get_method(url, data, header) elif method == 'post' or method == 'POST': res = self.post_method(url, data, header) elif method == 'put' or method == 'PUT': res = self.put_method(url, data, header) elif method == 'delete' or method == 'DELETE': res = self.delete_method(url, data, header) else: res = '请求方法错误' # 返回res return json.dumps(res, ensure_ascii=False, indent=4, sort_keys=True, separators=(',', ':'))
新建test_api_requests.py文件,先获取Excel中的测试数据,再通过封装好的请求方法模拟请求接口:
from business.get_testdata_from_keyword import getData # 通过接口关键字获取测试数据from business.api_requests import ApiRequests # 请求方法get_data = getData()print('获取是否执行: ', get_data.get_is_execute(1))print('获取接口url:', get_data.get_url(1))print('获取接口请求方法:', get_data.get_method(1))print('获取请求数据:', get_data.get_data(1))url = get_data.get_url(1)method = get_data.get_method(1)data = get_data.get_data(1)ar = ApiRequests()result = ar.run_method(url=url, method=method, data=data)print(result)
运行结果如下:

5. 发送邮件封装
利用python内置对SMTP协议的支持,将测试结果以邮件的方式发送。
- SMTP是发送邮件的协议,python内置对SMTP的支持,可以发送纯文本邮件、HTML邮件以及带附件的邮件
- python对SMTP支持有smtplib和email两个模块,email负责构造邮件,smtplib负责发送邮件
- smtplib在发送邮件的过程中,起到服务器直接互相通信的作用
- email用来设置服务器之间通信的信息,包括信息头、信息主体等等
发送邮件的流程:
- 登陆:服务器、账号、密码
- 构造邮件:发送人、收件人、邮件主体、邮件正文内容
- 发送:接收地址
# 导入发送邮件模块import smtplib# 导入构造邮件模块from email.mime.text import MIMETextclass SendEmail(object): """ 发送邮件模块 """ def __init__(self): """ 初始化邮件配置服务 """ self.send_user = '###' # 发送人邮箱地址 self.mail_host = 'smtp.qq.com' self.password = '###' # 授权码 def send_mail(self, user_lists, subject, content): """ 执行发送邮件 :param user_lists:接收人邮箱地址 :param subject:邮件主题 :param content:邮件正文内容 :return: """ user = '发送人名称' + '<' + self.send_user + '>' message = MIMEText(content, _subtype='plain', _charset='utf8') message['Subject'] = subject message['From'] = user message['To'] = ';'.join(user_lists) try: server = smtplib.SMTP() server.connect(self.mail_host) server.login(self.send_user, self.password) # as_string将MIMETEXT对象转成str server.sendmail(user, user_lists, message.as_string()) server.close() print('邮件发送成功'.center(60, '=')) except: print('邮件发送失败'.center(60, '=')) raise def send_content(self, data): """ 发送邮件内容 """ pass_cases_nums = int(len(data['pass_cases'])) print('用例执行成功数:%s' % pass_cases_nums) fail_cases_nums = int(len(data['fail_cases'])) print('用例执行失败数:%s' % fail_cases_nums) not_execute_nums = int(len(data['not_execute_cases'])) print('用例未执行数:%s' % not_execute_nums) execute_num = float(pass_cases_nums + fail_cases_nums) total_cases = float(pass_cases_nums + fail_cases_nums + not_execute_nums) pass_ratio = "%.2f%%" % (pass_cases_nums / total_cases * 100) fail_ratio = "%.2f%%" % (fail_cases_nums / total_cases * 100) user_lists = ['###'] subject = '【接口自动化测试用例执行统计】' content = '一共 %f 个用例, 执行了 %f 个用例,未执行 %f 个用例;成功 %f 个,通过率为 %s;失败 %f 个,失败率为 %s' % ( total_cases, execute_num, not_execute_nums, pass_cases_nums, pass_ratio, fail_cases_nums, fail_ratio) self.send_mail(user_lists, subject, content)if __name__ == '__main__': sm = SendEmail() sm.send_content({ 'pass_cases': [1, 3, 5], 'fail_cases': [2, 4, 6], 'not_execute_cases': [1, 2, 3]})
6. 操作数据库封装
操作数据库的流程:
- 连接数据库
- 获取操作游标
- 执行sql语句,新增/修改/删除/查询
- 提交更新数据或回滚
- 关闭连接
import pymysqlimport jsonclass OperateMysql(object): def __init__(self): # 数据库初始化连接 self.conn = pymysql.connect( host='localhost', port=3306, user='root', password='12345678', charset='utf8mb4', db='test', cursorclass=pymysql.cursors.DictCursor ) # 创建游标操作数据库 self.cursor = self.conn.cursor() def select_first_data(self, sql): """ 查询第一条数据 """ try: # 执行sql语句 self.cursor.execute(sql) except Exception as e: print('执行sql异常:%s' % e) else: # 获取查询到的第一条数据 first_data = self.cursor.fetchone() # 将返回结果转换成str数据格式,禁用ascii编码 first_data = json.dumps(first_data, ensure_ascii=False) return first_data def select_all_data(self, sql): """ 查询结果集 """ try: # 执行sql语句 self.cursor.execute(sql) except Exception as e: print('执行sql异常:%s' % e) else: all_data = self.cursor.fetchall() # 将返回结果转换成str数据格式,禁用ascii编码 # all_data = json.dumps(all_data, ensure_ascii=False) return all_data def del_data(self, sql): """ 删除数据 """ res = { } try: # 执行sql语句 result = self.cursor.execute(sql) if result != 0: # 提交修改 self.conn.commit() res = { '删除成功'} else: res = { '没有要删除的数据'} except Exception as e: # 发生错误时回滚 self.conn.rollback() res = { '删除失败', e} return res def update_data(self, sql): """ 修改数据 """ res = { } try: # 执行sql语句 self.cursor.execute(sql) self.conn.commit() res = { '更新成功'} except Exception as e: # 发生错误时回滚 self.conn.rollback() res = { '更新失败', e} return res def insert_data(self, sql, data): """ 新增数据 """ res = { } try: # 执行sql语句 self.cursor.execute(sql, data) self.conn.commit() res = { data, '新增成功'} except Exception as e: self.conn.rollback() res = { '新增失败', e} return res def conn_close(self): """ 关闭数据库 """ self.cursor.close() self.conn.close()if __name__ == '__main__': # 类的实例化 om = OperateMysql() # 新增 data = [{ 'id': 1, 'name': '张三', 'age': 10}, { 'id': 2, 'name': '李四', 'age': 20}, { 'id': 3, 'name': '王五', 'age': 30}] for i in data: i_data = (i['id'], i['name'], i['age']) insert_res = om.insert_data( "INSERT INTO students (id,name,age) VALUES (%s,%s,%s)", i_data ) print(insert_res) # 查询 one_data = om.select_first_data( "SELECT * FROM students" ) print(one_data) all_data = om.select_all_data( "SELECT * FROM students" ) print('查询的总数据:', len(all_data), '分别是:', all_data) # 修改 update_data = om.update_data( "UPDATE students SET name='wangwu' WHERE id=3" ) print(update_data) # 删除 delete_data = om.del_data( "DELETE FROM students WHERE id in(1,2,3)" ) print(delete_data) # 关闭数据库 om.conn_close()
7. Json数据文件封装
可以用json文件来存储测试数据,python读取json文件的流程是:
- 获取文件名和绝对路径
- 打开读取数据
- 提取关键数据
- 关闭文件
具体代码实现:
import osimport json# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)config_file_name = r'data\topics.json'class OperateJson(object): """ 操作Json文件 """ def __init__(self, file_name=None): if file_name: self.file_name = file_name else: self.get_file = config_file_name self.file_name = os.path.join(rootPath, self.get_file) print('文件名称:%s' % self.file_name) self.data = self.read_json() def read_json(self): """ 读取json数据 """ with open(self.file_name, encoding='utf8') as fp: # 反序列化,从文件读取(string转dict) data = json.load(fp) fp.close() return data def get_key_word(self, key): """" 读取关键字 """ return self.data[key]if __name__ == '__main__': oj = OperateJson() print(oj.read_json()) print(oj.get_key_word('test_data')) print(oj.get_key_word('test_data')[0]) print(oj.get_key_word('test_data')[0][0]) print(oj.get_key_word('test_data')[0][0]['title'])
读取的json文件是:
{ "test_data": [ [ { "accesstoken": "", "title": "eyuyu", "tab": "ask", "content": "AAAAAAAAA"},401,"错误的accessToken" ], [ { "accesstoken": "4a4790e1-d707-488f-9a7f-df0918fa483b", "title": "", "tab": "ask", "content": "AAAAAAAAA"},400,"标题不能为空" ], [ { "accesstoken": "4a4790e1-d707-488f-9a7f-df0918fa483b", "title": "eyuyu", "tab": "", "content": ""},400,"必须选择一个版块" ] ]}
运行结果是:

8. logging日志模块封装
business目录下新建logger.py,用于封装日志记录模块;
新建目录logs,用于存放日志信息import logging# 创建一个日志logger = logging.getLogger("cnodeapp")# 设置日志打印等级logger.setLevel(logging.DEBUG)# 设置日志格式format = logging.Formatter('[%(asctime)s] [%(levelname)s] [%(funcName)s] %(message)s')# 创建一个handler,用于创建日志文件fl = logging.FileHandler(filename='logs/cnode.log', mode='a', encoding='utf8')fl.setFormatter(format)logger.addHandler(fl)# 创建一个handler,用于输出到控制台sl = logging.StreamHandler()sl.setFormatter(format)logger.addHandler(sl)
测试用例中引用日志模块

发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年03月25日 05时23分41秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
VTK:隐式函数之ImplicitSphere
2019-03-03
数据结构与算法学习1-----稀疏数组
2019-03-03
焦点事件
2019-03-03
web前端面试一从输入url到看到页面发生了什么
2019-03-03
C getopt.h
2019-03-03
H5页面授权获取微信授权(openId,微信nickname等信息)
2019-03-03
2018年年终总结
2019-03-03
【redis键过期删除策略】很高兴再次认识你
2019-03-03
【工具篇】EasyExcel的应用
2019-03-03
监控264后缀文件播放
2019-03-03
2019数字音乐市场年度回顾,QQ音乐全面领先
2019-03-03
花1亿扶持优质红人,如涵推动网红经济出圈之路有何深意?
2019-03-03
抢滩抖音、B站,快手港股IPO进程加速
2019-03-03
理解PendingIntent
2019-03-03
Android SurfaceFlinger4 提交Buffer
2019-03-03
深入理解 ClientLifecycleManager 机制
2019-03-03
android基础知识回顾--ContentProvider简单用法
2019-03-03
js try{}catch(){}finally{}语句
2019-03-03