mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4mobile wallpaper 5mobile wallpaper 6
428 字
1 分钟
阿里云抢占服务器定时任务
2026-06-02

阿里云服务器激活:

参考资料:

1、【补档】使用阿里云CDT 享受每月200G/4元高速流量

2、阿里 抢占式ECS实例 CDT,分享费用数据&TG通知脚本&保活脚本增强版

安装 Python3 和阿里云依赖(非阿里云服务器)#

1.安装 Python3、pip 和 cron 定时服务:

apk update && apk add python3 py3-pip dcron curl bash

2. 安装阿里云核心 SDK(脚本查询 CDT 和 ECS 必备):

pip3 install aliyun-python-sdk-core aliyun-python-sdk-ecs --break-system-packages

添加运行脚本(服务器保活脚本 Python3 解释器**):**#

1.创建并打开脚本文件:

nano /root/traffic_monitor.py

2.填入配置信息后粘贴脚本内容:

# -*- coding: utf-8 -*-
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from aliyunsdkecs.request.v20140526 import StartInstancesRequest, StopInstancesRequest, DescribeInstancesRequest
import json
import sys
import logging
# pip install aliyun-python-sdk-core aliyun-python-sdk-ecs
# ================== 1. 配置日志 ==================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
stream=sys.stdout
)
logger = logging.getLogger(__name__)
# ================== 2. 配置阿里云凭证和ECS实例信息 ==================
ACCESS_KEY_ID = 'LTAIxxxxxxx' # 您的AccessKey ID
ACCESS_KEY_SECRET = 'xxxxxxxxxx' # 您的AccessKey Secret
REGION_ID = 'cn-hongkong' # 区域ID
ECS_INSTANCE_ID = 'i-xxxxxxxxx' # 您要控制的ECS实例ID
# 流量阈值 (GB)
TRAFFIC_THRESHOLD_GB = 180
# ================== 3. 初始化客户端 ==================
try:
client = AcsClient(ACCESS_KEY_ID, ACCESS_KEY_SECRET, REGION_ID)
logger.info("AcsClient initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize AcsClient: {e}")
sys.exit(1)
# ================== 4. 查询当前总流量 ==================
def get_total_traffic_gb(client):
request = CommonRequest()
request.set_domain('cdt.aliyuncs.com')
request.set_version('2021-08-13')
request.set_action_name('ListCdtInternetTraffic')
request.set_method('POST')
try:
response = client.do_action_with_exception(request)
response_json = json.loads(response.decode('utf-8'))
total_bytes = sum(d.get('Traffic', 0) for d in response_json.get('TrafficDetails', []))
total_gb = total_bytes / (1024 ** 3)
logger.info(f"当前总互联网流量: {total_gb:.2f} GB")
return total_gb
except Exception as e:
logger.error(f"获取CDT流量失败: {e}")
sys.exit(1)
# ================== 5. 查询ECS实例状态 ==================
def get_ecs_status(client, instance_id):
try:
request = DescribeInstancesRequest.DescribeInstancesRequest()
request.set_InstanceIds([instance_id])
response = client.do_action_with_exception(request)
response_json = json.loads(response.decode('utf-8'))
instances = response_json.get("Instances", {}).get("Instance", [])
if not instances:
logger.error("未找到该ECS实例信息。")
return None
status = instances[0].get("Status")
logger.info(f"ECS实例 {instance_id} 当前状态: {status}")
return status
except Exception as e:
logger.error(f"获取ECS实例状态失败: {e}")
return None
# ================== 6. 启动ECS实例 ==================
def ecs_start(client, instance_id):
status = get_ecs_status(client, instance_id)
if status == "Running":
logger.info(f"ECS实例 {instance_id} 已经是运行状态,无需启动。")
return
try:
request = StartInstancesRequest.StartInstancesRequest()
request.set_InstanceIds([instance_id])
request.set_accept_format('json')
response = client.do_action_with_exception(request)
logger.info(f"ECS启动响应: {response.decode('utf-8')}")
except Exception as e:
logger.error(f"启动ECS实例失败: {e}")
# ================== 7. 停止ECS实例 ==================
def ecs_stop(client, instance_id):
status = get_ecs_status(client, instance_id)
if status == "Stopped":
logger.info(f"ECS实例 {instance_id} 已经是停止状态,无需再次停止。")
return
try:
request = StopInstancesRequest.StopInstancesRequest()
request.set_InstanceIds([instance_id])
request.set_ForceStop(False)
request.set_accept_format('json')
response = client.do_action_with_exception(request)
logger.info(f"ECS停止响应: {response.decode('utf-8')}")
except Exception as e:
logger.error(f"停止ECS实例失败: {e}")
# ================== 8. 主流程 ==================
def main():
total_gb = get_total_traffic_gb(client)
if total_gb < TRAFFIC_THRESHOLD_GB:
logger.info(f"流量 {total_gb:.2f} GB < 阈值 {TRAFFIC_THRESHOLD_GB} GB,尝试启动 ECS")
ecs_start(client, ECS_INSTANCE_ID)
else:
logger.info(f"流量 {total_gb:.2f} GB ≥ 阈值 {TRAFFIC_THRESHOLD_GB} GB,尝试停止 ECS")
ecs_stop(client, ECS_INSTANCE_ID)
logger.info("脚本执行完毕。")
if __name__ == "__main__":
main()

添加定时任务**(服务器保活脚本)**:#

1.赋予脚本执行权限

chmod +x /root/traffic_monitor.py

2.打开定时任务编辑面板:

crontab -e

3.在文件最底部,新起一行,原封不动地贴入截图中的这一行代码:

* * * * * /usr/bin/python3 /root/traffic_monitor.py >> /var/log/traffic_monitor.log 2>&1

按 Ctrl + O 回车,再按 Ctrl + X 退出

4.重载定时任务服务(让配置立刻生效):

systemctl restart cron

5.执行一次脚本:

python3 /root/traffic_monitor.py

添加运行脚本(TG通知脚本 Bash 解释器):#

1.创建并打开脚本文件:

nano /root/cdt_daily_report.sh

2.填入配置信息后粘贴脚本内容:

#!/bin/bash Bash 解释器
# ============================================
# 使用说明
# ============================================
# 1. Telegram Bot 配置
# - TG_BOT_TOKEN:你的 Telegram Bot Token
# - TG_CHAT_ID:你的 Telegram Chat ID
#
# 2. 阿里云 ECS / CDT 配置
# - ACCESS_KEY_ID / ACCESS_KEY_SECRET:AK/SK
# - REGION_ID:实例所在地域 ID(如 cn-hongkong)
# - ECS_INSTANCE_ID:实例的 ECS ID
# - ECS_NAME:实例名称(自定义)
# ============================================
# ================================
# 1. 配置信息(请填写)
# ================================
TG_BOT_TOKEN="REDACTED_TG_BOT_TOKEN"
TG_CHAT_ID="REDACTED_CHAT_ID"
ACCESS_KEY_ID='REDACTED_AK'
ACCESS_KEY_SECRET='REDACTED_SK'
REGION_ID='cn-hongkong'
ECS_INSTANCE_ID='REDACTED_ECS_ID'
ECS_NAME='AliCloud-CDT'
# 流量阈值设置(GB),CDT 免费额度通常为 200GB
LIMIT=200
# ================================
# 发送 Telegram 消息函数
# ================================
send_tg() {
curl -s -o /dev/null -X POST "https://api.telegram.org/bot${TG_BOT_TOKEN}/sendMessage" \
-H "Content-Type: application/json" \
-d "{\"chat_id\": \"${TG_CHAT_ID}\", \"text\": \"$1\", \"parse_mode\": \"HTML\"}"
}
# ================================
# Python 查询 CDT 流量
# ================================
query_cdt() {
python3 <<EOF
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import json
client = AcsClient("$ACCESS_KEY_ID", "$ACCESS_KEY_SECRET", "$REGION_ID")
request = CommonRequest()
request.set_domain('cdt.aliyuncs.com')
request.set_version('2021-08-13')
request.set_action_name('ListCdtInternetTraffic')
request.set_method('POST')
try:
response = client.do_action_with_exception(request)
data = json.loads(response.decode('utf-8'))
total_bytes = sum(d.get('Traffic', 0) for d in data.get('TrafficDetails', []))
print(f"{total_bytes / (1024**3):.2f}")
except Exception as e:
print("0.00")
EOF
}
# ================================
# Python 获取 ECS 公网 IP
# ================================
get_public_ip() {
python3 <<EOF
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
import json
client = AcsClient("$ACCESS_KEY_ID", "$ACCESS_KEY_SECRET", "$REGION_ID")
request = CommonRequest()
request.set_domain('ecs.aliyuncs.com')
request.set_version('2014-05-26')
request.set_action_name('DescribeInstances')
request.set_method('POST')
request.add_query_param('InstanceIds', f'["$ECS_INSTANCE_ID"]')
try:
response = client.do_action_with_exception(request)
data = json.loads(response.decode('utf-8'))
instance = data['Instances']['Instance'][0]
eip = instance.get('EipAddress', {}).get('IpAddress')
pub = instance.get('PublicIpAddress', {}).get('IpAddress', [])
if eip:
print(eip)
elif pub:
print(pub[0])
else:
print("无公网IP")
except:
print("获取失败")
EOF
}
# ================================
# 执行查询与计算
# ================================
TRAFFIC=$(query_cdt)
PUBLIC_IP=$(get_public_ip)
# 计算剩余与百分比
REMAIN=$(awk "BEGIN {r=$LIMIT-$TRAFFIC; if(r<0) r=0; printf \"%.2f\", r}")
USAGE_PERCENT=$(awk "BEGIN {printf \"%.2f\", ($TRAFFIC/$LIMIT)*100}")
# 颜色等级判定
COLOR=$(awk 'BEGIN {p='$USAGE_PERCENT'; if(p<60) print "🟢 正常"; else if(p<90) print "🟡 注意"; else print "🔴 危险";}')
# ================================
# 生成消息内容(已修改:强制北京时间)
# ================================
# 将 date 改为 TZ=Asia/Shanghai date
MESSAGE="<b>📊 CDT 每日流量报告</b>
<b>🖥 ${ECS_NAME}</b>
┃ 📦 实例:${ECS_INSTANCE_ID}
┃ 🌐 公网 IP:${PUBLIC_IP}
┃ 📍 地域:${REGION_ID}
┃ 📈 已用:<b>${TRAFFIC} GB</b>
┃ 💾 剩余:<b>${REMAIN} GB</b>
┃ 🔥 使用率:<b>${USAGE_PERCENT}%</b> ${COLOR}
⏰ <i>更新时间:$(TZ=Asia/Shanghai date '+%Y-%m-%d %H:%M')</i>"
# 发送通知
send_tg "$MESSAGE"
# ================================
# 写入日志(已修改:强制北京时间)
# ================================
LOG_DIR="/var/log/cdt_daily_report"
mkdir -p $LOG_DIR
{
echo "-------------------------------------------"
# 这里也改成了北京时间
echo "更新时间:$(TZ=Asia/Shanghai date '+%Y-%m-%d %H:%M')"
echo "${ECS_NAME}"
echo "┃ 实例:${ECS_INSTANCE_ID}"
echo "┃ 公网 IP:${PUBLIC_IP}"
echo "┃ 地域:${REGION_ID}"
echo "┃ 已用:${TRAFFIC} GB"
echo "┃ 剩余:${REMAIN} GB"
echo "┃ 使用率:${USAGE_PERCENT}% $(echo ${COLOR} | sed 's/.* //')"
echo "-------------------------------------------"
# 这里的文件名也改成了北京时间,防止跨月的那天日志建错文件
} >> "${LOG_DIR}/cdt_report_$(TZ=Asia/Shanghai date '+%Y%m').log"

添加定时任务(TG通知脚本):#

1.赋予脚本执行权限

chmod +x /root/cdt_daily_report.sh

2.打开定时任务编辑面板:

crontab -e

3.在文件最底部,新起一行,原封不动地贴入截图中的这一行代码:

59 23 * * * /bin/bash /root/cdt_daily_report.sh

按 Ctrl + O 回车,再按 Ctrl + X 退出

4.重载定时任务服务(让配置立刻生效):

systemctl restart cron

5.执行一次脚本测试

/root/cdt_daily_report.sh

分享

如果这篇文章对你有帮助,欢迎分享给更多人!

阿里云抢占服务器定时任务
bk90.333797.xyz
作者
瓶子
发布于
2026-06-02
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时