python http接口测试框架示例
本篇内容为大家提供的是python http接口测试框架示例等,详细而全面,感兴趣的同学可以参考学习一下。
·
本篇内容为大家提供的是python http接口测试框架示例等,详细而全面,感兴趣的同学可以参考学习一下。
1、日志类,用于测试时日志记录
pyapilog.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'
import logging
import datetime
import os
import setting
# Maps the numeric level code read from setting.logLevel (1-6) to the
# matching constant of the logging module.
logLevel = {
1 : logging.NOTSET,
2 : logging.DEBUG,
3 : logging.INFO,
4 : logging.WARNING,
5 : logging.ERROR,
6 : logging.CRITICAL
}
# Path of setting.ini under setting.root_dir.
# NOTE(review): not referenced by the logging function in this module.
setFile = os.path.join(setting.root_dir, 'setting.ini')
# Module-level registry; pyapilog() stores the logger it configures here.
loggers = {}
# Logging entry point: the level is read from the settings module and the
# output goes to a per-day file under the configured log directory.
def pyapilog(**kwargs):
    """Return the root logger, attaching its handlers on first use.

    The level is looked up in ``logLevel`` using ``setting.logLevel``.  Log
    records are written to ``<setting.logFile>/<YYYY-MM-DD>.log`` at that
    level, and records of level ERROR and above are also echoed to the
    console.

    Args:
        **kwargs: Accepted for backward compatibility; currently ignored.

    Returns:
        The configured root ``logging.Logger``.
    """
    level = logLevel[setting.logLevel]
    log_dir = setting.logFile
    # makedirs also creates missing parent directories, which os.mkdir
    # refused to do.
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    log_file = os.path.join(
        log_dir, datetime.datetime.now().strftime('%Y-%m-%d') + '.log')

    logger = logging.getLogger()
    logger.setLevel(level)
    # Handlers are attached only once; later calls reuse them, otherwise
    # every call would duplicate each log line.
    if not logger.handlers:
        formatter = logging.Formatter(
            '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        # File handler: explicit UTF-8 because the messages are not ASCII.
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(level)
        # Console handler: errors only.
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.ERROR)
        for handler in (file_handler, console_handler):
            handler.setFormatter(formatter)
            logger.addHandler(handler)
    # Stored under the literal key 'name', exactly as before.
    loggers.update(dict(name=logger))
    return logger
2、http测试类
httprequest.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'
from pyapilog import pyapilog
import requests
import json
import urllib
class SendHttpRequest(object):
    """Send HTTP requests to one URL and log each outcome via pyapilog."""

    def __init__(self, url):
        """Remember the target URL used by every request method."""
        self.url = url

    def _send(self, verb, sender, **kwargs):
        """Issue one request and log the result.

        Args:
            verb: Lower-case method name inserted into the log message.
            sender: Callable such as ``requests.get`` / ``requests.post``.
            **kwargs: Passed through to ``sender`` after the URL.

        Returns:
            The response object returned by ``sender``.

        Raises:
            Exception: Whatever ``sender`` raised, re-raised after logging.
                Previously the error was printed and the method then failed
                on an unbound local variable.
        """
        try:
            resp = sender(self.url, **kwargs)
        except Exception:
            pyapilog().exception(u"发送%s请求失败: %s" % (verb, self.url))
            raise
        if resp.status_code == 200:
            pyapilog().info(u"发送%s请求: %s 服务器返回: %s" % (verb, resp.url, resp.status_code))
        else:
            pyapilog().error(u"发送%s请求: %s 服务器返回: %s\n error info: %s " % (verb, resp.url, resp.status_code, resp.text))
        return resp

    # post request
    def post(self, value=None):
        """POST with ``value`` encoded into the query string.

        ``value`` may be None, in which case no query string is added.

        Returns:
            The response body text, whatever the status code.
        """
        # requests builds the query string itself, so None is acceptable
        # and a URL that already carries a query is merged correctly.
        return self._send("post", requests.post, params=value).text

    def post_json(self, value):
        """POST ``value`` serialized as a JSON body.

        Returns:
            The response body text for HTTP 200, otherwise None.
        """
        head = {'content-type': 'application/json'}
        resp = self._send("post", requests.post,
                          data=json.dumps(value), headers=head)
        # Unlike post()/get(), the body is only handed back on success.
        if resp.status_code == 200:
            return resp.text
        return None

    def get(self, value=None):
        """GET with ``value`` as query parameters.

        Returns:
            The response body text, whatever the status code.
        """
        return self._send("get", requests.get, params=value).text
3、数据库操作类
databasedriver.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-21'
import pymssql
import MySQLdb
import setting
from pyapilog import pyapilog
class sqldriver(object):
    """Connection settings plus one-shot query helpers for MSSQL and MySQL."""

    def __init__(self, host, port, user, password, database):
        """Store the connection parameters; no connection is opened here."""
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.database = database

    # Run a query against SQL Server
    def exec_mssql(self, sql):
        """Execute ``sql`` through pymssql and return all fetched rows.

        Returns:
            The result of ``cursor.fetchall()``, or None when the cursor
            could not be obtained or any exception occurred (the exception
            is logged, not raised).
        """
        conn = None
        try:
            conn = pymssql.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   password=self.password,
                                   database=self.database,
                                   charset="utf8")
            cur = conn.cursor()
            if not cur:
                pyapilog().error(u"数据库连接不成功")
                return None
            pyapilog().info(u"执行SQL语句|%s|" % sql)
            cur.execute(sql)
            rows = cur.fetchall()
            if not rows:
                pyapilog().warning(u"没有查询到数据")
            return rows
        except Exception as e:
            pyapilog().error(e)
            return None
        finally:
            # Close on every path; the early return used to skip this.
            if conn is not None:
                conn.close()

    # Run a statement against MySQL
    def exec_mysql(self, sql):
        """Execute ``sql`` through MySQLdb.

        Returns:
            Whatever ``cursor.execute(sql)`` returns, or None when the
            cursor could not be obtained or an exception was logged.
        """
        # NOTE(review): this returns the value of cursor.execute(), not the
        # fetched rows as exec_mssql does - confirm which one callers want.
        conn = None
        try:
            conn = MySQLdb.connect(host=self.host,
                                   port=self.port,
                                   user=self.user,
                                   passwd=self.password,
                                   db=self.database,
                                   )
            cur = conn.cursor()
            if not cur:
                return None
            pyapilog().info(u"执行SQL语句|%s|" % sql)
            return cur.execute(sql)
        except Exception as e:
            pyapilog().error(e)
            return None
        finally:
            # The connection was never closed before.
            if conn is not None:
                conn.close()
# Execute a SQL statement on the configured database and return the result
def execsql(sql):
    """Run ``sql`` with the driver selected by ``setting.DATABASE["ENGINE"]``.

    "MYSQL" is routed to ``sqldriver.exec_mysql`` and "MSSQL" to
    ``sqldriver.exec_mssql``.  Any other engine value is logged as an error
    (the message also lists ORACLE, for which no branch exists here).

    Returns:
        The value returned by the selected driver method, or None when the
        engine is not handled or an exception was logged.
    """
    config = setting.DATABASE
    driver = config.get("ENGINE")
    connect_args = dict(
        host=config.get("HOST"),
        port=config.get("PORT"),
        user=config.get("USER"),
        password=config.get("PWD"),
        database=config.get("DATABASE"),
    )

    if driver == "MYSQL":
        runner_name = "exec_mysql"
    elif driver == "MSSQL":
        runner_name = "exec_mssql"
    else:
        pyapilog().error(u"[%s]数据库配置支持MYSQL、MSSQL、ORACLE" % driver)
        return None

    try:
        runner = getattr(sqldriver(**connect_args), runner_name)
        return runner(sql)
    except Exception as e:
        pyapilog().error(e)
    return None
4、解析json字符串
dataparse.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-21'
import json
import xmltodict
from pyapilog import pyapilog
# Parse a JSON string and query it with slash-separated paths
class jsonprase(object):
    """Wrap a decoded JSON document and look nodes up by "/a/b/c" paths."""

    def __init__(self, json_value):
        """Decode ``json_value``.

        On a decoding failure the error is logged and ``self.json_value`` is
        left as None, so later lookups return None instead of failing on a
        missing attribute.
        """
        self.json_value = None
        try:
            self.json_value = json.loads(json_value)
        except (TypeError, ValueError) as e:
            # TypeError covers a None/non-string input, ValueError bad JSON.
            pyapilog().error(e)

    def find_json_node_by_xpath(self, xpath):
        """Return the node addressed by a slash-separated path.

        Each path segment is used as a dict key.  When the current node is a
        list, the segment is looked up in every dict element and a list of
        the results is returned for that step.  Empty segments are ignored,
        so "/" addresses the document root.

        Returns:
            The addressed node, or None when a segment cannot be resolved.
        """
        elem = self.json_value
        for key in xpath.strip("/").split("/"):
            if not key:
                continue
            if isinstance(elem, dict):
                elem = elem.get(key)
            elif isinstance(elem, list):
                elem = [item.get(key) if isinstance(item, dict) else None
                        for item in elem]
            else:
                # None or a scalar leaf: nothing further to descend into.
                return None
        return elem

    def datalength(self, xpath="/"):
        """Return ``len()`` of the node at ``xpath`` (0 when it is missing)."""
        node = self.find_json_node_by_xpath(xpath)
        return 0 if node is None else len(node)

    @property
    def json_to_xml(self):
        """Serialize the document as XML under a single ``root`` element.

        Returns:
            The pretty-printed XML string, or None when serialization failed
            (the error is logged).
        """
        try:
            return xmltodict.unparse({"root": self.json_value}, pretty=True)
        except Exception as e:
            pyapilog().error(e)
            return None
# Parse an XML string
class xmlprase(object):
    """Hold an XML string and convert it to a JSON string on demand."""

    def __init__(self, xml_value):
        """Store the raw XML text; nothing is parsed until it is requested."""
        self.xml_str = xml_value

    @property
    def xml_to_json(self):
        """Parse the stored XML with xmltodict and dump it as JSON text.

        Returns:
            The JSON string, or None when parsing or serialization failed.
            The error is logged through pyapilog; previously it was printed
            and the method then failed on an unbound local variable.
        """
        try:
            xml_dic = xmltodict.parse(self.xml_str,
                                      encoding="utf-8",
                                      process_namespaces=True,
                                      )
            return json.dumps(xml_dic)
        except Exception as e:
            pyapilog().error(e)
            return None
5、还有配置文件差点忘记说了
setting.py
# -*-coding:utf-8 -*-
# !/usr/bin/python
__author__ = 'dongjie'
__data__ = '2015-05-20'
'''
配置系统相关的参数,提供全局的相关配置
'''
import os
import sys
# Directory that contains this settings module. os.path.dirname is used so
# the result is correct whatever path separator the platform uses.
root_dir = os.path.dirname(os.path.realpath(__file__))
# Make the directory importable, without adding a duplicate entry when the
# module is loaded more than once.
if root_dir not in sys.path:
    sys.path.append(root_dir)
# Log level code: 1:notset 2:debug 3:info 4:warning 5:error 6:critical
logLevel = 2
# Directory the log files are written to
logFile = os.path.join(root_dir, 'logs')
# Database configuration. The original note lists MYSQL, MSSQL and ORACLE as
# supported engines.
DATABASE = {
    "ENGINE": "MSSQL",
    "HOST": "",
    "PORT": 3433,
    "USER": "",
    "PWD": "",
    "DATABASE": ""
}
6、最后看看我们的测试用例吧,当然是数据驱动了
# -*-coding:utf-8 -*-
from ddt import ddt, data, unpack
import unittest
from papi.httprequest import SendHttpRequest
from papi.dataparse import jsonprase, xmlprase
@ddt
class TestSingleRequest(unittest.TestCase):
    """Data-driven checks of the single-request endpoint."""

    def setUp(self):
        self.url = "http://xxxxxxxxxxxxxxxxxxx/api/xxxxxx"

    @data(
        (32351, 6),
        (9555, 4)
    )
    @unpack
    def test_Single_right(self, sid, count):
        """Valid parameters return a point and a download URL."""
        value = {"sid": sid, "count": count}
        # Named "body" so the ddt "data" decorator is not shadowed.
        body = SendHttpRequest(self.url).get(value)
        json_data = jsonprase(body)
        point_lat = json_data.find_json_node_by_xpath("/Point/Lat")
        point_lng = json_data.find_json_node_by_xpath("/Point/Lng")
        is_exists_map = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/IsExistsMap")
        size = json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/Size")
        # unittest assertions are not stripped under -O and report the
        # offending values on failure.
        self.assertNotEqual(float(point_lat), 0)
        self.assertNotEqual(float(point_lng), 0)
        self.assertIsNotNone(
            json_data.find_json_node_by_xpath("/Ptd/AmapGuideMap155/DownUrl"))
        # Equality (not identity) is kept so a numeric 1 still counts.
        if is_exists_map == True:
            self.assertNotEqual(size, "")

    # Invalid requests to the SingleRequest endpoint
    @data(
        ("abceeffffg", 6),
        (9555, "")
    )
    @unpack
    def test_Single_error(self, sid, count):
        """Invalid parameters yield the fixed "invalid request" message."""
        value = {"sid": sid, "count": count}
        body = SendHttpRequest(self.url).get(value)
        self.assertEqual(body, u'{"Message":"请求无效。"}')
@ddt
class TourMaps(unittest.TestCase):
    """Data-driven checks of the TourMap endpoint."""

    def setUp(self):
        self.url = "http://xxxxxx/api/TourMap"

    @data(32351, 9555)
    def test_requests_online_xml(self, tourId):
        """The XML reply carries coordinates and at least one child node."""
        xml_url = self.url + "/%s" % tourId
        # Named "body" so the ddt "data" decorator is not shadowed.
        body = SendHttpRequest(xml_url).get()
        json_st = xmlprase(body).xml_to_json
        json_data = jsonprase(json_st)
        lng = json_data.find_json_node_by_xpath("/root/data/@lng")
        lat = json_data.find_json_node_by_xpath("/root/data/@lat")
        # unittest assertions are not stripped under -O and report the
        # offending values on failure.
        self.assertNotEqual(lng, "")
        self.assertNotEqual(lat, "")
        son_tour = json_data.find_json_node_by_xpath("/root/data/data")
        self.assertGreater(len(son_tour), 0)
# @ddt is required for the @data/@unpack decorators below to take effect;
# without it the data sets are ignored.
@ddt
class TourData(unittest.TestCase):
    """Placeholder test cases; the bodies are not implemented yet."""

    def setUp(self):
        self.url = "http://xxxxxx/api/xxx"

    @data(
        (),
        (),
        (),
    )
    @unpack
    def test_tourList_Location_in_open(self):
        pass

    def test_tourList_Location_not_open(self):
        pass

    def test_tour_open_city(self):
        pass
if __name__ == "__main__":
    # loadTestsFromTestCase accepts exactly one TestCase class, so each class
    # is loaded separately and the results are combined into one suite.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        loader.loadTestsFromTestCase(case)
        for case in (TourMaps, TestSingleRequest)
    )
    unittest.TextTestRunner(verbosity=2).run(suite)
测试结果的生成可以参考python nose的相关文档,生成html格式的测试报告
目录结构
project
|
case#测试用例
|
suite#测试目录
|
logs#测试日志
|
papi#测试类
|
result#测试结果
|
setting.py#配置文件
更多推荐
已为社区贡献4条内容
所有评论(0)