#!/usr/bin/env python3import timeimport shutilimport reimport sysfrom collections import dequeWINDOWS = [ ("1m", 60), ("5m", 5 * 60), ("15m", 15 * 60), ("60m", 60 * 60), ("720m", 12 * 60 * 60), ("1440m", 24 * 60 * 60),]GREEN = "\033[32m"YELLOW = "\033[33m"RED = "\033[31m"RESET = "\033[0m"ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")def vis_len(s: str) -> int: return len(ANSI_RE.sub("", s))def ljust_ansi(s: str, width: int) -> str: pad = width - vis_len(s) return s + (" " * pad if pad > 0 else "")def rjust_ansi(s: str, width: int) -> str: pad = width - vis_len(s) return (" " * pad if pad > 0 else "") + sdef color(v: float) -> str: if v < 5.0: return GREEN if v <= 15.0: return YELLOW if v > 20.0: return RED return YELLOW # 15..20def read_cpu(): with open("/proc/stat", "r", encoding="utf-8") as f: for line in f: if line.startswith("cpu "): return list(map(int, line.split()[1:11])) raise RuntimeError("cpu line not found in /proc/stat")def steal_pct(prev, cur) -> float: dt = sum(cur) - sum(prev) if dt <= 0: return 0.0 dsteal = cur[7] - prev[7] if dsteal < 0: dsteal = 0 return (dsteal / dt) * 100.0class Roll: def __init__(self, sec: int): self.sec = int(sec) self.q = deque() # (t, v) self.s = 0.0 self._mn = 0.0 self._mx = 0.0 self._dirty = True def push(self, t: float, v: float) -> None: self.q.append((t, v)) self.s += v self._dirty = True cutoff = t - self.sec while self.q and self.q[0][0] < cutoff: _, x = self.q.popleft() self.s -= x self._dirty = True def stats(self): n = len(self.q) if n <= 0: return 0.0, 0.0, 0.0, 0 avg = self.s / n if self._dirty: mn = mx = self.q[0][1] for _, v in self.q: if v < mn: mn = v if v > mx: mx = v self._mn, self._mx = mn, mx self._dirty = False return avg, self._mn, self._mx, ndef clear_screen(): sys.stdout.write("\033[2J\033[H") sys.stdout.flush()def cursor_home(): sys.stdout.write("\033[H")def main(): rolls = {name: Roll(sec) for name, sec in WINDOWS} try: prev = read_cpu() except Exception as e: print(f"ERROR: cannot read /proc/stat: {e}", file=sys.stderr) return 2 clear_screen() try: while True: time.sleep(1) now = time.monotonic() try: cur = read_cpu() except Exception as e: cursor_home() sys.stdout.write(f"ERROR: cannot read /proc/stat: {e}\n") sys.stdout.flush() return 2 steal_now = steal_pct(prev, cur) prev = cur for r in rolls.values(): r.push(now, steal_now) term_w = shutil.get_terminal_size(fallback=(100, 24)).columns w = max(60, term_w) # Column widths (stable, don’t “move”) # WIN | AVG | MIN | MAX | N c_win = 6 c_avg = 10 c_min = 10 c_max = 10 c_n = 8 sep = " " fixed = c_win + c_avg + c_min + c_max + c_n + len(sep) * 4 if fixed > w: # shrink numeric cols if terminal is narrow (keep aligned) shrink = fixed - w for name in ("avg", "min", "max"): if shrink <= 0: break if name == "avg" and c_avg > 8: d = min(shrink, c_avg - 8); c_avg -= d; shrink -= d if name == "min" and c_min > 8: d = min(shrink, c_min - 8); c_min -= d; shrink -= d if name == "max" and c_max > 8: d = min(shrink, c_max - 8); c_max -= d; shrink -= d if shrink > 0 and c_n > 6: d = min(shrink, c_n - 6); c_n -= d; shrink -= d bar = "─" * min(w, 2000) cursor_home() # Header line now_c = color(steal_now) head = f"{now_c}steal_now {steal_now:6.2f}%{RESET}" sys.stdout.write(ljust_ansi(head, w) + "\n") sys.stdout.write(bar[:w] + "\n") # Table header h1 = rjust_ansi("WIN", c_win) h2 = rjust_ansi("AVG", c_avg) h3 = rjust_ansi("MIN", c_min) h4 = rjust_ansi("MAX", c_max) h5 = rjust_ansi("N", c_n) sys.stdout.write(ljust_ansi(f"{h1}{sep}{h2}{sep}{h3}{sep}{h4}{sep}{h5}", w) + "\n") sys.stdout.write(bar[:w] + "\n") # Rows for name, _sec in WINDOWS: avg, mn, mx, n = rolls[name].stats() avg_s = f"{color(avg)}{avg:6.2f}%{RESET}" max_s = f"{color(mx)}{mx:6.2f}%{RESET}" cell_win = rjust_ansi(name, c_win) cell_avg = rjust_ansi(avg_s, c_avg) cell_min = rjust_ansi(f"{mn:6.2f}%", c_min) cell_max = rjust_ansi(max_s, c_max) cell_n = rjust_ansi(str(n), c_n) row = f"{cell_win}{sep}{cell_avg}{sep}{cell_min}{sep}{cell_max}{sep}{cell_n}" sys.stdout.write(ljust_ansi(row, w) + "\n") sys.stdout.write(bar[:w] + "\n") sys.stdout.write(ljust_ansi("Ctrl+C to stop.", w)) sys.stdout.flush() except KeyboardInterrupt: sys.stdout.write(RESET + "\n") sys.stdout.flush() return 0if __name__ == "__main__": raise SystemExit(main())
# CPU Steal Monitor (terminal TUI, 1s sampling)## Что этоСкрипт в реальном времени мониторит **CPU steal%** на Linux (из `/proc/stat`) и показывает:- текущий `steal_now` (за последний 1s интервал),- статистику по окнам: **1m, 5m, 15m, 60m, 720m (12h), 1440m (24h)**: - **AVG** — среднее steal% по окну (подсвечивается цветом), - **MIN** — минимум steal% по окну, - **MAX** — максимум steal% по окну (подсвечивается цветом), - **N** — количество накопленных 1-секундных точек в окне.Вывод обновляется **на месте** (без прокрутки), табличка выровнена и подстраивается под ширину терминала.## Зачем это нужно`steal%` — это доля времени, когда CPU вашей VM/контейнера *хотел* выполняться, но гипервизор не дал CPU (oversubscription/contension на хосте).Рост steal часто проявляется как “всё тормозит”, при этом внутри VM CPU может быть не на 100%.Использование:- диагностика деградаций производительности в VM (KVM/Xen/VMware и т.п.),- проверка “шумных соседей” и перегруза хоста,- быстрый мониторинг по SSH/tmux без внешних зависимостей.## Как работает1. Каждую секунду читает строку `cpu` из `/proc/stat`:cpu user nice system idle iowait irq softirq steal guest guest_nice2. Считает `steal_now` как процент по дельтам за интервал:- `dt = sum(cur) - sum(prev)`- `dsteal = cur[steal] - prev[steal]`- `steal% = (dsteal / dt) * 100`3. Для каждого окна хранит очередь `(timestamp, value)` и сумму значений:- AVG считается быстро: `sum / N`- MIN/MAX пересчитываются лениво (только когда окно менялось)## ЦветаЦвет применяется **отдельно** для AVG и MAX:- `< 5%` — зелёный- `5–15%` — жёлтый- `> 20%` — красный- `15–20%` — жёлтый (предупреждение)Также цветом подсвечивается `steal_now`.> Примечание: цвета — ANSI escape codes. Работает в обычном TTY, SSH, tmux/screen. > В окружениях без ANSI можно увидеть “мусорные” символы — тогда запускать с `TERM` поддерживающим ANSI или убрать цвета.## Требования- Linux с `/proc` (т.е. обычная система/VM/контейнер)- Python 3.8+ (на практике подходит и более ранний 3.x)- Права: достаточно чтения `/proc/stat` (обычно доступно всем)## Запуск```bashchmod +x steal_monitor.py./steal_monitor.py
Возможно, надо думать, зависит от количества жёстких дисков в машине, от способа учёта ресурсов хостингом, и дальше по списку. Можно пробовать VPS на машине с быстрым запоминающим устройством, NVMe, и дальше по списку. Можно улучшать программное обеспечение, чтобы знания кэшировались, и другими способами.
Ещё один знаток хостинга. Поясняю специально для любителей честных CPU ядер, и других знатоков хостинга. Если программа на сайте (web сервер, или другая) обратилась для чтения к механическому жёсткому диску, то для получения знаний придётся ждать, когда читающая головка жёсткого диска будет установлена на дорожку с желаемыми знаниями. А жёсткий диск shared (общий) для всех VPS на машине. И честное CPU ядро будет ждать, когда общий жёсткий диск обслужит чужую виртуальную машину. Без всякого оверселлинга. Похожие задержки есть также при работе с другими общими ресурсами. Поэтому гонять честное CPU ядро на полную мощность может получиться только для чисто вычислительных задач, когда все знания умещаются в кэше процессора. А при обычном наборе действий на хостинге гонять честное CPU ядро на полную мощность не получится.
Вы основ не понимаете. Как машина устроена, и дальше по списку.