前言:目前很多公司用的是企业微信或者钉钉,对于服务的可用性都会有一个告警通知,方便我们及时了解信息,这里我做了一个简单的封装,方便大家使用!

#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# @project : dadi-api-platform
# @File    : send_notify.py
# @Date    : 2021/2/23 11:28 上午
# @Author  : 李文良
import requests
import time
import hmac
import hashlib
import base64
import urllib.parse


def send_wxwork_notify_markdown(content, api_key, headers=None):
    """Post a markdown message to a WeCom (企业微信) webhook.

    Args:
        content: Message body; it is rendered into the payload as text.
        api_key: Value placed in the webhook URL's ``key`` query parameter.
        headers: Optional HTTP headers. A JSON ``Content-Type`` header is
            used when omitted.

    Returns:
        The response object returned by ``requests.post``.
    """
    request_headers = {'Content-Type': 'application/json'} if headers is None else headers
    webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={}".format(api_key)
    markdown_section = {"content": "{}".format(content)}
    payload = {"msgtype": "markdown", "markdown": markdown_section}
    return requests.post(url=webhook, json=payload, headers=request_headers)


def send_wxwork_notify_text(content, mentioned_mobile_list, api_key, headers=None):
    """Post a plain-text message with mobile mentions to a WeCom (企业微信) webhook.

    Args:
        content: Message body; it is rendered into the payload as text.
        mentioned_mobile_list: Non-empty list sent as the payload's
            ``mentioned_mobile_list`` field.
        api_key: Value placed in the webhook URL's ``key`` query parameter.
        headers: Optional HTTP headers. A JSON ``Content-Type`` header is
            used when omitted.

    Returns:
        The response object returned by ``requests.post``.

    Raises:
        TypeError: If ``mentioned_mobile_list`` is empty or is not a list.
    """
    request_headers = {'Content-Type': 'application/json'} if headers is None else headers

    # Reject anything that is not a non-empty list before building the request.
    if not (mentioned_mobile_list and isinstance(mentioned_mobile_list, list)):
        raise TypeError("输入的手机号应该是一个列表!")

    webhook = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={}".format(api_key)
    text_section = {
        "content": "{}".format(content),
        "mentioned_mobile_list": mentioned_mobile_list,
    }
    payload = {"msgtype": "text", "text": text_section}
    return requests.post(url=webhook, json=payload, headers=request_headers)


def send_ding_talk_notify_markdown(content_title, content_text, access_token, at_mobiles=None, secret=None, headers=None):
    """Post a markdown message to a DingTalk robot webhook.

    Args:
        content_title: Title of the markdown message.
        content_text: Markdown body of the message.
        access_token: Value placed in the webhook URL's ``access_token``
            query parameter.
        at_mobiles: Optional list of mobile numbers to mention. If it
            contains the string ``"@all"`` the payload sets ``isAtAll`` to
            ``True``. A single string is treated as a one-element list.
            When empty or omitted, the message is sent without mentions.
        secret: Optional signing secret. When given, ``timestamp`` and
            ``sign`` query parameters are appended to the webhook URL.
        headers: Optional HTTP headers. A JSON ``Content-Type`` header is
            used when omitted.

    Returns:
        The response object returned by ``requests.post``.
    """
    if headers is None:
        headers = {'Content-Type': 'application/json'}

    hook_url = "https://oapi.dingtalk.com/robot/send?access_token={}".format(access_token)
    if secret:
        timestamp, sign = generate_timestamp_and_sign(secret)
        hook_url = "{}&timestamp={}&sign={}".format(hook_url, timestamp, sign)

    # A bare string would otherwise be iterated character by character.
    if isinstance(at_mobiles, str):
        at_mobiles = [at_mobiles]
    # Normalise to strings so numeric mobile numbers do not break formatting.
    mobiles = [str(mobile) for mobile in at_mobiles] if at_mobiles else []
    at_all = "@all" in mobiles

    # The mention markers are appended to the title and text.
    if at_all:
        mention_suffix = " @all"
    elif mobiles:
        mention_suffix = " " + "".join("@{}".format(mobile) for mobile in mobiles)
    else:
        mention_suffix = ""

    # Always include the markdown section; previously it was missing whenever
    # no mobiles were supplied, so the message went out without any content.
    data = {
        "msgtype": "markdown",
        "markdown": {
            "title": "{}{}".format(content_title, mention_suffix),
            "text": "{}{}".format(content_text, mention_suffix),
        },
    }
    if mobiles:
        data['at'] = {
            "atMobiles": mobiles,
            "isAtAll": at_all,
        }
    notify_res = requests.post(url=hook_url, json=data, headers=headers)
    return notify_res


def generate_timestamp_and_sign(secret):
    """Build the timestamp and signature pair for a signed webhook request.

    The signature is the HMAC-SHA256 of ``"<timestamp>\\n<secret>"`` keyed
    with the secret, Base64-encoded and then URL-quoted.

    Args:
        secret: Signing secret as a text string.

    Returns:
        A ``(timestamp, sign)`` tuple. ``timestamp`` is the current time in
        milliseconds as a string; ``sign`` is the URL-quoted signature.
    """
    timestamp = str(round(time.time() * 1000))
    key_bytes = secret.encode('utf-8')
    message_bytes = '{}\n{}'.format(timestamp, secret).encode('utf-8')
    digest = hmac.new(key_bytes, message_bytes, digestmod=hashlib.sha256).digest()
    encoded_digest = base64.b64encode(digest)
    return timestamp, urllib.parse.quote_plus(encoded_digest)


if __name__ == "__main__":
    # Demo: send a sample test-report notification through the DingTalk helper.
    now_time = time.strftime("%Y-%m-%d %H:%M:%S")
    H5_URL = [
        'https://staging.airdoc.com/user/report/xYwZ08fWMzCixeOG3%2BMNRlzgaetiA30LtGBqj%2BjK8qD0OUiVdfBLKilRThZnP8gd',
        'https://staging.airdoc.com/user/report/%2F52BwBuKeO3SAWrk1459MYSQQ4t5FRUMSEI9vpfoQAaESxwr9I%2Fnp199qfY5LHKe',
        'https://staging.airdoc.com/user/report/SJEEz9rBe33UAl7iTo3DZA6Y2wvrYR2PBSk1BTPAjKVrmZQjgmWty5W7ZMuxpyzN']
    PDF_URL = [
        'https://staging.airdoc.com/user/report/xYwZ08fWMzCixeOG3%2BMNRlzgaetiA30LtGBqj%2BjK8qD0OUiVdfBLKilRThZnP8gd',
        'https://staging.airdoc.com/user/report/%2F52BwBuKeO3SAWrk1459MYSQQ4t5FRUMSEI9vpfoQAaESxwr9I%2Fnp199qfY5LHKe',
        'https://staging.airdoc.com/user/report/SJEEz9rBe33UAl7iTo3DZA6Y2wvrYR2PBSk1BTPAjKVrmZQjgmWty5W7ZMuxpyzN']

    notify_title = '接口测试通知'
    content = "**{}**\n\n" \
              "测试用例1信息如下:\n\nH5_URL: **{}**\n\nPDF_URL: **{}**\n\n" \
              "测试用例2信息如下:\n\nH5_URL: **{}**\n\nPDF_URL: **{}** \n\n" \
              "测试用例3信息如下:\n\nH5_URL: **{}**\n\nPDF_URL: **{}** \n\n" \
              "报告生成时间: **{}** \n\n".format(notify_title, H5_URL[0], PDF_URL[0], H5_URL[1], PDF_URL[1], H5_URL[2],
                                           PDF_URL[2], now_time)
    # NOTE(review): the access token and secret below are hard-coded sample
    # values; real credentials should not be committed to source.
    # The masked mobile number must be a string: a bare 1577925XXXX is not a
    # valid Python literal and stops the whole file from parsing.
    notify_res = send_ding_talk_notify_markdown(notify_title, content,
                                                "246ca8e0c8fd55d8949dd484eb4aae1d6675565e45e321f51744ce2973fe7d5c8f6",
                                                ["1577925XXXX"],
                                                "SEC00db279767bb3929bfa7dd86c139370ce65651a1cc189303c966524b553fcef1cc51")
    print(notify_res.text)

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐