Preferential Bayesian Optimisation#

A notebook showcasing the use of Preferential Bayesian Optimisation (PBO) [1] for molecular optimisation using preference data elicited from a human chemist.

[1]:
"""Imports"""

import warnings
warnings.filterwarnings("ignore") # Turn off Graphein warnings

from botorch import fit_gpytorch_model
from botorch.models.pairwise_gp import PairwiseGP, PairwiseLaplaceMarginalLogLikelihood

import gpytorch
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
import torch
import time
from itertools import combinations

from gauche.dataloader import MolPropLoader
from gauche.dataloader.data_utils import transform_data
from gauche.kernels.fingerprint_kernels.tanimoto_kernel import TanimotoKernel
from botorch.acquisition.preference import AnalyticExpectedUtilityOfBestOption
[2]:
"""Utility function for generating ground truth preference data"""

def generate_comparisons(y, n_comp, noise=0.0, replace=False):
    """Function simulating the preferences of a human chemist.

    Args:
        y: 1D NumPy array of training data labels
        n_comp: Int indicating the number of pairwise comparisons to generate
        noise: Float indicating the level of noise in the chemist's decisions
        replace: Bool indicating whether to generate comparisons with replacement

    Returns:
        comp_pairs: A NumPy array of comparison pairs of the form (m1, m2)

    """
    # generate all possible pairs of elements in y
    all_pairs = np.array(list(combinations(range(y.shape[0]), 2)))
    # randomly select n_comp pairs from all_pairs
    comp_pairs = all_pairs[
        np.random.choice(range(len(all_pairs)), n_comp, replace=replace)
    ]
    # add gaussian noise to the latent y values
    c0 = y[comp_pairs[:, 0]] + np.random.standard_normal(len(comp_pairs)) * noise
    c1 = y[comp_pairs[:, 1]] + np.random.standard_normal(len(comp_pairs)) * noise
    reverse_comp = (c0 < c1)
    comp_pairs[reverse_comp, :] = np.flip(comp_pairs[reverse_comp, :], 1)
    comp_pairs = torch.tensor(comp_pairs).long()

    return comp_pairs
[3]:
"""Utility functions for the Bayesian optimisation loop"""

def initialize_model(train_x, train_comp, state_dict=None):
    """Initialise model and loss function for binary Morgan fingerprints
       using a Tanimoto kernel

    Args:
        train_x: Tensor of inputs
        train_obj: Tensor of pairwise comparisons
        state_dict: current_state_dict used to speed up fitting.

    Returns: mll object, model object
    """

    # Define model for objective
    model = PairwiseGP(train_x, train_comp, covar_module=gpytorch.kernels.ScaleKernel(TanimotoKernel()))
    mll = PairwiseLaplaceMarginalLogLikelihood(model.likelihood, model)
    # load state_dict if it is passed
    if state_dict is not None:
        model.load_state_dict(state_dict)

    return mll, model

def optimize_acqf_and_get_observation(acq_func, heldout_inputs, heldout_outputs):
    """Optimizes the acquisition function and returns a new candidate and an observation.

    Args:
        acq_func: Object representing the acquisition function.
        heldout_inputs: Tensor of heldout inputs
        heldout_outputs: Tensor of heldout outputs.
    Returns:
        new_x, new_obj
    """

    # Loop over the discrete set of points to evaluate the acquisition function at:
    acq_vals = []
    for i in range(len(heldout_outputs)):
        acq_vals.append(acq_func(heldout_inputs[i].unsqueeze(-2))) # use unsqueeze to append batch dimension

    # Observe new values
    acq_vals = torch.tensor(acq_vals)
    best_idx = torch.argmax(acq_vals)
    new_x = heldout_inputs[best_idx].unsqueeze(-2)  # add batch dimension
    new_obj = heldout_outputs[best_idx].unsqueeze(-1) # add output dimension

    # Delete the selected input and value from the heldout set.
    heldout_inputs = torch.cat((heldout_inputs[:best_idx], heldout_inputs[best_idx+1:]), axis=0)
    heldout_outputs = torch.cat((heldout_outputs[:best_idx], heldout_outputs[best_idx+1:]), axis=0)

    return new_x, new_obj, heldout_inputs, heldout_outputs

def update_random_observations(best_random, heldout_inputs, heldout_outputs):
    """Simulates a random policy by taking the current list of best values observed randomly,
       drawing a new random point from the heldout set, observing its value, and updating the list.

       Args:
           best_random: List of best random values observed so far
           heldout_inputs: Tensor of inputs
           heldout_outputs: Tensor of output values

       Returns:
           best_random, float specifying the objective function value.
    """

    # Take a random sample by permuting the indices and selecting the first element
    index = torch.randperm(len(heldout_outputs))[0]
    next_random_best = heldout_outputs[index]
    best_random.append(max(best_random[-1], next_random_best))

    # Delete the selected input and value from the heldout set
    heldout_inputs = torch.cat((heldout_inputs[:index], heldout_inputs[index+1:]), axis=0)
    heldout_outputs = torch.cat((heldout_outputs[:index], heldout_outputs[index+1:]), axis=0)

    return best_random, heldout_inputs, heldout_outputs
[4]:
"""Load the dataset and featurise the molecules"""

loader = MolPropLoader()
loader.load_benchmark("Photoswitch")

# We use the fragprints representations (a concatenation of Morgan fingerprints and RDKit fragment features)

loader.featurize('ecfp_fragprints')
X = loader.features
y = loader.labels
Found 13 invalid labels [nan nan nan nan nan nan nan nan nan nan nan nan nan] at indices [41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 158]
To turn validation off, use dataloader.read_csv(..., validate=False).
[5]:
"""Preferential Bayesian optimisation loop"""

# Experiment parameters
N_TRIALS = 20
holdout_set_size = 0.75
N_ITERS = 20
NOISE = 0.0
verbose = False
m = 100

best_observed_all_eubo, best_random_all = [], []

# Average over multiple random trials (each trial splits the initial training set for the GP in a random manner)
for trial in range(1, N_TRIALS + 1):

    # Set seeds for reproducbibility. Some numerical instability with EUBO for some runs
    torch.manual_seed(trial+10)
    np.random.seed(trial+10)

    print(f'\nTrial {trial:>2} of {N_TRIALS}', end="")
    best_observed_eubo, best_random = [], []

    # Generate initial training data and initialize model
    train_x_eubo, heldout_x_eubo, train_y_eubo, heldout_y_eubo = train_test_split(X, y, test_size=holdout_set_size, random_state=trial)
    best_observed_value_eubo = torch.tensor(np.max(train_y_eubo))
    # Set the previous winner for the first round
    previous_winner = torch.tensor(train_x_eubo[np.argmax(train_y_eubo)]).unsqueeze(-2)

    # Convert numpy arrays to PyTorch tensors and flatten the label vectors
    train_x_eubo = torch.tensor(train_x_eubo.astype(np.float64))
    heldout_x_eubo = torch.tensor(heldout_x_eubo.astype(np.float64))
    train_y_eubo = torch.tensor(train_y_eubo.astype(np.float64))
    heldout_y_eubo = torch.tensor(heldout_y_eubo.astype(np.float64))
    train_comp = generate_comparisons(train_y_eubo.squeeze(-1), m, noise=NOISE)

    # The initial heldout set is the same for random search
    heldout_x_random = heldout_x_eubo
    heldout_y_random = heldout_y_eubo

    mll_eubo, model_eubo = initialize_model(train_x_eubo, train_comp)

    best_observed_eubo.append(best_observed_value_eubo)
    best_random.append(best_observed_value_eubo)

    # run N_ITERS rounds of BayesOpt after the initial random batch
    for iteration in range(1, N_ITERS + 1):
        t0 = time.time()

        # fit the model
        fit_gpytorch_model(mll_eubo)

        # Use EUBO acquisition function
        acq_func = AnalyticExpectedUtilityOfBestOption(pref_model=model_eubo, previous_winner=previous_winner)

        try:
            new_x_eubo, new_obj_eubo, heldout_x_eubo, heldout_y_eubo = optimize_acqf_and_get_observation(acq_func, heldout_x_eubo, heldout_y_eubo)
        except:
            break
        # Update training points
        train_x_eubo = torch.cat([train_x_eubo, new_x_eubo])
        train_y_eubo = torch.cat([train_y_eubo, new_obj_eubo])
        previous_winner = new_x_eubo

        next_comps = generate_comparisons(train_y_eubo.squeeze(-1), n_comp=m, noise=NOISE)
        train_comp = torch.cat([train_comp, next_comps])

        # update random search progress
        best_random, heldout_x_random, heldout_y_random = update_random_observations(best_random,
                                                                                     heldout_inputs=heldout_x_random,
                                                                                     heldout_outputs=heldout_y_random)

        best_value_eubo = torch.max(new_obj_eubo, best_observed_eubo[-1])
        best_observed_eubo.append(best_value_eubo)

        # reinitialise the model so it is ready for fitting on the next iteration
        # use the current state_dict to speed up fitting
        mll_eubo, model_eubo = initialize_model(
            train_x_eubo,
            train_comp,
            model_eubo.state_dict(),
        )

        t1 = time.time()

        if verbose:
            print(
                f"\nBatch {iteration:>2}: best value (random, qEUBO) = "
                f"({max(best_random):>4.2f}, {best_value_eubo:>4.2f}), "
                f"time = {t1 - t0:>4.2f}.", end=""
            )
        else:
            print(".", end="")

    best_observed_all_eubo.append(best_observed_eubo)
    best_random_all.append(best_random)


Trial  1 of 20....................
Trial  2 of 20....................
Trial  3 of 20....................
Trial  4 of 20....................
Trial  5 of 20....................
Trial  6 of 20....................
Trial  7 of 20....................
Trial  8 of 20....................
Trial  9 of 20....................
Trial 10 of 20....................
Trial 11 of 20....................
Trial 12 of 20....................
Trial 13 of 20....................
Trial 14 of 20....................
Trial 15 of 20....................
Trial 16 of 20....................
Trial 17 of 20....................
Trial 18 of 20....................
Trial 19 of 20....................
Trial 20 of 20....................
[6]:
"""Plots the results"""

# Define a confidence interval function for plotting.
def ci(y):
    return 1.0 * y.std(axis=0) / np.sqrt(N_TRIALS)

iters = np.arange(N_ITERS + 1)
# Use first n trials without failure due to numerical errors in EUBO. Can judge from output aboveß
n = 8
y_eubo = torch.tensor(best_observed_all_eubo[0:n])
y_rnd = torch.tensor(best_random_all[0:n])

y_rnd_mean = y_rnd.mean(axis=0)
y_eubo_mean = y_eubo.mean(axis=0)

lower_rnd = y_rnd_mean - ci(y_rnd)
upper_rnd = y_rnd_mean + ci(y_rnd)
lower_eubo = y_eubo_mean - ci(y_eubo)
upper_eubo = y_eubo_mean + ci(y_eubo)

plt.plot(iters, y_rnd_mean, label='Random search', color='tab:green')
plt.fill_between(iters, lower_rnd, upper_rnd, alpha=0.2, color='tab:green')
plt.plot(iters, y_eubo_mean, label='EUBO', color='tab:red')
plt.fill_between(iters, lower_eubo, upper_eubo, alpha=0.2, color='tab:red')
plt.xlabel('Number of Iterations')
plt.ylabel('Best Objective Value')
plt.legend(loc="upper left")
plt.xticks(list(np.arange(0, 21, 2)))
plt.show()
../_images/notebooks_preferential_bayesian_optimisation_6_0.png