FIN_ASSISTANT / fetch.py
QAway-to
model change
49e5fc1
raw
history blame
1.91 kB
import re
import httpx
# Извлечение UUID из текста
def extract_portfolio_id(text: str) -> str | None:
match = re.search(
r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}",
text
)
return match.group(0) if match else None
# Асинхронное получение метрик (плоские значения)
async def fetch_metrics_async(portfolio_id: str) -> dict | None:
url = f"https://api.tradelink.pro/portfolio/get?portfolioId={portfolio_id}&extended=1"
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
extended = resp.json().get("data", {}).get("extended", {})
result = {}
for k, v in extended.items():
if not isinstance(v, (int, float)):
continue
if k in {"cagr", "alphaRatio", "volatility", "maxDD"}:
result[k] = v * 100 # в процентах
else:
result[k] = v
return result
except Exception as e:
print(f"[ERROR] fetch_metrics_async: {e}")
return None
# Асинхронное получение absolutePnL
async def fetch_absolute_pnl_async(portfolio_id: str) -> list[dict] | None:
url = f"https://api.tradelink.pro/portfolio/get?portfolioId={portfolio_id}&extended=1&step=day"
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url)
data = resp.json().get("data", {}).get("extended", {}).get("absolutePnL", [])
if not isinstance(data, list):
print("[ERROR] absolutePnL не является списком")
return None
return data
except Exception as e:
print(f"[ERROR] fetch_absolute_pnl_async: {e}")
return None