""" Controller for applying operations to tabular files and saving the results. """
import os
import numpy as np
import pandas as pd
import json
from hed.errors.exceptions import HedFileError
from hed.schema.hed_schema_io import load_schema_version
from hed.schema import HedSchema, HedSchemaGroup
from hed.tools.remodeling.backup_manager import BackupManager
from hed.tools.remodeling.operations.valid_operations import valid_operations
from hed.tools.util.io_util import clean_filename, extract_suffix_path, get_timestamp
class Dispatcher:
    """ Controller for applying operations to tabular files and saving the results. """

    # Location of saved summaries, relative to data_root/derivatives.
    REMODELING_SUMMARY_PATH = 'remodel/summaries'
[docs] def __init__(self, operation_list, data_root=None,
backup_name=BackupManager.DEFAULT_BACKUP_NAME, hed_versions=None):
""" Constructor for the dispatcher.
Parameters:
operation_list (list): List of unparsed operations.
data_root (str or None): Root directory for the dataset. If none, then backups are not made.
hed_versions (str, list, HedSchema, or HedSchemaGroup): The HED schema.
:raises HedFileError:
- If the specified backup does not exist.
:raises ValueError:
- If any of the operations cannot be parsed correctly.
"""
self.data_root = data_root
self.backup_name = backup_name
self.backup_man = None
if self.data_root and backup_name:
self.backup_man = BackupManager(data_root)
if not self.backup_man.get_backup(self.backup_name):
raise HedFileError("BackupDoesNotExist",
f"Remodeler cannot be run with a dataset without first creating the "
f"{self.backup_name} backup for {self.data_root}", "")
op_list, errors = self.parse_operations(operation_list)
if errors:
these_errors = self.errors_to_str(errors, 'Dispatcher failed due to invalid operations')
raise ValueError("InvalidOperationList", f"{these_errors}")
self.parsed_ops = op_list
self.hed_schema = self.get_schema(hed_versions)
self.summary_dicts = {}
[docs] def get_summaries(self, file_formats=['.txt', '.json']):
""" Return the summaries in a dictionary of strings suitable for saving or archiving.
Parameters:
file_formats (list): List of formats for the context files ('.json' and '.txt' are allowed).
Returns:
list: A list of dictionaries of summaries keyed to filenames.
"""
summary_list = []
time_stamp = '_' + get_timestamp()
for context_name, context_item in self.summary_dicts.items():
file_base = context_item.op.summary_filename
if self.data_root:
file_base = extract_suffix_path(self.data_root, file_base)
file_base = clean_filename(file_base)
for file_format in file_formats:
if file_format == '.txt':
summary = context_item.get_text_summary(individual_summaries="consolidated")
summary = summary['Dataset']
elif file_format == '.json':
summary = json.dumps(context_item.get_summary(individual_summaries="consolidated"), indent=4)
else:
continue
summary_list.append({'file_name': file_base + time_stamp + file_format, 'file_format': file_format,
'file_type': 'summary', 'content': summary})
return summary_list
[docs] def get_data_file(self, file_designator):
""" Get the correct data file give the file designator.
Parameters:
file_designator (str, DataFrame ): A dataFrame or the full path of the dataframe in the original dataset.
Returns:
DataFrame: DataFrame after reading the path.
:raises HedFileError:
- If a valid file cannot be found.
Notes:
- If a string is passed and there is a backup manager,
the string must correspond to the full path of the file in the original dataset.
In this case, the corresponding backup file is read and returned.
- If a string is passed and there is no backup manager,
the data file corresponding to the file_designator is read and returned.
- If a Pandas DataFrame, return a copy.
"""
if isinstance(file_designator, pd.DataFrame):
return file_designator.copy()
if self.backup_man:
actual_path = self.backup_man.get_backup_path(self.backup_name, file_designator)
else:
actual_path = file_designator
try:
df = pd.read_csv(actual_path, sep='\t', header=0, keep_default_na=False, na_values=",null")
except Exception:
raise HedFileError("BadDataFile",
f"{str(actual_path)} (orig: {file_designator}) does not correspond to a valid tsv file",
"")
return df
[docs] def get_summary_save_dir(self):
""" Return the directory in which to save the summaries.
Returns:
str: the data_root + remodeling summary path
:raises HedFileError:
- If this dispatcher does not have a data_root.
"""
if self.data_root:
return os.path.realpath(os.path.join(self.data_root, 'derivatives', Dispatcher.REMODELING_SUMMARY_PATH))
raise HedFileError("NoDataRoot", f"Dispatcher must have a data root to produce directories", "")
[docs] def run_operations(self, file_path, sidecar=None, verbose=False):
""" Run the dispatcher operations on a file.
Parameters:
file_path (str or DataFrame): Full path of the file to be remodeled or a DataFrame
sidecar (Sidecar or file-like): Only needed for HED operations.
verbose (bool): If true, print out progress reports
Returns:
DataFrame: The processed dataframe.
"""
# string to functions
if verbose:
print(f"Reading {file_path}...")
df = self.get_data_file(file_path)
for operation in self.parsed_ops:
df = self.prep_data(df)
df = operation.do_op(self, df, file_path, sidecar=sidecar)
df = self.post_proc_data(df)
return df
[docs] def save_summaries(self, save_formats=['.json', '.txt'], individual_summaries="separate",
summary_dir=None, task_name=""):
""" Save the summary files in the specified formats.
Parameters:
save_formats (list): A list of formats [".txt", ."json"]
individual_summaries (str): "consolidated", "individual", or "none".
summary_dir (str or None): Directory for saving summaries.
task_name (str): Name of task if summaries separated by task or "" if not separated.
Notes:
The summaries are saved in the dataset derivatives/remodeling folder if no save_dir is provided.
Notes:
- "consolidated" means that the overall summary and summaries of individual files are in one summary file.
- "individual" means that the summaries of individual files are in separate files.
- "none" means that only the overall summary is produced.
"""
if not save_formats:
return
if not summary_dir:
summary_dir = self.get_summary_save_dir()
os.makedirs(summary_dir, exist_ok=True)
for summary_name, summary_item in self.summary_dicts.items():
summary_item.save(summary_dir, save_formats, individual_summaries=individual_summaries, task_name=task_name)
[docs] @staticmethod
def parse_operations(operation_list):
errors = []
operations = []
for index, item in enumerate(operation_list):
try:
if not isinstance(item, dict):
raise TypeError("InvalidOperationFormat",
f"Each operations must be a dictionary but operation {str(item)} is {type(item)}")
if "operation" not in item:
raise KeyError("MissingOperation",
f"operation {str(item)} does not have a operation key")
if "parameters" not in item:
raise KeyError("MissingParameters",
f"Operation {str(item)} does not have a parameters key")
if item["operation"] not in valid_operations:
raise KeyError("OperationNotListedAsValid",
f"Operation {item['operation']} must be added to operations_list "
f"before it can be executed.")
new_operation = valid_operations[item["operation"]](item["parameters"])
operations.append(new_operation)
except Exception as ex:
errors.append({"index": index, "item": f"{item}", "error_type": type(ex),
"error_code": ex.args[0], "error_msg": ex.args[1]})
if errors:
return [], errors
return operations, []
[docs] @staticmethod
def prep_data(df):
""" Make a copy and replace all n/a entries in the data frame by np.NaN for processing.
Parameters:
df (DataFrame) - The DataFrame to be processed.
"""
return df.replace('n/a', np.NaN)
[docs] @staticmethod
def post_proc_data(df):
""" Replace all nan entries with 'n/a' for BIDS compliance
Parameters:
df (DataFrame): The DataFrame to be processed.
Returns:
DataFrame: DataFrame with the 'np.NAN replaced by 'n/a'
"""
dtypes = df.dtypes.to_dict()
for col_name, typ in dtypes.items():
if typ == 'category':
df[col_name] = df[col_name].astype(str)
return df.fillna('n/a')
[docs] @staticmethod
def errors_to_str(messages, title="", sep='\n'):
error_list = [0]*len(messages)
for index, message in enumerate(messages):
error_list[index] = f"Operation[{message.get('index', None)}] " + \
f"has error:{message.get('error_type', None)}" + \
f" with error code:{message.get('error_code', None)} " + \
f"\n\terror msg:{message.get('error_msg', None)}"
errors = sep.join(error_list)
if title:
return title + sep + errors
return errors
[docs] @staticmethod
def get_schema(hed_versions):
if not hed_versions:
return None
elif isinstance(hed_versions, str) or isinstance(hed_versions, list):
return load_schema_version(hed_versions)
elif isinstance(hed_versions, HedSchema) or isinstance(hed_versions, HedSchemaGroup):
return hed_versions
else:
raise ValueError("InvalidHedSchemaOrSchemaVersion", "Expected schema or schema version")