import numpy as np
import scipy.optimize as opt
from typing import Union
import buildamol.utils.auxiliary as aux
from buildamol.core import Molecule
# from multiprocessing import Pool, cpu_count
from copy import deepcopy
__all__ = [
"swarm_optimize",
"genetic_optimize",
"scipy_optimize",
"rdkit_optimize",
"anneal_optimize",
"mmff_optimize",
"uff_optimize",
# "multiprocess_swarm_optimize",
]
# =====================================================================================
# SCIPY OPTIMIZATION
# =====================================================================================
[docs]
def scipy_optimize(
env,
steps: int = 1e5,
method: str = "L-BFGS-B",
**kws,
):
"""
Optimize a Rotatron environment through
a simple scipy optimization
Parameters
----------
env : buildamol.optimizers.environments.Rotatron
The environment to optimize
steps : int, optional
The number of steps to take.
method : str, optional
The optimizer to use, by default "L-BFGS-B".
This can be any optimizer from scipy.optimize.minimize
kws : dict, optional
Keyword arguments to pass as options to the optimizer
Returns
-------
solution, evaluation
The angles(s) and evaluation(s) of the best solution(s) found.
"""
x0 = env.action_space.sample()
def loss_fn(x):
state, _eval, *_ = env.step(x)
env.reset()
return _eval
if "bounds" in kws:
bounds = kws.pop("bounds")
elif hasattr(env, "_bounds_tuple"):
bounds = opt.Bounds(*env._bounds_tuple)
else:
bounds = None
kws["maxiter"] = int(steps)
result = opt.minimize(loss_fn, x0, method=method, bounds=bounds, options=kws)
return (
result.x,
result.fun,
)
# =====================================================================================
# GENETIC ALGORITHM
# =====================================================================================
[docs]
def genetic_optimize(
env,
max_generations: int = 5e2,
stop_if_done: bool = True,
threshold: float = 1e-6,
variation: float = 0.2,
population_size: int = 50,
parents: Union[int, float] = 0.25,
children: Union[int, float] = 0.3,
mutants: Union[int, float] = 0.3,
newcomers: Union[int, float] = 0.15,
variation_cooldown: float = 1,
n_best: int = 1,
numba: bool = False,
):
"""
A simple genetic algorithm for optimizing a Rotatron environment.
Parameters
----------
env : buildamol.optimizers.environments.Rotatron
The environment to optimize
max_generations : int, optional
The maximum number of steps to take.
stop_if_done : bool, optional
Stop the optimization if the environment signals it is done or the solutions have converged.
threshold : float, optional
A thershold to use for convergence of the best solution found.
The algorithm will stop if the variation of the best solution evaluation history
is less than this threshold.
variation : float, optional
The variation to use for the initial action.
population_size : int, optional
The size of the population.
parents : int or float, optional
The number or fraction of parents (elites) to select. The parents are selected from the best solutions. Parents produce offspring and pass to the next generation.
children : int or float, optional
The number or fraction of children to generate from the parents. Children are generated by averaging the parents and adding some noise.
mutants : int or float, optional
The number or fraction of mutants to generate. Mutants are generated by adding noise to parents, therby
generating abarrent clones.
newcomers: int or float, optional
Newcomers are entirely new solution candidates.
variation_cooldown : float, optional
The rate at which the variation is reduced. The variation is reduced by this factor every generation. E.g. 0.95 will reduce the variation by 5% every generation.
n_best : int, optional
The number of best solutions to return at the end of the optimization.
numba : bool, optional
Use numba for the optimization. This may speed up the optimization if you are going to optimize many molecules.
Returns
-------
solution, evaluation
The angles(s) and evaluation(s) of the best solution(s) found.
"""
if isinstance(parents, float):
parents = round(parents * population_size)
if isinstance(children, float):
children = round(children * population_size)
if isinstance(mutants, float):
mutants = round(mutants * population_size)
if isinstance(newcomers, float):
newcomers = round(newcomers * population_size)
while children + mutants + parents + newcomers > population_size:
if newcomers > 1:
newcomers -= 1
elif children > 1:
children -= 1
elif mutants > 1:
mutants -= 1
elif parents > 1:
parents -= 1
while children + mutants + parents + newcomers < population_size:
mutants += 1
if any([n <= 0 for n in [children, mutants, parents, newcomers]]):
raise ValueError(
"n_children, n_mutations, n_parents, and n_newcomers must all be at least 1"
)
if (
numba
or aux.USE_ALL_NUMBA
or (population_size * max_generations > 1e6 and aux.USE_NUMBA)
):
make_generation = _numba_wrapper_make_generation
converged_break = _numba_wrapper_converged_break
else:
make_generation = _make_generation
converged_break = _converged_break
n_parents = parents
n_children = children
n_mutants = mutants
n_newcomers = newcomers
blank = env.blank()
if hasattr(env, "_bounds_tuple"):
min_angle, max_angle = env._bounds_tuple
else:
min_angle, max_angle = -np.pi, np.pi
population = np.stack([blank] * population_size)
evals = np.zeros(population_size)
bests = np.zeros(max(10, int(max_generations * 0.3)))
pop_range = np.arange(0, population_size)
parents_range = np.arange(0, n_parents)
children_range = np.arange(0, n_children)
mutations_range = np.arange(0, n_mutants)
newcomers_range = np.arange(0, n_newcomers)
for i in pop_range:
population[i] = env.action_space.sample()
_, evals[i], *_ = env.step(population[i])
env.reset()
steps = 0
while steps < max_generations:
parents, children, mutations = make_generation(
evals,
population,
n_parents,
n_children,
n_mutants,
variation,
blank,
parents_range,
children_range,
mutations_range,
)
newcomers = np.stack([blank] * n_newcomers)
for i in newcomers_range:
newcomers[i] = env.action_space.sample()
population = np.concatenate([parents, children, mutations, newcomers])
population = np.clip(population, min_angle, max_angle)
evals = np.zeros(population_size)
for i in pop_range:
_, evals[i], done, *_ = env.step(population[i])
env.reset()
if done and stop_if_done:
break
if done and stop_if_done:
break
# -------------------------------------------------------------------------------------
# Generally I think this should not be here, however
# the very fact that the algorithm was running for 20min+
# for some setups, shows that this probably does not affect
# the results too much. A way of assuring this would be fine, is to also reset the best
# evaluation when resetting the environment...
bests, should_break = converged_break(
bests, evals, threshold, steps, max_generations, stop_if_done
)
if should_break:
break
# -------------------------------------------------------------------------------------
variation *= variation_cooldown
steps += 1
best = np.argsort(evals)[:n_best]
if n_best == 1:
return population[best[0]], evals[best[0]]
return population[best], evals[best]
def _make_generation(
evals,
population,
n_parents,
n_children,
n_mutants,
variation,
blank,
parents_range,
children_range,
mutations_range,
):
sorting = np.argsort(evals)
parents = population[sorting[:n_parents]]
children = np.zeros((n_children, blank.shape[0]))
for i in children_range:
p1, p2 = np.random.choice(parents_range, size=2, replace=False)
p1, p2 = parents[int(p1)], parents[int(p2)]
children[i] = (p1 + p2) / 2 + np.random.uniform(
-variation / 3, variation / 3, size=blank.shape
)
mutations = np.zeros((n_mutants, blank.shape[0]))
for i in mutations_range:
mutations[i] = parents[np.random.randint(0, n_parents)] + np.random.uniform(
-variation, variation, size=blank.shape
)
return parents, children, mutations
_numba_wrapper_make_generation = aux.njit(_make_generation)
def _converged_break(bests, evals, threshold, steps, max_generations, stop_if_done):
bests = np.roll(bests, -1)
bests[-1] = np.min(evals)
if (
stop_if_done
and steps > max_generations / 10
and np.var(bests[bests != 0]) < threshold
):
return bests, True
return bests, False
_numba_wrapper_converged_break = aux.njit(_converged_break)
# =====================================================================================
# PARTICLE SWARM OPTIMIZATION
# =====================================================================================
class _Particle:
"""
DEPRECATED:
Particle for particle swarm optimization and similar algorithms.
"""
bounds = None
variance = 0.2
def __init__(self, dim):
self.position = np.random.rand(dim) # Initialize position randomly
self.velocity = np.random.rand(dim) # Initialize velocity randomly
self.best_position = self.position.copy() # Personal best position
self.best_fitness = np.inf # Personal best fitness
self.fitness = np.inf # Current fitness
self.bounds = _Particle.bounds
if self.bounds is not None:
self.update_position = self._bound_update_position
else:
self.update_position = self._nobound_update_position
def scatter_copy(self):
new = deepcopy(self)
new.random_scatter()
return new
def _nobound_update_position(self):
self.position += self.velocity
def _bound_update_position(self):
self.position += self.velocity
self.position = np.clip(self.position, self.bounds[0], self.bounds[1])
def update_velocity(self, inertia, cognitive, social, best_particle):
self.velocity = (
inertia * self.velocity
+ cognitive * np.random.rand() * (self.best_position - self.position)
+ social * np.random.rand() * (best_particle.position - self.position)
)
def random_scatter(self) -> np.ndarray:
new = self.position + np.random.uniform(
-self.variance, self.variance, size=self.position.shape
)
if self.bounds:
new = np.clip(new, self.bounds[0], self.bounds[1])
return new
[docs]
def swarm_optimize(
env,
n_particles: int = None,
max_steps: int = 30,
stop_if_done: bool = True,
threshold: float = 1e-6,
w: float = 0.9,
c1: float = 0.5,
c2: float = 0.3,
cooldown_rate: float = 0.99,
n_best: int = 1,
numba: bool = False,
):
"""
Optimize a rotatron environment through a simple particle swarm optimization.
Parameters
----------
env : buildamol.optimizers.environments.Rotatron
The environment to optimize
n_particles : int, optional
The number of particles to use.
Set this to None in order to compute the number of particles
based on the number of rotatable edges in the environment.
max_steps : int, optional
The maximum number of steps to take.
stop_if_done : bool, optional
Stop the optimization if the environment signals it is done or the solutions have converged.
threshold : float, optional
A threshold to use for convergence of the best solution found.
The algorithm will stop if the variation of the best solution evaluation history
is less than this threshold.
w : float, optional
The inertia parameter for the particle swarm optimization.
c1 : float, optional
The cognitive parameter for the particle swarm optimization.
c2 : float, optional
The social parameter for the particle swarm optimization.
cooldown_rate : float, optional
The rate at which the inertia parameter is reduced. The inertia parameter is reduced by this factor every generation. E.g. 0.95 will reduce the inertia parameter by 5% every generation.
n_best : int, optional
The number of best solutions to return at the end of the optimization.
numba : bool, optional
Use numba for the optimization. This may speed up the optimization if you are going to optimize many molecules.
Returns
-------
solution, evaluation
The solution and evaluation for the solution
"""
if n_particles is None:
n_particles = max(15, len(env.rotatable_edges) // 3)
if numba or aux.USE_ALL_NUMBA or (n_particles * max_steps > 1e6 and aux.USE_NUMBA):
update_positions = _numba_wrapper_update_positions
update_velocities = _numba_wrapper_update_velocities
else:
update_positions = _update_positions
update_velocities = _update_velocities
positions = np.array(
[env.action_space.sample() for i in range(n_particles)]
) # np.random.rand(n_particles, env.action_space.shape[0])
velocities = np.random.rand(n_particles, env.action_space.shape[0])
best_positions = positions.copy()
best_fitnesses = np.zeros(n_particles)
fitnesses = np.zeros(n_particles)
bounds = getattr(env, "_bounds_tuple", None) or (-9999, 9999)
bounds = np.array(bounds, dtype=np.float64)
best_fitness = np.inf
best_solution = np.zeros(env.action_space.shape[0])
steps = 0
while steps < max_steps:
for i in range(n_particles):
fitnesses[i] = env.step(positions[i])[1]
if fitnesses[i] < best_fitnesses[i]:
best_fitnesses[i] = fitnesses[i]
best_positions[i] = positions[i]
if fitnesses[i] < best_fitness:
best_fitness = fitnesses[i]
best_solution[:] = positions[i]
env.reset()
velocities = update_velocities(
velocities, w, c1, c2, best_positions, best_solution, positions
)
positions = update_positions(positions, velocities, bounds)
if stop_if_done:
if np.var(best_fitnesses) < threshold:
break
w *= cooldown_rate
steps += 1
if n_best == 1:
return best_solution, best_fitness
else:
best = np.argsort(best_fitnesses)[:n_best]
return np.array([best_positions[b] for b in best]), np.array(
[best_fitnesses[b] for b in best]
)
def _update_positions(positions, velocities, bounds):
positions += velocities
positions = np.clip(positions, bounds[0], bounds[1])
return positions
_numba_wrapper_update_positions = aux.njit(_update_positions)
def _update_velocities(
velocities, inertia, cognitive, social, best_particle, best_position, positions
):
dist_best = best_particle - positions
dist_social = best_position - positions
velocities = (
inertia * velocities
+ cognitive * np.random.rand() * dist_best
+ social * np.random.rand() * dist_social
)
return velocities
_numba_wrapper_update_velocities = aux.njit(_update_velocities)
# def multiprocess_swarm_optimize(
# env,
# processes: int = None,
# n_particles: int = None,
# max_steps: int = 30,
# stop_if_done: bool = True,
# threshold: float = 1e-6,
# w: float = 0.9,
# c1: float = 0.5,
# c2: float = 0.3,
# cooldown_rate: float = 0.99,
# n_best: int = 1,
# ):
# """
# Optimize a rotatron environment through a simple particle swarm optimization.
# Parameters
# ----------
# env : buildamol.optimizers.environments.Rotatron
# The environment to optimize
# processes : int, optional
# The number of processes to use.
# Set this to None in order to compute the number of processes
# based on the number of rotatable edges in the environment.
# n_particles : int, optional
# The number of particles to use.
# Set this to None in order to compute the number of particles
# based on the number of rotatable edges in the environment.
# max_steps : int, optional
# The maximum number of steps to take.
# stop_if_done : bool, optional
# Stop the optimization if the environment signals it is done or the solutions have converged.
# threshold : float, optional
# A threshold to use for convergence of the best solution found.
# The algorithm will stop if the variation of the best solution evaluation history
# is less than this threshold.
# w : float, optional
# The inertia parameter for the particle swarm optimization.
# c1 : float, optional
# The cognitive parameter for the particle swarm optimization.
# c2 : float, optional
# The social parameter for the particle swarm optimization.
# cooldown_rate : float, optional
# The rate at which the inertia parameter is reduced. The inertia parameter is reduced by this factor every generation. E.g. 0.95 will reduce the inertia parameter by 5% every generation.
# n_best : int, optional
# The number of best solutions to return at the end of the optimization.
# Returns
# -------
# solution, evaluation
# The solution and evaluation for the solution
# """
# if n_particles is None:
# n_particles = max(30, len(env.rotatable_edges) // 3)
# if processes is None:
# processes = min(cpu_count(), n_particles // 10)
# _Particle.bounds = env._bounds_tuple or None
# population = [_Particle(env.action_space.shape[0]) for _ in range(n_particles)]
# best_particle = population[0]
# best_fitness = np.inf
# best_solution = np.zeros(env.action_space.shape[0])
# steps = 0
# no_improvement_since = 0
# ref_steps = max(3, max_steps // 10)
# while steps < max_steps:
# with Pool(processes) as pool:
# results = pool.starmap(
# _multiprocess_one_swarm,
# [
# (
# population[(i - 1) * processes : i * processes],
# env,
# w,
# c1,
# c2,
# best_particle,
# best_solution,
# threshold,
# stop_if_done,
# cooldown_rate,
# max(10, max_steps // 3),
# )
# for i in range(1, len(population) // processes + 1)
# ],
# )
# got_better = False
# for best, _env, particle in results:
# if (
# particle.best_fitness < best_fitness
# and abs(particle.best_fitness - best_fitness) > 1e-5
# ):
# no_improvement_since = 0
# got_better = True
# best_fitness = particle.best_fitness
# best_particle = particle
# best_solution[:] = particle.position
# if not got_better:
# no_improvement_since += 1
# if no_improvement_since == ref_steps:
# break
# env.reset()
# w *= cooldown_rate
# population = [best_particle.scatter_copy() for _ in range(n_particles // 2)] + [
# population[p]
# for p in np.random.choice(np.arange(0, n_particles), n_particles // 2)
# ]
# steps += processes
# if n_best == 1:
# return best_solution, best_fitness
# else:
# best = np.argsort([p.best_fitness for p in population])[:n_best]
# return np.array([population[b].best_position for b in best]), np.array(
# [population[b].best_fitness for b in best]
# )
# def _multiprocess_one_swarm(
# particles,
# env,
# w,
# c1,
# c2,
# best_particle,
# best_solution,
# threshold,
# stop_if_done,
# cooldown_rate,
# max_steps,
# ):
# steps = 0
# while steps < max_steps:
# for particle in particles:
# fitness = env.step(particle.position)[1]
# if fitness < particle.best_fitness:
# particle.best_fitness = fitness
# particle.best_position[:] = particle.position
# if fitness < best_particle.best_fitness:
# best_fitness = fitness
# best_particle = particle
# best_solution[:] = particle.position
# env.reset()
# for particle in particles:
# particle.update_velocity(w, c1, c2, best_particle)
# particle.update_position()
# if stop_if_done:
# if np.var([p.best_fitness for p in particles]) < threshold:
# return best_solution, env, best_particle
# w *= cooldown_rate
# steps += 1
# return best_solution, env, best_particle
# =====================================================================================
# SIMULATED ANNEALING
# =====================================================================================
[docs]
def anneal_optimize(
env,
n_particles: int = None,
max_steps: int = 100,
stop_if_done: bool = True,
threshold: float = 1e-6,
variance: float = 0.3,
cooldown_rate: float = 0.98,
n_best: int = 1,
numba: bool = False,
):
"""
Optimize a rotatron environment through a simple simulated annealing.
Parameters
----------
env : buildamol.optimizers.environments.Rotatron
The environment to optimize
n_particles : int, optional
The number of particles to use. Set to None in order to compute the number of particles
based on the number of rotatable edges in the environment, where one particle is used per two rotatable edges.
max_steps : int, optional
The maximum number of steps to take.
stop_if_done : bool, optional
Stop the optimization if the environment signals it is done or the solutions have converged.
threshold : float, optional
A threshold to use for convergence of the best solution found.
The algorithm will stop if the variation of the best solution evaluation history
is less than this threshold.
variance : float, optional
The variation to use for updating particle positions.
n_best : int, optional
The number of best solutions to return at the end of the optimization.
numba : bool, optional
Use numba for the optimization. This may speed up the optimization if you are going to optimize many molecules.
Returns
-------
solution, evaluation
The solution and evaluation for the solution
"""
if n_particles is None:
n_particles = max(15, len(env.rotatable_edges) // 2)
if numba or aux.USE_ALL_NUMBA or (n_particles * max_steps > 1e6 and aux.USE_NUMBA):
accept = _numba_wrapper_accept
else:
accept = _accept
particles = np.array(
[env.action_space.sample() for i in range(n_particles)]
) # np.random.rand(n_particles, env.action_space.shape[0])
fitnesses = np.full(n_particles, 9999)
best_fitness = np.inf
best_solution = np.zeros(env.action_space.shape[0])
bounds = getattr(env, "_bounds_tuple", None) or (-9999, 9999)
temperature = 1.0
steps = 0
while steps < max_steps:
for i in range(n_particles):
position = particles[i] + np.random.uniform(
-variance, variance, size=particles[i].shape
)
position = np.clip(position, bounds[0], bounds[1])
fitness = env.step(position)[1]
if accept(fitnesses[i], fitness, temperature):
fitnesses[i] = fitness
particles[i] = position
if fitness < best_fitness:
best_fitness = fitness
best_solution[:] = particles[i]
env.reset()
temperature *= cooldown_rate
variance *= cooldown_rate * 0.95
steps += 1
if stop_if_done:
if np.var(fitnesses) < threshold:
break
if n_best == 1:
return best_solution, best_fitness
else:
best = np.argsort(fitnesses)[:n_best]
return np.array([particles[b] for b in best]), np.array(
[fitnesses[b] for b in best]
)
def _accept(old, new, temp):
return np.exp((old - new) / temp) > np.random.rand()
_numba_wrapper_accept = aux.njit(_accept)
# =====================================================================================
# RDKit OPTIMIZATION
# =====================================================================================
def mmff_optimize(mol, steps=1000):
"""
Optimize a molecule using RDKit's MMFF force field optimization.
Parameters
----------
mol : Molecule
The molecule to optimize
steps : int, optional
The number of steps to take, by default 1000
Returns
-------
Molecule
The optimized molecule
"""
cls = mol.__class__
if not aux.HAS_RDKIT:
raise ImportError("RDKit is not installed")
rdmol = mol.to_rdkit()
aux.AllChem.MMFFOptimizeMolecule(rdmol, maxIters=steps)
return cls.from_rdkit(rdmol)
def uff_optimize(mol, steps=1000):
"""
Optimize a molecule using RDKit's UFF force field optimization.
Parameters
----------
mol : Molecule
The molecule to optimize
steps : int, optional
The number of steps to take, by default 1000
Returns
-------
Molecule
The optimized molecule
"""
cls = mol.__class__
if not aux.HAS_RDKIT:
raise ImportError("RDKit is not installed")
rdmol = mol.to_rdkit()
aux.AllChem.UFFOptimizeMolecule(rdmol, maxIters=steps)
return cls.from_rdkit(rdmol)
rdkit_optimize = mmff_optimize
if __name__ == "__main__":
import buildamol as bam
import time
mol = bam.molecule("/Users/noahhk/GIT/biobuild/buildamol/optimizers/final_opt2.pdb")
env = bam.optimizers.DistanceRotatron(mol.get_residue_graph(True))
print(len(env.rotatable_edges))
t1 = time.time()
results_single = swarm_optimize(env, n_particles=200, max_steps=50)
print(time.time() - t1, results_single)
env.reset()
t1 = time.time()
results = multiprocess_swarm_optimize(env, n_particles=200, max_steps=50)
print(time.time() - t1, results)
opt_single = bam.optimizers.apply_rotatron_solution(
results_single[0], env, mol.copy()
)
opt_multi = bam.optimizers.apply_rotatron_solution(results[0], env, mol.copy())
print(mol.count_clashes())
opt_multi.to_pdb(
"/Users/noahhk/GIT/biobuild/buildamol/optimizers/final_opt2_new_multiprocessing_optimized.pdb"
)
print(opt_single.count_clashes(), opt_multi.count_clashes())
exit()
for i in range(10):
t1 = time.time()
results = multiprocess_swarm_optimize(env)
print(time.time() - t1, results[0][1])
# out = bam.optimizers.apply_solution(sol, env, mol.copy())
# print(time.time() - t1, out.count_clashes())
# out.show()
pass