Merge pull request #6 from kuhyx/fix/usage-report-cpu-and-idle-fork-storm

fix: usage-report HZ-as-CPU bug + idle-inhibit fork-storm rewrite
This commit is contained in:
kuhyx 2026-06-04 18:24:59 +02:00 committed by GitHub
commit 87d46180c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 1991 additions and 172 deletions

View File

@ -0,0 +1,15 @@
{
"title": "Fix HZ-as-CPU off-by-one in usage_report and restore the native helper",
"objective": "The usage report must attribute CPU time from atop's utime/stime columns, not its HZ field. Success means short-lived, near-zero-CPU processes (xset, dd, chronyc, sleep) no longer appear in the Top CPU table, real consumers rank correctly, and both the Python parser and the restored native C aggregator agree on the numbers.",
"acceptance_criteria": [
"_parse_prc skips the HZ field; xset/dd/chronyc report ~0 CPU-seconds.",
"The native C helper builds from linux_configuration/C/atop_agg and emits the same CPU figures as the Python path.",
"A regression test feeds a raw PRC line including the HZ field and fails against the pre-fix indices.",
"usage_report.py runs end-to-end and the Top CPU table is led by genuine consumers (e.g. SkyrimSE)."
],
"out_of_scope": [
"Rewriting the digital_wellbeing polling daemons that generate the fork storm.",
"Achieving 100% line coverage of the C helper's defensive OOM/hash-full paths."
],
"verifier": "python3 usage_report.py --date 20260604; make test in linux_configuration/C/atop_agg; python3 -m pytest linux_configuration/tests/test_usage_report_since.py"
}

View File

@ -0,0 +1,38 @@
{
"intent": "Eliminate the fork storm from turn_off_auto_idle_screen_shutdown.sh's controller watcher. Previously, while a game controller was connected, each joystick event forked 4 xset + 1 xdotool + a dd read + a sleep (~21 forks/s during gaming). The session must still be kept awake while a controller is connected, but with no per-event forks.",
"scope": [
"linux_configuration/scripts/single_use/utils/turn_off_auto_idle_screen_shutdown.sh",
"Non-goal: the one-shot idle-disable steps (xset/gsettings at startup) are unchanged",
"Non-goal: external chronyc forks (~1/s) which originate outside this repo"
],
"changes": [
"Replaced reset_idle_activity + watch_js_device + the polling start_controller_watchers with a single long-lived `systemd-inhibit --what=idle:sleep` lock held only while a /dev/input/js* device is present.",
"Controller presence is re-evaluated on udev input add/remove events (event-driven, no polling), with a 30 s presence-poll fallback when udevadm is absent.",
"Cleanup hardened: EXIT plus INT/TERM traps release the inhibitor on any termination path; presence check is a pure-bash glob (zero forks)."
],
"verification": [
{
"command": "bash turn_off_auto_idle_screen_shutdown.sh --watch-controller (with js0 connected)",
"result": "pass",
"evidence": "systemd-inhibit --list shows exactly one 'game controller connected' lock; the watcher subtree stays {systemd-inhibit, udevadm} with zero dd/xset/xdotool over a 3 s sample, versus the old watcher still churning dd."
},
{
"command": "kill -TERM <watcher>; systemd-inhibit --list",
"result": "pass",
"evidence": "Watcher exits cleanly, 0 inhibitors remain, no orphaned systemd-inhibit reparented to init."
},
{
"command": "bash -n + shellcheck",
"result": "pass",
"evidence": "syntax OK; shellcheck clean."
}
],
"risks": [
"Semantics changed from 'awake only during active controller input' to 'awake while a controller is connected'; a permanently-plugged controller will keep the session from auto-idling.",
"If the holding process is SIGKILLed outside a systemd cgroup (e.g. i3 crash), the inhibitor lingers until session end; EXIT trap covers normal termination."
],
"rollback": [
"git checkout the script to restore the previous controller watcher.",
"Re-run with --watch-controller and confirm whether xset/dd churn returns."
]
}

View File

@ -0,0 +1,40 @@
{
"intent": "Stop the usage report from charging atop's per-record HZ field as CPU time, which made short-lived processes (xset, dd, chronyc, sleep) appear as the top CPU consumers (xset reported 67h of CPU in a 5h40m window). After the fix the CPU table reflects real consumers (SkyrimSE, zstd, the video-capture pipeline) and the fork storm shows only in the accurate PID-count column.",
"scope": [
"linux_configuration/scripts/periodic_background/system-maintenance/bin/_usage_report_parsing.py",
"linux_configuration/C/atop_agg/ (restored native helper with the same fix)",
"linux_configuration/tests/test_usage_report_since.py (regression tests)",
"Non-goal: rewriting the digital_wellbeing daemons that cause the fork storm"
],
"changes": [
"_parse_prc now reads utime/stime at after+2/after+3, skipping atop's HZ field that sits between state and utime; bumped _PRC_MIN_LEN from 11 to 12.",
"_atop_agg_binary returns None (Python fallback) when the C source tree is absent, instead of trusting an orphaned cached binary; removed the stale ~/.cache/usage_report/atop_agg.",
"Restored C/atop_agg from git history into linux_configuration/C/atop_agg with the identical HZ-skip fix (tokens[10]/[11]), guard bumped to n<12, redundant PRM length check removed, and test fixtures corrected to include the HZ field.",
"Added Python regression tests asserting HZ is not summed as CPU and that a missing C source falls back to Python."
],
"verification": [
{
"command": "python3 usage_report.py --date 20260604 --no-clipboard --quiet",
"result": "pass",
"evidence": "Top CPU now led by SkyrimSE.exe 933s; xset/dd/chronyc dropped out entirely (real CPU ~0). Cross-checked against atop directly with corrected field indices."
},
{
"command": "make test (linux_configuration/C/atop_agg)",
"result": "pass",
"evidence": "atop_agg tests: OK. Rebuilt binary emits xset cpu_ticks=0 vs 24427000 before."
},
{
"command": "python3 -m pytest test_usage_report_since.py -k 'parse_prc or atop_agg_binary'",
"result": "pass",
"evidence": "4 passed. Buggy indices would yield 107 ticks vs the asserted 10, so the regression test fails against the old code."
}
],
"risks": [
"Native fast path needs a C compiler; without cc the report now falls back to the (slower) Python parser rather than a stale binary.",
"C helper coverage remains below 100% on defensive OOM/hash-full paths (pre-existing; the suite is not coverage-gated for linux_configuration)."
],
"rollback": [
"git checkout the parsing module and remove linux_configuration/C/atop_agg to revert.",
"Re-run usage_report.py --date 20260604 and confirm whether xset reappears with inflated CPU."
]
}

View File

@ -0,0 +1,9 @@
# Build artifacts — atop_agg is rebuilt locally (and into ~/.cache/usage_report
# by usage_report.py); never commit the compiled binary or coverage output.
atop_agg
test_atop_agg
*.o
*.gcda
*.gcno
coverage.info
coverage_html/

View File

@ -0,0 +1,33 @@
# Build rules for the atop_agg native helper and its unit tests.
#
# The compiler defaults to make's built-in `cc` rather than a hard-coded gcc:
# the Python driver only checks that `cc` is on PATH before invoking make, so
# requiring gcc here would break the build on hosts where cc is another
# compiler. Override with `make CC=gcc` when a specific compiler is wanted.
CC ?= cc

# Flags shared by the optimised binary and the coverage-instrumented tests.
COMMON := -std=c11 -D_POSIX_C_SOURCE=200809L -Wall -Wextra -Wno-unused-parameter
CFLAGS := -O2 $(COMMON)
# ATOP_AGG_NO_MAIN drops atop_agg.c's main() so the test binary can link its own.
COV    := -O0 -g --coverage $(COMMON) -DATOP_AGG_NO_MAIN

SRC := atop_agg.c
HDR := atop_agg.h
BIN := atop_agg

.PHONY: all clean rebuild test coverage

all: $(BIN)

$(BIN): $(SRC) $(HDR)
	$(CC) $(CFLAGS) -o $@ $(SRC)

test_atop_agg: test_atop_agg.c atop_agg.c atop_agg.h
	$(CC) $(COV) -o test_atop_agg test_atop_agg.c atop_agg.c

test: test_atop_agg
	./test_atop_agg

# Runs the tests, then turns the gcov data into an HTML report (needs lcov).
coverage: test_atop_agg
	./test_atop_agg
	lcov --capture --directory . --output-file coverage.info --no-external
	lcov --remove coverage.info '*/test_atop_agg.c' --output-file coverage.info
	genhtml coverage.info --output-directory coverage_html
	@echo "Coverage report at coverage_html/index.html"

clean:
	rm -f $(BIN) test_atop_agg *.o *.gcda *.gcno coverage.info
	rm -rf coverage_html

rebuild: clean all

View File

@ -0,0 +1,478 @@
/*
 * atop_agg - fast per-process-name aggregator for `atop -P PRC,PRM` output.
 *
 * Reads atop parseable output on stdin, folds it into per-PID CPU-tick
 * and RSS trackers, then folds those trackers by process name and prints
 * a compact TSV summary on stdout that a higher-level driver (Python)
 * renders into human-readable tables. This avoids the ~3s Python parse
 * cost on a typical day's 1.7M-line atop dump; the C hot loop completes in
 * well under a second so the pipeline runs at atop's own ~2s wall-clock
 * floor.
 *
 * Output TSV lines (one W line, then one N line per distinct name):
 * W<TAB>start_epoch<TAB>end_epoch<TAB>distinct_samples<TAB>median_interval
 * N<TAB>name<TAB>cpu_ticks<TAB>peak_kb<TAB>sum_avg_kb<TAB>rss_samples<TAB>pids
 */
#include "atop_agg.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
/*
* A real-world day of atop on a dev box can see >700k distinct PIDs
* because every short-lived compiler/shell subprocess gets a fresh ID.
* 2M slots keeps the load factor below ~40% for that workload, keeping
* linear-probe chains short without dynamic resizing.
*/
#define HASH_CAP_BITS 21
#define HASH_CAP (1u << HASH_CAP_BITS)
#define HASH_MASK (HASH_CAP - 1u)
#define MAX_EPOCHS 4096
#define MAX_TOKENS 64
/*
 * Knuth multiplicative hash: scramble the PID with the golden-ratio
 * constant and keep the high HASH_CAP_BITS bits of the product as the
 * starting index into an open-addressed table. The `32 -` shift assumes
 * a 32-bit unsigned int.
 */
static unsigned int hash_pid(int pid)
{
    const unsigned int multiplier = 2654435761u;
    const unsigned int shift = 32 - HASH_CAP_BITS;

    return ((unsigned int)pid * multiplier) >> shift;
}
/*
 * Find the CPU tracker for *pid*, claiming and initialising an empty slot
 * on first sight. Slots with pid == 0 are free; probing is linear from the
 * hashed index and wraps via HASH_MASK. Returns NULL once every slot has
 * been probed without success (table full), so the caller drops the sample
 * instead of spinning forever.
 */
static PidCpu *cpu_slot(State *s, int pid)
{
    unsigned int idx = hash_pid(pid);
    unsigned int remaining = HASH_CAP;

    while (remaining-- > 0)
    {
        PidCpu *entry = &s->cpu[idx++ & HASH_MASK];

        if (entry->pid == pid)
        {
            return entry;
        }
        if (entry->pid != 0)
        {
            continue; /* occupied by another PID: keep probing */
        }

        /* Free slot: claim it. first_ticks == -1 means "no sample yet". */
        entry->pid = pid;
        entry->first_ticks = -1;
        entry->last_ticks = 0;
        entry->samples = 0;
        entry->name[0] = '\0';
        return entry;
    }
    return NULL;
}
/*
 * Find the RSS tracker for *pid*, claiming and zero-initialising an empty
 * slot (pid == 0) on first sight. Uses the same linear-probe scheme as the
 * CPU table. Returns NULL when the whole table has been probed without
 * finding the PID or a free slot.
 */
static PidRam *ram_slot(State *s, int pid)
{
    unsigned int idx = hash_pid(pid);
    unsigned int remaining = HASH_CAP;

    while (remaining-- > 0)
    {
        PidRam *entry = &s->ram[idx++ & HASH_MASK];

        if (entry->pid == pid)
        {
            return entry;
        }
        if (entry->pid != 0)
        {
            continue; /* occupied by another PID: keep probing */
        }

        entry->pid = pid;
        entry->peak_kb = 0;
        entry->sum_kb = 0;
        entry->samples = 0;
        entry->name[0] = '\0';
        return entry;
    }
    return NULL;
}
/*
 * Record *epoch* in the set of distinct sample timestamps (no-op if it is
 * already present, or if MAX_EPOCHS distinct values are already stored).
 *
 * This runs once per PRC record, so the membership scan goes newest-first:
 * consecutive records normally carry the epoch that was appended last
 * (NOTE(review): assumes atop emits each sample's records contiguously -
 * confirm), which makes the common case a single comparison instead of a
 * walk over every epoch seen so far. The resulting set is identical to an
 * oldest-first scan.
 */
static void add_epoch(State *s, long epoch)
{
    for (int i = s->n_epochs - 1; i >= 0; i--)
    {
        if (s->epochs[i] == epoch)
        {
            return;
        }
    }
    if (s->n_epochs < MAX_EPOCHS)
    {
        s->epochs[s->n_epochs++] = epoch;
    }
}
/*
 * Split *line* in place on spaces and tabs, storing up to *max_tokens*
 * token pointers in *tokens* and returning how many were stored. Scanning
 * stops at a newline or the end of the string.
 *
 * A token that starts with '(' runs through the next ')' (or to the end of
 * the string if there is none), so a process name such as `(Web Content)`
 * stays one token with its interior spaces intact. The character that
 * follows each token is overwritten with NUL.
 */
int tokenize_line(char *line, char **tokens, int max_tokens)
{
    int count = 0;
    char *cur = line;

    while (*cur != '\0' && count < max_tokens)
    {
        cur += strspn(cur, " \t");
        if (*cur == '\0' || *cur == '\n')
        {
            break;
        }

        char *tok = cur;
        if (*cur == '(')
        {
            /* Parenthesised name: run to the closing ')' and include it. */
            cur += strcspn(cur, ")");
            if (*cur == ')')
            {
                cur++;
            }
        }
        else
        {
            cur += strcspn(cur, " \t\n");
        }

        /* Terminate the token unless it already ends at the string's NUL. */
        if (*cur != '\0')
        {
            *cur++ = '\0';
        }
        tokens[count++] = tok;
    }
    return count;
}
/*
 * Copy *src* into *dst* (capacity *cap* bytes, including the terminator),
 * stripping one leading '(' and trailing ')' when both are present. An
 * empty result is replaced by "unknown". Output is truncated to fit and is
 * always NUL-terminated.
 *
 * A zero-capacity destination is left untouched: without that guard,
 * `cap - 1` would wrap around to SIZE_MAX and the copy would run far past
 * the buffer.
 */
void copy_name(char *dst, size_t cap, const char *src)
{
    if (cap == 0)
    {
        return;
    }

    size_t len = strlen(src);
    if (len >= 2 && src[0] == '(' && src[len - 1] == ')')
    {
        /* Drop the surrounding parentheses atop puts around the name. */
        src++;
        len -= 2;
    }
    if (len == 0)
    {
        src = "unknown";
        len = strlen(src);
    }
    if (len >= cap)
    {
        len = cap - 1; /* leave room for the terminator */
    }
    memcpy(dst, src, len);
    dst[len] = '\0';
}
/*
 * Fold one line of atop parseable output into *s*.
 *
 * Only PRC (CPU) and PRM (memory) records with at least 12 fields and a
 * positive PID are used; every other line (headers, separators, other
 * labels, short or malformed records) is ignored without error. *line* is
 * tokenised in place and therefore modified.
 */
void process_line(char *line, State *s)
{
    /* Field positions in a tokenised record. Indices 0-5 are the generic
       prefix; [9] is a per-label extra column (HZ for PRC, page size for
       PRM) that sits between the state and the first column read here. */
    enum
    {
        F_LABEL = 0,
        F_EPOCH = 2,
        F_PID = 6,
        F_NAME = 7,
        F_PRC_UTIME = 10,
        F_PRC_STIME = 11,
        F_PRM_RSIZE = 11,
        MIN_FIELDS = 12
    };
    char *field[MAX_TOKENS];

    if (tokenize_line(line, field, MAX_TOKENS) < MIN_FIELDS)
    {
        return;
    }

    const int is_prc = strcmp(field[F_LABEL], "PRC") == 0;
    const int is_prm = !is_prc && strcmp(field[F_LABEL], "PRM") == 0;
    if (!is_prc && !is_prm)
    {
        return;
    }

    const long epoch = strtol(field[F_EPOCH], NULL, 10);
    const int pid = (int)strtol(field[F_PID], NULL, 10);
    if (pid <= 0)
    {
        return;
    }

    if (is_prm)
    {
        /* Resident size in kB follows state, page size and vsize. */
        const long rss_kb = strtol(field[F_PRM_RSIZE], NULL, 10);
        PidRam *ram = ram_slot(s, pid);
        if (ram == NULL)
        {
            return;
        }
        if (rss_kb > ram->peak_kb)
        {
            ram->peak_kb = rss_kb;
        }
        ram->sum_kb += rss_kb;
        ram->samples++;
        copy_name(ram->name, sizeof(ram->name), field[F_NAME]);
        return;
    }

    /* PRC: utime/stime live at [10]/[11], one past the HZ column at [9].
       Reading [9] as utime charged a constant HZ (100) as CPU time to
       every record - the bug this layout fixes. */
    const long utime = strtol(field[F_PRC_UTIME], NULL, 10);
    const long stime = strtol(field[F_PRC_STIME], NULL, 10);
    const long ticks = utime + stime;

    add_epoch(s, epoch);
    PidCpu *cpu = cpu_slot(s, pid);
    if (cpu == NULL)
    {
        return;
    }
    if (cpu->first_ticks < 0)
    {
        cpu->first_ticks = ticks; /* first sample seen for this PID */
    }
    cpu->last_ticks = ticks;
    cpu->samples++;
    copy_name(cpu->name, sizeof(cpu->name), field[F_NAME]);
}
/*
 * qsort comparator ordering `long` values ascending. Returns -1, 0 or 1;
 * built from two comparisons rather than a subtraction so it cannot
 * overflow.
 */
static int cmp_long(const void *a, const void *b)
{
    const long lhs = *(const long *)a;
    const long rhs = *(const long *)b;

    return (lhs > rhs) - (lhs < rhs);
}
/*
 * 32-bit FNV-1a hash of the NUL-terminated string *s*: for each byte, XOR
 * it into the hash and then multiply by the FNV prime. Used to pick the
 * starting slot in the per-name table.
 */
static unsigned int fnv1a(const char *s)
{
    const unsigned int offset_basis = 2166136261u;
    const unsigned int prime = 16777619u;
    unsigned int hash = offset_basis;

    for (const unsigned char *p = (const unsigned char *)s; *p != '\0'; p++)
    {
        hash = (hash ^ *p) * prime;
    }
    return hash;
}
/*
* Per-name aggregate, built in a second pass over cpu/ram tables so that
* the caller only has to parse a few thousand output rows instead of one
* row per PID. The name table is deliberately oversized (64k slots for an
* expected few-thousand names) to keep linear-probe chains short.
*/
#define NAME_CAP_BITS 16
#define NAME_CAP (1u << NAME_CAP_BITS)
#define NAME_MASK (NAME_CAP - 1u)
/* One row of the per-name summary: every PID sharing a process name is
   folded into a single NameAgg by emit_results(). */
typedef struct
{
char name[ATOP_AGG_NAME_MAX]; /* process name; the table key */
long cpu_ticks; /* sum of the per-PID CPU tick deltas */
int cpu_pids; /* number of PIDs that contributed CPU samples */
long peak_kb; /* largest per-PID peak RSS, in kB */
long sum_avg_kb; /* sum of each PID's mean RSS (sum_kb / samples), in kB */
int rss_samples; /* bumped once per PID with RAM data, not once per sample */
int ram_pids; /* number of PIDs that contributed RAM samples */
char used; /* non-zero once the slot has been claimed */
} NameAgg;
/*
 * Find the aggregate for *name* in the open-addressed *table* (NAME_CAP
 * slots), claiming the first unused slot on the probe path if the name is
 * new. The stored key is truncated to fit the slot's name buffer. Returns
 * NULL when every slot has been probed without a match or a free slot.
 */
static NameAgg *name_slot(NameAgg *table, const char *name)
{
    unsigned int idx = fnv1a(name);
    unsigned int remaining = NAME_CAP;

    while (remaining-- > 0)
    {
        NameAgg *entry = &table[idx++ & NAME_MASK];

        if (entry->used)
        {
            if (strcmp(entry->name, name) == 0)
            {
                return entry;
            }
            continue; /* a different name lives here: keep probing */
        }

        /* Free slot: claim it and store a bounded, NUL-terminated key. */
        entry->used = 1;
        const size_t limit = sizeof(entry->name) - 1;
        size_t len = 0;
        for (; len < limit && name[len] != '\0'; len++)
        {
            entry->name[len] = name[len];
        }
        entry->name[len] = '\0';
        return entry;
    }
    return NULL;
}
/*
 * Write the aggregated summary for *s* to *out*.
 *
 * First a single window line:
 *   W <TAB> start_epoch <TAB> end_epoch <TAB> distinct_samples <TAB> median_interval
 * then one line per distinct process name:
 *   N <TAB> name <TAB> cpu_ticks <TAB> peak_kb <TAB> sum_avg_kb <TAB> rss_samples <TAB> pids
 *
 * Sorts s->epochs in place as a side effect. If the per-name table cannot
 * be allocated, only the W line is written.
 */
void emit_results(State *s, FILE *out)
{
    const int n = s->n_epochs;
    long first_epoch = 0;
    long last_epoch = 0;
    long median_gap = 0;

    if (n > 0)
    {
        qsort(s->epochs, (size_t)n, sizeof(long), cmp_long);
        first_epoch = s->epochs[0];
        last_epoch = s->epochs[n - 1];
    }
    if (n >= 2)
    {
        /* Median of the gaps between consecutive sorted sample times. */
        long gaps[MAX_EPOCHS];
        const int n_gaps = n - 1;

        for (int i = 0; i < n_gaps; i++)
        {
            gaps[i] = s->epochs[i + 1] - s->epochs[i];
        }
        qsort(gaps, (size_t)n_gaps, sizeof(long), cmp_long);
        median_gap = gaps[n_gaps / 2];
    }
    fprintf(out, "W\t%ld\t%ld\t%d\t%ld\n", first_epoch, last_epoch, n, median_gap);

    NameAgg *by_name = calloc(NAME_CAP, sizeof(NameAgg));
    if (by_name == NULL)
    {
        return;
    }

    /* Pass 1: fold per-PID CPU deltas by name. */
    for (unsigned int i = 0; i < HASH_CAP; i++)
    {
        const PidCpu *rec = &s->cpu[i];
        if (rec->pid == 0)
        {
            continue;
        }

        /* A PID seen once contributes its absolute tick count; one seen
           repeatedly contributes last - first, floored at zero. */
        long ticks = rec->last_ticks;
        if (rec->samples >= 2)
        {
            ticks -= rec->first_ticks;
            if (ticks < 0)
            {
                ticks = 0;
            }
        }

        NameAgg *agg = name_slot(by_name, rec->name);
        if (agg != NULL)
        {
            agg->cpu_ticks += ticks;
            agg->cpu_pids++;
        }
    }

    /* Pass 2: fold per-PID RSS statistics by name. */
    for (unsigned int i = 0; i < HASH_CAP; i++)
    {
        const PidRam *rec = &s->ram[i];
        if (rec->pid == 0)
        {
            continue;
        }

        const long mean_kb = rec->samples ? rec->sum_kb / rec->samples : 0;
        NameAgg *agg = name_slot(by_name, rec->name);
        if (agg == NULL)
        {
            continue;
        }
        if (rec->peak_kb > agg->peak_kb)
        {
            agg->peak_kb = rec->peak_kb;
        }
        agg->sum_avg_kb += mean_kb;
        agg->rss_samples++;
        agg->ram_pids++;
    }

    /* Pass 3: print one row per used name slot, in table order. */
    for (unsigned int i = 0; i < NAME_CAP; i++)
    {
        const NameAgg *agg = &by_name[i];
        if (!agg->used)
        {
            continue;
        }

        int pids = agg->ram_pids;
        if (agg->cpu_pids > agg->ram_pids)
        {
            pids = agg->cpu_pids;
        }
        fprintf(out, "N\t%s\t%ld\t%ld\t%ld\t%d\t%d\n", agg->name, agg->cpu_ticks, agg->peak_kb,
                agg->sum_avg_kb, agg->rss_samples, pids);
    }
    free(by_name);
}
/*
 * Allocate a zeroed State with its CPU table, RAM table and epoch array.
 * Returns NULL if any allocation fails; whatever was allocated up to that
 * point is released first. Free the result with state_free().
 */
State *state_new(void)
{
    State *st = calloc(1, sizeof(*st));
    if (st == NULL)
    {
        return NULL;
    }

    st->cpu = calloc(HASH_CAP, sizeof(PidCpu));
    st->ram = calloc(HASH_CAP, sizeof(PidRam));
    st->epochs = calloc(MAX_EPOCHS, sizeof(long));
    st->n_epochs = 0;

    if (st->cpu != NULL && st->ram != NULL && st->epochs != NULL)
    {
        return st;
    }

    /* Partial failure: state_free() copes with NULL members. */
    state_free(st);
    return NULL;
}
/*
 * Release a State and the three arrays it owns. Passing NULL is a no-op,
 * and NULL members are fine because free(NULL) does nothing.
 */
void state_free(State *s)
{
    if (s != NULL)
    {
        free(s->epochs);
        free(s->ram);
        free(s->cpu);
        free(s);
    }
}
#ifndef ATOP_AGG_NO_MAIN
/*
 * Entry point: feed every stdin line through process_line(), then print
 * the summary on stdout. Exits 1 (with a message on stderr) only when the
 * aggregation state cannot be allocated.
 */
int main(void)
{
    State *state = state_new();
    if (state == NULL)
    {
        fprintf(stderr, "atop_agg: out of memory\n");
        return 1;
    }

    /* getline() grows the buffer as needed and reuses it across lines. */
    char *buf = NULL;
    size_t buf_cap = 0;
    for (;;)
    {
        ssize_t got = getline(&buf, &buf_cap, stdin);
        if (got == -1)
        {
            break;
        }
        process_line(buf, state);
    }
    free(buf);

    emit_results(state, stdout);
    state_free(state);
    return 0;
}
#endif

View File

@ -0,0 +1,42 @@
#ifndef ATOP_AGG_H
#define ATOP_AGG_H
#include <stdio.h>
/* NAME_MAX capped to keep slot size compact; typical atop comm is 15 chars. */
#define ATOP_AGG_NAME_MAX 40
/* Per-PID CPU tracker; one slot of the open-addressed CPU table. */
typedef struct
{
int pid; /* owning PID; 0 marks an unused slot */
char name[ATOP_AGG_NAME_MAX]; /* process name, refreshed on every sample */
long first_ticks; /* utime+stime of the first sample; -1 until one is seen */
long last_ticks; /* utime+stime of the most recent sample */
int samples; /* number of PRC records folded in */
} PidCpu;
/* Per-PID resident-memory tracker; one slot of the open-addressed RAM table. */
typedef struct
{
int pid; /* owning PID; 0 marks an unused slot */
char name[ATOP_AGG_NAME_MAX]; /* process name, refreshed on every sample */
long peak_kb; /* largest RSS seen, in kB */
long sum_kb; /* running total of RSS samples, in kB (for the mean) */
int samples; /* number of PRM records folded in */
} PidRam;
/* Whole-run aggregation state; create with state_new(), release with state_free(). */
typedef struct
{
PidCpu *cpu; /* CPU tracker table */
PidRam *ram; /* RAM tracker table */
long *epochs; /* distinct sample epochs taken from PRC records */
int n_epochs; /* number of valid entries in epochs */
} State;
State *state_new(void);
void state_free(State *s);
int tokenize_line(char *line, char **tokens, int max_tokens);
void copy_name(char *dst, size_t cap, const char *src);
void process_line(char *line, State *s);
void emit_results(State *s, FILE *out);
#endif

View File

@ -0,0 +1,12 @@
#!/usr/bin/env bash
# Build atop_agg and demo it on an atop log.
#
# Usage: <this script> [ATOP_LOG]
#   ATOP_LOG defaults to today's /var/log/atop/atop_YYYYMMDD.
set -euo pipefail

# Locate the source directory without changing the working directory, so a
# relative ATOP_LOG argument still resolves against the caller's cwd.
script_dir="$(cd "$(dirname -- "$0")" && pwd)"
make -C "$script_dir"

LOG="${1:-/var/log/atop/atop_$(date +%Y%m%d)}"
if [[ ! -f "$LOG" ]]; then
  echo "No atop log at $LOG; pass a path as arg 1." >&2
  exit 1
fi

echo "Aggregating $LOG ..." >&2
# Collect the whole summary before truncating it. Piping straight into
# `head` lets head exit after 20 lines, which can kill atop_agg with SIGPIPE
# and, under `pipefail`, make a successful run exit with status 141.
summary="$(atop -r "$LOG" -P PRC,PRM | "$script_dir/atop_agg")"
head -n 20 <<<"$summary"

View File

@ -0,0 +1,229 @@
/*
 * Unit tests for atop_agg helpers. Compiled with --coverage; aims for
 * full line coverage of atop_agg.c apart from main (compiled out with
 * -DATOP_AGG_NO_MAIN) and the defensive out-of-memory / table-full
 * branches, which these tests do not trigger.
 */
#include "atop_agg.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static int failures = 0;
#define CHECK(cond) \
do \
{ \
if (!(cond)) \
{ \
fprintf(stderr, "FAIL %s:%d: %s\n", __FILE__, __LINE__, #cond); \
failures++; \
} \
} while (0)
/* copy_name: paren stripping, the "unknown" fallback, and both truncation paths. */
static void test_copy_name(void)
{
    /* {input, expected}: cases that fit comfortably in the buffer. */
    static const char *const cases[][2] = {
        {"(bash)", "bash"},
        {"bash", "bash"},
        {"()", "unknown"},
        {"", "unknown"},
    };
    char out[16];

    for (size_t i = 0; i < sizeof(cases) / sizeof(cases[0]); i++)
    {
        copy_name(out, sizeof(out), cases[i][0]);
        CHECK(strcmp(out, cases[i][1]) == 0);
    }

    /* A name longer than the buffer is cut to capacity - 1 characters. */
    copy_name(out, sizeof(out), "(veryverylongnameabc)");
    CHECK(strlen(out) == sizeof(out) - 1);

    /* The fallback itself is truncated when the buffer cannot hold it. */
    char small[4];
    copy_name(small, sizeof(small), "");
    CHECK(strcmp(small, "unk") == 0);
}
/* tokenize_line: field splitting, parenthesised names, and the edge cases. */
static void test_tokenize(void)
{
    /* A plain PRC record; field 9 is the HZ column atop puts before utime. */
    {
        char rec[] = "PRC host 1000 2026/01/01 12:00:00 600 123 (bash) S 100 10 20\n";
        char *field[32];
        int count = tokenize_line(rec, field, 32);
        CHECK(count == 12);
        CHECK(strcmp(field[0], "PRC") == 0);
        CHECK(strcmp(field[7], "(bash)") == 0);
        CHECK(strcmp(field[9], "100") == 0);  /* HZ */
        CHECK(strcmp(field[10], "10") == 0);  /* utime */
        CHECK(strcmp(field[11], "20") == 0);  /* stime */
    }
    /* A name containing a space stays a single token. */
    {
        char rec[] = "PRM host 1000 d t 600 200 (Web Content) S 4096 1 2 0 0\n";
        char *field[32];
        int count = tokenize_line(rec, field, 32);
        CHECK(count >= 12);
        CHECK(strncmp(field[7], "(Web Content)", 13) == 0);
    }
    /* Whitespace-only input yields no tokens. */
    {
        char blank[] = "   \n";
        char *field[4];
        CHECK(tokenize_line(blank, field, 4) == 0);
    }
    /* Never stores more than max_tokens entries. */
    {
        char many[] = "a b c d e f g h i j k";
        char *field[3];
        CHECK(tokenize_line(many, field, 3) == 3);
    }
    /* An unclosed '(' swallows the rest of the line as one token. */
    {
        char open_paren[] = "(abc";
        char *field[2];
        int count = tokenize_line(open_paren, field, 2);
        CHECK(count == 1);
        CHECK(strcmp(field[0], "(abc") == 0);
    }
}
/* End-to-end: feed PRC/PRM records plus junk lines, then check the TSV summary. */
static void test_process_and_emit(void)
{
    State *state = state_new();
    assert(state != NULL);

    /* Lines are tokenised in place, so each needs its own writable buffer.
       The "100" after the state column of PRC records is atop's HZ field. */
    char input[][64] = {
        /* PID 100 sampled twice: utime+stime goes 30 -> 100, delta 70. */
        "PRC h 1000 d t 600 100 (cc1) S 100 10 20\n",
        "PRC h 1600 d t 600 100 (cc1) S 100 70 30\n",
        /* One RAM sample for PID 100: rss = 4096 kB. */
        "PRM h 1000 d t 600 100 (cc1) S 4096 100 4096 0 0\n",
        /* PID 200 seen once: its delta is the absolute tick count, 10. */
        "PRC h 1000 d t 600 200 (short) S 100 5 5\n",
        /* Comment, separator and foreign-label lines are ignored. */
        "# comment line\n",
        "SEP\n",
        "CPU h 1000 d t 600 0 0 0 0 0 0 0 0\n",
        /* Twelve fields, so it passes the length guard and hits pid <= 0. */
        "PRC h 1000 d t 600 0 (x) S 100 1 1\n",
        /* Fewer than twelve fields: rejected by the shared length guard. */
        "PRC h 1000 d t 600 300 (y) S 1\n",
        "PRM h 1000 d t 600 300 (y) S 4096 1\n",
    };
    for (size_t i = 0; i < sizeof(input) / sizeof(input[0]); i++)
    {
        process_line(input[i], state);
    }

    char *text = NULL;
    size_t text_len = 0;
    FILE *mem = open_memstream(&text, &text_len);
    assert(mem != NULL);
    emit_results(state, mem);
    fclose(mem);

    /* Two distinct epochs, 600 s apart. */
    CHECK(strstr(text, "W\t1000\t1600\t2\t600\n") != NULL);
    /* cc1: cpu delta 70, peak RSS 4096, sum of means 4096, one RAM PID,
       pids = max(cpu_pids, ram_pids) = 1. */
    CHECK(strstr(text, "N\tcc1\t70\t4096\t4096\t1\t1\n") != NULL);
    /* short: 10 ticks from its single sample; no RAM data, so zeros. */
    CHECK(strstr(text, "N\tshort\t10\t0\t0\t0\t1\n") != NULL);

    free(text);
    state_free(state);
}
/*
 * Window line for degenerate inputs: no records at all, and exactly one
 * epoch. Allocation results are asserted (as in test_process_and_emit) so
 * an out-of-memory condition aborts with a clear message instead of
 * dereferencing NULL.
 */
static void test_empty_and_single_epoch(void)
{
    State *s = state_new();
    assert(s != NULL);
    /* No input at all → window line with zeroes. */
    char *buf = NULL;
    size_t sz = 0;
    FILE *out = open_memstream(&buf, &sz);
    assert(out != NULL);
    emit_results(s, out);
    fclose(out);
    CHECK(strstr(buf, "W\t0\t0\t0\t0\n") != NULL);
    free(buf);
    state_free(s);

    /* Exactly one epoch → median interval stays 0. */
    s = state_new();
    assert(s != NULL);
    char prc[] = "PRC h 500 d t 600 50 (a) S 100 1 1\n";
    process_line(prc, s);
    buf = NULL;
    sz = 0;
    out = open_memstream(&buf, &sz);
    assert(out != NULL);
    emit_results(s, out);
    fclose(out);
    CHECK(strstr(buf, "W\t500\t500\t1\t0\n") != NULL);
    free(buf);
    state_free(s);
}
/*
 * Counter reset: when a PID's last sample is lower than its first, the
 * reported CPU delta must clamp to 0 rather than go negative. Allocation
 * results are asserted so a failure aborts instead of dereferencing NULL.
 */
static void test_delta_clamped_to_zero(void)
{
    State *s = state_new();
    assert(s != NULL);
    /* utime+stime drops from 100 to 20 between the two samples. */
    char a[] = "PRC h 100 d t 600 77 (x) S 100 50 50\n";
    char b[] = "PRC h 700 d t 600 77 (x) S 100 10 10\n";
    process_line(a, s);
    process_line(b, s);

    char *buf = NULL;
    size_t sz = 0;
    FILE *out = open_memstream(&buf, &sz);
    assert(out != NULL);
    emit_results(s, out);
    fclose(out);
    CHECK(strstr(buf, "N\tx\t0\t") != NULL);
    free(buf);
    state_free(s);
}
/*
 * Insert a sweep of PIDs into both tables to exercise the linear-probe
 * path, then verify through the emitted summary that no sample was lost or
 * merged into the wrong slot. Previously this test only checked that the
 * inserts did not crash.
 */
static void test_hash_collision(void)
{
    enum
    {
        N_PIDS = 2000
    };
    State *s = state_new();
    assert(s != NULL);

    for (int pid = 1; pid <= N_PIDS; pid++)
    {
        char line[128];
        /* One CPU sample per PID: utime + stime = 2 ticks. */
        snprintf(line, sizeof(line), "PRC h 1000 d t 600 %d (p) S 100 1 1\n", pid);
        process_line(line, s);
        /* One RAM sample per PID: rss = 1 kB. */
        snprintf(line, sizeof(line), "PRM h 1000 d t 600 %d (p) S 4096 1 1 0 0\n", pid);
        process_line(line, s);
    }

    char *buf = NULL;
    size_t sz = 0;
    FILE *out = open_memstream(&buf, &sz);
    assert(out != NULL);
    emit_results(s, out);
    fclose(out);

    /* Every PID shares the name "p": 2000 x 2 ticks, peak 1 kB, the sum of
       2000 one-kB means, and 2000 PIDs in each of the last two columns. */
    CHECK(strstr(buf, "N\tp\t4000\t1\t2000\t2000\t2000\n") != NULL);

    free(buf);
    state_free(s);
}
/* state_free() returns early on a NULL state, so this call must be a
   harmless no-op; the test passes simply by not crashing. */
static void test_state_free_null(void)
{
/* Freeing NULL must be safe. */
state_free(NULL);
}
int main(void)
{
test_copy_name();
test_tokenize();
test_process_and_emit();
test_empty_and_single_epoch();
test_delta_clamped_to_zero();
test_hash_collision();
test_state_free_null();
if (failures > 0)
{
fprintf(stderr, "%d test failures\n", failures);
return 1;
}
printf("atop_agg tests: OK\n");
return 0;
}

View File

@ -25,10 +25,13 @@ from _usage_report_types import (
# atop parseable output layout (atop 2.x, same on Arch/Debian/Ubuntu):
# 0 label, 1 host, 2 epoch, 3 YYYY/MM/DD, 4 HH:MM:SS, 5 interval_s,
# then per-process fields starting at index 6.
# PRC per-proc: pid name(parens) state utime_ticks stime_ticks ...
# PRC per-proc: pid name(parens) state HZ utime_ticks stime_ticks ...
# NOTE: atop inserts its clock-tick rate (HZ) between `state` and `utime`
# (the PRC analogue of the pagesize field PRM inserts before its memory
# columns); utime/stime therefore live two and three slots past `state`.
_PRC_PID_IDX = 6
_PRC_NAME_IDX = 7
_PRC_MIN_LEN = 11
_PRC_MIN_LEN = 12
# PRM per-proc: pid name state pagesz_b vsize_kb rsize_kb ...
_PRM_PID_IDX = 6
_PRM_NAME_IDX = 7
@ -61,13 +64,39 @@ def _run(cmd: list[str]) -> str:
return proc.stdout
def _iter_atop_lines(log: Path, labels: str) -> Iterator[str]:
def _atop_read_cmd(
log: Path,
labels: str,
begin: str | None,
end: str | None,
) -> list[str]:
"""Build an `atop -r` command, optionally bounded by begin/end times.
*begin*/*end* are atop `-b`/`-e` arguments (`[YYYYMMDD]hhmm[ss]`) used to
restrict replay to a sub-window of the day's log, so a "since last report"
run does not double-count the part of the first day already reported.
"""
cmd = ["atop", "-r", str(log)]
if begin is not None:
cmd += ["-b", begin]
if end is not None:
cmd += ["-e", end]
cmd += ["-P", labels]
return cmd
def _iter_atop_lines(
log: Path,
labels: str,
begin: str | None = None,
end: str | None = None,
) -> Iterator[str]:
"""Stream `atop -r LOG -P LABELS` stdout line-by-line.
Uses `Popen` so the report can show progress while atop is still
decoding its binary log, rather than buffering the whole output.
"""
cmd = ["atop", "-r", str(log), "-P", labels]
cmd = _atop_read_cmd(log, labels, begin, end)
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
@ -112,10 +141,13 @@ def _parse_prc(parts: list[str], pid_cpu: dict[int, _PidCpu]) -> None:
except (ValueError, IndexError):
return
name, after = _parse_name(parts, _PRC_NAME_IDX)
# After name comes: state utime stime ...
# After name comes: state HZ utime stime ... (HZ is atop's clock-tick
# rate; skipping it is what keeps a constant 100 from being charged as
# CPU to every record — the bug that made cpu-seconds collapse to PID
# count for short-lived processes).
try:
utime = int(parts[after + 1])
stime = int(parts[after + 2])
utime = int(parts[after + 2])
stime = int(parts[after + 3])
except (ValueError, IndexError):
return
pid_cpu.setdefault(pid, _PidCpu()).observe(name, utime + stime)
@ -153,6 +185,8 @@ def _window_from_epochs(epochs: set[int]) -> _Window:
distinct_samples=len(ordered),
interval_s=interval,
seconds=ordered[-1] - ordered[0],
start_epoch=ordered[0],
end_epoch=ordered[-1],
)
@ -163,12 +197,18 @@ def _atop_agg_binary() -> Path | None:
is unavailable, in which case callers use the pure-Python parser.
"""
src_c = _ATOP_AGG_SRC_DIR / "atop_agg.c"
if _ATOP_AGG_CACHE_BIN.exists() and (
not src_c.exists()
or src_c.stat().st_mtime <= _ATOP_AGG_CACHE_BIN.stat().st_mtime
if not src_c.exists():
# Source tree is gone (relocated/extracted): never trust an orphaned
# cached binary whose provenance we can no longer verify against
# source — a stale build can silently carry parsing bugs. Fall back to
# the pure-Python parser instead.
return None
if (
_ATOP_AGG_CACHE_BIN.exists()
and src_c.stat().st_mtime <= _ATOP_AGG_CACHE_BIN.stat().st_mtime
):
return _ATOP_AGG_CACHE_BIN
if not src_c.exists() or shutil.which("cc") is None:
if shutil.which("cc") is None:
return None
_ATOP_AGG_CACHE_BIN.parent.mkdir(parents=True, exist_ok=True)
make_cmd = ["make", "-s", "-C", str(_ATOP_AGG_SRC_DIR), "atop_agg"]
@ -218,6 +258,8 @@ def _window_from_native(parts: list[str]) -> _Window:
distinct_samples=n_epochs,
interval_s=int(interval_s),
seconds=end_epoch - start_epoch,
start_epoch=start_epoch,
end_epoch=end_epoch,
)
@ -225,12 +267,14 @@ def _aggregate_atop_native(
log: Path,
progress: _Progress,
binary: Path,
begin: str | None = None,
end: str | None = None,
) -> tuple[dict[str, ProcAgg], _Window]:
"""Aggregate via `atop | atop_agg`; return `(by_name, window)`."""
progress.start_stage("atop: parse PRC+PRM (native)")
agg_map: dict[str, ProcAgg] = {}
window = _Window()
atop_cmd = ["atop", "-r", str(log), "-P", "PRC,PRM"]
atop_cmd = _atop_read_cmd(log, "PRC,PRM", begin, end)
agg_cmd = [str(binary)]
with (
subprocess.Popen(
@ -265,16 +309,21 @@ def _aggregate_atop_native(
def aggregate_atop(
log: Path,
progress: _Progress,
begin: str | None = None,
end: str | None = None,
) -> tuple[dict[str, ProcAgg], _Window]:
"""Stream PRC+PRM records, fold them into `{name: ProcAgg}`, return window.
Prefers the native `atop_agg` C helper (auto-built into
``~/.cache/usage_report/``) for ~7x speedup on full-day logs, falling
back to an inline Python parser when the helper is unavailable.
*begin*/*end* are optional atop `-b`/`-e` arguments that bound replay to a
sub-window of the day's log (used by the "since last report" mode).
"""
binary = _atop_agg_binary()
if binary is not None:
return _aggregate_atop_native(log, progress, binary)
return _aggregate_atop_native(log, progress, binary, begin, end)
progress.start_stage("atop: parse PRC+PRM")
pid_cpu: dict[int, _PidCpu] = {}
pid_ram: dict[int, _PidRam] = {}
@ -285,7 +334,7 @@ def aggregate_atop(
# 10-min-interval log. The fraction is only used for the progress bar,
# so a rough calibration is fine; it caps at 99% if we underestimate.
est_total_bytes = log_size * 11 or 1
for raw in _iter_atop_lines(log, "PRC,PRM"):
for raw in _iter_atop_lines(log, "PRC,PRM", begin, end):
bytes_seen += len(raw) + 1
if not raw or raw[0] == "#" or raw.startswith("RESET") or raw == "SEP":
continue
@ -365,11 +414,33 @@ def _pid_comm_name(pid: int) -> str | None:
return Path(comm).name if comm else None
def _pmon_row_epoch(parts: list[str]) -> float | None:
"""Local-time epoch of a pmon row from its `date`/`time` columns, or None.
pmon timestamps are naive local time (`YYYYMMDD HH:MM:SS`); `.astimezone()`
attaches the local offset so the result is comparable to a `begin_epoch`
derived the same way.
"""
try:
stamp = _dt.datetime.strptime(
f"{parts[0]} {parts[1]}",
"%Y%m%d %H:%M:%S",
).astimezone()
except (ValueError, IndexError):
return None
return stamp.timestamp()
def aggregate_pmon(
log: Path,
progress: _Progress,
begin_epoch: float | None = None,
) -> tuple[dict[str, GpuAgg], int]:
"""Return `({program: GpuAgg}, sample_count)` from the pmon *log*."""
"""Return `({program: GpuAgg}, sample_count)` from the pmon *log*.
When *begin_epoch* is set, rows timestamped before it are skipped so the
first day of a "since last report" window starts at the previous run time.
"""
progress.start_stage("pmon log scan")
agg: dict[str, GpuAgg] = {}
samples = 0
@ -385,6 +456,10 @@ def aggregate_pmon(
parts = _pmon_fields(line)
if parts is None or len(parts) < _PMON_MIN_FIELDS:
continue
if begin_epoch is not None:
row_epoch = _pmon_row_epoch(parts)
if row_epoch is not None and row_epoch < begin_epoch:
continue
samples += _ingest_pmon_row(parts, agg)
progress.update(1.0)
return agg, samples
@ -414,3 +489,56 @@ def _ingest_pmon_row(parts: list[str], agg: dict[str, GpuAgg]) -> int:
entry.peak_sm_pct = max(entry.peak_sm_pct, sm)
entry.peak_mem_pct = max(entry.peak_mem_pct, mem)
return 1
def merge_proc_aggs(dst: dict[str, ProcAgg], src: dict[str, ProcAgg]) -> None:
    """Accumulate the per-program CPU/RAM totals of *src* into *dst* in place.

    Programs not yet present in *dst* get a fresh ``ProcAgg``. CPU ticks, the
    RSS sum and the RSS sample count are added; peak RSS keeps the larger
    value; the source's ``pid_count`` is added to ``extra_pids`` rather than
    merged into ``pid_set``.
    """
    for program, day in src.items():
        if program not in dst:
            dst[program] = ProcAgg(name=program)
        total = dst[program]
        total.cpu_ticks += day.cpu_ticks
        if day.peak_rss_kb > total.peak_rss_kb:
            total.peak_rss_kb = day.peak_rss_kb
        total.rss_kb_sum += day.rss_kb_sum
        total.rss_samples += day.rss_samples
        total.extra_pids += day.pid_count
def merge_gpu_aggs(dst: dict[str, GpuAgg], src: dict[str, GpuAgg]) -> None:
    """Accumulate the per-program GPU totals of *src* into *dst* in place.

    SM-% and memory-% sums and the sample count are added; both peaks keep the
    larger value; the source's ``pid_count`` is added to ``extra_pids``.
    """
    for program, day in src.items():
        if program not in dst:
            dst[program] = GpuAgg(name=program)
        total = dst[program]
        total.sm_pct_sum += day.sm_pct_sum
        total.mem_pct_sum += day.mem_pct_sum
        total.samples += day.samples
        if day.peak_sm_pct > total.peak_sm_pct:
            total.peak_sm_pct = day.peak_sm_pct
        if day.peak_mem_pct > total.peak_mem_pct:
            total.peak_mem_pct = day.peak_mem_pct
        total.extra_pids += day.pid_count
def merge_windows(windows: list[_Window]) -> _Window:
    """Fold several per-day coverage windows into a single window.

    Windows with no distinct samples are ignored; if none remain, an empty
    ``_Window()`` is returned. Otherwise the result starts at the window with
    the smallest ``start_epoch`` and ends at the one with the largest
    ``end_epoch``. ``distinct_samples`` and ``seconds`` are summed over the
    retained windows, so ``seconds`` is monitored time rather than
    ``end - start``. ``interval_s`` is the most frequent non-zero interval,
    or 0 when no window reports one.
    """
    populated = [win for win in windows if win.distinct_samples]
    if not populated:
        return _Window()
    earliest = min(populated, key=lambda win: win.start_epoch)
    latest = max(populated, key=lambda win: win.end_epoch)
    observed = [win.interval_s for win in populated if win.interval_s]
    if observed:
        # Mode of the observed per-day intervals.
        typical = max(set(observed), key=observed.count)
    else:
        typical = 0
    sample_total = 0
    second_total = 0
    for win in populated:
        sample_total += win.distinct_samples
        second_total += win.seconds
    return _Window(
        start=earliest.start,
        end=latest.end,
        distinct_samples=sample_total,
        interval_s=typical,
        seconds=second_total,
        start_epoch=earliest.start_epoch,
        end_epoch=latest.end_epoch,
    )

View File

@ -79,6 +79,15 @@ class ProcAgg:
rss_kb_sum: int = 0
rss_samples: int = 0
pid_set: set[int] = field(default_factory=set)
# PID counts folded in when merging per-day aggregates. Tracked as a plain
# integer (not by extending `pid_set`) because the native parser stores a
# synthetic `range(n)` set whose union across days would collapse counts.
extra_pids: int = 0
@property
def pid_count(self) -> int:
"""Distinct PIDs seen, including those merged from other day windows."""
return len(self.pid_set) + self.extra_pids
@property
def cpu_seconds(self) -> float:
@ -109,6 +118,13 @@ class GpuAgg:
peak_sm_pct: float = 0.0
peak_mem_pct: float = 0.0
pid_set: set[int] = field(default_factory=set)
# PID counts folded in when merging per-day aggregates (see ProcAgg).
extra_pids: int = 0
@property
def pid_count(self) -> int:
"""Distinct PIDs seen, including those merged from other day windows."""
return len(self.pid_set) + self.extra_pids
@property
def gpu_seconds(self) -> float:
@ -190,3 +206,7 @@ class _Window:
distinct_samples: int = 0
interval_s: int = 0
seconds: int = 0
# Raw epoch bounds, kept so multiple per-day windows can be merged by
# min(start)/max(end) without re-parsing the ISO strings above.
start_epoch: int = 0
end_epoch: int = 0

View File

@ -1,19 +1,24 @@
#!/usr/bin/env python3
"""End-of-day resource usage report from atop + nvidia-smi pmon logs.
"""Resource usage report from atop + nvidia-smi pmon logs.
Parses the current-day (or given) `atop` binary log via `atop -P PRC,PRM -r`
and the per-process nvidia-smi pmon log, aggregates CPU seconds, peak/average
RSS, and GPU SM-% seconds per program, and prints a compact Markdown report
intended to be pasted into an LLM (Claude / Copilot) for further analysis.
Parses one or more daily `atop` binary logs via `atop -P PRC,PRM -r` and the
per-process nvidia-smi pmon logs, aggregates CPU seconds, peak/average RSS, and
GPU SM-% seconds per program, and prints a compact Markdown report intended to
be pasted into an LLM (Claude / Copilot) for further analysis.
Run with no arguments to report on today's logs:
Run with no arguments to report on **everything since the last report**: the
previous run's timestamp is persisted, and each run covers the whole window
from then until now, spanning as many daily logs as needed (so skipped days are
never lost). After a successful report the timestamp is advanced to "now".
usage_report.py # today
usage_report.py --date 20260419 # specific day
usage_report.py # since the last report (multi-day)
usage_report.py --since 20260419 # ad hoc: from a date to now, no state
usage_report.py --date 20260419 # one specific day (ad hoc, no state)
usage_report.py --top 20 # keep 20 rows per table
usage_report.py --no-update-state # don't advance the saved timestamp
usage_report.py > report.md # redirect to a file
The output intentionally front-loads metadata (hostname, window, sample
The output intentionally front-loads metadata (hostname, period, window, sample
count, HZ, machine specs) so the LLM never has to guess context.
"""
@ -21,7 +26,9 @@ from __future__ import annotations
import argparse
from collections import defaultdict
from dataclasses import dataclass
import datetime as _dt
import json
import os
from pathlib import Path
import platform
@ -34,7 +41,14 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Iterable
from _usage_report_parsing import _run, aggregate_atop, aggregate_pmon
from _usage_report_parsing import (
_run,
aggregate_atop,
aggregate_pmon,
merge_gpu_aggs,
merge_proc_aggs,
merge_windows,
)
from _usage_report_types import (
_HZ,
_PMON_INTERVAL_S,
@ -52,6 +66,12 @@ _SEC_PER_DAY = 86_400
_SEC_PER_HOUR = 3600
_SEC_PER_MIN = 60
# Persisted marker of when the last report was generated. Lives under
# ~/.local/share (durable app state), not ~/.cache, so clearing caches does not
# silently reset the "since last report" window back to today-only.
_STATE_DIR = Path.home() / ".local/share/usage_report"
_STATE_FILE = _STATE_DIR / "last_report.json"
def _host_profile() -> dict[str, str]:
"""Collect a small bag of identifying facts about the host."""
@ -127,7 +147,7 @@ def _cpu_table(aggs: Iterable[ProcAgg], window_s: int, top: int) -> list[str]:
f"{idx} | {_md_escape(item.name)} | "
f"{item.cpu_seconds:,.0f}s ({_fmt_h(item.cpu_seconds)}) | "
f"{single:.1f}% | {box:.1f}% | "
f"{item.peak_rss_mb:,.0f} MiB | {len(item.pid_set)} |",
f"{item.peak_rss_mb:,.0f} MiB | {item.pid_count} |",
)
return rows
@ -151,7 +171,7 @@ def _dedupe_ram(aggs: Iterable[ProcAgg]) -> list[tuple[ProcAgg, list[str]]]:
buckets[key].append(item)
result: list[tuple[ProcAgg, list[str]]] = []
for bucket in buckets.values():
bucket.sort(key=lambda a: (a.cpu_ticks, len(a.pid_set)), reverse=True)
bucket.sort(key=lambda a: (a.cpu_ticks, a.pid_count), reverse=True)
rep = bucket[0]
siblings = [b.name for b in bucket[1:]]
result.append((rep, siblings))
@ -186,7 +206,7 @@ def _ram_table(aggs: Iterable[ProcAgg], top: int) -> list[str]:
f"{item.peak_rss_mb:,.0f} MiB | "
f"{item.avg_rss_mb:,.0f} MiB | "
f"{item.cpu_seconds:,.0f}s | "
f"{len(item.pid_set)} | {sib} |",
f"{item.pid_count} | {sib} |",
)
return rows
@ -212,7 +232,7 @@ def _gpu_table(aggs: dict[str, GpuAgg], total_samples: int, top: int) -> list[st
f"{item.peak_sm_pct:.0f}% | "
f"{item.peak_mem_pct:.0f}% | "
f"{item.samples} ({presence:.0f}%) | "
f"{len(item.pid_set)} |",
f"{item.pid_count} |",
)
return rows
@ -227,11 +247,15 @@ def _fingerprint_section() -> list[str]:
]
def _methodology_section(atop_log: Path, pmon_log: Path, window: _Window) -> list[str]:
def _methodology_section(
atop_desc: str,
pmon_desc: str,
window: _Window,
) -> list[str]:
window_note = (
f"- **Coverage window**: {_fmt_h(window.seconds)} "
f"(from first to last atop sample; window may be shorter than wall "
f"clock since the next atop tick has not yet fired)."
f"(sum of per-day atop coverage from first to last sample; excludes "
f"any gap days where atop was not logging, and the final partial tick)."
)
interval_note = (
f"- **atop sample interval (observed)**: {window.interval_s}s"
@ -266,8 +290,8 @@ def _methodology_section(atop_log: Path, pmon_log: Path, window: _Window) -> lis
return [
"## Methodology",
"",
f"- **atop log**: `{atop_log}` (binary, replay with `atop -r`)",
f"- **pmon log**: `{pmon_log}` (`nvidia-smi pmon -d {_PMON_INTERVAL_S}`)",
f"- **atop log(s)**: {atop_desc}",
f"- **pmon log(s)**: {pmon_desc}",
f"- **HZ**: {_HZ} ticks/s; **page size**: {_PAGE_KB} KiB",
window_note,
interval_note,
@ -293,34 +317,60 @@ def _compute_window(atop_log: Path, progress: _Progress) -> _Window:
_LLM_PROMPT = [
"> Below is a day's worth of aggregated resource usage for my Linux workstation.",
"> Identify which programs are the biggest hogs, flag anything that looks abnormal",
"> for a typical developer/gaming setup, and suggest concrete optimisations",
"> (config tweaks, process limits, alternative tools). Be specific.",
"> Below is aggregated resource usage for my Linux workstation over the",
"> reporting period shown above. Identify which programs are the biggest",
"> hogs, flag anything that looks abnormal for a typical developer/gaming",
"> setup, and suggest concrete optimisations (config tweaks, process limits,",
"> alternative tools). Be specific.",
]
_REPORT_STAGES = 2
def _build_report(
args: argparse.Namespace,
atop_log: Path,
pmon_log: Path,
) -> str:
progress = _Progress(
enabled=not args.quiet,
total_stages=_REPORT_STAGES,
)
cpu_aggs, window = aggregate_atop(atop_log, progress)
if not window.seconds:
window.seconds = _SEC_PER_DAY
gpu_aggs, gpu_samples = aggregate_pmon(pmon_log, progress)
progress.finish()
@dataclass
class _Segment:
"""One calendar day's resolved logs plus optional in-day start bounds.
*atop_begin* is an atop ``-b`` argument (``YYYYMMDDhhmmss``) and
*pmon_begin_epoch* the matching local epoch; both are set only for the first
day of a "since last report" window so re-runs do not double-count.
"""
atop_log: Path
pmon_log: Path
atop_begin: str | None = None
pmon_begin_epoch: float | None = None
@dataclass
class _Aggregates:
"""Merged CPU/GPU aggregates and coverage window for a reporting window.
*days_with_data* is the number of daily logs that actually yielded atop
samples (gap days where the machine was off contribute nothing).
"""
cpu: dict[str, ProcAgg]
gpu: dict[str, GpuAgg]
window: _Window
gpu_samples: int
days_with_data: int
def _render_report(
aggs: _Aggregates,
*,
top: int,
atop_desc: str,
pmon_desc: str,
period_line: str,
) -> str:
"""Assemble the Markdown report from already-aggregated data."""
window = aggs.window
gpu_section = (
_gpu_table(gpu_aggs, gpu_samples, args.top)
if gpu_aggs
_gpu_table(aggs.gpu, aggs.gpu_samples, top)
if aggs.gpu
else ["_No GPU pmon data found._"]
)
generated = _dt.datetime.now().astimezone().isoformat(timespec="seconds")
@ -329,20 +379,21 @@ def _build_report(
"# System resource usage report",
"",
f"- **Generated**: {generated}",
period_line,
f"- **atop window**: {window.start} \u2192 {window.end}",
f"- **atop samples**: {window.distinct_samples} distinct "
f"timestamps (sample interval \u2248 {interval})",
f"- **GPU pmon samples**: {gpu_samples} (\u2248{_PMON_INTERVAL_S}s each)",
f"- **GPU pmon samples**: {aggs.gpu_samples} (\u2248{_PMON_INTERVAL_S}s each)",
"",
*_fingerprint_section(),
*_methodology_section(atop_log, pmon_log, window),
*_methodology_section(atop_desc, pmon_desc, window),
"## Top CPU consumers",
"",
*_cpu_table(cpu_aggs.values(), window.seconds, args.top),
*_cpu_table(aggs.cpu.values(), window.seconds, top),
"",
"## Top RAM consumers (by peak RSS, deduped by shared-memory bucket)",
"",
*_ram_table(cpu_aggs.values(), args.top),
*_ram_table(aggs.cpu.values(), top),
"",
"## Top GPU consumers",
"",
@ -356,12 +407,117 @@ def _build_report(
return "\n".join(lines) + "\n"
def _aggregate_segments(
    segments: list[_Segment],
    progress: _Progress,
) -> _Aggregates:
    """Run the atop and pmon aggregators over *segments* and merge the results.

    A segment whose atop log does not exist is skipped for CPU/RAM without any
    message. ``aggregate_pmon`` is called for every segment regardless. Only
    atop windows that contain distinct samples are kept, and the number kept
    is reported as ``days_with_data``.
    """
    merged_cpu: dict[str, ProcAgg] = {}
    merged_gpu: dict[str, GpuAgg] = {}
    day_windows: list[_Window] = []
    pmon_rows = 0
    for segment in segments:
        if segment.atop_log.exists():
            day_cpu, day_window = aggregate_atop(
                segment.atop_log,
                progress,
                segment.atop_begin,
            )
            merge_proc_aggs(merged_cpu, day_cpu)
            if day_window.distinct_samples:
                day_windows.append(day_window)
        day_gpu, day_rows = aggregate_pmon(
            segment.pmon_log,
            progress,
            segment.pmon_begin_epoch,
        )
        merge_gpu_aggs(merged_gpu, day_gpu)
        pmon_rows += day_rows
    return _Aggregates(
        cpu=merged_cpu,
        gpu=merged_gpu,
        window=merge_windows(day_windows),
        gpu_samples=pmon_rows,
        # One retained window per day that produced atop samples.
        days_with_data=len(day_windows),
    )
def _describe_logs(paths: list[Path], how: str) -> str:
"""One-line Markdown description of the log files actually consumed."""
if not paths:
return f"_none found_ (`{how}`)"
if len(paths) == 1:
return f"`{paths[0]}` (`{how}`)"
return (
f"{len(paths)} daily logs `{paths[0].name}` \u2026 `{paths[-1].name}` "
f"in `{paths[0].parent}` (`{how}`)"
)
def _log_descriptions(segments: list[_Segment]) -> tuple[str, str]:
    """Describe the atop and pmon logs of *segments* that exist on disk.

    Returns ``(atop_desc, pmon_desc)``, each produced by ``_describe_logs``
    from the subset of paths for which ``Path.exists()`` is true.
    """
    atop_found: list[Path] = []
    pmon_found: list[Path] = []
    for segment in segments:
        if segment.atop_log.exists():
            atop_found.append(segment.atop_log)
        if segment.pmon_log.exists():
            pmon_found.append(segment.pmon_log)
    atop_desc = _describe_logs(atop_found, "atop -r")
    pmon_desc = _describe_logs(pmon_found, f"nvidia-smi pmon -d {_PMON_INTERVAL_S}")
    return atop_desc, pmon_desc
def _resolve_logs(date: str) -> tuple[Path, Path]:
    """Return the ``(atop_log, pmon_log)`` paths for the day string *date*.

    The paths are built from the ``atop_<date>`` and ``pmon-<date>.log``
    naming patterns only; neither file is checked for existence here.
    """
    return (
        _ATOP_LOG_DIR / f"atop_{date}",
        _PMON_LOG_DIR / f"pmon-{date}.log",
    )
def _read_last_generated() -> _dt.datetime | None:
"""Return the timestamp of the previous report run, or None if unknown."""
try:
raw = _STATE_FILE.read_text(encoding="utf-8")
except OSError:
return None
try:
stamp = json.loads(raw)["last_generated"]
return _dt.datetime.fromisoformat(stamp).astimezone()
except (ValueError, KeyError, TypeError):
return None
def _write_last_generated(when: _dt.datetime) -> None:
"""Persist *when* as the last-report timestamp for the next run."""
_STATE_DIR.mkdir(parents=True, exist_ok=True)
payload = json.dumps({"last_generated": when.isoformat(timespec="seconds")})
_STATE_FILE.write_text(payload + "\n", encoding="utf-8")
def _has_time_of_day(when: _dt.datetime) -> bool:
"""True when *when* is past local midnight, so a begin bound is needed."""
return bool(when.hour or when.minute or when.second or when.microsecond)
def _plan_segments(start: _dt.datetime, end: _dt.datetime) -> list[_Segment]:
    """Build one ``_Segment`` for every calendar day from *start* to *end*.

    Both end days are included. If *start* is not exactly midnight, the first
    day's segment carries a begin bound at *start*, so a same-day re-run only
    covers the time since the previous report. All other days are unbounded.
    The result is empty when *start*'s date is after *end*'s date (for
    example, a state file from the future).
    """
    first_day = start.date()
    last_day = end.date()
    bound_first = _has_time_of_day(start)
    one_day = _dt.timedelta(days=1)
    plan: list[_Segment] = []
    current = first_day
    while current <= last_day:
        atop_log, pmon_log = _resolve_logs(current.strftime("%Y%m%d"))
        segment = _Segment(atop_log, pmon_log)
        if bound_first and current == first_day:
            segment.atop_begin = start.strftime("%Y%m%d%H%M%S")
            segment.pmon_begin_epoch = start.timestamp()
        plan.append(segment)
        current += one_day
    return plan
_INSTALL_SCRIPT = Path(__file__).with_name("install_usage_monitoring.sh")
@ -406,13 +562,126 @@ def _copy_to_clipboard(text: str) -> None:
)
def main(argv: list[str] | None = None) -> int:
"""Entry point; see module docstring for CLI."""
def _emit(args: argparse.Namespace, report: str) -> None:
    """Print *report* on stdout, then copy it to the clipboard.

    The clipboard copy is skipped when ``args.no_clipboard`` is set.
    """
    sys.stdout.write(report)
    if args.no_clipboard:
        return
    _copy_to_clipboard(report)
def _period_line(start: _dt.datetime, end: _dt.datetime) -> str:
    """Markdown bullet describing the requested reporting period.

    The bullet shows both bounds as ISO timestamps with second precision,
    separated by an arrow, followed by the span formatted by ``_fmt_h``. A
    negative span (*end* before *start*) is clamped to zero.
    """
    span = _fmt_h(max((end - start).total_seconds(), 0.0))
    begin_iso = start.isoformat(timespec="seconds")
    end_iso = end.isoformat(timespec="seconds")
    # The arrow keeps the two timestamps from running together and matches
    # the separator used on the "atop window" line of the report header.
    return f"- **Reporting period**: {begin_iso} \u2192 {end_iso} ({span})"
def _is_single_day_mode(args: argparse.Namespace) -> bool:
"""True when the user pinned an exact day or explicit log paths."""
return (
args.date is not None or args.atop_log is not None or args.pmon_log is not None
)
def _should_advance_state(args: argparse.Namespace) -> bool:
"""Advance the saved timestamp only for genuine since-last-report runs.
An explicit ``--since`` is treated as a read-only ad-hoc query (like
``--date``) so "let me look from date X" never silently re-baselines the
saved tracking point.
"""
return args.since is None and not args.no_update_state
def _run_single_day(args: argparse.Namespace, now: _dt.datetime) -> int:
    """Produce the report for one pinned day; the saved state is not written.

    The day is ``args.date``, or today's date taken from *now* when only log
    paths were given. ``--atop-log`` / ``--pmon-log`` override the paths
    derived from that day. Returns 0.
    """
    day = args.date or now.strftime("%Y%m%d")
    default_atop, default_pmon = _resolve_logs(day)
    segment = _Segment(
        args.atop_log or default_atop,
        args.pmon_log or default_pmon,
    )
    _preflight(segment.atop_log)
    progress = _Progress(enabled=not args.quiet, total_stages=_REPORT_STAGES)
    aggs = _aggregate_segments([segment], progress)
    progress.finish()
    if not aggs.window.seconds:
        # No measured coverage: fall back to a full day as the denominator.
        aggs.window.seconds = _SEC_PER_DAY
    atop_desc, pmon_desc = _log_descriptions([segment])
    report = _render_report(
        aggs,
        top=args.top,
        atop_desc=atop_desc,
        pmon_desc=pmon_desc,
        period_line=f"- **Reporting period**: {day} (single day)",
    )
    _emit(args, report)
    return 0
def _resolve_start(args: argparse.Namespace, now: _dt.datetime) -> _dt.datetime:
    """Choose where the reporting window begins.

    Order of preference: the ``--since`` date (``YYYYMMDD``, taken as local
    midnight), then the saved last-report timestamp, then midnight of *now*'s
    day.
    """
    if args.since is None:
        previous = _read_last_generated()
        if previous is None:
            return now.replace(hour=0, minute=0, second=0, microsecond=0)
        return previous
    return _dt.datetime.strptime(args.since, "%Y%m%d").astimezone()
def _run_since(args: argparse.Namespace, now: _dt.datetime) -> int:
    """Report on everything since the last run, spanning multiple daily logs.

    Exits with an error message when ``atop`` is not installed. When no daily
    log yielded atop samples, a note goes to stderr and no report is emitted.
    In both the report and the no-data case the saved timestamp is advanced to
    *now* if ``_should_advance_state`` allows it. Returns 0.
    """
    if not shutil.which("atop"):
        sys.exit(f"error: `atop` is not installed.\nrun: {_INSTALL_SCRIPT}")
    start = _resolve_start(args, now)
    segments = _plan_segments(start, now)
    progress = _Progress(
        enabled=not args.quiet,
        # Two stages (atop + pmon) per day; at least one so the bar is valid.
        total_stages=max(2 * len(segments), 1),
    )
    aggs = _aggregate_segments(segments, progress)
    progress.finish()
    if aggs.days_with_data == 0:
        # Separate the two dates; without the arrow they print fused together.
        sys.stderr.write(
            f"no atop logs with data for {start.date()} \u2192 {now.date()}; "
            "nothing to report.\n",
        )
    else:
        if not aggs.window.seconds:
            # No measured coverage: fall back to a full day as the denominator.
            aggs.window.seconds = _SEC_PER_DAY
        atop_desc, pmon_desc = _log_descriptions(segments)
        _emit(
            args,
            _render_report(
                aggs,
                top=args.top,
                atop_desc=atop_desc,
                pmon_desc=pmon_desc,
                period_line=_period_line(start, now),
            ),
        )
    if _should_advance_state(args):
        _write_last_generated(now)
    return 0
def _build_parser() -> argparse.ArgumentParser:
"""Construct the command-line argument parser."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--date",
default=_dt.datetime.now().astimezone().strftime("%Y%m%d"),
help="YYYYMMDD to report on (default: today)",
default=None,
help="report on one specific day (YYYYMMDD); ad hoc, ignores state",
)
parser.add_argument(
"--since",
default=None,
help="ad-hoc: report from this date (YYYYMMDD) to now; leaves state",
)
parser.add_argument(
"--top",
@ -424,35 +693,39 @@ def main(argv: list[str] | None = None) -> int:
"--atop-log",
type=Path,
default=None,
help="override atop log path",
help="override atop log path (implies single-day mode)",
)
parser.add_argument(
"--pmon-log",
type=Path,
default=None,
help="override pmon log path",
help="override pmon log path (implies single-day mode)",
)
parser.add_argument(
"--no-clipboard",
action="store_true",
help="skip copying the report to the X clipboard",
)
parser.add_argument(
"--no-update-state",
action="store_true",
help="do not advance the saved last-report timestamp",
)
parser.add_argument(
"--quiet",
action="store_true",
help="suppress the progress line on stderr",
)
args = parser.parse_args(argv)
return parser
atop_default, pmon_default = _resolve_logs(args.date)
atop_log = args.atop_log or atop_default
pmon_log = args.pmon_log or pmon_default
_preflight(atop_log)
report = _build_report(args, atop_log, pmon_log)
sys.stdout.write(report)
if not args.no_clipboard:
_copy_to_clipboard(report)
return 0
def main(argv: list[str] | None = None) -> int:
    """Parse the command line and dispatch to the matching run mode.

    Single-day mode is used when a day or explicit log path was pinned;
    otherwise the since-last-report mode runs. The chosen runner's return
    value is the exit status. See the module docstring for the CLI.
    """
    args = _build_parser().parse_args(argv)
    now = _dt.datetime.now().astimezone()
    runner = _run_single_day if _is_single_day_mode(args) else _run_since
    return runner(args, now)
if __name__ == "__main__":

View File

@ -12,7 +12,7 @@
# Optional persistence (requires sudo):
# --persist-systemd -> Set IdleAction=ignore in /etc/systemd/logind.conf and restart logind
# Optional activity watcher:
# --watch-controller -> Treat game controller (e.g., Xbox) input as user activity to keep session awake
# --watch-controller -> Hold a systemd idle/sleep inhibitor while a game controller is connected (keeps the session awake, fork-free)
#
# Notes:
# - This script focuses on keeping the screen on and unlocked. Use with care on shared systems.
@ -42,7 +42,7 @@ Disables idle detection, screen blanking, and auto-lock for the current session.
Options:
--persist-systemd Also set IdleAction=ignore in /etc/systemd/logind.conf (needs sudo)
--watch-controller Watch game controllers and generate activity to keep the session awake
--watch-controller Hold an idle/sleep inhibitor while a game controller is connected
-h, --help Show this help and exit
What this does:
@ -52,7 +52,7 @@ What this does:
- Sway: kill swayidle if running
- TTY: setterm -blank 0 -powersave off -powerdown 0
- Optional: systemd-logind IdleAction=ignore
- Optional: watch controller input and reset idle timers
- Optional: hold a systemd idle inhibitor while a controller is connected
EOF
exit 0
;;
@ -136,76 +136,85 @@ disable_tty_idle() {
fi
}
reset_idle_activity() {
# Trigger activity hints depending on environment
if [[ -n ${DISPLAY:-} ]]; then
if has_cmd xset; then
xset s reset || true
xset -dpms || true
xset s off || true
xset s noblank || true
fi
if has_cmd xdotool; then
# No-op mousemove to generate X11 activity without visible movement
xdotool mousemove_relative -- 0 0 2> /dev/null || true
fi
# PID of the single long-lived idle/sleep inhibitor we hold while a controller
# is connected. Empty when no inhibitor is active.
inhibit_pid=""
start_idle_inhibit() {
  # Start (or reuse) the single long-lived systemd idle/sleep inhibitor that is
  # held for as long as a controller is connected. It takes the place of the
  # old per-event fork storm (4 xset + an xdotool + a dd read + a sleep on
  # *every* joystick event, ~21 forks/s while gaming). X11 blanking is expected
  # to stay off thanks to the one-shot disable_x11_idle.
  #
  # Idempotent: when the recorded PID is still alive, nothing new is spawned.
  if [[ -n $inhibit_pid ]]; then
    if kill -0 "$inhibit_pid" 2> /dev/null; then
      return 0
    fi
  fi
  # Run in the background and remember the PID so it can be released later.
  systemd-inhibit \
    --what=idle:sleep \
    --who="idle-off" \
    --why="game controller connected" \
    sleep infinity &
  inhibit_pid=$!
  log "Holding idle/sleep inhibitor (pid ${inhibit_pid}) while a controller is connected"
}
watch_js_device() {
local dev="$1"
log "Watching controller device: $dev"
while :; do
if [[ ! -e $dev ]]; then
warn "Device disappeared: $dev"
break
fi
# Joystick API event size is 8 bytes; block until an event arrives
if dd if="$dev" bs=8 count=1 status=none of=/dev/null; then
reset_idle_activity
# Debounce bursts of events
sleep 0.3
else
# On read error (e.g., permission), backoff
sleep 1
stop_idle_inhibit() {
  # Release the inhibitor started by start_idle_inhibit, if one is recorded.
  # A no-op when inhibit_pid is empty.
  [[ -n $inhibit_pid ]] || return 0
  # Best effort: the process may already have exited, so errors are ignored.
  kill "$inhibit_pid" 2> /dev/null || true
  # Reap the background job; also tolerated if it is already gone.
  wait "$inhibit_pid" 2> /dev/null || true
  inhibit_pid=""
  log "Released idle/sleep inhibitor; normal idle behaviour resumes"
}
controller_connected() {
  # Succeed (status 0) when at least one /dev/input/js* node exists, otherwise
  # return 1. Uses only shell globbing and [[ -e ]], so no process is forked.
  local node
  for node in /dev/input/js*; do
    # The -e test also rejects the glob pattern itself when nothing matched.
    if [[ -e $node ]]; then
      return 0
    fi
  done
  return 1
}
sync_inhibit_to_controllers() {
  # Bring the inhibitor in line with controller presence: released when no
  # controller node exists, held (started or reused) when one does.
  if ! controller_connected; then
    stop_idle_inhibit
  else
    start_idle_inhibit
  fi
}
start_controller_watchers() {
# Attempt to watch all /dev/input/js* devices; rescan periodically for new ones
declare -A pids
# Initial permission check
local any_js=false any_readable=false
for dev in /dev/input/js*; do
[[ -e $dev ]] || continue
any_js=true
if [[ -r $dev ]]; then any_readable=true; fi
done
if [[ $any_js == true && $any_readable == false ]]; then
warn "No read permission to /dev/input/js*; add your user to the 'input' group or create udev rules."
# Event-driven and fork-free in the hot path: react only to input-device
# add/remove (rare udev events), never to individual joystick *input* events,
# and hold a single systemd-inhibit lock while a controller is present.
if ! has_cmd systemd-inhibit; then
warn "systemd-inhibit not found; cannot hold an idle inhibitor"
return 0
fi
# EXIT covers every termination path (including a SIGTERM that interrupts the
# blocking read below); INT/TERM additionally give a clean exit status.
trap 'stop_idle_inhibit' EXIT
trap 'exit 0' INT TERM
while :; do
local found_any=false
for dev in /dev/input/js*; do
[[ -e $dev ]] || continue
found_any=true
if [[ -z ${pids[$dev]:-} ]] || ! kill -0 "${pids[$dev]}" 2> /dev/null; then
# Start a watcher for this device in background
watch_js_device "$dev" &
pids[$dev]=$!
fi
done
if [[ $found_any == false ]]; then
# No joystick devices; quiet rescan
sleep 5
sync_inhibit_to_controllers # apply current state once at startup
if has_cmd udevadm; then
log "Watching controller hotplug via udev (no polling)"
# Process substitution (not a pipe) keeps the loop in this shell so
# inhibit_pid persists across events.
while read -r _; do
sync_inhibit_to_controllers
done < <(udevadm monitor --udev --subsystem-match=input 2> /dev/null)
else
# Rescan less frequently when active
sleep 2
fi
# Fallback when udevadm is unavailable: a low-frequency presence poll. One
# sleep per 30 s cycle (~0.03 forks/s) versus the old ~21 forks/s.
warn "udevadm not found; falling back to a 30 s presence poll"
while :; do
sync_inhibit_to_controllers
sleep 30
done
fi
}
persist_with_systemd_logind() {
@ -255,14 +264,9 @@ main() {
persist_with_systemd_logind
if [[ $watch_controller == true ]]; then
log "Controller activity watcher enabled"
# Keep the script alive to watch controllers
start_controller_watchers &
watcher_pid=$!
log "Watcher PID: $watcher_pid"
# Wait indefinitely and forward termination
trap 'log "Stopping controller watcher"; kill "$watcher_pid" 2>/dev/null || true; exit 0' INT TERM
wait "$watcher_pid"
log "Controller activity watcher enabled (idle-inhibitor mode)"
# Blocks until terminated; releases the inhibitor on exit via its own trap.
start_controller_watchers
else
log "Done. The screen should no longer blank, lock, or power down automatically."
fi

View File

@ -0,0 +1,21 @@
"""Pytest bootstrap: make usage_report's ``bin/`` importable for these tests.
The usage-report modules live in a non-package script directory and use
absolute imports (``from _usage_report_parsing import ...``), so the directory
must be on ``sys.path`` before the tests import them.
"""
from __future__ import annotations
from pathlib import Path
import sys
_BIN = (
Path(__file__).resolve().parents[1]
/ "scripts"
/ "periodic_background"
/ "system-maintenance"
/ "bin"
)
if str(_BIN) not in sys.path:
sys.path.insert(0, str(_BIN))

View File

@ -2,42 +2,26 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
import sys
from typing import TYPE_CHECKING
import _usage_report_parsing as parsing
if TYPE_CHECKING:
import pytest
MODULE_PATH = (
Path(__file__).resolve().parents[1]
/ "scripts"
/ "system-maintenance"
/ "bin"
/ "usage_report.py"
)
SPEC = importlib.util.spec_from_file_location("usage_report", MODULE_PATH)
if SPEC is None or SPEC.loader is None:
msg = "could not load usage_report module"
raise RuntimeError(msg)
usage_report = importlib.util.module_from_spec(SPEC)
sys.modules[SPEC.name] = usage_report
SPEC.loader.exec_module(usage_report)
def test_normalize_pmon_command_prefers_first_executable_token() -> None:
"""The parser should keep executable-like token, not trailing args."""
tokens = ["code-insiders", "--type=", "gpu-process", "Not"]
assert usage_report._normalize_pmon_command(tokens) == "code-insiders"
assert parsing._normalize_pmon_command(tokens) == "code-insiders"
def test_normalize_pmon_command_skips_leading_option_tokens() -> None:
"""If the first token is an option, use the next non-option token."""
tokens = ["--type=", "code-insiders", "--flag"]
assert usage_report._normalize_pmon_command(tokens) == "code-insiders"
assert parsing._normalize_pmon_command(tokens) == "code-insiders"
def test_ingest_pmon_row_uses_command_field_start_not_last_token() -> None:
@ -60,7 +44,7 @@ def test_ingest_pmon_row_uses_command_field_start_not_last_token() -> None:
]
agg: dict[str, object] = {}
consumed = usage_report._ingest_pmon_row(row, agg)
consumed = parsing._ingest_pmon_row(row, agg)
assert consumed == 1
assert "code-insiders" in agg
@ -85,8 +69,8 @@ def test_ingest_pmon_row_falls_back_to_proc_comm_on_unknown(
]
agg: dict[str, object] = {}
monkeypatch.setattr(usage_report, "_pid_comm_name", lambda _pid: "python")
consumed = usage_report._ingest_pmon_row(row, agg)
monkeypatch.setattr(parsing, "_pid_comm_name", lambda _pid: "python")
consumed = parsing._ingest_pmon_row(row, agg)
assert consumed == 1
assert "python" in agg

View File

@ -0,0 +1,479 @@
"""Tests for the "since last report" multi-day aggregation in usage_report.
Covers the helpers added to span and merge several daily logs: aggregate
merging, window merging, PID-count carry-over, pmon timestamp filtering,
atop command bounding, the persisted last-report state, day-segment planning,
and the run-mode dispatch logic.
"""
from __future__ import annotations
import argparse
import datetime as _dt
from pathlib import Path
from typing import TYPE_CHECKING
import _usage_report_parsing as parsing
from _usage_report_types import GpuAgg, ProcAgg, _PidCpu, _Progress, _Window
import usage_report
if TYPE_CHECKING:
import pytest
# Aware timezone matching how the parser localizes naive timestamps, so epochs
# computed here line up with `_pmon_row_epoch`'s `.astimezone()` conversion.
_LOCAL_TZ = _dt.datetime.now().astimezone().tzinfo
def _at(
    year: int, month: int, day: int, hour: int = 0, minute: int = 0
) -> _dt.datetime:
    """Return an aware datetime in the test module's local zone (``_LOCAL_TZ``)."""
    return _dt.datetime(
        year=year,
        month=month,
        day=day,
        hour=hour,
        minute=minute,
        tzinfo=_LOCAL_TZ,
    )
# --------------------------------------------------------------------------- #
# PID-count carry-over (types)
# --------------------------------------------------------------------------- #
def test_proc_pid_count_combines_set_and_extra() -> None:
    """``ProcAgg.pid_count`` is the live set size plus the merged-in extras."""
    live_pids = {1, 2, 3}
    merged_in = 2
    agg = ProcAgg("x", pid_set=live_pids, extra_pids=merged_in)
    assert agg.pid_count == 5
def test_gpu_pid_count_combines_set_and_extra() -> None:
    """``GpuAgg.pid_count`` is the live set size plus the merged-in extras."""
    live_pids = {9}
    merged_in = 4
    agg = GpuAgg("x", pid_set=live_pids, extra_pids=merged_in)
    assert agg.pid_count == 5
# --------------------------------------------------------------------------- #
# Aggregate merging (parsing)
# --------------------------------------------------------------------------- #
def test_merge_proc_aggs_sums_and_takes_peak() -> None:
    """Two days of one program: sums add, peak RSS is the larger, PIDs add."""
    day_one = ProcAgg(
        "a",
        cpu_ticks=100,
        peak_rss_kb=200,
        rss_kb_sum=50,
        rss_samples=2,
        pid_set={1, 2},
    )
    day_two = ProcAgg(
        "a",
        cpu_ticks=10,
        peak_rss_kb=500,
        rss_kb_sum=5,
        rss_samples=1,
        pid_set={3},
    )
    running: dict[str, ProcAgg] = {}
    for day in (day_one, day_two):
        parsing.merge_proc_aggs(running, {"a": day})
    merged = running["a"]
    assert merged.cpu_ticks == 110
    assert merged.peak_rss_kb == 500
    assert merged.rss_kb_sum == 55
    assert merged.rss_samples == 3
    assert merged.pid_count == 3
def test_merge_gpu_aggs_sums_and_takes_peak() -> None:
    """Across two merges the GPU sums add up and each peak keeps its maximum."""
    first_batch = GpuAgg(
        "g",
        sm_pct_sum=30.0,
        mem_pct_sum=10.0,
        samples=3,
        peak_sm_pct=40.0,
        peak_mem_pct=20.0,
        pid_set={1},
    )
    second_batch = GpuAgg(
        "g",
        sm_pct_sum=5.0,
        mem_pct_sum=2.0,
        samples=1,
        peak_sm_pct=80.0,
        peak_mem_pct=15.0,
        pid_set={2, 3},
    )
    totals: dict[str, GpuAgg] = {}
    for batch in (first_batch, second_batch):
        parsing.merge_gpu_aggs(totals, {"g": batch})

    combined = totals["g"]
    assert combined.sm_pct_sum == 35.0
    assert combined.samples == 4
    # Each peak is independent: SM peaks in the second batch, memory in the first.
    assert combined.peak_sm_pct == 80.0
    assert combined.peak_mem_pct == 20.0
    assert combined.pid_count == 3
# --------------------------------------------------------------------------- #
# Window merging (parsing)
# --------------------------------------------------------------------------- #
def test_merge_windows_empty_returns_default() -> None:
    """Neither an empty list nor a lone blank window produces any samples."""
    for windows in ([], [_Window()]):
        merged = parsing.merge_windows(windows)
        assert merged.distinct_samples == 0
def test_merge_windows_spans_and_sums() -> None:
    """Merged span runs from the earliest start to the latest end.

    Sample counts and seconds are summed, the interval is the modal one, and a
    window with zero distinct samples contributes nothing.
    """
    blank = _Window()  # distinct_samples == 0, must be ignored
    narrow = _Window(
        start="s1",
        end="e1",
        distinct_samples=5,
        interval_s=600,
        seconds=100,
        start_epoch=1000,
        end_epoch=2000,
    )
    wide = _Window(
        start="s2",
        end="e2",
        distinct_samples=3,
        interval_s=600,
        seconds=50,
        start_epoch=500,
        end_epoch=3000,
    )
    result = parsing.merge_windows([blank, narrow, wide])
    # `wide` owns both the earliest start_epoch (500) and latest end_epoch (3000).
    assert (result.start, result.end) == ("s2", "e2")
    assert result.distinct_samples == 8
    assert result.seconds == 150
    assert result.interval_s == 600
# --------------------------------------------------------------------------- #
# pmon timestamp helpers (parsing)
# --------------------------------------------------------------------------- #
def test_pmon_row_epoch_parses_valid_row() -> None:
    """Date and time columns of a valid pmon row map to the local epoch."""
    fields = ["20260604", "10:30:00", "0", "100", "G", "5", "1"]
    expected_epoch = _at(2026, 6, 4, 10, 30).timestamp()
    assert parsing._pmon_row_epoch(fields) == expected_epoch
def test_pmon_row_epoch_returns_none_on_bad_input() -> None:
    """Empty and unparseable rows give None instead of an exception."""
    unusable_rows = ([], ["nope", "alsonope"])
    for fields in unusable_rows:
        assert parsing._pmon_row_epoch(fields) is None
def _write_pmon(path: Path) -> None:
"""Write a tiny pmon log with two rows ten minutes apart."""
path.write_text(
"#Date Time gpu pid type sm mem enc dec jpg ofa command\n"
" 20260604 10:00:00 0 100 G 5 1 - - - - Xorg\n"
" 20260604 11:00:00 0 101 G 7 2 - - - - thorium\n",
encoding="utf-8",
)
def test_aggregate_pmon_without_bound_keeps_all_rows(tmp_path: Path) -> None:
    """With begin_epoch omitted, both data rows are counted."""
    pmon_log = tmp_path / "pmon.log"
    _write_pmon(pmon_log)
    quiet = _Progress(enabled=False, total_stages=1)
    _unused, row_count = parsing.aggregate_pmon(pmon_log, quiet)
    assert row_count == 2
def test_aggregate_pmon_filters_rows_before_begin(tmp_path: Path) -> None:
    """Rows stamped earlier than begin_epoch are dropped from the aggregate."""
    pmon_log = tmp_path / "pmon.log"
    _write_pmon(pmon_log)
    quiet = _Progress(enabled=False, total_stages=1)
    # 10:30 sits between the 10:00 (Xorg) and 11:00 (thorium) rows.
    half_past_ten = _at(2026, 6, 4, 10, 30).timestamp()
    per_command, row_count = parsing.aggregate_pmon(
        pmon_log, quiet, begin_epoch=half_past_ten
    )
    assert row_count == 1
    assert "thorium" in per_command
    assert "Xorg" not in per_command
# --------------------------------------------------------------------------- #
# atop command bounding (parsing)
# --------------------------------------------------------------------------- #
def test_atop_read_cmd_unbounded() -> None:
    """With neither bound set, the argv is just `atop -r <log> -P <labels>`."""
    log_path = "/var/log/atop/atop_20260604"
    labels = "PRC,PRM"
    argv = parsing._atop_read_cmd(Path(log_path), labels, None, None)
    assert argv == ["atop", "-r", log_path, "-P", labels]
def test_atop_read_cmd_with_begin_and_end() -> None:
    """Both bounds show up as -b/-e, placed ahead of the -P selector."""
    begin, end = "202606041400", "202606042000"
    argv = parsing._atop_read_cmd(Path("/x"), "PRC", begin, end)
    expected = ["atop", "-r", "/x"]
    expected += ["-b", begin, "-e", end]
    expected += ["-P", "PRC"]
    assert argv == expected
# --------------------------------------------------------------------------- #
# Persisted last-report state (usage_report)
# --------------------------------------------------------------------------- #
def test_state_round_trip(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Writing a timestamp and reading it back yields an equal datetime."""
    state_dir = tmp_path / "state"
    state_file = state_dir / "last_report.json"
    # Redirect both the directory and the file so nothing outside tmp_path is touched.
    monkeypatch.setattr(usage_report, "_STATE_DIR", state_dir)
    monkeypatch.setattr(usage_report, "_STATE_FILE", state_file)

    stamp = _at(2026, 6, 2, 9, 0)
    usage_report._write_last_generated(stamp)
    restored = usage_report._read_last_generated()
    assert restored == stamp
def test_state_missing_file_returns_none(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An absent state file reads as None ("unknown") rather than raising."""
    nonexistent = tmp_path / "absent.json"
    monkeypatch.setattr(usage_report, "_STATE_FILE", nonexistent)
    last_generated = usage_report._read_last_generated()
    assert last_generated is None
def test_state_corrupt_file_returns_none(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Unparseable or incomplete state content reads as None, never a crash."""
    state_file = tmp_path / "bad.json"
    monkeypatch.setattr(usage_report, "_STATE_FILE", state_file)
    payloads = (
        "{ not json",  # not parseable at all
        "{}",  # valid JSON, missing key
    )
    for payload in payloads:
        state_file.write_text(payload, encoding="utf-8")
        assert usage_report._read_last_generated() is None
# --------------------------------------------------------------------------- #
# Day-segment planning (usage_report)
# --------------------------------------------------------------------------- #
def test_has_time_of_day() -> None:
    """Exactly midnight has no time-of-day component; 14:30 does."""
    afternoon = _at(2026, 6, 4, 14, 30)
    midnight = _at(2026, 6, 4, 0, 0)
    assert usage_report._has_time_of_day(afternoon) is True
    assert usage_report._has_time_of_day(midnight) is False
def test_plan_segments_single_day_midnight_unbounded() -> None:
    """Starting at local midnight gives one segment with no begin bound."""
    midnight = _at(2026, 6, 4)
    noon = _at(2026, 6, 4, 12)
    plan = usage_report._plan_segments(midnight, noon)
    assert len(plan) == 1
    only_segment = plan[0]
    assert only_segment.atop_begin is None
    assert only_segment.pmon_begin_epoch is None
def test_plan_segments_bounds_only_first_day() -> None:
    """Only the first of three days carries a begin bound after a 14:00 start."""
    begin = _at(2026, 6, 2, 14, 0)
    finish = _at(2026, 6, 4, 10, 0)
    plan = usage_report._plan_segments(begin, finish)
    assert len(plan) == 3

    first_day, last_day = plan[0], plan[-1]
    assert first_day.atop_begin == "20260602140000"
    assert first_day.pmon_begin_epoch == begin.timestamp()
    # No later segment may be bounded.
    assert not any(seg.atop_begin is not None for seg in plan[1:])
    assert last_day.atop_log.name == "atop_20260604"
def test_plan_segments_start_after_end_is_empty() -> None:
    """When start lies after end (a future state file) there are no segments."""
    later, earlier = _at(2026, 6, 5), _at(2026, 6, 4)
    plan = usage_report._plan_segments(later, earlier)
    assert plan == []
# --------------------------------------------------------------------------- #
# Start resolution and mode dispatch (usage_report)
# --------------------------------------------------------------------------- #
def _args(**overrides: object) -> argparse.Namespace:
"""Build a Namespace with the usage_report CLI defaults."""
base: dict[str, object] = {
"date": None,
"since": None,
"atop_log": None,
"pmon_log": None,
}
base.update(overrides)
return argparse.Namespace(**base)
def test_resolve_start_prefers_since(monkeypatch: pytest.MonkeyPatch) -> None:
    """An explicit --since overrides saved state and resolves to local midnight."""

    def _saved_state() -> _dt.datetime:
        # Deliberately far from --since so a mix-up would be obvious.
        return _at(2026, 1, 1)

    monkeypatch.setattr(usage_report, "_read_last_generated", _saved_state)
    now = _at(2026, 6, 4, 12)
    resolved = usage_report._resolve_start(_args(since="20260604"), now)
    assert resolved.date() == _dt.date(2026, 6, 4)
    assert (resolved.hour, resolved.minute) == (0, 0)
def test_resolve_start_uses_last_report(monkeypatch: pytest.MonkeyPatch) -> None:
    """With no --since, the start is the saved last-report timestamp."""
    saved = _at(2026, 6, 2, 9, 0)

    def _saved_state() -> _dt.datetime:
        return saved

    monkeypatch.setattr(usage_report, "_read_last_generated", _saved_state)
    now = _at(2026, 6, 4, 12)
    resolved = usage_report._resolve_start(_args(), now)
    assert resolved == saved
def test_resolve_start_first_run_is_today_midnight(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With no saved state, the start falls back to today's local midnight."""

    def _no_state() -> None:
        return None

    monkeypatch.setattr(usage_report, "_read_last_generated", _no_state)
    now = _at(2026, 6, 4, 12, 30)
    todays_midnight = _at(2026, 6, 4, 0, 0)
    resolved = usage_report._resolve_start(_args(), now)
    assert resolved == todays_midnight
def test_is_single_day_mode() -> None:
    """A pinned date or an explicit log path means single-day mode; else not."""
    cases = (
        ({"date": "20260604"}, True),
        ({"atop_log": Path("/x")}, True),
        ({"pmon_log": Path("/x")}, True),
        ({}, False),
    )
    for overrides, expected in cases:
        assert usage_report._is_single_day_mode(_args(**overrides)) is expected
def test_should_advance_state_only_for_default_run() -> None:
    """State advances only for a run without --since and without the opt-out."""
    cases = (
        ({"no_update_state": False}, True),
        ({"no_update_state": True}, False),
        # --since is an ad-hoc query and must never advance state.
        ({"since": "20260510", "no_update_state": False}, False),
    )
    for overrides, expected in cases:
        decision = usage_report._should_advance_state(_args(**overrides))
        assert decision is expected
# --------------------------------------------------------------------------- #
# Report fragments (usage_report)
# --------------------------------------------------------------------------- #
def test_period_line_contains_both_bounds() -> None:
    """The period bullet shows both bounds, with the start ahead of the end."""
    start_text = "2026-06-02T09:00:00"
    end_text = "2026-06-04T09:00:00"
    line = usage_report._period_line(_at(2026, 6, 2, 9), _at(2026, 6, 4, 9))
    assert start_text in line
    assert end_text in line
    # An empty-string membership check passes for every possible line, so pin
    # something falsifiable instead: the start bound is rendered before the end.
    # NOTE(review): the span/separator format is not visible from this test, so
    # it is not asserted here -- add a literal check once the format is confirmed.
    assert line.index(start_text) < line.index(end_text)
def test_describe_logs_counts() -> None:
    """Wording differs for zero logs, exactly one log, and several logs."""
    reader = "atop -r"

    no_logs = usage_report._describe_logs([], reader)
    assert "none found" in no_logs

    one_log = usage_report._describe_logs(
        [Path("/var/log/atop/atop_20260604")], reader
    )
    assert one_log.startswith("`/var/log/atop/atop_20260604`")

    two_logs = usage_report._describe_logs(
        [Path("/v/atop_20260601"), Path("/v/atop_20260604")], reader
    )
    assert "2 daily logs" in two_logs
# --------------------------------------------------------------------------- #
# PRC field parsing — HZ-field regression (parsing)
# --------------------------------------------------------------------------- #
def test_parse_prc_does_not_charge_hz_as_cpu() -> None:
    """The HZ column after the state field is skipped, never counted as CPU.

    atop emits `... pid (name) state HZ utime stime`. This is the regression
    test for the off-by-one that read HZ (100) as utime and thereby inflated
    every process's CPU-seconds to its record/PID count (xset showing 67h).
    """
    # 6 generic fields, pid, (name), state, HZ=100, utime=7, stime=3, + tail.
    record = "PRC host 1000 2026/06/04 12:00:00 600 4242 (xset) E 100 7 3 0 0 0"
    by_pid: dict[int, _PidCpu] = {}
    parsing._parse_prc(record.split(), by_pid)
    parsed = by_pid[4242]
    assert parsed.name == "xset"
    assert parsed.delta_ticks == 10  # utime+stime, never the HZ constant (100)
def test_parse_prc_skips_hz_with_multiword_name() -> None:
    """A process name split over several tokens does not misalign the HZ skip."""
    record = "PRC h 1000 d t 600 99 (Web Content) S 100 40 2 0 0"
    by_pid: dict[int, _PidCpu] = {}
    parsing._parse_prc(record.split(), by_pid)
    parsed = by_pid[99]
    assert parsed.name == "Web Content"
    assert parsed.delta_ticks == 42  # 40+2, HZ(100) skipped
def test_parse_prc_too_short_is_ignored() -> None:
    """A PRC record cut off before stime leaves the map empty and does not raise."""
    # The tokens end at utime: state, HZ and utime are present, stime is not.
    record = "PRC h 1000 d t 600 7 (x) S 100 5"
    by_pid: dict[int, _PidCpu] = {}
    parsing._parse_prc(record.split(), by_pid)
    assert by_pid == {}
# --------------------------------------------------------------------------- #
# Native helper selection (parsing)
# --------------------------------------------------------------------------- #
def test_atop_agg_binary_missing_source_falls_back(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A missing C source tree yields None (Python fallback).

    This holds even when a cached binary exists: never trust an orphaned,
    unverifiable build.
    """
    missing_src_dir = tmp_path / "gone"
    cached_binary = tmp_path / "atop_agg"
    cached_binary.write_text("stale binary", encoding="utf-8")
    monkeypatch.setattr(parsing, "_ATOP_AGG_SRC_DIR", missing_src_dir)
    monkeypatch.setattr(parsing, "_ATOP_AGG_CACHE_BIN", cached_binary)
    assert parsing._atop_agg_binary() is None

View File

@ -204,7 +204,10 @@ exclude_dirs = ["tests", ".venv", "Bash/ffmpeg-build"]
# PYTEST - Testing framework configuration
# ============================================================================
[tool.pytest.ini_options]
testpaths = ["python_pkg"]
# linux_configuration/tests covers the standalone usage_report scripts. It adds
# no python_pkg coverage (those tests don't import python_pkg), so running it
# alongside leaves the 100%-on-python_pkg gate untouched.
testpaths = ["python_pkg", "linux_configuration/tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]

View File

@ -13,8 +13,8 @@
set -euo pipefail
SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
REPORT_SCRIPT="$SCRIPT_DIR/linux_configuration/scripts/system-maintenance/bin/usage_report.py"
ARTIFACT_INIT_SCRIPT="$SCRIPT_DIR/scripts/init_agent_artifacts.sh"
REPORT_SCRIPT="$SCRIPT_DIR/linux_configuration/scripts/periodic_background/system-maintenance/bin/usage_report.py"
ARTIFACT_INIT_SCRIPT="$SCRIPT_DIR/meta/scripts/init_agent_artifacts.sh"
if [[ ! -f "$REPORT_SCRIPT" ]]; then
echo "Error: usage_report.py not found at: $REPORT_SCRIPT" >&2

View File

@ -9,6 +9,10 @@ invocation with whole-repo coverage measured against ``python_pkg``.
Running all packages together (rather than just the touched ones) ensures that
100% branch coverage is maintained across the entire codebase on every commit,
not just the files that happened to change.
Standalone script suites outside ``python_pkg/`` (currently
``linux_configuration/tests``) are also run so their behaviour is gated, but
they are not coverage-measured (coverage stays scoped to ``python_pkg``).
"""
from __future__ import annotations
@ -21,6 +25,10 @@ import sys
_TOTAL_MEM = "4G"
# Standalone script test suites outside python_pkg/ that should be gated but
# not coverage-measured. Skipped silently if the directory does not exist.
_EXTRA_TEST_DIRS = ("linux_configuration/tests",)
def main() -> int:
"""Entry point."""
@ -35,6 +43,9 @@ def main() -> int:
if not packages:
return 0
test_dirs = [f"python_pkg/{pkg}/tests" for pkg in packages]
test_dirs += [d for d in _EXTRA_TEST_DIRS if Path(d).is_dir()]
cmd = [
sys.executable,
"-m",
@ -50,7 +61,7 @@ def main() -> int:
# Override addopts from pyproject.toml to avoid double --cov flags.
"-o",
"addopts=--strict-markers --strict-config -ra",
*[f"python_pkg/{pkg}/tests" for pkg in packages],
*test_dirs,
]
if shutil.which("systemd-run") is not None: