Spaces:
Sleeping
Sleeping
File size: 10,303 Bytes
81a5d0a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 |
from src.hw_nats_fast_interface import HW_NATS_FastInterface
from src.genetics import FastIndividual
from src.genetics import Genetic, Population
from typing import Iterable, Union, Text
import numpy as np
from collections import OrderedDict
# Default hyper-parameters for the genetic search (values follow the FreeREA setup).
FreeREA_dict = dict(
    n=5,                     # tournament size
    N=25,                    # population size
    mutation_prob=1.0,       # 1.0 -> a mutation is always applied
    recombination_prob=1.0,  # 1.0 -> a recombination is always applied
    P_parent1=0.5,           # average fraction of the child inherited from parent1
    n_mutations=1,           # number of loci mutated at a time
    loci_prob=None,          # per-locus mutation probability (None -> uniform)
)
class GeneticSearch:
    """Evolutionary architecture search driven by training-free and hardware scores.

    The fitness of an individual combines two terms:

    * the average of the normalized classification proxies listed in
      ``self.classification_scores``;
    * the average of the normalized hardware metrics listed in
      ``self.hw_scores``, which enters the fitness with a negative sign.

    The two terms are combined with ``self.weights``.
    """

    # Weights used when the caller does not provide `fitness_weights`.
    _DEFAULT_FITNESS_WEIGHTS = (0.5, 0.5)

    def __init__(self,
                 searchspace: "HW_NATS_FastInterface",
                 genetics_dict: Union[None, dict] = None,
                 init_population: Union[None, Iterable["FastIndividual"]] = None,
                 fitness_weights: Union[None, np.ndarray] = None):
        """Set up the search space, the population and the genetic operator.

        Args:
            searchspace: Interface to the search space. Its ``dataset`` and
                ``target_device`` attributes are mirrored on this object.
            genetics_dict: Hyper-parameters of the search; the keys read by this
                class are "n", "N", "mutation_prob", "recombination_prob",
                "P_parent1", "n_mutations" and "loci_prob". Defaults to the
                module-level ``FreeREA_dict`` when None.
            init_population: Individuals to start from. When None, ``True`` is
                forwarded to ``Population`` instead (presumably asking it to
                build its own initial population - confirm against Population).
            fitness_weights: Two weights, applied respectively to the
                classification term and to the (negated) hardware term.
                Defaults to ``[0.5, 0.5]`` when None.
        """
        # instantiating a searchspace instance
        self.searchspace = searchspace
        # dataset and target device are dictated by the searchspace
        self.dataset = self.searchspace.dataset
        self.target_device = self.searchspace.target_device

        # hardware-aware scores depend on whether a target device is specified
        if self.target_device is None:
            self.hw_scores = ["flops", "params"]
        else:
            self.hw_scores = [f"{self.target_device}_energy"]

        # scores used to evaluate the architectures on downstream tasks
        self.classification_scores = ["naswot_score", "logsynflow_score", "skip_score"]

        # Defaults are resolved here (not in the signature) so that no mutable
        # object is bound once at definition time and shared by every instance.
        self.genetics_dict = FreeREA_dict if genetics_dict is None else genetics_dict

        # weights used to combine classification performance with hardware performance
        if fitness_weights is None:
            fitness_weights = np.array(self._DEFAULT_FITNESS_WEIGHTS)
        self.weights = fitness_weights

        # instantiating a population
        self.population = Population(
            searchspace=self.searchspace,
            init_population=True if init_population is None else init_population,
            n_individuals=self.genetics_dict["N"],
            normalization="dynamic"
        )
        # initialize the object taking care of performing genetic operations
        self.genetic_operator = Genetic(
            genome=self.searchspace.all_ops,
            strategy="comma",  # population evolution strategy
            tournament_size=self.genetics_dict["n"],
            searchspace=self.searchspace
        )
        # score the initial population
        self.preprocess_population()

    # NOTE: `type` shadows the builtin, but it is part of the public signature
    # (callers may pass it by keyword), so the name is kept.
    def normalize_score(self, score_value: float, score_name: Text, type: Text = "std") -> float:
        """Normalize a score value with the statistics exposed by the search space.

        Args:
            score_value (float): The score value to be normalized.
            score_name (Text): Name of the score whose statistics are used.
            type (Text, optional): Normalization type. Only "std" (subtract the
                mean, divide by the standard deviation) is available.
                Defaults to "std".

        Returns:
            float: The normalized score value.

        Raises:
            ValueError: If the specified normalization type is not available.
        """
        if type != "std":
            raise ValueError(f"Normalization type {type} not available!")

        score_mean = self.searchspace.get_score_mean(score_name)
        score_std = self.searchspace.get_score_std(score_name)
        return (score_value - score_mean) / score_std

    def _mean_normalized_score(self, genotype, score_names) -> float:
        """Return the uniform convex combination of the normalized `score_names` of `genotype`."""
        normalized = np.array([
            self.normalize_score(
                score_value=self.searchspace.list_to_score(input_list=genotype, score=name),
                score_name=name
            )
            for name in score_names
        ])
        # uniform weights summing to one, i.e. the arithmetic mean of the scores
        return (np.ones_like(normalized) / len(normalized)) @ normalized

    def fitness_function(self, individual: "FastIndividual") -> None:
        """Compute the fitness of `individual` in place, if it has none yet.

        The result is written to ``individual._fitness``; nothing is returned.
        Individuals whose ``fitness`` is not None are left untouched.

        Args:
            individual (FastIndividual): Individual to score.
        """
        if individual.fitness is not None:  # None at initialization only
            return

        network_score = self._mean_normalized_score(
            individual.genotype, self.classification_scores
        )
        network_hardware_performance = self._mean_normalized_score(
            individual.genotype, self.hw_scores
        )
        # in the hardware-aware setting, task performance is in a direct
        # tradeoff with hardware cost: the latter enters with a negative sign
        individual._fitness = np.array([network_score, -network_hardware_performance]) @ self.weights

    def preprocess_population(self):
        """Score the whole population so that every individual has a fitness."""
        self.assign_fitness()

    def perform_mutation(
        self,
        individual: "FastIndividual",
    ) -> "FastIndividual":
        """Mutate `individual` with probability ``genetics_dict["mutation_prob"]``.

        Returns the mutant produced by the genetic operator, or `individual`
        itself when no mutation takes place.
        """
        # np.random.random() lies in [0, 1): a strict comparison fires with
        # probability exactly `mutation_prob` (never for 0, always for 1).
        if np.random.random() < self.genetics_dict["mutation_prob"]:
            return self.genetic_operator.mutate(
                individual=individual,
                n_loci=self.genetics_dict["n_mutations"],
                genes_prob=self.genetics_dict["loci_prob"]
            )
        return individual

    def perform_recombination(
        self,
        parents: Iterable["FastIndividual"],
    ) -> "FastIndividual":
        """Recombine `parents` with probability ``genetics_dict["recombination_prob"]``.

        Returns the child produced by the genetic operator, or the first parent
        when no recombination takes place (so `parents` must support indexing).
        """
        # strict comparison for the same reason as in `perform_mutation`
        if np.random.random() < self.genetics_dict["recombination_prob"]:
            return self.genetic_operator.recombine(
                individuals=parents,
                P_parent1=self.genetics_dict["P_parent1"]
            )
        return parents[0]

    def assign_fitness(self):
        """Ask the population to apply `fitness_function` to its individuals."""
        self.population.update_fitness(fitness_function=self.fitness_function)

    def obtain_parents(self, n_parents: int = 2):
        """Run a tournament and return its `n_parents` fittest participants.

        Participants are returned in decreasing order of fitness.
        """
        tournament = self.genetic_operator.tournament(population=self.population.individuals)
        ranked = sorted(tournament, key=lambda individual: individual.fitness, reverse=True)
        return ranked[:n_parents]

    def _snapshot(self, population) -> dict:
        """Map the architecture of every individual currently in `population` to the individual."""
        return {self.searchspace.list_to_architecture(ind.genotype): ind for ind in population}

    def solve(self, max_generations: int = 100, return_trajectory: bool = False) -> tuple:
        """
        This function performs Regularized Evolutionary Algorithm (REA) with Training-Free metrics.
        Details on the whole procedure can be found in FreeREA (https://arxiv.org/pdf/2207.05135.pdf).

        At each generation two parents are selected through a tournament; one
        recombinant child and the two (possibly mutated) parents are added to
        the population, which is then re-scored. The two individuals removed by
        ``attribute="fitness"`` and the one removed by ``attribute="age"`` bring
        the population back to its previous size.

        Args:
            max_generations (int, optional): Number of generations to run. Defaults to 100.
            return_trajectory (bool, optional): Whether to also return the search
                trajectory. Defaults to False.

        Returns:
            tuple: ``(best_individual, test_accuracy)``, where the best individual
            is the fittest one among all recorded populations (the final one
            included). When `return_trajectory` is True the tuple is
            ``(best_individual, test_accuracy, bests, n_recorded)``: `bests`
            holds the fittest individual at the start of each generation followed
            by the overall best, and `n_recorded` is the number of distinct
            architectures recorded.
        """
        population, individuals = self.population, self.population.individuals
        bests = []
        history = OrderedDict()

        for _ in range(max_generations):
            # store the population
            history.update(self._snapshot(population))
            # save best individual
            bests.append(max(individuals, key=lambda ind: ind.fitness))
            # perform ageing
            population.age()
            # obtain parents
            parents = self.obtain_parents()
            # obtain recombinant child
            child = self.perform_recombination(parents=parents)
            # mutate parents (exactly two are expected, to keep the population size constant)
            mutant1, mutant2 = [self.perform_mutation(parent) for parent in parents]
            # add mutants and child to population
            population.add_to_population([child, mutant1, mutant2])
            # preprocess the new population - TODO: Implement a only-if-extremes-change strategy
            self.preprocess_population()
            # remove from population worst (from fitness perspective) individuals
            population.remove_from_population(attribute="fitness", n=2)
            # prune from population oldest individual
            population.remove_from_population(attribute="age", ascending=False)
            # overwrite population
            individuals = population.individuals

        # The loop stores the population at the *start* of each generation only:
        # record the final one too, so that last-generation offspring can be
        # selected (and so that max_generations=0 still yields a result).
        history.update(self._snapshot(population))

        best_individual = max(history.values(), key=lambda ind: ind.fitness)
        # appending in last position the actual best element
        bests.append(best_individual)
        test_accuracy = self.searchspace.list_to_accuracy(best_individual.genotype)

        if return_trajectory:
            return (best_individual, test_accuracy, bests, len(history))
        return (best_individual, test_accuracy)
|