import numpy as np
import torch
import torchvision.models as models
from PIL import Image
from torch.utils.data import Dataset
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms

from FeatureExtractors import resnet50_features


def to_np(x):
    """Detaches a torch tensor and returns it as a NumPy array."""
    return x.detach().cpu().numpy()


def compute_spatial_similarity(conv1, conv2):
    """
    Takes in the last convolutional layer from two images, computes the pooled
    output feature, and then generates the spatial similarity map for both images.

    Assumes a 7x7 spatial feature map (e.g. ResNet-50 layer4 at 224x224 input).
    """
    conv1 = conv1.reshape(-1, 7 * 7).T  # (49, C): one channel vector per position
    conv2 = conv2.reshape(-1, 7 * 7).T  # (49, C)

    # Average-pooled feature over all spatial positions.
    pool1 = np.mean(conv1, axis=0)
    pool2 = np.mean(conv2, axis=0)
    out_sz = (int(np.sqrt(conv1.shape[0])), int(np.sqrt(conv1.shape[0])))
    conv1_normed = conv1 / np.linalg.norm(pool1) / conv1.shape[0]
    conv2_normed = conv2 / np.linalg.norm(pool2) / conv2.shape[0]
    im_similarity = np.zeros((conv1_normed.shape[0], conv1_normed.shape[0]))

    for zz in range(conv1_normed.shape[0]):
        # Dot product between position zz of image 1 and every position of
        # image 2 (np.tile replaces the deprecated numpy.matlib.repmat).
        repPx = np.tile(conv1_normed[zz, :], (conv1_normed.shape[0], 1))
        im_similarity[zz, :] = np.multiply(repPx, conv2_normed).sum(axis=1)

    # Marginalize over the other image's positions to get each similarity map.
    similarity1 = np.reshape(np.sum(im_similarity, axis=1), out_sz)
    similarity2 = np.reshape(np.sum(im_similarity, axis=0), out_sz)
    return similarity1, similarity2
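

# A minimal usage sketch for compute_spatial_similarity (illustrative only):
# the random arrays below stand in for real 2048x7x7 layer-4 activations.
def _demo_compute_spatial_similarity():
    feats_a = np.random.rand(2048, 7, 7)
    feats_b = np.random.rand(2048, 7, 7)
    sim_a, sim_b = compute_spatial_similarity(feats_a, feats_b)
    print(sim_a.shape, sim_b.shape)  # both (7, 7)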


def normalize_array(x):
    """Rescales an array to [0, 1] (assumes x is not constant)."""
    # Cast to float so the in-place division below also works for int inputs.
    x = np.asarray(x, dtype=np.float64).copy()
    x -= np.min(x)
    x /= np.max(x)
    return x


def apply_threshold(x, t):
    """Zeroes out every element of x below threshold t."""
    x = np.asarray(x).copy()
    x[x < t] = 0
    return x


def generate_mask(x, t):
    """Returns a binary mask that is 1 wherever x >= t."""
    v = np.zeros_like(x)
    v[x >= t] = 1
    return v
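

# Illustrative sketch chaining the three helpers above on a toy similarity map.
def _demo_threshold_helpers():
    sim = np.array([[0.2, 0.8], [0.5, 1.0]])
    normed = normalize_array(sim)          # rescaled to [0, 1]
    thresholded = apply_threshold(normed, 0.5)
    mask = generate_mask(normed, 0.5)      # binary version of the same cut
    print(normed, thresholded, mask, sep="\n")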


def get_transforms(args_transform, chm_args):
    """
    Builds the source/target transforms for the CHM and cosine pipelines.

    "multi" resizes source images directly to the network input size, while
    "single" first resizes and center-crops them like the targets.
    """

    cosine_transform_target = transforms.Compose(
        [
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    )

    chm_transform_target = transforms.Compose(
        [
            transforms.Resize(chm_args["img_size"]),
            transforms.CenterCrop((chm_args["img_size"], chm_args["img_size"])),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    )

    if args_transform == "multi":
        cosine_transform_source = transforms.Compose(
            [
                transforms.Resize((224, 224)),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            ]
        )

        chm_transform_source = transforms.Compose(
            [
                transforms.Resize((chm_args["img_size"], chm_args["img_size"])),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )

    elif args_transform == "single":
        cosine_transform_source = transforms.Compose(
            [
                transforms.Resize(chm_args["img_size"]),
                transforms.CenterCrop((chm_args["img_size"], chm_args["img_size"])),
                transforms.Resize((224, 224)),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            ]
        )

        chm_transform_source = transforms.Compose(
            [
                transforms.Resize(chm_args["img_size"]),
                transforms.CenterCrop((chm_args["img_size"], chm_args["img_size"])),
                transforms.ToTensor(),
                transforms.Normalize(
                    mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
                ),
            ]
        )

    else:
        raise ValueError(f"Unknown transform mode: {args_transform!r}")

    return (
        chm_transform_source,
        chm_transform_target,
        cosine_transform_source,
        cosine_transform_target,
    )
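

# Usage sketch for get_transforms; the "img_size" value of 240 is a stand-in
# for whatever the CHM configuration actually uses.
def _demo_get_transforms():
    chm_args = {"img_size": 240}
    chm_src, chm_trg, cos_src, cos_trg = get_transforms("single", chm_args)
    img = Image.new("RGB", (500, 375))
    print(cos_src(img).shape, chm_trg(img).shape)  # (3, 224, 224), (3, 240, 240)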


def clamp(x, min_value, max_value):
    """Clamps x to the closed interval [min_value, max_value]."""
    return max(min_value, min(x, max_value))


def keep_top5(input_array, K=5):
    """Returns a binary mask keeping only the K largest values of input_array."""
    kth_value = np.sort(input_array.reshape(-1))[::-1][K - 1]
    masked = np.zeros_like(input_array)
    masked[input_array >= kth_value] = 1
    return masked


def arg_topK(input_array, topK=5):
    """Returns the flat indices of the topK values, in column-major order."""
    return np.argsort(input_array.T.reshape(-1))[::-1][:topK]
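

# Quick sketch of the top-K helpers on a toy 2x2 score map.
def _demo_topk_helpers():
    scores = np.array([[0.1, 0.9], [0.4, 0.7]])
    print(keep_top5(scores, K=2))    # mask keeping the two largest entries
    print(arg_topK(scores, topK=2))  # their column-major flat indices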


class KNNSupportSet:
    """
    Wraps the train/val ImageFolders together with precomputed kNN rankings.
    Each row of knn_scores is expected to hold training-image indices ordered
    so that the nearest neighbors come last (hence the [::-1] reversals below).
    """

    def __init__(self, train_folder, val_folder, knn_scores, custom_val_labels=None):
        self.train_data = ImageFolder(root=train_folder)
        self.val_data = ImageFolder(root=val_folder)
        self.knn_scores = knn_scores

        if custom_val_labels is None:
            self.val_labels = np.asarray([x[1] for x in self.val_data.imgs])
        else:
            self.val_labels = custom_val_labels

        self.train_labels = np.asarray([x[1] for x in self.train_data.imgs])

    def get_knn_predictions(self, k=20):
        # Majority vote over the labels of each query's k nearest training images.
        knn_predictions = [
            np.argmax(np.bincount(self.train_labels[self.knn_scores[i][::-1][:k]]))
            for i in range(len(self.knn_scores))
        ]
        knn_accuracy = (
            100
            * np.sum(np.asarray(knn_predictions) == self.val_labels)
            / len(self.val_labels)
        )
        return knn_predictions, knn_accuracy

    def get_support_set(self, selected_index, top_N=20):
        support_set = self.knn_scores[selected_index][-top_N:][::-1]
        return [self.train_data.imgs[x][0] for x in support_set]

    def get_support_set_labels(self, selected_index, top_N=20):
        support_set = self.knn_scores[selected_index][-top_N:][::-1]
        return [self.train_data.imgs[x][1] for x in support_set]

    def get_image_and_label_by_id(self, q_id):
        q = self.val_data.imgs[q_id][0]
        ql = self.val_data.imgs[q_id][1]
        return (q, ql)

    def get_folder_name(self, q_id):
        q = self.val_data.imgs[q_id][0]
        return q.split("/")[-2]

    def get_top5_knn(self, query_id, k=20):
        # Among the query's neighbors (nearest first), keep the first five
        # whose label matches the kNN prediction for that query.
        knn_pred, knn_acc = self.get_knn_predictions(k=k)
        top_5s_index = np.where(
            np.equal(
                self.train_labels[self.knn_scores[query_id][::-1]], knn_pred[query_id]
            )
        )[0][:5]
        top_5s = self.knn_scores[query_id][::-1][top_5s_index]
        top_5s_files = [self.train_data.imgs[x][0] for x in top_5s]
        return top_5s_files

    def get_topK_knn(self, query_id, k=20):
        # Same as get_top5_knn, but keeps the first k matching neighbors.
        knn_pred, knn_acc = self.get_knn_predictions(k=k)
        top_ks_index = np.where(
            np.equal(
                self.train_labels[self.knn_scores[query_id][::-1]], knn_pred[query_id]
            )
        )[0][:k]
        top_ks = self.knn_scores[query_id][::-1][top_ks_index]
        top_ks_files = [self.train_data.imgs[x][0] for x in top_ks]
        return top_ks_files

    def get_foldername_for_label(self, label):
        for i in range(len(self.train_data)):
            if self.train_data.imgs[i][1] == label:
                return self.train_data.imgs[i][0].split("/")[-2]

    def get_knn_confidence(self, query_id, k=20):
        # Size of the majority vote among the k nearest neighbors.
        return np.max(
            np.bincount(self.train_labels[self.knn_scores[query_id][::-1][:k]])
        )
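

# Illustrative construction of a KNNSupportSet; "data/train" and "data/val"
# are hypothetical ImageFolder roots, and the random permutation matrix
# stands in for real precomputed nearest-neighbor rankings.
def _demo_knn_support_set():
    n_val, n_train = 10, 100
    knn_scores = np.stack([np.random.permutation(n_train) for _ in range(n_val)])
    support = KNNSupportSet("data/train", "data/val", knn_scores)
    preds, acc = support.get_knn_predictions(k=5)
    print(acc, support.get_support_set(0, top_N=3))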


class CosineCustomDataset(Dataset):
    r"""Pairs a single query image with every image in a supporting set"""

    def __init__(self, query_image, supporting_set, source_transform, target_transform):
        r"""CosineCustomDataset constructor"""
        super(CosineCustomDataset, self).__init__()

        self.supporting_set = supporting_set
        self.query_image = [query_image] * len(supporting_set)

        self.source_transform = source_transform
        self.target_transform = target_transform

    def __len__(self):
        r"""Returns the number of pairs"""
        return len(self.supporting_set)

    def __getitem__(self, idx):
        r"""Constructs and returns a batch"""
        batch = dict()
        batch["src_imname"] = self.query_image[idx]
        batch["trg_imname"] = self.supporting_set[idx]

        src_pil = self.get_image(self.query_image, idx)
        trg_pil = self.get_image(self.supporting_set, idx)

        batch["src_imsize"] = src_pil.size
        batch["trg_imsize"] = trg_pil.size

        batch["src_img"] = self.source_transform(src_pil)
        batch["trg_img"] = self.target_transform(trg_pil)

        batch["datalen"] = len(self.query_image)
        return batch

    def get_image(self, image_paths, idx):
        r"""Reads a PIL image from path"""
        path = image_paths[idx]
        return Image.open(path).convert("RGB")
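

# Sketch of wiring the dataset to a DataLoader; the image paths below are
# hypothetical, and the transforms come from get_transforms above.
def _demo_cosine_dataset():
    from torch.utils.data import DataLoader

    _, _, cos_src, cos_trg = get_transforms("multi", {"img_size": 240})
    dataset = CosineCustomDataset(
        "data/val/query.jpg",
        ["data/train/a.jpg", "data/train/b.jpg"],
        cos_src,
        cos_trg,
    )
    loader = DataLoader(dataset, batch_size=2)
    batch = next(iter(loader))
    print(batch["src_img"].shape, batch["trg_img"].shape)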


class PairedLayer4Extractor(torch.nn.Module):
    """
    Extracting layer-4 embedding for source and target images using ResNet-50 features
    """

    def __init__(self):
        super(PairedLayer4Extractor, self).__init__()

        self.modelA = models.resnet50(pretrained=True)
        self.modelA.eval()

        self.modelB = models.resnet50(pretrained=True)
        self.modelB.eval()

        self.a_embeddings = None
        self.b_embeddings = None

        # Forward hooks capture the layer4 activations on every forward pass.
        def a_hook(module, input, output):
            self.a_embeddings = output

        def b_hook(module, input, output):
            self.b_embeddings = output

        self.modelA._modules.get("layer4").register_forward_hook(a_hook)
        self.modelB._modules.get("layer4").register_forward_hook(b_hook)

    def forward(self, inputs):
        inputA, inputB = inputs
        self.modelA(inputA)
        self.modelB(inputB)

        return self.a_embeddings, self.b_embeddings

    def __repr__(self):
        return "PairedLayer4Extractor"


class iNaturalistPairedLayer4Extractor(torch.nn.Module):
    """
    Extracting layer-4 embedding for source and target images using iNaturalist ResNet-50 features
    """

    def __init__(self):
        super(iNaturalistPairedLayer4Extractor, self).__init__()

        self.modelA = resnet50_features(inat=True, pretrained=True)
        self.modelA.eval()

        self.modelB = resnet50_features(inat=True, pretrained=True)
        self.modelB.eval()

        self.source_embedding = None
        self.target_embedding = None

    def forward(self, inputs):
        source_image, target_image = inputs
        self.source_embedding = self.modelA(source_image)
        self.target_embedding = self.modelB(target_image)

        return self.source_embedding, self.target_embedding

    def __repr__(self):
        return "iNatPairedLayer4Extractor"
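

# Sketch of the iNaturalist extractor on random inputs; assumes that
# FeatureExtractors.resnet50_features returns spatial feature maps and that
# the pretrained iNaturalist weights are available locally.
def _demo_inat_paired_extractor():
    extractor = iNaturalistPairedLayer4Extractor()
    a = torch.randn(1, 3, 224, 224)
    b = torch.randn(1, 3, 224, 224)
    with torch.no_grad():
        emb_a, emb_b = extractor((a, b))
    print(emb_a.shape, emb_b.shape)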