class GDSSpec(Tagged):
"""Complete Generalized Dynamical System specification.
Mathematically: GDS = {h, X} where
X = state space (product of entity states)
h = transition map (composed from wirings)
Registration methods are chainable:
spec.register_type(t).register_space(s).register_entity(e)
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
description: str = ""
types: dict[str, TypeDef] = Field(default_factory=dict)
spaces: dict[str, Space] = Field(default_factory=dict)
entities: dict[str, Entity] = Field(default_factory=dict)
blocks: dict[str, Block] = Field(default_factory=dict)
wirings: dict[str, SpecWiring] = Field(default_factory=dict)
parameter_schema: ParameterSchema = Field(default_factory=ParameterSchema)
admissibility_constraints: dict[str, AdmissibleInputConstraint] = Field(
default_factory=dict
)
transition_signatures: dict[str, TransitionSignature] = Field(default_factory=dict)
state_metrics: dict[str, StateMetric] = Field(default_factory=dict)
execution_contract: ExecutionContract | None = None
# ── Registration ────────────────────────────────────────
def register_type(self, t: TypeDef) -> GDSSpec:
"""Register a TypeDef. Raises if name already registered."""
if t.name in self.types:
raise ValueError(f"Type '{t.name}' already registered")
self.types[t.name] = t
return self
def register_space(self, s: Space) -> GDSSpec:
"""Register a Space. Raises if name already registered."""
if s.name in self.spaces:
raise ValueError(f"Space '{s.name}' already registered")
self.spaces[s.name] = s
return self
def register_entity(self, e: Entity) -> GDSSpec:
"""Register an Entity. Raises if name already registered."""
if e.name in self.entities:
raise ValueError(f"Entity '{e.name}' already registered")
self.entities[e.name] = e
return self
def register_block(self, b: Block) -> GDSSpec:
"""Register a Block. Raises if name already registered."""
if b.name in self.blocks:
raise ValueError(f"Block '{b.name}' already registered")
self.blocks[b.name] = b
return self
def register_wiring(self, w: SpecWiring) -> GDSSpec:
"""Register a SpecWiring. Raises if name already registered."""
if w.name in self.wirings:
raise ValueError(f"Wiring '{w.name}' already registered")
self.wirings[w.name] = w
return self
def register_parameter(
self, param_or_name: ParameterDef | str, typedef: TypeDef | None = None
) -> GDSSpec:
"""Register a parameter definition.
Accepts either:
spec.register_parameter(ParameterDef(name="rate", typedef=Rate))
spec.register_parameter("rate", Rate) # legacy convenience
"""
if isinstance(param_or_name, str):
if typedef is None:
raise ValueError("typedef is required when registering by name string")
param = ParameterDef(name=param_or_name, typedef=typedef)
else:
param = param_or_name
self.parameter_schema = self.parameter_schema.add(param)
return self
def register_admissibility(self, ac: AdmissibleInputConstraint) -> GDSSpec:
"""Register an admissible input constraint.
Raises if name already registered.
"""
if ac.name in self.admissibility_constraints:
raise ValueError(f"Admissibility constraint '{ac.name}' already registered")
self.admissibility_constraints[ac.name] = ac
return self
def register_transition_signature(self, ts: TransitionSignature) -> GDSSpec:
"""Register a transition signature. Raises if mechanism already has one."""
if ts.mechanism in self.transition_signatures:
raise ValueError(
f"Transition signature for '{ts.mechanism}' already registered"
)
self.transition_signatures[ts.mechanism] = ts
return self
def register_state_metric(self, sm: StateMetric) -> GDSSpec:
"""Register a state metric. Raises if name already registered."""
if sm.name in self.state_metrics:
raise ValueError(f"State metric '{sm.name}' already registered")
self.state_metrics[sm.name] = sm
return self
@property
def parameters(self) -> dict[str, TypeDef]:
"""Legacy access: parameter name → TypeDef mapping."""
return {name: p.typedef for name, p in self.parameter_schema.parameters.items()}
# ── Bulk registration ─────────────────────────────────
def collect(
self, *objects: TypeDef | Space | Entity | Block | ParameterDef
) -> GDSSpec:
"""Register multiple objects by type-dispatching each.
Accepts any mix of TypeDef, Space, Entity, Block, and
ParameterDef instances. Does not handle SpecWiring,
AdmissibleInputConstraint, TransitionSignature, or
(name, typedef) parameter shorthand --- those stay explicit
via their respective ``register_*()`` methods.
Raises TypeError for unrecognized types.
"""
for obj in objects:
if isinstance(obj, TypeDef):
self.register_type(obj)
elif isinstance(obj, Space):
self.register_space(obj)
elif isinstance(obj, Entity):
self.register_entity(obj)
elif isinstance(obj, ParameterDef):
self.register_parameter(obj)
elif isinstance(obj, Block):
self.register_block(obj)
else:
raise TypeError(
f"collect() does not accept {type(obj).__name__!r}; "
f"expected TypeDef, Space, Entity, Block, or ParameterDef"
)
return self
# ── Validation ──────────────────────────────────────────
def validate_spec(self) -> list[str]:
"""Full structural validation. Returns list of error strings."""
errors: list[str] = []
errors += self._validate_space_types()
errors += self._validate_wiring_blocks()
errors += self._validate_mechanism_updates()
errors += self._validate_param_references()
errors += self._validate_admissibility_constraints()
errors += self._validate_transition_signatures()
errors += self._validate_state_metrics()
return errors
def _validate_space_types(self) -> list[str]:
"""Every TypeDef used in a Space is registered."""
errors: list[str] = []
for space in self.spaces.values():
for field_name, typedef in space.fields.items():
if typedef.name not in self.types:
errors.append(
f"Space '{space.name}' field '{field_name}' uses "
f"unregistered type '{typedef.name}'"
)
return errors
def _validate_wiring_blocks(self) -> list[str]:
"""Every block referenced in a wiring is registered."""
errors: list[str] = []
for wiring in self.wirings.values():
for bname in wiring.block_names:
if bname not in self.blocks:
errors.append(
f"Wiring '{wiring.name}' references "
f"unregistered block '{bname}'"
)
for wire in wiring.wires:
if wire.source not in self.blocks:
errors.append(
f"Wiring '{wiring.name}' wire source "
f"'{wire.source}' not in registered blocks"
)
if wire.target not in self.blocks:
errors.append(
f"Wiring '{wiring.name}' wire target "
f"'{wire.target}' not in registered blocks"
)
if wire.space and wire.space not in self.spaces:
errors.append(
f"Wiring '{wiring.name}' wire references "
f"unregistered space '{wire.space}'"
)
return errors
def _validate_mechanism_updates(self) -> list[str]:
"""Mechanisms only update existing entity variables."""
errors: list[str] = []
for block in self.blocks.values():
if isinstance(block, Mechanism):
for entity_name, var_name in block.updates:
if entity_name not in self.entities:
errors.append(
f"Mechanism '{block.name}' updates "
f"unknown entity '{entity_name}'"
)
elif var_name not in self.entities[entity_name].variables:
errors.append(
f"Mechanism '{block.name}' updates "
f"unknown variable '{entity_name}.{var_name}'"
)
return errors
def _validate_param_references(self) -> list[str]:
"""All parameter references in blocks are registered."""
errors: list[str] = []
param_names = self.parameter_schema.names()
for block in self.blocks.values():
if isinstance(block, HasParams):
for param in block.params_used:
if param not in param_names:
errors.append(
f"Block '{block.name}' references "
f"unregistered parameter '{param}'"
)
return errors
def _validate_admissibility_constraints(self) -> list[str]:
"""Admissibility constraints reference existing blocks and variables."""
errors: list[str] = []
for ac in self.admissibility_constraints.values():
if ac.boundary_block not in self.blocks:
errors.append(
f"Admissibility constraint '{ac.name}' references "
f"unregistered block '{ac.boundary_block}'"
)
elif not isinstance(self.blocks[ac.boundary_block], BoundaryAction):
errors.append(
f"Admissibility constraint '{ac.name}': "
f"block '{ac.boundary_block}' is not a BoundaryAction"
)
for entity_name, var_name in ac.depends_on:
if entity_name not in self.entities:
errors.append(
f"Admissibility constraint '{ac.name}' depends on "
f"unknown entity '{entity_name}'"
)
elif var_name not in self.entities[entity_name].variables:
errors.append(
f"Admissibility constraint '{ac.name}' depends on "
f"unknown variable '{entity_name}.{var_name}'"
)
return errors
def _validate_transition_signatures(self) -> list[str]:
"""Transition signatures reference existing Mechanisms and variables."""
errors: list[str] = []
for ts in self.transition_signatures.values():
if ts.mechanism not in self.blocks:
errors.append(
f"Transition signature references "
f"unregistered block '{ts.mechanism}'"
)
elif not isinstance(self.blocks[ts.mechanism], Mechanism):
errors.append(
f"Transition signature for '{ts.mechanism}': "
f"block is not a Mechanism"
)
for entity_name, var_name in ts.reads:
if entity_name not in self.entities:
errors.append(
f"Transition signature for '{ts.mechanism}' reads "
f"unknown entity '{entity_name}'"
)
elif var_name not in self.entities[entity_name].variables:
errors.append(
f"Transition signature for '{ts.mechanism}' reads "
f"unknown variable '{entity_name}.{var_name}'"
)
for bname in ts.depends_on_blocks:
if bname not in self.blocks:
errors.append(
f"Transition signature for '{ts.mechanism}' "
f"depends on unregistered block '{bname}'"
)
return errors
def _validate_state_metrics(self) -> list[str]:
"""State metrics reference existing entities and variables."""
errors: list[str] = []
for sm in self.state_metrics.values():
if not sm.variables:
errors.append(f"State metric '{sm.name}' has no variables")
for entity_name, var_name in sm.variables:
if entity_name not in self.entities:
errors.append(
f"State metric '{sm.name}' references "
f"unknown entity '{entity_name}'"
)
elif var_name not in self.entities[entity_name].variables:
errors.append(
f"State metric '{sm.name}' references "
f"unknown variable '{entity_name}.{var_name}'"
)
return errors