import tensorflow as tf
import numpy as np
from config import DatasetName, ExpressionCodesRafdb, ExpressionCodesAffectnet
from keras import backend as K
from sklearn.metrics import confusion_matrix
import time
from config import LearningConfig


class CustomLosses:

    def embedding_loss_distance(self, embeddings):

        """
        for each item in batch: calculate the correlation between all the embeddings
        :param embeddings:
        :return:
        """
        '''correlation'''
        emb_len = len(embeddings)
        ''' emb_num, bs, emb_size: '''
        embeddings = tf.cast(embeddings, dtype=tf.dtypes.float32)
        loss = tf.cast([np.corrcoef(embeddings[:, i, :]) for i in range(LearningConfig.batch_size)],
                       dtype=tf.dtypes.float32)
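        # `loss` stacks one emb_len x emb_len correlation matrix per batch sample
        # (shape: batch_size x emb_len x emb_len); np.corrcoef is evaluated eagerly on each slice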
        embedding_similarity_loss = tf.reduce_mean((1 - tf.eye(emb_len)) *  # ignore the effect of the diagonal
                                                   (1 + np.array(loss)))  # more correlation -> higher loss
        return embedding_similarity_loss

    def mean_embedding_loss_distance(self, embeddings, exp_gt_vec, exp_pr_vec, num_of_classes):
        """
        calculate the mean distribution for each class, and force the mean_embedding to be the same
        :param embedding: bs * embedding_size
        :param exp_gt_vec: bs
        :param exp_pr_vec: bs * num_of_classes
        :param num_of_classes:
        :return:
        """
        # bs_size = tf.shape(exp_pr_vec, out_type=tf.dtypes.int64)[0]
        # class map, num_of_classes * bs: for each class (we have 7 classes, not 7 embeddings),
        # put 1 if the sample belongs to it, else epsilon; the same map is used for all embeddings
        c_map = np.array([tf.cast(tf.where(exp_gt_vec == i, 1.0, K.epsilon()), dtype=tf.dtypes.float32)
                          for i in range(num_of_classes)])  # 7 * bs

        # calculate class-related mean embeddings: emb_num * num_of_classes * embedding_size
        # (e.g. 7 embedding branches, 7 classes, each class mean of size 256 -> 7 x 7 x 256)
        mean_embeddings = np.array([[np.average(embeddings[k], axis=0, weights=c_map[i, :])
                                     for i in range(num_of_classes)]
                                    for k in range(len(embeddings))])  # 7:embedding,7:class, 256:size

        #  the correlation between each mean_embedding should be low:
        mean_emb_correlation_loss = tf.reduce_mean([(1 - tf.eye(num_of_classes)) *  # zero the diagonal
                                                    (1 + tf.cast(np.corrcoef(mean_embeddings[k, :, :]),
                                                                 dtype=tf.dtypes.float32))
                                                    for k in range(len(embeddings))])

        return mean_emb_correlation_loss

    def mean_embedding_loss(self, embedding, exp_gt_vec, exp_pr_vec, num_of_classes):
        """
        calculate the mean distribution for each class, and force the mean_embedding to be the same
        :param embedding: bs * embedding_size
        :param exp_gt_vec: bs
        :param exp_pr_vec: bs * num_of_classes
        :param num_of_classes:
        :return:
        """
        kl = tf.keras.losses.KLDivergence()
        bs_size = tf.shape(exp_pr_vec, out_type=tf.dtypes.int64)[0]
        # calculate class maps: num_of_classes * bs
        c_map = np.array([tf.cast(tf.where(exp_gt_vec == i, 1, 0), dtype=tf.dtypes.int8)
                          for i in range(num_of_classes)])  # 7 * bs
        # calculate class-related mean embedding: num_of_classes * embedding_size
        mean_embeddings = np.array([np.average(embedding, axis=0, weights=c_map[i, :])
                                    if np.sum(c_map[i, :]) > 0 else np.zeros(LearningConfig.embedding_size)
                                    for i in range(num_of_classes)]) + \
                          K.epsilon()  # added as a bias to get rid of zeros

        # calculate loss:
        #   1 -> the correlation between each mean_embedding should be low:
        mean_emb_correlation_loss = tf.reduce_mean((1 - tf.eye(num_of_classes)) *
                                                   (1 + tf.cast(np.corrcoef(mean_embeddings), dtype=tf.dtypes.float32)))
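        # only the off-diagonal entries of (1 + corrcoef) contribute, so this term is smallest
        # when the class-wise mean embeddings are de-correlated (driven towards negative correlation)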
        #   2 -> the KL-divergence between the mean distribution of each class and the related
        #   embeddings should be low. Accordingly, we lead the network towards learning the mean distribution
        mean_emb_batch = tf.cast([np.array(mean_embeddings)[i] for i in np.argmax(c_map.T, axis=1)],
                                 dtype=tf.dtypes.float32)
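        # mean_emb_batch[j] is the mean embedding of sample j's ground-truth class:
        # the argmax over the transposed class map (bs x num_of_classes) recovers each sample's label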
        emb_kl_loss = kl(y_true=mean_emb_batch, y_pred=embedding)

        return mean_emb_correlation_loss, emb_kl_loss

    def variance_embedding_loss(self, embedding, exp_gt_vec, exp_pr_vec, num_of_classes):
        """
        calculate the variance of the distribution for each class, and force the mean_embedding to be the same
        :param embedding:
        :param exp_gt_vec:
        :param exp_pr_vec:
        :param num_of_classes:
        :return:
        """
        kl = tf.keras.losses.KLDivergence()
        bs_size = tf.shape(exp_pr_vec, out_type=tf.dtypes.int64)[0]
        # calculate class maps: num_of_classes * bs
        c_map = np.array([tf.cast(tf.where(exp_gt_vec == i, 1, 0), dtype=tf.dtypes.int8)
                          for i in range(num_of_classes)])  # 7 * bs
        # calculate class-related var embedding: num_of_classes * embedding_size
        var_embeddings = np.array([tf.math.reduce_std(tf.math.multiply(embedding,
                                                                       tf.repeat(tf.expand_dims(
                                                                           tf.cast(c_map[i, :],
                                                                                   dtype=tf.dtypes.float32), -1),
                                                                           LearningConfig.embedding_size, axis=-1, )),
                                                      axis=0)
                                   for i in range(num_of_classes)]) \
                         + K.epsilon()  # added as a bias to get rid of zeros
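        # var_embeddings[i, :] is the per-dimension standard deviation over the whole batch after
        # masking samples that do not belong to class i (their rows are zeroed, not excluded);
        # classes absent from the batch therefore reduce to epsilon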

        # calculate loss:
        #   1 -> the covariance between the class-wise deviation vectors should be low:
        var_emb_correlation_loss = tf.reduce_mean((1.0 - tf.eye(num_of_classes)) *
                                                  (1.0 + tf.cast(np.cov(var_embeddings), dtype=tf.dtypes.float32)))
        #   2 -> the KL-divergence between the deviation vector of each class and the related
        #   embeddings should be low. Accordingly, we lead the network towards learning the deviation
        var_emb_batch = tf.cast([np.array(var_embeddings)[i] for i in np.argmax(c_map.T, axis=1)],
                                dtype=tf.dtypes.float32)
        emb_kl_loss = abs(kl(y_true=var_emb_batch, y_pred=embedding))
        return var_emb_correlation_loss, emb_kl_loss

    def correlation_loss(self, embedding, exp_gt_vec, exp_pr_vec, tr_conf_matrix):
        bs_size = tf.shape(exp_pr_vec, out_type=tf.dtypes.int64)[0]
        # convert predicted probability vectors to integer class labels:
        exp_pr = tf.constant([np.argmax(exp_pr_vec[i]) for i in range(bs_size)], dtype=tf.dtypes.int64)
        # correlation matrix across the batch samples
        phi_correlation_matrix = tf.cast(np.corrcoef(embedding), dtype=tf.dtypes.float32)  # bs * bs

        elems_col = tf.repeat(tf.expand_dims(exp_gt_vec, 0), repeats=[bs_size], axis=0)
        elems_row = tf.repeat(tf.expand_dims(exp_gt_vec, -1), repeats=[bs_size], axis=-1)
        delta = elems_row - elems_col
        omega_matrix = tf.cast(tf.where(delta == 0, 1, -1), dtype=tf.dtypes.float32)
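        # omega_matrix[i, j] is +1 when samples i and j share a ground-truth label and -1 otherwise,
        # e.g. labels [0, 0, 1] give [[1, 1, -1], [1, 1, -1], [-1, -1, 1]]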
        # creating the adaptive weights
        adaptive_weight = self._create_adaptive_correlation_weights(bs_size=bs_size,
                                                                    exp_gt_vec=exp_gt_vec,  # real labels
                                                                    exp_pr=exp_pr,  # real labels
                                                                    conf_mat=tr_conf_matrix)
        # calculate correlation loss
        cor_loss = tf.reduce_mean(adaptive_weight * tf.abs(omega_matrix - phi_correlation_matrix))
        return cor_loss

    def correlation_loss_multi(self, embeddings, exp_gt_vec, exp_pr_vec, tr_conf_matrix):
        """
        here, we apply the same idea to every embedding branch: the embeddings of the same class
        should be similar while the ones from different classes should be different.
        :param embeddings:
        :param exp_gt_vec:
        :param exp_pr_vec:
        :param tr_conf_matrix:
        :return:
        """
        bs_size = tf.shape(exp_pr_vec, out_type=tf.dtypes.int64)[0]
        exp_pr = tf.constant([np.argmax(exp_pr_vec[i]) for i in range(bs_size)], dtype=tf.dtypes.int64)

        phi_correlation_matrices = [tf.cast(np.corrcoef(embeddings[i]), dtype=tf.dtypes.float32)
                                    for i in range(len(embeddings))]  # cls * bs * bs
        #
        elems_col = tf.repeat(tf.expand_dims(exp_gt_vec, 0), repeats=[bs_size], axis=0)
        elems_row = tf.repeat(tf.expand_dims(exp_gt_vec, -1), repeats=[bs_size], axis=-1)
        delta = elems_row - elems_col
        omega_matrix = tf.repeat(tf.expand_dims(tf.cast(tf.where(delta == 0, 1, -1),
                                                        dtype=tf.dtypes.float32), axis=0),
                                 repeats=len(embeddings), axis=0)
        cor_loss = tf.reduce_mean(tf.abs(omega_matrix - phi_correlation_matrices))

        return cor_loss

    def _create_adaptive_correlation_weights(self, bs_size, exp_gt_vec, exp_pr, conf_mat):
        """
        creating the weights
        :param exp_gt_vec: real int labels
        :param exp_pr_vec: one_hot labels
        :param conf_mat: confusion matrix which is normalized over the rows(
                        ground-truths with respect to the number of corresponding classes)
        :return: a bath_size * bath_size matrix containing weights. The diameter of the matrix is zero
        """
        tf_identity = tf.eye(bs_size)
        # weight based on the correct (diagonal) entries of the conf_matrix:
        # 1 - conf_mat[exp_gt_vec[i], exp_gt_vec[i]] is the sum of all the missed values, so the better
        # the model performs on a label, the smaller the weight.
        correct_row_base_weight = tf.repeat(tf.expand_dims(
            tf.map_fn(fn=lambda i: 1 - conf_mat[i, i], elems=exp_gt_vec)  # map
            , 0),  # expand_dims
            repeats=[bs_size], axis=0)  # repeat

        correct_col_base_weight = tf.einsum('ab->ba', correct_row_base_weight)
        correct_weight = correct_row_base_weight + correct_col_base_weight
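        # correct_weight[i, j] = (1 - conf_mat[g_i, g_i]) + (1 - conf_mat[g_j, g_j]),
        # i.e. the summed missed fractions for the two samples' ground-truth labels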
        adaptive_weight = tf.cast((correct_weight), dtype=tf.dtypes.float32)
        adaptive_weight = 1 + adaptive_weight  # we don't want the weights to be zero (correct prediction)
        adaptive_weight = (1 - tf_identity) * adaptive_weight  # zero out the main diagonal
        return adaptive_weight

    def update_confusion_matrix(self, exp_gt_vec, exp_pr,
                                all_gt_exp, all_pr_exp):
        # adding to the previous predicted items:
        all_pr_exp += np.array(exp_pr).tolist()
        all_gt_exp += np.array(exp_gt_vec).tolist()
        # calculate confusion matrix:
        conf_mat = confusion_matrix(y_true=all_gt_exp, y_pred=all_pr_exp, normalize='true',
                                    labels=[0, 1, 2, 3, 4, 5, 6])
        return conf_mat, all_gt_exp, all_pr_exp

    def cross_entropy_loss(self, y_gt, y_pr, num_classes, ds_name):
        y_gt_oh = tf.one_hot(y_gt, depth=num_classes)
        # manually-computed cross-entropy
        y_pred = y_pr
        y_pred /= tf.reduce_sum(y_pred, axis=-1, keepdims=True)
        y_pred = K.clip(y_pred, K.epsilon(), 1)
        loss = -tf.reduce_mean(y_gt_oh * tf.math.log(y_pred))
        # accuracy
        accuracy = tf.reduce_mean(tf.keras.metrics.categorical_accuracy(y_gt_oh, y_pr))
        return loss, accuracy
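

if __name__ == "__main__":
    # Minimal smoke-test sketch, assuming an eager TensorFlow 2.x environment; it is not part of
    # the training pipeline and only shows how cross_entropy_loss can be called on random tensors.
    # The 7-class setting and the batch size below are illustrative assumptions.
    num_classes = 7
    batch_size = 4
    losses = CustomLosses()
    # random integer ground-truth labels and softmax-normalised predictions
    y_gt = tf.constant(np.random.randint(0, num_classes, size=batch_size), dtype=tf.dtypes.int32)
    y_pr = tf.nn.softmax(tf.random.normal((batch_size, num_classes)), axis=-1)
    # ds_name is not used by cross_entropy_loss, so None is passed here
    ce_loss, ce_accuracy = losses.cross_entropy_loss(y_gt=y_gt, y_pr=y_pr,
                                                     num_classes=num_classes, ds_name=None)
    print('cross-entropy:', float(ce_loss), 'accuracy:', float(ce_accuracy))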