import tensorflow as tf
import numpy as np
from keras import backend as K
from sklearn.metrics import confusion_matrix

from config import DatasetName, ExpressionCodesRafdb, ExpressionCodesAffectnet, LearningConfig


class CustomLosses:

    def embedding_loss_distance(self, embeddings):
        """
        For each item in the batch, calculate the correlation between all the embeddings.

        :param embeddings: emb_num * bs * emb_size
        :return: the mean off-diagonal correlation penalty
        """
        emb_len = len(embeddings)
        embeddings = tf.cast(embeddings, dtype=tf.dtypes.float32)
        # one emb_num * emb_num correlation matrix per batch item, computed
        # across the emb_num embedding branches
        loss = tf.cast([np.corrcoef(embeddings[:, i, :]) for i in range(LearningConfig.batch_size)],
                       dtype=tf.dtypes.float32)
        # mask the diagonal and penalize (1 + correlation): the loss shrinks as
        # the embedding branches become decorrelated
        embedding_similarity_loss = tf.reduce_mean((1 - tf.eye(emb_len)) * (1 + loss))
        return embedding_similarity_loss
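
    # Worked example (illustrative): for a batch item whose 2x2 correlation matrix
    # across two embedding branches is [[1.0, 0.6], [0.6, 1.0]], the (1 - tf.eye(2))
    # mask zeroes the diagonal, leaving two entries of (1 + 0.6); reduce_mean over the
    # full matrix then gives 3.2 / 4 = 0.8, and the loss reaches 0 only when the
    # branches are perfectly anti-correlated.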

    def mean_embedding_loss_distance(self, embeddings, exp_gt_vec, exp_pr_vec, num_of_classes):
        """
        Calculate the mean embedding for each class and penalize correlation between
        the class means, averaged over all embedding branches.

        :param embeddings: emb_num * bs * embedding_size
        :param exp_gt_vec: bs
        :param exp_pr_vec: bs * num_of_classes
        :param num_of_classes:
        :return: the mean off-diagonal correlation penalty
        """
        # class membership map, num_of_classes * bs; K.epsilon() rather than 0 keeps
        # np.average from dividing by a zero weight sum when a class is absent
        c_map = np.array([tf.cast(tf.where(exp_gt_vec == i, 1.0, K.epsilon()), dtype=tf.dtypes.float32)
                          for i in range(num_of_classes)])
        # per-branch, per-class weighted means: emb_num * num_of_classes * embedding_size
        mean_embeddings = np.array([[np.average(embeddings[k], axis=0, weights=c_map[i, :])
                                     for i in range(num_of_classes)]
                                    for k in range(len(embeddings))])
        # off-diagonal correlation penalty between the class means of each branch
        mean_emb_correlation_loss = tf.reduce_mean([(1 - tf.eye(num_of_classes)) *
                                                    (1 + tf.cast(np.corrcoef(mean_embeddings[k, :, :]),
                                                                 dtype=tf.dtypes.float32))
                                                    for k in range(len(embeddings))])
        return mean_emb_correlation_loss

    def mean_embedding_loss(self, embedding, exp_gt_vec, exp_pr_vec, num_of_classes):
        """
        Calculate the mean embedding for each class, penalize correlation between the
        class means, and pull every sample toward the mean of its own class.

        :param embedding: bs * embedding_size
        :param exp_gt_vec: bs
        :param exp_pr_vec: bs * num_of_classes
        :param num_of_classes:
        :return: (mean_emb_correlation_loss, emb_kl_loss)
        """
        kl = tf.keras.losses.KLDivergence()
        # class membership map, num_of_classes * bs
        c_map = np.array([tf.cast(tf.where(exp_gt_vec == i, 1, 0), dtype=tf.dtypes.int8)
                          for i in range(num_of_classes)])
        # per-class mean embeddings; classes absent from the batch fall back to zeros,
        # and K.epsilon() keeps np.corrcoef away from all-zero rows
        mean_embeddings = np.array([np.average(embedding, axis=0, weights=c_map[i, :])
                                    if np.sum(c_map[i, :]) > 0 else np.zeros(LearningConfig.embedding_size)
                                    for i in range(num_of_classes)]) + K.epsilon()
        # off-diagonal correlation penalty between class means
        mean_emb_correlation_loss = tf.reduce_mean((1 - tf.eye(num_of_classes)) *
                                                   (1 + tf.cast(np.corrcoef(mean_embeddings),
                                                                dtype=tf.dtypes.float32)))
        # gather each sample's class mean and pull the sample toward it with KL divergence
        mean_emb_batch = tf.cast([mean_embeddings[i] for i in np.argmax(c_map.T, axis=1)],
                                 dtype=tf.dtypes.float32)
        emb_kl_loss = kl(y_true=mean_emb_batch, y_pred=embedding)
        return mean_emb_correlation_loss, emb_kl_loss
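
    # Illustrative gather (hypothetical labels): with exp_gt_vec = [2, 0, 2], the rows
    # of c_map.T are one-hot, so np.argmax(c_map.T, axis=1) = [2, 0, 2] and
    # mean_emb_batch stacks the class-2, class-0, and class-2 means, one per sample.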

    def variance_embedding_loss(self, embedding, exp_gt_vec, exp_pr_vec, num_of_classes):
        """
        Calculate the standard deviation of the embedding distribution for each class,
        penalize covariance between the class variance vectors, and pull every sample
        toward the variance vector of its own class.

        :param embedding: bs * embedding_size
        :param exp_gt_vec: bs
        :param exp_pr_vec: bs * num_of_classes
        :param num_of_classes:
        :return: (var_emb_correlation_loss, emb_kl_loss)
        """
        kl = tf.keras.losses.KLDivergence()
        # class membership map, num_of_classes * bs
        c_map = np.array([tf.cast(tf.where(exp_gt_vec == i, 1, 0), dtype=tf.dtypes.int8)
                          for i in range(num_of_classes)])
        # per-class std of the masked embeddings; note that out-of-class rows are
        # zeroed, not dropped, so they still contribute zeros to the statistic
        var_embeddings = np.array([tf.math.reduce_std(
            tf.math.multiply(embedding,
                             tf.expand_dims(tf.cast(c_map[i, :], dtype=tf.dtypes.float32), -1)),
            axis=0)
            for i in range(num_of_classes)]) + K.epsilon()
        # off-diagonal covariance penalty between class variance vectors
        var_emb_correlation_loss = tf.reduce_mean((1.0 - tf.eye(num_of_classes)) *
                                                  (1.0 + tf.cast(np.cov(var_embeddings),
                                                                 dtype=tf.dtypes.float32)))
        # gather each sample's class variance vector and pull the sample toward it
        var_emb_batch = tf.cast([var_embeddings[i] for i in np.argmax(c_map.T, axis=1)],
                                dtype=tf.dtypes.float32)
        emb_kl_loss = abs(kl(y_true=var_emb_batch, y_pred=embedding))
        return var_emb_correlation_loss, emb_kl_loss

    def correlation_loss(self, embedding, exp_gt_vec, exp_pr_vec, tr_conf_matrix):
        """
        Push the pairwise correlation of same-class samples toward +1 and of
        different-class samples toward -1, weighted by per-class accuracy.
        """
        bs_size = tf.shape(exp_pr_vec, out_type=tf.dtypes.int64)[0]
        # predicted integer labels, bs
        exp_pr = tf.constant([np.argmax(exp_pr_vec[i]) for i in range(bs_size)], dtype=tf.dtypes.int64)
        # pairwise correlation between samples, bs * bs
        phi_correlation_matrix = tf.cast(np.corrcoef(embedding), dtype=tf.dtypes.float32)
        # target matrix: +1 where a pair shares a label, -1 otherwise
        elems_col = tf.repeat(tf.expand_dims(exp_gt_vec, 0), repeats=[bs_size], axis=0)
        elems_row = tf.repeat(tf.expand_dims(exp_gt_vec, -1), repeats=[bs_size], axis=-1)
        delta = elems_row - elems_col
        omega_matrix = tf.cast(tf.where(delta == 0, 1, -1), dtype=tf.dtypes.float32)
        adaptive_weight = self._create_adaptive_correlation_weights(bs_size=bs_size,
                                                                    exp_gt_vec=exp_gt_vec,
                                                                    exp_pr=exp_pr,
                                                                    conf_mat=tr_conf_matrix)
        cor_loss = tf.reduce_mean(adaptive_weight * tf.abs(omega_matrix - phi_correlation_matrix))
        return cor_loss
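
    # Illustrative target (hypothetical labels): with exp_gt_vec = [0, 1, 0], delta is
    # zero exactly where a pair shares a label, so
    # omega_matrix = [[1, -1, 1], [-1, 1, -1], [1, -1, 1]].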

    def correlation_loss_multi(self, embeddings, exp_gt_vec, exp_pr_vec, tr_conf_matrix):
        """
        Here we consider more than one embedding, and we want to make the embeddings of
        the same class similar while the ones from different classes are different.

        :param embeddings:
        :param exp_gt_vec:
        :param exp_pr_vec:
        :param tr_conf_matrix:
        :return:
        """
        bs_size = tf.shape(exp_pr_vec, out_type=tf.dtypes.int64)[0]
        # per-branch pairwise correlation between samples, each bs * bs
        phi_correlation_matrices = [tf.cast(np.corrcoef(embeddings[i]), dtype=tf.dtypes.float32)
                                    for i in range(len(embeddings))]
        # target matrix: +1 where a pair shares a label, -1 otherwise,
        # repeated once per embedding branch
        elems_col = tf.repeat(tf.expand_dims(exp_gt_vec, 0), repeats=[bs_size], axis=0)
        elems_row = tf.repeat(tf.expand_dims(exp_gt_vec, -1), repeats=[bs_size], axis=-1)
        delta = elems_row - elems_col
        omega_matrix = tf.repeat(tf.expand_dims(tf.cast(tf.where(delta == 0, 1, -1),
                                                        dtype=tf.dtypes.float32), axis=0),
                                 repeats=len(embeddings), axis=0)
        cor_loss = tf.reduce_mean(tf.abs(omega_matrix - phi_correlation_matrices))
        return cor_loss

    def _create_adaptive_correlation_weights(self, bs_size, exp_gt_vec, exp_pr, conf_mat):
        """
        Create the adaptive weights.

        :param exp_gt_vec: real int labels
        :param exp_pr: predicted int labels
        :param conf_mat: confusion matrix normalized over the rows
                         (ground truths with respect to the number of corresponding classes)
        :return: a batch_size * batch_size matrix of weights; the diagonal is zero
        """
        tf_identity = tf.eye(bs_size)
        # 1 - conf_mat[gt, gt] is the total mass of missed predictions for a label:
        # the better the model performs on that label, the smaller its weight
        correct_row_base_weight = tf.repeat(tf.expand_dims(
            tf.map_fn(fn=lambda i: 1 - conf_mat[i, i], elems=exp_gt_vec), 0),
            repeats=[bs_size], axis=0)
        correct_col_base_weight = tf.transpose(correct_row_base_weight)
        correct_weight = correct_row_base_weight + correct_col_base_weight
        adaptive_weight = 1 + tf.cast(correct_weight, dtype=tf.dtypes.float32)
        # zero the diagonal: a sample is never weighted against itself
        adaptive_weight = (1 - tf_identity) * adaptive_weight
        return adaptive_weight
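
    # Illustrative weights (hypothetical confusion matrix): with two samples labeled
    # [0, 1] and conf_mat diagonal entries [0.9, 0.6], the per-sample weights are
    # [0.1, 0.4]; row + column sums give [[0.2, 0.5], [0.5, 0.8]], adding 1 gives
    # [[1.2, 1.5], [1.5, 1.8]], and zeroing the diagonal leaves [[0.0, 1.5], [1.5, 0.0]].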

    def update_confusion_matrix(self, exp_gt_vec, exp_pr, all_gt_exp, all_pr_exp):
        """
        Append the current batch's labels to the running lists and recompute the
        row-normalized confusion matrix over the seven expression classes.
        """
        all_pr_exp += np.array(exp_pr).tolist()
        all_gt_exp += np.array(exp_gt_vec).tolist()
        conf_mat = confusion_matrix(y_true=all_gt_exp, y_pred=all_pr_exp, normalize='true',
                                    labels=[0, 1, 2, 3, 4, 5, 6])
        return conf_mat, all_gt_exp, all_pr_exp

    def cross_entropy_loss(self, y_gt, y_pr, num_classes, ds_name):
        """
        Manual weighted cross-entropy plus categorical accuracy.
        """
        y_gt_oh = tf.one_hot(y_gt, depth=num_classes)
        # manual weighted CE: normalize the predictions, clip away zeros, take the mean
        y_pred = y_pr
        y_pred /= tf.reduce_sum(y_pred, axis=-1, keepdims=True)
        y_pred = K.clip(y_pred, K.epsilon(), 1)
        loss = -tf.reduce_mean(y_gt_oh * tf.math.log(y_pred))
        # accuracy (categorical_accuracy expects y_true first)
        accuracy = tf.reduce_mean(tf.keras.metrics.categorical_accuracy(y_gt_oh, y_pr))
        return loss, accuracy
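

if __name__ == '__main__':
    # Minimal smoke test (a sketch, not part of the training pipeline). It assumes
    # eager execution and that LearningConfig.batch_size and LearningConfig.embedding_size
    # match the shapes below; labels are cycled so every class appears, which requires
    # batch_size >= num_classes (otherwise np.corrcoef hits all-constant rows).
    losses = CustomLosses()
    bs = LearningConfig.batch_size
    num_classes = 7

    embedding = np.random.rand(bs, LearningConfig.embedding_size).astype(np.float32)
    exp_gt_vec = tf.constant(np.arange(bs) % num_classes, dtype=tf.dtypes.int64)
    exp_pr_vec = tf.constant(np.random.rand(bs, num_classes), dtype=tf.dtypes.float32)

    cor_loss, kl_loss = losses.mean_embedding_loss(embedding, exp_gt_vec, exp_pr_vec, num_classes)
    ce_loss, acc = losses.cross_entropy_loss(y_gt=exp_gt_vec, y_pr=exp_pr_vec,
                                             num_classes=num_classes, ds_name=None)
    print(float(cor_loss), float(kl_loss), float(ce_loss), float(acc))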