Python3 调试企业微信消息推送详解:监控报警

时间:2024-04-03 18:34:01

Python3 调试企业微信消息推送


一、申请企业微信
企业微信的API文档

二、创建应用
1、管理员进入企业微信“管理后台” — “企业应用”创建应用,“收益率”这个应用是我自己创建的,
      红色箭头指向的地方可以设置应用logo、名称、描述、可见范围等。
Python3 调试企业微信消息推送详解:监控报警

二、创建应用 获取access_token

请求方式:GET(HTTPS
注:此处标注大写的单词ID和SECRET,为需要替换的变量,根据实际获取值更新。其它接口也采用相同的标注,不再说明。
若企业安装了第三方应用,该服务商可通过“获取企业access_token”获得此调用凭证。
参数说明:

参数 必须 说明
corpid 企业ID
corpsecret 应用的凭证**

权限说明:
每个应用有独立的secret,所以每个应用的access_token应该分开来获取
返回结果:
{
"errcode":0
"errmsg":""
"access_token": "accesstoken000001",
"expires_in": 7200
}

参数 说明
access_token 获取到的凭证,最长为512字节
expires_in 凭证的有效时间(秒)

出错返回示例:
{
"errcode":40091,
"errmsg":"secret is invalid"
}
特别说明:
企业微信所有接口,返回包里都有errcode、errmsg。开发者需根据errcode是否为0判断是否调用成功(errcode意义请见全局错误码)。

Python3 调试企业微信消息推送详解:监控报警

四、发送消息
应用支持推送文本、图片、视频、文件、图文等类型。
请求方式:POST(HTTPS


五、代码实现
Python3 调试企业微信消息推送详解:监控报警
#-------------------------------------------------------------------------------
# Name:        WeChat.py
# Purpose:
#
# Author:      hyma
#
# Created:     14-07-2018
# Copyright:   (c) hyma 2018
# Licence:     <your licence>
#-------------------------------------------------------------------------------
#!/usr/bin/python
# -*- coding: utf-8 -*-

import time
import requests
import json


class WeChat_SMS:
    def __init__(self):
        self.CORPID = '**********'       #企业ID, 登陆企业微信,在我的企业-->企业信息里查看
        self.CORPSECRET = '************'    #自建应用,每个自建应用里都有单独的secret
        self.AGENTID = '*******' #应用代码
        self.TOUSER = "*******"  # 接收者用户名,  @all 全体成员

    def _get_access_token(self):
        values = {'corpid': self.CORPID,
                  'corpsecret': self.CORPSECRET,
                  }
        req = requests.post(url, params=values)
        data = json.loads(req.text)
#        print (data)
        return data["access_token"]

    def get_access_token(self):
        try:
            with open('access_token.conf', 'r') as f:
                t, access_token = f.read().split()
        except:
            with open('access_token.conf', 'w') as f:
                access_token = self._get_access_token()
                cur_time = time.time()
                f.write('\t'.join([str(cur_time), access_token]))
                return access_token
        else:
            cur_time = time.time()
            if 0 < cur_time - float(t) < 7200:       #token的有效时间7200s
                return access_token
            else:
                with open('access_token.conf', 'w') as f:
                    access_token = self._get_access_token()
                    f.write('\t'.join([str(cur_time), access_token]))
                    return access_token

    def send_data(self, msg):
        send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.get_access_token()
        send_values = {
            "touser": self.TOUSER,
            "msgtype": "text",
            "agentid": self.AGENTID,
            "text": {
                "content": msg
                },
            "safe": "0"
            }
        send_msges=(bytes(json.dumps(send_values), 'utf-8'))
        respone = requests.post(send_url, send_msges)
        respone = respone.json()#当返回的数据是json串的时候直接用.json即可将respone转换成字典
#        print (respone["errmsg"])
        return respone["errmsg"]


if __name__ == '__main__':
    wx = WeChat_SMS()
    wx.send_data(msg="***")
Python3 调试企业微信消息推送详解:监控报警