-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopenmax.py
139 lines (113 loc) · 5.72 KB
/
openmax.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import numpy as np
import scipy.spatial.distance as spd
import torch
import libmr
def calc_distance(query_score, mcv, eu_weight, distance_type='eucos'):
    """Return the distance between a query vector and a reference vector.

    Args:
        query_score: 1-D vector for the sample being scored.
        mcv: 1-D reference vector to measure against (callers in this file
            pass a per-channel mean activation vector).
        eu_weight: Factor applied to the Euclidean term; only used when
            ``distance_type`` is ``'eucos'``.
        distance_type: ``'eucos'`` (weighted Euclidean plus cosine),
            ``'euclidean'`` or ``'cosine'``.

    Returns:
        The requested distance as a scalar.

    Raises:
        ValueError: If ``distance_type`` is not one of the supported names.
    """
    if distance_type == 'eucos':
        return (spd.euclidean(mcv, query_score) * eu_weight
                + spd.cosine(mcv, query_score))
    if distance_type == 'euclidean':
        return spd.euclidean(mcv, query_score)
    if distance_type == 'cosine':
        return spd.cosine(mcv, query_score)
    # Fail loudly here: previously the message was only printed and the
    # function then crashed on an unbound local variable.
    raise ValueError(
        "distance type not known: enter either of eucos, euclidean or cosine")
def fit_weibull(means, dists, categories, tailsize=20, distance_type='eucos'):
    """Fit one libmr model per category and channel on the distance tails.

    Args:
        means: Per-category mean vectors; ``mean.shape[0]`` is taken as the
            number of channels.
        dists: Per-category mappings from distance-type name to a 2-D array
            indexed as ``[channel, :]``.
        categories: Category names, aligned with ``means`` and ``dists``.
        tailsize: Number of largest distances used for each fit.
        distance_type: Key selecting which distance array of ``dists`` to use.

    Returns:
        Dict keyed by category name. Each entry holds the selected distances
        under ``'distances_<distance_type>'``, the mean under ``'mean_vec'``
        and the list of fitted per-channel models under ``'weibull_model'``.
    """
    distances_key = 'distances_{}'.format(distance_type)
    fitted_by_category = {}
    for class_mean, class_dists, category_name in zip(means, dists, categories):
        selected = class_dists[distance_type]
        channel_models = []
        entry = {
            distances_key: selected,
            'mean_vec': class_mean,
            'weibull_model': channel_models,
        }
        fitted_by_category[category_name] = entry
        for channel in range(class_mean.shape[0]):
            mr = libmr.MR()
            # Ascending sort, so the last `tailsize` values are the largest.
            tail = np.sort(selected[channel, :])[-tailsize:]
            mr.fit_high(tail, len(tail))
            channel_models.append(mr)
    return fitted_by_category
def query_weibull(category_name, weibull_model, distance_type='eucos'):
    """Fetch the stored data for one category from a fitted model dict.

    Args:
        category_name: Key of the category to look up.
        weibull_model: Dict keyed by category name whose entries contain
            ``'mean_vec'``, ``'distances_<distance_type>'`` and
            ``'weibull_model'``.
        distance_type: Suffix of the distances key to read.

    Returns:
        A three-element list: mean vector, distances, fitted models.
    """
    entry = weibull_model[category_name]
    mean_vec = entry['mean_vec']
    distances = entry['distances_{}'.format(distance_type)]
    models = entry['weibull_model']
    return [mean_vec, distances, models]
# Algorithm 2.
def compute_openmax_prob(scores, scores_u):
    """Turn revised activations into class probabilities plus an unknown one.

    For every channel the class activations and the summed "unknown"
    activation are normalised together with a softmax; the per-channel
    results are then averaged.

    Args:
        scores: Per-channel vectors of revised class activations.
        scores_u: Per-channel vectors whose sum is the unknown-class
            activation of that channel.

    Returns:
        A list with one probability per class followed by the unknown-class
        probability as the last element.
    """
    prob_scores, prob_unknowns = [], []
    for s, su in zip(scores, scores_u):
        s = np.asarray(s, dtype=float)
        unknown_activation = np.sum(su)
        # Subtract a shared maximum before exponentiating. The ratio is
        # unchanged, but np.exp can no longer overflow to inf (which used to
        # turn every probability into nan for large activations).
        shift = max(np.max(s), unknown_activation) if s.size else unknown_activation
        channel_scores = np.exp(s - shift)
        channel_unknown = np.exp(unknown_activation - shift)
        total_denom = np.sum(channel_scores) + channel_unknown
        prob_scores.append(channel_scores / total_denom)
        prob_unknowns.append(channel_unknown / total_denom)

    # Average the per-channel probabilities.
    mean_scores = np.mean(prob_scores, axis=0)
    mean_unknown = np.mean(prob_unknowns, axis=0)
    return mean_scores.tolist() + [mean_unknown]
def softmax(x):
    """Return the softmax of array ``x``, normalised over all its elements.

    The maximum is subtracted before exponentiating so large inputs do not
    overflow.
    """
    shifted = x - np.max(x)
    exps = np.exp(shifted)
    return exps / exps.sum()
def openmax(weibull_model, categories, input_score, eu_weight, alpha=10, distance_type='eucos'):
    """Re-calibrate activation scores with the OpenMax procedure.

    Args:
        weibull_model: Fitted model dict as read by ``query_weibull``.
        categories: Category names; their order defines the class index.
        input_score: NumPy array of activations, iterated channel by channel
            and indexed by class within a channel.
        eu_weight: Weight of the Euclidean term, forwarded to ``calc_distance``.
        alpha: Number of top-ranked classes whose scores are revised. If
            fewer ranked scores are available, that smaller number is used.
        distance_type: Distance name forwarded to ``query_weibull`` and
            ``calc_distance``.

    Returns:
        Tuple ``(openmax_prob, softmax_prob)``: the array produced from
        ``compute_openmax_prob`` and the softmax of the flattened
        ``input_score``.
    """
    nb_classes = len(categories)
    # Indices of the largest activations, best first.
    ranked_list = input_score.argsort().ravel()[::-1][:alpha]
    # With fewer scores than `alpha` the weight vector would be longer than
    # the index list and the assignment into `omega` would raise, so shrink
    # alpha to what is actually available.
    if len(ranked_list) < alpha:
        alpha = len(ranked_list)
    # Linearly decaying weights: 1 for the top class down to 1/alpha.
    alpha_weights = [((alpha + 1) - i) / float(alpha) for i in range(1, alpha + 1)]
    omega = np.zeros(nb_classes)
    omega[ranked_list] = alpha_weights

    scores, scores_u = [], []
    for channel, input_score_channel in enumerate(input_score):
        score_channel, score_channel_u = [], []
        for c, category_name in enumerate(categories):
            mav, _, model = query_weibull(category_name, weibull_model, distance_type)
            channel_dist = calc_distance(input_score_channel, mav[channel], eu_weight, distance_type)
            wscore = model[channel].w_score(channel_dist)
            # Shrink the activation by the weighted w_score; the removed part
            # is collected as evidence for the unknown class.
            modified_score = input_score_channel[c] * (1 - wscore * omega[c])
            score_channel.append(modified_score)
            score_channel_u.append(input_score_channel[c] - modified_score)
        scores.append(score_channel)
        scores_u.append(score_channel_u)

    scores = np.asarray(scores)
    scores_u = np.asarray(scores_u)

    openmax_prob = np.array(compute_openmax_prob(scores, scores_u))
    softmax_prob = softmax(np.array(input_score.ravel()))
    return openmax_prob, softmax_prob
def compute_channel_distances(mavs, features, eu_weight=0.5):
    """Compute per-channel distances of every feature to the channel mean.

    Args:
        mavs: Sequence of per-channel mean vectors.
        features: Sequence of samples; ``feat[channel]`` is the vector
            compared with the mean of that channel.
        eu_weight: Weight of the Euclidean term in the combined distance.

    Returns:
        Dict with keys ``'eucos'``, ``'cosine'`` and ``'euclidean'``, each an
        array with one row per channel and one column per feature.
    """
    eucos_dists, eu_dists, cos_dists = [], [], []
    for channel, mcv in enumerate(mavs):
        # Compute each base distance once and reuse it for the combined one.
        eu_row = [spd.euclidean(mcv, feat[channel]) for feat in features]
        cos_row = [spd.cosine(mcv, feat[channel]) for feat in features]
        eu_dists.append(eu_row)
        cos_dists.append(cos_row)
        eucos_dists.append([eu * eu_weight + cos for eu, cos in zip(eu_row, cos_row)])
    return {
        'eucos': np.array(eucos_dists),
        'cosine': np.array(cos_dists),
        'euclidean': np.array(eu_dists),
    }
def compute_train_score_and_mavs_and_dists(train_class_num, trainloader, device, net):
    """Collect correctly classified outputs and derive per-class statistics.

    Args:
        train_class_num: Number of classes; one score list is kept per class.
        trainloader: Iterable yielding ``(inputs, targets)`` batches.
        device: Device the batches are moved to before the forward pass.
        net: Callable model; ``net(inputs)`` yields one score vector per sample.

    Returns:
        Tuple ``(scores, mavs, dists)``: per-class arrays of the kept scores,
        the per-class mean of those scores, and per-class distance dicts from
        ``compute_channel_distances``.

    Raises:
        RuntimeError: If some class has no correctly classified sample, since
            no mean can be computed for it.
    """
    scores = [[] for _ in range(train_class_num)]
    with torch.no_grad():
        for inputs, targets in trainloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            for score, t in zip(outputs, targets):
                # Keep only samples whose top-scoring class matches the label.
                if torch.argmax(score) == t:
                    # Two leading axes are added: a sample axis to
                    # concatenate on and a single channel axis.
                    scores[t].append(score.unsqueeze(dim=0).unsqueeze(dim=0))

    # torch.cat fails with an unhelpful message on an empty list; report
    # which classes are affected instead.
    empty_classes = [c for c, kept in enumerate(scores) if not kept]
    if empty_classes:
        raise RuntimeError(
            "no correctly classified training samples for class(es) {}; "
            "cannot compute mean activation vectors".format(empty_classes))

    scores = [torch.cat(x).cpu().numpy() for x in scores]
    mavs = np.array([np.mean(x, axis=0) for x in scores])
    dists = [compute_channel_distances(mcv, score) for mcv, score in zip(mavs, scores)]
    return scores, mavs, dists