Spaces:
Sleeping
Sleeping
File size: 2,994 Bytes
e813406 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
import os
import requests
from lagent.actions.base_action import BaseAction, tool_api
from lagent.schema import ActionReturn, ActionStatusCode
class WeatherQuery(BaseAction):
def __init__(self):
super().__init__()
self.api_key = os.getenv("weather_token")
print(self.api_key)
if not self.api_key:
raise EnvironmentError("未找到环境变量 'token'。请设置你的和风天气 API Key 到 'weather_token' 环境变量中,比如export weather_token='xxx' ")
@tool_api
def run(self, location: str) -> dict:
"""
查询实时天气信息。
Args:
location (str): 要查询的地点名称、LocationID 或经纬度坐标(如 "101010100" 或 "116.41,39.92")。
Returns:
dict: 包含天气信息的字典
* location: 地点名称
* weather: 天气状况
* temperature: 当前温度
* wind_direction: 风向
* wind_speed: 风速(公里/小时)
* humidity: 相对湿度(%)
* report_time: 数据报告时间
"""
try:
# 如果 location 不是坐标格式(例如 "116.41,39.92"),则调用 GeoAPI 获取 LocationID
if not ("," in location and location.replace(",", "").replace(".", "").isdigit()):
# 使用 GeoAPI 获取 LocationID
geo_url = f"https://geoapi.qweather.com/v2/city/lookup?location={location}&key={self.api_key}"
geo_response = requests.get(geo_url)
geo_data = geo_response.json()
if geo_data.get("code") != "200" or not geo_data.get("location"):
raise Exception(f"GeoAPI 返回错误码:{geo_data.get('code')} 或未找到位置")
location = geo_data["location"][0]["id"]
# 构建天气查询的 API 请求 URL
weather_url = f"https://devapi.qweather.com/v7/weather/now?location={location}&key={self.api_key}"
response = requests.get(weather_url)
data = response.json()
# 检查 API 响应码
if data.get("code") != "200":
raise Exception(f"Weather API 返回错误码:{data.get('code')}")
# 解析和组织天气信息
weather_info = {
"location": location,
"weather": data["now"]["text"],
"temperature": data["now"]["temp"] + "°C",
"wind_direction": data["now"]["windDir"],
"wind_speed": data["now"]["windSpeed"] + " km/h",
"humidity": data["now"]["humidity"] + "%",
"report_time": data["updateTime"]
}
return {"result": weather_info}
except Exception as exc:
return ActionReturn(
errmsg=f"WeatherQuery 异常:{exc}",
state=ActionStatusCode.HTTP_ERROR
) |