Source code for buildamol.optimizers.algorithms

import numpy as np
import scipy.optimize as opt
from typing import Union

import buildamol.utils.auxiliary as aux
from buildamol.core import Molecule

# from multiprocessing import Pool, cpu_count
from copy import deepcopy


__all__ = [
    "swarm_optimize",
    "genetic_optimize",
    "scipy_optimize",
    "rdkit_optimize",
    "anneal_optimize",
    "mmff_optimize",
    "uff_optimize",
    # "multiprocess_swarm_optimize",
]

# =====================================================================================
# SCIPY OPTIMIZATION
# =====================================================================================


[docs] def scipy_optimize( env, steps: int = 1e5, method: str = "L-BFGS-B", **kws, ): """ Optimize a Rotatron environment through a simple scipy optimization Parameters ---------- env : buildamol.optimizers.environments.Rotatron The environment to optimize steps : int, optional The number of steps to take. method : str, optional The optimizer to use, by default "L-BFGS-B". This can be any optimizer from scipy.optimize.minimize kws : dict, optional Keyword arguments to pass as options to the optimizer Returns ------- solution, evaluation The angles(s) and evaluation(s) of the best solution(s) found. """ x0 = env.action_space.sample() def loss_fn(x): state, _eval, *_ = env.step(x) env.reset() return _eval if "bounds" in kws: bounds = kws.pop("bounds") elif hasattr(env, "_bounds_tuple"): bounds = opt.Bounds(*env._bounds_tuple) else: bounds = None kws["maxiter"] = int(steps) result = opt.minimize(loss_fn, x0, method=method, bounds=bounds, options=kws) return ( result.x, result.fun, )
# ===================================================================================== # GENETIC ALGORITHM # =====================================================================================
[docs] def genetic_optimize( env, max_generations: int = 5e2, stop_if_done: bool = True, threshold: float = 1e-6, variation: float = 0.2, population_size: int = 50, parents: Union[int, float] = 0.25, children: Union[int, float] = 0.3, mutants: Union[int, float] = 0.3, newcomers: Union[int, float] = 0.15, variation_cooldown: float = 1, n_best: int = 1, numba: bool = False, ): """ A simple genetic algorithm for optimizing a Rotatron environment. Parameters ---------- env : buildamol.optimizers.environments.Rotatron The environment to optimize max_generations : int, optional The maximum number of steps to take. stop_if_done : bool, optional Stop the optimization if the environment signals it is done or the solutions have converged. threshold : float, optional A thershold to use for convergence of the best solution found. The algorithm will stop if the variation of the best solution evaluation history is less than this threshold. variation : float, optional The variation to use for the initial action. population_size : int, optional The size of the population. parents : int or float, optional The number or fraction of parents (elites) to select. The parents are selected from the best solutions. Parents produce offspring and pass to the next generation. children : int or float, optional The number or fraction of children to generate from the parents. Children are generated by averaging the parents and adding some noise. mutants : int or float, optional The number or fraction of mutants to generate. Mutants are generated by adding noise to parents, therby generating abarrent clones. newcomers: int or float, optional Newcomers are entirely new solution candidates. variation_cooldown : float, optional The rate at which the variation is reduced. The variation is reduced by this factor every generation. E.g. 0.95 will reduce the variation by 5% every generation. n_best : int, optional The number of best solutions to return at the end of the optimization. numba : bool, optional Use numba for the optimization. This may speed up the optimization if you are going to optimize many molecules. Returns ------- solution, evaluation The angles(s) and evaluation(s) of the best solution(s) found. """ if isinstance(parents, float): parents = round(parents * population_size) if isinstance(children, float): children = round(children * population_size) if isinstance(mutants, float): mutants = round(mutants * population_size) if isinstance(newcomers, float): newcomers = round(newcomers * population_size) while children + mutants + parents + newcomers > population_size: if newcomers > 1: newcomers -= 1 elif children > 1: children -= 1 elif mutants > 1: mutants -= 1 elif parents > 1: parents -= 1 while children + mutants + parents + newcomers < population_size: mutants += 1 if any([n <= 0 for n in [children, mutants, parents, newcomers]]): raise ValueError( "n_children, n_mutations, n_parents, and n_newcomers must all be at least 1" ) if ( numba or aux.USE_ALL_NUMBA or (population_size * max_generations > 1e6 and aux.USE_NUMBA) ): make_generation = _numba_wrapper_make_generation converged_break = _numba_wrapper_converged_break else: make_generation = _make_generation converged_break = _converged_break n_parents = parents n_children = children n_mutants = mutants n_newcomers = newcomers blank = env.blank() if hasattr(env, "_bounds_tuple"): min_angle, max_angle = env._bounds_tuple else: min_angle, max_angle = -np.pi, np.pi population = np.stack([blank] * population_size) evals = np.zeros(population_size) bests = np.zeros(max(10, int(max_generations * 0.3))) pop_range = np.arange(0, population_size) parents_range = np.arange(0, n_parents) children_range = np.arange(0, n_children) mutations_range = np.arange(0, n_mutants) newcomers_range = np.arange(0, n_newcomers) for i in pop_range: population[i] = env.action_space.sample() _, evals[i], *_ = env.step(population[i]) env.reset() steps = 0 while steps < max_generations: parents, children, mutations = make_generation( evals, population, n_parents, n_children, n_mutants, variation, blank, parents_range, children_range, mutations_range, ) newcomers = np.stack([blank] * n_newcomers) for i in newcomers_range: newcomers[i] = env.action_space.sample() population = np.concatenate([parents, children, mutations, newcomers]) population = np.clip(population, min_angle, max_angle) evals = np.zeros(population_size) for i in pop_range: _, evals[i], done, *_ = env.step(population[i]) env.reset() if done and stop_if_done: break if done and stop_if_done: break # ------------------------------------------------------------------------------------- # Generally I think this should not be here, however # the very fact that the algorithm was running for 20min+ # for some setups, shows that this probably does not affect # the results too much. A way of assuring this would be fine, is to also reset the best # evaluation when resetting the environment... bests, should_break = converged_break( bests, evals, threshold, steps, max_generations, stop_if_done ) if should_break: break # ------------------------------------------------------------------------------------- variation *= variation_cooldown steps += 1 best = np.argsort(evals)[:n_best] if n_best == 1: return population[best[0]], evals[best[0]] return population[best], evals[best]
def _make_generation( evals, population, n_parents, n_children, n_mutants, variation, blank, parents_range, children_range, mutations_range, ): sorting = np.argsort(evals) parents = population[sorting[:n_parents]] children = np.zeros((n_children, blank.shape[0])) for i in children_range: p1, p2 = np.random.choice(parents_range, size=2, replace=False) p1, p2 = parents[int(p1)], parents[int(p2)] children[i] = (p1 + p2) / 2 + np.random.uniform( -variation / 3, variation / 3, size=blank.shape ) mutations = np.zeros((n_mutants, blank.shape[0])) for i in mutations_range: mutations[i] = parents[np.random.randint(0, n_parents)] + np.random.uniform( -variation, variation, size=blank.shape ) return parents, children, mutations _numba_wrapper_make_generation = aux.njit(_make_generation) def _converged_break(bests, evals, threshold, steps, max_generations, stop_if_done): bests = np.roll(bests, -1) bests[-1] = np.min(evals) if ( stop_if_done and steps > max_generations / 10 and np.var(bests[bests != 0]) < threshold ): return bests, True return bests, False _numba_wrapper_converged_break = aux.njit(_converged_break) # ===================================================================================== # PARTICLE SWARM OPTIMIZATION # ===================================================================================== class _Particle: """ DEPRECATED: Particle for particle swarm optimization and similar algorithms. """ bounds = None variance = 0.2 def __init__(self, dim): self.position = np.random.rand(dim) # Initialize position randomly self.velocity = np.random.rand(dim) # Initialize velocity randomly self.best_position = self.position.copy() # Personal best position self.best_fitness = np.inf # Personal best fitness self.fitness = np.inf # Current fitness self.bounds = _Particle.bounds if self.bounds is not None: self.update_position = self._bound_update_position else: self.update_position = self._nobound_update_position def scatter_copy(self): new = deepcopy(self) new.random_scatter() return new def _nobound_update_position(self): self.position += self.velocity def _bound_update_position(self): self.position += self.velocity self.position = np.clip(self.position, self.bounds[0], self.bounds[1]) def update_velocity(self, inertia, cognitive, social, best_particle): self.velocity = ( inertia * self.velocity + cognitive * np.random.rand() * (self.best_position - self.position) + social * np.random.rand() * (best_particle.position - self.position) ) def random_scatter(self) -> np.ndarray: new = self.position + np.random.uniform( -self.variance, self.variance, size=self.position.shape ) if self.bounds: new = np.clip(new, self.bounds[0], self.bounds[1]) return new
[docs] def swarm_optimize( env, n_particles: int = None, max_steps: int = 30, stop_if_done: bool = True, threshold: float = 1e-6, w: float = 0.9, c1: float = 0.5, c2: float = 0.3, cooldown_rate: float = 0.99, n_best: int = 1, numba: bool = False, ): """ Optimize a rotatron environment through a simple particle swarm optimization. Parameters ---------- env : buildamol.optimizers.environments.Rotatron The environment to optimize n_particles : int, optional The number of particles to use. Set this to None in order to compute the number of particles based on the number of rotatable edges in the environment. max_steps : int, optional The maximum number of steps to take. stop_if_done : bool, optional Stop the optimization if the environment signals it is done or the solutions have converged. threshold : float, optional A threshold to use for convergence of the best solution found. The algorithm will stop if the variation of the best solution evaluation history is less than this threshold. w : float, optional The inertia parameter for the particle swarm optimization. c1 : float, optional The cognitive parameter for the particle swarm optimization. c2 : float, optional The social parameter for the particle swarm optimization. cooldown_rate : float, optional The rate at which the inertia parameter is reduced. The inertia parameter is reduced by this factor every generation. E.g. 0.95 will reduce the inertia parameter by 5% every generation. n_best : int, optional The number of best solutions to return at the end of the optimization. numba : bool, optional Use numba for the optimization. This may speed up the optimization if you are going to optimize many molecules. Returns ------- solution, evaluation The solution and evaluation for the solution """ if n_particles is None: n_particles = max(15, len(env.rotatable_edges) // 3) if numba or aux.USE_ALL_NUMBA or (n_particles * max_steps > 1e6 and aux.USE_NUMBA): update_positions = _numba_wrapper_update_positions update_velocities = _numba_wrapper_update_velocities else: update_positions = _update_positions update_velocities = _update_velocities positions = np.array( [env.action_space.sample() for i in range(n_particles)] ) # np.random.rand(n_particles, env.action_space.shape[0]) velocities = np.random.rand(n_particles, env.action_space.shape[0]) best_positions = positions.copy() best_fitnesses = np.zeros(n_particles) fitnesses = np.zeros(n_particles) bounds = getattr(env, "_bounds_tuple", None) or (-9999, 9999) bounds = np.array(bounds, dtype=np.float64) best_fitness = np.inf best_solution = np.zeros(env.action_space.shape[0]) steps = 0 while steps < max_steps: for i in range(n_particles): fitnesses[i] = env.step(positions[i])[1] if fitnesses[i] < best_fitnesses[i]: best_fitnesses[i] = fitnesses[i] best_positions[i] = positions[i] if fitnesses[i] < best_fitness: best_fitness = fitnesses[i] best_solution[:] = positions[i] env.reset() velocities = update_velocities( velocities, w, c1, c2, best_positions, best_solution, positions ) positions = update_positions(positions, velocities, bounds) if stop_if_done: if np.var(best_fitnesses) < threshold: break w *= cooldown_rate steps += 1 if n_best == 1: return best_solution, best_fitness else: best = np.argsort(best_fitnesses)[:n_best] return np.array([best_positions[b] for b in best]), np.array( [best_fitnesses[b] for b in best] )
def _update_positions(positions, velocities, bounds): positions += velocities positions = np.clip(positions, bounds[0], bounds[1]) return positions _numba_wrapper_update_positions = aux.njit(_update_positions) def _update_velocities( velocities, inertia, cognitive, social, best_particle, best_position, positions ): dist_best = best_particle - positions dist_social = best_position - positions velocities = ( inertia * velocities + cognitive * np.random.rand() * dist_best + social * np.random.rand() * dist_social ) return velocities _numba_wrapper_update_velocities = aux.njit(_update_velocities) # def multiprocess_swarm_optimize( # env, # processes: int = None, # n_particles: int = None, # max_steps: int = 30, # stop_if_done: bool = True, # threshold: float = 1e-6, # w: float = 0.9, # c1: float = 0.5, # c2: float = 0.3, # cooldown_rate: float = 0.99, # n_best: int = 1, # ): # """ # Optimize a rotatron environment through a simple particle swarm optimization. # Parameters # ---------- # env : buildamol.optimizers.environments.Rotatron # The environment to optimize # processes : int, optional # The number of processes to use. # Set this to None in order to compute the number of processes # based on the number of rotatable edges in the environment. # n_particles : int, optional # The number of particles to use. # Set this to None in order to compute the number of particles # based on the number of rotatable edges in the environment. # max_steps : int, optional # The maximum number of steps to take. # stop_if_done : bool, optional # Stop the optimization if the environment signals it is done or the solutions have converged. # threshold : float, optional # A threshold to use for convergence of the best solution found. # The algorithm will stop if the variation of the best solution evaluation history # is less than this threshold. # w : float, optional # The inertia parameter for the particle swarm optimization. # c1 : float, optional # The cognitive parameter for the particle swarm optimization. # c2 : float, optional # The social parameter for the particle swarm optimization. # cooldown_rate : float, optional # The rate at which the inertia parameter is reduced. The inertia parameter is reduced by this factor every generation. E.g. 0.95 will reduce the inertia parameter by 5% every generation. # n_best : int, optional # The number of best solutions to return at the end of the optimization. # Returns # ------- # solution, evaluation # The solution and evaluation for the solution # """ # if n_particles is None: # n_particles = max(30, len(env.rotatable_edges) // 3) # if processes is None: # processes = min(cpu_count(), n_particles // 10) # _Particle.bounds = env._bounds_tuple or None # population = [_Particle(env.action_space.shape[0]) for _ in range(n_particles)] # best_particle = population[0] # best_fitness = np.inf # best_solution = np.zeros(env.action_space.shape[0]) # steps = 0 # no_improvement_since = 0 # ref_steps = max(3, max_steps // 10) # while steps < max_steps: # with Pool(processes) as pool: # results = pool.starmap( # _multiprocess_one_swarm, # [ # ( # population[(i - 1) * processes : i * processes], # env, # w, # c1, # c2, # best_particle, # best_solution, # threshold, # stop_if_done, # cooldown_rate, # max(10, max_steps // 3), # ) # for i in range(1, len(population) // processes + 1) # ], # ) # got_better = False # for best, _env, particle in results: # if ( # particle.best_fitness < best_fitness # and abs(particle.best_fitness - best_fitness) > 1e-5 # ): # no_improvement_since = 0 # got_better = True # best_fitness = particle.best_fitness # best_particle = particle # best_solution[:] = particle.position # if not got_better: # no_improvement_since += 1 # if no_improvement_since == ref_steps: # break # env.reset() # w *= cooldown_rate # population = [best_particle.scatter_copy() for _ in range(n_particles // 2)] + [ # population[p] # for p in np.random.choice(np.arange(0, n_particles), n_particles // 2) # ] # steps += processes # if n_best == 1: # return best_solution, best_fitness # else: # best = np.argsort([p.best_fitness for p in population])[:n_best] # return np.array([population[b].best_position for b in best]), np.array( # [population[b].best_fitness for b in best] # ) # def _multiprocess_one_swarm( # particles, # env, # w, # c1, # c2, # best_particle, # best_solution, # threshold, # stop_if_done, # cooldown_rate, # max_steps, # ): # steps = 0 # while steps < max_steps: # for particle in particles: # fitness = env.step(particle.position)[1] # if fitness < particle.best_fitness: # particle.best_fitness = fitness # particle.best_position[:] = particle.position # if fitness < best_particle.best_fitness: # best_fitness = fitness # best_particle = particle # best_solution[:] = particle.position # env.reset() # for particle in particles: # particle.update_velocity(w, c1, c2, best_particle) # particle.update_position() # if stop_if_done: # if np.var([p.best_fitness for p in particles]) < threshold: # return best_solution, env, best_particle # w *= cooldown_rate # steps += 1 # return best_solution, env, best_particle # ===================================================================================== # SIMULATED ANNEALING # =====================================================================================
[docs] def anneal_optimize( env, n_particles: int = None, max_steps: int = 100, stop_if_done: bool = True, threshold: float = 1e-6, variance: float = 0.3, cooldown_rate: float = 0.98, n_best: int = 1, numba: bool = False, ): """ Optimize a rotatron environment through a simple simulated annealing. Parameters ---------- env : buildamol.optimizers.environments.Rotatron The environment to optimize n_particles : int, optional The number of particles to use. Set to None in order to compute the number of particles based on the number of rotatable edges in the environment, where one particle is used per two rotatable edges. max_steps : int, optional The maximum number of steps to take. stop_if_done : bool, optional Stop the optimization if the environment signals it is done or the solutions have converged. threshold : float, optional A threshold to use for convergence of the best solution found. The algorithm will stop if the variation of the best solution evaluation history is less than this threshold. variance : float, optional The variation to use for updating particle positions. n_best : int, optional The number of best solutions to return at the end of the optimization. numba : bool, optional Use numba for the optimization. This may speed up the optimization if you are going to optimize many molecules. Returns ------- solution, evaluation The solution and evaluation for the solution """ if n_particles is None: n_particles = max(15, len(env.rotatable_edges) // 2) if numba or aux.USE_ALL_NUMBA or (n_particles * max_steps > 1e6 and aux.USE_NUMBA): accept = _numba_wrapper_accept else: accept = _accept particles = np.array( [env.action_space.sample() for i in range(n_particles)] ) # np.random.rand(n_particles, env.action_space.shape[0]) fitnesses = np.full(n_particles, 9999) best_fitness = np.inf best_solution = np.zeros(env.action_space.shape[0]) bounds = getattr(env, "_bounds_tuple", None) or (-9999, 9999) temperature = 1.0 steps = 0 while steps < max_steps: for i in range(n_particles): position = particles[i] + np.random.uniform( -variance, variance, size=particles[i].shape ) position = np.clip(position, bounds[0], bounds[1]) fitness = env.step(position)[1] if accept(fitnesses[i], fitness, temperature): fitnesses[i] = fitness particles[i] = position if fitness < best_fitness: best_fitness = fitness best_solution[:] = particles[i] env.reset() temperature *= cooldown_rate variance *= cooldown_rate * 0.95 steps += 1 if stop_if_done: if np.var(fitnesses) < threshold: break if n_best == 1: return best_solution, best_fitness else: best = np.argsort(fitnesses)[:n_best] return np.array([particles[b] for b in best]), np.array( [fitnesses[b] for b in best] )
def _accept(old, new, temp): return np.exp((old - new) / temp) > np.random.rand() _numba_wrapper_accept = aux.njit(_accept) # ===================================================================================== # RDKit OPTIMIZATION # ===================================================================================== def mmff_optimize(mol, steps=1000): """ Optimize a molecule using RDKit's MMFF force field optimization. Parameters ---------- mol : Molecule The molecule to optimize steps : int, optional The number of steps to take, by default 1000 Returns ------- Molecule The optimized molecule """ cls = mol.__class__ if not aux.HAS_RDKIT: raise ImportError("RDKit is not installed") rdmol = mol.to_rdkit() aux.AllChem.MMFFOptimizeMolecule(rdmol, maxIters=steps) return cls.from_rdkit(rdmol) def uff_optimize(mol, steps=1000): """ Optimize a molecule using RDKit's UFF force field optimization. Parameters ---------- mol : Molecule The molecule to optimize steps : int, optional The number of steps to take, by default 1000 Returns ------- Molecule The optimized molecule """ cls = mol.__class__ if not aux.HAS_RDKIT: raise ImportError("RDKit is not installed") rdmol = mol.to_rdkit() aux.AllChem.UFFOptimizeMolecule(rdmol, maxIters=steps) return cls.from_rdkit(rdmol) rdkit_optimize = mmff_optimize if __name__ == "__main__": import buildamol as bam import time mol = bam.molecule("/Users/noahhk/GIT/biobuild/buildamol/optimizers/final_opt2.pdb") env = bam.optimizers.DistanceRotatron(mol.get_residue_graph(True)) print(len(env.rotatable_edges)) t1 = time.time() results_single = swarm_optimize(env, n_particles=200, max_steps=50) print(time.time() - t1, results_single) env.reset() t1 = time.time() results = multiprocess_swarm_optimize(env, n_particles=200, max_steps=50) print(time.time() - t1, results) opt_single = bam.optimizers.apply_rotatron_solution( results_single[0], env, mol.copy() ) opt_multi = bam.optimizers.apply_rotatron_solution(results[0], env, mol.copy()) print(mol.count_clashes()) opt_multi.to_pdb( "/Users/noahhk/GIT/biobuild/buildamol/optimizers/final_opt2_new_multiprocessing_optimized.pdb" ) print(opt_single.count_clashes(), opt_multi.count_clashes()) exit() for i in range(10): t1 = time.time() results = multiprocess_swarm_optimize(env) print(time.time() - t1, results[0][1]) # out = bam.optimizers.apply_solution(sol, env, mol.copy()) # print(time.time() - t1, out.count_clashes()) # out.show() pass