def graph_to_spec(
    g: Graph,
    *,
    spec_uri: URIRef | None = None,
) -> GDSSpec:
    """Rebuild a ``GDSSpec`` from its RDF representation.

    The spec individual is read first, then everything linked from it is
    registered on a fresh ``GDSSpec`` in this order: types, spaces, entities,
    parameters, blocks, wirings, admissibility constraints, transition
    signatures and state metrics.

    Args:
        g: Graph holding the serialized specification.
        spec_uri: The spec individual to import. When ``None``, the first
            subject typed ``GDS_CORE["GDSSpec"]`` (as returned by
            ``_subjects_of_type``) is used.

    Returns:
        The reconstructed ``GDSSpec``.

    Raises:
        ValueError: If ``spec_uri`` is ``None`` and the graph contains no
            ``GDSSpec`` individual.

    Notes:
        The import is lossy in the following ways:

        * Port names of each interface direction are sorted alphabetically.
        * A state variable or parameter whose type name is not among the
          imported typedefs gets a placeholder ``TypeDef`` with
          ``python_type=str``; with no type link at all the placeholder is
          named ``"unknown"``.
        * Space fields whose type name is not among the imported typedefs are
          dropped.
        * Blocks whose kind is not ``"boundary"``, ``"mechanism"`` or
          ``"policy"`` become a plain ``AtomicBlock`` carrying only name and
          interface.
        * ``AdmissibleInputConstraint.constraint`` and
          ``StateMetric.distance`` are always set to ``None``.
    """
    from gds import (
        GDSSpec,
        ParameterDef,
        SpecWiring,
        Wire,
    )
    from gds.blocks.roles import BoundaryAction, Mechanism, Policy
    from gds.constraints import (
        AdmissibleInputConstraint,
        StateMetric,
        TransitionSignature,
    )
    from gds.spaces import Space
    from gds.state import Entity, StateVariable
    from gds.types.interface import Interface, port
    from gds.types.typedef import TypeDef

    # ------------------------------------------------------------------
    # Small local readers shared by the sections below.
    # ------------------------------------------------------------------
    def text_of(node, key: str):
        """Read the ``GDS_CORE[key]`` value of ``node`` through ``_str``."""
        return _str(g, node, GDS_CORE[key])

    def uri_objects(subject, key: str) -> list:
        """List the ``URIRef`` objects of ``subject`` for ``GDS_CORE[key]``.

        Objects of any other node type are skipped.
        """
        return [
            node
            for node in g.objects(subject, GDS_CORE[key])
            if isinstance(node, URIRef)
        ]

    def first_object(subject, key: str):
        """Return the first object for ``GDS_CORE[key]``, or ``None``."""
        return next(iter(g.objects(subject, GDS_CORE[key])), None)

    def entity_var_pairs(
        subject, link_key: str, entity_key: str, variable_key: str
    ) -> list[tuple[str, str]]:
        """Collect ``(entity, variable)`` pairs from nodes linked to ``subject``."""
        return [
            (text_of(entry, entity_key), text_of(entry, variable_key))
            for entry in g.objects(subject, GDS_CORE[link_key])
        ]

    def sorted_ports(iface_node, direction_key: str) -> tuple:
        """Build the port tuple of one interface direction, sorted by name."""
        if iface_node is None:
            return ()
        port_names = sorted(
            text_of(port_node, "portName")
            for port_node in g.objects(iface_node, GDS_CORE[direction_key])
        )
        return tuple(port(port_name) for port_name in port_names)

    # ------------------------------------------------------------------
    # Locate the spec individual and create the empty spec.
    # ------------------------------------------------------------------
    if spec_uri is None:
        candidates = _subjects_of_type(g, GDS_CORE["GDSSpec"])
        if not candidates:
            raise ValueError("No GDSSpec found in graph")
        spec_uri = candidates[0]

    spec = GDSSpec(
        name=text_of(spec_uri, "name"),
        description=text_of(spec_uri, "description"),
    )

    # ------------------------------------------------------------------
    # Types: the ones linked from the spec are registered on it.
    # ------------------------------------------------------------------
    typedef_map: dict[str, TypeDef] = {}
    for type_node in uri_objects(spec_uri, "hasType"):
        registered = TypeDef(**_import_typedef(g, type_node))
        typedef_map[registered.name] = registered
        spec.register_type(registered)

    # Every other TypeDef individual in the graph is kept for lookups only
    # (e.g. parameter types); these are not registered on the spec.
    for type_node in _subjects_of_type(g, GDS_CORE["TypeDef"]):
        attrs = _import_typedef(g, type_node)
        if attrs["name"] in typedef_map:
            continue
        lookup_only = TypeDef(**attrs)
        typedef_map[lookup_only.name] = lookup_only

    def resolve_typedef(owner, type_key: str) -> TypeDef:
        """Resolve the typedef linked from ``owner`` via ``GDS_CORE[type_key]``.

        Falls back to a ``str``-typed placeholder when the link is missing or
        names a typedef that was not imported.
        """
        type_node = first_object(owner, type_key)
        if type_node is None:
            return TypeDef(name="unknown", python_type=str)
        type_name = text_of(type_node, "name")
        placeholder = TypeDef(name=type_name, python_type=str)
        return typedef_map.get(type_name, placeholder)

    # ------------------------------------------------------------------
    # Spaces
    # ------------------------------------------------------------------
    for space_node in uri_objects(spec_uri, "hasSpace"):
        space_name = text_of(space_node, "name")
        space_desc = text_of(space_node, "description")
        space_fields: dict[str, TypeDef] = {}
        for field_node in g.objects(space_node, GDS_CORE["hasField"]):
            field_name = text_of(field_node, "fieldName")
            field_type_node = first_object(field_node, "fieldType")
            if field_type_node is None:
                continue
            field_type_name = text_of(field_type_node, "name")
            # Fields pointing at a typedef we did not import are dropped.
            if field_type_name in typedef_map:
                space_fields[field_name] = typedef_map[field_type_name]
        spec.register_space(
            Space(name=space_name, fields=space_fields, description=space_desc)
        )

    # ------------------------------------------------------------------
    # Entities and their state variables
    # ------------------------------------------------------------------
    for entity_node in uri_objects(spec_uri, "hasEntity"):
        entity_name = text_of(entity_node, "name")
        entity_desc = text_of(entity_node, "description")
        state_vars: dict[str, StateVariable] = {}
        for var_node in uri_objects(entity_node, "hasVariable"):
            var_name = text_of(var_node, "name")
            var_desc = text_of(var_node, "description")
            var_symbol = text_of(var_node, "symbol")
            state_vars[var_name] = StateVariable(
                name=var_name,
                typedef=resolve_typedef(var_node, "usesType"),
                description=var_desc,
                symbol=var_symbol,
            )
        spec.register_entity(
            Entity(name=entity_name, variables=state_vars, description=entity_desc)
        )

    # ------------------------------------------------------------------
    # Parameters
    # ------------------------------------------------------------------
    uri_by_param_name: dict[str, URIRef] = {}
    for param_node in uri_objects(spec_uri, "hasParameter"):
        param_name = text_of(param_node, "name")
        param_desc = text_of(param_node, "description")
        uri_by_param_name[param_name] = param_node
        spec.register_parameter(
            ParameterDef(
                name=param_name,
                typedef=resolve_typedef(param_node, "paramType"),
                description=param_desc,
            )
        )

    # ------------------------------------------------------------------
    # Blocks
    # ------------------------------------------------------------------
    # Reverse lookup so a block's usesParameter links resolve to names.
    param_name_by_uri: dict[URIRef, str] = {
        uri: name for name, uri in uri_by_param_name.items()
    }
    for block_node in uri_objects(spec_uri, "hasBlock"):
        block_name = text_of(block_node, "name")
        block_kind = text_of(block_node, "kind")

        # Only the first interface node is read; with none, all four port
        # tuples are empty.
        iface_node = first_object(block_node, "hasInterface")
        iface = Interface(
            forward_in=sorted_ports(iface_node, "hasForwardIn"),
            forward_out=sorted_ports(iface_node, "hasForwardOut"),
            backward_in=sorted_ports(iface_node, "hasBackwardIn"),
            backward_out=sorted_ports(iface_node, "hasBackwardOut"),
        )

        # References to parameters not imported above are ignored.
        params_used = [
            param_name_by_uri[ref]
            for ref in g.objects(block_node, GDS_CORE["usesParameter"])
            if isinstance(ref, URIRef) and ref in param_name_by_uri
        ]
        constraints = _strs(g, block_node, GDS_CORE["constraint"])
        options = _strs(g, block_node, GDS_CORE["option"])

        if block_kind == "mechanism":
            block = Mechanism(
                name=block_name,
                interface=iface,
                updates=entity_var_pairs(
                    block_node, "updatesEntry", "updatesEntity", "updatesVariable"
                ),
                params_used=params_used,
                constraints=constraints,
            )
        elif block_kind == "boundary" or block_kind == "policy":
            # Both roles are built from the same set of keyword arguments.
            role_cls = BoundaryAction if block_kind == "boundary" else Policy
            block = role_cls(
                name=block_name,
                interface=iface,
                params_used=params_used,
                constraints=constraints,
                options=options,
            )
        else:
            # Unrecognised kind: keep only name and interface.
            from gds.blocks.base import AtomicBlock

            block = AtomicBlock(name=block_name, interface=iface)
        spec.register_block(block)

    # ------------------------------------------------------------------
    # Wirings
    # ------------------------------------------------------------------
    for wiring_node in uri_objects(spec_uri, "hasWiring"):
        wiring_name = text_of(wiring_node, "name")
        wiring_desc = text_of(wiring_node, "description")

        # Member blocks are referenced by URI; those with an empty name are
        # left out.
        candidate_names = [
            text_of(member, "name")
            for member in uri_objects(wiring_node, "wiringBlock")
        ]
        member_names = [name for name in candidate_names if name]

        wires = [
            Wire(
                source=text_of(wire_node, "wireSource"),
                target=text_of(wire_node, "wireTarget"),
                space=text_of(wire_node, "wireSpace"),
                optional=_bool(g, wire_node, GDS_CORE["wireOptional"]),
            )
            for wire_node in g.objects(wiring_node, GDS_CORE["hasWire"])
        ]
        spec.register_wiring(
            SpecWiring(
                name=wiring_name,
                block_names=member_names,
                wires=wires,
                description=wiring_desc,
            )
        )

    # ------------------------------------------------------------------
    # Admissibility constraints
    # ------------------------------------------------------------------
    for constraint_node in uri_objects(spec_uri, "hasAdmissibilityConstraint"):
        constraint_name = text_of(constraint_node, "name")
        boundary_block = text_of(constraint_node, "constraintBoundaryBlock")
        constraint_desc = text_of(constraint_node, "description")
        spec.register_admissibility(
            AdmissibleInputConstraint(
                name=constraint_name,
                boundary_block=boundary_block,
                depends_on=entity_var_pairs(
                    constraint_node, "hasDependency", "depEntity", "depVariable"
                ),
                constraint=None,  # not reconstructed from the graph here
                description=constraint_desc,
            )
        )

    # ------------------------------------------------------------------
    # Transition signatures
    # ------------------------------------------------------------------
    for signature_node in uri_objects(spec_uri, "hasTransitionSignature"):
        mechanism_name = text_of(signature_node, "signatureMechanism")
        read_pairs = entity_var_pairs(
            signature_node, "hasReadEntry", "readEntity", "readVariable"
        )
        upstream_blocks = _strs(g, signature_node, GDS_CORE["dependsOnBlock"])
        invariant = text_of(signature_node, "preservesInvariant")
        spec.register_transition_signature(
            TransitionSignature(
                mechanism=mechanism_name,
                reads=read_pairs,
                depends_on_blocks=upstream_blocks,
                preserves_invariant=invariant,
            )
        )

    # ------------------------------------------------------------------
    # State metrics
    # ------------------------------------------------------------------
    for metric_node in uri_objects(spec_uri, "hasStateMetric"):
        metric_name = text_of(metric_node, "name")
        metric_type = text_of(metric_node, "metricType")
        metric_desc = text_of(metric_node, "description")
        spec.register_state_metric(
            StateMetric(
                name=metric_name,
                variables=entity_var_pairs(
                    metric_node, "hasMetricVariable", "metricEntity", "metricVariable"
                ),
                metric_type=metric_type,
                distance=None,  # R3 lossy
                description=metric_desc,
            )
        )

    return spec