Source code for pyswarms.single.global_best

# -*- coding: utf-8 -*-

r"""
A Global-best Particle Swarm Optimization (gbest PSO) algorithm.

It takes a set of candidate solutions, and tries to find the best
solution using a position-velocity update method. Uses a
star-topology where each particle is attracted to the best
performing particle.

The position update can be defined as:

.. math::

   x_{i}(t+1) = x_{i}(t) + v_{i}(t+1)

Where the position at the current timestep :math:`t` is updated using
the computed velocity at :math:`t+1`. Furthermore, the velocity update
is defined as:

.. math::

   v_{ij}(t + 1) = w * v_{ij}(t) + c_{1}r_{1j}(t)[y_{ij}(t) − x_{ij}(t)]
                   + c_{2}r_{2j}(t)[\hat{y}_{j}(t) − x_{ij}(t)]

Here, :math:`c1` and :math:`c2` are the cognitive and social parameters
respectively. They control the particle's behavior given two choices: (1) to
follow its *personal best* or (2) follow the swarm's *global best* position.
Overall, this dictates if the swarm is explorative or exploitative in nature.
In addition, a parameter :math:`w` controls the inertia of the swarm's
movement.

An example usage is as follows:

.. code-block:: python

    import pyswarms as ps
    from pyswarms.utils.functions import single_obj as fx

    # Set-up hyperparameters
    options = {'c1': 0.5, 'c2': 0.3, 'w':0.9}

    # Call instance of GlobalBestPSO
    optimizer = ps.single.GlobalBestPSO(n_particles=10, dimensions=2,
                                        options=options)

    # Perform optimization
    stats = optimizer.optimize(fx.sphere, iters=100)

This algorithm was adapted from the earlier works of J. Kennedy and
R.C. Eberhart in Particle Swarm Optimization [IJCNN1995]_.

.. [IJCNN1995] J. Kennedy and R.C. Eberhart, "Particle Swarm Optimization,"
    Proceedings of the IEEE International Joint Conference on Neural
    Networks, 1995, pp. 1942-1948.
"""

# Import standard library
import logging

# Import modules
import numpy as np
import multiprocessing as mp

from collections import deque

from ..backend.operators import compute_pbest, compute_objective_function
from ..backend.topology import Star
from ..backend.handlers import BoundaryHandler, VelocityHandler, OptionsHandler
from ..base import SwarmOptimizer
from ..utils.reporter import Reporter


[docs]class GlobalBestPSO(SwarmOptimizer):
[docs] def __init__( self, n_particles, dimensions, options, bounds=None, oh_strategy=None, bh_strategy="periodic", velocity_clamp=None, vh_strategy="unmodified", center=1.00, ftol=-np.inf, ftol_iter=1, init_pos=None, ): """Initialize the swarm Attributes ---------- n_particles : int number of particles in the swarm. dimensions : int number of dimensions in the space. options : dict with keys :code:`{'c1', 'c2', 'w'}` a dictionary containing the parameters for the specific optimization technique. * c1 : float cognitive parameter * c2 : float social parameter * w : float inertia parameter bounds : tuple of numpy.ndarray, optional a tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape :code:`(dimensions,)`. oh_strategy : dict, optional, default=None(constant options) a dict of update strategies for each option. bh_strategy : str a strategy for the handling of out-of-bounds particles. velocity_clamp : tuple, optional a tuple of size 2 where the first entry is the minimum velocity and the second entry is the maximum velocity. It sets the limits for velocity clamping. vh_strategy : str a strategy for the handling of the velocity of out-of-bounds particles. center : list (default is :code:`None`) an array of size :code:`dimensions` ftol : float relative error in objective_func(best_pos) acceptable for convergence. Default is :code:`-np.inf` ftol_iter : int number of iterations over which the relative error in objective_func(best_pos) is acceptable for convergence. Default is :code:`1` init_pos : numpy.ndarray, optional option to explicitly set the particles' initial positions. Set to :code:`None` if you wish to generate the particles randomly. """ super(GlobalBestPSO, self).__init__( n_particles=n_particles, dimensions=dimensions, options=options, bounds=bounds, velocity_clamp=velocity_clamp, center=center, ftol=ftol, ftol_iter=ftol_iter, init_pos=init_pos, ) if oh_strategy is None: oh_strategy = {} # Initialize logger self.rep = Reporter(logger=logging.getLogger(__name__)) # Initialize the resettable attributes self.reset() # Initialize the topology self.top = Star() self.bh = BoundaryHandler(strategy=bh_strategy) self.vh = VelocityHandler(strategy=vh_strategy) self.oh = OptionsHandler(strategy=oh_strategy) self.name = __name__
[docs] def optimize( self, objective_func, iters, n_processes=None, verbose=True, **kwargs ): """Optimize the swarm for a number of iterations Performs the optimization to evaluate the objective function :code:`f` for a number of iterations :code:`iter.` Parameters ---------- objective_func : callable objective function to be evaluated iters : int number of iterations n_processes : int number of processes to use for parallel particle evaluation (default: None = no parallelization) verbose : bool enable or disable the logs and progress bar (default: True = enable logs) kwargs : dict arguments for the objective function Returns ------- tuple the global best cost and the global best position. """ # Apply verbosity if verbose: log_level = logging.INFO else: log_level = logging.NOTSET self.rep.log("Obj. func. args: {}".format(kwargs), lvl=logging.DEBUG) self.rep.log( "Optimize for {} iters with {}".format(iters, self.options), lvl=log_level, ) # Populate memory of the handlers self.bh.memory = self.swarm.position self.vh.memory = self.swarm.position # Setup Pool of processes for parallel evaluation pool = None if n_processes is None else mp.Pool(n_processes) self.swarm.pbest_cost = np.full(self.swarm_size[0], np.inf) ftol_history = deque(maxlen=self.ftol_iter) for i in self.rep.pbar(iters, self.name) if verbose else range(iters): # Compute cost for current position and personal best # fmt: off self.swarm.current_cost = compute_objective_function(self.swarm, objective_func, pool=pool, **kwargs) self.swarm.pbest_pos, self.swarm.pbest_cost = compute_pbest(self.swarm) # Set best_cost_yet_found for ftol best_cost_yet_found = self.swarm.best_cost self.swarm.best_pos, self.swarm.best_cost = self.top.compute_gbest(self.swarm) # fmt: on if verbose: self.rep.hook(best_cost=self.swarm.best_cost) # Save to history hist = self.ToHistory( best_cost=self.swarm.best_cost, mean_pbest_cost=np.mean(self.swarm.pbest_cost), mean_neighbor_cost=self.swarm.best_cost, position=self.swarm.position, velocity=self.swarm.velocity, ) self._populate_history(hist) # Verify stop criteria based on the relative acceptable cost ftol relative_measure = self.ftol * (1 + np.abs(best_cost_yet_found)) delta = ( np.abs(self.swarm.best_cost - best_cost_yet_found) < relative_measure ) if i < self.ftol_iter: ftol_history.append(delta) else: ftol_history.append(delta) if all(ftol_history): break # Perform options update self.swarm.options = self.oh( self.options, iternow=i, itermax=iters ) # Perform velocity and position updates self.swarm.velocity = self.top.compute_velocity( self.swarm, self.velocity_clamp, self.vh, self.bounds ) self.swarm.position = self.top.compute_position( self.swarm, self.bounds, self.bh ) # Obtain the final best_cost and the final best_position final_best_cost = self.swarm.best_cost.copy() final_best_pos = self.swarm.pbest_pos[ self.swarm.pbest_cost.argmin() ].copy() # Write report in log and return final cost and position self.rep.log( "Optimization finished | best cost: {}, best pos: {}".format( final_best_cost, final_best_pos ), lvl=log_level, ) # Close Pool of Processes if n_processes is not None: pool.close() return (final_best_cost, final_best_pos)