import pandas as pd
import numpy as np
from functools import reduce
from copy import deepcopy
from collections import OrderedDict
from ...common.utils import logger
__all__ = ['Analyzer', 'ReportColumn', 'ScoreColumn', 'NumericColumn']
CATEGORY = {
"Morphology": [
"SwapPrefix",
"Tense",
"SwapVerb",
"SwapMultiPOS",
"SwapAcronym",
"SwapLonger",
"SpellingError",
"Keyboard",
"Typos",
"Ocr",
"EntTypos"
],
"Syntax": [
"SwapNamedEnt",
"OOV/CrossCategory",
"SwapSpecialEnt",
"SwapTriplePos",
"DoubleDenial",
"SwapWord",
"SwapEnt",
"RevTgt",
"RevNon",
"InsertAdv",
"DeleteSubTree",
"AddSubTree",
"DeleteAdd",
"InsertClause"
],
"Pragmatics": [
"RndShuffle",
"Punctuation",
"AddSum",
"RndRepeat/Delete",
"AddSent",
"AddDiff",
"AppendIrr",
"TwitterType",
"RndInsert",
"ConcatSent",
"AddSentDiverse",
"PerturbAnswer/Question"
],
"ParadigmaticRelation": [
"SwapNum",
"SwapSyn",
"SwapContraction",
"SwapAnt",
"ReverseNeg",
"SwapName",
"MLMSuggestion"
],
"Other": [
"BackTrans",
"Overlap",
"ModifyPos"
]
}
[docs]class ReportColumn:
"""
A single column in the Robustness Report.
"""
def __init__(
self,
title
):
self.title = title
def set_title(self, title):
self.title = title
[docs]class ScoreColumn(ReportColumn):
"""
A column for numeric scores in the Robustness Report, displayed as a bar
chart.
"""
def __init__(
self,
title,
min_val,
max_val,
is_0_to_1=False
):
super(ScoreColumn, self).__init__(title)
self.min_val = min_val
self.max_val = max_val
self.is_0_to_1 = is_0_to_1
def set_min(self, min_val: float):
self.min_val = min_val
def set_max(self, max_val: float):
self.max_val = max_val
[docs]class NumericColumn(ReportColumn):
"""
A column for numeric data in the Robustness Report, displayed as the raw
value.
"""
def __init__(
self,
title
):
super(NumericColumn, self).__init__(title)
[docs]class Analyzer:
r"""
Convert evaluate result json to DataFrame for report generator,
and analysis model robustness according to linguistic classification.
Example::
{
"model_name": "BERT",
"dataset_name": "medical data",
"transformation": {
"Case": {
"ori_precision": 0.70,
"trans_precision": 0.65,
"ori_f1": 0.63,
"trans_f1": 0.60,
"size": 5000,
},
"Ocr": {
"ori_precision": 0.72,
"trans_precision": 0.43,
"ori_f1": 0.62,
"trans_f1": 0.41,
"size": 5000,
}
},
"subpopulation": {
"LengthLengthSubPopulation-0.0-0.1": {
"trans_precision": 0.68,
"trans_f1": 0.63,
"size": 500
}
},
"attack": {
"Bert-Attack": {
"ori_precision": 0.72,
"trans_precision": 0.43,
"ori_f1": 0.62,
"trans_f1": 0.41,
"size": 400,
}
}
}
"""
[docs] @staticmethod
def json_to_bar_chart(evaluate_json):
r"""
Parsing evaluate json and convert to bar chart input format.
:param dict evaluate_json: evaluate result of specific model.
:return: pandas.DataFrame, list[ReportColumn]
"""
bar_json_list = []
for generate_type in ["transformation", "subpopulation", "attack"]:
generate_methods = evaluate_json.get(generate_type, {})
if generate_methods:
for method in generate_methods:
bar_json = OrderedDict({
"generate_type": generate_type,
"generate_method": method if len(method) < 20 \
else method[:17] + "...",
})
metrics = {
(k, v) for k, v in generate_methods[method].items()
if k != "size"
}
bar_json.update(metrics)
bar_json["size"] = generate_methods[method].get("size", 0)
bar_json_list.append(bar_json)
if bar_json_list is []:
return None, None
df = pd.DataFrame.from_dict(bar_json_list, orient='columns')
cols = [ScoreColumn(col, 0, 1, is_0_to_1=True)
for col in df if col not in
["generate_method", "generate_type", "size"]
] + [NumericColumn("Size")]
df.columns = range(len(df.columns))
return df, cols
[docs] @staticmethod
def json_to_sunburst(evaluate_json, metric=None):
r"""
Parsing evaluate json and classify each transformation.
:param dict evaluate_json: evaluate result of specific model.
:param str metric: key metric to plot subburst figure.
:return: pandas.DataFrame, dict
"""
if "transformation" not in evaluate_json:
logger.info(("Cant find transformation in given json, "
"skip sunburst report generation!"))
return None, None
transformations = evaluate_json["transformation"]
metric = Analyzer.get_metric(transformations, metric)
sunburst_list = []
hover_data = None
for transformation in transformations:
trans_json = deepcopy(transformations[transformation])
trans_json['transformation'] = transformation
trans_json['parent'] = Analyzer.get_parent(transformation)
sunburst_list.append(trans_json)
if not hover_data:
hover_data = [
key for key in transformations[transformation].keys()
]
if metric in hover_data:
hover_data.remove(metric)
df = pd.DataFrame.from_dict(sunburst_list, orient='columns')
sunburst_settings = {
'path': ['parent', "transformation"],
'color': metric,
'values': 'size',
'hover_data': hover_data,
'color_continuous_scale': 'RdBu',
'color_continuous_midpoint': np.average(df[metric])
}
return df, sunburst_settings
[docs] @staticmethod
def get_metric(transformations, metric=None):
"""
Get key metric of given transformations.
:param dict transformations: evaluation result of transformation
:param str metric: key metric to plot subburst figure.
:return: str legal metric name
"""
if len(transformations) < 1:
raise ValueError(f"Cant get metric of {transformations} to plot!")
if metric:
assert isinstance(metric, str), f"Cant recognize metric {metric}"
metrics = [metric]
else:
metrics = []
for transformation in transformations:
original_result, transform_result = \
Analyzer.get_metrics(transformations[transformation])
if metrics:
metrics = reduce(
lambda x, y: list(set(x).intersection(list(y))),
[metrics, original_result.keys(), transform_result.keys()]
)
else:
metrics = list(original_result.keys())
if len(metrics) < 1:
raise ValueError(f"Failed to load metric value for "
f"{transformations}, cuz lack of metric scores.")
else:
return 'trans_' + metrics[0]
[docs] @staticmethod
def get_parent(transformation_str):
"""
Find linguistic classification of given transformation,
if not found, return Other label.
:param str transformation_str: transformation name
:return: str linguistic classification name
"""
parent = "Other"
for category_type in CATEGORY:
if transformation_str in CATEGORY[category_type]:
parent = category_type
break
return parent
[docs] @staticmethod
def json_to_linguistic_radar(evaluate_json):
r"""
Parsing evaluation result and calculate linguistic robustness scores.
:param dict evaluate_json: evaluate result of specific model.
:return: pandas.DataFrame
"""
if "transformation" not in evaluate_json:
logger.info(("Cant find transformation in given json, "
"skip linguistic radar report generation!"))
return None
transformations = evaluate_json['transformation']
scores = {category_type: [] for category_type in CATEGORY}
for transformation in transformations:
score = Analyzer.radar_score(transformations[transformation])
is_record = False
# transformation not in current category is considered as other type
for category_type in CATEGORY:
if transformation in CATEGORY[category_type]:
scores[category_type].append(score)
is_record = True
break
if not is_record:
scores['Other'].append(score)
for score in scores:
if scores[score]:
scores[score] = (1 - reduce(lambda x, y: x + y, scores[score])
/ len(scores[score])) * 5
else:
scores[score] = 5
return pd.DataFrame(
dict(
r=list(scores.values()),
theta=list(scores.keys())
)
)
[docs] @staticmethod
def radar_score(trans_json):
"""
Get radar score by calculate average metric decreasing ratio.
:param dict trans_json: evaluation result of specific
transformation.
:return: pandas.DataFrame
"""
assert isinstance(trans_json, dict), \
f"transformation evaluation should be dict type, " \
f"given {type(trans_json)}"
original_result, transform_result = Analyzer.get_metrics(trans_json)
decreasing_ratio = []
for metric in original_result:
ori_score = float(original_result[metric])
trans_score = float(transform_result[metric])
decreasing_ratio.append(
max(0, (ori_score - trans_score) / ori_score)
)
return reduce(lambda x, y: x + y, decreasing_ratio) \
/ len(decreasing_ratio)
[docs] @staticmethod
def get_metrics(trans_json):
"""
Parsing and checking evaluation result of specific transformation.
:param dict trans_json: evaluation result.
:return: dict, dict
"""
original_result = {}
transform_result = {}
for key in trans_json:
if "ori_" in key:
metric = key.split("ori_")[1]
original_result[metric] = trans_json[key]
elif "trans_" in key:
metric = key.split("trans_")[1]
transform_result[metric] = trans_json[key]
assert set(list(original_result.keys())) == \
set(list(transform_result.keys())), \
f"Original metric {original_result.keys()} unmatch with " \
f"transform metric {transform_result.keys()}"
return original_result, transform_result