阿里云服务器激活:
参考资料:
1、【补档】使用阿里云CDT 享受每月200G/4元高速流量
2、阿里 抢占式ECS实例 CDT,分享费用数据&TG通知脚本&保活脚本增强版
安装 Python3 和阿里云依赖(非阿里云服务器)
1.安装 Python3、pip 和 cron 定时服务:
apk update && apk add python3 py3-pip dcron curl bash
2. 安装阿里云核心 SDK(脚本查询 CDT 和 ECS 必备):
pip3 install aliyun-python-sdk-core aliyun-python-sdk-ecs --break-system-packages
添加运行脚本(服务器保活脚本 Python3 解释器**):**
1.创建并打开脚本文件:
nano /root/traffic_monitor.py
2.填入配置信息后粘贴脚本内容:
# -*- coding: utf-8 -*-from aliyunsdkcore.client import AcsClientfrom aliyunsdkcore.request import CommonRequestfrom aliyunsdkecs.request.v20140526 import StartInstancesRequest, StopInstancesRequest, DescribeInstancesRequestimport jsonimport sysimport logging# pip install aliyun-python-sdk-core aliyun-python-sdk-ecs
# ================== 1. 配置日志 ==================logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", stream=sys.stdout)logger = logging.getLogger(__name__)
# ================== 2. 配置阿里云凭证和ECS实例信息 ==================ACCESS_KEY_ID = 'LTAIxxxxxxx' # 您的AccessKey IDACCESS_KEY_SECRET = 'xxxxxxxxxx' # 您的AccessKey SecretREGION_ID = 'cn-hongkong' # 区域IDECS_INSTANCE_ID = 'i-xxxxxxxxx' # 您要控制的ECS实例ID
# 流量阈值 (GB)TRAFFIC_THRESHOLD_GB = 180
# ================== 3. 初始化客户端 ==================try: client = AcsClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET, REGION_ID) logger.info("AcsClient initialized successfully.")except Exception as e: logger.error(f"Failed to initialize AcsClient: {e}") sys.exit(1)
# ================== 4. 查询当前总流量 ==================def get_total_traffic_gb(client): request = CommonRequest() request.set_domain('cdt.aliyuncs.com') request.set_version('2021-08-13') request.set_action_name('ListCdtInternetTraffic') request.set_method('POST')
try: response = client.do_action_with_exception(request) response_json = json.loads(response.decode('utf-8'))
total_bytes = sum(d.get('Traffic', 0) for d in response_json.get('TrafficDetails', [])) total_gb = total_bytes / (1024 ** 3)
logger.info(f"当前总互联网流量: {total_gb:.2f} GB") return total_gb except Exception as e: logger.error(f"获取CDT流量失败: {e}") sys.exit(1)
# ================== 5. 查询ECS实例状态 ==================def get_ecs_status(client, instance_id): try: request = DescribeInstancesRequest.DescribeInstancesRequest() request.set_InstanceIds([instance_id]) response = client.do_action_with_exception(request) response_json = json.loads(response.decode('utf-8'))
instances = response_json.get("Instances", {}).get("Instance", []) if not instances: logger.error("未找到该ECS实例信息。") return None
status = instances[0].get("Status") logger.info(f"ECS实例 {instance_id} 当前状态: {status}") return status except Exception as e: logger.error(f"获取ECS实例状态失败: {e}") return None
# ================== 6. 启动ECS实例 ==================def ecs_start(client, instance_id): status = get_ecs_status(client, instance_id) if status == "Running": logger.info(f"ECS实例 {instance_id} 已经是运行状态,无需启动。") return
try: request = StartInstancesRequest.StartInstancesRequest() request.set_InstanceIds([instance_id]) request.set_accept_format('json')
response = client.do_action_with_exception(request) logger.info(f"ECS启动响应: {response.decode('utf-8')}") except Exception as e: logger.error(f"启动ECS实例失败: {e}")
# ================== 7. 停止ECS实例 ==================def ecs_stop(client, instance_id): status = get_ecs_status(client, instance_id) if status == "Stopped": logger.info(f"ECS实例 {instance_id} 已经是停止状态,无需再次停止。") return
try: request = StopInstancesRequest.StopInstancesRequest() request.set_InstanceIds([instance_id]) request.set_ForceStop(False) request.set_accept_format('json')
response = client.do_action_with_exception(request) logger.info(f"ECS停止响应: {response.decode('utf-8')}") except Exception as e: logger.error(f"停止ECS实例失败: {e}")
# ================== 8. 主流程 ==================def main(): total_gb = get_total_traffic_gb(client)
if total_gb < TRAFFIC_THRESHOLD_GB: logger.info(f"流量 {total_gb:.2f} GB < 阈值 {TRAFFIC_THRESHOLD_GB} GB,尝试启动 ECS") ecs_start(client, ECS_INSTANCE_ID) else: logger.info(f"流量 {total_gb:.2f} GB ≥ 阈值 {TRAFFIC_THRESHOLD_GB} GB,尝试停止 ECS") ecs_stop(client, ECS_INSTANCE_ID)
logger.info("脚本执行完毕。")
if __name__ == "__main__": main()添加定时任务**(服务器保活脚本)**:
1.赋予脚本执行权限
chmod +x /root/traffic_monitor.py
2.打开定时任务编辑面板:
crontab -e
3.在文件最底部,新起一行,原封不动地贴入截图中的这一行代码:
* * * * * /usr/bin/python3 /root/traffic_monitor.py >> /var/log/traffic_monitor.log 2>&1按 Ctrl + O 回车,再按 Ctrl + X 退出
4.重载定时任务服务(让配置立刻生效):
systemctl restart cron
5.执行一次脚本:
python3 /root/traffic_monitor.py
添加运行脚本(TG通知脚本 Bash 解释器):
1.创建并打开脚本文件:
nano /root/cdt_daily_report.sh2.填入配置信息后粘贴脚本内容:
#!/bin/bash Bash 解释器
# ============================================# 使用说明# ============================================# 1. Telegram Bot 配置# - TG_BOT_TOKEN:你的 Telegram Bot Token# - TG_CHAT_ID:你的 Telegram Chat ID## 2. 阿里云 ECS / CDT 配置# - ACCESS_KEY_ID / ACCESS_KEY_SECRET:AK/SK# - REGION_ID:实例所在地域 ID(如 cn-hongkong)# - ECS_INSTANCE_ID:实例的 ECS ID# - ECS_NAME:实例名称(自定义)# ============================================
# ================================# 1. 配置信息(请填写)# ================================TG_BOT_TOKEN="REDACTED_TG_BOT_TOKEN"TG_CHAT_ID="REDACTED_CHAT_ID"
ACCESS_KEY_ID='REDACTED_AK'ACCESS_KEY_SECRET='REDACTED_SK'REGION_ID='cn-hongkong'ECS_INSTANCE_ID='REDACTED_ECS_ID'ECS_NAME='AliCloud-CDT'
# 流量阈值设置(GB),CDT 免费额度通常为 200GBLIMIT=200
# ================================# 发送 Telegram 消息函数# ================================send_tg() { curl -s -o /dev/null -X POST "https://api.telegram.org/bot${TG_BOT_TOKEN}/sendMessage" \ -H "Content-Type: application/json" \ -d "{\"chat_id\": \"${TG_CHAT_ID}\", \"text\": \"$1\", \"parse_mode\": \"HTML\"}"}
# ================================# Python 查询 CDT 流量# ================================query_cdt() {python3 <<EOFfrom aliyunsdkcore.client import AcsClientfrom aliyunsdkcore.request import CommonRequestimport json
client = AcsClient("$ACCESS_KEY_ID", "$ACCESS_KEY_SECRET", "$REGION_ID")
request = CommonRequest()request.set_domain('cdt.aliyuncs.com')request.set_version('2021-08-13')request.set_action_name('ListCdtInternetTraffic')request.set_method('POST')
try: response = client.do_action_with_exception(request) data = json.loads(response.decode('utf-8')) total_bytes = sum(d.get('Traffic', 0) for d in data.get('TrafficDetails', [])) print(f"{total_bytes / (1024**3):.2f}")except Exception as e: print("0.00")EOF}
# ================================# Python 获取 ECS 公网 IP# ================================get_public_ip() {python3 <<EOFfrom aliyunsdkcore.client import AcsClientfrom aliyunsdkcore.request import CommonRequestimport json
client = AcsClient("$ACCESS_KEY_ID", "$ACCESS_KEY_SECRET", "$REGION_ID")
request = CommonRequest()request.set_domain('ecs.aliyuncs.com')request.set_version('2014-05-26')request.set_action_name('DescribeInstances')request.set_method('POST')request.add_query_param('InstanceIds', f'["$ECS_INSTANCE_ID"]')
try: response = client.do_action_with_exception(request) data = json.loads(response.decode('utf-8')) instance = data['Instances']['Instance'][0] eip = instance.get('EipAddress', {}).get('IpAddress') pub = instance.get('PublicIpAddress', {}).get('IpAddress', []) if eip: print(eip) elif pub: print(pub[0]) else: print("无公网IP")except: print("获取失败")EOF}
# ================================# 执行查询与计算# ================================TRAFFIC=$(query_cdt)PUBLIC_IP=$(get_public_ip)
# 计算剩余与百分比REMAIN=$(awk "BEGIN {r=$LIMIT-$TRAFFIC; if(r<0) r=0; printf \"%.2f\", r}")USAGE_PERCENT=$(awk "BEGIN {printf \"%.2f\", ($TRAFFIC/$LIMIT)*100}")
# 颜色等级判定COLOR=$(awk 'BEGIN {p='$USAGE_PERCENT'; if(p<60) print "🟢 正常"; else if(p<90) print "🟡 注意"; else print "🔴 危险";}')
# ================================# 生成消息内容(已修改:强制北京时间)# ================================# 将 date 改为 TZ=Asia/Shanghai dateMESSAGE="<b>📊 CDT 每日流量报告</b>
<b>🖥 ${ECS_NAME}</b>┃ 📦 实例:${ECS_INSTANCE_ID}┃ 🌐 公网 IP:${PUBLIC_IP}┃ 📍 地域:${REGION_ID}┃ 📈 已用:<b>${TRAFFIC} GB</b>┃ 💾 剩余:<b>${REMAIN} GB</b>┃ 🔥 使用率:<b>${USAGE_PERCENT}%</b> ${COLOR}
⏰ <i>更新时间:$(TZ=Asia/Shanghai date '+%Y-%m-%d %H:%M')</i>"
# 发送通知send_tg "$MESSAGE"
# ================================# 写入日志(已修改:强制北京时间)# ================================LOG_DIR="/var/log/cdt_daily_report"mkdir -p $LOG_DIR
{ echo "-------------------------------------------" # 这里也改成了北京时间 echo "更新时间:$(TZ=Asia/Shanghai date '+%Y-%m-%d %H:%M')" echo "${ECS_NAME}" echo "┃ 实例:${ECS_INSTANCE_ID}" echo "┃ 公网 IP:${PUBLIC_IP}" echo "┃ 地域:${REGION_ID}" echo "┃ 已用:${TRAFFIC} GB" echo "┃ 剩余:${REMAIN} GB" echo "┃ 使用率:${USAGE_PERCENT}% $(echo ${COLOR} | sed 's/.* //')" echo "-------------------------------------------"# 这里的文件名也改成了北京时间,防止跨月的那天日志建错文件} >> "${LOG_DIR}/cdt_report_$(TZ=Asia/Shanghai date '+%Y%m').log"添加定时任务(TG通知脚本):
1.赋予脚本执行权限
chmod +x /root/cdt_daily_report.sh
2.打开定时任务编辑面板:
crontab -e
3.在文件最底部,新起一行,原封不动地贴入截图中的这一行代码:
59 23 * * * /bin/bash /root/cdt_daily_report.sh
按 Ctrl + O 回车,再按 Ctrl + X 退出
4.重载定时任务服务(让配置立刻生效):
systemctl restart cron
5.执行一次脚本测试
/root/cdt_daily_report.sh
如果这篇文章对你有帮助,欢迎分享给更多人!
部分信息可能已经过时










