# kitestacks-homelab/apps/kitestacks-portal/api/main.py
#
# 353 lines
# 12 KiB
# Python

"""
KiteStacks metrics API.
Exposes real host metrics via psutil. Container must be launched with
`pid: host` and the relevant host paths mounted so psutil reads the
laptop's namespaces rather than the container's.
"""
import logging
import os
import platform
import socket
import subprocess
import time
from functools import lru_cache

# Kept ahead of the psutil import, as in the original statement order.
os.environ.setdefault("PROCFS_PATH", os.environ.get("HOST_PROC", "/proc"))

import httpx
import psutil
from fastapi import FastAPI
from fastapi.responses import JSONResponse

psutil.PROCFS_PATH = os.environ.get("HOST_PROC", "/proc")
# ASGI application object; the @app.get decorators further down register
# their handlers on it.
app = FastAPI(title="KiteStacks Metrics API")

# Previous network sample (timestamp, cumulative byte counters, interface
# name), kept between requests so /api/metrics can turn the cumulative
# counters into a per-second rate.
_last_net = {
    "ts": time.time(),
    "sent": 0,
    "recv": 0,
    "iface": None,
}
# Skip virtual / container interfaces when auto-detecting the host's nic
SKIP_IFACE_PREFIXES = (
"docker", "br-", "veth", "cni", "flannel", "cali",
"lo", "tun", "tap", "virbr", "kube",
)
def active_interface() -> str | None:
"""Auto-detect the interface used for the default route on the HOST."""
try:
route_path = "/host/proc/net/route"
if os.path.exists(route_path):
with open(route_path) as f:
for line in f.readlines()[1:]:
fields = line.strip().split()
if len(fields) >= 2 and fields[1] == "00000000":
name = fields[0]
if not name.startswith(SKIP_IFACE_PREFIXES):
return name
except Exception:
pass
# Fallback: first non-virtual interface that is up
try:
for name, stats in psutil.net_if_stats().items():
if stats.isup and not name.startswith(SKIP_IFACE_PREFIXES):
return name
except Exception:
pass
return None
def _host_uptime_seconds() -> int:
    """Return the host's uptime in whole seconds.

    Reads ``/host/proc/uptime`` when it is available and parsable; otherwise
    derives the value from psutil's boot time.
    """
    try:
        with open("/host/proc/uptime") as f:
            return int(float(f.read().split()[0]))
    except (OSError, ValueError, IndexError):
        return int(time.time() - psutil.boot_time())


def _network_rates_kbs(iface: str | None) -> tuple[float, float]:
    """Return ``(tx, rx)`` throughput for *iface* and store the new sample.

    The rate is bytes / 1024 per second since the previous stored sample.
    It is ``0.0`` for both directions when there is no interface, no counters
    for it, no previous sample for the same interface, or a counter went
    backwards (the delta is clamped at zero).
    """
    global _last_net
    if not iface:
        return 0.0, 0.0
    counters = psutil.net_io_counters(pernic=True).get(iface)
    if counters is None:
        return 0.0, 0.0

    now = time.time()
    dt = max(now - _last_net["ts"], 0.001)  # guard against division by zero
    tx_kbs = rx_kbs = 0.0
    # Only compute a delta against a real earlier sample of the same nic.
    if _last_net["iface"] == iface and _last_net["sent"] > 0:
        tx_kbs = max(0.0, (counters.bytes_sent - _last_net["sent"]) / dt / 1024)
        rx_kbs = max(0.0, (counters.bytes_recv - _last_net["recv"]) / dt / 1024)
    _last_net = {
        "ts": now,
        "sent": counters.bytes_sent,
        "recv": counters.bytes_recv,
        "iface": iface,
    }
    return tx_kbs, rx_kbs


@app.get("/api/metrics")
def metrics():
    """Return a snapshot of CPU, RAM, storage, uptime, network and host info."""
    # CPU — a non-blocking cpu_percent() call measures since the previous
    # call and its first result is a meaningless 0.0.  Prime the per-core
    # counters, block once, then read them: the overall and per-core figures
    # then cover the same ~0.3 s window, with no extra blocking.
    psutil.cpu_percent(interval=None, percpu=True)
    cpu_pct = psutil.cpu_percent(interval=0.3)
    cpu_per_core = psutil.cpu_percent(interval=None, percpu=True)

    # RAM
    vm = psutil.virtual_memory()
    ram_used_gb = vm.used / (1024 ** 3)
    ram_total_gb = vm.total / (1024 ** 3)

    # Storage — root filesystem (host root is mounted at /host)
    root_path = "/host" if os.path.ismount("/host") else "/"
    du = psutil.disk_usage(root_path)
    storage_used_gb = du.used / (1024 ** 3)
    storage_total_gb = du.total / (1024 ** 3)

    # Uptime, broken down for display
    uptime_s = _host_uptime_seconds()
    days = uptime_s // 86400
    hours = (uptime_s % 86400) // 3600
    minutes = (uptime_s % 3600) // 60

    # Network — active host interface, rate since the last call
    iface = active_interface()
    net_sent_kbs, net_recv_kbs = _network_rates_kbs(iface)

    overview = _system_overview()
    return {
        "cpu": {"pct": round(cpu_pct, 1), "cores": [round(c, 1) for c in cpu_per_core]},
        "ram": {
            "pct": round(vm.percent, 1),
            "used_gb": round(ram_used_gb, 1),
            "total_gb": round(ram_total_gb, 1),
        },
        "storage": {
            "pct": round(du.percent, 1),
            "used_gb": round(storage_used_gb, 0),
            "total_gb": round(storage_total_gb, 0),
        },
        "uptime": {
            "days": days,
            "hours": hours,
            "minutes": minutes,
            "seconds": uptime_s,
        },
        "network": {
            "iface": iface,
            "tx_kbs": round(net_sent_kbs, 1),
            "rx_kbs": round(net_recv_kbs, 1),
        },
        "overview": overview,
        "ts": int(time.time()),
    }
@lru_cache(maxsize=1)
def _system_overview():
    """Collect static host facts: hostname, OS, kernel, Docker and timezone.

    Host files mounted under ``/host`` are preferred; each field falls back
    to what the container itself reports when the host file is absent or
    unreadable.  The result is cached for the life of the process, so every
    caller receives the same dict and must not mutate it.

    Returns:
        dict with string values under the keys ``hostname``, ``os``,
        ``kernel``, ``docker`` and ``timezone``.
    """
    # Hostname — read from host /etc/hostname if mounted; fall back to socket
    hostname = socket.gethostname()
    try:
        with open("/host/etc/hostname") as f:
            hostname = f.read().strip() or hostname
    except (OSError, ValueError):
        pass  # not mounted / unreadable: keep the socket hostname

    # OS pretty name from host /etc/os-release if mounted
    os_pretty = platform.platform()
    release_path = (
        "/host/etc/os-release"
        if os.path.exists("/host/etc/os-release")
        else "/etc/os-release"
    )
    try:
        with open(release_path) as f:
            for line in f:
                if line.startswith("PRETTY_NAME="):
                    # The value may be wrapped in double or single quotes.
                    os_pretty = line.split("=", 1)[1].strip().strip("\"'")
                    break
    except (OSError, ValueError):
        pass  # keep platform.platform()

    # Kernel from host /proc/sys/kernel/osrelease if mounted
    kernel = platform.release()
    try:
        with open("/host/proc/sys/kernel/osrelease") as f:
            kernel = f.read().strip() or kernel
    except (OSError, ValueError):
        pass  # keep platform.release()

    # Docker version as printed by the docker CLI available to this process
    docker_ver = "unknown"
    try:
        out = subprocess.check_output(
            ["docker", "--version"], stderr=subprocess.DEVNULL, timeout=2
        ).decode()
        # "Docker version 25.0.5, build ..."
        docker_ver = out.split()[2].rstrip(",")
    except (OSError, subprocess.SubprocessError, ValueError, IndexError):
        pass  # CLI missing, failed, timed out, or printed something unexpected

    # Timezone — taken from the target of the localtime symlink.
    tz = time.tzname[0] if time.tzname else "UTC"
    # lexists() does not follow the link: an absolute host symlink may not
    # resolve inside the container, yet its target text still names the zone.
    tz_link = (
        "/host/etc/localtime"
        if os.path.lexists("/host/etc/localtime")
        else "/etc/localtime"
    )
    try:
        if os.path.islink(tz_link):
            _, marker, zone = os.readlink(tz_link).rpartition("zoneinfo/")
            # Only trust the target when it really points into a zoneinfo tree.
            if marker and zone:
                tz = zone
    except OSError:
        pass  # keep the tzname-derived value

    return {
        "hostname": hostname,
        "os": os_pretty,
        "kernel": kernel,
        "docker": docker_ver,
        "timezone": tz,
    }
# Weather — cached for 10 minutes
_weather_cache = {"ts": 0, "data": None}


@app.get("/api/weather")
async def weather():
    """Return current weather for the server's public-IP location."""
    # Successful lookups are cached for 10 minutes.  Failures are reported
    # in-band (HTTP 200 with an "error" field and description "Offline")
    # and are not cached.
    global _weather_cache
    now = time.time()
    cached = _weather_cache["data"]
    if cached is not None and (now - _weather_cache["ts"]) < 600:
        return cached
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            # 1. Geolocate via public IP (no key needed)
            geo = await client.get("https://ipapi.co/json/")
            geo.raise_for_status()
            g = geo.json()
            lat, lon = g.get("latitude"), g.get("longitude")
            if lat is None or lon is None:
                # Fail here with a clear message rather than sending
                # empty coordinates to the forecast API.
                raise ValueError("geolocation response has no coordinates")
            city = g.get("city") or "Unknown"
            # 2. Weather from open-meteo (no key)
            wx = await client.get(
                "https://api.open-meteo.com/v1/forecast",
                params={
                    "latitude": lat,
                    "longitude": lon,
                    "current": "temperature_2m,weather_code,is_day",
                    "temperature_unit": "fahrenheit",
                },
            )
            wx.raise_for_status()
            w = wx.json()["current"]
        data = {
            "temp_f": round(w["temperature_2m"]),
            "code": w["weather_code"],
            "is_day": bool(w["is_day"]),
            "city": city,
            "description": _wcode(w["weather_code"]),
        }
        _weather_cache = {"ts": now, "data": data}
        return data
    except Exception as e:
        # Endpoint boundary: any failure becomes an "Offline" payload, but it
        # is logged so it can be diagnosed.
        logging.getLogger(__name__).warning("weather lookup failed: %s", e)
        return JSONResponse(
            {"error": str(e), "temp_f": None, "description": "Offline"},
            status_code=200,
        )
# WMO weather interpretation codes as reported by Open-Meteo.  Built once at
# import time instead of on every call.
_WMO_DESCRIPTIONS = {
    0: "Clear Sky", 1: "Mainly Clear", 2: "Partly Cloudy", 3: "Overcast",
    45: "Fog", 48: "Rime Fog",
    51: "Light Drizzle", 53: "Drizzle", 55: "Heavy Drizzle",
    56: "Light Freezing Drizzle", 57: "Heavy Freezing Drizzle",
    61: "Light Rain", 63: "Rain", 65: "Heavy Rain",
    66: "Light Freezing Rain", 67: "Heavy Freezing Rain",
    71: "Light Snow", 73: "Snow", 75: "Heavy Snow",
    77: "Snow Grains",
    80: "Rain Showers", 81: "Heavy Showers", 82: "Violent Showers",
    85: "Snow Showers", 86: "Heavy Snow Showers",
    95: "Thunderstorm", 96: "Thunderstorm + Hail", 99: "Severe Thunderstorm",
}


def _wcode(code: int) -> str:
    """Open-Meteo WMO weather code → short description.

    Args:
        code: WMO weather interpretation code.

    Returns:
        A short human-readable label, or ``"Unknown"`` for a code that is
        not in the table.
    """
    return _WMO_DESCRIPTIONS.get(code, "Unknown")
# Forgejo activity — public events only, cached 60s
_forge_cache = {"ts": 0, "data": None}
FORGEJO_BASE = os.environ.get("FORGEJO_BASE", "https://gitforge.kitestacks.com").rstrip("/")
FORGEJO_API_BASE = os.environ.get("FORGEJO_API_BASE", FORGEJO_BASE).rstrip("/")


@app.get("/api/activity")
async def activity():
    """Fetch recent public Forgejo events across all public repos."""
    # Successful results are cached for 60 s.  A failure of the repo search
    # is reported in-band (HTTP 200 with "error" and an empty item list); a
    # failure of one repo's feed only drops that repo from the result.
    global _forge_cache
    now = time.time()
    if _forge_cache["data"] is not None and (now - _forge_cache["ts"]) < 60:
        return _forge_cache["data"]
    log = logging.getLogger(__name__)
    items = []
    try:
        async with httpx.AsyncClient(timeout=6.0, follow_redirects=True) as client:
            # /repos/search?limit=20&sort=updated → most recently active public repos
            r = await client.get(
                f"{FORGEJO_API_BASE}/api/v1/repos/search",
                params={"limit": 20, "sort": "updated", "order": "desc", "private": "false"},
            )
            r.raise_for_status()
            # "data" may be absent or null; treat both as "no repos".
            repos = r.json().get("data") or []
            # For each repo, fetch its activity feed (small, fast); take a few
            for repo in repos[:8]:
                if not isinstance(repo, dict):
                    continue
                # `or {}` also covers an explicit null owner.
                owner = (repo.get("owner") or {}).get("login")
                name = repo.get("name")
                if not owner or not name or repo.get("private"):
                    continue
                try:
                    fr = await client.get(
                        f"{FORGEJO_API_BASE}/api/v1/repos/{owner}/{name}/activities/feeds",
                        params={"limit": 5},
                    )
                    if fr.status_code != 200:
                        continue
                    events = fr.json()
                    if not isinstance(events, list):
                        continue
                    for ev in events:
                        if not isinstance(ev, dict):
                            continue
                        items.append({
                            "repo": f"{owner}/{name}",
                            "repo_url": f"{FORGEJO_BASE}/{owner}/{name}",
                            "op_type": ev.get("op_type", "unknown"),
                            "ref_name": ev.get("ref_name", ""),
                            "comment": (ev.get("content") or "")[:120],
                            "actor": (ev.get("act_user") or {}).get("login", "unknown"),
                            "created": ev.get("created"),
                        })
                except Exception:
                    # Best-effort per repo: one bad feed must not sink the rest.
                    log.debug("feed fetch failed for %s/%s", owner, name, exc_info=True)
                    continue
        # Sort by created desc, take top 8.  Every item carries a "created"
        # key that may be None, so coerce it before comparing; a dict default
        # would never apply here.
        items.sort(key=lambda x: str(x.get("created") or ""), reverse=True)
        items = items[:8]
        data = {"items": items, "source": "forgejo", "ts": int(now)}
        _forge_cache = {"ts": now, "data": data}
        return data
    except Exception as e:
        # Endpoint boundary: report the failure in-band, but leave a trace.
        log.warning("forgejo activity lookup failed: %s", e)
        return JSONResponse(
            {"error": str(e), "items": [], "source": "forgejo"},
            status_code=200,
        )
# Liveness probe: does no work and always answers with a constant payload.
@app.get("/api/health")
def health():
    payload = {"ok": True}
    return payload