Runner
PLC is the execution engine. It takes a program, holds the current state, and exposes methods to drive execution scan by scan.
Creating a runner
The constructor accepts:
- A
Program(the common case) - A list of rungs (
[rung1, rung2]) Nonefor an empty program (useful in tests)
Optional keyword arguments:
initial_state— aSystemStateto start from instead of the defaulthistory— retention window for the scan log and checkpoints. Duration string ("1h","30m"), scan count (int), orNone(unlimited, default). Prevents unbounded memory growth on long runs.cache— instant-lookup window for fullSystemStatesnapshots. Same formats ashistory.None(default) uses byte-budget-only eviction.history_budget— byte ceiling for the recent-state cache (default: 100 MB; minimum 1 MB). Acts as a safety net when duration-based policies aren't enough.
Time modes
runner = PLC(logic, dt=0.010) # fixed-step, 10 ms per scan (default)
runner = PLC(logic, realtime=True) # wall-clock
| Mode | Behavior | Use case |
|---|---|---|
dt=0.010 |
timestamp += dt each scan |
Tests, offline simulation |
realtime=True |
timestamp = actual elapsed time |
Live hardware, integration tests |
dt= is the default. Timer and counter instructions use timestamp, so fixed steps give perfectly reproducible results. realtime=True is intentionally non-deterministic — scan dt follows host elapsed time.
Real-time clock
Logic that depends on time of day (shift changes, scheduled events) uses the RTC system points (system.rtc.year4, system.rtc.month, system.rtc.hour, etc.). By default, these track wall-clock time. See System points for the full namespace overview.
set_rtc pins the RTC to a specific datetime:
The RTC then advances with simulation time: rtc = base_datetime + (current_sim_time - sim_time_at_set). With a fixed dt, this makes time-of-day logic fully deterministic. With realtime=True, it effectively offsets the wall clock.
Execution methods
step() — one scan
Executes one complete scan cycle (all phases) and returns the committed SystemState.
run(cycles) — N scans
Runs exactly N scans, unless a pause breakpoint fires first. Returns the final state.
run_for(seconds) — advance by time
Keeps stepping until the simulation clock has advanced by the given amount (or a pause breakpoint fires).
run_until(*conditions) — stop on condition
Accepts the same condition expressions used inside Rung(). Multiple conditions are AND-ed:
runner.run_until(Motor & ~Fault)
runner.run_until(Temp > 150.0)
runner.run_until(Or(AlarmA, AlarmB, AlarmC))
Stops when the condition is true, a pause breakpoint fires, or max_cycles is reached — whichever comes first.
run_until_fn(predicate) — callable predicate
For conditions that aren't expressible as tag/condition expressions:
The predicate receives the committed SystemState each scan.
Injecting inputs
patch() — one-shot
Values are applied at the start of the next step() and then discarded. Multiple patches before a step merge — last write per tag wins.
.value via context manager
Inside with PLC(...) as plc: (or with runner:), tag .value reads and writes go through the runner's current state:
with PLC(logic) as plc:
Button.value = True # queues a patch
print(Step.value) # reads current value
plc.step() # executes with the queued patch
assert Motor.value is True
Forces
Forces persist across scans, re-applied at two points each scan:
Phase 3: APPLY FORCES (pre-logic) ← sets force values before any rung runs
Phase 4: EXECUTE LOGIC ← logic may overwrite forced values mid-scan
Phase 5: APPLY FORCES (post-logic) ← re-asserts force values after all logic
This means:
- Forced values are present at scan start and scan end.
- Logic may temporarily change a forced value mid-scan (for example,
latch()on a forced-False tag sets it True temporarily, but the post-logic force pass restores it). - Edge detection (
rise/fall) sees the post-force values that carry across scans.
If a tag is both patched and forced in the same scan, the pre-logic force pass overwrites the patched value. The patch is consumed but has no effect.
For force usage patterns in tests, see Testing — Forces.
Mode control
stop() — enter STOP mode
Sets PLC mode to STOP. Does not clear tags. Idempotent.
Auto-restart from STOP
Any execution method (step, run, run_for, run_until) performs a STOP→RUN transition before executing:
- Non-retentive tags reset to defaults
- Retentive tags preserve values
- Runtime scope resets (
scan_id=0,timestamp=0.0, history/patches/forces cleared)
reboot() — power-cycle
Simulates a power cycle. Tag behavior depends on battery:
- Battery present (default): all tags preserve
- Battery absent: all tags reset to defaults
Runtime scope resets the same as STOP→RUN. Runner returns in RUN mode.
Inspecting state
runner.current_state # SystemState snapshot at latest committed scan
runner.simulation_time # shorthand for current_state.timestamp
runner.time_mode # current TimeMode
runner.forces # read-only view of active force overrides
SystemState fields:
state.scan_id # int — monotonic scan counter (starts at 0)
state.timestamp # float — simulation clock in seconds
state.tags # PMap[str, value] — all tag values
state.memory # PMap[str, value] — internal engine state
Both scan_id and timestamp reset to 0 on STOP→RUN transition or reboot().
History
Enable history retention to keep immutable state snapshots:
runner = PLC(logic) # 100 MB default cache, all scans addressable
runner.history.at(5) # state at scan 5
runner.history.range(3, 7) # [scan 3, 4, 5, 6]
runner.history.latest(10) # up to 10 most recent (oldest → newest)
Every scan from 0 to the current tip is addressable. Recent scans are served from an in-memory state cache (byte-bounded, default 100 MB); older scans are reconstructed on demand from the scan log and checkpoints.
To bound memory on long runs, set a retention window:
runner = PLC(logic, history="1h") # keep 1 hour of replayable history
runner = PLC(logic, history="1h", cache="5m") # last 5 minutes instant, rest via replay
runner = PLC(logic, history_budget=20 * 1024 * 1024) # 20 MB byte ceiling
history_budget must be at least 1 MB (raises ValueError below that).
Time-travel playhead
The playhead is a read-only cursor into history. It doesn't affect execution — step() always appends at the history tip.
runner.playhead # current inspection scan_id
runner.seek(scan_id=5) # jump to a historical scan
runner.rewind(seconds=1.0) # move backward by simulation time
snapshot = runner.history.at(runner.playhead)
rewind(seconds) finds the nearest state where timestamp <= target.
Diff
Compare two retained scans to see what changed:
Returns string-keyed dicts — only tags whose values differ. Missing tags appear as None.
Fork
Create an independent runner from a snapshot:
alt = runner.fork() # from current state (common case)
alt = runner.fork(scan_id=10) # from a retained historical scan
alt = runner.fork_from(scan_id=10) # alias
The fork starts with the snapshot's state and the same time mode. It has clean runtime state — no forces, patches, breakpoints, or monitors carry over. Only the fork snapshot is in its initial history.
See Testing — Forking for the alternate-outcomes pattern.
Breakpoints and monitors
when() creates condition breakpoints evaluated after each committed scan. monitor() watches a tag for value changes. Both return handles with .remove(), .enable(), .disable().
runner.when(Fault).pause() # halt run()/run_for()/run_until()
runner.when(Fault).snapshot("fault_seen") # label scan in history
runner.monitor(Motor, lambda curr, prev: print(f"{prev} → {curr}"))
See Testing — Monitoring changes and Testing — Predicate breakpoints for usage patterns.