[Python]python3.5下的阿里云邮件推送集成

时间:2022-09-24 12:53:38

文档:https://www.aliyun.com/product/directmail?spm=5176.8142029.388261.132.8AS88o

之前参考过一些网上的文章做了python2.7下的集成,然而还没用几天,老板说把这块代码拉出去单独做个项目吧,哦呵呵呵。新项目是python3.5的,大体上差不多,有一些编码会有些变动,基本照着文档的接口写就可以了。最后直接调用send_mail函数就可以。

代码:

import base64
import hmac
from hashlib import sha1
import urllib.parse
import time
import uuid
import requests
from django.conf import settings
from mail.utils import process_failed_mail
from mail import constant
import datetime


class Aliyuncs(object):
def __init__(self):
self.access_id = settings.ALIYUN_ACCESS_KEY_ID
self.access_secret = settings.ALIYUN_ACCESS_KEY_SECRET
self.url = settings.ALIYUNCS_API_URL

def sign(self, accessKeySecret, parameter):
sortedParameters = sorted(parameter.items(),
key=lambda parameter: parameter[0])
canonicalizedQueryString = ''

for (k, v) in sortedParameters:
canonicalizedQueryString += '&' + self.percent_encode(k) + \
'=' + self.percent_encode(v)

stringToSign = 'GET&%2F&' + self.percent_encode(
canonicalizedQueryString[1:])

h = hmac.new(bytes((accessKeySecret + "&").encode("utf-8")),
stringToSign.encode('utf-8'), sha1)
signature = base64.encodestring(h.digest()).strip()
return signature

def percent_encode(self, encodeStr):
encodeStr = str(encodeStr)
res = urllib.parse.quote(encodeStr.encode('utf-8'), '')
res = res.replace('+', '%20')
res = res.replace('*', '%2A')
res = res.replace('%7E', '~')
return res

def make_url(self, params):
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
parameters = {
'Format': 'JSON',
'Version': '2015-11-23',
'AccessKeyId': self.access_id,
'SignatureVersion': '1.0',
'SignatureMethod': 'HMAC-SHA1',
'SignatureNonce': str(uuid.uuid1()),
'Timestamp': timestamp,
}
for key in params.keys():
parameters[key] = params[key]
signature = self.sign(self.access_secret, parameters)
parameters['Signature'] = signature
url = self.url + "/?" + urllib.parse.urlencode(parameters)
return url


def send_mail(mail):
'''
deliver mails to aliyuncs
'''
try:
aliyun = Aliyuncs()
data = {
'Action': 'SingleSendMail',
'AccountName': settings.ALIYUNCS_FROM_EMAIL,
'ReplyToAddress': settings.ALIYUNCS_REPLY_TO_ADRESS,
'AddressType': settings.ALIYUNCS_ADRESS_TYPE,
'ToAddress': mail.recipient_email,
'FromAlias': settings.NEWSLETTER_FROM_NAME,
'Subject': mail.subject,
'TextBody': mail.message
}
url = aliyun.make_url(data)
response = requests.get(url)
if response.status_code != 200:
mail = process_failed_mail(mail)
return False, 'send error'
except requests.Timeout:
mail = process_failed_mail(mail)
return False, 'time out'
mail.finish_time = datetime.datetime.now()
mail.status = constant.STATUS_FINISHED
mail.save()
return True, 'success'