from pyiron_workflow.api import NOT_DATA, Workflow
from pyiron_workflow.channels import Channel
from pyiron_workflow.node import Node
from pyiron_workflow.nodes.composite import Composite
from rdflib import Graph
from semantikon.ontology import SNS, get_knowledge_graph
def _extract_data(item: Channel, with_values=True, with_default=True) -> dict:
data = {}
data_dict = {"default": NOT_DATA, "value": NOT_DATA, "type_hint": None}
if not with_values:
data_dict.pop("value")
if not with_default:
data_dict.pop("default")
for key, value in data_dict.items():
if getattr(item, key) is not value:
data[key] = getattr(item, key)
return data
def _is_internal_connection(channel: Channel, workflow: Composite, io_: str) -> bool:
"""
Check if a channel is connected to another channel in the same workflow.
Args:
channel (Channel): The channel to check.
workflow (Composite): The workflow to check whether the channel is connected to.
io_ (str): The IO direction to check.
Returns:
bool: Whether the channel is connected to another channel in the same workflow.
"""
if not channel.connected:
return False
return any(channel.connections[0] in getattr(n, io_) for n in workflow)
def _get_scoped_label(channel: Channel, io_: str) -> str:
return channel.scoped_label.replace("__", f".{io_}.")
def _io_to_dict(
node: Node, with_values: bool = True, with_default: bool = True
) -> dict:
data = {"inputs": {}, "outputs": {}}
is_composite = isinstance(node, Composite)
for io_ in ["inputs", "outputs"]:
for inp in getattr(node, io_):
if is_composite:
data[io_][inp.scoped_label] = _extract_data(
inp, with_values=with_values, with_default=with_default
)
else:
data[io_][inp.label] = _extract_data(
inp, with_values=with_values, with_default=with_default
)
return data
def _export_node_to_dict(
node: Node, with_values: bool = True, with_default: bool = True
) -> dict:
"""
Export a node to a dictionary.
Args:
node (Node): The node to export.
with_values (bool): Whether to include the values of the channels in the
dictionary. (Default is True.)
Returns:
dict: The exported node as a dictionary.
"""
data = {"inputs": {}, "outputs": {}, "function": node.node_function}
data.update(_io_to_dict(node, with_values=with_values, with_default=with_default))
return data
def _export_composite_to_dict(
workflow: Composite, with_values: bool = True, with_default: bool = True
) -> dict:
"""
Export a composite to a dictionary.
Args:
workflow (Composite): The composite to export.
with_values (bool): Whether to include the values of the channels in the
dictionary. (Default is True.)
Returns:
dict: The exported composite as a dictionary.
"""
data = {
"inputs": {},
"outputs": {},
"nodes": {},
"edges": [],
"label": workflow.label,
}
for inp in workflow.inputs:
if inp.value_receiver is not None and inp.value_receiver.owner in workflow:
data["edges"].append(
(
f"inputs.{inp.scoped_label}",
_get_scoped_label(inp.value_receiver, "inputs"),
)
)
for node in workflow:
label = node.label
if isinstance(node, Composite):
data["nodes"][label] = _export_composite_to_dict(
node, with_values=with_values
)
else:
data["nodes"][label] = _export_node_to_dict(node, with_values=with_values)
for inp in node.inputs:
if _is_internal_connection(inp, workflow, "outputs"):
data["edges"].append(
(
_get_scoped_label(inp.connections[0], "outputs"),
_get_scoped_label(inp, "inputs"),
)
)
for out in node.outputs:
if out.value_receiver is not None:
data["edges"].append(
(
_get_scoped_label(out, "outputs"),
f"outputs.{out.value_receiver.scoped_label}",
)
)
data.update(
_io_to_dict(workflow, with_values=with_values, with_default=with_default)
)
return data
[docs]
def export_to_dict(
workflow: Node, with_values: bool = True, with_default: bool = True
) -> dict:
if isinstance(workflow, Composite):
return _export_composite_to_dict(workflow, with_values=with_values)
return _export_node_to_dict(
workflow, with_values=with_values, with_default=with_default
)
[docs]
def parse_workflow(
workflow: Workflow,
with_values: bool = True,
with_default: bool = True,
graph: Graph | None = None,
inherit_properties: bool = True,
ontology=SNS,
append_missing_items: bool = True,
) -> Graph:
"""
Generate RDF graph from a pyiron workflow object
Args:
workflow (pyiron_workflow.workflow.Workflow): workflow object
with_values (bool): include channel values in the graph
with_default (bool): include default values in the graph
graph (rdflib.Graph): graph to add workflow information to
inherit_properties (bool): inherit properties from the ontology
ontology (str): ontology to use
append_missing_items (bool): append missing items for restrictions to
the ontology
Returns:
(rdflib.Graph): graph containing workflow information
"""
wf_dict = export_to_dict(
workflow, with_values=with_values, with_default=with_default
)
return get_knowledge_graph(
wf_dict=wf_dict,
graph=graph,
inherit_properties=inherit_properties,
ontology=ontology,
append_missing_items=append_missing_items,
)