| File / Crate | What it demonstrates | Key concepts |
|---|---|---|
| `service/src/bin.rs` | Main service entry point with multi-runtime supervision | Runtime creation, flow dispatch, signal handling, coordinated shutdown |
| `service/init/orchestrator.rc` | Android init service integration | System property triggers, capability management, lifecycle hooks |
| `bpf/cgroup.bpf.c` | BPF cgroup lifecycle monitoring | Ring buffer events, tracepoint hooks, cgroupv2 path filtering |
| `crates/cpuflow/bpf/cpu.bpf.c` | Per-CPU, per-PID execution tracking | Nested BPF maps, scheduler hooks, frequency-weighted credits |
| `crates/gpuflow/bpf/credits.bpf.c` | GPU power level and per-PID utilization tracking | Vendor tracepoint hooks, pinned maps, time-in-state accounting |
| `crates/common/src/lib.rs` | Core trait definitions | Flow trait, async reactor abstraction |
| `crates/common/src/process_monitor.rs` | Policy orchestration and lifecycle | Policy, PolicyFactory, AggregatePolicy traits, task supervision |
| `crates/common/src/cgroup_monitor.rs` | BPF event processing with state machine | Sequential event processing, async ring buffer polling, initial filesystem scan |
| `crates/common/src/cgroup.rs` | cgroupv2 filesystem abstraction | File descriptor caching, reclaim type control, async file I/O |
| `crates/common/src/stats.rs` | PSI and memory statistics parsing | Pressure triggers, cgroup stat formats, async pressure monitoring |
| `crates/memflow/src/lib.rs` | Fully implemented memory management flow | Policy composition, experiment configuration, PSI integration |
| `crates/memflow/src/policy/telemetry.rs` | Observability for memory usage | Per-process telemetry collection, statsd integration |
| `crates/memflow/src/policy/mem_throttle.rs` | Working set size measurement via throttle | memory.high manipulation, WSS estimation, cleanup guarantees |
| `crates/memflow/src/policy/system_psi.rs` | PI controller for pressure-based memory limits | Proportional-integral control, anti-windup, deadband filtering |
| `crates/memflow/src/policy/senpai_psi.rs` | Exponential backoff/probe pressure management | Pressure integral tracking, grace periods, power-state awareness |
| `crates/memflow/src/policy/static_min.rs` | Guaranteed memory protection for system services | Aggregate policy, memory.min enforcement, per-UID accumulation |
| `crates/memflow/src/policy/mem_budget.rs` | Declarative budget enforcement with remote config | JSON budget schema, enforcement levels (Trace/Throttle/Kill), dry-run mode |
| `crates/cpuflow/src/lib.rs` | Stub CPU flow (infrastructure only) | BPF data collection without userspace policy |
| `crates/gpuflow/src/lib.rs` | Stub GPU flow (infrastructure only) | BPF data collection without userspace policy |
At startup, the service monitors `/proc/pressure/memory`, builds policy instances based on Android system property configuration, and starts the process monitor. The process monitor listens to a stream of cgroup lifecycle events from the BPF program, reads process information from `/proc/{pid}/`, and spawns individual policy tick tasks for each enrolled process.

Each flow implements the `Flow` trait in `crates/common/src/lib.rs`, which abstracts a long-running resource management domain. The main binary creates a separate tokio runtime for each flow, isolating scheduler contention and allowing per-flow thread naming for debugging:

```rust
#[async_trait]
pub trait Flow: Send {
    async fn run(&mut self, cancel_token: CancellationToken) -> anyhow::Result<()>;
}
```
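As a rough illustration of the per-flow supervision idea, here is a std-thread analogue (not the actual async implementation): each flow runs on its own named thread, a shared flag stands in for the cancellation token, and the supervisor joins all flows on shutdown. The `DemoFlow` type and `supervise` helper are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical synchronous analogue of the async Flow trait, for illustration.
trait Flow: Send {
    fn name(&self) -> &'static str;
    fn run(&mut self, cancel: Arc<AtomicBool>);
}

struct DemoFlow(&'static str);
impl Flow for DemoFlow {
    fn name(&self) -> &'static str { self.0 }
    fn run(&mut self, cancel: Arc<AtomicBool>) {
        // A real flow would do periodic work; here we just wait for cancellation.
        while !cancel.load(Ordering::Relaxed) {
            thread::yield_now();
        }
    }
}

fn supervise(flows: Vec<Box<dyn Flow>>) -> Vec<String> {
    let cancel = Arc::new(AtomicBool::new(false));
    let mut handles = Vec::new();
    for mut flow in flows {
        let cancel = cancel.clone();
        // Name each thread after its flow so it is identifiable while debugging.
        let handle = thread::Builder::new()
            .name(format!("flow-{}", flow.name()))
            .spawn(move || {
                let name = thread::current().name().unwrap().to_string();
                flow.run(cancel);
                name
            })
            .unwrap();
        handles.push(handle);
    }
    cancel.store(true, Ordering::Relaxed); // request shutdown
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let flows: Vec<Box<dyn Flow>> = vec![Box::new(DemoFlow("mem")), Box::new(DemoFlow("cpu"))];
    let names = supervise(flows);
    println!("{names:?}"); // → ["flow-mem", "flow-cpu"]
}
```

The real service uses one tokio runtime per flow rather than one bare thread, but the supervision shape is the same: fan out, signal cancellation, join.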
The supervisor uses `tokio::select!` with the `biased` option to prioritize shutdown signals over flow completion. See `service/src/bin.rs` for the complete supervision pattern.

`PolicyFactory` conditionally creates per-process policies based on process metadata. `Policy` defines a periodic tick interface with cleanup guarantees. `AggregatePolicy` manages multiple processes collectively for cross-process resource decisions:

```rust
async fn create(&self, process: &ProcessInfo) -> anyhow::Result<Option<Box<dyn Policy>>>;
```
The factory returns `Option<Box<dyn Policy>>`, allowing enrollment decisions at runtime. The `ProcessMonitor` in `crates/common/src/process_monitor.rs` iterates all factories for each new process and spawns tick tasks only for enrolled policies.

The BPF ring buffer file descriptor is wrapped in `tokio::io::unix::AsyncFd` for async polling:

```rust
let ring_buffer = AsyncFd::new(ring_buffer)?;
ring_buffer.readable().await?;
```
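To make the `Option`-based enrollment concrete, here is a simplified, synchronous sketch; the field names on `ProcessInfo` and the UID threshold are assumptions for illustration, not the orchestrator's actual types.

```rust
// Hypothetical, simplified sync versions of the async traits; real code
// returns anyhow::Result<Option<Box<dyn Policy>>> from an async fn.
struct ProcessInfo { uid: u32, name: String }

trait Policy { fn tick(&mut self); }

struct AppPolicy;
impl Policy for AppPolicy {
    fn tick(&mut self) { /* periodic policy work would go here */ }
}

struct AppPolicyFactory;
impl AppPolicyFactory {
    // Returning None declines enrollment; the monitor simply skips the process.
    fn create(&self, process: &ProcessInfo) -> Option<Box<dyn Policy>> {
        // Android app UIDs start at 10000; this factory ignores system daemons.
        if process.uid >= 10_000 { Some(Box::new(AppPolicy)) } else { None }
    }
}

fn main() {
    let factory = AppPolicyFactory;
    let app = ProcessInfo { uid: 10_123, name: "com.example.app".into() };
    let daemon = ProcessInfo { uid: 1_000, name: "system_server".into() };
    println!("{} enrolled: {}", app.name, factory.create(&app).is_some());     // true
    println!("{} enrolled: {}", daemon.name, factory.create(&daemon).is_some()); // false
}
```

Because each factory makes its own decision, one process can be enrolled by several policies at once, each with its own tick task.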
The cgroup monitor in `crates/common/src/cgroup_monitor.rs` tracks which processes are attached to which cgroups, handling the Android process lifecycle where app processes get their identity at attach time but system services require exec events.

The `CgroupEntry` trait provides async methods for reading and writing cgroupv2 pseudo-files. The `FileSystemCgroup` implementation caches file descriptors and seeks to the start for repeated reads, avoiding open/close overhead on hot paths:

```rust
let stats = cgroup.memory_stats().await?;
cgroup.write_memory_high(limit_bytes).await?;
```
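The fd-caching idea can be shown with plain synchronous std I/O (the real implementation is async): keep the `File` open and rewind before each read instead of re-opening the pseudo-file on every tick. `CachedStatFile` is a hypothetical name, and the demo reads a temp file standing in for a cgroup pseudo-file.

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

// Sketch of fd caching: one open() per cgroup file, then seek + read per tick.
struct CachedStatFile { file: File }

impl CachedStatFile {
    fn open(path: &str) -> std::io::Result<Self> {
        Ok(Self { file: File::open(path)? })
    }

    fn read(&mut self) -> std::io::Result<String> {
        // Rewind so repeated reads see fresh contents without paying
        // open/close cost on the hot path.
        self.file.seek(SeekFrom::Start(0))?;
        let mut buf = String::new();
        self.file.read_to_string(&mut buf)?;
        Ok(buf)
    }
}

fn main() -> std::io::Result<()> {
    // A temp file stands in for e.g. <cgroup>/memory.stat.
    let path = std::env::temp_dir().join("memory.stat.demo");
    std::fs::write(&path, "anon 4096\nfile 8192\n")?;
    let mut stat = CachedStatFile::open(path.to_str().unwrap())?;
    let first = stat.read()?;
    let second = stat.read()?; // same fd, re-read after rewind
    assert_eq!(first, second);
    println!("{} lines", first.lines().count()); // → 2 lines
    Ok(())
}
```

This works because cgroupv2 pseudo-files regenerate their contents on each read from offset zero, so a cached descriptor never serves stale data.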
A `ReclaimType` parameter controls whether writes use `O_NONBLOCK`, determining whether memory reclaim is charged to the orchestrator or deferred to the target cgroup. See `crates/common/src/cgroup.rs` for the complete abstraction.

The system PSI policy runs a PI controller that adjusts `memory.high` to maintain a target pressure percentage. The Senpai PSI policy uses exponential backoff when pressure exceeds the target and exponential probing when pressure is below target. Both policies read `memory.pressure` and compute the delta since the last tick. See `crates/memflow/src/policy/system_psi.rs` and `crates/memflow/src/policy/senpai_psi.rs` for the complete control logic.

Shutdown relies on `tokio_util::sync::CancellationToken` for hierarchical coordination. A global token created in `main()` is cloned and passed to each flow. The `ProcessMonitor` creates its own global token for all policy tasks:

```rust
select! {
    _ = cancel_token.cancelled() => break,
    _ = interval.tick() => { /* policy work */ }
}
```
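Returning to the system PSI policy for a moment, the proportional-integral loop with deadband filtering and anti-windup can be sketched as below. The gains, deadband width, and clamp bound are illustrative constants, not the orchestrator's actual tuning.

```rust
// Hedged sketch of a PI controller for pressure-based memory limits.
struct PiController {
    kp: f64,             // proportional gain
    ki: f64,             // integral gain
    integral: f64,       // accumulated error
    deadband: f64,       // ignore errors smaller than this
    integral_limit: f64, // anti-windup clamp on the integral term
}

impl PiController {
    // Returns an adjustment (e.g. bytes) to apply to memory.high this tick.
    fn update(&mut self, target_pressure: f64, measured_pressure: f64) -> f64 {
        let error = target_pressure - measured_pressure;
        // Deadband: small errors produce no change, so the limit does not jitter.
        if error.abs() < self.deadband {
            return 0.0;
        }
        // Anti-windup: clamp the integral so it cannot grow unbounded while
        // the output is saturated.
        self.integral = (self.integral + error)
            .clamp(-self.integral_limit, self.integral_limit);
        self.kp * error + self.ki * self.integral
    }
}

fn main() {
    let mut pi = PiController {
        kp: 1024.0, ki: 128.0, integral: 0.0, deadband: 0.5, integral_limit: 100.0,
    };
    // Pressure above target -> negative error -> tighten memory.high downward.
    let adj = pi.update(10.0, 15.0);
    println!("adjustment: {adj}"); // negative
    // Within the deadband -> no change.
    println!("adjustment: {}", pi.update(10.0, 10.2)); // 0
}
```

The integral term lets the controller eliminate steady-state error, while the deadband and clamp keep it stable around the target pressure.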
On cancellation, each policy task calls `cleanup()` before exiting. The orchestrator enforces a 190ms shutdown timeout, aborting tasks that fail to respond to cancellation.

To add a new policy, implement the `PolicyFactory` and `Policy` traits (see `crates/memflow/src/policy/telemetry.rs` for a minimal example), register the factory in `crates/memflow/src/lib.rs`, and add configuration system properties.
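The bounded-shutdown pattern can be sketched with std threads (the real code awaits and then aborts tokio tasks): wait for the task to acknowledge cancellation over a channel, and give up if the deadline passes. `shutdown_with_timeout` and the timeout values here are illustrative.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Illustrative analogue of the shutdown timeout: true means the task ran its
// cleanup before the deadline; false means it overran and was abandoned
// (real code calls JoinHandle::abort on the tokio task instead).
fn shutdown_with_timeout(worker: impl FnOnce() + Send + 'static, timeout: Duration) -> bool {
    let (done_tx, done_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        worker();                 // cancellation-aware work + cleanup()
        let _ = done_tx.send(()); // acknowledge completion
    });
    done_rx.recv_timeout(timeout).is_ok()
}

fn main() {
    // A cooperative task that cleans up promptly.
    let fast = shutdown_with_timeout(|| { /* cleanup() */ }, Duration::from_millis(190));
    // A task that ignores cancellation and overruns its (shortened) deadline.
    let slow = shutdown_with_timeout(
        || thread::sleep(Duration::from_millis(500)),
        Duration::from_millis(50),
    );
    println!("fast={fast} slow={slow}"); // → fast=true slow=false
}
```

Bounding shutdown this way keeps a single misbehaving policy task from stalling service restart.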