# Source code for hed.tools.remodeling.dispatcher

""" Controller for applying operations to tabular files and saving the results. """

import os
import numpy as np
import pandas as pd
import json
from hed.errors.exceptions import HedFileError
from hed.schema.hed_schema_io import load_schema_version
from hed.schema import HedSchema, HedSchemaGroup
from hed.tools.remodeling.backup_manager import BackupManager
from hed.tools.remodeling.operations.valid_operations import valid_operations
from hed.tools.util.io_util import clean_filename, extract_suffix_path, get_timestamp


class Dispatcher:
    """ Controller for applying operations to tabular files and saving the results.

    Instance attributes set by the constructor:
        - data_root: The dataset root passed to the constructor (may be None).
        - backup_name: The name of the backup from which data files are read.
        - backup_man: A BackupManager for data_root, or None when no data_root/backup_name is given.
        - parsed_ops: The list of operation objects built from the operation list.
        - hed_schema: The schema object resolved by get_schema (may be None).
        - summary_dicts: A dictionary of summary objects; it starts out empty and is not filled in by this class.

    """

    # Location of the summaries relative to <data_root>/derivatives.
    REMODELING_SUMMARY_PATH = 'remodel/summaries'

    def __init__(self, operation_list, data_root=None,
                 backup_name=BackupManager.DEFAULT_BACKUP_NAME, hed_versions=None):
        """ Constructor for the dispatcher.

        Parameters:
            operation_list (list): List of unparsed operations.
            data_root (str or None):  Root directory for the dataset. If none, then backups are not made.
            backup_name (str or None): Name of the backup to use. If empty or None, no backup manager is created.
            hed_versions (str, list, HedSchema, or HedSchemaGroup): The HED schema.

        :raises HedFileError:
            - If the specified backup does not exist.

        :raises ValueError:
            - If any of the operations cannot be parsed correctly.

        """
        self.data_root = data_root
        self.backup_name = backup_name
        self.backup_man = None
        # A backup manager is only meaningful when both a dataset root and a backup name are available.
        if self.data_root and backup_name:
            self.backup_man = BackupManager(data_root)
            if not self.backup_man.get_backup(self.backup_name):
                raise HedFileError("BackupDoesNotExist",
                                   f"Remodeler cannot be run with a dataset without first creating the "
                                   f"{self.backup_name} backup for {self.data_root}", "")
        op_list, errors = self.parse_operations(operation_list)
        if errors:
            these_errors = self.errors_to_str(errors, 'Dispatcher failed due to invalid operations')
            raise ValueError("InvalidOperationList", f"{these_errors}")
        self.parsed_ops = op_list
        self.hed_schema = self.get_schema(hed_versions)
        self.summary_dicts = {}

    def get_summaries(self, file_formats=('.txt', '.json')):
        """ Return the summaries in a dictionary of strings suitable for saving or archiving.

        Parameters:
            file_formats (list or tuple):  Formats for the context files ('.json' and '.txt' are allowed).
                Any other format is silently skipped.

        Returns:
            list: A list of dictionaries of summaries keyed to filenames.

        Notes:
            - Each dictionary has the keys 'file_name', 'file_format', 'file_type', and 'content'.
            - All file names produced by one call share the same timestamp.

        """
        summary_list = []
        time_stamp = '_' + get_timestamp()
        for context_item in self.summary_dicts.values():
            file_base = context_item.op.summary_filename
            if self.data_root:
                file_base = extract_suffix_path(self.data_root, file_base)
            file_base = clean_filename(file_base)
            for file_format in file_formats:
                if file_format == '.txt':
                    summary = context_item.get_text_summary(individual_summaries="consolidated")
                    summary = summary['Dataset']
                elif file_format == '.json':
                    summary = json.dumps(context_item.get_summary(individual_summaries="consolidated"), indent=4)
                else:
                    continue
                summary_list.append({'file_name': file_base + time_stamp + file_format,
                                     'file_format': file_format, 'file_type': 'summary', 'content': summary})
        return summary_list

    def get_data_file(self, file_designator):
        """ Get the correct data file given the file designator.

        Parameters:
            file_designator (str, DataFrame ): A dataFrame or the full path of the dataframe in the original dataset.

        Returns:
            DataFrame:  DataFrame after reading the path.

        :raises HedFileError:
            - If a valid file cannot be found.

        Notes:
            - If a string is passed and there is a backup manager,
              the string must correspond to the full path of the file in the original dataset.
              In this case, the corresponding backup file is read and returned.
            - If a string is passed and there is no backup manager,
              the data file corresponding to the file_designator is read and returned.
            - If a Pandas DataFrame, return a copy.

        """
        if isinstance(file_designator, pd.DataFrame):
            return file_designator.copy()
        if self.backup_man:
            actual_path = self.backup_man.get_backup_path(self.backup_name, file_designator)
        else:
            actual_path = file_designator
        try:
            # NOTE(review): na_values is the single string ",null" exactly as originally written --
            # confirm that this is the intended missing-value marker.
            df = pd.read_csv(actual_path, sep='\t', header=0, keep_default_na=False, na_values=",null")
        except Exception as ex:
            # Chain the original exception so the underlying read failure stays visible in the traceback.
            raise HedFileError("BadDataFile",
                               f"{str(actual_path)} (orig: {file_designator}) does not correspond to a valid tsv file",
                               "") from ex
        return df

    def get_summary_save_dir(self):
        """ Return the directory in which to save the summaries.

        Returns:
            str: the data_root + 'derivatives' + remodeling summary path, resolved to a real path.

        :raises HedFileError:
            - If this dispatcher does not have a data_root.

        """
        if self.data_root:
            return os.path.realpath(os.path.join(self.data_root, 'derivatives', Dispatcher.REMODELING_SUMMARY_PATH))
        raise HedFileError("NoDataRoot", "Dispatcher must have a data root to produce directories", "")

    def run_operations(self, file_path, sidecar=None, verbose=False):
        """ Run the dispatcher operations on a file.

        Parameters:
            file_path (str or DataFrame):    Full path of the file to be remodeled or a DataFrame
            sidecar (Sidecar or file-like):   Only needed for HED operations.
            verbose (bool):  If true, print out progress reports

        Returns:
            DataFrame:  The processed dataframe.

        Notes:
            - 'n/a' entries are converted to NaN before each operation and
              NaN entries are converted back to 'n/a' once after all operations have run.

        """
        if verbose:
            print(f"Reading {file_path}...")
        df = self.get_data_file(file_path)
        for operation in self.parsed_ops:
            df = self.prep_data(df)
            df = operation.do_op(self, df, file_path, sidecar=sidecar)
        df = self.post_proc_data(df)
        return df

    def save_summaries(self, save_formats=('.json', '.txt'), individual_summaries="separate",
                       summary_dir=None, task_name=""):
        """ Save the summary files in the specified formats.

        Parameters:
            save_formats (list or tuple):  Formats to save, e.g. [".txt", ".json"]. If empty or None, nothing is saved.
            individual_summaries (str): "consolidated", "individual", or "none" (passed unchanged to each summary).
            summary_dir (str or None): Directory for saving summaries.
            task_name (str): Name of task if summaries separated by task or "" if not separated.

        Notes:
            The summaries are saved in the dataset derivatives/remodel/summaries folder if no summary_dir is provided.

        Notes:
            - "consolidated" means that the overall summary and summaries of individual files are in one summary file.
            - "individual" means that the summaries of individual files are in separate files.
            - "none" means that only the overall summary is produced.
            - NOTE(review): the default value is "separate", which is not one of the values listed above --
              confirm the accepted values against the summary's save method.

        :raises HedFileError:
            - If no summary_dir is given and this dispatcher does not have a data_root.

        """
        if not save_formats:
            return
        if not summary_dir:
            summary_dir = self.get_summary_save_dir()
        os.makedirs(summary_dir, exist_ok=True)
        # Hand a list downstream, as callers of save have always received one.
        format_list = list(save_formats)
        for summary_item in self.summary_dicts.values():
            summary_item.save(summary_dir, format_list, individual_summaries=individual_summaries,
                              task_name=task_name)

    @staticmethod
    def parse_operations(operation_list):
        """ Convert a list of operation specifications into operation objects.

        Parameters:
            operation_list (list):  List of dictionaries, each with an "operation" key and a "parameters" key.

        Returns:
            tuple:
                - list: The operation objects (empty if any specification was invalid).
                - list: Dictionaries describing the errors (empty if every specification was valid).

        Notes:
            - Each error dictionary has the keys "index", "item", "error_type", "error_code", and "error_msg".
            - All specifications are checked, so the error list covers every invalid entry, not just the first.

        """
        errors = []
        operations = []
        for index, item in enumerate(operation_list):
            try:
                if not isinstance(item, dict):
                    raise TypeError("InvalidOperationFormat",
                                    f"Each operations must be a dictionary but operation {str(item)} is {type(item)}")
                if "operation" not in item:
                    raise KeyError("MissingOperation",
                                   f"operation {str(item)} does not have a operation key")
                if "parameters" not in item:
                    raise KeyError("MissingParameters",
                                   f"Operation {str(item)} does not have a parameters key")
                if item["operation"] not in valid_operations:
                    raise KeyError("OperationNotListedAsValid",
                                   f"Operation {item['operation']} must be added to operations_list "
                                   f"before it can be executed.")
                new_operation = valid_operations[item["operation"]](item["parameters"])
                operations.append(new_operation)
            except Exception as ex:
                # Exceptions raised above carry (code, message), but an operation constructor may raise
                # with fewer arguments; fall back rather than fail with an IndexError while reporting.
                if len(ex.args) >= 2:
                    error_code, error_msg = ex.args[0], ex.args[1]
                else:
                    error_code, error_msg = type(ex).__name__, str(ex)
                errors.append({"index": index, "item": f"{item}", "error_type": type(ex),
                               "error_code": error_code, "error_msg": error_msg})
        if errors:
            return [], errors
        return operations, []

    @staticmethod
    def prep_data(df):
        """ Make a copy and replace all n/a entries in the data frame by np.nan for processing.

        Parameters:
            df (DataFrame): The DataFrame to be processed.

        Returns:
            DataFrame: A new DataFrame with the 'n/a' entries replaced by np.nan.

        """
        # np.nan rather than the np.NaN alias, which no longer exists in NumPy 2.0.
        return df.replace('n/a', np.nan)

    @staticmethod
    def post_proc_data(df):
        """ Replace all nan entries with 'n/a' for BIDS compliance

        Parameters:
            df (DataFrame): The DataFrame to be processed.

        Returns:
            DataFrame: DataFrame with the np.nan entries replaced by 'n/a'.

        Notes:
            - Category columns of the passed DataFrame are converted to strings in place.

        """
        dtypes = df.dtypes.to_dict()
        for col_name, typ in dtypes.items():
            if typ == 'category':
                column = df[col_name]
                # Converting to str can render missing values as the text 'nan', which fillna
                # would then leave alone, so the missing positions are set to 'n/a' explicitly.
                df[col_name] = column.astype(str).mask(column.isna(), 'n/a')
        return df.fillna('n/a')

    @staticmethod
    def errors_to_str(messages, title="", sep='\n'):
        """ Format a list of operation error dictionaries as a single string.

        Parameters:
            messages (list):  Error dictionaries such as those returned by parse_operations.
            title (str):  Optional title placed before the errors.
            sep (str):  Separator placed between the title and the individual errors.

        Returns:
            str:  The formatted errors. Keys missing from an error dictionary are reported as None.

        """
        error_list = [f"Operation[{message.get('index', None)}] " +
                      f"has error:{message.get('error_type', None)}" +
                      f" with error code:{message.get('error_code', None)} " +
                      f"\n\terror msg:{message.get('error_msg', None)}"
                      for message in messages]
        errors = sep.join(error_list)
        if title:
            return title + sep + errors
        return errors

    @staticmethod
    def get_schema(hed_versions):
        """ Return the schema object corresponding to a schema specification.

        Parameters:
            hed_versions (str, list, HedSchema, HedSchemaGroup, or None):  A schema version or versions
                to be loaded, or an already loaded schema.

        Returns:
            HedSchema, HedSchemaGroup, or None:  The result of load_schema_version for a str or list,
                the argument itself if it is already a schema, or None if the argument is empty or None.

        :raises ValueError:
            - If hed_versions is not empty and is not one of the supported types.

        """
        if not hed_versions:
            return None
        if isinstance(hed_versions, (str, list)):
            return load_schema_version(hed_versions)
        if isinstance(hed_versions, (HedSchema, HedSchemaGroup)):
            return hed_versions
        raise ValueError("InvalidHedSchemaOrSchemaVersion", "Expected schema or schema version")