Source code for pyswarms.backend.operators

# -*- coding: utf-8 -*-

"""
Swarm Operation Backend

This module abstracts various operations in the swarm such as updating
the personal best, finding neighbors, etc. You can use these methods
to specify how the swarm will behave.
"""

# Import standard library
import logging

# Import modules
import numpy as np

from ..utils.reporter import Reporter
from .handlers import BoundaryHandler, VelocityHandler
from functools import partial


rep = Reporter(logger=logging.getLogger(__name__))


def compute_pbest(swarm):
    """Compute the updated personal-best positions and costs of a swarm

    Every particle's current cost is compared with the personal best cost
    recorded for it. Where the current cost is strictly lower, the current
    position and cost take over; otherwise the stored personal best is
    kept. The swarm is only read, never modified, so assign the returned
    arrays back yourself:

    .. code-block:: python

        import pyswarms.backend as P
        from pyswarms.backend.operators import compute_pbest

        my_swarm = P.create_swarm(n_particles, dimensions)

        # Inside the for-loop...
        for i in range(iters):
            my_swarm.pbest_pos, my_swarm.pbest_cost = compute_pbest(my_swarm)

    Parameters
    ----------
    swarm : pyswarms.backend.swarms.Swarm
        a Swarm instance; its :code:`dimensions`, :code:`position`,
        :code:`pbest_pos`, :code:`current_cost` and :code:`pbest_cost`
        attributes are read

    Returns
    -------
    numpy.ndarray
        New personal best positions of shape
        :code:`(n_particles, n_dimensions)`
    numpy.ndarray
        New personal best costs of shape :code:`(n_particles,)`

    Raises
    ------
    AttributeError
        If :code:`swarm` lacks one of the attributes listed above. The
        error is logged before being re-raised.
    """
    try:
        n_dims = swarm.dimensions
        # One flag per particle: True where the current cost is strictly
        # better (lower) than the stored personal best
        improved = swarm.current_cost < swarm.pbest_cost
        # Stretch the per-particle flags across every dimension so the
        # mask lines up with the position matrix
        improved_2d = np.repeat(improved[:, np.newaxis], n_dims, axis=1)
        best_positions = np.where(
            improved_2d, swarm.position, swarm.pbest_pos
        )
        best_costs = np.where(
            improved, swarm.current_cost, swarm.pbest_cost
        )
        return (best_positions, best_costs)
    except AttributeError:
        rep.logger.exception(
            "Please pass a Swarm class. You passed {}".format(type(swarm))
        )
        raise
def compute_velocity(swarm, clamp, vh, bounds=None):
    """Compute the next velocity matrix of a swarm

    The raw velocity is the sum of three terms:

    * inertia: :code:`w * velocity`
    * cognitive: :code:`c1 * r1 * (pbest_pos - position)`
    * social: :code:`c2 * r2 * (best_pos - position)`

    where :code:`r1` and :code:`r2` are independent uniform samples from
    :code:`np.random.uniform(0, 1, ...)` with the same shape as the
    position matrix. The raw velocity is then handed to the
    :code:`VelocityHandler`, whose return value is the result.

    .. code-block :: python

        import pyswarms.backend as P
        from pyswarms.backend.handlers import VelocityHandler
        from pyswarms.backend.operators import compute_velocity

        my_swarm = P.create_swarm(n_particles, dimensions)
        my_vh = VelocityHandler(strategy="invert")

        for i in range(iters):
            # Inside the for-loop
            my_swarm.velocity = compute_velocity(
                my_swarm, clamp, my_vh, bounds
            )

    Parameters
    ----------
    swarm : pyswarms.backend.swarms.Swarm
        a Swarm instance; its :code:`position`, :code:`velocity`,
        :code:`pbest_pos`, :code:`best_pos` and :code:`options`
        (keys :code:`"c1"`, :code:`"c2"`, :code:`"w"`) are read
    clamp : tuple of floats
        a tuple of size 2 where the first entry is the minimum velocity
        and the second entry is the maximum velocity. It is forwarded
        unchanged to :code:`vh`.
    vh : pyswarms.backend.handlers.VelocityHandler
        a VelocityHandler object with a specified handling strategy.
        It is called as
        :code:`vh(velocity, clamp, position=..., bounds=...)`.
        For further information see :mod:`pyswarms.backend.handlers`.
    bounds : tuple of numpy.ndarray or list, optional
        a tuple of size 2 where the first entry is the minimum bound
        while the second entry is the maximum bound. Each array must be
        of shape :code:`(dimensions,)`. Forwarded unchanged to
        :code:`vh`.

    Returns
    -------
    numpy.ndarray
        Updated velocity matrix, as returned by :code:`vh`

    Raises
    ------
    AttributeError
        If :code:`swarm` lacks a required attribute (logged, re-raised).
    KeyError
        If :code:`swarm.options` lacks one of :code:`"c1"`,
        :code:`"c2"`, :code:`"w"` (logged, re-raised).
    """
    try:
        shape = swarm.position.shape
        c1, c2, w = (swarm.options[key] for key in ("c1", "c2", "w"))

        # Pull towards each particle's own best position; the random
        # factor for this term is drawn first
        r_cognitive = np.random.uniform(0, 1, shape)
        cognitive = c1 * r_cognitive * (swarm.pbest_pos - swarm.position)

        # Pull towards the swarm's best position
        r_social = np.random.uniform(0, 1, shape)
        social = c2 * r_social * (swarm.best_pos - swarm.position)

        # Unhandled velocity; clamping/handling is delegated to vh
        inertia = w * swarm.velocity
        raw_velocity = inertia + cognitive + social
        return vh(
            raw_velocity, clamp, position=swarm.position, bounds=bounds
        )
    except AttributeError:
        rep.logger.exception(
            "Please pass a Swarm class. You passed {}".format(type(swarm))
        )
        raise
    except KeyError:
        rep.logger.exception("Missing keyword in swarm.options")
        raise
def compute_position(swarm, bounds, bh):
    """Compute the next position matrix of a swarm

    The new position is the current position plus the current velocity,
    computed on a copy so that :code:`swarm.position` is left untouched.
    When :code:`bounds` is given, the moved positions are passed through
    the :code:`BoundaryHandler` and its return value is the result.

    .. code-block :: python

        import pyswarms.backend as P
        from pyswarms.backend.handlers import BoundaryHandler
        from pyswarms.backend.operators import compute_position

        my_swarm = P.create_swarm(n_particles, dimensions)
        my_bh = BoundaryHandler(strategy="intermediate")

        for i in range(iters):
            # Inside the for-loop
            my_swarm.position = compute_position(my_swarm, bounds, my_bh)

    Parameters
    ----------
    swarm : pyswarms.backend.swarms.Swarm
        a Swarm instance; its :code:`position` and :code:`velocity`
        attributes are read
    bounds : tuple of numpy.ndarray or list, optional
        a tuple of size 2 where the first entry is the minimum bound
        while the second entry is the maximum bound. Each array must be
        of shape :code:`(dimensions,)`. Pass :code:`None` to skip
        boundary handling.
    bh : pyswarms.backend.handlers.BoundaryHandler
        a BoundaryHandler object with a specified handling strategy,
        called as :code:`bh(position, bounds)`; only used when
        :code:`bounds` is not :code:`None`.
        For further information see :mod:`pyswarms.backend.handlers`.

    Returns
    -------
    numpy.ndarray
        New position-matrix

    Raises
    ------
    AttributeError
        If :code:`swarm` lacks a required attribute (logged, re-raised).
    """
    try:
        # Work on a copy: the in-place add must not touch swarm.position
        moved = swarm.position.copy()
        moved += swarm.velocity
        if bounds is None:
            return moved
        return bh(moved, bounds)
    except AttributeError:
        rep.logger.exception(
            "Please pass a Swarm class. You passed {}".format(type(swarm))
        )
        raise
def compute_objective_function(swarm, objective_func, pool=None, **kwargs):
    """Evaluate particles using the objective function

    This method evaluates each particle in the swarm according to the
    objective function passed.

    If a pool is passed, the position matrix is split row-wise into
    chunks, each chunk is evaluated through :code:`pool.map`, and the
    per-chunk results are concatenated in their original order. The
    number of chunks is the pool's process count, capped at the number
    of particles so that the objective function is never called with an
    empty batch.

    Parameters
    ----------
    swarm : pyswarms.backend.swarms.Swarm
        a Swarm instance; only its :code:`position` attribute is read
    objective_func : function
        objective function to be evaluated; it is called with a position
        matrix (or a row-wise chunk of it) plus :code:`**kwargs`
    pool: multiprocessing.Pool
        multiprocessing.Pool to be used for parallel particle evaluation.
        Note that the private :code:`_processes` attribute of the pool
        is read to decide how many chunks to create.
    kwargs : dict
        arguments for the objective function

    Returns
    -------
    numpy.ndarray
        Cost-matrix for the given swarm
    """
    if pool is None:
        return objective_func(swarm.position, **kwargs)

    # np.array_split pads with empty arrays when asked for more sections
    # than there are rows; many objective functions cannot cope with an
    # empty batch, so never request more chunks than particles (and
    # always at least one, which array_split requires).
    n_chunks = max(1, min(pool._processes, len(swarm.position)))
    results = pool.map(
        partial(objective_func, **kwargs),
        np.array_split(swarm.position, n_chunks),
    )
    return np.concatenate(results)