#!/usr/bin/python3
"""
Extract data from Excel workbooks, csv files, and JSON files.
Usage:
messes extract <metadata_source>... [--delete <metadata_section>...] [options]
messes extract --help
<metadata_source> - tagged input metadata source as csv/json filename or
xlsx_filename[:worksheet_name|regular_expression] or
google_sheets_url[:worksheet_name|regular_expression]
"#export" worksheet name is the default.
Options:
-h, --help - show this help documentation.
-v, --version - show the version.
--silent - print no warning messages.
--output <filename_json> - output json filename.
--compare <filename_json> - compare extracted metadata to given JSONized metadata.
--modify <source> - modification directives worksheet name, regular expression, csv/json filename, or
xlsx_filename[:worksheet_name|regular_expression] or
google_sheets_url[:worksheet_name|regular_expression]
[default: #modify].
--end-modify <source> - apply modification directives after all metadata merging. Requires csv/json filename or
xlsx_filename[:worksheet_name|regular_expression] or
google_sheets_url[:worksheet_name|regular_expression].
--automate <source> - automation directives worksheet name, regular expression, csv/json filename, or
xlsx_filename[:worksheet_name|regular_expression] or
google_sheets_url[:worksheet_name|regular_expression]
[default: #automate].
--save-directives <filename_json> - output filename with modification and automation directives in JSON format.
--save-export <filetype> - output export worksheet with suffix "_export" and with the indicated xlsx/csv format extension.
--show <show_option> - show a part of the metadata. See options below.
--delete <metadata_section>... - delete a section of the JSONized metadata. Section format is tableKey or tableKey,IDKey or tableKey,IDKey,fieldName. These can be regular expressions.
--keep <metadata_tables> - only keep the selected tables. Delete the rest. Table format is tableKey,tableKey,... The tableKey can be a regular expression.
--file-cleaning <remove_regex> - a string or regular expression used to remove characters from input files; removes Excel-style escaped unicode characters (_xHHHH_) and \r characters by default, enter "None" to disable [default: _x([0-9a-fA-F]{4})_|\r].
Show Options:
tables - show tables in the extracted metadata.
lineage - show parent-child lineages per table.
all - show all of the above.
Regular Expression Format:
Regular expressions have the form "r'...'" on the command line.
The re.match function is used, which matches from the beginning of a string, so every regular expression behaves as if it begins with "^".
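Examples (illustrative file and sheet names):
messes extract metadata.xlsx
messes extract metadata.xlsx:Sheet1 --output extracted.json
messes extract metadata.csv --modify modification.csv --output extracted.json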
Directives JSON Format:
{
"modification" : { table : { field : { "(exact|regex|levenshtein)\-(first|first\-nowarn|unique|all)" :
{ field_value : { "assign" : { field : value,... }, "append" : { field : value,... }, "prepend" : { field : value,... },
"regex" : { field : regex_pair,... }, "delete" : [ field,... ], "rename" : { old_field : new_field } } } } } }
"automation" : [ { "header_tag_descriptions" : [ { "header" : column_description, "tag" : tag_description, "required" : true|false } ], "exclusion_test" : exclusion_value, "insert" : [ [ cell_content, ... ] ] } ]
}
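Example directives (illustrative table, field, and value names):
{
"modification" : { "sample" : { "protocol.id" : { "exact-unique" :
{ "mouse_tissue" : { "assign" : { "protocol.id" : "tissue_quench" } } } } } },
"automation" : [ { "header_tag_descriptions" : [ { "header" : "sample_id", "tag" : "#sample.id", "required" : true } ] } ]
}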
"""
#
# Written by Hunter Moseley, 06/18/2014
# Copyright Hunter Moseley, 06/18/2014. All rights reserved.
#
# Hugely Revised (Over 75%) by Hunter Moseley, 08/29/2020
# Copyright Hunter Moseley, 08/29/2020. All rights reserved.
#
# Revised significantly by Travis Thompson, 03/03/2023
from __future__ import annotations
import os.path
import copy
import sys
import re
import collections
import pathlib
from typing import TextIO
import json
import urllib.error
import pandas
import docopt
import jellyfish
try:
from messes.extract import cythonized_tagSheet
except ImportError:
from messes.extract import tagSheet as cythonized_tagSheet
from messes import __version__
silent = False
def main() :
args = docopt.docopt(__doc__, version = __version__)
if args["--silent"]:
global silent
silent = True
if args["--file-cleaning"] == "None":
args["--file-cleaning"] = None
tagParser = TagParser()
modifyDefaulted = "--modify" not in sys.argv
automateDefaulted = "--automate" not in sys.argv
for metadataSource in args["<metadata_source>"]:
tagParser.readMetadata(metadataSource, args["--automate"], automateDefaulted, args["--modify"], modifyDefaulted, args["--file-cleaning"], args["--save-export"])
## --end-modify is needed so that the merged metadata files can all be modified after being merged together.
## Without this, each metadata source only gets its own modifications.
if args["--end-modify"] != None:
modificationSource = args["--end-modify"]
if not TagParser.hasFileExtension(modificationSource) and \
not TagParser.isGoogleSheetsFile(modificationSource):
if (reMatch := re.search(r"(.*\.xls[xm]?)", metadataSource)):
modificationFilePath = reMatch.group(1)
modificationSheetName = modificationSource
elif (reMatch := re.search(r"docs.google.com/spreadsheets/d/([^/]*)/.*$", metadataSource)):
modificationFilePath = "https://docs.google.com/spreadsheets/d/" + reMatch.group(1) + "/export?format=xlsx"
modificationSheetName = modificationSource
else:
modificationFilePath = modificationSource
modificationSheetName = None
elif re.search(r"\.xls[xm]?$", modificationSource):
modificationFilePath = modificationSource
modificationSheetName = "#modify"
elif (reMatch := re.search(r"^(.*\.xls[xm]?):(.*)$", modificationSource)):
modificationFilePath = reMatch.group(1)
modificationSheetName = reMatch.group(2)
elif (reMatch := re.search(r"docs.google.com/spreadsheets/d/([^/]*)/[^:]*$", modificationSource)):
modificationFilePath = "https://docs.google.com/spreadsheets/d/" + reMatch.group(1) + "/export?format=xlsx"
modificationSheetName = "#modify"
elif (reMatch := re.search(r"docs.google.com/spreadsheets/d/([^/]*)/.*:(.*)$", modificationSource)):
modificationFilePath = "https://docs.google.com/spreadsheets/d/" + reMatch.group(1) + "/export?format=xlsx"
modificationSheetName = reMatch.group(2)
else:
modificationFilePath = modificationSource
modificationSheetName = None
modificationDirectives = tagParser.readDirectives(modificationFilePath, modificationSheetName, "modification", args["--file-cleaning"])
tagParser.modify(modificationDirectives)
if getattr(tagParser, "unusedModifications", None) is not None and not silent:
for (tableKey, fieldKey, comparisonType, modificationID) in tagParser.unusedModifications:
print("Warning: modification directive #" + tableKey + "." + fieldKey + "." + comparisonType + "." + modificationID + " never matched.", file=sys.stderr)
if args["--delete"]:
sections = [ section.split(",") for section in args["--delete"] ]
tagParser.deleteMetadata(sections)
if args["--keep"]:
keep_strings = args["--keep"].split(",")
keep_regexes = []
for string in keep_strings:
if (reMatch := re.match(TagParser.reDetector, string)):
keep_regexes.append(re.compile(reMatch[1]))
else:
keep_regexes.append(re.compile("^" + re.escape(string) + "$"))
tables_to_keep = []
for regex in keep_regexes:
for table in tagParser.extraction:
if re.search(regex, table):
tables_to_keep.append(table)
sections = [ [ tableKey ] for tableKey in tagParser.extraction.keys() if tableKey not in tables_to_keep ]
tagParser.deleteMetadata(sections)
if args["--show"]:
validShowSubOption = False
if args["--show"] == "tables" or args["--show"] == "all":
print("Tables: "," ".join(tagParser.extraction.keys()))
validShowSubOption = True
if args["--show"] == "lineage" or args["--show"] == "all":
lineages = tagParser.generateLineages()
tagParser.printLineages(lineages,indentation=0, file=sys.stdout)
validShowSubOption = True
if not validShowSubOption:
print("Unknown sub option for \"--show\" option: \"" + args["--show"] + "\"", file=sys.stderr)
if args["--output"]: # save to JSON
if pathlib.Path(args["--output"]).suffix != ".json":
args["--output"] = args["--output"] + ".json"
with open(args["--output"],'w') as jsonFile :
jsonFile.write(json.dumps(tagParser.extraction, sort_keys=True, indent=2, separators=(',', ': ')))
if args["--compare"]:
comparePath = pathlib.Path(args["--compare"])
if comparePath.exists():
if comparePath.suffix != ".json":
print("Error: The provided file for comparison is not a JSON file.", file=sys.stderr)
else:
with open(comparePath, 'r') as jsonFile:
otherMetadata = json.load(jsonFile)
print("Comparison", file=sys.stdout)
if not tagParser.compare(otherMetadata, file=sys.stdout):
print("No differences detected.", file=sys.stdout)
else:
print("Error: The provided file for comparison does not exist.", file=sys.stderr)
if args["--save-directives"]:
if pathlib.Path(args["--save-directives"]).suffix != ".json":
args["--save-directives"] = args["--save-directives"] + ".json"
if getattr(tagParser, "modificationDirectives", None) is not None or getattr(tagParser, "automationDirectives", None) is not None:
directives = {}
if getattr(tagParser, "modificationDirectives", None) is not None:
directives["modification"] = tagParser.modificationDirectives
if getattr(tagParser, "automationDirectives", None) is not None:
directives["automation"] = tagParser.automationDirectives
with open(args["--save-directives"],'w') as jsonFile :
jsonFile.write(json.dumps(directives, sort_keys=True, indent=2, separators=(',', ': ')))
else:
print("There are no directives to save.",file=sys.stderr)
def xstr(s: str|None) -> str :
"""Returns str(s) or "" if s is None.
Args:
s: input string or None.
Returns:
str(s) or "" if s is None.
"""
return "" if s is None else str(s)
class Evaluator(object) :
"""Creates object that calls eval with a given record."""
evalDetector = re.compile(r'\s*eval\((.*)\)\s*$')
fieldDetector = re.compile(r'\#(.*)\#$')
reDetector = re.compile(r"r[\"'](.*)[\"']$")
evalSplitter = re.compile(r'(\#[^#]+\#)')
def __init__(self, evalString: str, useFieldTests: bool = True, listAsString: bool = False):
"""Initializer
Args:
evalString: string of the form eval(...) to deliver to eval(), "eval(" and ")" will be removed.
useFieldTests: whether to use field tests in field name modification.
listAsString: whether to convert a list into a single string.
"""
self.evalString = evalString
self.useFieldTests = useFieldTests
self.listAsString = listAsString
tokenList = [token for token in re.split(Evaluator.evalSplitter, evalString) if token != "" and token is not None]
self.fieldTests = {}
self.requiredFields = []
finalTokenList = []
regexCount = 1
for token in tokenList:
if reMatch := re.match(Evaluator.fieldDetector, token):
fieldString = reMatch.group(1)
if reMatch := re.match(Evaluator.reDetector, fieldString):
fieldString = "REGEX" + str(regexCount)
regexCount += 1
finalTokenList.append(fieldString)
self.fieldTests[fieldString] = re.compile(reMatch.group(1))
else:
finalTokenList.append(fieldString.replace("%", "_PERCENT_"))
self.requiredFields.append(fieldString)
else:
finalTokenList.append(token)
if not useFieldTests:
self.requiredFields.extend(self.fieldTests.keys())
self.code = compile("".join(finalTokenList), self.evalString, "eval")
def evaluate(self, record: dict) -> str|list:
"""Return eval results for the given record.
Args:
record: record from TagParser.extraction.
Returns:
The results from eval() with the record's contents.
"""
restricted = { field.replace("%","_PERCENT_") : record[field] for field in self.requiredFields }
if self.useFieldTests and self.fieldTests:
## Map each field-test name to the value of the single record field matching its regular expression.
restricted.update({ fieldName : [value for field, value in record.items() if re.search(fieldTest, field)][0] for fieldName, fieldTest in self.fieldTests.items() })
value = eval(self.code,restricted)
if type(value) == list:
if self.listAsString:
return ";".join(value)
else:
return value
else:
return xstr(value)
def hasRequiredFields(self, record: dict) -> bool:
"""Returns whether the record has all required fields.
Args:
record: record from TagParser.extraction.
Returns:
True if the record has all required fields, False otherwise.
"""
return all(field in record for field in self.requiredFields) and ( not self.useFieldTests or not self.fieldTests or \
all([ len([(fieldName, value) for field, value in record.items() if re.search(fieldTest,field)]) == 1 for fieldName, fieldTest in self.fieldTests.items() ]) )
@staticmethod
def isEvalString(evalString: str) -> re.Match|None:
"""Tests whether the evalString is of the form r"^eval(...)$"
Args:
evalString: a string to determine whether or not it is of the eval variety.
Returns:
An re.Match object if the evalString is indeed an eval string, or None if it is not.
"""
return re.match(Evaluator.evalDetector, evalString)
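## Illustrative use of the eval-string detector (hypothetical strings):
##   Evaluator.isEvalString("eval(#weight# * 2)")   # re.Match with group(1) == "#weight# * 2"
##   Evaluator.isEvalString("plain text")           # None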
class Operand(object) :
"""Class of objects that create string operands for concatenation operations."""
def __init__(self, value: str|int) :
"""Initializer
Args:
value: a string or int that represents the value of the operand.
"""
self.value = value
def __call__(self, record: dict, row: pandas.core.series.Series) :
"""Passes, exists to be overridden.
Args:
record: record from TagParser.extraction.
row: pandas Series that is a row from metadata being parsed.
"""
pass
class LiteralOperand(Operand) :
"""Represents string literal operands."""
def __call__(self, record: dict, row: pandas.core.series.Series) -> str:
"""Returns string value.
Args:
record: record from TagParser.extraction.
row: pandas Series that is a row from metadata being parsed.
Returns:
String value of the operand.
"""
return self.value
class VariableOperand(Operand) :
"""Represents #table.record%attribute variable operands."""
def __call__(self, record: dict, row: pandas.core.series.Series) -> str:
"""Returns record field value.
Args:
record: record from TagParser.extraction.
row: pandas Series that is a row from metadata being parsed.
Returns:
The value of the record's field where field is the operand's value.
"""
return record[self.value]
class ColumnOperand(Operand) :
"""Represents specific worksheet cells in a given column as operands."""
def __call__(self, record: dict, row: pandas.core.series.Series) -> str:
"""Rerurns column value in the given row.
Args:
record: record from TagParser.extraction.
row: pandas Series that is a row from metadata being parsed.
Returns:
xstr(row.iloc[self.value]).strip() of a column in the row. The column returned is the index based on the operand's value.
"""
return xstr(row.iloc[self.value]).strip()
class FieldMaker(object) :
"""Creates objects that convert specific information from a worksheet row into a field via concatenation of a list of operands."""
def __init__(self, field: str) :
"""Initializer
Args:
field: name of a field in a record from TagParser.extraction.
"""
self.field = field
self.operands = []
def create(self, record: dict, row: pandas.core.series.Series) -> str:
"""Creates field-value and adds to record using row and record.
Args:
record: record from TagParser.extraction.
row: pandas Series that is a row from metadata being parsed.
Returns:
Value created by applying all operands in self.operands and written into record[self.field].
"""
value = ""
for operand in self.operands :
value += operand(record, row)
record[self.field] = value
return value
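## Illustrative sketch (hypothetical data): a FieldMaker for field "name" whose
## operands are [ColumnOperand(0), LiteralOperand("-"), ColumnOperand(1)] turns a
## row ["mouse", "01"] into record["name"] == "mouse-01".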
def shallowClone(self) -> FieldMaker:
"""Returns clone with shallow copy of operands.
Returns:
A copy of self, but with a shallow copy of operands.
"""
clone = FieldMaker(self.field)
clone.operands = self.operands
return clone
class ListFieldMaker(FieldMaker) :
"""Creates objects that convert specific information from a worksheet row into a list field via appending of a list of operands."""
def create(self, record: dict, row: pandas.core.series.Series) -> list:
"""Creates field-value and adds to record using PARAMETERS row and record.
Args:
record: record from TagParser.extraction.
row: pandas Series that is a row from metadata being parsed.
Returns:
Value created by applying all operands in self.operands and written into record[self.field].
"""
value = []
for operand in self.operands :
if isinstance(operand, ColumnOperand) : # split column operands into separate values.
## If the list field contains semicolons use it to split instead of commas.
if re.match(r".*;.*", operand(record, row)):
value.extend(operand(record, row).strip(";").split(";"))
else:
value.extend(operand(record, row).strip(",").split(","))
else :
value.append(operand(record, row))
if self.field in record :
record[self.field].extend(value)
else :
record[self.field] = value
return value
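## Illustrative splitting (hypothetical cell values): a ColumnOperand cell "a,b,c"
## extends the list field with ["a", "b", "c"], while "x;y,z" contains a semicolon
## and therefore extends it with ["x", "y,z"].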
## Currently I don't think this can be called from the CLI.
## The only time shallowClone is called is when a child is created and that is
## only ever called on a FieldMaker type, not ListFieldMaker.
def shallowClone(self) -> ListFieldMaker:
"""Returns clone with shallow copy of operands.
Returns:
A copy of self, but with a shallow copy of operands.
"""
clone = ListFieldMaker(self.field)
clone.operands = self.operands
return clone
class RecordMaker(object) :
"""Creates objects that convert worksheet rows into records for specific tables."""
def __init__(self) :
"""Initializer"""
self.table = ""
self.fieldMakers = []
@staticmethod
def child(example: RecordMaker, table: str, parentIDIndex: int) -> RecordMaker:
"""Returns child object derived from a example object.
Args:
example: RecordMaker with global literal fields.
table: table where the child record will go.
parentIDIndex: column index for parentID of the child record.
Returns:
RecordMaker to make a new child record.
"""
child = RecordMaker()
child.table = table
child.fieldMakers = [ maker.shallowClone() for maker in example.fieldMakers ]
for maker in child.fieldMakers :
if (reMatch := re.match(r'(\w*)\.(.*)$', maker.field)) and reMatch.group(1) == table :
maker.field = reMatch.group(2)
child.addField(table,"parent_id")
child.addColumnOperand(parentIDIndex)
return child
def create(self, row: pandas.core.series.Series) -> tuple[str,dict]:
"""Returns record created from given row.
Args:
row: pandas Series that is a row from metadata being parsed.
Returns:
The table string and created record in a tuple.
"""
record = {}
for fieldMaker in self.fieldMakers :
fieldMaker.create(record, row)
return self.table, record
def addField(self, table: str, field: str, fieldMakerClass: FieldMaker|ListFieldMaker = FieldMaker):
"""Creates and adds new FieldMaker object.
Args:
table: table name to add.
field: field name to add.
fieldMakerClass: which type of FieldMaker to add to self.fieldMakers.
"""
if self.table == "" :
self.table = table
field = self.properField(table,field)
self.fieldMakers.append(fieldMakerClass(field))
def addGlobalField(self, table: str, field: str, literal: str, fieldMakerClass: FieldMaker|ListFieldMaker = FieldMaker) :
"""Creates and adds new FieldMaker with literal operand that will be used as global fields for all records created from a row.
Args:
table: table name to add.
field: field name to add.
literal: value of the field to be added.
fieldMakerClass: which type of FieldMaker to add to self.fieldMakers.
"""
field = table + "." + field
self.fieldMakers.append(fieldMakerClass(field))
self.fieldMakers[-1].operands.append(LiteralOperand(literal))
def addVariableOperand(self, table: str, field: str) :
"""Add field as a variable operand to the last FieldMaker.
Args:
table: table name to add.
field: field name to add.
"""
field = self.properField(table,field)
self.fieldMakers[-1].operands.append(VariableOperand(field))
def addLiteralOperand(self, literal: str) :
"""Add literal as an operand to the last FieldMaker.
Args:
literal: value to append.
"""
self.fieldMakers[-1].operands.append(LiteralOperand(literal))
def addColumnOperand(self, columnIndex: int) :
"""Add columnIndex as a column variable operand to the last FieldMaker.
Args:
columnIndex: column number to add.
"""
self.fieldMakers[-1].operands.append(ColumnOperand(columnIndex))
def isInvalidDuplicateField(self, table: str, field: str, fieldMakerClass: FieldMaker|ListFieldMaker) -> bool:
"""Returns whether a given table.field is an invalid duplicate in the current RecordMaker.
Args:
table: table name to look for.
field: field name to look for.
fieldMakerClass: uses this type to do the correct checks.
Returns:
True if table.field is an invalid duplicate, False otherwise.
"""
field = self.properField(table,field)
return (fieldMakerClass == FieldMaker and self.hasShortField(field)) or len([ index for index in range(len(self.fieldMakers)) if self.fieldMakers[index].field == field and not isinstance(self.fieldMakers[index], ListFieldMaker) ]) > 0
def hasField(self, table: str, field: str, offset: int =0) -> bool:
"""Returns whether a given table.field exists in the current RecordMaker.
Args:
table: table name to look for.
field: field name to look for.
offset: offset from end to stop looking for #table.field.
Returns:
True if table.field exists, False otherwise.
"""
field = self.properField(table,field)
return self.hasShortField(field, offset)
def hasShortField(self, field: str, offset: int =0) -> bool:
"""Returns whether a given field exists in the current RecordMaker.
Args:
field: field name to look for.
offset: offset from end to stop looking for #table.field.
Returns:
True if field exists, False otherwise.
"""
return len([ index for index in range(len(self.fieldMakers)-offset) if self.fieldMakers[index].field == field ]) > 0
def isLastField(self, table: str, field: str) -> bool:
"""Returns whether the last FieldMaker is for table.field.
Args:
table: table name to look for.
field: field name to look for.
Returns:
True if the last FieldMaker is for table.field, False otherwise.
"""
field = self.properField(table,field)
return self.fieldMakers[-1].field == field
def hasValidID(self) -> bool :
"""Returns whether there is a valid id field.
Returns:
True if there is a valid id field, False otherwise.
"""
return self.hasShortField("id") and type(self.shortField("id").operands[0]) is ColumnOperand and not type(self.shortField("id")) == ListFieldMaker
def properField(self, table: str, field: str) -> str:
"""Returns proper field name based on given table and field and internal self.table.
Args:
table: table name to check against internal table name and build proper field name with.
field: field name to build proper field name with.
Returns:
"table.field" with the appropriate table.
"""
if table != self.table :
field = table + "." + field
return field
## This is currently never called anywhere, so cannot be tested through the CLI.
def field(self, table: str, field: str) -> FieldMaker|ListFieldMaker|None:
"""Returns FieldMaker for table.field.
Args:
table: table name to look for FieldMaker.
field: field name to look for FieldMaker.
Returns:
The FieldMaker for the table.field.
"""
field = self.properField(table,field)
return self.shortField(field)
def shortField(self, field: str) -> FieldMaker|ListFieldMaker|None:
"""Returns FieldMaker for field.
Args:
field: field name to look for FieldMaker.
Returns:
The FieldMaker for the field.
"""
for fieldMaker in self.fieldMakers :
if fieldMaker.field == field :
return fieldMaker
class TagParserError(Exception):
"""Exception class for errors thrown by TagParser."""
def __init__(self, message: str, fileName: str, sheetName: str, rowIndex: int, columnIndex: int, endMessage: str =""):
"""
Args:
message: start of the message for the exception.
fileName: the file name where the exception happened.
sheetName: the sheet name in the Excel file where the exception happened.
rowIndex: the row index in the tabular file where the exception happened.
columnIndex: the column index in the tabular file where the exception happened.
endMessage: the optional end of the message for the exception.
"""
if re.search(r"\.xls[xm]?$", fileName):
cellName = TagParserError.columnName(columnIndex) + str(rowIndex+1)
else:
cellName = "col " + str(columnIndex+1) + ", row " + str(rowIndex+1)
self.value = message + " at cell \"" + fileName + ":" + sheetName + "[" + cellName + "]\"" + endMessage
@staticmethod
def columnName(columnIndex: int) -> str:
"""Returns Excel-style column name for columnIndex (integer).
Args:
columnIndex: index of the column in the spreadsheet.
Returns:
If columnIndex is less than 0 return ":", else return the capital letter(s) of the Excel column, Ex: columnIndex = 3 returns "D"
"""
if columnIndex < 0 :
return ":"
dividend = columnIndex+1
name = ""
while dividend > 0 :
modulo = (dividend - 1 ) % 26
name = chr(65+modulo) + name
dividend = int((dividend - modulo) / 26)
return name
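## Illustrative values: columnName(0) == "A", columnName(25) == "Z",
## columnName(26) == "AA", and any negative index returns ":".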
def __str__(self) :
return repr(self.value)
class TagParser(object):
"""Creates parser objects that convert tagged .xlsx worksheets into nested dictionary structures for metadata capture."""
def __init__(self):
self.extraction = {}
self.tablesAndFieldsToTrack = {}
self.tableRecordsToAddTo = {}
self.trackedFieldsDict = {}
## Detects regular-expression strings of the form r"..." or r'...'; referenced elsewhere as TagParser.reDetector.
reDetector = re.compile(r"r[\"'](.*)[\"']$")
@staticmethod
def _isEmptyRow(row: pandas.core.series.Series) -> bool:
"""Returns True if row is empty.
Args:
row: row from a tabular file.
Returns:
True if each value in row is the empty string after stripping, False otherwise.
"""
for cell in row :
if xstr(cell).strip() != "" :
return False
return True
def _determineTableField(self, params: tuple[str]|tuple[str,str]|tuple[str,str,str]) -> tuple[str,str]:
"""Returns table and field based on params tuple and last table and field set.
If table is in params use that for table, else use self.lastTable.
If field is in params use that for field, else use self.lastField.
If attribute is in params add that to field.
Args:
params: (attribute) or (table, field) or (table, field, attribute), generally the groups from a regular expression.
Returns:
(table, field) or (table, field%attribute)
Raises:
TagParserError: if the table or field name are undefined.
"""
if len(params) > 1 :
table = params[0]
field = params[1]
if len(params) > 2 :
attribute = params[2]
else :
attribute = ""
else :
table = ""
field = ""
attribute = params[0]
if table == "" :
if self.lastTable == "" :
raise TagParserError("Undefined table name", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
table = self.lastTable
else :
self.lastTable = table
if field == "" :
if self.lastField == "" :
## There does not appear to be a way to get to this error from the CLI.
## Any tag missing a field triggers a different error.
raise TagParserError("Undefined field name", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
field = self.lastField
else :
self.lastField = field
if attribute != "" :
return table, field + "%" + attribute
return table, field
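## Illustrative resolution (hypothetical names): with self.lastTable == "sample",
## params ("", "weight", "units") resolves to ("sample", "weight%units"), while
## params ("protein", "name") resolves to ("protein", "name") and updates
## self.lastTable and self.lastField.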
cellSplitter = re.compile(r'([*=+;,]|\"[^\"]*\"|#\w+\s*\w+\.(?:\w+\s*\w+%\w+\s*\w+|\w+\s*\w+))|\s+')
stringExtractor = re.compile(r'\"(.*)\"$')
operatorDetector = re.compile(r'[=+]')
wordDetector = re.compile(r'\w+')
wordOnlyDetector = re.compile(r'\w+$')
tagDetector = re.compile(r'#')
childDetector = re.compile(r'#.*\%child')
childFieldDetector = re.compile(r'#(\w*)\%child\.(\w+)$')
childFieldAttributeDetector = re.compile(r'#(\w*)\%child\.(\w+)\%(\w+)$')
emptyChildDetector = re.compile(r'#(\w*)\%child$')
tableFieldAttributeDetector = re.compile(r'#(\w*)\.(\w+)\%(\w+)$')
tableFieldDetector = re.compile(r'#([\w\s-]*)\.(\w+|\w+\.id)$')
attributeDetector = re.compile(r'#\%(\w+)$')
trackFieldDetector = re.compile(r'#(\w*)\%track$')
untrackFieldDetector = re.compile(r'#(\w*)\%untrack$')
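## Illustrative header tags matched by these detectors (hypothetical names):
##   "#sample.id"           -> tableFieldDetector, groups ("sample", "id")
##   "#sample.weight%units" -> tableFieldAttributeDetector, groups ("sample", "weight", "units")
##   "#%child.id"           -> childFieldDetector, groups ("", "id")
##   "#sample%track"        -> trackFieldDetector, group ("sample",)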
def _parseHeaderCell(self, recordMakers: list[RecordMaker], cellString: str, childWithoutID: bool) -> bool:
"""Parses header cell and return the current state of ID inclusion of current child record.
Parse cellString and modify recordMakers so they can be used later to create records.
Args:
recordMakers: list of recordMaker objects.
cellString: contents of the header cell.
childWithoutID: whether the current child record has an id or not.
Returns:
True if the current child record does not have an id, False if it does.
Raises:
TagParserError: If any of the tags are malformed, an error will be raised.
"""
if self.columnIndex == 0 and (re.search(TagParser.childDetector, cellString)) :
raise TagParserError("#.%child tag not allowed in first column", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if self.columnIndex != 0 and re.search('#tags', cellString) :
raise TagParserError("#tags only allowed in first column", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
tokens = [ x for x in re.split(TagParser.cellSplitter, cellString) if x != "" and x is not None ]
tokens = [ x if (reMatch := re.match(TagParser.stringExtractor, x)) is None else reMatch.group(1) for x in tokens ]
assignment = False
fieldMakerClass = FieldMaker
while len(tokens) > 0 :
token = tokens.pop(0)
# check for common errors
## This cannot be triggered from the CLI with #tags in assignment. It will hit another error about #tags only being on the first column first.
if assignment and (token == '#table' or token == "#tags" or re.match(TagParser.childDetector, token)) :
raise TagParserError("#table, #tags, or #%child tags in assignment", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if len(tokens) > 0 and re.match(TagParser.operatorDetector,token) and re.match(TagParser.operatorDetector,tokens[0]) :
raise TagParserError("tandem +/= operators without intervening operand", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if len(tokens) > 0 and re.search(TagParser.wordDetector,token) and re.search(TagParser.wordDetector,tokens[0]) :
raise TagParserError("tandem literal/tag without intervening operator", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if re.match(TagParser.operatorDetector,token) and ( len(tokens) == 0 or tokens[0] == ';' ) :
raise TagParserError("+/= operator without second operand", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if token == '+' and not assignment :
raise TagParserError("+ operator not in an assignment", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if token == ',' and (not assignment or fieldMakerClass != ListFieldMaker) :
raise TagParserError(", operator not in a list field assignment", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if token == '=' and assignment :
raise TagParserError("second = operator in an assignment", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if token == '*' and (assignment or len(tokens) == 0 or not re.match(TagParser.tagDetector,tokens[0]) ) :
raise TagParserError("* operator is not at the beginning of a field tag", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if token == '#tags' :
pass
elif token == '#table' :
if len(tokens) < 2 or tokens[0] != '=' or not re.match(TagParser.wordOnlyDetector,tokens[1]) :
raise TagParserError("#table tag without assignment", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
tokens.pop(0)
self.lastTable = tokens.pop(0)
elif re.match(TagParser.emptyChildDetector, token) :
raise TagParserError("child tag with no field", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif (reMatch := re.match(TagParser.childFieldAttributeDetector, token)) or (reMatch := re.match(TagParser.childFieldDetector, token)) : # #table%child.field.attribute combinations
if not recordMakers[1].hasValidID() :
raise TagParserError("no id field in parent record", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
table, field = self._determineTableField(reMatch.groups())
if field != "id" and len(tokens) > 0 and tokens[0] == "=" :
raise TagParserError("no assignment allowed with explicit child field", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if field != "id" and childWithoutID :
raise TagParserError("second explicit non-id child field specified", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if field == "id" and childWithoutID and table != recordMakers[-1].table :
raise TagParserError("second explicit non-id child field specified", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if not childWithoutID :
recordMakers.append(RecordMaker.child(recordMakers[0], table, recordMakers[1].shortField("id").operands[0].value))
## As far as I can tell this error is impossible to reach from the CLI. Trying to create duplicate fields will lead to triggering one of
## second explicit errors above.
if recordMakers[-1].isInvalidDuplicateField(table, field, fieldMakerClass) :
raise TagParserError(str("field \"") + field + "\" specified twice in " + table + " record", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
recordMakers[-1].addField(table, field, fieldMakerClass)
if field == "id" :
childWithoutID = False
if len(tokens) > 0 and tokens[0] == "=" :
recordMakers[-1].addColumnOperand(recordMakers[1].shortField("id").operands[0].value)
else :
recordMakers[-1].addColumnOperand(self.columnIndex)
else :
childWithoutID = True
recordMakers[-1].addColumnOperand(self.columnIndex)
elif (reMatch := re.match(TagParser.trackFieldDetector, token)) :
if len(tokens) < 2 or tokens[0] != "=":
raise TagParserError("Incorrectly formatted track tag, \"=\" must follow \"track\" and \"table.field\" or \"table.field%attribute\" must follow \"=\"", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
## Munch the =.
tokens.pop(0)
nextToken = tokens.pop(0)
while True:
if not re.match(r"(\w+\.\w+)|(\w+\.\w+%\w+)", nextToken):
raise TagParserError("Incorrectly formatted track tag, the field or attribute to be tracked is malformed, must be \"table.field\" or \"table.field%attribute\"", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.groups()[0] == "":
tableToAddTo = self.lastTable
else:
tableToAddTo = reMatch.groups()[0]
self.lastTable = tableToAddTo
split = nextToken.split(".")
fieldTable = split[0]
field = split[1]
if fieldTable in self.tablesAndFieldsToTrack:
self.tablesAndFieldsToTrack[fieldTable].add(field)
else:
self.tablesAndFieldsToTrack[fieldTable] = set([field])
if tableToAddTo in self.tableRecordsToAddTo:
self.tableRecordsToAddTo[tableToAddTo].add(nextToken)
else:
self.tableRecordsToAddTo[tableToAddTo] = set([nextToken])
if nextToken not in self.trackedFieldsDict:
self.trackedFieldsDict[nextToken] = ""
if tokens:
nextToken = tokens.pop(0)
if nextToken == ",":
nextToken = tokens.pop(0)
elif nextToken == ";":
break
else:
break
elif (reMatch := re.match(TagParser.untrackFieldDetector, token)) :
if len(tokens) < 2 or tokens[0] != "=":
raise TagParserError("Incorrectly formatted untrack tag, \"=\" must follow \"track\" and \"table.field\" or \"table.field%attribute\" must follow \"=\"", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
## Munch the =.
tokens.pop(0)
nextToken = tokens.pop(0)
while True:
if not re.match(r"(\w+\.\w+)|(\w+\.\w+%\w+)", nextToken):
raise TagParserError("Incorrectly formatted untrack tag, the field or attribute to be tracked is malformed, must be \"table.field\" or \"table.field%attribute\"", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.groups()[0] == "":
tableToAddTo = self.lastTable
else:
tableToAddTo = reMatch.groups()[0]
self.lastTable = tableToAddTo
split = nextToken.split(".")
fieldTable = split[0]
field = split[1]
if tableToAddTo in self.tableRecordsToAddTo:
self.tableRecordsToAddTo[tableToAddTo].discard(nextToken)
if len(self.tableRecordsToAddTo[tableToAddTo]) == 0:
del self.tableRecordsToAddTo[tableToAddTo]
tableAndFieldInOtherTables = False
for table, fields in self.tableRecordsToAddTo.items():
if nextToken in fields:
tableAndFieldInOtherTables = True
break
if not tableAndFieldInOtherTables:
del self.trackedFieldsDict[nextToken]
self.tablesAndFieldsToTrack[fieldTable].discard(field)
if len(self.tablesAndFieldsToTrack[fieldTable]) == 0:
del self.tablesAndFieldsToTrack[fieldTable]
if tokens:
nextToken = tokens.pop(0)
if nextToken == ",":
nextToken = tokens.pop(0)
elif nextToken == ";":
break
else:
break
elif (reMatch := re.match(TagParser.tableFieldAttributeDetector, token)) or (reMatch := re.match(TagParser.tableFieldDetector, token)) or (reMatch := re.match(TagParser.attributeDetector, token)) : #table.field.attribute combinations
table, field = self._determineTableField(reMatch.groups())
currentTable = recordMakers[-1].table
if currentTable != "" and currentTable != table:
raise TagParserError("second table specified after first table, if trying to specify an id to another table use #.table.id", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if self.columnIndex == 0 :
if len(tokens) < 2 or tokens[0] != '=' or re.match(TagParser.tagDetector, tokens[1]) :
raise TagParserError("tags without assignment in first column", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
tokens.pop(0)
recordMakers[0].addGlobalField(table, field, tokens.pop(0))
elif assignment :
if not recordMakers[-1].hasField(table, field, 1) or recordMakers[-1].isLastField(table,field) :
raise TagParserError("the field or attribute value used for assignment is not previously defined in record", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
recordMakers[-1].addVariableOperand(table, field)
else :
if recordMakers[-1].isInvalidDuplicateField(table, field, fieldMakerClass) :
raise TagParserError(str("field \"") + field + "\" specified twice in " + table + " record", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
recordMakers[-1].addField(table, field, fieldMakerClass)
if len(tokens) == 0 or tokens[0] == ';' :
recordMakers[-1].addColumnOperand(self.columnIndex)
elif token == "=" :
assignment = True
elif token == "*" :
fieldMakerClass = ListFieldMaker
elif token == "+" :
## This check is done above, but this elif needs to be here to munch the + operator so it isn't added as a LiteralOperand.
if not assignment :
raise TagParserError("+ operator not in an assignment", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif token == "," :
## This check is done above, but this elif needs to be here to munch the , operator so it isn't added as a LiteralOperand.
if not assignment or fieldMakerClass != ListFieldMaker:
raise TagParserError(", operator not in a list field assignment", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif token == ";" :
assignment = False
fieldMakerClass = FieldMaker
elif re.match('#',token) : # malformed tags
raise TagParserError("malformed or unrecognized tag \"" + token + "\"", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif assignment : # literals
recordMakers[-1].addLiteralOperand(token)
else :
raise TagParserError("bad token \"" + token + "\"", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
return childWithoutID
def _parseHeaderRow(self, row: pandas.core.series.Series) -> list[RecordMaker]:
"""Parses header row and returns a list of RecordMakers.
Args:
row: header row from metadata file.
Returns:
A list of RecordMakers to be used to create records.
Raises:
TagParserError: Will raise an error if a child record does not have an id.
"""
self.lastTable = ""
self.lastField = ""
recordMakers = [ RecordMaker(), RecordMaker() ]
childWithoutID = False
for self.columnIndex in range(0, len(row)) :
cellString = xstr(row.iloc[self.columnIndex]).strip()
if re.match('[*]?#', cellString) :
childWithoutID = self._parseHeaderCell(recordMakers, cellString, childWithoutID)
self.columnIndex = -1
if childWithoutID :
raise TagParserError("#.child record without id", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
recordMakers.pop(0) # pop example RecordMaker used to hold global literals.
return recordMakers
def _parseRow(self, recordMakers: list[RecordMaker], row: pandas.core.series.Series):
"""Create new records and add them to the nested extraction dictionary.
Loop through the RecordMakers in recordMaker and add records to self.extraction
based on the values in row.
Args:
recordMakers: RecordMakers created from parsing a header row.
row: row of data from a metadata file.
"""
for recordMaker in recordMakers :
if not recordMaker.hasValidID():
return
table,record = recordMaker.create(row)
if not table in self.extraction :
self.extraction[table] = {}
## Keep track of ids in specified tables.
if table in self.tablesAndFieldsToTrack:
for field in self.tablesAndFieldsToTrack[table]:
if field in record:
self.trackedFieldsDict[table + "." + field] = record[field]
## Copy tracked fields into records if applicable.
if table in self.tableRecordsToAddTo:
for fieldToAdd in self.tableRecordsToAddTo[table]:
if not fieldToAdd in record and self.trackedFieldsDict[fieldToAdd] != "":
record[fieldToAdd] = self.trackedFieldsDict[fieldToAdd]
elif fieldToAdd in record:
self.trackedFieldsDict[fieldToAdd] = record[fieldToAdd]
if not record["id"] in self.extraction[table] :
self.extraction[table][record["id"]] = record
else :
for key in record :
if key == "id" :
pass
## For when the same record is on multiple tables in the tabular file.
elif not key in self.extraction[table][record["id"]] :
self.extraction[table][record["id"]][key] = record[key]
elif isinstance(self.extraction[table][record["id"]][key], list) :
if isinstance(record[key], list):
self.extraction[table][record["id"]][key] = self.extraction[table][record["id"]][key] + record[key]
else:
self.extraction[table][record["id"]][key].append(record[key])
elif self.extraction[table][record["id"]][key] != record[key] :
self.extraction[table][record["id"]][key] = [ self.extraction[table][record["id"]][key], record[key] ]
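## Illustrative merge (hypothetical records): if extraction["sample"]["s1"] already has
## "taste": "sweet" and a later row for the same id supplies "taste": "sour", the field
## becomes ["sweet", "sour"]; identical values are left unchanged.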
def parseSheet(self, fileName: str, sheetName: str, worksheet: pandas.core.frame.DataFrame):
"""Extracts useful metadata from the worksheet and puts it in the extraction dictionary.
Args:
fileName: name of the file, used for error messages.
sheetName: name of the Excel sheet, used for error messages.
worksheet: the data from the file name and sheet name.
"""
self.lastTable = ""
self.lastField = ""
self.columnIndex = -1
self.rowIndex = -1
self.fileName = fileName
self.sheetName = sheetName
tagRows = worksheet.iloc[:,0].str.match("#tags")
ignoreRows = worksheet.iloc[:,0].str.match("#ignore")
emptyRows = (worksheet=="").all(axis=1)
possibleEndOfTagGroupRows = emptyRows | tagRows
worksheetHeaderRows = worksheet[tagRows]
endOfTagGroupIndexes = []
for header_index in worksheetHeaderRows.index:
endingIndexFound = False
for index in possibleEndOfTagGroupRows[possibleEndOfTagGroupRows].index:
if index > header_index:
endOfTagGroupIndexes.append(index)
endingIndexFound = True
break
if not endingIndexFound:
endOfTagGroupIndexes.append(possibleEndOfTagGroupRows.index[-1]+1)
for headerRow in range(worksheetHeaderRows.shape[0]):
headerRowIndex = worksheetHeaderRows.iloc[headerRow,:].name
self.rowIndex = headerRowIndex
recordMakers = self._parseHeaderRow(worksheet.loc[headerRowIndex, :])
## recordMakers should only ever have 1 or 2 elements. If 2, then
## it is a child record and the first recordMaker is just making the
## parent record using the child's indicated id.
## If there is no valid ID, print a message, unless there are no fieldMakers,
## in which case assume it is a control-flow header row,
## for example a row that just turns tracking on or off.
if not recordMakers[-1].hasValidID() and recordMakers[-1].fieldMakers and not silent:
print("Warning: The header row at index " + str(headerRowIndex) + " in the compiled export sheet does not have an \"id\" tag, so it will not be in the JSON output.", file=sys.stderr)
rowsToParse = [index for index in range(headerRowIndex+1, endOfTagGroupIndexes[headerRow])]
rowsToParse = ignoreRows.iloc[rowsToParse][~ignoreRows]
## If there was a header, but no rows underneath we want to add an empty table.
if len(rowsToParse) == 0:
if not recordMakers[0].table in self.extraction :
self.extraction[recordMakers[0].table] = {}
for index in rowsToParse.index:
self._parseRow(recordMakers, worksheet.loc[index, :])
self.rowIndex = -1
@staticmethod
def loadSheet(fileName: str|TextIO, sheetName: str, removeRegex: str|None = None, isDefaultSearch: bool = False) -> tuple[str,str,pandas.core.frame.DataFrame]|None:
"""Load and return worksheet as a pandas data frame.
Args:
fileName: filename or sys.stdin to read a csv from stdin.
sheetName: sheet name for an Excel file, ignored if not an Excel file. Can be a regular expression to search for a sheet.
removeRegex: a string to pass to DataFrame.replace() to replace characters with an empty string in the dataframe that is read in.
Can be a regex. Set to None to not replace anything.
isDefaultSearch: whether or not the sheetName is using default values, determines whether to print some messages.
Returns:
None if the worksheet is empty, else (fileName, sheetName, dataFrame)
Raises:
Exception: If fileName is invalid.
"""
isGoogleSheetsFile = TagParser.isGoogleSheetsFile(fileName)
if (isinstance(fileName, str) and (reMatch := re.search(r"^(.*\.xls[xm]?)$", fileName))) or isGoogleSheetsFile:
if os.path.isfile(fileName) or isGoogleSheetsFile:
try:
workbook = pandas.ExcelFile(fileName)
except urllib.error.HTTPError:
print("The Google Sheets file \"" + fileName + "\" does not exist or the URL is malformed.", file=sys.stderr)
return None
## Convert the sheetname to a regular expression pattern so users can specify a sheetname using a regular expression.
if re.match(TagParser.reDetector, sheetName):
sheetDetector = re.compile(re.match(TagParser.reDetector, sheetName)[1])
else:
sheetDetector = re.compile("^" + re.escape(sheetName) + "$")
for sheetName in workbook.sheet_names:
if re.search(sheetDetector, sheetName) is not None:
dataFrame = pandas.read_excel(workbook, sheetName, header=None, index_col=None, nrows=0)
converters = {column:str for column in dataFrame.columns}
dataFrame = pandas.read_excel(workbook, sheetName, header=None, index_col=None, converters=converters)
if len(dataFrame) == 0:
if isGoogleSheetsFile:
print("There is no data in the sheet, " + sheetName + \
", of the Google Sheets file \"" + fileName + "\".", file=sys.stderr)
else:
print("There is no data in worksheet \"" + fileName + ":" + sheetName + "\".", file=sys.stderr)
return None
else:
## Empty cells are read in as nan by default, replace with empty string.
dataFrame = dataFrame.fillna("")
if removeRegex:
dataFrame.replace(removeRegex, "", regex=True, inplace=True)
return (fileName, sheetName, dataFrame)
if not isDefaultSearch:
print("r'" + sheetDetector.pattern + "' did not match any sheets in \"" + fileName + "\".", file=sys.stderr)
else:
print("Excel workbook \"" + reMatch.group(1) + "\" does not exist.", file=sys.stderr)
elif not isinstance(fileName, str) or re.search(r"\.csv$", fileName):
if not isinstance(fileName, str) or pathlib.Path(fileName).exists():
try:
dataFrame = pandas.read_csv(fileName, header=None, index_col=None, dtype=str)
except pandas.errors.EmptyDataError:
print("There is no data in csv file \"" + fileName + "\".", file=sys.stderr)
else:
## I don't think there is a way to read in a csv file with no length. All my attempts resulted in an error.
## Thus this is not testable from the CLI.
if len(dataFrame) == 0:
print("There is no data in csv file \"" + fileName + "\".", file=sys.stderr)
else:
dataFrame = dataFrame.fillna("") # Empty cells are read in as nan by default. Therefore replace with empty string.
if removeRegex:
dataFrame.replace(removeRegex, "", regex=True, inplace=True)
sheetName = "" if not sheetName else sheetName
return (fileName, sheetName, dataFrame)
else:
print("The csv file \"" + fileName + "\" does not exist.", file=sys.stderr)
else:
raise Exception("Invalid worksheet identifier \"" + fileName + "\" passed into function.")
return None
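## Illustrative call (hypothetical file name): TagParser.loadSheet("data.xlsx", "r'#export'")
## returns (fileName, matchedSheetName, dataFrame) for the first sheet whose name matches
## the regular expression "#export", or None if no sheet matches.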
@staticmethod
def hasFileExtension(string: str) -> bool:
"""Tests whether the string has a file extension.
Args:
string: string to test.
Returns:
True if .xls, .xlsx, .xlsm, .csv, or .json is in string, False otherwise.
"""
return ".xls" in string or ".xlsx" in string or ".xlsm" in string or ".csv" in string or ".json" in string
@staticmethod
def isGoogleSheetsFile(string: str) -> bool:
"""Tests whether the string is a Google Sheets URL.
Args:
string: string to test.
Returns:
True if docs.google.com/spreadsheets/d/ is in string, False otherwise.
"""
return True if isinstance(string, str) and "docs.google.com/spreadsheets/d/" in string else False
def saveSheet(self, fileName: str, sheetName: str, worksheet: pandas.core.frame.DataFrame, saveExtension: str):
"""Save given worksheet in the given format.
Args:
fileName: file name or path to save to.
sheetName: name to give the sheet if saving as Excel.
worksheet: data to save.
saveExtension: if "csv" save as csv file, else save as Excel.
"""
if pathlib.Path(fileName).exists():
baseName = os.path.basename(fileName)
fileName = baseName.rsplit(".",1)[0] + "_export."
elif (reMatch := re.search(r"docs.google.com/spreadsheets/d/([^/]*)/[^:]*$", fileName)):
fileName = "Google_Sheets_" + reMatch.group(1) + "_export."
else:
fileName = "stdin_export."
if saveExtension == "csv":
fileName += "csv"
worksheet.to_csv(fileName, header=False, index=False)
else:
fileName += "xlsx"
with pandas.ExcelWriter(fileName, engine = "xlsxwriter") as writer:
worksheet.to_excel(writer, sheet_name = sheetName, index=False, header=False)
headerSplitter = re.compile(r'[+]|(r?\"[^\"]*\"|r?\'[^\']*\')|\s+')
modificationComparisonTypes = [ "exact", "regex", "levenshtein" ]
matchTypes = ["first", "first-nowarn", "unique", "all"]
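## Illustrative modification worksheet (hypothetical values). Each #tags row names the
## columns, and the rows beneath it supply values until an empty row is reached:
##   #tags | #sample.notes.value | #sample.notes.assign
##         | old note            | corrected note
## This builds a directive that assigns "corrected note" to the "notes" field of the
## first sample record whose "notes" value exactly matches "old note".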
def _parseModificationSheet(self, fileName: str, sheetName: str, worksheet: pandas.core.frame.DataFrame):
"""Extracts modification directives from a given worksheet.
"modification" : { table : { field : { "(exact|regex|levenshtein)\-(first|first\-nowarn|unique|all)" :
{ field_value : { "assign" : { field : value,... }, "append" : { field : value,... }, "prepend" : { field : value,... },
"regex" : { field : regex_pair,... }, "delete" : [ field,... ], "rename" : { old_field : new_field } } } } } }
Loops over worksheet and builds up self.modificationDirectives.
Args:
fileName: used for printing more descriptive error messages.
sheetName: used for printing more descriptive error messages.
worksheet: data used to build the modification directives.
Raises:
TagParserError: usually raised for malformed tags, but also for other unpredicted errors
Exception: a catch all in case something unforeseen happens.
"""
self.columnIndex = -1
self.rowIndex = -1
self.fileName = fileName
self.sheetName = sheetName
aColumn = worksheet.iloc[:, 0]
parsing = False
for self.rowIndex in range(len(aColumn)):
try:
if re.match('#tags$', xstr(aColumn.iloc[self.rowIndex]).strip()):
parsing = True
valueIndex = -1
comparisonIndex = -1
comparisonType = "regex|exact"
userSpecifiedType = False
matchType = "first"
matchIndex = -1
assignIndeces = []
assignFields = []
assignFieldTypes = []
appendIndeces = []
appendFields = []
appendFieldTypes = []
prependIndeces = []
prependFields = []
prependFieldTypes = []
regexIndeces = []
regexFields = []
deletionFields = []
renameFieldMap = {}
for self.columnIndex in range(1, len(worksheet.iloc[self.rowIndex, :])):
cellString = xstr(worksheet.iloc[self.rowIndex, self.columnIndex]).strip()
if (reMatch := re.match(r'\s*#(\w+)\.(\w+|\w+%\w+|\w+\.id)\.value\s*$', cellString)):
valueIndex = self.columnIndex
table = reMatch.group(1)
fieldID = reMatch.group(2)
elif (reMatch := re.match(r'\s*#(\w+)?\.(\w+|\w+%\w+|\w+\.id)\.delete\s*$', cellString)):
if valueIndex == -1:
raise TagParserError("#table_name.field_name.delete in column before #table_name.field_name.value", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(1) is not None and reMatch.group(1) != table:
raise TagParserError("Table name does not match between #table_name.field_name.value and #table_name.field_name.delete modification tags", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
deletionFields.append(reMatch.group(2))
elif (reMatch := re.match(r'\s*#(\w+)?\.(\w+|\w+%\w+|\w+\.id)\.rename\.(\w+|\w+%\w+|\w+\.id)\s*$', cellString)):
if valueIndex == -1:
raise TagParserError("#table_name.field_name.rename in column before #table_name.field_name.value", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(1) is not None and reMatch.group(1) != table:
raise TagParserError("Table name does not match between #table_name.field_name.value and #table_name.field_name.rename modification tags", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(2) == reMatch.group(3):
raise TagParserError("rename modification directive renames the field to the same name", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(2) == "id":
raise TagParserError("Not allowed to rename \"id\" fields", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
renameFieldMap[reMatch.group(2)] = reMatch.group(3)
elif (reMatch := re.match(r'\s*#(\w+)?\.(\w+|\w+%\w+|\w+\.id)\.rename\s*$', cellString)):
raise TagParserError("Incorrect rename directive format. Should be #[table_name].field_name.rename.new_field_name", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif (reMatch := re.match(r'\s*(\*#|#)(\w+)?\.(\w+)\.assign\s*$', cellString)) or (reMatch := re.match(r'\s*(#)(\w+)?\.(\w+|\w+%\w+|\w+\.id)\.assign\s*$', cellString)):
if valueIndex == -1:
raise TagParserError("#table_name.field_name.assign in column before #table_name.field_name.value", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(2) is not None and reMatch.group(2) != table:
raise TagParserError("Table name does not match between #table_name.field_name.value and #table_name.field_name.assign modification tags", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
assignIndeces.append(self.columnIndex)
assignFieldTypes.append(reMatch.group(1))
assignFields.append(reMatch.group(3))
elif (reMatch := re.match(r'\s*(\*#|#)(\w+)?\.(\w+)\.append\s*$', cellString)) or (reMatch := re.match(r'\s*(#)(\w+)?\.(\w+|\w+%\w+|\w+\.id)\.append\s*$', cellString)):
if valueIndex == -1:
raise TagParserError("#table_name.field_name.append in column before #table_name.field_name.value", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(2) is not None and reMatch.group(2) != table:
raise TagParserError("Table name does not match between #table_name.field_name.value and #table_name.field_name.append modification tags", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
appendIndeces.append(self.columnIndex)
appendFieldTypes.append(reMatch.group(1))
appendFields.append(reMatch.group(3))
elif (reMatch := re.match(r'\s*(\*#|#)(\w+)?\.(\w+)\.prepend\s*$', cellString)) or (reMatch := re.match(r'\s*(#)(\w+)?\.(\w+|\w+%\w+|\w+\.id)\.prepend\s*$', cellString)):
if valueIndex == -1:
raise TagParserError("#table_name.field_name.prepend in column before #table_name.field_name.value", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(2) is not None and reMatch.group(2) != table:
raise TagParserError("Table name does not match between #table_name.field_name.value and #table_name.field_name.prepend modification tags", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
prependIndeces.append(self.columnIndex)
prependFieldTypes.append(reMatch.group(1))
prependFields.append(reMatch.group(3))
elif (reMatch := re.match(r'\s*#(\w+)?\.(\w+)\.regex\s*$', cellString)) or (reMatch := re.match(r'\s*#(\w+)?\.(\w+|\w+%\w+|\w+\.id)\.regex\s*$', cellString)):
if valueIndex == -1:
raise TagParserError("#table_name.field_name.regex in column before #table_name.field_name.value", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if reMatch.group(1) is not None and reMatch.group(1) != table:
raise TagParserError("Table name does not match between #table_name.field_name.value and #table_name.field_name.regex modification tags", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
regexIndeces.append(self.columnIndex)
regexFields.append(reMatch.group(2))
elif (reMatch := re.match(r'\s*#comparison\s*=\s*(exact|regex|regex\|exact|levenshtein)\s*$', cellString)):
comparisonType = reMatch.group(1)
userSpecifiedType = True
elif re.match(r'\s*#comparison\s*$', cellString):
comparisonIndex = self.columnIndex
elif re.match(r'\s*#match\s*=.*$', cellString):
if (reMatch := re.match(r'\s*#match\s*=\s*(first|first-nowarn|unique|all)\s*$', cellString)):
matchType = reMatch.group(1)
else:
badType = re.match(r'\s*#match\s*=(.*)$', cellString).group(1)
raise TagParserError("Unknown match type \"" + badType + "\"", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif re.match(r'\s*#match\s*$', cellString):
matchIndex = self.columnIndex
self.columnIndex = -1
if valueIndex == -1 or (len(assignIndeces) == 0 and len(appendIndeces) == 0 and len(prependIndeces) == 0 and len(regexIndeces) == 0 and len(deletionFields) == 0 and not renameFieldMap):
raise TagParserError("Missing #table_name.field_name.value or #.field_name.assign|append|prepend|regex|delete|rename modification tags", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if "id" in deletionFields:
raise TagParserError("Not allowed to delete \"id\" fields", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if table not in self.modificationDirectives:
self.modificationDirectives[table] = {}
if fieldID not in self.modificationDirectives[table]:
self.modificationDirectives[table][fieldID] = {}
elif re.match('#ignore$', xstr(aColumn.iloc[self.rowIndex]).strip()):
pass
elif TagParser._isEmptyRow(worksheet.iloc[self.rowIndex, :]):
parsing = False
elif parsing:
fieldValue = xstr(worksheet.iloc[self.rowIndex, valueIndex]).strip()
if comparisonIndex != -1 and (comparisonString := xstr(worksheet.iloc[self.rowIndex, comparisonIndex]).strip()) in TagParser.modificationComparisonTypes:
localComparisonType = comparisonString
else:
if not userSpecifiedType or comparisonType == "regex|exact":
localComparisonType = "regex" if re.match(TagParser.reDetector, fieldValue) else "exact"
else:
localComparisonType = comparisonType
if localComparisonType == "regex" and not re.match(TagParser.reDetector, fieldValue):
print(TagParserError("Comparison type is indicated as regex, but comparison value is not a regex", self.fileName, self.sheetName, self.rowIndex, valueIndex, " This modification will be skipped."), file=sys.stderr)
# raise TagParserError("Comparison type is indicated as regex, but comparison value is not a regex", self.fileName, self.sheetName, self.rowIndex, valueIndex)
continue
if matchIndex != -1:
matchType = xstr(worksheet.iloc[self.rowIndex, matchIndex]).strip()
if matchType not in TagParser.matchTypes:
raise TagParserError("Unknown match type \"" + matchType + "\"", self.fileName, self.sheetName, self.rowIndex, matchIndex)
localComparisonType += "-" + matchType
assignFieldMap = {}
for i in range(len(assignIndeces)):
assignFieldValue = xstr(worksheet.iloc[self.rowIndex, assignIndeces[i]]).strip()
if re.match(r"\*", assignFieldTypes[i]) and not Evaluator.isEvalString(assignFieldValue):
## If the list field value contains semicolons, split on those instead of commas.
if re.match(r".*;.*", assignFieldValue):
assignFieldValue = assignFieldValue.strip(";").split(";")
else:
assignFieldValue = assignFieldValue.strip(",").split(",")
assignFieldMap[assignFields[i]] = assignFieldValue
appendFieldMap = {}
for i in range(len(appendIndeces)):
appendFieldValue = xstr(worksheet.iloc[self.rowIndex, appendIndeces[i]]).strip()
if re.match(r"\*", appendFieldTypes[i]):
## If the list field value contains semicolons, split on those instead of commas.
if re.match(r".*;.*", appendFieldValue):
appendFieldValue = appendFieldValue.strip(";").split(";")
else:
appendFieldValue = appendFieldValue.strip(",").split(",")
appendFieldMap[appendFields[i]] = appendFieldValue
prependFieldMap = {}
for i in range(len(prependIndeces)):
prependFieldValue = xstr(worksheet.iloc[self.rowIndex, prependIndeces[i]]).strip()
if re.match(r"\*", prependFieldTypes[i]):
## If the list field value contains semicolons, split on those instead of commas.
if re.match(r".*;.*", prependFieldValue):
prependFieldValue = prependFieldValue.strip(";").split(";")
else:
prependFieldValue = prependFieldValue.strip(",").split(",")
prependFieldMap[prependFields[i]] = prependFieldValue
regexFieldMap = {}
for i in range(len(regexIndeces)):
regexFieldValue = xstr(worksheet.iloc[self.rowIndex, regexIndeces[i]]).strip()
if (reMatch := re.match(r"(r[\"'].*[\"'])\s*,\s*(r[\"'].*[\"'])$",regexFieldValue)):
regexFieldMap[regexFields[i]] = [ reMatch.group(1), reMatch.group(2) ]
else:
raise TagParserError("#table_name.field_name.regex value is not of the correct format r\"...\",r\"...\".", self.fileName, self.sheetName, self.rowIndex, valueIndex)
if localComparisonType not in self.modificationDirectives[table][fieldID]:
self.modificationDirectives[table][fieldID][localComparisonType] = {}
if fieldValue not in self.modificationDirectives[table][fieldID][localComparisonType]:
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue] = {}
# elif not silent:
# print(TagParserError("Warning: duplicate modification directive given", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
if assignFieldMap:
if "assign" in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]:
## If any of the keys in assignFieldMap are already in modificationDirectives then it is a duplicate assign modification.
if any([key in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["assign"] for key in assignFieldMap]) and not silent:
print(TagParserError("Warning: duplicate assign modification directive given", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["assign"].update(assignFieldMap)
else:
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["assign"] = assignFieldMap
if appendFieldMap:
if "append" in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]:
## If any of the keys in appendFieldMap are already in modificationDirectives then it is a duplicate append modification.
if any([key in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["append"] for key in appendFieldMap]) and not silent:
print(TagParserError("Warning: duplicate append modification directive given", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["append"].update(appendFieldMap)
else:
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["append"] = appendFieldMap
if prependFieldMap:
if "prepend" in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]:
## If any of the keys in prependFieldMap are already in modificationDirectives then it is a duplicate prepend modification.
if any([key in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["prepend"] for key in prependFieldMap]) and not silent:
print(TagParserError("Warning: duplicate prepend modification directive given", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["prepend"].update(prependFieldMap)
else:
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["prepend"] = prependFieldMap
if regexFieldMap:
if "regex" in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]:
## If any of the keys in regexFieldMap are already in modificationDirectives then it is a duplicate regex modification.
if any([key in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["regex"] for key in regexFieldMap]) and not silent:
print(TagParserError("Warning: duplicate regex modification directive given", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["regex"].update(regexFieldMap)
else:
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["regex"] = regexFieldMap
if deletionFields:
if "delete" in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]:
## If any of the fields in deletionFields are already in modificationDirectives then it is a duplicate delete modification.
if any([field in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["delete"] for field in deletionFields]) and not silent:
print(TagParserError("Warning: duplicate delete modification directive given", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["delete"].extend(deletionFields)
else:
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["delete"] = deletionFields
if renameFieldMap:
if "rename" in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]:
## If any of the keys in renameFieldMap are already in modificationDirectives then it is a duplicate rename modification.
if any([key in self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["rename"] for key in renameFieldMap]) and not silent:
print(TagParserError("Warning: duplicate rename modification directive given", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["rename"].update(renameFieldMap)
else:
self.modificationDirectives[table][fieldID][localComparisonType][fieldValue]["rename"] = renameFieldMap
except TagParserError as err:
print(err.value, file=sys.stderr)
sys.exit(1)
## I don't think this can be triggered from the CLI.
except:
print(TagParserError("Internal Parser Error", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
raise
self.rowIndex = -1
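## Illustrative sketch (hypothetical values): a row with "blank" under a #sample.type.value
## column and "done" under a #.status.assign column would add
##   {"sample": {"type": {"exact-first": {"blank": {"assign": {"status": "done"}}}}}}
## to self.modificationDirectives, assuming "blank" is not detected as a regex and the
## default match type is "first".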
def _parseAutomationSheet(self, fileName: str, sheetName: str, worksheet: pandas.core.frame.DataFrame):
"""Extracts automation directives from a given worksheet.
"automation" : [ { "header_tag_descriptions" : [ { "header" : column_description, "tag" : tag_description, "required" : true|false } ], "exclusion_test" : exclusion_value, "insert" : [ [ cell_content, ... ] ] } ]
Loops over worksheet and builds up self.automationDirectives.
Args:
fileName: used for printing more descriptive error messages.
sheetName: used for printing more descriptive error messages.
worksheet: data used to build the automation directives.
Raises:
TagParserError: usually raised for malformed tags, but also for other unexpected errors.
Exception: a catch-all in case something unforeseen happens.
"""
self.columnIndex = -1
self.rowIndex = -1
self.fileName = fileName
self.sheetName = sheetName
aColumn = worksheet.iloc[:, 0]
parsing = False
self.rowIndex = 0
currAutomationGroup = None
while self.rowIndex < len(aColumn):
try:
if re.match('#tags$', xstr(aColumn.iloc[self.rowIndex]).strip()):
parsing = True
headerIndex = -1
tagIndex = -1
usedHeaders = set()
## If a #tags group appears twice in a row, remove the empty one from the directives.
if self.automationDirectives and "header_tag_descriptions" in self.automationDirectives[-1] and not self.automationDirectives[-1]["header_tag_descriptions"]:
self.automationDirectives.pop()
currAutomationGroup = { "header_tag_descriptions" : [] }
requiredIndex = -1
self.automationDirectives.append(currAutomationGroup)
for self.columnIndex in range(1, len(worksheet.iloc[self.rowIndex, :])):
cellString = xstr(worksheet.iloc[self.rowIndex, self.columnIndex]).strip()
if re.match(r'\s*#header\s*$', cellString):
headerIndex = self.columnIndex
elif re.match(r'\s*#add\s*$', cellString):
tagIndex = self.columnIndex
elif re.match(r'\s*#required\s*$', cellString):
requiredIndex = self.columnIndex
elif (reMatch := re.match(r'\s*#exclude\s*=\s*(.+)\s*$', cellString)):
currAutomationGroup["exclusion_test"] = reMatch.group(1)
self.columnIndex = -1
if headerIndex == -1:
raise TagParserError("Missing #header tag", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
if tagIndex == -1:
raise TagParserError("Missing #add tag", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif re.match('#ignore$', xstr(aColumn.iloc[self.rowIndex]).strip()):
pass
elif re.match('#insert$', xstr(aColumn.iloc[self.rowIndex]).strip()):
## If #insert is found inside a #tags group then it needs to be added to the current tag group, otherwise make a new one.
if not parsing:
currAutomationGroup = {}
self.automationDirectives.append(currAutomationGroup)
## If "insert" is already in the current automation group then add to it and don't overwrite it.
if not "insert" in currAutomationGroup:
currAutomationGroup["insert"] = []
currAutomationGroup["insert_multiple"] = False
for self.columnIndex in range(1, len(worksheet.iloc[self.rowIndex, :])):
cellString = xstr(worksheet.iloc[self.rowIndex, self.columnIndex]).strip()
if re.match(r'\s*#multiple\s*=\s*[Tt]rue\s*$', cellString):
currAutomationGroup["insert_multiple"] = True
elif re.match(r'\s*#multiple\s*=\s*[Ff]alse\s*$', cellString):
currAutomationGroup["insert_multiple"] = False
endTagFound = False
while self.rowIndex < len(aColumn)-1:
self.rowIndex += 1
if re.match('#end$', xstr(aColumn.iloc[self.rowIndex]).strip()):
endTagFound = True
break
currAutomationGroup["insert"].append([xstr(worksheet.iloc[self.rowIndex, self.columnIndex]).strip() for self.columnIndex in range(0, len(worksheet.iloc[self.rowIndex, :]))])
if not endTagFound:
raise TagParserError("Missing #end tag", self.fileName, self.sheetName, self.rowIndex, self.columnIndex)
elif TagParser._isEmptyRow(worksheet.iloc[self.rowIndex, :]):
parsing = False
elif parsing:
headerValue = xstr(worksheet.iloc[self.rowIndex, headerIndex]).strip()
newTagValue = xstr(worksheet.iloc[self.rowIndex, tagIndex]).strip()
localRequired = requiredIndex == -1 or bool(re.match(r"[Tt]rue$", xstr(worksheet.iloc[self.rowIndex, requiredIndex]).strip()))
if headerValue not in usedHeaders:
usedHeaders.add(headerValue)
currAutomationGroup["header_tag_descriptions"].append({ "header" : headerValue, "tag" : newTagValue, "required" : localRequired })
elif not silent:
print(TagParserError("Warning: duplicate header description provided in automation directive", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
except TagParserError as err:
print(err.value, file=sys.stderr)
sys.exit(1)
## Not sure you can get to this from the CLI.
except:
print(TagParserError("Internal Parser Error", self.fileName, self.sheetName, self.rowIndex, self.columnIndex), file=sys.stderr)
raise
self.rowIndex += 1
## I'm not sure what this was testing for, presumably empty directives need to be removed.
# if self.automationDirectives and not self.automationDirectives[-1]["header_tag_descriptions"]:
# self.automationDirectives.pop()
## Only keep non empty directives.
self.automationDirectives = [directive for directive in self.automationDirectives if "header_tag_descriptions" not in directive or directive["header_tag_descriptions"]]
self.rowIndex = -1
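## Illustrative sketch (hypothetical values): a #tags row with #header and #add columns,
## followed by a data row with "Sample Name" under #header and "#sample.name" under #add,
## would append
##   {"header_tag_descriptions": [{"header": "Sample Name", "tag": "#sample.name", "required": True}]}
## to self.automationDirectives ("required" defaults to True when there is no #required column).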
def readDirectives(self, source: str, sheetName: str, directiveType: str, removeRegex: str|None, isDefaultSearch: bool = False) -> dict:
"""Read directives source of a given directive type.
Args:
source: file path.
sheetName: sheet name for an Excel file, ignored if not an Excel file.
directiveType: either "modification" or "automation" to call the correct parsing function.
removeRegex: a string to pass to DataFrame.replace() to replace characters with an empty string in the dataframe that is read in.
Can be a regex. Set to None to not replace anything. Passed to loadSheet.
isDefaultSearch: whether or not the source is using default values, passed to loadSheet for message printing.
Returns:
The directives that were read in.
"""
directives = None
if re.search(r"\.json$", source):
with open(source, 'r') as jsonFile:
directives = json.load(jsonFile)
if isinstance(directives, dict) and directiveType in directives:
directives = directives[directiveType]
else:
directives = None
if not silent:
print("Warning: The input directives JSON file is either not a dict or does not contain the directive keyword \"" + directiveType + "\". This means that " + directiveType + " will not be done.", file=sys.stderr)
elif TagParser.hasFileExtension(source) or TagParser.isGoogleSheetsFile(source):
dataFrameTuple = TagParser.loadSheet(source, sheetName, removeRegex=removeRegex, isDefaultSearch=isDefaultSearch)
if dataFrameTuple is not None:
if directiveType == "modification":
self.modificationDirectives = {}
self._parseModificationSheet(*dataFrameTuple)
directives = self.modificationDirectives
else:
self.automationDirectives = []
self._parseAutomationSheet(*dataFrameTuple)
directives = copy.deepcopy(self.automationDirectives)
return directives
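## Example usage (a sketch; the file and sheet names are hypothetical, and "parser" is
## assumed to be an existing TagParser instance):
## directives = parser.readDirectives("directives.xlsx", "#modify", "modification", removeRegex=None)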
def _applyModificationDirectives(self, record: dict, recordPath: str, modifications: dict):
"""Apply modification directives to the given record.
Args:
record: a record from self.extraction extracted from metadata.
recordPath: the path to the record in self.extraction, used for printing warning messages.
modifications: the modifications to apply to the record.
"""
if "assign" in modifications:
for newField, newValue in modifications["assign"].items():
if type(newValue) == Evaluator:
if newValue.hasRequiredFields(record):
newValueForRecord = newValue.evaluate(record)
if newField in record and not silent:
if isinstance(record[newField], list) and not isinstance(newValueForRecord, list):
print("Warning: \"" + newField + "\" in record, " + recordPath + ", was assigned a non-list value but originally had a list value.", file=sys.stderr)
elif not isinstance(record[newField], list) and isinstance(newValueForRecord, list):
print("Warning: \"" + newField + "\" in record, " + recordPath + ", was assigned a list value but originally had a non-list value.", file=sys.stderr)
record[newField] = newValueForRecord
elif not silent:
print("Warning: Field assignment directive \"" + newField + "\" missing required field(s) \"" + ",".join([ field for field in newValue.requiredFields if field not in record]) + "\", or a regular expression matched no fields or more than one.", file=sys.stderr)
else:
## If this is not a deep copy, list values are shared between the directive and the record and produce unexpected results.
newValueForRecord = copy.deepcopy(newValue)
if newField in record and not silent:
if isinstance(record[newField], list) and not isinstance(newValueForRecord, list):
print("Warning: \"" + newField + "\" in record, " + recordPath + ", was assigned a non-list value but originally had a list value.", file=sys.stderr)
elif not isinstance(record[newField], list) and isinstance(newValueForRecord, list):
print("Warning: \"" + newField + "\" in record, " + recordPath + ", was assigned a list value but originally had a non-list value.", file=sys.stderr)
record[newField] = newValueForRecord
if newField in record:
fieldPath = recordPath + newField
if fieldPath in self.changedRecords:
if (("assign" == self.changedRecords[fieldPath]["previous_modification_type"] and self.changedRecords[fieldPath]["previous_modification_value"] != record[newField]) or\
"append" == self.changedRecords[fieldPath]["previous_modification_type"] or \
"prepend" == self.changedRecords[fieldPath]["previous_modification_type"] or\
"regex" == self.changedRecords[fieldPath]["previous_modification_type"] or \
"delete" == self.changedRecords[fieldPath]["previous_modification_type"]) and not silent:
print("Warning: \"" + newField + "\" in record, " + recordPath + ", was assigned a new value after previously being modified by a different modification directive.", file=sys.stderr)
else:
self.changedRecords[fieldPath] = {}
self.changedRecords[fieldPath]["previous_modification_type"] = "assign"
self.changedRecords[fieldPath]["previous_modification_value"] = record[newField]
if "append" in modifications:
for newField, newValue in modifications["append"].items():
if newField not in record and type(newValue) == list:
record[newField] = newValue.copy()
elif newField not in record and type(newValue) != list:
record[newField] = newValue
elif type(record[newField]) == list and type(newValue) == list:
minLen = min(len(record[newField]),len(newValue))
for index in range(minLen):
record[newField][index] += newValue[index]
elif type(record[newField]) == list and type(newValue) != list:
for index in range(len(record[newField])):
record[newField][index] += newValue
elif type(record[newField]) != list and type(newValue) == list:
record[newField] += newValue[0]
else:
record[newField] += newValue
fieldPath = recordPath + newField
if fieldPath in self.changedRecords:
if "delete" == self.changedRecords[fieldPath]["previous_modification_type"] and not silent:
print("Warning: The field, \"" + newField + "\", in record, " + recordPath + ", was deleted before being appended to by a different modification directive.", file=sys.stderr)
else:
self.changedRecords[fieldPath] = {}
self.changedRecords[fieldPath]["previous_modification_type"] = "append"
self.changedRecords[fieldPath]["previous_modification_value"] = record[newField]
if "prepend" in modifications:
for newField, newValue in modifications["prepend"].items():
if newField not in record and type(newValue) == list:
record[newField] = newValue.copy()
elif newField not in record and type(newValue) != list:
record[newField] = newValue
elif type(record[newField]) == list and type(newValue) == list:
minLen = min(len(record[newField]),len(newValue))
for index in range(minLen):
record[newField][index] = newValue[index] + record[newField][index]
elif type(record[newField]) == list and type(newValue) != list:
for index in range(len(record[newField])):
record[newField][index] = newValue + record[newField][index]
elif type(record[newField]) != list and type(newValue) == list:
record[newField] = newValue[0] + record[newField]
else:
record[newField] = newValue + record[newField]
fieldPath = recordPath + newField
if fieldPath in self.changedRecords:
if "delete" == self.changedRecords[fieldPath]["previous_modification_type"] and not silent:
print("Warning: The field, \"" + newField + "\", in record, " + recordPath + ", was deleted before being prepended to by a different modification directive.", file=sys.stderr)
else:
self.changedRecords[fieldPath] = {}
self.changedRecords[fieldPath]["previous_modification_type"] = "prepend"
self.changedRecords[fieldPath]["previous_modification_value"] = record[newField]
if "regex" in modifications:
for newField, regexPair in modifications["regex"].items():
fieldInRecord = True
if newField not in record:
fieldInRecord = False
if not silent:
print("Warning: regex substitution (" + ",".join(regexPair) + ") cannot be applied to record with missing field \"" + newField + "\"", file=sys.stderr)
elif type(record[newField]) == list:
for index in range(len(record[newField])):
record[newField][index] = re.sub(regexPair[0],regexPair[1],record[newField][index])
else:
oldValue = record[newField]
record[newField] = re.sub(regexPair[0],regexPair[1],record[newField])
if oldValue == record[newField]:
if not silent:
print("Warning: regex substitution (" + ",".join(regexPair) + ") produces no change in field \"" + newField + "\" value \"" + oldValue + "\"", file=sys.stderr)
fieldPath = recordPath + newField
if fieldPath in self.changedRecords:
if "delete" == self.changedRecords[fieldPath]["previous_modification_type"] and not silent:
print("Warning: The field, \"" + newField + "\", in record, " + recordPath + ", was deleted by a modification directive before attempting to be modified by a regex modification directive.", file=sys.stderr)
if "assign" == self.changedRecords[fieldPath]["previous_modification_type"] and not silent:
print("Warning: The field, \"" + newField + "\", in record, " + recordPath + ", had a regex substitution applied after previously being assigned a new value by an assignment modification directive.", file=sys.stderr)
elif fieldInRecord:
self.changedRecords[fieldPath] = {}
if fieldInRecord:
self.changedRecords[fieldPath]["previous_modification_type"] = "regex"
self.changedRecords[fieldPath]["previous_modification_value"] = record[newField]
if "delete" in modifications:
for deletedField in modifications["delete"]:
record.pop(deletedField, None)
fieldPath = recordPath + deletedField
if fieldPath in self.changedRecords:
if ("assign" == self.changedRecords[fieldPath]["previous_modification_type"] or\
"append" == self.changedRecords[fieldPath]["previous_modification_type"] or \
"prepend" == self.changedRecords[fieldPath]["previous_modification_type"] or\
"regex" == self.changedRecords[fieldPath]["previous_modification_type"] or\
"rename" == self.changedRecords[fieldPath]["previous_modification_type"]) and not silent:
print("Warning: The field, \"" + deletedField + "\", in record, " + recordPath + ", was deleted after previously being modified by a different modification directive.", file=sys.stderr)
else:
self.changedRecords[fieldPath] = {}
self.changedRecords[fieldPath]["previous_modification_type"] = "delete"
self.changedRecords[fieldPath]["previous_modification_value"] = ""
if "rename" in modifications:
for oldField,newField in modifications["rename"].items():
fieldInRecord = False
if oldField in record:
if newField in record:
print("Warning: A modification directive has renamed the field \"" + oldField + "\" to \"" + newField + "\" for record " + recordPath + ", but \"" + newField + "\" already existed in the record, so its value was overwritten.", file=sys.stderr)
fieldInRecord = True
record[newField] = record[oldField]
record.pop(oldField, None)
if not fieldInRecord:
fieldPath = recordPath + oldField
if fieldPath in self.changedRecords:
if "delete" == self.changedRecords[fieldPath]["previous_modification_type"] and not silent:
print("Warning: The field, \"" + oldField + "\", in record, " + recordPath + ", was deleted by a modification directive, and then a different modification directive attempted to rename it, but it no longer exists.", file=sys.stderr)
else:
fieldPath = recordPath + newField
if fieldPath in self.changedRecords:
if "delete" == self.changedRecords[fieldPath]["previous_modification_type"] and not silent:
print("Warning: The field, \"" + newField + "\", in record, " + recordPath + ", was deleted by a modification directive, but then a rename directive created it again from a different field.", file=sys.stderr)
else:
self.changedRecords[fieldPath] = {}
self.changedRecords[fieldPath]["previous_modification_type"] = "rename"
self.changedRecords[fieldPath]["previous_modification_value"] = record[newField]
## not sure if the old field that was removed should be added or not.
# if recordPath + oldField not in self.changedRecords:
# self.changedRecords[recordPath + oldField] = {}
# self.changedRecords[recordPath + oldField]["previous_modification_type"] = "rename"
# self.changedRecords[recordPath + oldField]["previous_modification_value"] = record[newField]
def _applyExactModificationDirectives(self, tableKey: str, fieldKey: str, modificationDirectives: dict):
"""Tests and applies exact modification directives
Args:
tableKey: table key in modificationDirectives to get to the modification to apply.
fieldKey: field key in modificationDirectives to get to the modification to apply.
modificationDirectives: contains the modifications to apply.
"""
comparisonTypes = ["exact-first", "exact-first-nowarn", "exact-unique", "exact-all"]
firstTypes = ["exact-first", "exact-first-nowarn"]
for comparisonType in comparisonTypes:
isUnique = comparisonType == "exact-unique"
isFirst = comparisonType in firstTypes
matchedFieldValues = {}
if comparisonType in modificationDirectives[tableKey][fieldKey]:
table = self.extraction[tableKey]
for idKey, record in table.items():
if fieldKey in record:
fieldValue = record[fieldKey]
if type(fieldValue) == list:
for specificValue in fieldValue:
if specificValue in modificationDirectives[tableKey][fieldKey][comparisonType]:
if isFirst:
if specificValue not in matchedFieldValues:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", modificationDirectives[tableKey][fieldKey][comparisonType][specificValue])
matchedFieldValues[specificValue] = {"idKey":idKey, "numberOfMatches":1}
self.usedModifications.add((tableKey, fieldKey, comparisonType, specificValue))
elif comparisonType == "exact-first" and not silent:
print("Warning: modification directive #" + tableKey + "." + fieldKey + "." + comparisonType + "." + specificValue + " matches more than one record. Only the first record will be changed. Try #match=all if all matching records should be changed, or #match=first-nowarn to silence this message.", file=sys.stderr)
elif isUnique:
if specificValue not in matchedFieldValues:
matchedFieldValues[specificValue] = {"idKey":idKey, "numberOfMatches":1}
else:
matchedFieldValues[specificValue]["numberOfMatches"] += 1
else:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", modificationDirectives[tableKey][fieldKey][comparisonType][specificValue])
self.usedModifications.add((tableKey, fieldKey, comparisonType, specificValue))
elif fieldValue in modificationDirectives[tableKey][fieldKey][comparisonType]:
if isFirst:
if fieldValue not in matchedFieldValues:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", modificationDirectives[tableKey][fieldKey][comparisonType][fieldValue])
matchedFieldValues[fieldValue] = {"idKey":idKey, "numberOfMatches":1}
self.usedModifications.add((tableKey, fieldKey, comparisonType, fieldValue))
elif comparisonType == "exact-first" and not silent:
print("Warning: modification directive #" + tableKey + "." + fieldKey + "." + comparisonType + "." + fieldValue + " matches more than one record. Only the first record will be changed. Try #match=all if all matching records should be changed, or #match=first-nowarn to silence this message.", file=sys.stderr)
elif isUnique:
if fieldValue not in matchedFieldValues:
matchedFieldValues[fieldValue] = {"idKey":idKey, "numberOfMatches":1}
else:
matchedFieldValues[fieldValue]["numberOfMatches"] += 1
else:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", modificationDirectives[tableKey][fieldKey][comparisonType][fieldValue])
self.usedModifications.add((tableKey, fieldKey, comparisonType, fieldValue))
if isUnique:
for fieldValue, attributes in matchedFieldValues.items():
if attributes["numberOfMatches"] == 1:
self._applyModificationDirectives(table[attributes["idKey"]], tableKey + "[" + attributes["idKey"] + "]", modificationDirectives[tableKey][fieldKey][comparisonType][fieldValue])
self.usedModifications.add((tableKey, fieldKey, comparisonType, fieldValue))
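## Illustrative sketch (hypothetical data): with
##   self.extraction == {"sample": {"s1": {"id": "s1", "type": "blank"},
##                                  "s2": {"id": "s2", "type": "blank"}}}
## an "exact-first" directive keyed on "blank" modifies only s1 (and warns that more records
## matched), "exact-all" modifies both, and "exact-unique" modifies neither because the match
## is not unique.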
def _applyRegexModificationDirectives(self, tableKey: str, fieldKey: str, modificationDirectives: dict, regexObjects: dict):
"""Tests and applies regular expression modification directives.
Args:
tableKey: table key in modificationDirectives to get to the modification to apply.
fieldKey: field key in modificationDirectives to get to the modification to apply.
modificationDirectives: contains the modifications to apply.
regexObjects: mapping of the regex string in the directives to their compiled objects.
"""
comparisonTypes = ["regex-first", "regex-first-nowarn", "regex-unique", "regex-all"]
firstTypes = ["regex-first", "regex-first-nowarn"]
for comparisonType in comparisonTypes:
isUnique = comparisonType == "regex-unique"
isFirst = comparisonType in firstTypes
matchedRegexIDs = {}
if comparisonType in modificationDirectives[tableKey][fieldKey]:
table = self.extraction[tableKey]
for idKey, record in table.items():
if fieldKey in record:
fieldValue = record[fieldKey]
for regexID, regexEntry in modificationDirectives[tableKey][fieldKey][comparisonType].items():
if type(fieldValue) == list:
for specificValue in fieldValue:
if re.search(regexObjects[regexID], specificValue):
if isFirst:
if regexID not in matchedRegexIDs:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", regexEntry)
matchedRegexIDs[regexID] = {"idKey":idKey, "numberOfMatches":1}
self.usedModifications.add((tableKey, fieldKey, comparisonType, regexID))
elif comparisonType == "regex-first" and not silent:
print("Warning: modification directive #" + tableKey + "." + fieldKey + "." + comparisonType + "." + regexID + " matches more than one record. Only the first record will be changed. Try #match=all if all matching records should be changed, or #match=first-nowarn to silence this message.", file=sys.stderr)
elif isUnique:
if regexID not in matchedRegexIDs:
matchedRegexIDs[regexID] = {"idKey":idKey, "numberOfMatches":1}
else:
matchedRegexIDs[regexID]["numberOfMatches"] += 1
else:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", regexEntry)
self.usedModifications.add((tableKey, fieldKey, comparisonType, regexID))
elif re.search(regexObjects[regexID], fieldValue):
if isFirst:
if regexID not in matchedRegexIDs:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", regexEntry)
matchedRegexIDs[regexID] = {"idKey":idKey, "numberOfMatches":1}
self.usedModifications.add((tableKey, fieldKey, comparisonType, regexID))
elif comparisonType == "regex-first" and not silent:
print("Warning: modification directive #" + tableKey + "." + fieldKey + "." + comparisonType + "." + regexID + " matches more than one record. Only the first record will be changed. Try #match=all if all matching records should be changed, or #match=first-nowarn to silence this message.", file=sys.stderr)
elif isUnique:
if regexID not in matchedRegexIDs:
matchedRegexIDs[regexID] = {"idKey":idKey, "numberOfMatches":1}
else:
matchedRegexIDs[regexID]["numberOfMatches"] += 1
else:
self._applyModificationDirectives(record, tableKey + "[" + idKey + "]", regexEntry)
self.usedModifications.add((tableKey, fieldKey, comparisonType, regexID))
if isUnique:
for regexID, attributes in matchedRegexIDs.items():
if attributes["numberOfMatches"] == 1:
self._applyModificationDirectives(table[attributes["idKey"]], tableKey + "[" + attributes["idKey"] + "]", modificationDirectives[tableKey][fieldKey][comparisonType][regexID])
self.usedModifications.add((tableKey, fieldKey, comparisonType, regexID))
def _applyLevenshteinModificationDirectives(self, tableKey: str, fieldKey: str, modificationDirectives: dict):
"""Tests and applies levenshtein modification directives.
Args:
tableKey: table key in modificationDirectives to get to the modification to apply.
fieldKey: field key in modificationDirectives to get to the modification to apply.
modificationDirectives: contains the modifications to apply.
"""
comparisonTypes = ["levenshtein-first", "levenshtein-first-nowarn", "levenshtein-unique", "levenshtein-all"]
firstTypes = ["levenshtein-first", "levenshtein-first-nowarn"]
for comparisonType in comparisonTypes:
isUnique = comparisonType == "levenshtein-unique"
isFirst = comparisonType in firstTypes
if comparisonType in modificationDirectives[tableKey][fieldKey]:
levenshteinComparisons = collections.defaultdict(dict)
levenshteinComparisonValues = collections.defaultdict(dict)
for levID, levEntry in modificationDirectives[tableKey][fieldKey][comparisonType].items():
for idKey, record in self.extraction[tableKey].items():
if fieldKey in record:
## If the comparison field is a list field then calculate the distance between all values in the list and use the smallest for comparison.
if type(record[fieldKey]) == list:
fieldValues = record[fieldKey]
levenshteinDistances = [jellyfish.levenshtein_distance(levID, specificValue) for specificValue in fieldValues]
levenshteinComparisons[levID][idKey] = min(levenshteinDistances)
levenshteinComparisonValues[levID][idKey] = fieldValues[min(range(len(levenshteinDistances)), key=levenshteinDistances.__getitem__)]
else:
levenshteinComparisons[levID][idKey] = jellyfish.levenshtein_distance(levID, record[fieldKey])
levenshteinComparisonValues[levID][idKey] = record[fieldKey]
idKeySet = set()
for levID in levenshteinComparisons.keys():
idKeySet |= set(levenshteinComparisons[levID].keys())
uniqueForIdKey = {} ## collections.defaultdict(None) has no default factory, so a plain dict is equivalent.
for idKey in idKeySet:
usableLevIDs = [levID for levID in levenshteinComparisons.keys() if idKey in levenshteinComparisons[levID]]
minLevID = min(levenshteinComparisons.keys(), key=(lambda k: levenshteinComparisons[k][idKey]))
minLevValue = levenshteinComparisons[minLevID][idKey]
## Assign only if there is 1 match to the minimum levenshtein distance.
if sum(levenshteinComparisons[levKey][idKey] == minLevValue for levKey in usableLevIDs) == 1:
uniqueForIdKey[idKey] = minLevID
uniqueForLevenshteinID = {}
for levID in levenshteinComparisons.keys():
minIDKey = min(levenshteinComparisons[levID].keys(), key=(lambda k: levenshteinComparisons[levID][k]))
minIDValue = levenshteinComparisons[levID][minIDKey]
if isFirst:
idKeysThatHaveMinValue = [idKey for idKey in levenshteinComparisons[levID] if levenshteinComparisons[levID][idKey] == minIDValue]
uniqueForLevenshteinID[levID] = idKeysThatHaveMinValue[0]
if comparisonType == "levenshtein-first" and len(idKeysThatHaveMinValue) > 1 and not silent:
print("Warning: modification directive #" + tableKey + "." + fieldKey + "." + comparisonType + "." + levID + " matches more than one record. Only the first record will be changed. Try #match=all if all matching records should be changed, or #match=first-nowarn to silence this message.", file=sys.stderr)
elif isUnique:
if sum(levenshteinComparisons[levID][idKey] == minIDValue for idKey in levenshteinComparisons[levID]) == 1:
uniqueForLevenshteinID[levID] = minIDKey
else:
idKeysThatHaveMinValue = [idKey for idKey in levenshteinComparisons[levID] if levenshteinComparisons[levID][idKey] == minIDValue]
uniqueForLevenshteinID[levID] = idKeysThatHaveMinValue
if isFirst or isUnique:
for levID, levEntry in modificationDirectives[tableKey][fieldKey][comparisonType].items():
if levID in uniqueForLevenshteinID and uniqueForIdKey.get(uniqueForLevenshteinID[levID]) == levID: ## .get avoids a KeyError when the matched idKey had no unique minimum.
self._applyModificationDirectives(self.extraction[tableKey][uniqueForLevenshteinID[levID]], tableKey + "[" + uniqueForLevenshteinID[levID] + "]", levEntry)
self.usedModifications.add((tableKey, fieldKey, comparisonType, levID))
else:
for levID, levEntry in modificationDirectives[tableKey][fieldKey][comparisonType].items():
if levID in uniqueForLevenshteinID:
for idKey in uniqueForLevenshteinID[levID]:
self._applyModificationDirectives(self.extraction[tableKey][idKey], tableKey + "[" + idKey + "]", levEntry)
self.usedModifications.add((tableKey, fieldKey, comparisonType, levID))
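## A quick sense of the matching metric (values are illustrative):
##   import jellyfish
##   jellyfish.levenshtein_distance("kitten", "sitting")  # 3 edits
## Each directive value (levID) is matched against the record whose field value has the
## smallest edit distance, subject to the first/unique/all match-type rules above.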
def modify(self, modificationDirectives: dict):
"""Applies modificationDirectives to the extracted metadata.
Args:
modificationDirectives: contains the modifications to apply.
"""
self.modificationDirectives = modificationDirectives
if modificationDirectives is not None:
modificationDirectives = copy.deepcopy(modificationDirectives) # Must make a deepcopy since regex objects are being embedded.
if getattr(self,"unusedModifications", None) is None:
self.unusedModifications = set()
if getattr(self,"usedModifications", None) is None:
self.usedModifications = set()
# Compile regex objects for comparison.
regexObjects = {}
for tableDict in modificationDirectives.values():
for fieldDict in tableDict.values():
for regex_type in ["regex-first", "regex-first-nowarn", "regex-unique", "regex-all"]:
if regex_type in fieldDict:
regexObjects.update({ regexString : re.compile(re.match(TagParser.reDetector, regexString)[1]) for regexString in fieldDict[regex_type].keys()})
# Compile regex objects for regex substitution directives.
for tableDict in modificationDirectives.values():
for fieldDict in tableDict.values():
for comparisonTypeDict in fieldDict.values():
for fieldValueDict in comparisonTypeDict.values():
if "regex" in fieldValueDict:
fieldValueDict["regex"] = { newField : [ re.match(TagParser.reDetector, regexPair[0])[1], re.match(TagParser.reDetector, regexPair[1])[1] ]
for newField,regexPair in fieldValueDict["regex"].items() }
# Create Evaluator objects for assign directives with "eval(...)" values.
for tableDict in modificationDirectives.values():
for fieldDict in tableDict.values():
for comparisonTypeDict in fieldDict.values():
for fieldValueDict in comparisonTypeDict.values():
if "assign" in fieldValueDict:
for newField in fieldValueDict["assign"].keys():
if isinstance(fieldValueDict["assign"][newField], str) and (reMatch := Evaluator.isEvalString(fieldValueDict["assign"][newField])):
fieldValueDict["assign"][newField] = Evaluator(reMatch.group(1))
self.changedRecords = {}
for tableKey in modificationDirectives.keys():
if tableKey in self.extraction:
for fieldKey in modificationDirectives[tableKey].keys():
## These functions ultimately modify records in self.extraction.
self._applyExactModificationDirectives(tableKey, fieldKey, modificationDirectives)
self._applyRegexModificationDirectives(tableKey, fieldKey, modificationDirectives, regexObjects)
self._applyLevenshteinModificationDirectives(tableKey, fieldKey, modificationDirectives)
# Identify changed IDs.
# idTranslation = collections.defaultdict(dict)
# for tableKey, table in self.extraction.items():
# idTranslation[tableKey + ".id"].update({ idKey : record["id"] for idKey, record in table.items() if idKey != record["id"] })
# Translate changed IDs.
translated = {}
for tableKey,table in self.extraction.items() :
translated[tableKey] = { record["id"] : record for record in table.values() }
## This bit of code appears to be unnecessary and useless.
## idTranslation's keys aren't fieldkeys so this will never match anything.
# for record in table.values():
# for fieldKey, fieldValue in record.items() :
# if fieldKey in idTranslation:
# if type(fieldValue) == list:
# for index in range(len(fieldValue)):
# if fieldValue[index] in idTranslation[fieldKey]:
# fieldValue[index] = idTranslation[fieldKey][fieldValue[index]]
# elif fieldValue in idTranslation[fieldKey]:
# record[fieldKey] = idTranslation[fieldKey][fieldValue]
for tableKey in self.extraction:
if len(self.extraction[tableKey]) > len(translated[tableKey]) and not silent:
print("Warning: A modification directive has set at least 2 records in the \"" + tableKey + "\" table to the same id. The output will have less records than expected.", file=sys.stderr)
self.extraction = translated
# Identify used and unused modification directives.
for tableKey in modificationDirectives.keys():
for fieldKey in modificationDirectives[tableKey].keys():
for comparisonType in modificationDirectives[tableKey][fieldKey]:
for modificationID in modificationDirectives[tableKey][fieldKey][comparisonType]:
if (tableKey, fieldKey, comparisonType, modificationID) not in self.usedModifications:
self.unusedModifications.add((tableKey, fieldKey, comparisonType, modificationID))
elif (tableKey, fieldKey, comparisonType, modificationID) in self.unusedModifications:
self.unusedModifications.remove((tableKey, fieldKey, comparisonType, modificationID))
def merge(self, newMetadata: dict):
"""Merges new metadata with current metadata.
Args:
newMetadata: dict to merge with self.extraction dict.
"""
for tableKey, table in newMetadata.items():
if tableKey not in self.extraction:
self.extraction[tableKey] = table
else:
for idKey, record in table.items():
if idKey not in self.extraction[tableKey]:
self.extraction[tableKey][idKey] = record
else:
self.extraction[tableKey][idKey].update(record)
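## Merge semantics sketch (hypothetical tables): merging
##   {"sample": {"s1": {"id": "s1", "status": "done"}}}
## into an extraction that already contains sample s1 updates s1's fields in place;
## unknown tables and unknown record ids are added wholesale.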
@staticmethod
def isComparable(value1: str, value2: str) -> bool:
"""Compares the two values first as strings and then as floats if convertable.
Args:
value1: first value to compare.
value2: second value to compare.
"""
if value1 == value2:
return True
try:
value1 = float(value1)
value2 = float(value2)
except ValueError:
return False
return value1 == value2 or abs(value1 - value2) / max(abs(value1),abs(value2)) < 0.00000001
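## For example: TagParser.isComparable("1.0", "1") is True (equal as floats), and
## TagParser.isComparable("0.3333333333", "0.3333333334") is True because the relative
## difference is below the 1e-8 tolerance, while TagParser.isComparable("abc", "abd") is False.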
def compare(self, otherMetadata: dict, groupSize: int = 5, file: TextIO|None = sys.stdout) -> bool:
"""Compare current metadata to other metadata.
Args:
otherMetadata: dict to compare with self.extraction.
groupSize: number of record ids to print on a single line before printing more on a new line.
file: the IO to print messages to, if None then just return True or False instead of printing messages.
Returns:
True if otherMetadata and self.extraction are different, False otherwise.
"""
different = False
missingTables = [ tableKey for tableKey in otherMetadata.keys() if tableKey not in self.extraction ]
if missingTables:
different = True
if file is not None:
print("Missing Tables:"," ".join(missingTables), file=file)
else:
return True
extraTables = [ tableKey for tableKey in self.extraction.keys() if tableKey not in otherMetadata ]
if extraTables:
different = True
if file is not None:
print("Extra Tables:"," ".join(extraTables), file=file)
else:
return True
for tableKey, table in otherMetadata.items():
if tableKey in self.extraction:
missingIDs = [ idKey for idKey in table.keys() if idKey not in self.extraction[tableKey] ]
if missingIDs:
different = True
if file is not None:
print("Table", tableKey, "with missing records:", file=file)
while missingIDs:
group = missingIDs[0:groupSize]
missingIDs = missingIDs[groupSize:]
print(" ", " ".join(group), file=file)
else:
return True
extraIDs = [ idKey for idKey in self.extraction[tableKey].keys() if idKey not in otherMetadata[tableKey] ]
if extraIDs:
different = True
if file is not None:
print("Table", tableKey, "with extra records:", file=file)
while extraIDs:
group = extraIDs[0:groupSize]
extraIDs = extraIDs[groupSize:]
print(" ", " ".join(group), file=file)
else:
return True
for idKey, record in table.items():
if idKey in self.extraction[tableKey]:
differentFields = [ field for field, value in record.items() if field not in self.extraction[tableKey][idKey] or not TagParser.isComparable(value, self.extraction[tableKey][idKey][field]) ]
differentFields.extend([ field for field in self.extraction[tableKey][idKey] if field not in record ])
if differentFields:
different = True
if file is not None:
print("Table", tableKey, "id", idKey, "with different fields:", ", ".join(differentFields), file=file)
else:
return True
return different
def findParent(self, parentID: str) -> tuple[str,dict]|None:
"""Returns parent record for given parentID.
Args:
parentID: the id to look for in the records of self.extraction.
Returns:
None if the parentID was not found, (tableKey,parentRecord) if it was.
"""
for tableKey, table in self.extraction.items():
if parentID in table:
return (tableKey,table[parentID])
return None
@staticmethod
def _generateLineage(parentID: str, parent2children: dict) -> dict|None:
"""Generates and returns a lineage structure based on the given parentID.
Args:
parentID: key to look for in parent2children.
parent2children: dictionary of parentID to list of children.
Returns:
None if parentID is not in parent2children, a dictionary of children for the parentID if it is.
"""
if parentID in parent2children:
return { child : TagParser._generateLineage(child,parent2children) for child in parent2children[parentID] }
else:
return None
def generateLineages(self) -> dict:
"""Generates and returns parent-child record lineages.
Returns:
lineages by tableKey.
"""
entities_with_parentIDs = []
for tableKey, table in self.extraction.items():
entities_with_parentIDs.extend((entity, tableKey, self.findParent(entity["parent_id"])) for entity in table.values() if "parent_id" in entity)
parent2children = collections.defaultdict(list)
terminalParentsByTable = collections.defaultdict(list)
for entity_tuple in entities_with_parentIDs:
parent2children[entity_tuple[0]["parent_id"]].append(entity_tuple[0]["id"])
if entity_tuple[2] is None:
terminalParentsByTable[entity_tuple[1]].append(entity_tuple[0]["parent_id"])
elif "parent_id" not in entity_tuple[2][1]:
terminalParentsByTable[entity_tuple[2][0]].append(entity_tuple[0]["parent_id"])
lineages = collections.defaultdict(list)
for tableKey in terminalParentsByTable:
lineages[tableKey] = { parentID : TagParser._generateLineage(parentID,parent2children) for parentID in terminalParentsByTable[tableKey] }
return lineages
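## Lineage shape sketch (hypothetical records): if the "sample" table holds records whose
## parent_id fields form the chain p1 -> c1 -> g1, generateLineages() returns
##   {"sample": {"p1": {"c1": {"g1": None}}}}
## where None marks a terminal child with no children of its own.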
@staticmethod
def printLineages(lineages: collections.defaultdict, indentation: int, groupSize: int = 5, file: TextIO = sys.stdout):
"""Prints the given lineages.
Args:
lineages: dictionary where the keys are table names and values are a dictionary of parentID and children.
indentation: number of spaces of indentation to print.
groupSize: number of childIDs to print per line.
file: the IO to print to.
"""
for parentID in sorted(lineages.keys()):
if lineages[parentID]:
print(" "*indentation, parentID, ":", file=file)
terminal_children = sorted(childID for childID, children in lineages[parentID].items() if children is None)
while terminal_children:
children_group = terminal_children[0:groupSize]
terminal_children = terminal_children[groupSize:]
print(" "*(indentation+2), ", ".join(children_group), file=file)
non_terminal_children = {childID : children for childID, children in lineages[parentID].items() if children}
TagParser.printLineages(non_terminal_children, indentation+2, groupSize, file)
## I don't think this can be executed from the CLI; I can only get it to print when a table in lineages is an empty dict.
else:
print(" "*indentation, parentID, file=file)