def spec_to_graph(
spec: GDSSpec,
*,
base_uri: str = DEFAULT_BASE_URI,
) -> Graph:
"""Export a GDSSpec to an RDF graph (ABox instance data)."""
g = Graph()
_bind(g)
ns = _ns(base_uri, spec.name)
g.bind("inst", ns)
spec_uri = ns["spec"]
g.add((spec_uri, RDF.type, GDS_CORE["GDSSpec"]))
g.add((spec_uri, GDS_CORE["name"], Literal(spec.name)))
g.add((spec_uri, GDS_CORE["description"], Literal(spec.description)))
# Types
type_uris: dict[str, URIRef] = {}
for name, t in spec.types.items():
type_uris[name] = _typedef_to_rdf(g, ns, t)
g.add((spec_uri, GDS_CORE["hasType"], type_uris[name]))
# Also export parameter typedefs that may not be in spec.types
for p in spec.parameter_schema.parameters.values():
if p.typedef.name not in type_uris:
type_uris[p.typedef.name] = _typedef_to_rdf(g, ns, p.typedef)
# Spaces
space_uris: dict[str, URIRef] = {}
for name, s in spec.spaces.items():
space_uris[name] = _space_to_rdf(g, ns, s, type_uris)
g.add((spec_uri, GDS_CORE["hasSpace"], space_uris[name]))
# Entities
entity_uris: dict[str, URIRef] = {}
for name, e in spec.entities.items():
entity_uris[name] = _entity_to_rdf(g, ns, e, type_uris)
g.add((spec_uri, GDS_CORE["hasEntity"], entity_uris[name]))
# Parameters
param_uris: dict[str, URIRef] = {}
for name, p in spec.parameter_schema.parameters.items():
param_uris[name] = _parameter_to_rdf(g, ns, p, type_uris)
g.add((spec_uri, GDS_CORE["hasParameter"], param_uris[name]))
# Blocks
block_uris: dict[str, URIRef] = {}
for name, b in spec.blocks.items():
block_uris[name] = _block_to_rdf(g, ns, b, param_uris, entity_uris)
g.add((spec_uri, GDS_CORE["hasBlock"], block_uris[name]))
# Wirings
for _name, w in spec.wirings.items():
w_uri = _wiring_to_rdf(g, ns, w, block_uris, space_uris)
g.add((spec_uri, GDS_CORE["hasWiring"], w_uri))
# Admissibility constraints
for ac_name, ac in spec.admissibility_constraints.items():
ac_uri = _uri(ns, "admissibility", ac_name)
g.add((ac_uri, RDF.type, GDS_CORE["AdmissibleInputConstraint"]))
g.add((ac_uri, GDS_CORE["name"], Literal(ac_name)))
g.add(
(
ac_uri,
GDS_CORE["constraintBoundaryBlock"],
Literal(ac.boundary_block),
)
)
if ac.boundary_block in block_uris:
g.add(
(
ac_uri,
GDS_CORE["constrainsBoundary"],
block_uris[ac.boundary_block],
)
)
g.add(
(
ac_uri,
GDS_CORE["admissibilityHasConstraint"],
Literal(ac.constraint is not None, datatype=XSD.boolean),
)
)
g.add((ac_uri, GDS_CORE["description"], Literal(ac.description)))
for entity_name, var_name in ac.depends_on:
dep = BNode()
g.add((dep, RDF.type, GDS_CORE["AdmissibilityDep"]))
g.add((dep, GDS_CORE["depEntity"], Literal(entity_name)))
g.add((dep, GDS_CORE["depVariable"], Literal(var_name)))
g.add((ac_uri, GDS_CORE["hasDependency"], dep))
g.add((spec_uri, GDS_CORE["hasAdmissibilityConstraint"], ac_uri))
# Transition signatures
for mname, ts in spec.transition_signatures.items():
ts_uri = _uri(ns, "transition_sig", mname)
g.add((ts_uri, RDF.type, GDS_CORE["TransitionSignature"]))
g.add((ts_uri, GDS_CORE["name"], Literal(mname)))
g.add((ts_uri, GDS_CORE["signatureMechanism"], Literal(ts.mechanism)))
if ts.mechanism in block_uris:
g.add(
(
ts_uri,
GDS_CORE["signatureForMechanism"],
block_uris[ts.mechanism],
)
)
for bname in ts.depends_on_blocks:
g.add((ts_uri, GDS_CORE["dependsOnBlock"], Literal(bname)))
if ts.preserves_invariant:
g.add(
(
ts_uri,
GDS_CORE["preservesInvariant"],
Literal(ts.preserves_invariant),
)
)
for entity_name, var_name in ts.reads:
entry = BNode()
g.add((entry, RDF.type, GDS_CORE["TransitionReadEntry"]))
g.add((entry, GDS_CORE["readEntity"], Literal(entity_name)))
g.add((entry, GDS_CORE["readVariable"], Literal(var_name)))
g.add((ts_uri, GDS_CORE["hasReadEntry"], entry))
g.add((spec_uri, GDS_CORE["hasTransitionSignature"], ts_uri))
# State metrics
for sm_name, sm in spec.state_metrics.items():
sm_uri = _uri(ns, "state_metric", sm_name)
g.add((sm_uri, RDF.type, GDS_CORE["StateMetric"]))
g.add((sm_uri, GDS_CORE["name"], Literal(sm_name)))
if sm.metric_type:
g.add((sm_uri, GDS_CORE["metricType"], Literal(sm.metric_type)))
g.add(
(
sm_uri,
GDS_CORE["metricHasDistance"],
Literal(sm.distance is not None, datatype=XSD.boolean),
)
)
g.add((sm_uri, GDS_CORE["description"], Literal(sm.description)))
for entity_name, var_name in sm.variables:
entry = BNode()
g.add((entry, RDF.type, GDS_CORE["MetricVariableEntry"]))
g.add((entry, GDS_CORE["metricEntity"], Literal(entity_name)))
g.add((entry, GDS_CORE["metricVariable"], Literal(var_name)))
g.add((sm_uri, GDS_CORE["hasMetricVariable"], entry))
g.add((spec_uri, GDS_CORE["hasStateMetric"], sm_uri))
return g