Testing
The whole point of pyrung is to test logic before it touches hardware. Every scan is deterministic, every state is a snapshot, and pytest works out of the box.
Your first test
from pyrung import Bool, PLC, Program, Rung, latch, reset

Start = Bool("Start")
Stop = Bool("Stop")
Motor = Bool("Motor")

with Program() as logic:
    with Rung(Start):
        latch(Motor)
    with Rung(Stop):
        reset(Motor)


def test_start_latches_motor():
    with PLC(logic, dt=0.1) as plc:
        Start.value = True
        plc.step()
        assert Motor.value is True
        # Release start; motor stays latched
        Start.value = False
        plc.step()
        assert Motor.value is True


def test_stop_resets_motor():
    with PLC(logic, dt=0.1) as plc:
        Start.value = True
        plc.step()
        Start.value = False
        Stop.value = True
        plc.step()
        assert Motor.value is False
Tags are defined at module level (just like PLC addresses), and each test gets a fresh PLC. Inside the with PLC(...) as plc: block, .value reads and writes go through the runner's current state. Set a value, step, assert — that's the pattern.
Testing timers
Timers accumulate time across scans. With a fixed dt, the math is exact:
from pyrung import Bool, Timer, PLC, Program, Rung, on_delay

Enable = Bool("Enable")
MyTimer = Timer.clone("MyTimer")

with Program() as logic:
    with Rung(Enable):
        on_delay(MyTimer, preset=100)


def test_timer_fires_at_preset():
    with PLC(logic, dt=0.001) as plc:
        Enable.value = True
        # 99 scans = 99 ms: not yet
        plc.run(cycles=99)
        assert MyTimer.Done.value is False
        assert MyTimer.Acc.value == 99
        # One more scan: 100 ms, timer fires
        plc.step()
        assert MyTimer.Done.value is True
A 100 ms preset with 1 ms scans takes exactly 100 steps. No timing jitter, no flaky tests.
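The arithmetic behind that claim can be sketched without pyrung. `OnDelaySketch` below is a hypothetical stand-in, not pyrung's timer: it assumes the accumulator counts whole milliseconds, that Done is set once the accumulator reaches the preset, and that a disabled timer resets.

```python
class OnDelaySketch:
    """Hypothetical on-delay timer that accumulates integer milliseconds."""

    def __init__(self, preset_ms: int) -> None:
        self.preset_ms = preset_ms
        self.acc_ms = 0
        self.done = False

    def scan(self, enabled: bool, dt_ms: int) -> None:
        if not enabled:
            # Assumption: a disabled on-delay timer resets.
            self.acc_ms = 0
            self.done = False
            return
        self.acc_ms += dt_ms
        self.done = self.acc_ms >= self.preset_ms


timer = OnDelaySketch(preset_ms=100)
for _ in range(99):
    timer.scan(enabled=True, dt_ms=1)
state_after_99 = (timer.acc_ms, timer.done)

timer.scan(enabled=True, dt_ms=1)
state_after_100 = (timer.acc_ms, timer.done)
```

Because the accumulator is an integer and dt is fixed, the scan on which Done flips is fully determined by the preset and the scan length.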
Testing time-of-day logic
Logic that depends on the real-time clock (shift changes, scheduled events, lighting) can be tested with set_rtc. With a fixed dt, the RTC advances with simulation time — no wall-clock dependency:
from datetime import datetime

from pyrung import PLC

# `logic` and `ShiftActive` are assumed to be defined elsewhere in the test module.


def test_shift_changeover():
    with PLC(logic, dt=0.1) as plc:
        plc.set_rtc(datetime(2026, 3, 5, 6, 59, 50))  # 10 seconds before 7 AM
        plc.run(cycles=100)  # 10 seconds at 0.1 s/scan
        assert ShiftActive.value is True  # Logic triggered at 7:00:00
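To sanity-check the clock arithmetic in a test like this, the expected RTC reading can be computed with the standard library alone. `simulated_rtc` is a hypothetical helper, not part of pyrung; it assumes the clock advances by exactly one dt per scan.

```python
from datetime import datetime, timedelta


def simulated_rtc(start: datetime, scans: int, dt_seconds: float) -> datetime:
    """Clock reading after `scans` fixed-length scans (hypothetical helper)."""
    # Work in whole microseconds so repeated 0.1 s steps cannot accumulate float error.
    dt_us = round(dt_seconds * 1_000_000)
    return start + timedelta(microseconds=dt_us * scans)


start = datetime(2026, 3, 5, 6, 59, 50)
after_run = simulated_rtc(start, scans=100, dt_seconds=0.1)
```

Here `after_run` lands on 07:00:00, which is why 100 cycles is enough to reach the changeover.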
Testing edge detection
rise() fires for exactly one scan on a false → true transition:
from pyrung import Bool, PLC, Program, Rung, out, rise

Sensor = Bool("Sensor")
Pulse = Bool("Pulse")

with Program() as logic:
    with Rung(rise(Sensor)):
        out(Pulse)


def test_rise_fires_once():
    with PLC(logic, dt=0.1) as plc:
        Sensor.value = True
        plc.step()
        assert Pulse.value is True  # Rising edge: fires
        plc.step()
        assert Pulse.value is False  # Still true, but no edge, so it doesn't fire
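The one-scan behavior comes from comparing the current value with the value from the previous scan. The sketch below shows that comparison in plain Python; `RiseSketch` is a hypothetical illustration, not how pyrung implements rise().

```python
class RiseSketch:
    """Hypothetical rising-edge detector: True for one scan on False -> True."""

    def __init__(self) -> None:
        self.previous = False

    def scan(self, current: bool) -> bool:
        fired = current and not self.previous
        self.previous = current  # remember this scan's value for the next one
        return fired


detector = RiseSketch()
sensor_by_scan = [False, True, True, False, True]
pulses = [detector.scan(value) for value in sensor_by_scan]
```

The sensor has to drop back to false before a second pulse can occur, which is worth covering in a test of its own.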
Forces
Forces override tag values independently of logic — the simulation equivalent of PLC "override" mode. Use them for edge-case testing, known-state setup, and debugging.
Force vs patch
| | patch() | force() |
|---|---|---|
| Duration | One scan | Until explicitly removed |
| Applied | Pre-logic, once | Pre-logic AND post-logic, every scan |
| Use case | Momentary inputs, test steps | Persistent overrides, test fixtures |
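The "Applied" row is the part that matters most in practice. A toy scan loop makes the difference visible; `run_scan` and `always_off` are hypothetical names for illustration, and the ordering is an assumption taken from the table, not pyrung's scan engine.

```python
def run_scan(state, program, patches, forces):
    """One hypothetical scan: patches and forces pre-logic, forces again post-logic."""
    state = dict(state)
    state.update(patches)  # applied once; the caller does not pass them again
    state.update(forces)   # pre-logic pass
    program(state)
    state.update(forces)   # post-logic pass: logic cannot overwrite a forced tag
    return state


def always_off(state):
    # Toy program: always tries to switch Motor off.
    state["Motor"] = False


initial = {"Motor": False}

# Patch: logic sees the value for one scan, then overwrites it.
patched = run_scan(initial, always_off, patches={"Motor": True}, forces={})

# Force: re-applied after logic, so the forced value survives every scan.
forced = initial
for _ in range(3):
    forced = run_scan(forced, always_off, patches={}, forces={"Motor": True})
```

In this model a patch on a tag that logic writes is gone by the end of the scan, while a force on the same tag wins until it is removed.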
Forces as test fixtures
When you need an input held across many scans, forces are cleaner than setting .value before every step:
def test_motor_runs_for_duration():
    with PLC(logic, dt=0.1) as plc:
        plc.force(Enable, True)
        plc.run(cycles=50)
        assert Motor.value is True
        plc.unforce(Enable)
The forced() context manager scopes forces to a block and cleans up automatically:
def test_fault_during_operation():
    with PLC(logic, dt=0.1) as plc:
        with plc.forced({Enable: True}):
            plc.run(cycles=10)
            with plc.forced({Fault: True}):
                plc.step()
                assert Motor.value is False  # Fault killed the motor
            # Fault released, Enable still forced
        # All forces released
Safe to nest — inner blocks add to (and restore from) the outer block's forces:
with plc.forced({AutoMode: True}):
    plc.run(cycles=3)
    with plc.forced({Fault: True}):  # adds Fault while AutoMode stays forced
        plc.run(cycles=2)
    # Fault removed; AutoMode still True
# AutoMode removed
Adding and removing forces
plc.force(Button, True)
plc.force(Temperature, 75.5)
plc.unforce(Button)
plc.clear_forces() # remove all
Inspecting active forces
Supported tag types
Any writable tag (BOOL, INT, DINT, REAL, WORD, CHAR) can be forced. Read-only system tags cannot be forced and raise ValueError.
Forces and patches also accept string keys (plc.force("Enable", True)) for cases where you're working with tag names directly.
Running until a condition
For tests where you care about what happens, not when, run_until accepts the same condition expressions you use inside Rung():
def test_motor_eventually_stops():
    with PLC(logic, dt=0.1) as plc:
        Start.value = True
        plc.step()
        Stop.value = True
        plc.run_until(~Motor, max_cycles=100)
        assert Motor.value is False
Conditions compose the same way they do in rungs:
plc.run_until(Motor & ~Fault)  # Motor on, no fault
plc.run_until(Temp > 150.0)  # Temperature exceeded
plc.run_until(Or(AlarmA, AlarmB, AlarmC))  # Any alarm triggered
run_until stops as soon as the condition is met, or after max_cycles — whichever comes first.
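The two exits can be sketched as a plain loop. `run_until_sketch` is a hypothetical helper, not pyrung's implementation, and it assumes the condition is evaluated after each scan.

```python
from typing import Callable


def run_until_sketch(
    step: Callable[[], None],
    condition: Callable[[], bool],
    max_cycles: int,
) -> int:
    """Step until condition() holds or max_cycles scans have run; return scans executed."""
    for executed in range(1, max_cycles + 1):
        step()
        if condition():
            return executed
    return max_cycles


counter = {"value": 0}


def step() -> None:
    counter["value"] += 1


# Condition becomes true on the fifth scan.
scans_hit = run_until_sketch(step, lambda: counter["value"] >= 5, max_cycles=100)

# Condition never becomes true, so the cap ends the run.
counter["value"] = 0
scans_capped = run_until_sketch(step, lambda: False, max_cycles=10)
```

Because the run can end on the cap with the condition still false, asserting the condition afterward (as the test above does) is what tells the two exits apart.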
Forking: test alternate outcomes
Get your process to a decision point once, then fork and test both paths independently:
def test_fault_vs_normal():
    with PLC(logic, dt=0.01) as plc:
        Start.value = True
        plc.run(cycles=200)

    # What happens if a fault occurs?
    fault_path = plc.fork()
    with fault_path:
        Fault.value = True
        fault_path.run(cycles=50)
        assert Motor.value is False

    # What happens under normal operation?
    normal_path = plc.fork()
    with normal_path:
        normal_path.run(cycles=50)
        assert Motor.value is True
Each fork is an independent runner starting from the same snapshot. No need to duplicate a long warmup sequence in every test.
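The independence guarantee is the same one a deep copy gives you. The sketch below models it with a dict of tag values; `RunnerSketch` and its toy logic are hypothetical and say nothing about how pyrung stores snapshots.

```python
import copy


class RunnerSketch:
    """Hypothetical runner holding a plain dict of tag values."""

    def __init__(self, state: dict) -> None:
        self.state = state

    def step(self) -> None:
        # Toy logic: Motor runs only while Start is set and there is no Fault.
        self.state["Motor"] = self.state["Start"] and not self.state["Fault"]

    def fork(self) -> "RunnerSketch":
        # Deep copy so nothing is shared with the parent or with sibling forks.
        return RunnerSketch(copy.deepcopy(self.state))


base = RunnerSketch({"Start": True, "Fault": False, "Motor": False})
base.step()  # shared warmup

fault_path = base.fork()
fault_path.state["Fault"] = True
fault_path.step()

normal_path = base.fork()
normal_path.step()
```

Setting Fault on one fork leaves both the parent and the other fork untouched, so the two assertions cannot interfere with each other.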
Monitoring changes
monitor watches a tag and fires a callback whenever its value changes:
def test_motor_transitions():
    transitions = []
    with PLC(logic, dt=0.1) as plc:
        plc.monitor(Motor, lambda curr, prev: transitions.append((prev, curr)))
        Start.value = True
        plc.step()
        Stop.value = True
        plc.step()
        assert transitions == [(False, True), (True, False)]
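The callback contract used above (current value first, previous value second, called only on a change) can be modeled in a few lines. `MonitorSketch` is a hypothetical illustration that assumes the comparison happens once per scan.

```python
from typing import Any, Callable, List, Tuple


class MonitorSketch:
    """Hypothetical watcher: calls back with (current, previous) on change."""

    def __init__(self, initial: Any, callback: Callable[[Any, Any], None]) -> None:
        self.previous = initial
        self.callback = callback

    def after_scan(self, current: Any) -> None:
        if current != self.previous:
            self.callback(current, self.previous)
        self.previous = current


transitions: List[Tuple[Any, Any]] = []
watcher = MonitorSketch(False, lambda curr, prev: transitions.append((prev, curr)))

# Motor value at the end of four consecutive scans.
for motor in [False, True, True, False]:
    watcher.after_scan(motor)
```

Scans where the value is unchanged produce no callback, so the recorded list contains only real transitions.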
Predicate breakpoints and snapshots
when uses the same condition expressions to pause execution or label a scan in history:
def test_capture_fault_state():
    with PLC(logic, dt=0.1) as plc:
        plc.when(Fault).snapshot("fault_triggered")
        Start.value = True
        plc.run(cycles=500)
        snap = plc.history.find_labeled("fault_triggered")
        if snap is not None:
            assert snap.scan_id > 0
when(condition).pause() halts run() / run_for() / run_until() after committing the triggering scan — useful for debugging a long simulation without stepping through every scan.
Comparing states
For debugging tests, diff shows exactly what changed between two scans:
def test_inspect_changes():
    with PLC(logic, dt=0.1) as plc:
        Start.value = True
        plc.step()  # scan 1
        Stop.value = True
        plc.step()  # scan 2
        changes = plc.diff(scan_a=1, scan_b=2)
        # {"Motor": (True, False), "Stop": (False, True), ...}
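The result shape in that comment, a mapping from tag name to an (old, new) pair, is easy to reproduce for two plain dicts. `diff_states` is a hypothetical helper that mirrors the shape only; it is not pyrung's diff.

```python
def diff_states(before: dict, after: dict) -> dict:
    """Map each changed key to (old, new); hypothetical helper."""
    keys = before.keys() | after.keys()
    return {
        key: (before.get(key), after.get(key))
        for key in sorted(keys)
        if before.get(key) != after.get(key)
    }


scan_1 = {"Start": True, "Stop": False, "Motor": True}
scan_2 = {"Start": True, "Stop": True, "Motor": False}
changes = diff_states(scan_1, scan_2)
```

Unchanged tags such as Start are left out, which keeps the output readable when a program has hundreds of tags.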
Checking bounds
Tags with min/max or choices get runtime bounds checking at the end of every scan. Values are never clamped — the write goes through, but a warning fires and the violation lands in plc.bounds_violations:
from pyrung import Bool, Int, Real, PLC, Program, Rung, calc, copy

Pressure = Real("Pressure", min=0, max=100)
Mode = Int("Mode", choices={0: "Off", 1: "On", 2: "Auto"})
Enable = Bool("Enable")
Src = Int("Src")

with Program() as logic:
    with Rung(Enable):
        calc(Pressure + 60, Pressure)
        copy(Src, Mode)


def test_pressure_stays_in_range():
    with PLC(logic, dt=0.1) as plc:
        plc.patch({Enable: True, Pressure: 50.0, Src: 1})
        plc.step()
        # 50 + 60 = 110, exceeds max=100
        assert "Pressure" in plc.bounds_violations
        assert plc.bounds_violations["Pressure"].kind == "range"
        # Value goes through unclamped: the program sees its real output
        assert Pressure.value == 110.0


def test_mode_rejects_unknown_choice():
    with PLC(logic, dt=0.1) as plc:
        plc.patch({Enable: True, Src: 5})
        plc.step()
        assert "Mode" in plc.bounds_violations
        assert plc.bounds_violations["Mode"].kind == "choices"


def test_clean_scan_clears_violations():
    with PLC(logic, dt=0.1) as plc:
        plc.patch({Enable: True, Pressure: 50.0, Src: 1})
        plc.step()
        assert plc.bounds_violations  # violation from Pressure
        plc.patch({Enable: False})
        plc.step()
        assert plc.bounds_violations == {}
Violations also emit warnings.warn(), so pytest -W error::UserWarning will fail any test that triggers one — useful as a catch-all without explicit assertions.
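The same escalation can be reproduced inside a single test with the standard warnings module. `commit_scan` below is a hypothetical stand-in for an end-of-scan check, and the UserWarning category is an assumption inferred from the pytest flag above.

```python
import warnings


def commit_scan(pressure: float, maximum: float) -> float:
    """Hypothetical end-of-scan check that warns instead of clamping."""
    if pressure > maximum:
        warnings.warn(f"Pressure={pressure} exceeds max={maximum}", UserWarning)
    return pressure


with warnings.catch_warnings():
    # Same effect as -W error::UserWarning, scoped to this block.
    warnings.simplefilter("error", UserWarning)
    try:
        commit_scan(110.0, maximum=100.0)
        escalated = False
    except UserWarning:
        escalated = True
```

Scoping the filter with catch_warnings() keeps the stricter behavior local, which helps when only some tests should treat violations as failures.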
The check runs after all instructions, forces, and system runtime writes, so it sees the final committed values. Tags without min/max/choices are never checked (zero overhead). copy() still clamps to type limits (INT -32768 to 32767, etc.) before the bounds check runs, so a copy(40000, IntTag) where IntTag has max=500 will show a violation for 32767 (type-clamped), not 40000.
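The ordering in that last example, type clamp first and bounds check second, can be traced by hand. Both functions below are hypothetical stand-ins that assume a 16-bit signed INT; they are not pyrung's copy() or its bounds checker.

```python
INT_MIN, INT_MAX = -32768, 32767  # 16-bit signed range


def clamp_to_int(value: int) -> int:
    """Saturate to the 16-bit signed range (stand-in for the type clamp)."""
    return max(INT_MIN, min(INT_MAX, value))


def out_of_range(value, minimum=None, maximum=None) -> bool:
    """True when value falls outside the declared min/max."""
    below = minimum is not None and value < minimum
    above = maximum is not None and value > maximum
    return below or above


stored = clamp_to_int(40000)                  # type clamp happens first
violates = out_of_range(stored, maximum=500)  # bounds check sees the clamped value
```

The violation is reported against the stored value, so an assertion on the violating value should expect 32767.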
Pytest fixtures
For a shared program across multiple tests:
import pytest
from pyrung import Bool, PLC, Program, Rung, latch, reset

Start = Bool("Start")
Stop = Bool("Stop")
Motor = Bool("Motor")

with Program() as logic:
    with Rung(Start):
        latch(Motor)
    with Rung(Stop):
        reset(Motor)


@pytest.fixture
def plc():
    return PLC(logic, dt=0.1)


def test_latch(plc):
    with plc:
        Start.value = True
        plc.step()
        assert Motor.value is True


def test_stop_after_start(plc):
    with plc:
        Start.value = True
        plc.step()
        Stop.value = True
        plc.step()
        assert Motor.value is False
Running tests
Autoharness: automatic feedback for device-heavy programs
When your UDTs declare physical= and link= on feedback fields, the autoharness can drive all feedback patches automatically — no manual toggling. See Physical Annotations and Autoharness.
Next steps
- Physical Annotations and Autoharness — annotate devices, eliminate feedback boilerplate
- Analysis — dataview, cause/effect chains, coverage queries
- Verification — prove(), fault coverage, lock files
- Runner Guide — time modes, execution methods, numeric behavior
- Quickstart — the traffic light example