Skip to content

gds_interchange.owl.export

Pydantic-to-RDF graph export functions.

Export GDS Pydantic models to RDF graphs (ABox instance data).

Mirrors the pattern in gds.serialize.spec_to_dict() but targets rdflib.Graph instead of plain dicts.

spec_to_graph(spec, *, base_uri=DEFAULT_BASE_URI)

Export a GDSSpec to an RDF graph (ABox instance data).

Source code in packages/gds-interchange/gds_interchange/owl/export.py
def spec_to_graph(
    spec: GDSSpec,
    *,
    base_uri: str = DEFAULT_BASE_URI,
) -> Graph:
    """Export a GDSSpec to an RDF graph (ABox instance data).

    A fresh graph is created, the namespace built by ``_ns`` from
    ``base_uri`` and ``spec.name`` is bound to the ``inst`` prefix, and a
    root ``GDSSpec`` node is emitted. The spec's types, spaces, entities,
    parameters, blocks, wirings, admissibility constraints, transition
    signatures and state metrics are then written out and linked from the
    root node.

    Args:
        spec: The specification to export.
        base_uri: Base URI passed to ``_ns`` to build the instance namespace.

    Returns:
        The populated graph.
    """
    graph = Graph()
    _bind(graph)
    inst_ns = _ns(base_uri, spec.name)
    graph.bind("inst", inst_ns)

    root = inst_ns["spec"]
    graph.add((root, RDF.type, GDS_CORE["GDSSpec"]))
    graph.add((root, GDS_CORE["name"], Literal(spec.name)))
    graph.add((root, GDS_CORE["description"], Literal(spec.description)))

    # --- Types registered directly on the spec ---
    typedef_uris: dict[str, URIRef] = {}
    for type_name, typedef in spec.types.items():
        typedef_uri = _typedef_to_rdf(graph, inst_ns, typedef)
        typedef_uris[type_name] = typedef_uri
        graph.add((root, GDS_CORE["hasType"], typedef_uri))

    # --- Typedefs reachable only through parameters ---
    # These are exported and made resolvable by name for the helpers below,
    # but no hasType link from the spec node is added for them.
    for param in spec.parameter_schema.parameters.values():
        param_typedef = param.typedef
        if param_typedef.name in typedef_uris:
            continue
        typedef_uris[param_typedef.name] = _typedef_to_rdf(
            graph, inst_ns, param_typedef
        )

    # --- Spaces ---
    space_uris: dict[str, URIRef] = {}
    for space_name, space in spec.spaces.items():
        space_uri = _space_to_rdf(graph, inst_ns, space, typedef_uris)
        space_uris[space_name] = space_uri
        graph.add((root, GDS_CORE["hasSpace"], space_uri))

    # --- Entities ---
    entity_uris: dict[str, URIRef] = {}
    for ent_name, entity in spec.entities.items():
        entity_uri = _entity_to_rdf(graph, inst_ns, entity, typedef_uris)
        entity_uris[ent_name] = entity_uri
        graph.add((root, GDS_CORE["hasEntity"], entity_uri))

    # --- Parameters ---
    param_uris: dict[str, URIRef] = {}
    for param_name, param in spec.parameter_schema.parameters.items():
        param_uri = _parameter_to_rdf(graph, inst_ns, param, typedef_uris)
        param_uris[param_name] = param_uri
        graph.add((root, GDS_CORE["hasParameter"], param_uri))

    # --- Blocks ---
    block_uris: dict[str, URIRef] = {}
    for block_name, block in spec.blocks.items():
        block_uri = _block_to_rdf(graph, inst_ns, block, param_uris, entity_uris)
        block_uris[block_name] = block_uri
        graph.add((root, GDS_CORE["hasBlock"], block_uri))

    # --- Wirings (the registry key is not needed here) ---
    for wiring in spec.wirings.values():
        wiring_uri = _wiring_to_rdf(graph, inst_ns, wiring, block_uris, space_uris)
        graph.add((root, GDS_CORE["hasWiring"], wiring_uri))

    # --- Admissibility constraints ---
    for con_name, con in spec.admissibility_constraints.items():
        con_uri = _uri(inst_ns, "admissibility", con_name)
        graph.add((con_uri, RDF.type, GDS_CORE["AdmissibleInputConstraint"]))
        graph.add((con_uri, GDS_CORE["name"], Literal(con_name)))

        # The boundary block is always recorded by name; an object link is
        # added only when that name resolves to an exported block.
        boundary_name = con.boundary_block
        boundary_literal = Literal(boundary_name)
        graph.add((con_uri, GDS_CORE["constraintBoundaryBlock"], boundary_literal))
        boundary_uri = block_uris.get(boundary_name)
        if boundary_uri is not None:
            graph.add((con_uri, GDS_CORE["constrainsBoundary"], boundary_uri))

        # Only the presence of a constraint object is exported, as a boolean.
        has_constraint = Literal(con.constraint is not None, datatype=XSD.boolean)
        graph.add((con_uri, GDS_CORE["admissibilityHasConstraint"], has_constraint))
        graph.add((con_uri, GDS_CORE["description"], Literal(con.description)))

        for dep_entity, dep_variable in con.depends_on:
            dep_node = BNode()
            graph.add((dep_node, RDF.type, GDS_CORE["AdmissibilityDep"]))
            graph.add((dep_node, GDS_CORE["depEntity"], Literal(dep_entity)))
            graph.add((dep_node, GDS_CORE["depVariable"], Literal(dep_variable)))
            graph.add((con_uri, GDS_CORE["hasDependency"], dep_node))

        graph.add((root, GDS_CORE["hasAdmissibilityConstraint"], con_uri))

    # --- Transition signatures ---
    for sig_name, sig in spec.transition_signatures.items():
        sig_uri = _uri(inst_ns, "transition_sig", sig_name)
        graph.add((sig_uri, RDF.type, GDS_CORE["TransitionSignature"]))
        graph.add((sig_uri, GDS_CORE["name"], Literal(sig_name)))

        # Mechanism is always recorded by name, and linked when resolvable.
        mechanism_name = sig.mechanism
        graph.add((sig_uri, GDS_CORE["signatureMechanism"], Literal(mechanism_name)))
        mechanism_uri = block_uris.get(mechanism_name)
        if mechanism_uri is not None:
            graph.add((sig_uri, GDS_CORE["signatureForMechanism"], mechanism_uri))

        for upstream_name in sig.depends_on_blocks:
            graph.add((sig_uri, GDS_CORE["dependsOnBlock"], Literal(upstream_name)))

        invariant = sig.preserves_invariant
        if invariant:
            graph.add((sig_uri, GDS_CORE["preservesInvariant"], Literal(invariant)))

        for read_entity, read_variable in sig.reads:
            read_node = BNode()
            graph.add((read_node, RDF.type, GDS_CORE["TransitionReadEntry"]))
            graph.add((read_node, GDS_CORE["readEntity"], Literal(read_entity)))
            graph.add((read_node, GDS_CORE["readVariable"], Literal(read_variable)))
            graph.add((sig_uri, GDS_CORE["hasReadEntry"], read_node))

        graph.add((root, GDS_CORE["hasTransitionSignature"], sig_uri))

    # --- State metrics ---
    for metric_name, metric in spec.state_metrics.items():
        metric_uri = _uri(inst_ns, "state_metric", metric_name)
        graph.add((metric_uri, RDF.type, GDS_CORE["StateMetric"]))
        graph.add((metric_uri, GDS_CORE["name"], Literal(metric_name)))

        metric_kind = metric.metric_type
        if metric_kind:
            graph.add((metric_uri, GDS_CORE["metricType"], Literal(metric_kind)))

        # Only the presence of a distance object is exported, as a boolean.
        has_distance = Literal(metric.distance is not None, datatype=XSD.boolean)
        graph.add((metric_uri, GDS_CORE["metricHasDistance"], has_distance))
        graph.add((metric_uri, GDS_CORE["description"], Literal(metric.description)))

        for var_entity, var_variable in metric.variables:
            var_node = BNode()
            graph.add((var_node, RDF.type, GDS_CORE["MetricVariableEntry"]))
            graph.add((var_node, GDS_CORE["metricEntity"], Literal(var_entity)))
            graph.add((var_node, GDS_CORE["metricVariable"], Literal(var_variable)))
            graph.add((metric_uri, GDS_CORE["hasMetricVariable"], var_node))

        graph.add((root, GDS_CORE["hasStateMetric"], metric_uri))

    return graph

system_ir_to_graph(system, *, base_uri=DEFAULT_BASE_URI)

Export a SystemIR to an RDF graph.

Source code in packages/gds-interchange/gds_interchange/owl/export.py
def system_ir_to_graph(
    system: SystemIR,
    *,
    base_uri: str = DEFAULT_BASE_URI,
) -> Graph:
    """Export a SystemIR to an RDF graph.

    Emits a root ``SystemIR`` node carrying the system name and composition
    type (plus a source label when one is set), then attaches the block,
    wiring and input nodes and, when present, the hierarchy.

    Args:
        system: The system IR to export.
        base_uri: Base URI passed to ``_ns`` to build the instance namespace.

    Returns:
        The populated graph.
    """
    graph = Graph()
    _bind(graph)
    inst_ns = _ns(base_uri, system.name)
    graph.bind("inst", inst_ns)

    root = inst_ns["system"]
    graph.add((root, RDF.type, GDS_IR["SystemIR"]))
    graph.add((root, GDS_CORE["name"], Literal(system.name)))
    composition = Literal(system.composition_type.value)
    graph.add((root, GDS_IR["compositionTypeSystem"], composition))

    source_label = system.source
    if source_label:
        graph.add((root, GDS_IR["sourceLabel"], Literal(source_label)))

    for block_ir in system.blocks:
        block_uri = _block_ir_to_rdf(graph, inst_ns, block_ir)
        graph.add((root, GDS_IR["hasBlockIR"], block_uri))

    # The positional index is passed to the helper together with the wiring.
    for position, wiring_ir in enumerate(system.wirings):
        wiring_uri = _wiring_ir_to_rdf(graph, inst_ns, wiring_ir, position)
        graph.add((root, GDS_IR["hasWiringIR"], wiring_uri))

    for input_ir in system.inputs:
        input_name = input_ir.name
        input_uri = _uri(inst_ns, "input", input_name)
        graph.add((input_uri, RDF.type, GDS_IR["InputIR"]))
        graph.add((input_uri, GDS_CORE["name"], Literal(input_name)))
        graph.add((root, GDS_IR["hasInputIR"], input_uri))

    hierarchy = system.hierarchy
    if hierarchy:
        hierarchy_uri = _hierarchy_to_rdf(graph, inst_ns, hierarchy)
        graph.add((root, GDS_IR["hasHierarchy"], hierarchy_uri))

    return graph

canonical_to_graph(canonical, *, base_uri=DEFAULT_BASE_URI, name='canonical')

Export a CanonicalGDS to an RDF graph.

Source code in packages/gds-interchange/gds_interchange/owl/export.py
def canonical_to_graph(
    canonical: CanonicalGDS,
    *,
    base_uri: str = DEFAULT_BASE_URI,
    name: str = "canonical",
) -> Graph:
    """Export a CanonicalGDS to an RDF graph.

    Emits a root ``CanonicalGDS`` node with its formula, one
    ``StateVariable`` node per ``(entity, variable)`` pair, the block names
    of each role partition as literals, and one blank ``UpdateMapEntry``
    node per ``(mechanism, entity, variable)`` triple of the update map.

    Args:
        canonical: The canonical form to export.
        base_uri: Base URI passed to ``_ns`` to build the instance namespace.
        name: Name passed to ``_ns`` alongside ``base_uri``.

    Returns:
        The populated graph.
    """
    graph = Graph()
    _bind(graph)
    inst_ns = _ns(base_uri, name)
    graph.bind("inst", inst_ns)

    root = inst_ns["canonical"]
    graph.add((root, RDF.type, GDS_CORE["CanonicalGDS"]))
    graph.add((root, GDS_CORE["formula"], Literal(canonical.formula())))

    # State variables: the URI and the description both use "entity.variable".
    for owner, variable in canonical.state_variables:
        qualified = f"{owner}.{variable}"
        var_uri = _uri(inst_ns, "state_var", qualified)
        graph.add((var_uri, RDF.type, GDS_CORE["StateVariable"]))
        graph.add((var_uri, GDS_CORE["name"], Literal(variable)))
        graph.add((var_uri, GDS_CORE["description"], Literal(qualified)))
        graph.add((root, GDS_CORE["hasVariable"], var_uri))

    # Block role partitions: one predicate per role, block names as literals.
    role_partitions = (
        ("boundaryBlock", canonical.boundary_blocks),
        ("controlBlock", canonical.control_blocks),
        ("policyBlock", canonical.policy_blocks),
        ("mechanismBlock", canonical.mechanism_blocks),
    )
    for predicate, block_names in role_partitions:
        for block_name in block_names:
            graph.add((root, GDS_CORE[predicate], Literal(block_name)))

    # Update map: iterated as (mechanism, updates) pairs, one blank node
    # per updated (entity, variable).
    for mechanism, targets in canonical.update_map:
        for owner, variable in targets:
            update_node = BNode()
            graph.add((update_node, RDF.type, GDS_CORE["UpdateMapEntry"]))
            graph.add((update_node, GDS_CORE["name"], Literal(mechanism)))
            graph.add((update_node, GDS_CORE["updatesEntity"], Literal(owner)))
            graph.add((update_node, GDS_CORE["updatesVariable"], Literal(variable)))
            graph.add((root, GDS_CORE["updatesEntry"], update_node))

    return graph

report_to_graph(report, *, base_uri=DEFAULT_BASE_URI)

Export a VerificationReport to an RDF graph.

Source code in packages/gds-interchange/gds_interchange/owl/export.py
def report_to_graph(
    report: VerificationReport,
    *,
    base_uri: str = DEFAULT_BASE_URI,
) -> Graph:
    """Export a VerificationReport to an RDF graph.

    Emits a root ``VerificationReport`` node carrying the system name and
    one ``Finding`` node per entry in ``report.findings``.

    Args:
        report: The verification report to export.
        base_uri: Base URI passed to ``_ns`` to build the instance namespace.

    Returns:
        The populated graph.
    """
    graph = Graph()
    _bind(graph)
    system_name = report.system_name
    inst_ns = _ns(base_uri, system_name)
    graph.bind("inst", inst_ns)

    root = inst_ns["report"]
    graph.add((root, RDF.type, GDS_VERIF["VerificationReport"]))
    graph.add((root, GDS_VERIF["systemName"], Literal(system_name)))

    for position, finding in enumerate(report.findings):
        # The positional suffix keeps URIs distinct when a check id repeats.
        finding_uri = _uri(inst_ns, "finding", f"{finding.check_id}-{position}")
        passed = Literal(finding.passed, datatype=XSD.boolean)

        graph.add((finding_uri, RDF.type, GDS_VERIF["Finding"]))
        graph.add((finding_uri, GDS_VERIF["checkId"], Literal(finding.check_id)))
        graph.add((finding_uri, GDS_VERIF["severity"], Literal(finding.severity.value)))
        graph.add((finding_uri, GDS_VERIF["message"], Literal(finding.message)))
        graph.add((finding_uri, GDS_VERIF["passed"], passed))

        for element in finding.source_elements:
            graph.add((finding_uri, GDS_VERIF["sourceElement"], Literal(element)))

        predicate_text = finding.exportable_predicate
        if predicate_text:
            graph.add(
                (finding_uri, GDS_VERIF["exportablePredicate"], Literal(predicate_text))
            )

        graph.add((root, GDS_VERIF["hasFinding"], finding_uri))

    return graph