#++++++++++++++++++++++++++++++++++++
# Libraries and modules
#++++++++++++++++++++++++++++++++++++
import numpy as np
import copy
from .bee import Bee
from tqdm import trange,tqdm
#++++++++++++++++++++++++++++++++++++
# Artificial Bee Colony (ABC) class
#++++++++++++++++++++++++++++++++++++
[docs]
class ArtificialBeeColony():
"""
Artificial Bee Colony (ABC) class
Attributes:
colony_size (int) : The total number of bees in the colony.
n_employed_bees (int) : The number of employed bees.
n_onlooker_bees (int) : The number of onlooker bees.
max_scouts (int) : The maximum number of scout bees per iteration. Defaults to None (will be set to n_employed_bees).
dim (int) : The dimensionality of the search space.
function (callable) : The objective function to optimize.
bounds (array-like) : The bounds for each dimension of the search space, provided as a 2D array [(lower1, upper1), ..., (lowerD, upperD)].
employed_bees (list[Bee]) : The employed bees in the colony.
onlooker_bees (list[Bee]) : The onlooker bees in the colony.
colony_history (list[list[Bee]]) : The history of the employed bees at each iteration.
optimal_bee (Bee) : The optimal bee in the colony.
optimal_bee_history (list[Bee]) : The history of the optimal bee at each iteration.
max_iters (int) : The maximum number of iterations. Defaults to 1000.
actual_iters (int) : The actual number of iterations.
limit (int) : The trial limit for scout bees. If 'default', it is set to 0.6 * n_employed_bees * dimensionality. Defaults to 'default'.
selection (str, optional) : The selection strategy for onlooker bees. Must be one among 'RouletteWheel' and 'Tournament'. Defaults to 'RouletteWheel'.
mutation (str) : The mutation strategy. Must be one among 'StandardABC', 'ModifiedABC', 'ABC/best/1', 'ABC/best/2' and 'DirectedABC'. Defaults to 'StandardABC'.
initialization (str) : The initialization strategy for the bee population. Must be one among 'random' and 'cahotic'. Defaults to 'random'.
stagnation_tol (float) : The tolerance for stagnation in fitness values to trigger early termination. Defaults to np.NINF (i.e. stagnation disabled).
sf (float) : The scaling factor for mutations. Defaults to 1.0.
initial_sf (float) : The initial scaling factor. Defaults to 1.0.
self_adaptive_sf (bool) : Whether to use a self-adaptive scaling factor. Defaults to False.
mr (float) : The mutation rate for 'ModifiedABC' strategy. Defaults to 0.7.
.. note::
To ensure compatibility with all the mutation types, the bee colony must have at least 5 employed bees and at least 5 onlokeer bees.
"""
#------------------------------------------------------------------------------------------------------------------
def __init__(self,colony_size,function,bounds,n_employed_bees=None,max_scouts=None):
"""
Initializes the ABC
Args:
colony_size (int) : The total number of bees in the colony.
function (callable) : The objective function to optimize.
bounds (array-like) : The bounds for each dimension of the search space, provided as a 2D array [(lower1, upper1), ..., (lowerD, upperD)].
n_employed_bees (int, optional) : The number of employed bees. Defaults to half the total number of bees.
max_scouts (int, optional) : The maximum number of scout bees per iteration. Defaults to None (will be set to n_employed_bees).
Raises:
TypeError : If the function is not callable.
TypeError : If the bounds are not a numpy array.
ValueError : If the bounds do not have shape `(D, 2)` or `(2, D)`.
ValueError : If any dimension has its lower bound greater than the upper bound.
TypeError : If the colony size is not an integer.
ValueError : If the colony size is less than 10.
TypeError : If the number of employed bees is not an integer.
ValueError : If the number of employed bees is less than 5 or it is s.t the number of onlookers is less than 5.
TypeError : If the maximum number of scouts is not an integer.
ValueError : If the maximum number of scouts is less than 0 or greater than the number of employed bees.
.. note::
Constraints about colony size, n_employed_bees and max_scouts ensure compatibility across all mutation types.
"""
if not callable(function):
raise TypeError("`function` must be callable.")
self.function = function
if not isinstance(bounds, np.ndarray):
raise TypeError("`bounds` must be a numpy array.")
if not ((bounds.shape[0] == 2) or (bounds.shape[1] == 2)):
raise ValueError("`bounds` must have shape `(D, 2)` or `(2, D)`, but got {bounds.shape}.")
if not np.all(bounds.reshape(-1,2)[:, 0] <= bounds.reshape(-1,2)[:, 1]):
raise ValueError("Each lower bound must be less than or equal to its upper bound.")
self.bounds = bounds.reshape(-1,2)
self.dim = self.bounds.shape[0]
if not isinstance(colony_size,int):
raise TypeError("`colony_size` must be an integer.")
if colony_size < 10:
raise ValueError(f"`colony_size` must be at least 10 to ensure compatibility across all mutation types, but got {colony_size}")
self.colony_size = colony_size
if n_employed_bees is not None:
if not isinstance(n_employed_bees,int):
raise TypeError("`n_employed_bees` must be an integer.")
if not ((n_employed_bees >= 5) and ((self.colony_size-n_employed_bees)>=5)):
raise ValueError("It must hold 5 <= `n_employed_bees` <= (`colony_size` - 5) to ensure compatibility across all mutation types")
self.n_employed_bees = n_employed_bees
else:
self.n_employed_bees = self.colony_size // 2
self.n_onlooker_bees = self.colony_size - self.n_employed_bees
if max_scouts is not None:
if not isinstance(max_scouts,int):
raise TypeError("`max_scouts` must be an integer.")
if not ( (max_scouts >= 0) and (max_scouts <= self.n_employed_bees) ):
raise ValueError(f"`max_scouts` must be beteen 0 and `n_employed_bees`, but got {max_scouts} and {self.n_employed_bees}, respectively.")
self.max_scouts = max_scouts
else:
self.max_scouts = self.n_employed_bees
self.employed_bees = []
self.onlooker_bees = []
self.colony_history = []
self.optimal_bee = None
self.optimal_bee_history = []
#------------------------------------------------------------------------------------------------------------------
[docs]
def optimize(self,
max_iters = 1000,
limit = 'default',
selection = 'RouletteWheel',
mutation = 'StandardABC',
initialization = 'random',
tournament_size = None,
stagnation_tol = np.NINF,
sf = 1.0,
self_adaptive_sf = False,
mr = 1.0,
verbose = False,
random_seed = None):
"""
Runs the optimization process.
Args:
max_iters (int, optional) : The maximum number of iterations. Defaults to 1000.
limit (int or str, optional) : The trial limit for scout bees. If 'default', it is set to 0.6 * n_employed_bees * dimensionality. Defaults to 'default'.
selection (str, optional) : The selection strategy for onlooker bees. Must be one among 'RouletteWheel' and 'Tournament'. Defaults to 'RouletteWheel'.
mutation (str, optional) : The mutation strategy. Must be one among 'StandardABC', 'ModifiedABC', 'ABC/best/1' and 'ABC/best/2'. Defaults to 'StandardABC'.
initialization (str, optional) : The initialization strategy for the bee population. Must be one among 'random' and 'cahotic'. Defaults to 'random'.
tournament_size (int, optional) : The size of the tournament for the 'Tournament' selection strategy. Defaults to None.
stagnation_tol (float, optional) : The tolerance for stagnation in fitness values to trigger early termination. Defaults to np.NINF (i.e. stagnation disabled).
sf (float, optional) : The scaling factor for mutations. Defaults to 1.0.
self_adaptive_sf (bool, optional): Whether to use a self-adaptive scaling factor. Defaults to False.
mr (float, optional) : The mutation rate for 'ModifiedABC' strategy. Defaults to 1.0.
verbose (bool, optional) : Whether to display optimization progress. Defaults to False.
random_seed (int, optional) : The seed for random number generation. Defaults to None.
Raises:
ValueError: If `max_iters` is not a positive integer.
ValueError: If `mutation` is not one of ['StandardABC', 'ModifiedABC', 'ABC/best/1', 'ABC/best/2', 'DirectedABC'].
ValueError: If `initialization` is not one of ['random', 'cahotic'].
ValueError: If `selection` is not one of ['RouletteWheel', 'Tournament'].
ValueError: If `mr` is not a float value between 0.0 and 1.0.
ValueError: If `selection` is 'Tournament' and `tournament_size` is not an integer between 1 and `n_employed_bees`.
ValueError: If `sf` is not a float value greater than 0.
ValueError: If `limit` is less than or equal to 0.
TypeError : If `self_adaptive_sf` is not a boolean.
TypeError : If `stagnation_tol` is not a float.
"""
# Sanity checks and setting optimization parameters
#.....................................................................................................................................
if not (isinstance(max_iters, int) and max_iters >= 0):
raise ValueError("`max_iters` must be a positive integer.")
self.max_iters = max_iters
self.actual_iters = 0
valid_mutations_ = ['StandardABC', 'ModifiedABC', 'ABC/best/1', 'ABC/best/2', 'DirectedABC']
if mutation not in valid_mutations_:
raise ValueError(f"{mutation} is an invalid mutation strategy. Choose one among {', '.join(valid_mutations_)}.")
self.mutation = mutation
valid_initializations_ = ['random','cahotic']
if initialization not in valid_initializations_:
raise ValueError(f"{initialization} is an invalid intialization. Choose one among {', '.join(valid_initializations_)}.")
self.initialization = initialization
valid_selections_ = ['RouletteWheel','Tournament']
if selection not in valid_selections_:
raise ValueError(f"{selection} is an invalid selection. Choose one among {', '.join(valid_selections_)}.")
self.selection = selection
if not (isinstance(mr,float) and (mr>=0) and (mr<= 1.0)):
raise ValueError("`mr` must be a float value between 0.0 and 1.0")
self.mr = mr
if self.selection == 'Tournament':
if tournament_size is None:
raise ValueError("`Please specify a tournament_size`.")
if not (isinstance(tournament_size, int) and (1 <= tournament_size <= self.n_employed_bees)):
raise ValueError("`tournament_size` must be an integer between 1 and `n_employed_bees` when using 'Tournament' selection.")
self.tournament_size = tournament_size
if not (isinstance(sf,(int,float)) and (sf>0.0)):
raise ValueError(f"`sf` must be greater than 0, but got {sf}")
self.sf = sf
self.initial_sf = sf
if not isinstance(self_adaptive_sf,bool):
raise TypeError("`self_adaptive_sf` must be bool")
self.self_adaptive_sf = self_adaptive_sf
self.limit = limit if (limit != 'default') else (0.6 * self.n_employed_bees * self.dim)
if not (self.limit>0):
raise ValueError("`limit` must be greater than 0, but got {self.limit}. If this error occurs when `limit`='default', change your configuration.")
self.directions = np.full((self.n_employed_bees,self.dim),None)
if self.mutation=='DirectedABC':
self.directions = np.zeros((self.n_employed_bees,self.dim))
if not isinstance(stagnation_tol,float):
raise TypeError('`stagnation_tol` must be float')
self.stagnation_tol = stagnation_tol
if not isinstance(verbose,bool):
raise TypeError("`verbose` must be bool")
#.....................................................................................................................................
# Initialization
if random_seed:
np.random.seed(random_seed)
if self.initialization == 'random':
self.employed_bees = [Bee(position = np.random.uniform(self.bounds[:,0],self.bounds[:,1],self.dim),
function = self.function,
bounds = self.bounds) for _ in range(self.n_employed_bees) ]
elif self.initialization == 'cahotic':
# Define cahotic map and iterate over it
cahotic_map = np.random.rand(self.n_employed_bees,self.dim)
for _ in range(300):
cahotic_map = np.sin(cahotic_map * np.pi)
cahotic_pop = [Bee(position = self.bounds[:,0] + (self.bounds[:,1] - self.bounds[:,0]) * cahotic_map[i,:],
function = self.function,
bounds = self.bounds) for i in range(self.n_employed_bees) ]
opposite_pop = [Bee(position = self.bounds[:,0] + self.bounds[:,1] - cahotic_pop[i].position,
function = self.function,
bounds = self.bounds) for i in range(self.n_employed_bees) ]
self.employed_bees = sorted(cahotic_pop+opposite_pop, key=lambda bee: bee.fitness, reverse=True)[:self.n_employed_bees]
self.colony_history.append(copy.deepcopy(self.employed_bees))
self.optimal_bee = copy.deepcopy(max(self.employed_bees,key=lambda bee: bee.fitness))
self.optimal_bee_history.append(copy.deepcopy(self.optimal_bee))
#.....................................................................................................................................
# Optimization Loop
for _ in trange(self.max_iters,desc='Running Optimization',disable= not verbose,bar_format='{l_bar}{bar}|[{elapsed}<{remaining}]'):
self.actual_iters += 1
self.employees_phase_()
self.onlookers_phase_()
self.scouts_phase_()
self.colony_history.append(copy.deepcopy(self.employed_bees))
self.optimal_bee = copy.deepcopy(max(self.employed_bees,key=lambda bee: bee.fitness))
self.optimal_bee_history.append(copy.deepcopy(self.optimal_bee))
# Stagnation
if (np.std([bee.fitness for bee in self.employed_bees]) < self.stagnation_tol):
if verbose:
tqdm.write(f"Early termination: Optimization stagnated at iteration {self.actual_iters} / {self.max_iters}")
break
#.....................................................................................................................................
#------------------------------------------------------------------------------------------------------------------
[docs]
def employees_phase_(self):
"""
Performs the employed bees phase, where each employed bee explores the search space by generating candidate solutions.
.. note::
Updates the employed bees with better candidate solutions based on the greedy selection.
Adjusts the scaling factor if self-adaptive scaling is enabled.
"""
succesful_mutations = 0
for bee_idx, bee in enumerate(self.employed_bees):
candidate_bee = self.get_candidate_neighbor_(bee=bee,bee_idx=bee_idx,population=self.employed_bees)
# Greedy Selection
if candidate_bee.fitness >= bee.fitness:
self.employed_bees[bee_idx] = candidate_bee
succesful_mutations += 1
if self.mutation == 'DirectedABC':
if (bee.position != candidate_bee.position).any(): # this is needed when candidate_bee.fitness == bee.fitness
# Retrieve the index mutated and update the direction
j = np.where(bee.position != candidate_bee.position)[0][0]
self.directions[bee_idx,j] = np.sign(candidate_bee.position[j] - bee.position[j])
else:
bee.trial += 1
if self.mutation == 'DirectedABC':
# Retrieve the index mutated and update the direction
j = np.where(bee.position != candidate_bee.position)[0][0]
self.directions[bee_idx,j] = 0
if self.self_adaptive_sf:
self.update_SF_(succesful_mutations_ratio= (succesful_mutations / self.n_employed_bees) )
#------------------------------------------------------------------------------------------------------------------
[docs]
def waggle_dance_(self):
"""
Implements the waggle dance, which determines the probability of selecting employed bees for the onlooker phase.
Returns:
array: Indices of the selected employed bees (based on the chosen selection strategy).
"""
fitness_values = np.array([bee.fitness for bee in self.employed_bees])
if self.selection == 'RouletteWheel':
selection_probabilities = fitness_values / np.sum(fitness_values)
dance_winners = np.random.choice(range(self.n_employed_bees),size=self.n_onlooker_bees,p=selection_probabilities,replace=True)
return dance_winners
if self.selection == 'Tournament':
dance_winners = []
for _ in range(self.n_onlooker_bees):
tournament_indices = np.random.choice(range(self.n_employed_bees),size=self.tournament_size,replace=False)
tournament_fitness = fitness_values[tournament_indices]
winner_idx = tournament_indices[np.argmax(tournament_fitness)]
dance_winners.append(winner_idx)
return dance_winners
#------------------------------------------------------------------------------------------------------------------
[docs]
def onlookers_phase_(self):
"""
Performs the onlooker bees phase, where onlookers exploit the information shared by employed bees to explore the search space.
.. note::
Updates employed bees with better solutions discovered by onlookers.
Adjusts the scaling factor if self-adaptive scaling is enabled.
"""
dance_winners = self.waggle_dance_()
self.onlooker_bees = [Bee(position = self.employed_bees[winner_idx].position,
function = self.function,
bounds = self.bounds) for winner_idx in dance_winners
]
succesful_mutations = 0
for bee_idx, winner_idx in enumerate(dance_winners):
bee = self.onlooker_bees[bee_idx]
# Get Candidate Neighbor
candidate_bee = self.get_candidate_neighbor_(bee=bee,bee_idx=bee_idx,population=self.onlooker_bees)
# Greedy Selection
if candidate_bee.fitness >= bee.fitness:
self.employed_bees[winner_idx] = candidate_bee
succesful_mutations += 1
if self.mutation == 'DirectedABC':
if (bee.position != candidate_bee.position).any(): # this is needed when candidate_bee.fitness == bee.fitness
# Retrieve the index mutated and update the direction
j = np.where(bee.position != candidate_bee.position)[0][0]
self.directions[winner_idx,j] = np.sign(candidate_bee.position[j] - bee.position[j])
else:
if self.mutation == 'DirectedABC':
# Retrieve the index mutated and update the direction
j = np.where(bee.position != candidate_bee.position)[0][0]
self.directions[winner_idx,j] = 0
if self.self_adaptive_sf:
self.update_SF_(succesful_mutations_ratio= (succesful_mutations / self.n_onlooker_bees) )
#------------------------------------------------------------------------------------------------------------------
[docs]
def scouts_phase_(self):
"""
Performs the scout bees phase, where employed bees that exceed the trial limit are forced to explore a new solution
.. note::
Depending on the initialization strategy, scouts are reinitialized either randomly or using a chaotic map.
"""
n_scouts = 0
for bee_idx, bee in enumerate(self.employed_bees):
if n_scouts > self.max_scouts:
break
if bee.trial > self.limit:
n_scouts += 1
if self.initialization == 'random':
self.employed_bees[bee_idx] = Bee(position = np.random.uniform(self.bounds[:,0],self.bounds[:,1],self.dim),
function = self.function,
bounds = self.bounds)
elif self.initialization == 'cahotic':
cahotic_map = np.random.rand(self.n_employed_bees,self.dim)
for _ in range(300):
cahotic_map = np.sin(cahotic_map * np.pi)
cahotic_pop = [Bee(position = self.bounds[:,0] + (self.bounds[:,1] - self.bounds[:,0]) * cahotic_map[i,:],
function = self.function,
bounds = self.bounds) for i in range(self.n_employed_bees) ]
opposite_pop = [Bee(position = self.bounds[:,0] + self.bounds[:,1] - cahotic_pop[i].position,
function = self.function,
bounds = self.bounds) for i in range(self.n_employed_bees) ]
self.employed_bees[bee_idx] = sorted(cahotic_pop+opposite_pop,
key=lambda bee: bee.fitness, reverse=True)[0]
#------------------------------------------------------------------------------------------------------------------
[docs]
def get_candidate_neighbor_(self,bee,bee_idx,population):
"""
Generates a candidate neighbor solution for a given bee based on the chosen mutation strategy.
Parameters:
bee (Bee) : The bee for which a candidate neighbor is generated.
bee_idx (int) : The index of the bee in the population.
population (list) : The population of bees from which donors are selected (i.e., employed or onlooker bees).
Returns:
Bee: A new candidate bee solution.
"""
if self.mutation == 'StandardABC':
phi = np.random.uniform(-self.sf,self.sf)
donor_bee = self.get_donor_bees_(n_donors=1,bee_idx=bee_idx,population=population)[0]
candidate_bee = copy.deepcopy(bee)
j = np.random.randint(0,self.dim)
candidate_bee.position[j] = bee.position[j] + phi*(bee.position[j] - donor_bee.position[j])
candidate_bee.position[j] = np.clip(candidate_bee.position[j],self.bounds[j][0],self.bounds[j][1])
if self.mutation == 'ModifiedABC':
donor_bee = self.get_donor_bees_(n_donors=1,bee_idx=bee_idx,population=population)[0]
candidate_bee = copy.deepcopy(bee)
phi = np.random.uniform(-self.sf,self.sf,self.dim)
mutation_mask = np.random.uniform(size=self.dim) <= self.mr
candidate_bee.position[mutation_mask] = bee.position[mutation_mask] + phi[mutation_mask] * (bee.position[mutation_mask] - donor_bee.position[mutation_mask])
candidate_bee.position = np.clip(candidate_bee.position,self.bounds[:,0],self.bounds[:,1])
if self.mutation == 'ABC/best/1':
phi = np.random.uniform(-self.sf,self.sf)
donor1,donor2 = self.get_donor_bees_(n_donors=2,bee_idx=bee_idx,population=population)
candidate_bee = copy.deepcopy(bee)
j = np.random.randint(0,self.dim)
candidate_bee.position[j] = self.optimal_bee.position[j] + phi*(donor1.position[j] - donor2.position[j])
candidate_bee.position[j] = np.clip(candidate_bee.position[j],self.bounds[j][0],self.bounds[j][1])
if self.mutation == 'ABC/best/2':
phi = np.random.uniform(-self.sf,self.sf)
donor1,donor2,donor3,donor4 = self.get_donor_bees_(n_donors=4,bee_idx=bee_idx,population=population)
candidate_bee = copy.deepcopy(bee)
j = np.random.randint(0,self.dim)
candidate_bee.position[j] = self.optimal_bee.position[j] + phi*(donor1.position[j] - donor2.position[j]) \
+ phi*(donor3.position[j] - donor4.position[j])
candidate_bee.position[j] = np.clip(candidate_bee.position[j],self.bounds[j][0],self.bounds[j][1])
if self.mutation == 'DirectedABC':
donor_bee = self.get_donor_bees_(n_donors=1,bee_idx=bee_idx,population=population)[0]
candidate_bee = copy.deepcopy(bee)
directions = self.directions[bee_idx,:]
j = np.random.randint(0,self.dim)
r = (directions[j] == 0) * np.random.uniform(-self.sf,self.sf) + \
(directions[j] == 1) * np.random.uniform(0,self.sf) + \
(directions[j] == -1) * np.random.uniform(-self.sf,0)
candidate_bee.position[j] = bee.position[j] + r * np.abs(bee.position[j] - donor_bee.position[j])
candidate_bee.position[j] = np.clip(candidate_bee.position[j],self.bounds[j][0],self.bounds[j][1])
return candidate_bee
#------------------------------------------------------------------------------------------------------------------
[docs]
def get_donor_bees_(self,n_donors=1,bee_idx=None,population=None):
"""
Selects donor bees from the population (for a given bee)
Args:
n_donors (int, optional) : The number of donor bees to return. Defaults to 1.
bee_idx (int, optional) : The index of the bee for which donors are selected. Defaults to None.
population (list[Bee], optional) : The population of bees from which donors are selected. Defaults to None.
Returns:
list: A list of donor bees (as Bee instances).
"""
available_indices = np.delete(np.arange(len(population)), bee_idx)
k_list = np.random.choice(available_indices,size=n_donors,replace=False)
return [copy.deepcopy(population[k]) for k in k_list]
#------------------------------------------------------------------------------------------------------------------
[docs]
def update_SF_(self,succesful_mutations_ratio):
"""
Updates the scaling factor (SF) adaptively based on the ratio of successful mutations
Args:
successful_mutations_ratio(float): The ratio of successful mutations used to update the scaling factor.
.. note::
Self-adaptiveness is based on the "one-fifth" rule (Rechenberg 1971)
"""
if succesful_mutations_ratio > 1/5:
self.sf = self.sf / 0.85
elif succesful_mutations_ratio < 1/5:
self.sf = self.sf * 0.85
#------------------------------------------------------------------------------------------------------------------
[docs]
def reset(self):
"""
Resets the ABC object to its initial state.
"""
self.employed_bees = []
self.onlooker_bees = []
self.colony_history = []
self.optimal_bee = None
self.optimal_bee_history = []
self.actual_iters = 0
self.sf = self.initial_sf
#------------------------------------------------------------------------------------------------------------------