# Source code for hed.models.base_input
import re
import os
import openpyxl
import pandas
from hed.models.column_mapper import ColumnMapper
from hed.errors.exceptions import HedFileError, HedExceptions
from hed.errors.error_reporter import ErrorHandler
import pandas as pd
class BaseInput:
    """ Superclass representing a basic columnar (spreadsheet-like) input file. """

    # Recognized file extensions, grouped by how the data is parsed.
    TEXT_EXTENSION = ['.tsv', '.txt']
    EXCEL_EXTENSION = ['.xlsx']
    FILE_EXTENSION = TEXT_EXTENSION + EXCEL_EXTENSION

    STRING_INPUT = 'string'
    FILE_INPUT = 'file'
    TAB_DELIMITER = '\t'
    COMMA_DELIMITER = ','
def __init__(self, file, file_type=None, worksheet_name=None, has_column_names=True, mapper=None, name=None,
             allow_blank_names=True):
    """ Constructor for the BaseInput class.

    Parameters:
        file (str or file-like or pandas dataframe): An xlsx/tsv file to open.
        file_type (str or None): ".xlsx" (Excel), ".tsv" or ".txt" (tab-separated text).
            Derived from file if file is a filename.  Ignored if pandas dataframe.
        worksheet_name (str or None): Name of Excel workbook worksheet name to use.
            (Not applicable to tsv files.)
        has_column_names (bool): True if file has column names.
            This value is ignored if you pass in a pandas dataframe.
        mapper (ColumnMapper or None): Indicates which columns have HED tags.
            See SpreadsheetInput or TabularInput for examples of how to use a built-in ColumnMapper.
        name (str or None): Optional field for how this file will report errors.
        allow_blank_names (bool): If True, column names can be blank.

    :raises HedFileError:
        - file is blank
        - An invalid dataframe was passed with size 0
        - An invalid extension was provided
        - A duplicate or empty column name appears
        - Cannot open the indicated file
        - The specified worksheet name does not exist
        - If the sidecar file or tabular file had invalid format and could not be read.
    """
    self._mapper = mapper if mapper is not None else ColumnMapper()
    self._has_column_names = has_column_names
    self._name = name
    # Only set when the data was loaded from an Excel workbook.
    self._loaded_workbook = None
    self._worksheet_name = worksheet_name
    self._dataframe = None

    detected_type = file_type
    if isinstance(file, str):
        if file_type is None:
            detected_type = os.path.splitext(file)[1]
        if self.name is None:
            self._name = file

    self._open_dataframe_file(file, has_column_names, detected_type)

    column_issues = ColumnMapper.check_for_blank_names(self.columns, allow_blank_names=allow_blank_names)
    if column_issues:
        raise HedFileError(HedExceptions.BAD_COLUMN_NAMES, "Duplicate or blank columns found. See issues.",
                           self.name, issues=column_issues)
    self.reset_mapper(self._mapper)
[docs] def reset_mapper(self, new_mapper):
""" Set mapper to a different view of the file.
Parameters:
new_mapper (ColumnMapper): A column mapper to be associated with this base input.
"""
self._mapper = new_mapper
if not self._mapper:
self._mapper = ColumnMapper()
if self._dataframe is not None and self._has_column_names:
columns = self._dataframe.columns
self._mapper.set_column_map(columns)
@property
def dataframe(self):
    """ The underlying pandas DataFrame backing this input. """
    return self._dataframe
@property
def dataframe_a(self):
    """ Return the assembled dataframe.

    Probably a placeholder name.

    Returns:
        DataFrame: The assembled dataframe.
    """
    return self.assemble()
@property
def series_a(self):
    """ Return the assembled dataframe as a series.

    Returns:
        Series: The assembled dataframe with columns merged.
    """
    return self.combine_dataframe(self.assemble())
@property
def series_filtered(self):
"""Return the assembled dataframe as a series, with rows that have the same onset combined
Returns:
Series: the assembled dataframe with columns merged, and the rows filtered together
"""
if self.onsets is not None:
indexed_dict = self._indexed_dict_from_onsets(self.onsets.astype(float))
return self._filter_by_index_list(self.series_a, indexed_dict=indexed_dict)
@staticmethod
def _indexed_dict_from_onsets(onsets):
current_onset = -1000000.0
tol = 1e-9
from collections import defaultdict
indexed_dict = defaultdict(list)
for i, onset in enumerate(onsets):
if abs(onset - current_onset) > tol:
current_onset = onset
indexed_dict[current_onset].append(i)
return indexed_dict
@staticmethod
def _filter_by_index_list(original_series, indexed_dict):
new_series = pd.Series(["n/a"] * len(original_series), dtype=str)
for onset, indices in indexed_dict.items():
if indices:
first_index = indices[0] # Take the first index of each onset group
# Join the corresponding original series entries and place them at the first index
new_series[first_index] = ",".join([str(original_series[i]) for i in indices])
return new_series
@property
def onsets(self):
    """ The "onset" column of the dataframe, or None if there is no such column. """
    if "onset" in self.columns:
        return self._dataframe["onset"]
@property
def name(self):
    """ Name of the data, used when reporting errors. """
    return self._name
@property
def has_column_names(self):
    """ True if dataframe has column names. """
    return self._has_column_names
@property
def loaded_workbook(self):
    """ The underlying loaded workbook, or None if not loaded from an Excel file. """
    return self._loaded_workbook
@property
def worksheet_name(self):
    """ The worksheet name, or None if not applicable. """
    return self._worksheet_name
def convert_to_form(self, hed_schema, tag_form):
    """ Convert all tags in underlying dataframe to the specified form.

    Parameters:
        hed_schema (HedSchema): The schema to use to convert tags.
        tag_form (str): HedTag property to convert tags to.
            Most cases should use convert_to_short or convert_to_long below.
    """
    from hed.models.df_util import convert_to_form
    convert_to_form(self._dataframe, hed_schema, tag_form, self._mapper.get_tag_columns())
def convert_to_short(self, hed_schema):
    """ Convert all tags in underlying dataframe to short form.

    Parameters:
        hed_schema (HedSchema): The schema to use to convert tags.
    """
    return self.convert_to_form(hed_schema, "short_tag")
def convert_to_long(self, hed_schema):
    """ Convert all tags in underlying dataframe to long form.

    Parameters:
        hed_schema (HedSchema or None): The schema to use to convert tags.
    """
    return self.convert_to_form(hed_schema, "long_tag")
def shrink_defs(self, hed_schema):
    """ Shrinks any def-expand found in the underlying dataframe.

    Parameters:
        hed_schema (HedSchema or None): The schema to use to identify defs.
    """
    # Import from the full package path, matching convert_to_form above;
    # the bare "df_util" module name is not importable.
    from hed.models.df_util import shrink_defs
    shrink_defs(self._dataframe, hed_schema=hed_schema, columns=self._mapper.get_tag_columns())
def expand_defs(self, hed_schema, def_dict):
    """ Expands any def tags found in the underlying dataframe.

    Parameters:
        hed_schema (HedSchema or None): The schema to use to identify defs.
        def_dict (DefinitionDict): The definitions to expand.
    """
    # Import from the full package path, matching convert_to_form above;
    # the bare "df_util" module name is not importable.
    from hed.models.df_util import expand_defs
    expand_defs(self._dataframe, hed_schema=hed_schema, def_dict=def_dict, columns=self._mapper.get_tag_columns())
[docs] def to_excel(self, file):
""" Output to an Excel file.
Parameters:
file (str or file-like): Location to save this base input.
:raises ValueError:
- if empty file object was passed
:raises OSError:
- Cannot open the indicated file
"""
if not file:
raise ValueError("Empty file name or object passed in to BaseInput.save.")
dataframe = self._dataframe
if self._loaded_workbook:
old_worksheet = self.get_worksheet(self._worksheet_name)
# Excel spreadsheets are 1 based, then add another 1 for column names if present
adj_row_for_col_names = 1
if self._has_column_names:
adj_row_for_col_names += 1
adj_for_one_based_cols = 1
for row_number, text_file_row in dataframe.iterrows():
for column_number, column_text in enumerate(text_file_row):
cell_value = dataframe.iloc[row_number, column_number]
old_worksheet.cell(row_number + adj_row_for_col_names,
column_number + adj_for_one_based_cols).value = cell_value
self._loaded_workbook.save(file)
else:
dataframe.to_excel(file, header=self._has_column_names)
[docs] def to_csv(self, file=None):
""" Write to file or return as a string.
Parameters:
file (str, file-like, or None): Location to save this file. If None, return as string.
Returns:
None or str: None if file is given or the contents as a str if file is None.
:raises OSError:
- Cannot open the indicated file
"""
dataframe = self._dataframe
csv_string_if_filename_none = dataframe.to_csv(file, '\t', index=False, header=self._has_column_names)
return csv_string_if_filename_none
@property
def columns(self):
""" Returns a list of the column names.
Empty if no column names.
Returns:
columns(list): the column names
"""
columns = []
if self._dataframe is not None and self._has_column_names:
columns = list(self._dataframe.columns)
return columns
[docs] def column_metadata(self):
"""Get the metadata for each column
Returns:
dict: number/ColumnMeta pairs
"""
if self._mapper:
return self._mapper._final_column_map
return {}
[docs] def set_cell(self, row_number, column_number, new_string_obj, tag_form="short_tag"):
""" Replace the specified cell with transformed text.
Parameters:
row_number (int): The row number of the spreadsheet to set.
column_number (int): The column number of the spreadsheet to set.
new_string_obj (HedString): Object with text to put in the given cell.
tag_form (str): Version of the tags (short_tag, long_tag, base_tag, etc)
Notes:
Any attribute of a HedTag that returns a string is a valid value of tag_form.
:raises ValueError:
- There is not a loaded dataframe
:raises KeyError:
- the indicated row/column does not exist
:raises AttributeError:
- The indicated tag_form is not an attribute of HedTag
"""
if self._dataframe is None:
raise ValueError("No data frame loaded")
new_text = new_string_obj.get_as_form(tag_form)
self._dataframe.iloc[row_number, column_number] = new_text
[docs] def get_worksheet(self, worksheet_name=None):
""" Get the requested worksheet.
Parameters:
worksheet_name (str or None): The name of the requested worksheet by name or the first one if None.
Returns:
openpyxl.workbook.Workbook: The workbook request.
Notes:
If None, returns the first worksheet.
:raises KeyError:
- The specified worksheet name does not exist
"""
if worksheet_name and self._loaded_workbook:
# return self._loaded_workbook.get_sheet_by_name(worksheet_name)
return self._loaded_workbook[worksheet_name]
elif self._loaded_workbook:
return self._loaded_workbook.worksheets[0]
else:
return None
@staticmethod
def _get_dataframe_from_worksheet(worksheet, has_headers):
""" Create a dataframe from the worksheet.
Parameters:
worksheet (Worksheet): The loaded worksheet to convert.
has_headers (bool): True if this worksheet has column headers.
Returns:
DataFrame: The converted data frame.
"""
if has_headers:
data = worksheet.values
# first row is columns
cols = next(data)
data = list(data)
return pandas.DataFrame(data, columns=cols, dtype=str)
else:
return pandas.DataFrame(worksheet.values, dtype=str)
def validate(self, hed_schema, extra_def_dicts=None, name=None, error_handler=None):
    """ Creates a SpreadsheetValidator and returns all issues with this file.

    Parameters:
        hed_schema (HedSchema): The schema to use for validation.
        extra_def_dicts (list of DefDict or DefDict): All definitions to use for validation.
        name (str): The name to report errors from this file as.
        error_handler (ErrorHandler): Error context to use.  Creates a new one if None.

    Returns:
        issues (list of dict): A list of issues for hed string.
    """
    from hed.validator.spreadsheet_validator import SpreadsheetValidator
    if not name:
        name = self.name
    tab_validator = SpreadsheetValidator(hed_schema)
    validation_issues = tab_validator.validate(self, self._mapper.get_def_dict(hed_schema, extra_def_dicts), name,
                                               error_handler=error_handler)
    return validation_issues
@staticmethod
def _dataframe_has_names(dataframe):
for column in dataframe.columns:
if isinstance(column, str):
return True
return False
def assemble(self, mapper=None, skip_curly_braces=False):
    """ Assembles the hed strings.

    Parameters:
        mapper (ColumnMapper or None): Generally pass None here unless you want special behavior.
        skip_curly_braces (bool): If True, don't plug in curly brace values into columns.

    Returns:
        DataFrame: The assembled dataframe.
    """
    active_mapper = mapper if mapper is not None else self._mapper
    assembled = self._handle_transforms(active_mapper)
    if skip_curly_braces:
        return assembled
    transformers, _ = active_mapper.get_transformers()
    return self._handle_curly_braces_refs(assembled, self.get_column_refs(), list(transformers))
def _handle_transforms(self, mapper):
    """ Apply the mapper's per-column transformers to the dataframe and return the result. """
    transformers, need_categorical = mapper.get_transformers()
    if transformers:
        all_columns = self._dataframe
        if need_categorical:
            # NOTE(review): this cast mutates self._dataframe in place before the
            # transform pass — confirm that is intended.
            all_columns[need_categorical] = all_columns[need_categorical].astype('category')
        all_columns = all_columns.transform(transformers)
        if need_categorical:
            # Restore plain strings on the transformed copy.
            all_columns[need_categorical] = all_columns[need_categorical].astype('str')
    else:
        # No transformers: hand back the raw dataframe unchanged.
        all_columns = self._dataframe
    return all_columns
@staticmethod
def _replace_ref(text, newvalue, column_ref):
""" Replace column ref in x with y. If it's n/a, delete extra commas/parentheses.
Note: This function could easily be updated to handle non-curly brace values, but it's faster this way.
Parameters:
text (str): The input string containing the ref enclosed in curly braces.
newvalue (str): The replacement value for the ref.
column_ref (str): The ref to be replaced, without curly braces
Returns:
str: The modified string with the ref replaced or removed.
"""
# If it's not n/a, we can just replace directly.
if newvalue != "n/a":
return text.replace(f"{{{column_ref}}}", newvalue)
def _remover(match):
p1 = match.group("p1").count("(")
p2 = match.group("p2").count(")")
if p1 > p2: # We have more starting parens than ending. Make sure we don't remove comma before
output = match.group("c1") + "(" * (p1 - p2)
elif p2 > p1: # We have more ending parens. Make sure we don't remove comma after
output = ")" * (p2 - p1) + match.group("c2")
else:
c1 = match.group("c1")
c2 = match.group("c2")
if c1:
c1 = ""
elif c2:
c2 = ""
output = c1 + c2
return output
# this finds all surrounding commas and parentheses to a reference.
# c1/c2 contain the comma(and possibly spaces) separating this ref from other tags
# p1/p2 contain the parentheses directly surrounding the tag
# All four groups can have spaces.
pattern = r'(?P<c1>[\s,]*)(?P<p1>[(\s]*)\{' + column_ref + r'\}(?P<p2>[\s)]*)(?P<c2>[\s,]*)'
return re.sub(pattern, _remover, text)
@staticmethod
def _handle_curly_braces_refs(df, refs, column_names):
    """ Plug in curly braces with other columns.

    Parameters:
        df (DataFrame): The assembled dataframe; its ref-containing columns are
            modified in place before the subset is taken.
        refs (list of str): Column names that may appear as {ref} placeholders.
        column_names (list of str): All candidate column names.

    Returns:
        DataFrame: df restricted to the non-ref columns, with refs substituted in.
    """
    # Filter out columns and refs that don't exist.
    refs = [ref for ref in refs if ref in column_names]
    remaining_columns = [column for column in column_names if column not in refs]
    # Snapshot the referenced columns, since they are dropped from the output below.
    saved_columns = df[refs]
    for column_name in remaining_columns:
        for replacing_name in refs:
            # If the data has no n/a values, this version is MUCH faster:
            # column_name_brackets = f"{{{replacing_name}}}"
            # df[column_name] = pd.Series(x.replace(column_name_brackets, y) for x, y
            # in zip(df[column_name], saved_columns[replacing_name]))
            df[column_name] = pd.Series(BaseInput._replace_ref(x, y, replacing_name) for x, y
                                        in zip(df[column_name], saved_columns[replacing_name]))
    df = df[remaining_columns]
    return df
[docs] @staticmethod
def combine_dataframe(dataframe):
""" Combines all columns in the given dataframe into a single HED string series,
skipping empty columns and columns with empty strings.
Parameters:
dataframe(Dataframe): The dataframe to combine
Returns:
Series: the assembled series
"""
dataframe = dataframe.apply(
lambda x: ', '.join(filter(lambda e: bool(e) and e != "n/a", map(str, x))),
axis=1
)
return dataframe
def get_def_dict(self, hed_schema, extra_def_dicts=None):
    """ Returns the definition dict for this file.

    Note: Baseclass implementation returns just extra_def_dicts.

    Parameters:
        hed_schema (HedSchema): Used to identify tags to find definitions (if needed).
        extra_def_dicts (list, DefinitionDict, or None): Extra dicts to add to the list.

    Returns:
        DefinitionDict: A single definition dict representing all the data (and extra def dicts).
    """
    from hed.models.definition_dict import DefinitionDict
    return DefinitionDict(extra_def_dicts, hed_schema)
def get_column_refs(self):
    """ Returns a list of column refs for this file.

    Default implementation returns an empty list.

    Returns:
        column_refs (list): A list of unique column refs found.
    """
    return []
def _open_dataframe_file(self, file, has_column_names, input_type):
    """ Load self._dataframe from a dataframe, text file, or Excel file.

    Parameters:
        file (str, file-like, or pandas.DataFrame): The source data.
        has_column_names (bool): True if the first row holds column names.
        input_type (str): File extension, e.g. ".tsv" or ".xlsx"; ignored for dataframes.

    :raises HedFileError:
        - The file is empty, has an unrecognized extension, cannot be read,
          or yields a dataframe of size 0.
    """
    # header=0 tells pandas the first row holds column names; None means no header row.
    pandas_header = 0
    if not has_column_names:
        pandas_header = None
    if isinstance(file, pandas.DataFrame):
        self._dataframe = file.astype(str)
        # A dataframe is considered named if any column label is a string.
        self._has_column_names = self._dataframe_has_names(self._dataframe)
    elif not file:
        raise HedFileError(HedExceptions.FILE_NOT_FOUND, "Empty file passed to BaseInput.", file)
    elif input_type in self.TEXT_EXTENSION:
        try:
            # "" and "null" are read as NaN, then normalized to "n/a" below.
            self._dataframe = pandas.read_csv(file, delimiter='\t', header=pandas_header,
                                              dtype=str, keep_default_na=True, na_values=["", "null"])
        except Exception as e:
            raise HedFileError(HedExceptions.INVALID_FILE_FORMAT, str(e), self.name) from e
        # Convert nan values to a known value
        self._dataframe = self._dataframe.fillna("n/a")
    elif input_type in self.EXCEL_EXTENSION:
        try:
            self._loaded_workbook = openpyxl.load_workbook(file)
            loaded_worksheet = self.get_worksheet(self._worksheet_name)
            self._dataframe = self._get_dataframe_from_worksheet(loaded_worksheet, has_column_names)
        except Exception as e:
            raise HedFileError(HedExceptions.GENERIC_ERROR, str(e), self.name) from e
    else:
        raise HedFileError(HedExceptions.INVALID_EXTENSION, "", file)
    if self._dataframe.size == 0:
        raise HedFileError(HedExceptions.INVALID_DATAFRAME, "Invalid dataframe(malformed datafile, etc)", file)