Spaces:
Sleeping
Sleeping
import torch | |
from torch import nn | |
import torch.nn.functional as F | |
import numpy as np | |
from os.path import join | |
from models.networks.utils import NormGPS | |
class L1(nn.Module): | |
def __init__(self): | |
super(L1, self).__init__() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "gps": torch.Tensor Bx2 | |
y: dict that contains "gps": torch.Tensor Bx2 | |
Returns: | |
torch.Tensor: L1 loss between x and y: torch.Tensor([B]) | |
""" | |
return {"L1_loss": torch.abs(x["gps"] - y["gps"]).mean(dim=-1)} | |
class L2(nn.Module): | |
def __init__(self): | |
super(L2, self).__init__() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "gps": torch.Tensor Bx2 | |
y: dict that contains "gps": torch.Tensor Bx2 | |
Returns: | |
torch.Tensor: L2 loss between x and y: torch.Tensor([B]) | |
""" | |
return {"L2_loss": ((x["gps"] - y["gps"]) ** 2).mean(dim=-1)} | |
class L2Hybrid(nn.Module): | |
def __init__(self): | |
super(L2Hybrid, self).__init__() | |
self.norm = NormGPS() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "gps": torch.Tensor Bx2 | |
y: dict that contains "gps": torch.Tensor Bx2 | |
Returns: | |
torch.Tensor: L2 loss between x and y: torch.Tensor([B]) | |
""" | |
return { | |
"L2_loss": ( | |
(x["reg"] - (self.norm(y["gps"]) - x["center"]) * x["size"]) ** 2 | |
).mean(dim=-1) | |
} | |
class CrossEntropy(nn.Module): | |
def __init__(self): | |
super(CrossEntropy, self).__init__() | |
self.loss = nn.CrossEntropyLoss(reduction="none") | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "label": torch.Tensor BxN | |
y: dict that contains "label": torch.Tensor BxN | |
Returns: | |
torch.Tensor: CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
return {"cross_entropy_loss": self.loss(x["label"], y["label"])} | |
class HierarchicalCrossEntropyQuad(nn.Module): | |
def __init__(self, data_path=""): | |
super(HierarchicalCrossEntropyQuad, self).__init__() | |
self.dict_losses = {"classif_loss": nn.CrossEntropyLoss(reduction="none")} | |
for i in range(1, 10): | |
self.dict_losses[f"quadtree_{i}_loss"] = nn.NLLLoss() | |
self.matrixes = torch.load(join(data_path, "quadtree_matrixes.pt")) | |
self.dicts = torch.load(join(data_path, "quadtree_dicts.pt")) | |
self.id_to_quad = torch.load(join(data_path, "id_to_quad_10_1000.pt")) | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "label": torch.Tensor BxN | |
y: dict that contains "label": torch.Tensor BxN | |
Returns: | |
torch.Tensor: Hierarchical CrossEntropy for Quadtrees loss between x and y: torch.Tensor([B]) | |
""" | |
out = {"classif_loss": self.dict_losses["classif_loss"](x["label"], y["label"])} | |
probas = nn.functional.softmax(x["label"], dim=1) | |
device = x["label"].device | |
gt = self.id_to_quad[y["label"].cpu()] | |
for i in range(9): | |
logits = torch.log(torch.mm(probas, self.matrixes[i].to(device)) + 1e-10) | |
l = [s[: 9 - i] if len(s) >= 10 - i else s for s in gt] | |
out[f"quadtree_{i+1}_loss"] = self.dict_losses[f"quadtree_{i+1}_loss"]( | |
logits, torch.tensor([self.dicts[i][item] for item in l]).to(device) | |
) | |
return out | |
class HierarchicalCrossEntropy(nn.Module): | |
def __init__(self, path=""): | |
super(HierarchicalCrossEntropy, self).__init__() | |
self.city_loss = nn.CrossEntropyLoss(reduction="none") | |
self.country_loss = nn.NLLLoss() | |
self.area_loss = nn.NLLLoss() | |
self.region_loss = nn.NLLLoss() | |
self.city_to_country = torch.load(path + "city_to_country.pt") | |
self.city_to_region = torch.load(path + "city_to_region.pt") | |
self.city_to_area = torch.load(path + "city_to_area.pt") | |
self.country_to_idx = torch.load(path + "country_to_idx.pt") | |
self.region_to_idx = torch.load(path + "region_to_idx.pt") | |
self.area_to_idx = torch.load(path + "area_to_idx.pt") | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "label": torch.Tensor BxN | |
y: dict that contains "label": torch.Tensor BxN | |
Returns: | |
torch.Tensor: Hierarchical CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
country_mask = np.array(y["unique_country"]) != "NaN" | |
self.city_to_country = self.city_to_country.to(x["label"].device) | |
countries_probas = nn.functional.softmax(x["label"][country_mask], dim=1) | |
countries_logits = torch.log( | |
torch.mm(countries_probas, self.city_to_country) + 1e-10 | |
) | |
country_gt = torch.tensor( | |
[ | |
self.country_to_idx[item] | |
for item in np.array(y["unique_country"])[country_mask] | |
] | |
).to(x["label"].device) | |
region_mask = np.array(y["unique_region"]) != "NaN" | |
self.city_to_region = self.city_to_region.to(x["label"].device) | |
regions_probas = nn.functional.softmax(x["label"][region_mask], dim=1) | |
regions_logits = torch.log( | |
torch.mm(regions_probas, self.city_to_region) + 1e-10 | |
) | |
region_gt = torch.tensor( | |
[ | |
self.region_to_idx[item] | |
for item in np.array(y["unique_region"])[region_mask] | |
] | |
).to(x["label"].device) | |
area_mask = np.array(y["unique_sub-region"]) != "NaN" | |
self.city_to_area = self.city_to_area.to(x["label"].device) | |
areas_probas = nn.functional.softmax(x["label"][area_mask], dim=1) | |
areas_logits = torch.log(torch.mm(areas_probas, self.city_to_area) + 1e-10) | |
area_gt = torch.tensor( | |
[ | |
self.area_to_idx[item] | |
for item in np.array(y["unique_sub-region"])[area_mask] | |
] | |
).to(x["label"].device) | |
return { | |
"cross_entropy_country_loss": self.country_loss( | |
countries_logits, country_gt | |
), | |
"cross_entropy_city_loss": self.city_loss(x["label"], y["label"]), | |
"cross_entropy_area_loss": self.area_loss(areas_logits, area_gt), | |
"cross_entropy_region_loss": self.region_loss(regions_logits, region_gt), | |
} | |
class LandCoverLoss(nn.Module): | |
def __init__(self): | |
super(LandCoverLoss, self).__init__() | |
self.loss = nn.CrossEntropyLoss() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "land_cover": torch.Tensor BxN | |
y: dict that contains "land_cover": torch.Tensor BxN | |
Returns: | |
torch.Tensor: CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
return { | |
"land_cover_cross_entropy_loss": self.loss(x["land_cover"], y["land_cover"]) | |
} | |
class RoadIndexLoss(nn.Module): | |
def __init__(self): | |
super(RoadIndexLoss, self).__init__() | |
self.loss = nn.MSELoss() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "road_index": torch.Tensor BxN | |
y: dict that contains "road_index": torch.Tensor BxN | |
Returns: | |
torch.Tensor: CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
return {"road_index_mse_loss": self.loss(x["road_index"], y["road_index"])} | |
class DriveSideLoss(nn.Module): | |
def __init__(self): | |
super(DriveSideLoss, self).__init__() | |
self.loss = nn.BCELoss() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "drive_side": torch.Tensor BxN | |
y: dict that contains "drive_side": torch.Tensor BxN | |
Returns: | |
torch.Tensor: CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
return {"drive_side_bce_loss": self.loss(x["drive_side"], y["drive_side"])} | |
class ClimateLoss(nn.Module): | |
def __init__(self): | |
super(ClimateLoss, self).__init__() | |
self.loss = nn.CrossEntropyLoss() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "climate": torch.Tensor BxN | |
y: dict that contains "climate": torch.Tensor BxN | |
Returns: | |
torch.Tensor: CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
return {"climate_cross_entropy_loss": self.loss(x["climate"], y["climate"])} | |
class SoilLoss(nn.Module): | |
def __init__(self): | |
super(SoilLoss, self).__init__() | |
self.loss = nn.CrossEntropyLoss() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "soil": torch.Tensor BxN | |
y: dict that contains "soil": torch.Tensor BxN | |
Returns: | |
torch.Tensor: CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
return {"soil_cross_entropy_loss": self.loss(x["soil"], y["soil"])} | |
class DistSeaLoss(nn.Module): | |
def __init__(self): | |
super(DistSeaLoss, self).__init__() | |
self.loss = nn.MSELoss() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "dist_sea": torch.Tensor BxN | |
y: dict that contains "dist_sea": torch.Tensor BxN | |
Returns: | |
torch.Tensor: CrossEntropy loss between x and y: torch.Tensor([B]) | |
""" | |
return {"dist_sea_mse_loss": self.loss(x["dist_sea"], y["dist_sea"])} | |
class Haversine(nn.Module): | |
def __init__(self): | |
super(Haversine, self).__init__() | |
def forward(self, x, y): | |
""" | |
Args: | |
x: dict that contains "gps": torch.Tensor Bx2 | |
y: dict that contains "gps": torch.Tensor Bx2 | |
Returns: | |
torch.Tensor: Haversine loss between x and y: torch.Tensor([B]) | |
Note: | |
Haversine distance doesn't contain the 2 * 6371 constant. | |
""" | |
x, y = x["gps"], y["gps"] | |
lhs = torch.sin((x[:, 0] - y[:, 0]) / 2) ** 2 | |
rhs = ( | |
torch.cos(x[:, 0]) | |
* torch.cos(y[:, 0]) | |
* torch.sin((x[:, 1] - y[:, 1]) / 2) ** 2 | |
) | |
a = lhs + rhs | |
return { | |
"haversine_loss": torch.arctan2(torch.sqrt(a), torch.sqrt(1 - a)) | |
} # ommitting 2 * 6371 as both are a constant | |
class GeoguessrLoss(Haversine): | |
def __init__(self): | |
super(GeoguessrLoss, self).__init__() | |
def forward(self, x, y): | |
distance = super().forward(x, y)["haversine_loss"] | |
loss = torch.exp(-distance / 1852) | |
return {"geoguessr_loss": loss} | |
class InfoNCE(nn.Module): | |
def __init__(self, tau=0.1): | |
super(InfoNCE, self).__init__() | |
self.tau = tau | |
def cosine_similarity(self, a, b, normalize=True): | |
if normalize: | |
w1 = a.norm(p=2, dim=1, keepdim=True) | |
w2 = b.norm(p=2, dim=1, keepdim=True) | |
sim_matrix = torch.mm(a, b.t()) / (w1 * w2.t()).clamp(min=1e-8) | |
else: | |
sim_matrix = torch.mm(a, b.t()) | |
return sim_matrix | |
def forward(self, x, y=None): | |
""" | |
neg_sim: BxB | |
pos_sim: Bx1 | |
""" | |
features = x["features"] | |
positive_features = x["pos_features"] | |
pos_sim = F.cosine_similarity( | |
features, positive_features, dim=1, eps=1e-8 | |
).unsqueeze(1) | |
neg_sim = self.cosine_similarity(features, features, normalize=True) | |
b = neg_sim.shape[0] | |
logits = (1 - torch.eye(b)).type_as(neg_sim) * neg_sim + torch.eye(b).type_as( | |
pos_sim | |
) * pos_sim | |
logits = logits / self.tau | |
labels = torch.arange(b, dtype=torch.long).cuda() | |
loss = F.cross_entropy(logits, labels) | |
return { | |
"contrastive_loss": loss, | |
} | |
class TextNCE(nn.Module): | |
def __init__(self, tau=0.1, num_devices=1): | |
super(TextNCE, self).__init__() | |
self.distributed = num_devices > 1 | |
self.tau = tau | |
def cosine_similarity(self, a, b, normalize=True): | |
if normalize: | |
w1 = a.norm(p=2, dim=1, keepdim=True) | |
w2 = b.norm(p=2, dim=1, keepdim=True) | |
sim_matrix = torch.mm(a, b.t()) / (w1 * w2.t()).clamp(min=1e-8) | |
else: | |
sim_matrix = torch.mm(a, b.t()) | |
return sim_matrix | |
def forward(self, x, y=None): | |
""" | |
neg_sim: BxB | |
pos_sim: Bx1 | |
""" | |
if self.distributed: | |
all_image_features = torch.cat( | |
torch.distributed.nn.all_gather(x["features"]), dim=0 | |
) | |
all_text_features = torch.cat( | |
torch.distributed.nn.all_gather(x["text_features"]), dim=0 | |
) | |
all_labels = torch.cat(torch.distributed.nn.all_gather(y["label"]), dim=0) | |
else: | |
all_image_features = x["features"] | |
all_text_features = x["text_features"] | |
all_labels = y["label"] | |
labels_u = torch.unique(all_labels) | |
logits = self.cosine_similarity( | |
all_image_features, all_text_features, normalize=True | |
) | |
rows, cols = logits.size() | |
indices = torch.arange(0, rows, device=all_image_features.device) | |
loss = torch.sum( | |
torch.logsumexp( | |
logits[indices != indices.view(-1, 1)].view(rows, cols - 1) / self.tau, | |
dim=1, | |
) | |
) | |
for label in labels_u: | |
if not (label == "NaN"): | |
# Get the positive and negative examples | |
idx = all_labels == label | |
pos_logits = logits[idx][:, idx] | |
# Compute the MIL-NCE loss | |
loss += torch.sum(-torch.logsumexp(pos_logits / self.tau, dim=1)) | |
return { | |
"contrastive_loss": loss, | |
} | |
class MILNCE(nn.Module): | |
def __init__(self, tau=0.1, num_devices=1): | |
super(MILNCE, self).__init__() | |
self.distributed = num_devices > 1 | |
self.tau = tau | |
def cosine_similarity(self, a, b, normalize=True): | |
if normalize: | |
w1 = a.norm(p=2, dim=1, keepdim=True) | |
w2 = b.norm(p=2, dim=1, keepdim=True) | |
sim_matrix = torch.mm(a, b.t()) / (w1 * w2.t()).clamp(min=1e-8) | |
else: | |
sim_matrix = torch.mm(a, b.t()) | |
return sim_matrix | |
def forward(self, x, y=None): | |
""" | |
COmpute MIL-NCE loss | |
""" | |
if self.distributed: | |
all_image_features = torch.cat( | |
torch.distributed.nn.all_gather(x["features"]), dim=0 | |
) | |
all_pos_features = torch.cat( | |
torch.distributed.nn.all_gather(x["pos_features"]), dim=0 | |
) | |
all_labels = torch.cat(torch.distributed.nn.all_gather(y["label"]), dim=0) | |
else: | |
all_image_features = x["features"] | |
all_pos_features = x["pos_features"] | |
all_labels = y["label"] | |
labels_u = torch.unique(all_labels) | |
features = torch.cat([all_image_features, all_pos_features]) | |
labels = torch.cat([all_labels, all_labels]) | |
logits = self.cosine_similarity(features, features, normalize=True) | |
rows, cols = logits.size() | |
indices = torch.arange(0, rows, device=features.device) | |
loss = torch.sum( | |
torch.logsumexp( | |
logits[indices != indices.view(-1, 1)].view(rows, cols - 1) / self.tau, | |
dim=1, | |
) | |
) | |
for label in labels_u: | |
if not (label == "NaN"): | |
# Get the positive and negative examples | |
idx = labels == label | |
pos_logits = logits[idx][:, idx] | |
rows, cols = pos_logits.size() | |
indices = torch.arange(0, rows, device=features.device) | |
pos_logits = pos_logits[indices != indices.view(-1, 1)].view( | |
rows, cols - 1 | |
) | |
# Compute the MIL-NCE loss | |
loss += torch.sum(-torch.logsumexp(pos_logits / self.tau, dim=1)) | |
return { | |
"contrastive_loss": loss, | |
} | |
class RegionMILNCE(nn.Module): | |
def __init__(self, tau=0.1, num_devices=1): | |
super(RegionMILNCE, self).__init__() | |
self.distributed = num_devices > 1 | |
self.tau = tau | |
def cosine_similarity(self, a, b, normalize=True): | |
if normalize: | |
w1 = a.norm(p=2, dim=1, keepdim=True) | |
w2 = b.norm(p=2, dim=1, keepdim=True) | |
sim_matrix = torch.mm(a, b.t()) / (w1 * w2.t()).clamp(min=1e-8) | |
else: | |
sim_matrix = torch.mm(a, b.t()) | |
return sim_matrix | |
def forward(self, x, y=None): | |
""" | |
neg_sim: BxB | |
pos_sim: Bx1 | |
""" | |
if self.distributed: | |
all_image_features = torch.cat( | |
torch.distributed.nn.all_gather(x["features"]), dim=0 | |
) | |
all_pos_features = torch.cat( | |
torch.distributed.nn.all_gather(x["pos_features"]), dim=0 | |
) | |
all_labels = torch.cat(torch.distributed.nn.all_gather(y["label"]), dim=0) | |
else: | |
all_image_features = x["features"] | |
all_pos_features = x["pos_features"] | |
all_labels = y["label"] | |
labels_u = torch.unique(all_labels) | |
features = torch.cat([all_image_features, all_pos_features]) | |
labels = torch.cat([all_labels, all_labels]) | |
logits = self.cosine_similarity(features, features, normalize=True) | |
rows, cols = logits.size() | |
indices = torch.arange(0, rows, device=features.device) | |
loss = torch.sum( | |
torch.logsumexp( | |
logits[indices != indices.view(-1, 1)].view(rows, cols - 1) / self.tau, | |
dim=1, | |
) | |
) | |
for label in labels_u: | |
if not (label == "NaN"): | |
# Get the positive and negative examples | |
idx = labels == label | |
pos_logits = logits[idx][:, idx] | |
rows, cols = pos_logits.size() | |
indices = torch.arange(0, rows, device=features.device) | |
pos_logits = pos_logits[indices != indices.view(-1, 1)].view( | |
rows, cols - 1 | |
) | |
# Compute the MIL-NCE loss | |
loss += torch.sum(-torch.logsumexp(pos_logits / self.tau, dim=1)) | |
return { | |
"contrastive_loss": loss / len(all_labels), | |
} | |
LOSSES = { | |
"l1": L1, | |
"l2": L2, | |
"l2_hybrid": L2Hybrid, | |
"haversine": Haversine, | |
"geoguessr": GeoguessrLoss, | |
"crossentropy": CrossEntropy, | |
"infonce": InfoNCE, | |
"mil-nce": MILNCE, | |
"text-nce": TextNCE, | |
"land_cover": LandCoverLoss, | |
"road_index": RoadIndexLoss, | |
"drive_side": DriveSideLoss, | |
"climate": ClimateLoss, | |
"soil": SoilLoss, | |
"dist_sea": DistSeaLoss, | |
"hierarchical": HierarchicalCrossEntropy, | |
"hier_quad": HierarchicalCrossEntropyQuad, | |
"region_mil": RegionMILNCE, | |
} | |
AVERAGE = {False: lambda x: x, True: lambda x: x.mean(dim=-1)} | |
class Losses(nn.Module): | |
"""The Losses meta-object that can take a mix of losses.""" | |
def __init__(self, mix={}, aux_data=[], path="", num_devices=1): | |
"""Initializes the Losses object. | |
Args: | |
mix (dict): dictionary with keys "loss_name" and values weight | |
""" | |
super(Losses, self).__init__() | |
assert len(mix) | |
self.aux = len(aux_data) > 0 | |
if self.aux: | |
self.aux_list = aux_data | |
total = ["land_cover", "drive_side", "climate", "soil", "dist_sea"] | |
for col in self.aux_list: | |
total.remove(col) | |
for col in total: | |
del mix[col] | |
self.init_losses(mix, path, num_devices) | |
def init_losses(self, mix, path="", num_devices=1): | |
"""Initializes the losses. | |
Args: | |
mix (dict): dictionary with keys "loss_name" and values weight | |
""" | |
self.loss = {} | |
for m, v in mix.items(): | |
m = m.lower() | |
if m in ["hierarchical", "hier_quad"]: | |
try: | |
self.loss[m] = (LOSSES[m](path), v) | |
except KeyError: | |
raise KeyError(f"Loss {m} not found in {LOSSES.keys()}") | |
elif m in ["region_mil", "mil-nce", "text-nce"]: | |
try: | |
self.loss[m] = (LOSSES[m](num_devices=num_devices), v) | |
except KeyError: | |
raise KeyError(f"Loss {m} not found in {LOSSES.keys()}") | |
else: | |
try: | |
self.loss[m] = (LOSSES[m](), v) | |
except KeyError: | |
raise KeyError(f"Loss {m} not found in {LOSSES.keys()}") | |
def forward(self, x, y, average=True): | |
"""Computes the losses. | |
Args: | |
x: dict that contains "gps": torch.Tensor Bx2 or "label": torch.Tensor BxN | |
y: dict that contains "gps": torch.Tensor Bx2 or "label": torch.Tensor BxN | |
average (bool): whether to average the losses or not | |
Returns: | |
dict: dictionary with losses | |
""" | |
output = {"loss": 0} | |
for loss_name, (loss, weight) in self.loss.items(): | |
loss_output = loss(x, y) | |
for k, v in loss_output.items(): | |
v = AVERAGE[average](v) | |
if k.endswith("_loss"): | |
output["loss"] += weight * v | |
output[k] = v | |
return output | |