企业微信消息推送和钉钉消息推送python代码封装
前言:目前很多公司用的是企业微信或者钉钉,对于服务的可用性都会有一个人告警通知,方面我们及时了解消息,这里我做了一个简单的封装,方便大家使用!#!/usr/bin/env python# _*_ coding: utf-8 _*_# @project : dadi-api-platform# @File: send_notify.py# @Date: 2021/2/23 11:28 上午# @Au
·
前言:目前很多公司用的是企业微信或者钉钉,对于服务的可用性都会有一个告警通知,方面我们及时了解信息,这里我做了一个简单的封装,方便大家使用!
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# @project : dadi-api-platform
# @File : send_notify.py
# @Date : 2021/2/23 11:28 上午
# @Author : 李文良
import requests
import time
import hmac
import hashlib
import base64
import urllib.parse
def send_wxwork_notify_markdown(content, api_key, headers=None):
if headers is None:
headers = {'Content-Type': 'application/json'}
hook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={}".format(api_key)
data = {"msgtype": "markdown",
"markdown": {
"content": "{}".format(content)
}}
notify_res = requests.post(url=hook_url, json=data, headers=headers)
return notify_res
def send_wxwork_notify_text(content, mentioned_mobile_list, api_key, headers=None):
if headers is None:
headers = {'Content-Type': 'application/json'}
if not mentioned_mobile_list or not isinstance(mentioned_mobile_list, list):
raise TypeError("输入的手机号应该是一个列表!")
hook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={}".format(api_key)
data = {"msgtype": "text",
"text": {
"content": "{}".format(content),
"mentioned_mobile_list": mentioned_mobile_list
}}
notify_res = requests.post(url=hook_url, json=data, headers=headers)
return notify_res
def send_ding_talk_notify_markdown(content_title, content_text, access_token, at_mobiles=[], secret=None, headers=None):
if headers is None:
headers = {'Content-Type': 'application/json'}
hook_url = "https://oapi.dingtalk.com/robot/send?access_token={}".format(access_token)
if secret:
timestamp, sign = generate_timestamp_and_sign(secret)
hook_url = "https://oapi.dingtalk.com/robot/send?access_token={}×tamp={}&sign={}".format(access_token,
timestamp, sign)
data = {
"msgtype": "markdown",
}
if at_mobiles and "@all" in at_mobiles:
data['markdown'] = {
"title": "{} @all".format(content_title),
"text": "{} @all".format(content_text)
}
data['at'] = {
"atMobiles": at_mobiles,
"isAtAll": True
}
elif at_mobiles and "@all" not in at_mobiles:
if len(at_mobiles) > 1:
at_mobiles_str = "@".join(at_mobiles)
at_mobiles_str = "@{}".format(at_mobiles_str)
else:
at_mobiles_str = "@{}".format(at_mobiles[0])
data['markdown'] = {
"title": "{} {}".format(content_title, at_mobiles_str),
"text": "{} {}".format(content_text, at_mobiles_str)
}
data['at'] = {
"atMobiles": at_mobiles,
"isAtAll": False
}
notify_res = requests.post(url=hook_url, json=data, headers=headers)
return notify_res
def generate_timestamp_and_sign(secret):
timestamp = str(round(time.time() * 1000))
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return timestamp, sign
if __name__ == "__main__":
now_time = time.strftime("%Y-%m-%d %H:%M:%S")
H5_URL = [
'https://staging.airdoc.com/user/report/xYwZ08fWMzCixeOG3%2BMNRlzgaetiA30LtGBqj%2BjK8qD0OUiVdfBLKilRThZnP8gd',
'https://staging.airdoc.com/user/report/%2F52BwBuKeO3SAWrk1459MYSQQ4t5FRUMSEI9vpfoQAaESxwr9I%2Fnp199qfY5LHKe',
'https://staging.airdoc.com/user/report/SJEEz9rBe33UAl7iTo3DZA6Y2wvrYR2PBSk1BTPAjKVrmZQjgmWty5W7ZMuxpyzN']
PDF_URL = [
'https://staging.airdoc.com/user/report/xYwZ08fWMzCixeOG3%2BMNRlzgaetiA30LtGBqj%2BjK8qD0OUiVdfBLKilRThZnP8gd',
'https://staging.airdoc.com/user/report/%2F52BwBuKeO3SAWrk1459MYSQQ4t5FRUMSEI9vpfoQAaESxwr9I%2Fnp199qfY5LHKe',
'https://staging.airdoc.com/user/report/SJEEz9rBe33UAl7iTo3DZA6Y2wvrYR2PBSk1BTPAjKVrmZQjgmWty5W7ZMuxpyzN']
notify_title = '接口测试通知'
content = "**{}**\n\n" \
"测试用例1信息如下:\n\nH5_URL: **{}**\n\nPDF_URL: **{}**\n\n" \
"测试用例2信息如下:\n\nH5_URL: **{}**\n\nPDF_URL: **{}** \n\n" \
"测试用例3信息如下:\n\nH5_URL: **{}**\n\nPDF_URL: **{}** \n\n" \
"报告生成时间: **{}** \n\n".format(notify_title, H5_URL[0], PDF_URL[0], H5_URL[1], PDF_URL[1], H5_URL[2],
PDF_URL[2], now_time)
notify_res = send_ding_talk_notify_markdown(notify_title, content,
"246ca8e0c8fd55d8949dd484eb4aae1d6675565e45e321f51744ce2973fe7d5c8f6",
[1577925XXXX],
"SEC00db279767bb3929bfa7dd86c139370ce65651a1cc189303c966524b553fcef1cc51")
print(notify_res.text)
pass
更多推荐
已为社区贡献3条内容
所有评论(0)