-
Notifications
You must be signed in to change notification settings - Fork 0
/
riskmodel.py
206 lines (157 loc) · 7.68 KB
/
riskmodel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import numpy as np
import cplex as cplex
import math
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
from sklearn.metrics import euclidean_distances
from riskslim.helper_functions import load_data_from_csv, check_data, print_model
from riskslim.setup_functions import get_conservative_offset
from riskslim.coefficient_set import CoefficientSet
from riskslim.lattice_cpa import setup_lattice_cpa, finish_lattice_cpa, run_lattice_cpa
class RiskModel(BaseEstimator):
def __init__(self, sample_weights = None, data_headers = None, fold_csv_file = None, params = None, settings = None, show_omitted_variables = False, threshold = 0.5, op_constraints = None):
self.sample_weights = sample_weights
self.data_headers = data_headers
self.fold_csv_file = fold_csv_file
self.settings = settings
self.show_omitted_variables = show_omitted_variables
self.threshold = threshold
self.op_constraints = op_constraints
self.params = params
self.max_coefficient = self.params['max_coefficient']
self.max_L0_value = self.params['max_L0_value']
self.max_offset = self.params['max_offset']
self.c0_value = self.params['c0_value']
self.w_pos = self.params['w_pos']
self.model_info = {}
def fit(self, X, y):
X, y = check_X_y(X, y, accept_sparse=True)
self.is_fitted_ = True
# transforming data
raw_data = np.insert(X, 0, y, axis=1)
N = raw_data.shape[0]
# setup Y vector and Y_name
Y_col_idx = [0]
Y = raw_data[:, Y_col_idx]
Y_name = self.data_headers[Y_col_idx[0]]
Y[Y == 0] = -1
# setup X and X_names
X_col_idx = [j for j in range(raw_data.shape[1]) if j not in Y_col_idx]
X = raw_data[:, X_col_idx]
variable_names = [self.data_headers[j] for j in X_col_idx]
# insert a column of ones to X for the intercept
X = np.insert(arr=X, obj=0, values=np.ones(N), axis=1)
variable_names.insert(0, '(Intercept)')
if self.sample_weights is None or len(self.sample_weights) != N:
self.sample_weights = np.ones(N)
self.data = {
'X': X,
'Y': Y,
'variable_names': variable_names,
'outcome_name': Y_name,
'sample_weights': self.sample_weights,
}
#load folds
if self.fold_csv_file is not None:
if not os.path.isfile(self.fold_csv_file):
raise IOError('could not find fold_csv_file: %s' % self.fold_csv_file)
else:
fold_idx = pd.read_csv(self.fold_csv_file, sep=',', header=None)
fold_idx = fold_idx.values.flatten()
K = max(fold_idx)
all_fold_nums = np.sort(np.unique(fold_idx))
assert len(fold_idx) == N, "dimension mismatch: read %r fold indices (expected N = %r)" % (len(fold_idx), N)
assert np.all(all_fold_nums == np.arange(1, K+1)), "folds should contain indices between 1 to %r" % K
assert fold_num in np.arange(0, K+1), "fold_num should either be 0 or an integer between 1 to %r" % K
if fold_num >= 1:
test_idx = fold_num == fold_idx
train_idx = fold_num != fold_idx
data['X'] = data['X'][train_idx,]
data['Y'] = data['Y'][train_idx]
data['sample_weights'] = data['sample_weights'][train_idx]
assert check_data(self.data)
# create coefficient set and set the value of the offset parameter
coef_set = CoefficientSet(variable_names = self.data['variable_names'], lb = -self.max_coefficient, ub = self.max_coefficient, sign = 0)
conservative_offset = get_conservative_offset(self.data, coef_set, self.max_L0_value)
self.max_offset = min(self.max_offset, conservative_offset)
coef_set['(Intercept)'].ub = self.max_offset
coef_set['(Intercept)'].lb = -self.max_offset
# edit contraints here
constraints = {
'L0_min': 0,
'L0_max': self.max_L0_value,
'coef_set':coef_set,
}
# initialize MIP for lattice CPA
mip_objects = setup_lattice_cpa(self.data, constraints, self.settings)
# add operational constraints
mip = mip_objects['mip']
indices = mip_objects['indices']
get_alpha_name = lambda var_name: 'alpha_' + str(self.data['variable_names'].index(var_name))
get_alpha_ind = lambda var_names: [get_alpha_name(v) for v in var_names]
# applies mutual exclusivity feature contraints
if self.op_constraints is not None:
names = []
expressions = []
for key in self.op_constraints.keys():
names.append("mutually_exclusive_%s" % key)
expressions.append(cplex.SparsePair(ind = get_alpha_ind(self.op_constraints[key]),
val = [1.0] * len(self.op_constraints[key])))
mip.linear_constraints.add(
names = names,
lin_expr = expressions,
senses = ["L"] * len(self.op_constraints.keys()),
rhs = [1.0] * len(self.op_constraints.keys()))
mip_objects['mip'] = mip
# fit using ltca
model_info, mip_info, lcpa_info = finish_lattice_cpa(self.data, constraints, mip_objects, self.settings)
rho = model_info['solution']
self.model_info = model_info
if np.sum(rho[1:]) != 0:
print_model(model_info['solution'], self.data)
print("solver_time = %d" % model_info['solver_time'])
print("optimality_gap = %.3f" % model_info['optimality_gap'])
print(rho)
variable_names = self.data['variable_names']
rho_values = np.copy(rho)
rho_names = list(variable_names)
# removes intercept value or sets it to 0
if '(Intercept)' in rho_names:
intercept_ind = variable_names.index('(Intercept)')
self.intercept_val = int(rho[intercept_ind])
rho_values = np.delete(rho_values, intercept_ind)
rho_names.remove('(Intercept)')
else:
self.intercept_val = 0
self.filter_mask = np.array(rho_values) != 0
# removes zero values
if not self.show_omitted_variables:
selected_ind = np.flatnonzero(rho_values)
self.rho_values = rho_values[selected_ind]
self.rho_names = [rho_names[i] for i in selected_ind]
return self
def predict(self, X):
X = check_array(X, accept_sparse=True)
X = X[:,self.filter_mask]
scores = np.round(np.squeeze(np.asarray(np.dot(X, self.rho_values.T))))
y = np.array(scores)
for i,score in enumerate(scores):
y[i] = round(float(1.0/(1.0 + math.exp(-(self.intercept_val + score)))) - self.threshold + 0.50001)
return y
def predict_proba(self, X):
X = check_array(X, accept_sparse=True)
X = X[:,self.filter_mask]
scores = np.round(np.squeeze(np.asarray(np.dot(X, self.rho_values.T))))
y = np.array(scores)
for i,score in enumerate(scores):
y[i] = float(1.0/(1.0 + math.exp(-(self.intercept_val + score))))
return y
def decision_function(self, X):
X = check_array(X, accept_sparse=True)
X = X[:,self.filter_mask]
scores = np.round(np.squeeze(np.asarray(np.dot(X, self.rho_values.T))))
y = np.array(scores)
for i,score in enumerate(scores):
y[i] = self.intercept_val + score
return y