""" KiteStacks metrics API. Exposes real host metrics via psutil. Container must be launched with `pid: host` and the relevant host paths mounted so psutil reads the laptop's namespaces rather than the container's. """ import os os.environ.setdefault("PROCFS_PATH", os.environ.get("HOST_PROC", "/proc")) import socket import platform import subprocess import time from functools import lru_cache import httpx import psutil psutil.PROCFS_PATH = os.environ.get("HOST_PROC", "/proc") from fastapi import FastAPI from fastapi.responses import JSONResponse app = FastAPI(title="KiteStacks Metrics API") # Cache last network sample so we can compute deltas (bytes/sec) _last_net = {"ts": time.time(), "sent": 0, "recv": 0, "iface": None} # Skip virtual / container interfaces when auto-detecting the host's nic SKIP_IFACE_PREFIXES = ( "docker", "br-", "veth", "cni", "flannel", "cali", "lo", "tun", "tap", "virbr", "kube", ) def active_interface() -> str | None: """Auto-detect the interface used for the default route on the HOST.""" try: route_path = "/host/proc/net/route" if os.path.exists(route_path): with open(route_path) as f: for line in f.readlines()[1:]: fields = line.strip().split() if len(fields) >= 2 and fields[1] == "00000000": name = fields[0] if not name.startswith(SKIP_IFACE_PREFIXES): return name except Exception: pass # Fallback: first non-virtual interface that is up try: for name, stats in psutil.net_if_stats().items(): if stats.isup and not name.startswith(SKIP_IFACE_PREFIXES): return name except Exception: pass return None @app.get("/api/metrics") def metrics(): global _last_net # CPU — short blocking sample so we don't return 0 on the first call cpu_pct = psutil.cpu_percent(interval=0.3) cpu_per_core = psutil.cpu_percent(interval=None, percpu=True) # RAM vm = psutil.virtual_memory() ram_used_gb = vm.used / (1024 ** 3) ram_total_gb = vm.total / (1024 ** 3) ram_pct = vm.percent # Storage — root filesystem (host root is mounted at /host) root_path = "/host" if os.path.ismount("/host") else "/" du = psutil.disk_usage(root_path) storage_used_gb = du.used / (1024 ** 3) storage_total_gb = du.total / (1024 ** 3) storage_pct = du.percent # Uptime — from host /proc/uptime if available, otherwise psutil uptime_s = None try: host_uptime = "/host/proc/uptime" if os.path.exists(host_uptime): with open(host_uptime) as f: uptime_s = int(float(f.read().split()[0])) except Exception: pass if uptime_s is None: uptime_s = int(time.time() - psutil.boot_time()) days = uptime_s // 86400 hours = (uptime_s % 86400) // 3600 minutes = (uptime_s % 3600) // 60 # Network — active host interface, bytes/sec since last call iface = active_interface() net_sent_kbs = 0.0 net_recv_kbs = 0.0 if iface: counters = psutil.net_io_counters(pernic=True).get(iface) if counters: now = time.time() dt = max(now - _last_net["ts"], 0.001) if _last_net["iface"] == iface and _last_net["sent"] > 0: net_sent_kbs = max(0.0, (counters.bytes_sent - _last_net["sent"]) / dt / 1024) net_recv_kbs = max(0.0, (counters.bytes_recv - _last_net["recv"]) / dt / 1024) _last_net = { "ts": now, "sent": counters.bytes_sent, "recv": counters.bytes_recv, "iface": iface, } overview = _system_overview() return { "cpu": {"pct": round(cpu_pct, 1), "cores": [round(c, 1) for c in cpu_per_core]}, "ram": { "pct": round(ram_pct, 1), "used_gb": round(ram_used_gb, 1), "total_gb": round(ram_total_gb, 1), }, "storage": { "pct": round(storage_pct, 1), "used_gb": round(storage_used_gb, 0), "total_gb": round(storage_total_gb, 0), }, "uptime": { "days": days, "hours": hours, "minutes": minutes, "seconds": uptime_s, }, "network": { "iface": iface, "tx_kbs": round(net_sent_kbs, 1), "rx_kbs": round(net_recv_kbs, 1), }, "overview": overview, "ts": int(time.time()), } @lru_cache(maxsize=1) def _system_overview(): """Cached because none of this changes during a session.""" # Hostname — read from host /etc/hostname if mounted; fall back to socket hostname = socket.gethostname() try: host_hostname = "/host/etc/hostname" if os.path.exists(host_hostname): with open(host_hostname) as f: hostname = f.read().strip() or hostname except Exception: pass # OS pretty name from host /etc/os-release if mounted os_pretty = platform.platform() try: release_path = ( "/host/etc/os-release" if os.path.exists("/host/etc/os-release") else "/etc/os-release" ) with open(release_path) as f: for line in f: if line.startswith("PRETTY_NAME="): os_pretty = line.split("=", 1)[1].strip().strip('"') break except Exception: pass # Kernel from host /proc/sys/kernel/osrelease if mounted kernel = platform.release() try: version_path = "/host/proc/sys/kernel/osrelease" if os.path.exists(version_path): with open(version_path) as f: kernel = f.read().strip() except Exception: pass # Docker version via mounted socket docker_ver = "unknown" try: out = subprocess.check_output( ["docker", "--version"], stderr=subprocess.DEVNULL, timeout=2 ).decode() # "Docker version 25.0.5, build ..." docker_ver = out.split()[2].rstrip(",") except Exception: pass # Timezone tz = time.tzname[0] if time.tzname else "UTC" try: tz_link = ( "/host/etc/localtime" if os.path.exists("/host/etc/localtime") else "/etc/localtime" ) if os.path.islink(tz_link): tz = os.readlink(tz_link).split("zoneinfo/")[-1] except Exception: pass return { "hostname": hostname, "os": os_pretty, "kernel": kernel, "docker": docker_ver, "timezone": tz, } # Weather — cached for 10 minutes _weather_cache = {"ts": 0, "data": None} @app.get("/api/weather") async def weather(): global _weather_cache now = time.time() if _weather_cache["data"] and (now - _weather_cache["ts"]) < 600: return _weather_cache["data"] try: async with httpx.AsyncClient(timeout=5.0) as client: # 1. Geolocate via public IP (no key needed) geo = await client.get("https://ipapi.co/json/") geo.raise_for_status() g = geo.json() lat, lon = g.get("latitude"), g.get("longitude") city = g.get("city", "Unknown") # 2. Weather from open-meteo (no key) wx = await client.get( "https://api.open-meteo.com/v1/forecast", params={ "latitude": lat, "longitude": lon, "current": "temperature_2m,weather_code,is_day", "temperature_unit": "fahrenheit", }, ) wx.raise_for_status() w = wx.json()["current"] data = { "temp_f": round(w["temperature_2m"]), "code": w["weather_code"], "is_day": bool(w["is_day"]), "city": city, "description": _wcode(w["weather_code"]), } _weather_cache = {"ts": now, "data": data} return data except Exception as e: return JSONResponse( {"error": str(e), "temp_f": None, "description": "Offline"}, status_code=200, ) def _wcode(code: int) -> str: """Open-Meteo WMO weather code → short description.""" m = { 0: "Clear Sky", 1: "Mainly Clear", 2: "Partly Cloudy", 3: "Overcast", 45: "Fog", 48: "Rime Fog", 51: "Light Drizzle", 53: "Drizzle", 55: "Heavy Drizzle", 61: "Light Rain", 63: "Rain", 65: "Heavy Rain", 71: "Light Snow", 73: "Snow", 75: "Heavy Snow", 77: "Snow Grains", 80: "Rain Showers", 81: "Heavy Showers", 82: "Violent Showers", 85: "Snow Showers", 86: "Heavy Snow Showers", 95: "Thunderstorm", 96: "Thunderstorm + Hail", 99: "Severe Thunderstorm", } return m.get(code, "Unknown") # Forgejo activity — public events only, cached 60s _forge_cache = {"ts": 0, "data": None} FORGEJO_BASE = os.environ.get("FORGEJO_BASE", "https://gitforge.kitestacks.com").rstrip("/") FORGEJO_API_BASE = os.environ.get("FORGEJO_API_BASE", FORGEJO_BASE).rstrip("/") @app.get("/api/activity") async def activity(): """Fetch recent public Forgejo events across all public repos.""" global _forge_cache now = time.time() if _forge_cache["data"] is not None and (now - _forge_cache["ts"]) < 60: return _forge_cache["data"] items = [] try: async with httpx.AsyncClient(timeout=6.0, follow_redirects=True) as client: # /repos/search?limit=20&sort=updated → most recently active public repos r = await client.get( f"{FORGEJO_API_BASE}/api/v1/repos/search", params={"limit": 20, "sort": "updated", "order": "desc", "private": "false"}, ) r.raise_for_status() repos = r.json().get("data", []) # For each repo, fetch its activity feed (small, fast); take a few for repo in repos[:8]: owner = repo.get("owner", {}).get("login") name = repo.get("name") if not owner or not name: continue if repo.get("private"): continue try: fr = await client.get( f"{FORGEJO_API_BASE}/api/v1/repos/{owner}/{name}/activities/feeds", params={"limit": 5}, ) if fr.status_code != 200: continue for ev in fr.json(): items.append({ "repo": f"{owner}/{name}", "repo_url": f"{FORGEJO_BASE}/{owner}/{name}", "op_type": ev.get("op_type", "unknown"), "ref_name": ev.get("ref_name", ""), "comment": (ev.get("content") or "")[:120], "actor": (ev.get("act_user") or {}).get("login", "unknown"), "created": ev.get("created"), }) except Exception: continue # Sort by created desc, take top 8 items.sort(key=lambda x: x.get("created", ""), reverse=True) items = items[:8] data = {"items": items, "source": "forgejo", "ts": int(now)} _forge_cache = {"ts": now, "data": data} return data except Exception as e: return JSONResponse( {"error": str(e), "items": [], "source": "forgejo"}, status_code=200, ) @app.get("/api/health") def health(): return {"ok": True}