【Python+requests+unittest+excel】实现接口自动化测试框架

时间:2024-04-18 16:22:02

一、框架结构:

 工程目录

二、Case文件设计

三、基础包 base

3.1 封装get/post请求(runmethod.py)

 
 1 import requests
 2 import json
 3 class RunMethod:
 4     def post_main(self,url,data,header=None):
 5         res = None
 6         if header !=None:    
 7             res = requests.post(url=url,data=data,headers=header)
 8         else:
 9             res = requests.post(url=url,data=data)
10         return res.json()
11 
12     def get_main(self,url,data=None,header=None):
13         res = None
14         if header !=None:    
15             res = requests.get(url=url,data=data,headers=header,verify=False)
16         else:
17             res = requests.get(url=url,data=data,verify=False)
18         return res.json()
19 
20     def run_main(self,method,url,data=None,header=None):
21         res = None
22         if method == 'Post':
23             res = self.post_main(url,data,header)
24         else:
25             res = self.get_main(url,data,header)
26         return json.dumps(res,ensure_ascii=False,sort_keys=True,indent=2) 

3.2 封装mock(mock.py)

1 from mock import mock
2 #模拟mock 封装
def mock_test(mock_method, request_data, url, method, response_data):
    """Return ``response_data`` by way of a freshly created ``Mock``.

    The ``mock_method`` argument is only rebound locally; the object passed
    in by the caller is not modified.
    """
    stub = mock.Mock()
    stub.return_value = response_data
    mock_method = stub
    # The call arguments are recorded on the mock and do not affect the result.
    return mock_method(url, method, request_data)

四、数据操作包 operation_data

4.1 获取excel单元格中的内容(get_data.py)

 
  1 #coding:utf-8
  2 from tool.operation_excel import OperationExcel
  3 import data_config
  4 from tool.operation_json import OperetionJson
  5 from tool.connect_db import OperationMysql
  6 class GetData:
  7     def __init__(self):
  8         self.opera_excel = OperationExcel()
  9 
 10     #去获取excel行数,就是case的个数    
 11     def get_case_lines(self):
 12         return self.opera_excel.get_lines()
 13 
 14     #获取是否执行
 15     def get_is_run(self,row):
 16         flag = None
 17         col = int(data_config.get_run())
 18         run_model = self.opera_excel.get_cell_value(row,col)
 19         if run_model == 'yes':
 20             flag = True
 21         else:
 22             flag = False
 23         return flag
 24 
 25     #是否携带header
 26     def is_header(self,row):
 27         col = int(data_config.get_header())
 28         header = self.opera_excel.get_cell_value(row,col)
 29         if header != '':
 30             return header
 31         else:
 32             return None
 33 
 34     #获取请求方式
 35     def get_request_method(self,row):
 36         col = int(data_config.get_run_way())
 37         request_method = self.opera_excel.get_cell_value(row,col)
 38         return request_method
 39 
 40     #获取url
 41     def get_request_url(self,row):
 42         col = int(data_config.get_url())
 43         url = self.opera_excel.get_cell_value(row,col)
 44         return url
 45 
 46     #获取请求数据
 47     def get_request_data(self,row):
 48         col = int(data_config.get_data())
 49         data = self.opera_excel.get_cell_value(row,col)
 50         if data == '':
 51             return None
 52         return data
 53 
 54     #通过获取关键字拿到data数据
 55     def get_data_for_json(self,row):
 56         opera_json = OperetionJson()
 57         request_data = opera_json.get_data(self.get_request_data(row))
 58         return request_data
 59 
 60     #获取预期结果
 61     def get_expcet_data(self,row):
 62         col = int(data_config.get_expect())
 63         expect = self.opera_excel.get_cell_value(row,col)
 64         if expect == '':
 65             return None
 66         return expect
 67 
 68     #通过sql获取预期结果
 69     def get_expcet_data_for_mysql(self,row):
 70         op_mysql = OperationMysql()
 71         sql = self.get_expcet_data(row)
 72         res = op_mysql.search_one(sql)
 73         return res.decode('unicode-escape')
 74 
 75     def write_result(self,row,value):
 76         col = int(data_config.get_result())
 77         self.opera_excel.write_value(row,col,value)
 78 
 79     #获取依赖数据的key
 80     def get_depend_key(self,row):
 81         col = int(data_config.get_data_depend())
 82         depent_key = self.opera_excel.get_cell_value(row,col)
 83         if depent_key == "":
 84             return None
 85         else:
 86             return depent_key
 87 
 88     #判断是否有case依赖
 89     def is_depend(self,row):
 90         col = int(data_config.get_case_depend())
 91         depend_case_id = self.opera_excel.get_cell_value(row,col)
 92         if depend_case_id == "":
 93             return None
 94         else:
 95             return depend_case_id
 96 
 97     #获取数据依赖字段
 98     def get_depend_field(self,row):
 99         col = int(data_config.get_field_depend())
100         data = self.opera_excel.get_cell_value(row,col)
101         if data == "":
102             return None
103         else:
104             return data

4.2 获取excel中每个列(data_config.py)

 
 1 #coding:utf-8
 2 class global_var:
 3     #case_id
 4     Id = '0'
 5     request_name = '1'
 6     url = '2'
 7     run = '3'
 8     request_way = '4'
 9     header = '5'
10     case_depend = '6'
11     data_depend = '7'
12     field_depend = '8'
13     data = '9'
14     expect = '10'
15     result = '11'
16 #获取caseid
17 def get_id():
18     return global_var.Id
19 
20 #获取url
21 def get_url():
22     return global_var.url
23 
24 def get_run():
25     return global_var.run
26 
27 def get_run_way():
28     return global_var.request_way
29 
30 def get_header():
31     return global_var.header
32 
33 def get_case_depend():
34     return global_var.case_depend
35 
36 def get_data_depend():
37     return global_var.data_depend
38 
39 def get_field_depend():
40     return global_var.field_depend
41 
42 def get_data():
43     return global_var.data
44 
45 def get_expect():
46     return global_var.expect
47 
48 def get_result():
49     return global_var.result
50 
51 def get_header_value():
52     return global_var.header

4.3 解决数据依赖(dependent_data.py)

 
 1 #coding:utf-8
 2 import sys
 3 import json
 4 sys.path.append('C:/Users/lxz/Desktop/InterFace_JIA')
 5 from tool.operation_excel import OperationExcel
 6 from base.runmethod import RunMethod
 7 from operation_data.get_data import GetData
 8 from jsonpath_rw import jsonpath,parse
 9 class DependdentData:
10     def __init__(self,case_id):
11         self.case_id = case_id
12         self.opera_excel = OperationExcel()
13         self.data = GetData()
14 
15     #通过case_id去获取该case_id的整行数据
16     def get_case_line_data(self):
17         rows_data = self.opera_excel.get_rows_data(self.case_id)
18         return rows_data
19 
20     #执行依赖测试,获取结果
21     def run_dependent(self):
22         run_method = RunMethod()
23         row_num  = self.opera_excel.get_row_num(self.case_id)
24         request_data = self.data.get_data_for_json(row_num)
25         #header = self.data.is_header(row_num)
26         method = self.data.get_request_method(row_num)
27         url = self.data.get_request_url(row_num)
28         res = run_method.run_main(method,url,request_data)
29         return json.loads(res)
30 
31     #根据依赖的key去获取执行依赖测试case的响应,然后返回
32     def get_data_for_key(self,row):
33         depend_data = self.data.get_depend_key(row)
34         response_data = self.run_dependent()
35         json_exe = parse(depend_data)
36         madle = json_exe.find(response_data)
37         return [math.value for math in madle][0]
38 
if __name__ == '__main__':
    # Manual check of the JSONPath lookup against a sample order response.
    order = {
        "data": {
            "_input_charset": "utf-8",
            "body": "京东订单-1710141907182334",
            "it_b_pay": "1d",
            "notify_url": "http://order.imooc.com/pay/notifyalipay",
            "out_trade_no": "1710141907182334",
            "partner": "2088002966755334",
            "payment_type": "1",
            "seller_id": "yangyan01@tcl.com",
            "service": "mobile.securitypay.pay",
            # Long values are split with implicit string concatenation so each
            # literal stays on one line.
            "sign": (
                "kZBV53KuiUf5HIrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQl"
                "pr%2BaMmdjO30JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D"
            ),
            "sign_type": "RSA",
            "string": (
                "_input_charset=utf-8&body=京东订单-1710141907182334&it_b_pay=1d&notify_url=http://order.imooc.com/pay/"
                "notifyalipay&out_trade_no=1710141907182334&partner=2088002966755334&payment_type=1&seller_id=yangyan01@"
                "tcl.com&service=mobile.securitypay.pay&subject=京东订单-1710141907182334&total_fee=299&sign=kZBV53KuiUf5H"
                "IrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr%2BaMmdjO30"
                "JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D&sign_type=RSA"
            ),
            "subject": "京东订单-1710141907182334",
            "total_fee": 299
        },
        "errorCode": 1000,
        "errorDesc": "成功",
        "status": 1,
        "timestamp": 1507979239100
    }
    res = "data.out_trade_no"
    json_exe = parse(res)
    madle = json_exe.find(order)
    # print() with a single argument behaves the same on Python 2 and 3.
    print([match.value for match in madle][0])

五、工具类包 tool

5.1 操作excel (operation_excel.py)

 
 1 #coding:utf-8
 2 import xlrd
 3 from xlutils.copy import copy
 4 class OperationExcel:
 5     def __init__(self,file_name=None,sheet_id=None):
 6         if file_name:
 7             self.file_name = file_name
 8             self.sheet_id = sheet_id    
 9         else:
10             self.file_name = '../dataconfig/case1.xls'
11             self.sheet_id = 0
12         self.data = self.get_data()
13 
14     #获取sheets的内容
15     def get_data(self):
16         data = xlrd.open_workbook(self.file_name)
17         tables = data.sheets()[self.sheet_id]
18         return tables
19 
20     #获取单元格的行数
21     def get_lines(self):
22         tables = self.data
23         return tables.nrows
24 
25     #获取某一个单元格的内容
26     def get_cell_value(self,row,col):
27         return self.data.cell_value(row,col)
28 
29     #写入数据
30     def write_value(self,row,col,value):
31         '''
32         写入excel数据
33         row,col,value
34         '''
35         read_data = xlrd.open_workbook(self.file_name)
36         write_data = copy(read_data)
37         sheet_data = write_data.get_sheet(0)
38         sheet_data.write(row,col,value)
39         write_data.save(self.file_name)
40 
41     #根据对应的caseid 找到对应行的内容
42     def get_rows_data(self,case_id):
43         row_num = self.get_row_num(case_id)
44         rows_data = self.get_row_values(row_num)
45         return rows_data
46 
47     #根据对应的caseid找到对应的行号
48     def get_row_num(self,case_id):
49         num = 0
50         clols_data = self.get_cols_data()
51         for col_data in clols_data:
52             if case_id in col_data:
53                 return num
54             num = num+1
55 
56 
57     #根据行号,找到该行的内容
58     def get_row_values(self,row):
59         tables = self.data
60         row_data = tables.row_values(row)
61         return row_data
62 
63     #获取某一列的内容
64     def get_cols_data(self,col_id=None):
65         if col_id != None:
66             cols = self.data.col_values(col_id)
67         else:
68             cols = self.data.col_values(0)
69         return cols
70 
71 
if __name__ == '__main__':
    # Manual check: show one cell of the default case file.
    opers = OperationExcel()
    # print() with a single argument behaves the same on Python 2 and 3.
    print(opers.get_cell_value(1, 2))

5.2判断字符串包含,判断字典是否相等(common_util.py)

 
 1 #coding:utf-8
 2 import json
 3 class CommonUtil:
 4     def is_contain(self,str_one,str_two):
 5         '''
 6         判断一个字符串是否再另外一个字符串中
 7         str_one:查找的字符串
 8         str_two:被查找的字符串
 9         '''
10         flag = None
11         if isinstance(str_one,unicode):
12             str_one = str_one.encode('unicode-escape').decode('string_escape')
13         return cmp(str_one,str_two)
14         if str_one in str_two:
15             flag = True
16         else:
17             flag = False
18         return flag
19 
20 
21     def is_equal_dict(self,dict_one,dict_two):
22         '''
23         判断两个字典是否相等
24         '''
25         if isinstance(dict_one,str):
26             dict_one = json.loads(dict_one)
27         if isinstance(dict_two,str):
28             dict_two = json.loads(dict_two)
29         return cmp(dict_one,dict_two)

5.3 操作header(operation_header.py)

 
 1 #coding:utf-8
 2 import requests
 3 import json
 4 from operation_json import OperetionJson
 5 
 6 
 7 class OperationHeader:
 8 
 9     def __init__(self,response):
10         self.response = json.loads(response)
11 
12     def get_response_url(self):
13         '''
14         获取登录返回的token的url
15         '''
16         url = self.response['data']['url'][0]
17         return url
18 
19     def get_cookie(self):
20         '''
21         获取cookie的jar文件
22         '''
23         url = self.get_response_url()+"&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
24         cookie = requests.get(url).cookies
25         return cookie
26 
27     def write_cookie(self):
28         cookie = requests.utils.dict_from_cookiejar(self.get_cookie())
29         op_json = OperetionJson()
30         op_json.write_data(cookie)
31         
if __name__ == '__main__':
    # Manual run: log in, then store the cookie obtained from the response.
    login_url = "http://www.jd.com/passport/user/login"
    login_form = {
        "username": "18513199586",
        "password": "111111",
        "verify": "",
        "referer": "https://www.jd.com",
    }
    login_body = requests.post(login_url, login_form).json()
    # OperationHeader is given the response as JSON text.
    OperationHeader(json.dumps(login_body)).write_cookie()

5.4 操作json文件(operation_json.py)

 
 1 #coding:utf-8
 2 import json
 3 class OperetionJson:
 4 
 5     def __init__(self,file_path=None):
 6         if file_path  == None:
 7             self.file_path = '../dataconfig/user.json'
 8         else:
 9             self.file_path = file_path
10         self.data = self.read_data()
11 
12     #读取json文件
13     def read_data(self):
14         with open(self.file_path) as fp:
15             data = json.load(fp)
16             return data
17 
18     #根据关键字获取数据
19     def get_data(self,id):
20         print type(self.data)
21         return self.data[id]
22 
23     #写json
24     def write_data(self,data):
25         with open('../dataconfig/cookie.json','w') as fp:
26             fp.write(json.dumps(data))
27 
28 
29 
if __name__ == '__main__':
    # Manual check: show the 'shop' entry of the default JSON file.
    opjson = OperetionJson()
    # print() with a single argument behaves the same on Python 2 and 3.
    print(opjson.get_data('shop'))

5.5 操作数据库(connect_db.py)

 
 1 #coding:utf-8
 2 import MySQLdb.cursors
 3 import json
 4 class OperationMysql:
 5     def __init__(self):
 6         self.conn = MySQLdb.connect(
 7             host='localhost',
 8             port=3306,
 9             user='root',
10             passwd='123456',
11             db='le_study',
12             charset='utf8',
13             cursorclass=MySQLdb.cursors.DictCursor
14             )
15         self.cur = self.conn.cursor()
16 
17     #查询一条数据
18     def search_one(self,sql):
19         self.cur.execute(sql)
20         result = self.cur.fetchone()
21         result = json.dumps(result)
22         return result
23 
if __name__ == '__main__':
    # Manual check: fetch one user row and show it.
    op_mysql = OperationMysql()
    res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
    # print() with a single argument behaves the same on Python 2 and 3.
    print(res)

5.6 发送报告邮件(send_email.py)

 
 1 #coding:utf-8
 2 import smtplib
 3 from email.mime.text import MIMEText
 4 class SendEmail:
 5     global send_user
 6     global email_host
 7     global password
 8     email_host = "smtp.163.com"
 9     send_user = "jiaxiaonan666@163.com"
10     password = "jia_668"
11     def send_mail(self,user_list,sub,content):
12         user = "jiaxiaonan"+"<"+send_user+">"
13         message = MIMEText(content,_subtype='plain',_charset='utf-8')
14         message['Subject'] = sub
15         message['From'] = user
16         message['To'] = ";".join(user_list)
17         server = smtplib.SMTP()
18         server.connect(email_host)
19         server.login(send_user,password)
20         server.sendmail(user,user_list,message.as_string())
21         server.close()
22 
23     def send_main(self,pass_list,fail_list):
24         pass_num = float(len(pass_list))
25         fail_num = float(len(fail_list))
26         count_num = pass_num+fail_num
27         #90%
28         pass_result = "%.2f%%" %(pass_num/count_num*100)
29         fail_result = "%.2f%%" %(fail_num/count_num*100)
30 
31 
32         user_list = ['609037724@qq.com']
33         sub = "接口自动化测试报告"
34         content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" %(count_num,pass_num,fail_num,pass_result,fail_result )
35         self.send_mail(user_list,sub,content)
36 
if __name__ == '__main__':
    # Manual run: mail a report for 4 passed and 6 failed sample cases.
    passed_cases = [1, 2, 3, 4]
    failed_cases = [2, 3, 4, 5, 6, 7]
    SendEmail().send_main(passed_cases, failed_cases)

六、主函数run_test.py

 
 1 #coding:utf-8
 2 import sys
 3 sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
 4 from base.runmethod import RunMethod
 5 from operation_data.get_data import GetData
 6 from tool.common_util import CommonUtil
 7 from operation_data.dependent_data import DependdentData
 8 from tool.send_email import SendEmail
 9 from tool.operation_header import OperationHeader
10 from tool.operation_json import OperetionJson
class RunTest:
    """Run every enabled case of the sheet, record results and mail a summary."""

    def __init__(self):
        self.run_method = RunMethod()
        self.data = GetData()
        self.com_util = CommonUtil()
        self.send_mai = SendEmail()

    def _send_request(self, header, method, url, request_data):
        """Send one request according to the header flag and return the response.

        'write': send the request, then store the cookie derived from it.
        'yes': send the request together with the stored 'apsid' cookie value.
        anything else: send the plain request.
        """
        if header == 'write':
            res = self.run_method.run_main(method, url, request_data)
            OperationHeader(res).write_cookie()
            return res
        if header == 'yes':
            op_json = OperetionJson('../dataconfig/cookie.json')
            cookies = {'apsid': op_json.get_data('apsid')}
            # NOTE(review): the dict is passed in the ``header`` position of
            # run_main -- confirm the server accepts it there.
            return self.run_method.run_main(method, url, request_data, cookies)
        return self.run_method.run_main(method, url, request_data)

    def go_on_run(self):
        """Execute all runnable cases, write each result, then send the report.

        Rows start at 1 (row 0 is skipped). A case passes when
        ``is_equal_dict(expect, response)`` returns 0; otherwise the response
        is written into the result cell.
        """
        pass_count = []
        fail_count = []
        for i in range(1, self.data.get_case_lines()):
            if not self.data.get_is_run(i):
                continue
            url = self.data.get_request_url(i)
            method = self.data.get_request_method(i)
            request_data = self.data.get_data_for_json(i)
            expect = self.data.get_expcet_data_for_mysql(i)
            header = self.data.is_header(i)
            depend_case = self.data.is_depend(i)
            if depend_case is not None:
                self.depend_data = DependdentData(depend_case)
                # Value taken from the response of the depended-on case.
                depend_response_data = self.depend_data.get_data_for_key(i)
                # Name of the request field that receives that value.
                depend_key = self.data.get_depend_field(i)
                if request_data is None:
                    # A case without request data still needs a dict to hold
                    # the injected value.
                    request_data = {}
                request_data[depend_key] = depend_response_data

            res = self._send_request(header, method, url, request_data)

            if self.com_util.is_equal_dict(expect, res) == 0:
                self.data.write_result(i, 'pass')
                pass_count.append(i)
            else:
                self.data.write_result(i, res)
                fail_count.append(i)
        self.send_mai.send_main(pass_count, fail_count)
63 
64     #将执行判断封装
65     #def get_cookie_run(self,header):
66 
67 
if __name__ == '__main__':
    # Entry point: run every enabled case and send the report.
    RunTest().go_on_run()

 行动吧,在路上总比一直观望的要好,未来的你肯定会感谢现在拼搏的自己!如果想学习提升找不到资料,没人答疑解惑时,请及时加入扣群: 320231853,里面有各种软件测试+开发资料和技术可以一起交流学习哦。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!