Source code for desdeo_emo.selection.NSGAIII_select

import numpy as np

# from pygmo import fast_non_dominated_sorting as nds
from desdeo_tools.utilities import fast_non_dominated_sort
from typing import List
from desdeo_emo.selection.SelectionBase import InteractiveDecompositionSelectionBase
from desdeo_emo.population.Population import Population


class NSGAIII_select(InteractiveDecompositionSelectionBase):
    """The NSGA-III selection operator. Code is heavily based on the version
    of nsga3 in the pymoo package by msu-coinlab.

    Parameters
    ----------
    pop : Population
        The population from which individuals are selected.
    n_survive : int, optional
        The number of individuals that survive selection, by default the
        population size.
    selection_type : str, optional
        One of "mean", "optimistic", or "robust", by default "mean".
    """

    def __init__(
        self, pop: Population, n_survive: int = None, selection_type: str = None
    ):
        super().__init__(pop.pop_size, pop.problem.n_of_fitnesses, selection_type)
        self.worst_fitness: np.ndarray = -np.full((1, pop.fitness.shape[1]), np.inf)
        self.extreme_points: np.ndarray = None
        if n_survive is None:
            n_survive = pop.pop_size
        self.n_survive: int = n_survive
        if selection_type is None:
            selection_type = "mean"
        self.selection_type = selection_type
        self.ideal: np.ndarray = pop.ideal_fitness_val
    def do(self, pop: Population) -> List[int]:
        """Select individuals for mating for NSGA-III.

        Parameters
        ----------
        pop : Population
            The current population.

        Returns
        -------
        List[int]
            List of indices of the selected individuals
        """
        ref_dirs = self.vectors.values_planar
        fitness = self._calculate_fitness(pop)

        # Calculating fronts and ranks
        # fronts, dl, dc, rank = nds(fitness)
        fronts = fast_non_dominated_sort(fitness)
        fronts = [np.where(fronts[i])[0] for i in range(len(fronts))]
        non_dominated = fronts[0]
        fmin = np.amin(fitness, axis=0)
        self.ideal = np.amin(np.vstack((self.ideal, fmin)), axis=0)

        # Calculating worst points
        self.worst_fitness = np.amax(np.vstack((self.worst_fitness, fitness)), axis=0)
        worst_of_population = np.amax(fitness, axis=0)
        worst_of_front = np.max(fitness[non_dominated, :], axis=0)
        self.extreme_points = self.get_extreme_points_c(
            fitness[non_dominated, :], self.ideal, extreme_points=self.extreme_points
        )
        # Arguments are passed in the order declared by get_nadir_point:
        # worst_of_front before worst_of_population.
        nadir_point = self.get_nadir_point(
            self.extreme_points,
            self.ideal,
            self.worst_fitness,
            worst_of_front,
            worst_of_population,
        )

        # Finding individuals in the first 'n' fronts
        selection = np.asarray([], dtype=int)
        for front_id in range(len(fronts)):
            if len(np.concatenate(fronts[: front_id + 1])) < self.n_survive:
                continue
            else:
                fronts = fronts[: front_id + 1]
                selection = np.concatenate(fronts)
                break
        F = fitness[selection]

        last_front = fronts[-1]

        # Selecting individuals from the last acceptable front.
        if len(selection) > self.n_survive:
            niche_of_individuals, dist_to_niche = self.associate_to_niches(
                F, ref_dirs, self.ideal, nadir_point
            )
            # if there is only one front
            if len(fronts) == 1:
                n_remaining = self.n_survive
                until_last_front = np.array([], dtype=int)
                niche_count = np.zeros(len(ref_dirs), dtype=int)
            # if some individuals already survived
            else:
                until_last_front = np.concatenate(fronts[:-1])
                id_until_last_front = list(range(len(until_last_front)))
                niche_count = self.calc_niche_count(
                    len(ref_dirs), niche_of_individuals[id_until_last_front]
                )
                n_remaining = self.n_survive - len(until_last_front)

            last_front_selection_id = list(range(len(until_last_front), len(selection)))
            if np.any(selection[last_front_selection_id] != last_front):
                print("error!!!")
            selected_from_last_front = self.niching(
                fitness[last_front, :],
                n_remaining,
                niche_count,
                niche_of_individuals[last_front_selection_id],
                dist_to_niche[last_front_selection_id],
            )
            final_selection = np.concatenate(
                (until_last_front, last_front[selected_from_last_front])
            )
            if self.extreme_points is None:
                print("Error")
            if final_selection is None:
                print("Error")
        else:
            final_selection = selection
        return final_selection.astype(int)
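    # Worked example of the front-slicing loop above (comments only; the
    # numbers are hypothetical): with fronts = [[3, 7], [0, 5], [1, 2, 4, 6]]
    # and n_survive = 5, the cumulative front sizes are 2, 4, and 8. The loop
    # stops at the third front (8 >= 5), so selection holds all 8 indices,
    # last_front = [1, 2, 4, 6], until_last_front holds 4 indices, and niching
    # must pick n_remaining = 5 - 4 = 1 individual from the last front.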
    def get_extreme_points_c(self, F, ideal_point, extreme_points=None):
        """Taken from pymoo"""
        # calculate the asf which is used for the extreme point decomposition
        asf = np.eye(F.shape[1])
        asf[asf == 0] = 1e6

        # add the old extreme points to never lose them for normalization
        _F = F
        if extreme_points is not None:
            _F = np.concatenate([extreme_points, _F], axis=0)

        # use __F because we substitute small values to be 0
        __F = _F - ideal_point
        __F[__F < 1e-3] = 0

        # update the extreme points for the normalization having the highest
        # asf value each
        F_asf = np.max(__F * asf[:, None, :], axis=2)
        I = np.argmin(F_asf, axis=1)
        extreme_points = _F[I, :]

        return extreme_points
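    # Interpretation of the ASF weights above (comments only): row i of `asf`
    # is approximately [1e6, ..., 1, ..., 1e6] with the 1 at position i, so
    # minimizing the row-wise maximum picks, for each axis i, the point whose
    # objectives other than i are closest to the ideal point - i.e. the
    # extreme point along objective axis i.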
    def get_nadir_point(
        self,
        extreme_points,
        ideal_point,
        worst_point,
        worst_of_front,
        worst_of_population,
    ):
        LinAlgError = np.linalg.LinAlgError
        try:
            # find the intercepts using gaussian elimination
            M = extreme_points - ideal_point
            b = np.ones(extreme_points.shape[1])
            plane = np.linalg.solve(M, b)
            intercepts = 1 / plane

            nadir_point = ideal_point + intercepts

            if (
                not np.allclose(np.dot(M, plane), b)
                or np.any(intercepts <= 1e-6)
                or np.any(nadir_point > worst_point)
            ):
                raise LinAlgError()

        except LinAlgError:
            nadir_point = worst_of_front

        b = nadir_point - ideal_point <= 1e-6
        nadir_point[b] = worst_of_population[b]
        return nadir_point
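    # Numeric sketch of the intercept computation (comments only; the values
    # are hypothetical): for two objectives, with extreme points translated by
    # the ideal point,
    #
    #     M = np.array([[2.0, 0.1],
    #                   [0.1, 3.0]])
    #     plane = np.linalg.solve(M, np.ones(2))
    #     intercepts = 1 / plane    # ~ [2.07, 3.15], the axis intercepts
    #
    # giving nadir_point = ideal_point + intercepts. The fallback branch is
    # taken when M is singular or the intercepts are degenerate.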
    def niching(self, F, n_remaining, niche_count, niche_of_individuals, dist_to_niche):
        survivors = []

        # boolean array of elements that are considered for each iteration
        mask = np.full(F.shape[0], True)

        while len(survivors) < n_remaining:
            # all niches to which new individuals can be assigned
            next_niches_list = np.unique(niche_of_individuals[mask])

            # pick a niche with the minimum number of assigned individuals,
            # breaking ties randomly if necessary
            next_niche_count = niche_count[next_niches_list]
            next_niche = np.where(next_niche_count == next_niche_count.min())[0]
            next_niche = next_niches_list[next_niche]
            next_niche = next_niche[np.random.randint(0, len(next_niche))]

            # indices of individuals that are still considered and assigned
            # to next_niche
            next_ind = np.where(
                np.logical_and(niche_of_individuals == next_niche, mask)
            )[0]

            # shuffle to break random ties (equal perpendicular distance) or
            # to select randomly
            np.random.shuffle(next_ind)

            if niche_count[next_niche] == 0:
                next_ind = next_ind[np.argmin(dist_to_niche[next_ind])]
            else:
                # already randomized through shuffling
                next_ind = next_ind[0]

            mask[next_ind] = False
            survivors.append(int(next_ind))

            niche_count[next_niche] += 1

        return survivors
    def associate_to_niches(
        self, F, ref_dirs, ideal_point, nadir_point, utopian_epsilon=0.0
    ):
        utopian_point = ideal_point - utopian_epsilon
        denom = nadir_point - utopian_point
        denom[denom == 0] = 1e-12

        # normalize by ideal point and intercepts
        N = (F - utopian_point) / denom
        dist_matrix = self.calc_perpendicular_distance(N, ref_dirs)

        niche_of_individuals = np.argmin(dist_matrix, axis=1)
        dist_to_niche = dist_matrix[np.arange(F.shape[0]), niche_of_individuals]
        return niche_of_individuals, dist_to_niche
    def calc_niche_count(self, n_niches, niche_of_individuals):
        niche_count = np.zeros(n_niches, dtype=int)
        index, count = np.unique(niche_of_individuals, return_counts=True)
        niche_count[index] = count
        return niche_count
    def calc_perpendicular_distance(self, N, ref_dirs):
        u = np.tile(ref_dirs, (len(N), 1))
        v = np.repeat(N, len(ref_dirs), axis=0)
        norm_u = np.linalg.norm(u, axis=1)

        scalar_proj = np.sum(v * u, axis=1) / norm_u
        proj = scalar_proj[:, None] * u / norm_u[:, None]
        val = np.linalg.norm(proj - v, axis=1)
        matrix = np.reshape(val, (len(N), len(ref_dirs)))

        return matrix
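    # The quantity computed above is the perpendicular distance from each
    # normalized point v to the ray spanned by each reference direction u:
    # d(v, u) = || v - ((v . u) / ||u||^2) u ||, evaluated for every
    # (point, direction) pair and reshaped to an (n_points, n_ref_dirs) matrix.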
    def _calculate_fitness(self, pop) -> np.ndarray:
        if self.selection_type == "mean":
            return pop.fitness
        if self.selection_type == "optimistic":
            # "uncertainity" matches the attribute spelling used by
            # desdeo_emo's Population class
            return pop.fitness - pop.uncertainity
        if self.selection_type == "robust":
            return pop.fitness + pop.uncertainity
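
if __name__ == "__main__":
    # Minimal usage sketch (hypothetical, not part of the documented API):
    # the geometric helpers above never read instance state, so they are
    # called unbound here to illustrate input/output shapes without
    # constructing a full Population and problem.
    rng = np.random.default_rng(seed=0)
    points = rng.random((6, 3))  # six already-normalized fitness vectors
    ref_dirs = np.eye(3)         # three axis-aligned reference directions
    dist = NSGAIII_select.calc_perpendicular_distance(None, points, ref_dirs)
    print(dist.shape)  # (6, 3): one distance per (point, direction) pair
    counts = NSGAIII_select.calc_niche_count(None, 3, np.array([0, 0, 2]))
    print(counts)      # [2 0 1]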