【群晖】自动获取动态IPV4并解析到阿里云域名

时间:2024-03-16 08:18:38
#!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2023/08/23 @Author : LXW @Site : @File : ipv4.py @Software: PyCharm @Description: - 版本 -- python==3.8 -- alibabacloud_alidns20150109==3.0.7 -- aliyun-python-sdk-domain==3.14.9 - 依赖安装 -- pip3 install alibabacloud_alidns20150109==3.0.7 -- pip3 install aliyun-python-sdk-domain==3.14.9 """ import json import requests from alibabacloud_alidns20150109 import models as alidns_20150109_models from alibabacloud_alidns20150109.client import Client as Alidns20150109Client from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient class RemoteIp: def __init__(self, url, username, passwd): self.url = url self.username = username self.passwd = passwd def __get_token(self): _url = self.url + "/emby/Users/authenticatebyname?X-Emby-Client=Emby%20Web&X-Emby-Device-Name=Chrome%20macOS&X-Emby-Device-Id=fa6b7733-23fc-4cb7-bae3-d7f0536b485e&X-Emby-Client-Version=4.7.13.0&X-Emby-Language=zh-cn" content = requests.post(_url, False, {"Username": self.username, "Pw": self.passwd}).text return json.loads(content)["AccessToken"] def get_remote_addr(self): token = self.__get_token() full_url = self.url + "/emby/System/Info?X-Emby-Token=" + token content = requests.get(full_url, False).text addr = list(json.loads(content)["RemoteAddresses"]) remote_add = str(addr[0]).split(":")[1].replace("/", "") return remote_add def request_params(request, value): request.value = value class AliyunOperation: def __init__(self, access_key_id, access_secret): """ 初始化 :param access_key_id: 阿里云AccessKey的ID :param access_secret: 阿里云AccessKey的密码 """ config = open_api_models.Config( # 必填,您的 AccessKey ID, access_key_id=access_key_id, # 必填,您的 AccessKey Secret, access_key_secret=access_secret ) # Endpoint 请参考 https://api.aliyun.com/product/Alidns config.endpoint = f'alidns.cn-hangzhou.aliyuncs.com' self.client = Alidns20150109Client(config) def describe_subdomain_records(self, subdomain, domain_name): """ 子域名解析记录查询 https://api.aliyun.com/api/Alidns/2015-01-09/DescribeSubDomainRecords 这个函数会返回一大堆东西,但是目前我们需要使用的就两个,一个是解析记录的数量,一个就是RecordId,RecordId修改域名解析时需要用到 :param domain_name: :param subdomain: :return: """ describe_sub_domain_records_request = alidns_20150109_models.DescribeSubDomainRecordsRequest( sub_domain=subdomain, domain_name=domain_name ) runtime = util_models.RuntimeOptions() try: response = self.client.describe_sub_domain_records_with_options(describe_sub_domain_records_request, runtime) if 200 == response.status_code: return response.body except Exception as error: # 如有需要,请打印 error UtilClient.assert_as_string(error.message) def add_record(self, domain_name, rr, domain_type, value): """ 新增域名解析记录 https://api.aliyun.com/api/Alidns/2015-01-09/AddDomainRecord 如果域名解析记录不存在时,就需要新增域名解析记录,新增域名解析记录比较简单,直接填写参数即可 :param domain_name: 域名名称 baidu.com :param rr: 主机记录。如果要解析@.exmaple.com,主机记录要填写”@”,而不是空。 :param domain_type: 解析记录类型。https://help.aliyun.com/document_detail/29805.html?spm=api-workbench.api_explorer.0.0.6b1a134bcODqcI :param value: 记录值。String :return: """ add_domain_record_request = alidns_20150109_models.AddDomainRecordRequest( domain_name=domain_name, rr=rr, type=domain_type, value=value ) runtime = util_models.RuntimeOptions() try: response = self.client.add_domain_record_with_options(add_domain_record_request, runtime) if 200 == response.status_code: return response.body except Exception as error: # 如有需要,请打印 error UtilClient.assert_as_string(error.message) def update_record(self, domain_type, value, rr, record_id): """ 更新域名解析记录 如果域名解析记录已存在,则不能使用新增,而是更新域名解析记录。更新的时候需要用到RecordId,这个一般查询的时候就会有返回,直接使用即可 :param domain_type: 解析记录类型。https://help.aliyun.com/document_detail/29805.html?spm=api-workbench.api_explorer.0.0.6b1a134bcODqcI :param value: 记录值。String :param rr: 主机记录。如果要解析@.exmaple.com,主机记录要填写”@”,而不是空。 :param record_id: 解析记录的ID :return: """ update_domain_record_request = alidns_20150109_models.UpdateDomainRecordRequest( record_id=record_id, rr=rr, type=domain_type, value=value ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 self.client.update_domain_record_with_options(update_domain_record_request, runtime) except Exception as error: # 如有需要,请打印 error UtilClient.assert_as_string(error.message) def __execute(): # 获取动态 IPV4 地址(如果是IPV6也可以用相同方式获取) __remote_addr = __get_from_urls() if "None" == __remote_addr: __remote_addr = RemoteIp(url=__url, username=__username, passwd=__passwd).get_remote_addr() print("current ipv4: " + __remote_addr) __aliyun_client = AliyunOperation(access_key_id=__accessKeyId, access_secret=__accessSecret) # 查询 example.com 主机下 test.example.com 域名的解析列表 content = __aliyun_client.describe_subdomain_records("redirect.your.domain", "your.domain") print("current desc domain content: " + str(content)) if 0 == content.total_count: # 新增一条 test.example.com 的IPV4域名解析记录(如果是IPV6地址,修改参数 domain_type 为 AAAA) __aliyun_client.add_record("your.domain", "redirect", "A", __remote_addr) elif 1 == content.total_count: # 如果 test.example.com 域名解析已经存在,则进行更新操作 source = content.domain_records.record[0] record_id = source.record_id if __remote_addr != source.value: __aliyun_client.update_record("A", __remote_addr, "redirect", record_id) # else: # # 不推荐使用该方式进行域名解析更新,容易错误更新 # sources = content.domain_records.record # for source in sources: # # 因为是更新IP,所以只考虑 A 或者 AAAA # domain_type = source.type # if "A" == domain_type or "AAAA" == domain_type: # record_id = source.record_id # rr = source.rr # __aliyun_client.update_record(domain_type, __remote_addr, rr, record_id) def __get_from_urls(): """ 通过url获取本地IP地址 "https://myip4.ipip.net", "https://ddns.oray.com/checkip", "https://ip.3322.net", "https://4.ipw.cn" :return: ip """ ipw = "https://4.ipw.cn" response = requests.get(ipw) if 200 == response.status_code: content = response.text return content.strip() ip = "https://ip.3322.net" response = requests.get(ip) if 200 == response.status_code: content = response.text return content.strip() myip4 = "https://myip4.ipip.net" response = requests.get(myip4) if 200 == response.status_code: content = response.text.split(":")[1] content = content.replace("来自于", "") return content.strip() ddns = "https://ddns.oray.com/checkip" response = requests.get(ddns) if 200 == response.status_code: content = response.text.split(":")[1] return content.strip() return "None" if __name__ == '__main__': # emby相关参数,可以不填 __url = "http://127.0.0.1:8096" __username = "emby_user" __passwd = "emby_passwd" # 域名解析权限,必填 __accessKeyId = "__accessKeyId" __accessSecret = "__accessSecret" __execute()