"""Routines that implement processing data & getting models. | |
This file includes various routines for processing & acquiring models, for | |
later use in the code. The table data preprocessing is straightforward. We | |
first applying scaling to the data and fit a random forest classifier. | |
The processing of the image data is a bit more complex. To simplify the construction | |
of the explanations, the explanations don't accept images. Instead, for image explanations, | |
it is necessary to define a function that accept a array of 0's and 1's corresponding to | |
segments for a particular image being either excluded or included respectively. The explanation | |
is performed on this array. | |
""" | |
from copy import deepcopy

import numpy as np
import torch
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torchvision import models, transforms

from data.mnist.mnist_model import Net
def get_xtrain(segs):
    """Build the two-row mock training set used by the image explanations.

    The returned dataset holds one all-ones row (every segment included)
    and one all-zeros row (every segment excluded); the explainer derives
    its perturbations from these rows before querying the wrapped model.

    Arguments:
        segs: The current segments array
    """
    n_segs = np.unique(segs).shape[0]
    return np.vstack((np.ones(n_segs), np.zeros(n_segs)))
def process_imagenet_get_model(data):
    """Gets wrapped imagenet model.

    Arguments:
        data: Dict with keys 'X' (images), 'y' (labels) and 'segments'
              (per-image segment maps).
    Returns:
        Dict with the wrapped-model factory, the test images, labels,
        segment maps, and the label of the first instance.
    """
    # Pretrained EfficientNet-B0 backbone used in the experiments.
    # NOTE(review): `pretrained=True` is deprecated in newer torchvision;
    # switch to `weights=...` once the minimum supported version allows it.
    model = models.efficientnet_b0(pretrained=True)
    model.eval()
    # model.cuda()
    xtest = data['X']
    ytest = data['y'].astype(int)
    xtest_segs = data['segments']
    softmax = torch.nn.Softmax(dim=1)
    # Standard ImageNet normalization, applied after ToTensor.
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    transf = transforms.Compose([
        transforms.ToTensor(),
        normalize
    ])

    # Define the wrapped model
    def get_wrapped_model(instance, segments, background=0, batch_size=64):
        """Wrap `model` so it accepts binary segment-inclusion arrays.

        Arguments:
            instance: The image being explained (H, W, C numpy array).
            segments: Segment map aligned with `instance`.
            background: Fill value painted over excluded segments.
            batch_size: Number of perturbed images per forward pass.
        """
        def wrapped_model(data):
            perturbed_images = []
            for d in data:
                perturbed_image = deepcopy(instance)
                # Paint each excluded segment with the background value in
                # all three color channels.
                for i, is_on in enumerate(d):
                    if is_on == 0:
                        for channel in range(3):
                            perturbed_image[segments == i, channel] = background
                perturbed_images.append(transf(perturbed_image)[None, :])
            # `transf` already produces torch tensors, so concatenate with
            # torch.cat rather than round-tripping through numpy.
            batch = torch.cat(perturbed_images, dim=0).float()
            predictions = []
            for q in range(0, batch.shape[0], batch_size):
                predictions.append(
                    softmax(model(batch[q:q + batch_size])).cpu().detach().numpy())
            return np.concatenate(predictions, axis=0)
        return wrapped_model

    output = {
        "model": get_wrapped_model,
        "xtest": xtest,
        "ytest": ytest,
        "xtest_segs": xtest_segs,
        "label": data['y'][0]
    }
    return output
def process_mnist_get_model(data):
    """Gets wrapped mnist model.

    Loads the pretrained MNIST CNN from disk and returns it, wrapped so
    that explanations can query it with binary segment-inclusion arrays,
    together with the test data.

    Arguments:
        data: Dict with keys 'X', 'y' and 'segments'.
    """
    xtest = data['X']
    ytest = data['y'].astype(int)
    xtest_segs = data['segments']
    model = Net()
    model.load_state_dict(torch.load("../data/mnist/mnist_cnn.pt"))
    model.eval()
    # NOTE(review): hard requirement on a CUDA device; consider a CPU
    # fallback when cuda is unavailable.
    model.cuda()
    softmax = torch.nn.Softmax(dim=1)

    def get_wrapped_model(instance, segments, background=-0.4242, batch_size=100):
        """Wrap `model` to accept arrays of segment on/off indicators.

        Arguments:
            instance: The (1, H, W) image being explained (numpy array).
            segments: Segment map; `segments[0]` aligns with `instance[0]`.
            background: Fill value for excluded segments — presumably the
                        normalized-MNIST black pixel, (0 - 0.1307) / 0.3081.
            batch_size: Number of perturbed images per forward pass.
        """
        def wrapped_model(data):
            # `data` is an (n_samples, n_segments) array of 0/1 indicators;
            # it is only iterated and compared to 0, so there is no need to
            # move it onto the GPU first.
            perturbed_images = []
            for d in data:
                perturbed_image = deepcopy(instance)
                for i, is_on in enumerate(d):
                    if is_on == 0:
                        perturbed_image[0, segments[0] == i] = background
                perturbed_images.append(perturbed_image[:, None])
            perturbed_images = torch.from_numpy(
                np.concatenate(perturbed_images, axis=0)).float().cuda()
            # Batch predictions if necessary
            if perturbed_images.shape[0] > batch_size:
                predictions = []
                for q in range(0, perturbed_images.shape[0], batch_size):
                    predictions.append(
                        softmax(model(perturbed_images[q:q + batch_size])).cpu().detach().numpy())
                predictions = np.concatenate(predictions, axis=0)
            else:
                predictions = softmax(model(perturbed_images)).cpu().detach().numpy()
            return np.array(predictions)
        return wrapped_model

    output = {
        "model": get_wrapped_model,
        "xtest": xtest,
        "ytest": ytest,
        "xtest_segs": xtest_segs,
        "label": data['y'][0],
    }
    return output
def process_tabular_data_get_model(data):
    """Processes tabular data + trains random forest classifier."""
    features, labels = data['X'], data['y']
    # Hold out 20% of the data for evaluation.
    xtrain, xtest, ytrain, ytest = train_test_split(features, labels,
                                                    test_size=0.2)
    # Standardize using statistics computed on the training split only.
    scaler = StandardScaler().fit(xtrain)
    xtrain = scaler.transform(xtrain)
    xtest = scaler.transform(xtest)
    rf = RandomForestClassifier(n_estimators=100).fit(xtrain, ytrain)
    score = rf.score(xtest, ytest)
    output = {
        "model": rf,
        "xtrain": xtrain,
        "xtest": xtest,
        "ytrain": ytrain,
        "ytest": ytest,
        "label": 1,
        "model_score": score,
    }
    print(f"Model Score: {score}")
    return output