Source code for pyiron_ontology.constructor

# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
A parent class for the constructors of all pyiron ontologies.
"""

from __future__ import annotations

from typing import Optional
from warnings import warn

import owlready2 as owl
import pint

from pyiron_ontology.workflow import NodeTree

UREG = pint.UnitRegistry()


[docs] class Constructor: def __init__( self, name: str, closed: bool = True, strict: bool = False, debug: int = 0, ): onto = owl.get_ontology(f"file://{name}.owl") self.onto = onto self._make_universal_declarations() self._make_specific_declarations() # TODO: Introduce a "from_csv" option for constructing, and leverage # `all_classes=False` in `declare_classes`? self.sync(closed=closed, strict=strict, debug=debug)
[docs] def sync( self, closed=True, infer_property_values=True, infer_data_property_values=True, debug=0, strict=True, ): if closed: owl.close_world(self.onto.PyObject) with self.onto: owl.sync_reasoner_pellet( infer_property_values=infer_property_values, infer_data_property_values=infer_data_property_values, debug=debug, ) inconsistent = list(self.onto.inconsistent_classes()) if len(inconsistent) > 0: msg = f"Inconsistent classes were found in the ontology: {inconsistent}" if strict: raise RuntimeError(msg) else: warn(msg)
[docs] def save(self): self.onto.save()
def _make_specific_declarations(self): pass def _make_universal_declarations(self): with self.onto: class PyironOntoThing(owl.Thing): def get_sources( self, additional_requirements: list[Generic] = None ) -> list[WorkflowThing]: raise NotImplementedError def get_source_tree(self, additional_requirements=None): return build_tree( self, additional_requirements=additional_requirements ) def get_source_path(self, *path_indices: int): return build_path(self, *path_indices) class Parameter(PyironOntoThing): def unit_conversion(self, other_unit: str) -> float: if self.unit is not None: return UREG(self.unit).to(other_unit).magnitude else: raise ValueError("Parameters must have a unit specified") class has_unit(Parameter >> str, owl.FunctionalProperty): class_property_type = ["some"] python_name = "unit" class Generic(Parameter): def get_sources( self, additional_requirements: list[Generic] = None ) -> list[Output]: return [ out for out in self.indirect_outputs if ( out.satisfies(additional_requirements) if additional_requirements is not None else True ) ] @staticmethod def only_get_thing_classes(things): return [ is_a_class for is_a_class in things if isinstance(is_a_class, owl.ThingClass) ] @property def indirect_things(self): return self.only_get_thing_classes(self.INDIRECT_is_a) @property def indirect_io(self) -> list[Parameter]: generic_classes = self.only_get_thing_classes(self.is_a) unique_instances = list( set(generic_classes[0].instances()).union( *[gc.instances() for gc in generic_classes[1:]] ) ) return [p for ui in unique_instances for p in ui.parameters] @property def indirect_outputs(self) -> list[Output]: return [p for p in self.indirect_io if Output in p.is_a] @property def indirect_disjoints_set(self) -> set[Generic]: return get_disjoints_set(self.indirect_things) @property def representation_info(self): """ A more computationally efficient call when you know you need both the `indirect_disjoints` _and_ `indirect_things` properties at once. Returns: list, set: indirect things, indirect disjoints """ indirect_things = self.indirect_things indirect_disjoints = get_disjoints_set(indirect_things) return indirect_things, indirect_disjoints @classmethod def class_is_indirectly_disjoint_with(cls, other: owl.ThingClass): ancestors1 = list(cls.ancestors()) ancestors2 = list(other.ancestors()) combined_disjoints = get_disjoints_set(ancestors1).union( get_disjoints_set(ancestors2) ) combined_ancestors = set(ancestors1).union(ancestors2) return len(combined_disjoints.intersection(combined_ancestors)) > 0 def has_a_representation_among_others(self, others_info): my_things, my_disjoints = self.representation_info return any( compatible_classes( my_things, my_disjoints, other_things, other_disjoints, ) for (other_things, other_disjoints) in others_info ) class WorkflowThing(PyironOntoThing): pass class Function(WorkflowThing): def get_sources( self, additional_requirements: list[Generic] = None ) -> list[Input]: return self.mandatory_inputs @property def inputs(self): return self.mandatory_inputs + self.optional_inputs @property def options(self): return [ opt for inp in self.inputs for opt in [inp.generic] + inp.requirements + inp.transitive_requirements ] class IO(Parameter, WorkflowThing): pass class has_generic(IO >> Generic, owl.FunctionalProperty): python_name = "generic" class has_for_parameter(Generic >> IO, owl.InverseFunctionalProperty): python_name = "parameters" inverse_property = has_generic class has_hdf_path(IO >> str, owl.FunctionalProperty): python_name = "hdf_path" class Output(IO): def get_sources( self, additional_requirements: list[Generic] = None ) -> list[Function]: return [self.output_of] @property def options(self): return self.output_of.options def satisfies(self, requirements: list[Generic]) -> bool: others_info = [ other.representation_info for other in self.options + [self.generic] ] return all( requirement.has_a_representation_among_others(others_info) for requirement in requirements ) class is_output_of(Output >> Function, owl.FunctionalProperty): python_name = "output_of" class has_for_output(Function >> Output, owl.InverseFunctionalProperty): python_name = "outputs" inverse_property = is_output_of class Input(IO): def get_sources( self, additional_requirements: Optional[list[Generic]] = None ) -> list[Output]: return self.get_sources_and_passed_requirements( additional_requirements=additional_requirements )[0] def get_sources_and_passed_requirements( self, additional_requirements: Optional[list[Generic]] = None ) -> tuple[list[Output], list[Generic]]: requirements = self.get_requirements( additional_requirements=additional_requirements ) sources = self.generic.get_sources( additional_requirements=requirements ) return sources, requirements def get_requirements(self, additional_requirements=None): """ For each additional requirement, see if it is as or more specific than an existing requirement (from among the generic class, requirements, and transitive requirements), and if so keep it (discarding the original if in the generic class or requirements, appending if it's a transitive requirement that we're actually receiving). """ if additional_requirements is None: return [self.generic] + self.requirements requirements = [self.generic] + self.requirements base_infos = [other.representation_info for other in requirements] transitive_infos = [ other.representation_info for other in self.transitive_requirements ] for add_req in additional_requirements: add_things, add_disjoints = add_req.representation_info used = False # For early breaking if we use the additional req for i, (base_things, base_disjoints) in enumerate(base_infos): if self.candidate_is_as_or_more_specific_than( add_things, base_disjoints, base_things ): requirements[i] = add_req # Overwrite the thing you're # more specific than used = True break if used: continue for trans_things, trans_disjoints in transitive_infos: # If you haven't found the additional requirement yet, # check if it's in the allowed transitive requirements if compatible_classes( add_things, add_disjoints, trans_things, trans_disjoints, ): requirements.append(add_req) break return requirements @staticmethod def candidate_is_as_or_more_specific_than( candidate_things, ref_disjoints, ref_things ) -> bool: not_disjoint = ( len(ref_disjoints.intersection(candidate_things)) == 0 ) return not_disjoint and set(ref_things).issubset(candidate_things) class is_optional_input_of(Input >> Function, owl.FunctionalProperty): python_name = "optional_input_of" class has_for_optional_input( Function >> Input, owl.InverseFunctionalProperty ): python_name = "optional_inputs" inverse_property = is_optional_input_of class is_mandatory_input_of(Input >> Function, owl.FunctionalProperty): python_name = "mandatory_input_of" class has_for_mandatory_input( Function >> Input, owl.InverseFunctionalProperty ): python_name = "mandatory_inputs" inverse_property = is_mandatory_input_of class has_for_requirement(Input >> Generic): python_name = "requirements" class is_requirement_of(Generic >> Input): python_name = "requirement_of" inverse_property = has_for_requirement class has_for_transitive_requirement(Input >> Generic): python_name = "transitive_requirements" class is_transitive_requirement_of(Generic >> Input): python_name = "transitive_requirement_of" inverse_property = has_for_transitive_requirement owl.AllDisjoint([is_optional_input_of, is_mandatory_input_of]) owl.AllDisjoint([Input, Function, Output, Generic]) def compatible_classes( things1: list[owl.ThingClass], disjoints1: set[owl.ThingClass], things2: list[owl.ThingClass], disjoints2: set[owl.ThingClass], ): """ Given the `is_a` and disjoint classes of two individuals, checks whether they are compatible -- i.e. whether the classes of one are in the disjoints of the other (which would lead to incompatibility). Args: things1 (list[owl.ThingClass]): Classes of the first indivual. disjoints1 (set[owl.ThingClass]): Disjoints of the first individual. things2 (list[owl.ThingClass]): Classes of the second individual. disjoints2 (set[owl.ThingClass]): Returns: (bool): Whether any classes of one individual are in the disjoints of the other. """ # Put 0 first so we can skip the second evaluation when the first fails return ( 0 == len(disjoints1.intersection(things2)) == len(disjoints2.intersection(things1)) ) def get_disjoints_set(classes: list[owl.ThingClass]): """ For a list of things, get the set of all the things they're disjoint to """ disjoints = [] for thing in classes: if thing == owl.Thing: continue try: entities = list(next(thing.disjoints()).entities) # The entities are the actual classes that are disjoint # The entities for each of the disjoints are ideantical, # so we can just use `next` to grab the first one entities.remove(thing) # The entities of our disjoint include us, so remove us disjoints += entities except StopIteration: # If the disjoints are empty, just continue continue return set(disjoints) def build_tree( parameter, parent=None, additional_requirements=None ) -> NodeTree: node = NodeTree(parameter, parent=parent) if isinstance(parameter, Input): ( sources, additional_requirements, ) = parameter.get_sources_and_passed_requirements( additional_requirements=additional_requirements ) # Snag the accepted transitive requirements as well else: sources = parameter.get_sources( additional_requirements=additional_requirements ) for source in sources: build_tree( source, parent=node, additional_requirements=additional_requirements ) return node def build_path( parameter, *path_indices: int, parent=None, additional_requirements=None ): node = NodeTree(parameter, parent=parent) sources = parameter.get_sources( additional_requirements=additional_requirements ) if len(path_indices) > 0: i, path_indices = path_indices[0], path_indices[1:] source = sources[i] _, sources = build_path( source, *path_indices, parent=node, additional_requirements=additional_requirements, ) return node, sources