Source code for hed.schema.schema_io.ontology_util

"""Utility functions for saving as an ontology or dataframe."""

import pandas as pd

from hed.schema.schema_io import schema_util
from hed.errors.exceptions import HedFileError
from hed.schema import hed_schema_df_constants as constants
from hed.schema.hed_schema_constants import HedKey
from hed.schema.schema_io.df_util import remove_prefix, calculate_attribute_type, get_attributes_from_row
from hed.schema.hed_cache import get_library_data

# Per-section offsets into a library's HedId range, stored as (start_offset, end_offset).
# _get_hedid_range adds each offset to the library's starting id and builds
# range(start, end) from the result, so the end offset is exclusive.
object_type_id_offset = {
    constants.OBJECT_KEY: (100, 300),
    constants.DATA_KEY: (300, 500),
    constants.ANNOTATION_KEY: (500, 700),
    constants.ATTRIBUTE_PROPERTY_KEY: (700, 900),
    constants.VALUE_CLASS_KEY: (1300, 1400),
    constants.UNIT_MODIFIER_KEY: (1400, 1500),
    constants.UNIT_CLASS_KEY: (1500, 1600),
    constants.UNIT_KEY: (1600, 1700),
    # Tags run from offset 2000 (plus one skipped id) to the end of the library's range.
    constants.TAG_KEY: (2000, -1),  # -1 = go to end of range
}


def _get_hedid_range(schema_name, df_key):
    """ Build the set of HedId numbers available to one section of one schema.

    Parameters:
        schema_name(str): The known schema name with an assigned id range
        df_key(str): The section whose ids are wanted.  A key of object_type_id_offset.

    Returns:
        number_set(set): Every id number in the requested range.  Empty if no library data
                         is found for schema_name.

    Raises:
        NotImplementedError: If df_key is the struct section, which has no id range.
    """
    if df_key == constants.STRUCT_KEY:
        raise NotImplementedError("Cannot assign hed_ids struct section")

    library_info = get_library_data(schema_name)
    if not library_info:
        return set()
    range_first, range_last = library_info["id_range"]
    offset_begin, offset_end = object_type_id_offset[df_key]

    # We always skip 1 for tags
    tag_skip = 1 if df_key == constants.TAG_KEY else 0
    lower_bound = range_first + offset_begin + tag_skip

    if offset_end == -1:
        # Add one since the versions on hed-schemas are set to max_value - 1
        upper_bound = range_last + 1
    else:
        upper_bound = range_first + offset_end
    return set(range(lower_bound, upper_bound))


def get_all_ids(df):
    """Collect the numeric part of every hedId present in the dataframe.

    Parameters:
        df(pd.DataFrame): The dataframe

    Returns:
        numbers(Set or None): None if this has no HED column, otherwise all unique numbers as a set.
                              Cells that do not convert to a number are ignored.
    """
    if constants.hed_id not in df.columns:
        return None

    without_prefix = df[constants.hed_id].apply(lambda cell: remove_prefix(cell, "HED_"))
    # errors="coerce" turns non-numeric cells into NaN so dropna() can discard them.
    as_numbers = pd.to_numeric(without_prefix, errors="coerce")
    id_numbers = as_numbers.dropna().astype(int)
    return set(id_numbers.unique())


def update_dataframes_from_schema(dataframes, schema, schema_name="", get_as_ids=False, assign_missing_ids=False):
    """ Regenerate the schema dataframes from a schema, keeping the extra columns of the originals.

    Parameters:
        dataframes(dict): A full set of schema spreadsheet formatted dataframes
        schema(HedSchema): The schema to write into the dataframes
        schema_name(str): The name used to look up the schema id range.  Falls back to schema.library.
        get_as_ids(bool): If True, replace all known references with HedIds
        assign_missing_ids(bool): If True, fill any blank(new) HedIds with valid ones

    Returns:
        dataframes(dict of str:pd.DataFrames): The updated dataframes
                                               These dataframes can potentially have extra columns

    Raises:
        HedFileError: If any hedId in the dataframes fails verification against the schema.
    """
    schema_name = schema_name or schema.library

    # 1. Verify existing HED ids don't conflict between schema/dataframes
    hedid_errors = []
    for df_key, df in dataframes.items():
        section_key = constants.section_mapping_hed_id.get(df_key)
        if not section_key:
            continue
        section = schema[section_key]
        valid_ids = _get_hedid_range(schema_name, df_key)
        hedid_errors.extend(_verify_hedid_matches(section, df, valid_ids))

    if hedid_errors:
        raise HedFileError(hedid_errors[0]['code'],
                           f"{len(hedid_errors)} issues found with hedId mismatches. See the .issues "
                           f"parameter on this exception for more details.", schema.name,
                           issues=hedid_errors)

    # 2. Get the new schema as DFs
    from hed.schema.schema_io.schema2df import Schema2DF  # Late import as this is recursive
    converter = Schema2DF(get_as_ids=get_as_ids)
    output_dfs = converter.process_schema(schema, save_merged=False)

    # 3: Add any HED ID's as needed to these generated dfs
    if assign_missing_ids:
        for df_key, generated_df in output_dfs.items():
            if df_key == constants.STRUCT_KEY:
                continue
            assign_hed_ids_section(generated_df, _get_hedid_range(schema_name, df_key))

    # 4: Merge the dataframes
    for df_key, generated_df in output_dfs.items():
        merge_dfs(generated_df, dataframes[df_key])

    return output_dfs


def _verify_hedid_matches(section, df, unused_tag_ids):
    """ Verify ID's in both have the same label, and verify all entries in the dataframe are already in the schema

    Parameters:
        section(HedSchemaSection): The loaded schema section to compare ID's with
        df(pd.DataFrame): The loaded spreadsheet dataframe to compare with
        unused_tag_ids(set): The valid range of ID's for this df

    Returns:
        error_list(list): The issues found matching id's, as produced by schema_util.format_error
    """
    placeholder_suffix = "-#"
    expected_id_length = len("HED_0000000")
    hedid_errors = []
    for row_number, row in df.iterrows():
        # Skip rows in which every cell is empty.
        if not any(row):
            continue
        label = row[constants.name]
        if label.endswith(placeholder_suffix):
            # Rewrite only the trailing marker; str.replace would also touch earlier occurrences.
            label = label[:-len(placeholder_suffix)] + "/#"
        df_id = row[constants.hed_id]
        entry = section.get(label)
        if not entry:
            # Neither side has a hedID, so nothing to do.
            if not df_id:
                continue
            hedid_errors += schema_util.format_error(row_number, row,
                                                     f"'{label}' does not exist in schema file only the spreadsheet.")
            continue

        # NOTE(review): rows with a blank hedID are not checked any further - confirm this
        # matches the nesting of the original (its indentation was lost in this copy).
        if not df_id:
            continue

        if not (df_id.startswith("HED_") and len(df_id) == expected_id_length):
            hedid_errors += schema_util.format_error(row_number, row,
                                                     f"'{label}' has an improperly formatted hedID in dataframe.")
            continue

        id_value = remove_prefix(df_id, "HED_")
        # Keep the try limited to the conversion: min()/max() on an empty range also raise
        # ValueError, which used to be misreported as a non-numeric id.
        try:
            id_int = int(id_value)
        except ValueError:
            hedid_errors += schema_util.format_error(
                row_number, row, f"'{label}' has a non-numeric hedID in the dataframe.")
            continue

        if id_int not in unused_tag_ids:
            if unused_tag_ids:
                range_text = f"Valid range is: {min(unused_tag_ids)} to {max(unused_tag_ids)}"
            else:
                range_text = "No valid id range is known for this schema."
            hedid_errors += schema_util.format_error(
                row_number, row, f"'{label}' has id {id_int} which is outside " +
                                 "of the valid range for this type. " + range_text)
            continue

        entry_id = entry.attributes.get(HedKey.HedID)
        if entry_id and entry_id != df_id:
            hedid_errors += schema_util.format_error(
                row_number, row, f"'{label}' has hedID '{df_id}' in dataframe, but '{entry_id}' in schema.")

    return hedid_errors


def assign_hed_ids_section(df, unused_tag_ids):
    """ Adds missing HedIds to dataframe.

    Every row whose hedId cell is falsy receives the lowest id still available, in row order.

    Parameters:
        df(pd.DataFrame): The dataframe to add id's to.  Modified in place.
        unused_tag_ids(set of int): The possible HED id's to assign from.  Ids already used in df
                                    are removed from this set in place.

    Raises:
        IndexError: If there are fewer available ids than rows that need one.
    """
    # Remove already used ids
    unused_tag_ids -= get_all_ids(df)
    available_ids = sorted(unused_tag_ids)

    # we already verified existing ones, so only falsy (blank) cells get a new id
    needs_id = [not existing_id for existing_id in df[constants.hed_id]]
    needed_count = sum(needs_id)
    if needed_count > len(available_ids):
        raise IndexError(f"{needed_count} rows need a hedId but only {len(available_ids)} ids are available.")
    if not needed_count:
        return

    new_ids = [f"HED_{id_number:07d}" for id_number in available_ids[:needed_count]]
    # Write through the dataframe itself: a row yielded by iterrows() may be a copy,
    # so assigning to that row is not guaranteed to reach df.
    df.loc[needs_id, constants.hed_id] = new_ids


def merge_dfs(dest_df, source_df):
    """ Merges extra columns from source_df into dest_df, adding the extra columns from the ontology to the schema df.

    Rows are paired on the 'rdfs:label' column.  Only columns that dest_df does not already have
    are copied, and only into the first dest_df row carrying the same label.  Source rows without
    a matching label are ignored.

    Parameters:
        dest_df(pd.DataFrame): The dataframe to add extra columns to.  Modified in place.
        source_df(pd.DataFrame): The dataframe to get extra columns from
    """
    label_column = 'rdfs:label'
    extra_columns = [col for col in source_df.columns if col not in dest_df.columns]
    if not extra_columns or source_df.empty:
        return

    # Map each label to the index of its first row once, instead of scanning dest_df per source row.
    first_row_for_label = {}
    for dest_index, dest_label in zip(dest_df.index, dest_df[label_column]):
        first_row_for_label.setdefault(dest_label, dest_index)

    for _, row in source_df.iterrows():
        label = row[label_column]
        # A missing label never compares equal to anything, so it cannot match a row.
        if pd.isna(label) or label not in first_row_for_label:
            continue
        target_index = first_row_for_label[label]
        for col in extra_columns:
            dest_df.at[target_index, col] = row[col]


def _get_annotation_prop_ids(schema):
    """ Map the name of each annotation attribute and each property in the schema to its HedId.

    Parameters:
        schema(HedSchema): The schema whose attributes and properties are read

    Returns:
        annotation_props(dict): name -> hedId for every attribute whose calculated type is
                                "annotation", followed by every schema property.
    """
    prop_ids = {
        attribute_entry.name: attribute_entry.attributes[HedKey.HedID]
        for attribute_entry in schema.attributes.values()
        if calculate_attribute_type(attribute_entry) == "annotation"
    }
    # Properties are added without filtering.
    prop_ids.update(
        (property_entry.name, property_entry.attributes[HedKey.HedID])
        for property_entry in schema.properties.values()
    )
    return prop_ids


def get_prefixes(dataframes):
    """ Map every external annotation term to the prefix data it belongs to.

    Parameters:
        dataframes(dict): The schema dataframes, optionally including the prefix and
                          external annotation dataframes

    Returns:
        annotation_terms(dict): Prefix + ID of each external annotation row, mapped to the value in
                                the second column of the matching prefix row (presumably the
                                namespace IRI - confirm against the prefix dataframe layout).
                                Empty if either dataframe is absent.
    """
    prefix_df = dataframes.get(constants.PREFIXES_KEY)
    external_df = dataframes.get(constants.EXTERNAL_ANNOTATION_KEY)
    if prefix_df is None or external_df is None:
        return {}

    second_column_by_prefix = {}
    for prefix_row in prefix_df.itertuples():
        # Position 0 of an itertuples() row is the index, so position 2 is the second column.
        second_column_by_prefix[prefix_row.Prefix] = prefix_row[2]

    return {
        term_row.Prefix + term_row.ID: second_column_by_prefix[term_row.Prefix]
        for term_row in external_df.itertuples()
    }


def convert_df_to_omn(dataframes):
    """ Convert the dataframe format schema to omn format.

    Parameters:
        dataframes(dict): A set of dataframes representing a schema, potentially including extra columns

    Returns:
        tuple:
            omn_file(str): A combined string representing (most of) a schema omn file.
            omn_data(dict): a dict of DF_SUFFIXES:str, representing each .tsv file in omn format.
    """
    from hed.schema.hed_schema_io import from_dataframes
    annotation_terms = get_prefixes(dataframes)

    # Load the schema, so we can save it out with ID's
    schema = from_dataframes(dataframes)
    # Convert dataframes to hedId format.  Missing hedId's are not assigned here
    # (generally, they should be replaced before here).
    dataframes_u = update_dataframes_from_schema(dataframes, schema, get_as_ids=True)

    # Copy over remaining non schema dataframes.  Each is copied only if present, so a set
    # holding just one of them no longer fails with a KeyError.
    for extra_key in (constants.PREFIXES_KEY, constants.EXTERNAL_ANNOTATION_KEY):
        if extra_key in dataframes:
            dataframes_u[extra_key] = dataframes[extra_key]

    # Write out the new dataframes in omn format
    annotation_props = _get_annotation_prop_ids(schema)
    omn_data = {}
    for suffix, dataframe in dataframes_u.items():
        if suffix in constants.DF_EXTRA_SUFFIXES:
            omn_data[suffix] = _convert_extra_df_to_omn(dataframe, suffix)
        else:
            omn_data[suffix] = _convert_df_to_omn(dataframe, annotation_properties=annotation_props,
                                                  annotation_terms=annotation_terms)

    # Every section is followed by one newline in the combined text.
    full_text = "".join(f"{section_text}\n" for section_text in omn_data.values())
    return full_text, omn_data


def _convert_df_to_omn(df, annotation_properties=("",), annotation_terms=None):
    """Takes a single df format schema and converts it to omn.

    This is one section, e.g. tags, units, etc.

    Note: This mostly assumes a fully valid df.  A df missing a required column will raise an error.

    Parameters:
        df(pd.DataFrame): the dataframe to turn into omn
        annotation_properties(dict): Known annotation properties, with the values being their hedId.
        annotation_terms(dict): The list of valid external omn tags, such as "dc:source"

    Returns:
        omn_text(str): the omn formatted text for this section
    """
    # NOTE(review): the nesting below was rebuilt from a copy that had lost its indentation;
    # confirm where the blank-line appends sit relative to the two if-blocks.
    chunks = []
    for _, row in df.iterrows():
        prop_type = _get_property_type(row)
        hed_id = row[constants.hed_id]
        chunks.append(f"{prop_type}: hed:{hed_id}\n")
        chunks.append(_add_annotation_lines(row, annotation_properties, annotation_terms))

        if prop_type != "AnnotationProperty":
            if constants.property_domain in row.index:
                chunks.append(f"\tDomain:\n\t\t{row[constants.property_domain]}\n")
            if constants.property_range in row.index:
                chunks.append(f"\tRange:\n\t\t{row[constants.property_range]}\n")
            chunks.append("\n")

        if constants.equivalent_to in row.index:
            # Put each " and " clause on its own indented line.
            equivalent_to = row[constants.equivalent_to].replace(" and ", "\n\t\tand ")
            subclass_of = row[constants.subclass_of]
            if equivalent_to:
                chunks.append(f"\tEquivalentTo:\n\t\t{equivalent_to}\n")
            else:
                chunks.append(f"\tSubClassOf:\n\t\t{subclass_of}\n")

        chunks.append("\n")
    return "".join(chunks)


def _convert_extra_df_to_omn(df, suffix):
    """Takes a single non-schema dataframe (prefixes or external annotations) and converts it to omn.

    Note: This mostly assumes a fully valid df.  A df missing a required column will raise an error.

    Parameters:
        df(pd.DataFrame): the dataframe to turn into omn
        suffix(str): Which extra dataframe this is: constants.PREFIXES_KEY or
                     constants.EXTERNAL_ANNOTATION_KEY.

    Returns:
        omn_text(str): the omn formatted text for this section

    Raises:
        ValueError: If df has rows and suffix is neither of the two known keys.
    """
    lines = []
    for _, row in df.iterrows():
        if suffix == constants.PREFIXES_KEY:
            lines.append(f"Prefix: {row[constants.Prefix]} <{row[constants.NamespaceIRI]}>\n")
        elif suffix == constants.EXTERNAL_ANNOTATION_KEY:
            lines.append(f"AnnotationProperty: {row[constants.Prefix]}{row[constants.ID]}\n")
        else:
            raise ValueError(f"Unknown tsv suffix attempting to be converted {suffix}")
    return "".join(lines)


def _split_on_unquoted_commas(input_string):
    """ Splits the given string into comma separated portions, ignoring commas inside double quotes.

    Each portion is stripped of surrounding whitespace.  A trailing comma does not produce a
    final empty portion.

    Parameters:
        input_string: The string to split

    Returns:
        parts(list): The split apart string.
    """
    # Note: does not handle escaped double quotes.
    parts = []
    pending = []
    inside_quotes = False

    for char in input_string:
        if char == '"':
            inside_quotes = not inside_quotes
        if char == ',' and not inside_quotes:
            parts.append(''.join(pending).strip())
            pending = []
        else:
            # Quote characters are kept as part of the portion.
            pending.append(char)

    if pending:  # Add the last part if there is any.
        parts.append(''.join(pending).strip())

    return parts


def _split_annotation_values(parts):
    """ Splits each "<term> <value>" portion at its first space.

    Parameters:
        parts(list of str): The portions to split, e.g. from _split_on_unquoted_commas

    Returns:
        annotations(dict): term -> value.  A later duplicate term replaces an earlier one.

    Raises:
        ValueError: If a portion contains no space and so has no value.
    """
    annotations = {}
    for part in parts:
        key, separator, value = part.partition(" ")
        if not separator:
            raise ValueError(f"Annotation '{part}' is not in the form '<term> <value>'.")
        annotations[key] = value
    return annotations


def _add_annotation_lines(row, annotation_properties, annotation_terms):
    """ Builds the omn "Annotations:" block for one dataframe row.

    Parameters:
        row(pd.Series): The row to describe
        annotation_properties(dict): Known annotation properties, with the values being their hedId.
        annotation_terms(dict or None): The valid external omn tags.  None is treated as empty.

    Returns:
        omn_text(str): The annotation block, or an empty string if the row has no annotations.

    Raises:
        ValueError: If the row's annotations column names a term that is not in annotation_terms.
    """
    # The default of None would otherwise fail the membership test below with a TypeError.
    known_terms = {} if annotation_terms is None else annotation_terms

    annotation_lines = []
    description = row[constants.description]
    if description:
        annotation_lines.append(f"\t\t{constants.description} \"{description}\"")
    name = row[constants.name]
    if name:
        annotation_lines.append(f"\t\t{constants.name} \"{name}\"")

    # Add annotation properties(other than HedId)
    attributes = get_attributes_from_row(row)
    for attribute in attributes:
        if attribute in annotation_properties and attribute != HedKey.HedID:
            annotation_id = f"hed:{annotation_properties[attribute]}"
            value = attributes[attribute]
            # Only the literal True is written as a bare true; everything else is quoted.
            value_text = "true" if value is True else f'"{value}"'
            annotation_lines.append(f"\t\t{annotation_id} {value_text}")

    if constants.annotations in row.index:
        portions = _split_on_unquoted_commas(row[constants.annotations])
        for key, value in _split_annotation_values(portions).items():
            if key not in known_terms:
                raise ValueError(f"Problem. Found {key} which is not in the prefix/annotation list.")
            annotation_lines.append(f"\t\t{key} {value}")

    if not annotation_lines:
        return ""
    return "\tAnnotations:\n" + ",\n".join(annotation_lines) + "\n"


def _get_property_type(row):
    """Gets the property type from the row, or "Class" if the row has no property type column."""
    if constants.property_type in row.index:
        return row[constants.property_type]
    return "Class"