Python接口测试常用方法封装
发布日期:2021-05-06 07:21:33 浏览次数:85 分类:技术文章

本文共 19869 字,大约阅读时间需要 66 分钟。

1. Excel表格操作方法封装

python有3个模块时对Excel文件的操作,分别是openpyxl、xlrd和xlwt

xlrd

xlrd是用来从Excel中读写数据的,用xlrd进行读取比较方便,流程和平常手动操作Excel一样,打开工作簿(workbook),选择工作簿(sheets),然后操作单元格(cell)。

例子:打开当前目录下名为data.xlsx的Excel文件,选择第1张工作表,然后读取第一行的全部内容并打印出来。

import osimport xlrd# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)# 获取文件路径file_path = r'data\data.xlsx'file_path = os.path.join(rootPath, file_path)print(file_path)# 打开Excel文件data = xlrd.open_workbook(file_path)# 选择第1张工作表table = data.sheets()[0]# data_list用来存放数据data_list = []# 将table中第1行的数据读取并添加到data_list中data_list.extend(table.row_values(0))# 打印出第1行的全部数据for item in data_list:    print(item)

运行结果:

在这里插入图片描述

xlwt

xlwt只能对Excel进行写操作。

xlwt写库的局限性:只能写入新建的Excel。使用xlutils库的copy功能可以解决这个问题。

import osimport xlwt# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)# 新建一个Excel文件(只能通过新建写入)data = xlwt.Workbook()# 新建一个工作表table = data.add_sheet('name')# 写入数据到A1单元格table.write(0, 0, u'哈哈')# 注意:如果对同一个单元格重复操作,会引发overwrite Exception,想要取消该功能,需要在添加工作表时指定为可覆盖table = data.add_sheet('sheet1', cell_overwrite_ok=True)# 保存data.save(os.path.join(rootPath, r'data\test.xls'))

查看结果:

在这里插入图片描述

openpyxl

该模块支持最新版的Excel文件格式,可对Excel文件进行读写操作。

import osfrom openpyxl import load_workbook# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)# 获取文件路径file_path = r'data\test_openpyxl.xlsx'file_path = os.path.join(rootPath, file_path)print(file_path)if __name__ == '__main__':    # 读取Excel文件    wb = load_workbook(file_path)    # 取第1张表    sheet_names = wb.sheetnames    print("所有表名称:", sheet_names)    ws = wb[sheet_names[0]]    # 显示表名、表行数、表列数    print('工作表行数:', ws.max_row)    print('工作表列数: ', ws.max_column)    # 获取指定行的值,如第3行    row3_values = []    row3_cell_list = list(ws.rows)[2]    for cell in row3_cell_list:        row3_values.append(cell.value)    print(row3_values)    # 获取所有行的数据    data = {   }    for i in range(0, ws.max_row):        every_row_values = []        every_row_cell_list = list(ws.rows)[i]        for cell in every_row_cell_list:            every_row_values.append(cell.value)        data[i + 1] = every_row_values    print(data)    # 写入数据    # 将第1个工作表的名称改为name    ws.title = 'name'    # 向某个单元格中写入数据    ws.cell(5, 5).value = u'write data!!'    # 保存文件    wb.save(file_path)

查看结果:

在这里插入图片描述
在这里插入图片描述

封装读写

import osfrom openpyxl import load_workbook# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)class Operate_excel(object):    """    操作Excel类    """    # 定义构造函数,创建对象自动执行    def __init__(self, file_path=None, sheet_id=None):        """        :param file_path: 如果没传值,默认为Excel路径        :param sheet_id: 如果没传值,默认为第一个sheet        """        if file_path:            # 成员变量            self.file_path = file_path        else:            self.file_path = r'data\operate_excel.xlsx'            # 将文件目录拼接成绝对路径            self.file_path = os.path.join(rootPath, self.file_path)            print(self.file_path)        if sheet_id:            self.sheet_id = sheet_id        else:            self.sheet_id = 0        # 调用成员方法        self.sheet_table = self.get_sheet()    """成员方法"""    # 获取sheet页操作对象    def get_sheet(self):        # 读取Excel文件        data = load_workbook(self.file_path)        # 读取表        sheet_table = data.worksheets[self.sheet_id]        return sheet_table    # 获取sheet页的行数和列数,返回的是一个元组    def get_sheet_nrows_ncols(self):        return self.sheet_table.max_row, self.sheet_table.max_column    # 获取sheet页的行数    def get_sheet_nrows(self):        return self.sheet_table.max_row    # 获取sheet页的列数    def get_sheet_ncols(self):        return self.sheet_table.max_column    # 获取具体单元格的数据    def get_sheet_cell_value(self, row, col):        """        :param row: 单元格行值        :param col: 单元格列值        :return: cell_data        """        cell_data = self.sheet_table.cell(row, col).value        return cell_data    # 获取指定行的数据    def get_sheet_row_value(self, row):        row_value = []        row_cell_list = list(self.sheet_table.rows)[row - 1]        for cell in row_cell_list:            row_value.append(cell.value)        return row_value    # 获取所有行的数据    def get_sheet_every_row_value(self):        all_data = {   }        for i in range(0, self.sheet_table.max_row):            every_row_values = []            every_row_cell_list = list(self.sheet_table.rows)[i]            for cell in every_row_cell_list:                every_row_values.append(cell.value)            all_data[i + 1] = every_row_values        return all_data    # 写入数据到Excel中    def write_to_excel(self, row, col, values):        # 打开Excel文件读取数据句柄        data = load_workbook(self.file_path)        # 读取表        sheet_table = data.worksheets[self.sheet_id]        # 写入数据        sheet_table.cell(row, col).value = values        # 保存文件        data.save(self.file_path)if __name__ == '__main__':    read_xlsx = Operate_excel()    print('获取到的工作表:', read_xlsx.sheet_table)    print('获取Excel表的行数与列数,返回元组格式:', read_xlsx.get_sheet_nrows_ncols())    print('获取Excel表的行数:', read_xlsx.get_sheet_nrows())    print('获取Excel表的列数:', read_xlsx.get_sheet_ncols())    print('获取Excel表的单元格(1,1)的值:', read_xlsx.get_sheet_cell_value(1, 1))    print('获取Excel表第2行的值:', read_xlsx.get_sheet_row_value(2))    print('获取Excel表所有行的值:', read_xlsx.get_sheet_every_row_value())    print('写入Excel表的单元格(4,3)的值:', read_xlsx.write_to_excel(4, 3, 'test'))

查看结果:

在这里插入图片描述

2. 接口关键字封装

在Excel中存放接口测试用例数据,通常第一行字段的值都是固定的值,如host/path/method/data等等,对应接口的关键字。

接口测试用例主要的关键字常量有:在这里插入图片描述
在读取每条测试用例时,实际上是读取每行的数据,然后可以通过固定的关键字(列)来获取对应的值,最后传递给接口方法来执行接口测试。

获取接口关键字的封装如下:

class TestCaseKeyWord(object):    """    定义测试用例关键字类    """    CASE_ID = '1'    CASE_NAME = '2'    IS_EXECUTE = '3'    INTERFACE_URL = '4'    METHOD = '5'    HEADER = '6'    REQUEST_DATA = '7'    EXPECTED_RESULT = '8'    ACTUAL_RESULT = '9'    RESULT = '10'# 获取用例iddef get_case_id():    return TestCaseKeyWord.CASE_ID# 获取用例名称def get_case_name():    return TestCaseKeyWord.CASE_NAME# 用例是否执行def get_case_is_execute():    return TestCaseKeyWord.IS_EXECUTE# 接口URLdef get_case_interface_url():    return TestCaseKeyWord.INTERFACE_URL# 用例方法def get_case_method():    return TestCaseKeyWord.METHOD# 请求头def get_case_header():    return TestCaseKeyWord.HEADER# 请求参数def get_case_payload():    return TestCaseKeyWord.REQUEST_DATA# 预期结果def get_case_expected_result():    return TestCaseKeyWord.EXPECTED_RESULT# 实际结果def get_case_actual_result():    return TestCaseKeyWord.ACTUAL_RESULT# 用例执行结果def get_case_result():    return TestCaseKeyWord.RESULT

在Excel中添加1条测试用例,新建test_keyword.py文件,获取接口测试用例名称:

from business import testcases_keywordfrom business.excel_operating import Operate_excelget_excel = Operate_excel()# 获取用例数print("用例数是:", get_excel.get_sheet_nrows() - 1)# 返回用例名称关键字的列值case_name_col = int(testcases_keyword.get_case_name())print("用例名称关键字的列值:", case_name_col)# 返回第一条用例的名称get_case_name = get_excel.get_sheet_cell_value(2, case_name_col)print("第一条用例的名称:", get_case_name)

运行结果如下:

在这里插入图片描述

3. 获取接口数据封装

取到接口测试用例中的关键字之后,要获取关键字对应的值。

获取接口关键字对应数据的封装如下:

from business import testcases_keywordfrom business.excel_operating import Operate_excelimport jsonclass getData(object):    def __init__(self):        self.op_excel = Operate_excel()    def get_case_nums(self):        """获取测试用例条数"""        return self.op_excel.get_sheet_nrows() - 1    def get_is_header(self, row):        """是否携带请求头"""        col = int(testcases_keyword.get_case_header())        header = self.op_excel.get_sheet_cell_value(row + 1, col)        if header is not None:            return header        else:            print("没有header!")            return None    def get_is_execute(self, row):        """是否执行"""        col = int(testcases_keyword.get_case_is_execute())        is_execute = self.op_excel.get_sheet_cell_value(row + 1, col)        if is_execute == 'yes':            flag = True        else:            flag = False        return flag    def get_url(self, row):        """获取url"""        col = int(testcases_keyword.get_case_interface_url())        url = self.op_excel.get_sheet_cell_value(row + 1, col)        return url    def get_method(self, row):        """获取请求方法"""        col = int(testcases_keyword.get_case_method())        method = self.op_excel.get_sheet_cell_value(row + 1, col)        return method    def get_data(self, row):        """获取请求数据"""        col = int(testcases_keyword.get_case_payload())        data = self.op_excel.get_sheet_cell_value(row + 1, col)        return json.loads(data)    def get_expected_result(self, row):        """获取预期结果"""        col = int(testcases_keyword.get_case_expected_result())        expected_result = self.op_excel.get_sheet_cell_value(row + 1, col)        if expected_result == '':            return None        else:            return expected_result    def get_actual_result(self, row, value):        """获取实际结果并写入"""        col = int(testcases_keyword.get_case_actual_result())        actual_result = expected_result = self.op_excel.get_sheet_cell_value(row + 1, col)        self.op_excel.write_to_excel(row + 1, col, value)            if __name__ == '__main__':    	get_data = getData()    	print(get_data.get_data(1))    	print(get_data.get_url(1))    	print(get_data.get_is_execute(1))

运行结果如下:

在这里插入图片描述

4.接口请求方法封装

写好接口测试用例,获取到测试数据后,就可以模拟接口请求方法。

请求方法封装如下:

import requestsimport jsonclass ApiRequests(object):    """    请求方法封装    """    # get请求    def get_method(self, url, data=None, header=None):        if header is not None:            res = requests.get(url, params=data, headers=header)        else:            res = requests.get(url, params=data)        return res.json()    # post请求    def post_method(self, url, data=None, header=None):        if header is not None:            res = requests.post(url, json=data, headers=header)        else:            res = requests.post(url, json=data)        return res.json()    # put请求    def put_method(self, url, data=None, header=None):        if header is not None:            res = requests.put(url, json=data, headers=header)        else:            res = requests.put(url, json=data)        return res.json()    # delete请求    def delete_method(self, url, data=None, header=None):        if header is not None:            res = requests.delete(url, json=data, headers=header)        else:            res = requests.delete(url, json=data)        return res.json()    # 主方法    def run_method(self, method, url, data=None, header=None):        if method == 'get' or method == 'GET':            res = self.get_method(url, data, header)        elif method == 'post' or method == 'POST':            res = self.post_method(url, data, header)        elif method == 'put' or method == 'PUT':            res = self.put_method(url, data, header)        elif method == 'delete' or method == 'DELETE':            res = self.delete_method(url, data, header)        else:            res = '请求方法错误'        # 返回res        return json.dumps(res, ensure_ascii=False, indent=4, sort_keys=True, separators=(',', ':'))

新建test_api_requests.py文件,先获取Excel中的测试数据,再通过封装好的请求方法模拟请求接口:

from business.get_testdata_from_keyword import getData  # 通过接口关键字获取测试数据from business.api_requests import ApiRequests  # 请求方法get_data = getData()print('获取是否执行: ', get_data.get_is_execute(1))print('获取接口url:', get_data.get_url(1))print('获取接口请求方法:', get_data.get_method(1))print('获取请求数据:', get_data.get_data(1))url = get_data.get_url(1)method = get_data.get_method(1)data = get_data.get_data(1)ar = ApiRequests()result = ar.run_method(url=url, method=method, data=data)print(result)

运行结果如下:

在这里插入图片描述

5. 发送邮件封装

利用python内置对SMTP协议的支持,将测试结果以邮件的方式发送。

  • SMTP是发送邮件的协议,python内置对SMTP的支持,可以发送纯文本邮件、HTML邮件以及带附件的邮件
  • python对SMTP支持有smtplib和email两个模块,email负责构造邮件,smtplib负责发送邮件
  • smtplib在发送邮件的过程中,起到服务器直接互相通信的作用
  • email用来设置服务器之间通信的信息,包括信息头、信息主体等等

发送邮件的流程:

  • 登陆:服务器、账号、密码
  • 构造邮件:发送人、收件人、邮件主体、邮件正文内容
  • 发送:接收地址
# 导入发送邮件模块import smtplib# 导入构造邮件模块from email.mime.text import MIMETextclass SendEmail(object):    """    发送邮件模块    """    def __init__(self):        """        初始化邮件配置服务        """        self.send_user = '###'  # 发送人邮箱地址        self.mail_host = 'smtp.qq.com'        self.password = '###'  # 授权码    def send_mail(self, user_lists, subject, content):        """        执行发送邮件        :param user_lists:接收人邮箱地址        :param subject:邮件主题        :param content:邮件正文内容        :return:        """        user = '发送人名称' + '<' + self.send_user + '>'        message = MIMEText(content, _subtype='plain', _charset='utf8')        message['Subject'] = subject        message['From'] = user        message['To'] = ';'.join(user_lists)        try:            server = smtplib.SMTP()            server.connect(self.mail_host)            server.login(self.send_user, self.password)            # as_string将MIMETEXT对象转成str            server.sendmail(user, user_lists, message.as_string())            server.close()            print('邮件发送成功'.center(60, '='))        except:            print('邮件发送失败'.center(60, '='))            raise    def send_content(self, data):        """        发送邮件内容        """        pass_cases_nums = int(len(data['pass_cases']))        print('用例执行成功数:%s' % pass_cases_nums)        fail_cases_nums = int(len(data['fail_cases']))        print('用例执行失败数:%s' % fail_cases_nums)        not_execute_nums = int(len(data['not_execute_cases']))        print('用例未执行数:%s' % not_execute_nums)        execute_num = float(pass_cases_nums + fail_cases_nums)        total_cases = float(pass_cases_nums + fail_cases_nums + not_execute_nums)        pass_ratio = "%.2f%%" % (pass_cases_nums / total_cases * 100)        fail_ratio = "%.2f%%" % (fail_cases_nums / total_cases * 100)        user_lists = ['###']        subject = '【接口自动化测试用例执行统计】'        content = '一共 %f 个用例, 执行了 %f 个用例,未执行 %f 个用例;成功 %f 个,通过率为 %s;失败 %f 个,失败率为 %s' % (            total_cases, execute_num, not_execute_nums, pass_cases_nums, pass_ratio, fail_cases_nums, fail_ratio)        self.send_mail(user_lists, subject, content)if __name__ == '__main__':    sm = SendEmail()    sm.send_content({   'pass_cases': [1, 3, 5], 'fail_cases': [2, 4, 6], 'not_execute_cases': [1, 2, 3]})

6. 操作数据库封装

操作数据库的流程:

  • 连接数据库
  • 获取操作游标
  • 执行sql语句,新增/修改/删除/查询
  • 提交更新数据或回滚
  • 关闭连接
import pymysqlimport jsonclass OperateMysql(object):    def __init__(self):        # 数据库初始化连接        self.conn = pymysql.connect(            host='localhost',            port=3306,            user='root',            password='12345678',            charset='utf8mb4',            db='test',            cursorclass=pymysql.cursors.DictCursor        )        # 创建游标操作数据库        self.cursor = self.conn.cursor()    def select_first_data(self, sql):        """        查询第一条数据        """        try:            # 执行sql语句            self.cursor.execute(sql)        except Exception as e:            print('执行sql异常:%s' % e)        else:            # 获取查询到的第一条数据            first_data = self.cursor.fetchone()            # 将返回结果转换成str数据格式,禁用ascii编码            first_data = json.dumps(first_data, ensure_ascii=False)            return first_data    def select_all_data(self, sql):        """        查询结果集        """        try:            # 执行sql语句            self.cursor.execute(sql)        except Exception as e:            print('执行sql异常:%s' % e)        else:            all_data = self.cursor.fetchall()            # 将返回结果转换成str数据格式,禁用ascii编码            # all_data = json.dumps(all_data, ensure_ascii=False)            return all_data    def del_data(self, sql):        """        删除数据        """        res = {   }        try:            # 执行sql语句            result = self.cursor.execute(sql)            if result != 0:                # 提交修改                self.conn.commit()                res = {   '删除成功'}            else:                res = {   '没有要删除的数据'}        except Exception as e:            # 发生错误时回滚            self.conn.rollback()            res = {   '删除失败', e}        return res    def update_data(self, sql):        """        修改数据        """        res = {   }        try:            # 执行sql语句            self.cursor.execute(sql)            self.conn.commit()            res = {   '更新成功'}        except Exception as e:            # 发生错误时回滚            self.conn.rollback()            res = {   '更新失败', e}        return res    def insert_data(self, sql, data):        """        新增数据        """        res = {   }        try:            # 执行sql语句            self.cursor.execute(sql, data)            self.conn.commit()            res = {   data, '新增成功'}        except Exception as e:            self.conn.rollback()            res = {   '新增失败', e}        return res    def conn_close(self):        """        关闭数据库        """        self.cursor.close()        self.conn.close()if __name__ == '__main__':    # 类的实例化    om = OperateMysql()    # 新增    data = [{   'id': 1, 'name': '张三', 'age': 10}, {   'id': 2, 'name': '李四', 'age': 20}, {   'id': 3, 'name': '王五', 'age': 30}]    for i in data:        i_data = (i['id'], i['name'], i['age'])        insert_res = om.insert_data(            "INSERT INTO students (id,name,age) VALUES (%s,%s,%s)",            i_data        )        print(insert_res)    # 查询    one_data = om.select_first_data(        "SELECT * FROM students"    )    print(one_data)    all_data = om.select_all_data(        "SELECT * FROM students"    )    print('查询的总数据:', len(all_data), '分别是:', all_data)    # 修改    update_data = om.update_data(        "UPDATE students SET name='wangwu' WHERE id=3"    )    print(update_data)    # 删除    delete_data = om.del_data(        "DELETE FROM students WHERE id in(1,2,3)"    )    print(delete_data)    # 关闭数据库    om.conn_close()

7. Json数据文件封装

可以用json文件来存储测试数据,python读取json文件的流程是:

  • 获取文件名和绝对路径
  • 打开读取数据
  • 提取关键数据
  • 关闭文件

具体代码实现:

import osimport json# 获取当前文件的绝对路径curPath = os.path.abspath(os.path.dirname(__file__))print(curPath)# 获取项目根目录rootPath = os.path.abspath(os.path.dirname(curPath))print(rootPath)config_file_name = r'data\topics.json'class OperateJson(object):    """    操作Json文件    """    def __init__(self, file_name=None):        if file_name:            self.file_name = file_name        else:            self.get_file = config_file_name            self.file_name = os.path.join(rootPath, self.get_file)            print('文件名称:%s' % self.file_name)        self.data = self.read_json()    def read_json(self):        """        读取json数据        """        with open(self.file_name, encoding='utf8') as fp:            # 反序列化,从文件读取(string转dict)            data = json.load(fp)            fp.close()        return data    def get_key_word(self, key):        """"        读取关键字        """        return self.data[key]if __name__ == '__main__':    oj = OperateJson()    print(oj.read_json())    print(oj.get_key_word('test_data'))    print(oj.get_key_word('test_data')[0])    print(oj.get_key_word('test_data')[0][0])    print(oj.get_key_word('test_data')[0][0]['title'])

读取的json文件是:

{     "test_data":  [      [        {   "accesstoken": "", "title": "eyuyu", "tab": "ask", "content": "AAAAAAAAA"},401,"错误的accessToken"      ],      [        {   "accesstoken": "4a4790e1-d707-488f-9a7f-df0918fa483b", "title": "", "tab": "ask", "content": "AAAAAAAAA"},400,"标题不能为空"      ],      [        {   "accesstoken": "4a4790e1-d707-488f-9a7f-df0918fa483b", "title": "eyuyu", "tab": "", "content": ""},400,"必须选择一个版块"      ]  ]}

运行结果是:

在这里插入图片描述

8. logging日志模块封装

business目录下新建logger.py,用于封装日志记录模块;

新建目录logs,用于存放日志信息

import logging# 创建一个日志logger = logging.getLogger("cnodeapp")# 设置日志打印等级logger.setLevel(logging.DEBUG)# 设置日志格式format = logging.Formatter('[%(asctime)s] [%(levelname)s] [%(funcName)s] %(message)s')# 创建一个handler,用于创建日志文件fl = logging.FileHandler(filename='logs/cnode.log', mode='a', encoding='utf8')fl.setFormatter(format)logger.addHandler(fl)# 创建一个handler,用于输出到控制台sl = logging.StreamHandler()sl.setFormatter(format)logger.addHandler(sl)

测试用例中引用日志模块

在这里插入图片描述

上一篇:MySQL数据库
下一篇:Jenkins+requests+pytest+allure持续集成

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年03月25日 05时23分41秒