本帖最后由 ForMuou 于 2024-11-25 15:12 编辑
[Python] 纯文本查看 复制代码 # -*- coding: utf-8 -*-
import requests
import time
import hmac
import hashlib
import base64
def get_weather(city, api_key):
"""
获取城市当前的天气信息
:param city: 城市名称
:param api_key: API 密钥
:return: 成功时返回天气描述、天气图标和温度,失败时返回None
"""
url = f'https://api.seniverse.com/v3/weather/now.json?key={api_key}&location={city}&language=zh-Hans'
print(f"Requesting weather data for {city}")
response = requests.get(url)
if response.status_code != 200:
print(f"Failed to get weather data for {city}: {response.status_code} - {response.text}")
return None
weather_info = response.json().get('results', [{}])[0].get('now', {})
return weather_info.get('text'), get_weather_icon(weather_info.get('text', '')), weather_info.get('temperature', '')
def get_weather_forecast(city, api_key):
"""
获取城市未来3天的天气预报
:param city: 城市名称
:param api_key: API 密钥
:return: 成功时返回未来3天的天气预报列表,失败时返回None
"""
url = f'https://api.seniverse.com/v3/weather/daily.json?key={api_key}&location={city}&language=zh-Hans&days=3'
print(f"Requesting weather forecast data for {city}")
response = requests.get(url)
if response.status_code != 200:
print(f"Failed to get weather forecast data for {city}: {response.status_code} - {response.text}")
return None
return response.json().get('results', [{}])[0].get('daily', [])
def get_weather_icon(weather_desc):
"""
根据天气描述返回对应的天气图标
:param weather_desc: 天气描述
:return: 对应的天气图标
"""
return '☀️' if '晴' in weather_desc else '☁️' if '多云' in weather_desc else '🌧️' if '雨' in weather_desc else ''
def send_to_dingtalk(message, webhook_url, secret):
"""
发送消息到DingTalk机器人
:param message: 消息内容
:param webhook_url: Webhook URL
:param secret: 机器人密钥
:return: 无返回值
"""
timestamp = str(round(time.time() * 1000))
sign = base64.b64encode(
hmac.new(secret.encode(), f"{timestamp}\n{secret}".encode(), hashlib.sha256).digest()).decode()
final_url = f"{webhook_url}×tamp={timestamp}&sign={sign}"
response = requests.post(final_url,
json={'msgtype': 'markdown', 'markdown': {'title': '天气提醒', 'text': message}},
headers={'Content-Type': 'application/json'})
print(
"Message sent successfully to DingTalk!" if response.status_code == 200 else f"Failed to send message to DingTalk: {response.status_code} - {response.text}")
# 多个城市名称
cities = ['北京', '上海', '广州', '深圳']
# 直接写死的API密钥 DingTalk的WebhookURL和密钥
api_key = '天气API秘钥'
webhook_url = 'WebhookURL'
secret = '签'
# 构建消息内容
message = "### 今日天气提醒\n\n"
for city in cities:
weather_data = get_weather(city, api_key)
forecasts = get_weather_forecast(city, api_key)
if weather_data and forecasts:
weather_desc, weather_icon, temperature = weather_data
message += f"**城市:{city}**\n天气:{weather_icon} {weather_desc}\n温度:{temperature}°C\n\n【未来3天天气预报】\n\n"
message += ''.join(
f"{forecast['date']}: {forecast['text_day']}, 温度:{forecast['low']}°C ~ {forecast['high']}°C\n\n" for
forecast in forecasts)
else:
message += f"未能获取到{city}的天气信息,请检查城市名称。\n\n"
# 发送消息到钉钉机器人
send_to_dingtalk(message, webhook_url, secret)
|