
Feature/expect #178

Open · wants to merge 10 commits into main
1 change: 1 addition & 0 deletions carla/evaluation/catalog/__init__.py
@@ -1,6 +1,7 @@
# flake8: noqa

from .distance import Distance
from .invalidation_rate import InvalidationRate
from .redundancy import Redundancy
from .success_rate import SuccessRate
from .time import AvgTime
37 changes: 37 additions & 0 deletions carla/evaluation/catalog/invalidation_rate.py
@@ -0,0 +1,37 @@
import numpy as np
import pandas as pd
import torch
from torch.distributions import MultivariateNormal

from carla.evaluation.api import Evaluation


class InvalidationRate(Evaluation):
def __init__(self, mlmodel, hyperparameters):
super().__init__(mlmodel, hyperparameters)
self.var = hyperparameters["var"]
self.n_samples = hyperparameters["n_samples"]
self.cf_label = hyperparameters["cf_label"]
self.columns = ["invalidation_rate"]

def invalidation_rate(self, x):
x = x.float()
Sigma = torch.eye(len(x)) * self.var

        # Monte Carlo estimate of the invalidation rate: sample perturbed
        # inputs around x and average the model's predicted probability
        random_samples = MultivariateNormal(loc=x, covariance_matrix=Sigma).rsample(
            (self.n_samples,)
        )
        random_samples = random_samples.cpu().detach().numpy()
        # column 1 assumes the counterfactual class is the positive class
        y_hat = self.mlmodel.predict_proba(random_samples)[:, 1]
        invalidation_rate = self.cf_label - np.mean(y_hat)

return invalidation_rate

def get_evaluation(
self, factuals: pd.DataFrame, counterfactuals: pd.DataFrame
) -> pd.DataFrame:
ir_rates = counterfactuals.apply(
lambda x: self.invalidation_rate(torch.from_numpy(x)), raw=True, axis=1
)
return pd.DataFrame(ir_rates, columns=self.columns)
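Note (not part of the diff): a minimal usage sketch for the new metric, assuming mlmodel is a fitted CARLA model wrapper exposing predict_proba and factuals/counterfactuals are DataFrames in the model's feature input order.

from carla.evaluation.catalog import InvalidationRate

hyperparameters = {
    "var": 0.01,        # variance of the Gaussian noise added per feature
    "n_samples": 1000,  # Monte Carlo samples per counterfactual
    "cf_label": 1,      # class the counterfactual is supposed to achieve
}
evaluator = InvalidationRate(mlmodel, hyperparameters)
result = evaluator.get_evaluation(factuals, counterfactuals)
print(result["invalidation_rate"].mean())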
139 changes: 139 additions & 0 deletions carla/models/catalog/trees.py
@@ -1,7 +1,12 @@
from typing import List

import numpy as np
import tensorflow as tf
import xgboost.core
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree._tree import TREE_UNDEFINED

from carla.models.catalog.parse_xgboost import parse_booster

@@ -150,3 +155,137 @@ def tree_parser(tree):
softmax = expits / tf.reduce_sum(expits, axis=1)[:, None]

return softmax


def get_distilled_model(model_rf, include_training_data=True):
"""Distill random forest model to single decision tree.

Parameters
----------
model_rf:
Random forest model.
include_training_data:
Include training data when training the single decision tree.

Returns
-------
Distilled DecisionTreeClassifier

"""

def random_uniform_grid(n_samples, min_vals, max_vals):
# generate independent uniform samples for each dimension
grid = [
np.random.uniform(low, high, n_samples)
for low, high in zip(min_vals, max_vals)
]
# combine independent 1-d samples into multi-d samples
grid = np.vstack(grid).T
return grid

data_df = model_rf.data.df
x = data_df[model_rf.feature_input_order].to_numpy()
y = data_df[model_rf.data.target].to_numpy()

# TODO adapt for different normalization
min_vals = np.min(x, axis=0) # min values per feature
max_vals = np.max(x, axis=0) # max values per feature
grid = random_uniform_grid(len(x), min_vals, max_vals)

    # Get labels from the forest's own predictions (distillation targets);
    # the y loaded above is superseded here
if include_training_data:
# augmented data + training data
x = np.concatenate((x, grid))
y = model_rf.predict(x)
else:
# only augmented data
x = grid
y = model_rf.predict(x)

parameters = {
"max_depth": [None],
"min_samples_leaf": [5, 10, 15],
"min_samples_split": [3, 5, 10, 15],
}
model = GridSearchCV(DecisionTreeClassifier(), parameters, n_jobs=4)
model.fit(X=x, y=y)
print(model.best_score_, model.best_params_)
model_distilled = model.best_estimator_

return model_distilled
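Note (not part of the diff): a hedged sketch of driving the distillation step, assuming model_rf is a fitted CARLA forest wrapper exposing .data, .feature_input_order, and .predict as used above.

from carla.models.catalog.trees import get_distilled_model

distilled_tree = get_distilled_model(model_rf, include_training_data=True)
print(distilled_tree.get_depth(), distilled_tree.get_n_leaves())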


def get_rules(tree, feature_names: List, class_names: List, verbose=False) -> List:
"""
    Adapted from:
https://stackoverflow.com/questions/20224526/how-to-extract-the-decision-rules-from-scikit-learn-decision-tree

Parameters
----------
tree:
Tree to get rules for.
feature_names:
Names of the features.
class_names:
Names of the classes, e.g. [1, 0].
verbose:
Print flag.

    Returns
    -------
    List of human-readable rule strings, sorted by leaf sample count (descending).

    """

def recurse(node, path: List, paths: List):
if tree_.feature[node] != TREE_UNDEFINED:
name = feature_name[node]
threshold = tree_.threshold[node]
p1, p2 = list(path), list(path)

p1 += [f"|{name} <= {np.round(threshold, 3)}|"]
recurse(tree_.children_left[node], p1, paths)

p2 += [f"|{name} > {np.round(threshold, 3)}|"]
recurse(tree_.children_right[node], p2, paths)
else:
path += [(tree_.value[node], tree_.n_node_samples[node])]
paths += [path]

tree_ = tree.tree_
feature_name = [
feature_names[i] if i != TREE_UNDEFINED else "undefined!" for i in tree_.feature
]

paths: List = []
path: List = []
recurse(0, path, paths)

    # sort paths by leaf sample count, descending
samples_count = [p[-1][1] for p in paths]
ii = list(np.argsort(samples_count))
paths = [paths[i] for i in reversed(ii)]

rules = []
for path in paths:
rule = "if "
for p in path[:-1]:
if rule != "if ":
rule += " and "
rule += str(p)
rule += " then "

if class_names is None:
rule += "response: " + str(np.round(path[-1][0][0][0], 3))
else:
classes = path[-1][0][0]
i = np.argmax(classes)
rule += f"class: {class_names[i]}"
if verbose:
rule += f" | (proba: {np.round(100.0 * classes[i] / np.sum(classes), 2)}% of class {class_names[i]})"

if verbose:
rule += f" | based on {path[-1][1]:,} samples"

rules += [rule]

return rules
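Note (not part of the diff): get_rules only needs a fitted sklearn tree, so its output format can be checked in isolation; the toy data below is illustrative.

from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(
    n_samples=200, n_features=2, n_informative=2, n_redundant=0, random_state=0
)
clf = DecisionTreeClassifier(max_depth=2, random_state=0).fit(X, y)
for rule in get_rules(clf, feature_names=["x1", "x2"], class_names=[0, 1]):
    print(rule)
# e.g. "if |x1 <= 0.448| and |x2 > -0.292| then class: 1" (thresholds vary)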
1 change: 1 addition & 0 deletions carla/recourse_methods/__init__.py
@@ -5,6 +5,7 @@
CCHVAE,
CEM,
CRUD,
EXPECT,
FOCUS,
ActionableRecourse,
CausalRecourse,
1 change: 1 addition & 0 deletions carla/recourse_methods/catalog/__init__.py
@@ -7,6 +7,7 @@
from .clue import Clue
from .crud import CRUD
from .dice import Dice
from .expect import EXPECT, EXPECTTree
from .face import Face
from .feature_tweak import FeatureTweak
from .focus import FOCUS
4 changes: 4 additions & 0 deletions carla/recourse_methods/catalog/expect/__init__.py
@@ -0,0 +1,4 @@
# flake8: noqa

from .model import EXPECT
from .model_tree import EXPECTTree
177 changes: 177 additions & 0 deletions carla/recourse_methods/catalog/expect/hypercube.py
@@ -0,0 +1,177 @@
import re

import numpy as np


def get_classes_from_rules(rules: list) -> list:
"""
Extract class for every rule in rules.

Parameters
----------
rules:
List of classification rules.

    Returns
    -------
    List of integer class labels, one per rule.

    """
classes = []
for rule in rules:
# last element in string is class information
c = rule.split(": ")[-1]
c = int(c)
classes.append(c)
return classes


def get_plain_rules(rules: list) -> list:
"""
An example rule looks like:
'if |x1 <= 0.4| and |x2 <= 0.7| and |x3 > 0.2| then class: 1'
The corresponding plain rule would look like:
['x1 <= 0.4', 'x2 <= 0.7', 'x3 > 0.2']

Parameters
----------
rules:
List of classification rules.

    Returns
    -------
    One list per rule, each holding its 'feature <op> threshold' comparisons.

    """
re_plain_rule = re.compile(r"\|(.*?)\|")
plain_rules = [re_plain_rule.findall(rule) for rule in rules]
return plain_rules
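Note (not part of the diff): a quick check of the regex extraction on the docstring's example.

rule = "if |x1 <= 0.4| and |x2 <= 0.7| and |x3 > 0.2| then class: 1"
print(get_plain_rules([rule]))  # [['x1 <= 0.4', 'x2 <= 0.7', 'x3 > 0.2']]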


def get_threshold_grouped_rules(rule_subset: list) -> list:
"""Group rules based on their threshold direction.

An example input would look like:
['x1 <= -0.064', 'x1 > -0.652', 'x1 <= -0.312']

The corresponding output would be like:
[['x1 <= -0.064', 'x1 <= -0.312'], ['x1 > -0.652']]

Parameters
----------
rule_subset:
List of classification rules.

"""
list_leq = [r for r in rule_subset if "<=" in r]
list_g = [r for r in rule_subset if ">" in r]
threshold_grouped_rules = [list_leq, list_g]
threshold_grouped_rules = list(
filter(None, threshold_grouped_rules)
) # get rid of empty lists
return threshold_grouped_rules


def get_feature_grouped_rules(feature_names: list, plain_rules: list) -> list:
"""Group rules based on their feature.

An example input could be:
[['x1 > -0.064', 'x1 > 0.263', 'x2 > -1.453'],
['x1 <= -0.064', 'x1 <= -0.652', 'x1 <= -1.028']]

    The corresponding output would be (note the empty group for 'x2' in the
    second rule):
    [[['x1 > -0.064', 'x1 > 0.263'], ['x2 > -1.453']],
     [['x1 <= -0.064', 'x1 <= -0.652', 'x1 <= -1.028'], []]]

Parameters
----------
feature_names:
Names of the features, e.g. 'x1' and 'x2'.
plain_rules:
List of classification rules.

    Returns
    -------
    One entry per rule; each entry holds one (possibly empty) comparison list per feature.

"""

def get_feature_group(rule):
# group rule into multiple rules, one for each feature.
feature_grouped_rules = []
        for feature in feature_names:
            # substring match: assumes no feature name is a prefix of another
            # (e.g. 'x1' vs. 'x10' would collide)
            feature_group = [r for r in rule if feature in r]
            feature_grouped_rules.append(feature_group)
        # NOTE: empty groups are deliberately kept; get_hypercubes relies on
        # positional alignment with feature_names.
        return feature_grouped_rules

grouped_rules = []
for rule in plain_rules:
feature_grouped_rules = get_feature_group(rule)
grouped_rules.append(feature_grouped_rules)
return grouped_rules
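Note (not part of the diff): a tiny illustration showing that empty per-feature groups are preserved, which get_hypercubes below depends on.

plain = [["x1 <= -0.064", "x1 <= -0.652"]]
print(get_feature_grouped_rules(["x1", "x2"], plain))
# [[['x1 <= -0.064', 'x1 <= -0.652'], []]]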


def form_interval(rule_subset: list, lower_bound, upper_bound):
    """Collapse a threshold-grouped rule subset into one interval.

    Example input (a threshold_grouped_rules entry):
    [['x1 <= -0.064', 'x1 <= -0.312'], ['x1 > -0.652']]
    """

    def get_interval(comparison):
        # only the last comparison matters: along a decision path, later splits
        # on the same feature and direction are always tighter
        comparison_split = re.split(" > | <= ", comparison[-1])
threshold = float(comparison_split[-1])

# determine endpoints
if ">" in comparison[-1]:
upper = upper_bound
lower = threshold
else:
upper = threshold
lower = lower_bound

return [lower, upper]

    n_elements = len(rule_subset)
    if n_elements == 1:
        # one-sided constraint: either only '<=' rules (x <= upper)
        # or only '>' rules (lower < x)
        return get_interval(rule_subset[0])
    elif n_elements == 2:  # two-sided constraint: lower < x <= upper
        # obtain two intervals since we have two kinds of rules: <= and >
interval_1 = get_interval(rule_subset[0])
interval_2 = get_interval(rule_subset[1])
intervals = [interval_1, interval_2]

# combine the two intervals
left_interval = [inter for inter in intervals if lower_bound in inter]
right_interval = [inter for inter in intervals if upper_bound in inter]

# obtain final interval
left = np.max(left_interval)
right = np.min(right_interval)

        return sorted([left, right])
else:
raise ValueError(
f"n_elements is {n_elements}: Something went wrong in the construction of the threshold_grouped_rules"
)
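Note (not part of the diff): a worked example of the interval logic on the docstring's input, with illustrative bounds of [-1, 1]. The last '<=' comparison caps the interval from above and the last '>' comparison bounds it from below.

grouped = [["x1 <= -0.064", "x1 <= -0.312"], ["x1 > -0.652"]]
print(form_interval(grouped, lower_bound=-1.0, upper_bound=1.0))
# [-0.652, -0.312]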


def get_hypercubes(feature_names, rules: list, lower_bound, upper_bound):
    """Convert each rule into an axis-aligned hypercube: one (feature,
    [lower, upper]) interval per feature, clipped to the given bounds."""
plain_rules = get_plain_rules(rules)
grouped_rules_complete = get_feature_grouped_rules(feature_names, plain_rules)

all_intervals = []
for rule_set in grouped_rules_complete:
feature_intervals = []
for feature, rule_subset in zip(feature_names, rule_set):
if rule_subset:
grouped_rule_subset = get_threshold_grouped_rules(rule_subset)
interval = form_interval(grouped_rule_subset, lower_bound, upper_bound)
            else:  # no rules constrain this feature: use the full range
                interval = [lower_bound, upper_bound]
            zipped_interval = [(feature, interval)]  # [(feature, [lower, upper])]
feature_intervals.append(zipped_interval)
all_intervals.append(feature_intervals)
return all_intervals
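Note (not part of the diff): a hedged end-to-end sketch tying the pieces together; distilled_tree, the feature names, and the [0, 1] bounds are assumptions (e.g. min-max scaled data).

from carla.models.catalog.trees import get_rules
from carla.recourse_methods.catalog.expect.hypercube import (
    get_classes_from_rules,
    get_hypercubes,
)

feature_names = ["x1", "x2"]
rules = get_rules(distilled_tree, feature_names, class_names=[0, 1])
classes = get_classes_from_rules(rules)
cubes = get_hypercubes(feature_names, rules, lower_bound=0.0, upper_bound=1.0)
# keep only hypercubes whose leaf predicts the desired counterfactual class
target_cubes = [c for c, cls in zip(cubes, classes) if cls == 1]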