Source code for hed.tools.analysis.key_map

""" A map of column value keys into new column values. """


import pandas as pd
from hed.errors.exceptions import HedFileError
from hed.tools.util.data_util import get_new_dataframe, get_row_hash, separate_values


class KeyMap:
    """ A map of unique column values for remapping columns.

    Attributes:
        key_cols (list): A list of column names that will be hashed into the keys for the map.
        target_cols (list): Column names that will be inserted into data and later remapped (may be empty).
        name (str): An optional name of this remap for identification purposes.
        col_map (DataFrame): One row per unique key combination (key columns followed by target columns).
        map_dict (dict): Maps the hash of a key combination to its row position in col_map.
        count_dict (dict): Maps the hash of a key combination to the number of times it has been seen.

    Notes:
        - Quotes are stripped from string-like key columns and those columns are converted to str.
        - Hashes are always computed over the key columns only (never the target columns).

    """

    def __init__(self, key_cols, target_cols=None, name=''):
        """ Information for remapping columns of tabular files.

        Parameters:
            key_cols (list): List of columns to be replaced (assumed in the DataFrame).
            target_cols (list or None): List of replacement columns (assumed to not be in the DataFrame).
            name (str): Name associated with this remap (usually a pathname of the events file).

        :raises ValueError:
            - If key_cols is empty.
            - If key_cols and target_cols have a column name in common.

        """
        if not key_cols:
            raise ValueError("KEY_COLUMNS_EMPTY", "KeyMap key columns must exist", "")

        # Copy so that later changes to the caller's lists do not affect this map.
        self.key_cols = list(key_cols)
        self.target_cols = list(target_cols) if target_cols else []

        if set(self.key_cols).intersection(self.target_cols):
            raise ValueError("KEY_AND_TARGET_COLUMNS_NOT_DISJOINT",
                             f"Key cols {str(key_cols)} and target cols {str(target_cols)} must be disjoint", "")
        self.name = name
        self.col_map = pd.DataFrame(columns=self.key_cols + self.target_cols)
        self.map_dict = {}  # Index of key to position in the col_map DataFrame
        self.count_dict = {}  # Keeps a running count of the number of times a key appears in the data

    @property
    def columns(self):
        """ Return the key column names followed by the target column names. """
        return self.key_cols + self.target_cols

    def __str__(self):
        lines = [f"{self.name} counts for key [{str(self.key_cols)}]:"]
        for _, row in self.col_map.iterrows():
            # count_dict is keyed by the hash of the key columns only, so the
            # lookup must not include the target columns.
            key_hash = get_row_hash(row, self.key_cols)
            lines.append(f"{str(list(row.values))}:\t{self.count_dict[key_hash]}")
        return "\n".join(lines)

    def make_template(self, additional_cols=None, show_counts=True):
        """ Return a dataframe template.

        Parameters:
            additional_cols (list or None): Optional list of additional columns to append to the returned dataframe.
            show_counts (bool): If true, number of times each key combination appears is in first column.

        Returns:
            DataFrame: A dataframe containing the template.

        :raises HedFileError:
            - If additional columns are not disjoint from the key columns.

        Notes:
            - The template consists of the unique key columns in this map plus additional columns.

        """
        if additional_cols and set(self.key_cols).intersection(additional_cols):
            raise HedFileError("AdditionalColumnsNotDisjoint",
                               f"Additional columns {str(additional_cols)} must be disjoint from "
                               f"key columns {str(self.key_cols)}", "")
        df = self.col_map[self.key_cols].copy()
        if additional_cols:
            for col in additional_cols:
                df[col] = 'n/a'
        if show_counts:
            df.insert(0, 'key_counts', self._get_counts())
        return df

    def _get_counts(self):
        """ Return the occurrence counts of the key combinations in col_map row order. """
        return [self.count_dict[get_row_hash(row, self.key_cols)] for _, row in self.col_map.iterrows()]

    def remap(self, data):
        """ Remap the columns of a dataframe or columnar file.

        Parameters:
            data (DataFrame, str): Columnar data (either DataFrame or filename) whose columns are to be remapped.

        Returns:
            tuple:
                - DataFrame: New dataframe with columns remapped.
                - list: List of row numbers that had no correspondence in the mapping.

        :raises HedFileError:
            - If data is missing some of the key columns.

        """
        df_new = get_new_dataframe(data)
        present_keys, missing_keys = separate_values(df_new.columns.values.tolist(), self.key_cols)
        if missing_keys:
            raise HedFileError("MissingKeys", f"File must have key columns {str(self.key_cols)}", "")
        self.remove_quotes(df_new, columns=present_keys)
        # Target columns start as 'n/a'; rows with a match are overwritten in _remap.
        for col in self.target_cols:
            df_new[col] = 'n/a'
        missing_indices = self._remap(df_new)
        return df_new, missing_indices

    def _remap(self, df):
        """ Utility method that iterates through dataframes to do the remapping.

        Parameters:
            df (DataFrame): DataFrame in which to perform the mapping (modified in place).

        Returns:
            list: The row numbers (positions) that had no correspondence in the mapping.

        """
        missing_indices = []
        # Work with positions rather than index labels so that the frame's
        # index does not have to be a default RangeIndex.
        target_locs = [df.columns.get_loc(col) for col in self.target_cols]
        for pos, (_, row) in enumerate(df.iterrows()):
            map_pos = self.map_dict.get(get_row_hash(row, self.key_cols))
            if map_pos is None:
                missing_indices.append(pos)
            elif target_locs:
                # Only the target columns are written; other columns keep their values and dtypes.
                df.iloc[pos, target_locs] = self.col_map.iloc[map_pos][self.target_cols].values
        return missing_indices

    def resort(self):
        """ Sort the col_map in place by the key columns. """
        self.col_map.sort_values(by=self.key_cols, inplace=True, ignore_index=True)
        # Row positions changed, so the hash -> position index must be rebuilt.
        for pos, (_, row) in enumerate(self.col_map.iterrows()):
            self.map_dict[get_row_hash(row, self.key_cols)] = pos

    def update(self, data, allow_missing=True):
        """ Update the existing map with information from data.

        Parameters:
            data (DataFrame or str): DataFrame or filename of an events file or event map.
            allow_missing (bool): If true allow missing keys and add as n/a columns.

        :raises HedFileError:
            - If there are missing keys and allow_missing is False.

        """
        df = get_new_dataframe(data)
        col_list = df.columns.values.tolist()
        keys_present, keys_missing = separate_values(col_list, self.key_cols)
        if keys_missing and not allow_missing:
            raise HedFileError("MissingKeyColumn",
                               f"update data does not have key columns {str(keys_missing)}", "")
        base_df = df[keys_present].copy()
        self.remove_quotes(base_df)
        for col in keys_missing:
            base_df[col] = 'n/a'
        if self.target_cols:
            for col in self.target_cols:
                base_df[col] = 'n/a'
            # Target columns that exist in the data override the 'n/a' default.
            targets_present, _ = separate_values(col_list, self.target_cols)
            if targets_present:
                base_df[targets_present] = df[targets_present].values
        self._update(base_df)

    def _update(self, base_df):
        """ Update the dictionary of key values based on information in the dataframe.

        Parameters:
            base_df (DataFrame): DataFrame consisting of the columns in the KeyMap.

        """
        row_list = []
        next_pos = len(self.col_map)
        for _, row in base_df.iterrows():
            _, pos_update = self._handle_update(row, row_list, next_pos)
            next_pos += pos_update
        if row_list:
            df = pd.DataFrame(row_list)
            self.col_map = pd.concat([self.col_map, df], axis=0, ignore_index=True)

    def _handle_update(self, row, row_list, next_pos):
        """ Update the dictionary and counts of the number of times this combination of key columns appears.

        Parameters:
            row (Series): The values in a row.
            row_list (list): A list of rows to be appended to hold the unique rows.
            next_pos (int): Position in col_map that this row will occupy if its key is new.

        Returns:
            tuple: (key, pos_update) key is the row hash and pos_update is 1 if new row or 0 otherwise.

        """
        key = get_row_hash(row, self.key_cols)
        pos_update = 0
        if key not in self.map_dict:
            self.map_dict[key] = next_pos
            row_list.append(row)
            pos_update = 1
            self.count_dict[key] = 0
        self.count_dict[key] += 1
        return key, pos_update

    @staticmethod
    def remove_quotes(df, columns=None):
        """ Remove quotes from the specified columns and convert to string.

        Parameters:
            df (DataFrame): Dataframe to process by removing quotes.
            columns (list): List of column names. If None (or empty), all columns are used.

        Notes:
            - Replacement is done in place.
            - Only columns of object or pandas string dtype are processed; other columns are left untouched.

        """
        selected = set(columns) if columns else set(df.columns)
        for col, dtype in df.dtypes.items():
            if col not in selected:
                continue
            if not (pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.StringDtype)):
                continue
            # regex=False: the quote characters are literals, not patterns.
            cleaned = df[col].astype(str)
            cleaned = cleaned.str.replace('"', '', regex=False)
            df[col] = cleaned.str.replace("'", "", regex=False)