import torch
import gpytorch
import matplotlib.pyplot as plt
import logging
from botorch import fit_gpytorch_mll
from gpytorch.models import ExactGP
from gpytorch.likelihoods import (
GaussianLikelihood,
MultitaskGaussianLikelihood,
Likelihood,
FixedNoiseGaussianLikelihood,
)
from gpytorch.mlls import MarginalLogLikelihood
from torch import Tensor
from gpytorchwrapper.src.config.config_classes import (
TrainingConf,
OptimizerConf,
LikelihoodConf,
ModelConf,
)
from gpytorchwrapper.src.config.model_factory import (
get_likelihood,
get_model,
get_optimizer,
)
logger = logging.getLogger(__name__)
# [docs] — Sphinx source-link artifact from doc-page extraction (commented out: bare "[docs]" would raise NameError)
def define_optimizer(
    model: ExactGP, optimizer_conf: OptimizerConf
) -> torch.optim.Optimizer:
    """
    Build the optimizer that trains the model's hyperparameters.

    Parameters
    -----------
    model : object
        The model whose parameters will be optimized
    optimizer_conf : OptimizerConf
        Configuration dataclass that specifies the options for the optimizer

    Returns
    --------
    optimizer : object
        The instantiated optimizer bound to the model's parameters
    """
    # Resolve the optimizer class from the configuration, then instantiate it
    # over the model's parameters with any user-supplied options.
    optimizer_cls = get_optimizer(optimizer_conf)
    return optimizer_cls(model.parameters(), **optimizer_conf.optimizer_options)
# [docs] — Sphinx source-link artifact from doc-page extraction (commented out: bare "[docs]" would raise NameError)
def define_likelihood(
    likelihood_conf: LikelihoodConf, likelihood_class: Likelihood, train_x: Tensor
) -> Likelihood:
    """
    Define the likelihood used for training the model.

    For a ``FixedNoiseGaussianLikelihood``, the ``noise`` option is first
    normalized into a float64 tensor with one entry per training point
    (mutating ``likelihood_conf.likelihood_options`` in place) before the
    likelihood is instantiated.

    Parameters
    -----------
    likelihood_conf : LikelihoodConf
        Configuration dataclass that specifies the options for the likelihood
    likelihood_class : Likelihood
        Likelihood class to instantiate
    train_x : Tensor
        The training input tensor; only its first dimension (number of
        training points) is used to size the fixed-noise tensor

    Returns
    --------
    likelihood : Likelihood
        The instantiated likelihood

    Raises
    ------
    KeyError
        If a fixed noise likelihood is selected but the noise level is not specified
    """
    logger.debug("Instantiating likelihood class %s", likelihood_class)
    if likelihood_class is FixedNoiseGaussianLikelihood:
        options = likelihood_conf.likelihood_options
        if "noise" not in options:
            raise KeyError(
                "The noise parameter is not specified in the likelihood options."
            )

        noise = options["noise"]
        if isinstance(noise, str):
            # SECURITY: eval executes arbitrary code from the configuration
            # file; only load trusted configurations.
            noise = eval(noise)

        if isinstance(noise, Tensor):
            # Broadcast the first noise entry across all training points.
            options["noise"] = torch.tensor(
                [noise[0].item()] * train_x.shape[0], dtype=torch.float64
            )
        elif isinstance(noise, list):
            options["noise"] = torch.tensor(noise, dtype=torch.float64)
        else:
            # Scalar noise (float or int, including the result of eval'ing a
            # string): replicate it for every training point.
            options["noise"] = torch.tensor(
                [noise] * train_x.shape[0], dtype=torch.float64
            )

    if likelihood_conf.likelihood_options:
        likelihood = likelihood_class(**likelihood_conf.likelihood_options)
    else:
        likelihood = likelihood_class()
    return likelihood
# [docs] — Sphinx source-link artifact from doc-page extraction (commented out: bare "[docs]" would raise NameError)
def define_model(
    model_conf: ModelConf,
    model_class: ExactGP,
    train_x: Tensor,
    train_y: Tensor,
    likelihood: Likelihood,
) -> ExactGP:
    """
    Instantiate the GP model from its configuration.

    Parameters
    ----------
    model_conf : ModelConf
        Model configuration dataclass; ``model_options`` (if truthy) are
        forwarded as keyword arguments to the model constructor
    model_class : ExactGP
        Model class to instantiate
    train_x : Tensor
        The training input tensor
    train_y : Tensor
        The training output tensor
    likelihood : Likelihood
        Likelihood object

    Returns
    -------
    model : ExactGP
        Instantiated model
    """
    # Guard against a missing/None options mapping as well as an empty one.
    options = model_conf.model_options or {}
    return model_class(train_x, train_y, likelihood, **options)
# [docs] — Sphinx source-link artifact from doc-page extraction (commented out: bare "[docs]" would raise NameError)
def training_loop(
    train_x: torch.Tensor,
    train_y: torch.Tensor,
    model: ExactGP,
    mll: MarginalLogLikelihood,
    optimizer: torch.optim.Optimizer,
    learning_iterations: int,
    debug: bool,
    test_x: torch.Tensor | None = None,
    test_y: torch.Tensor | None = None,
) -> None:
    """
    The training loop for the model.

    Each iteration minimizes the negative marginal log likelihood on the
    training data. If a test set is supplied, a validation loss is computed
    every iteration and drives the learning-rate scheduler; otherwise the
    training loss drives it. Every 10 iterations the loss curve is plotted
    via ``loss_figure``.

    Parameters
    -----------
    train_x : torch.Tensor
        The input training data
    train_y : torch.Tensor
        The output training data
    model : object
        The model to be trained
    mll : MarginalLogLikelihood
        The marginal likelihood of the model
    optimizer : object
        The optimizer for the model
    learning_iterations : int
        The number of iterations to train the model
    debug : bool
        Whether to use the debug mode in GPyTorch. Not used inside this
        function; the debug setting is applied by the caller (``train_model``).
    test_x : torch.Tensor, optional
        The input test data used to compute a validation loss
    test_y : torch.Tensor, optional
        The output test data used to compute a validation loss

    Returns
    --------
    None
    """
    loss_hash = {"train_loss": [], "val_loss": [], "iteration": []}
    # Shrink the learning rate by 10x when the monitored loss plateaus.
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode="min", factor=0.1
    )

    for iteration in range(learning_iterations):
        if (iteration + 1) % 10 == 0:
            logger.info(f"Iteration {iteration + 1}/{learning_iterations}")
            loss_figure(
                loss_hash["train_loss"],
                loss_hash["iteration"],
                loss_hash["val_loss"],
            )

        optimizer.zero_grad()
        output = model(train_x)
        # Minimize the negative marginal log likelihood.
        loss = -mll(output, train_y)
        loss_hash["train_loss"].append(loss.item())
        loss_hash["iteration"].append(iteration)
        loss.backward()
        optimizer.step()

        if test_x is not None:
            model.eval()  # Set model to evaluation mode
            with torch.no_grad():
                val_output = model(test_x)
                val_loss = -mll(val_output, test_y)
                loss_hash["val_loss"].append(val_loss.item())
                scheduler.step(val_loss)
            model.train()  # Switch back to training mode
        else:
            scheduler.step(loss)

    # Plot the loss one last time
    loss_figure(loss_hash["train_loss"], loss_hash["iteration"], loss_hash["val_loss"])
# [docs] — Sphinx source-link artifact from doc-page extraction (commented out: bare "[docs]" would raise NameError)
def model_parameters(
    model: ExactGP, transformed: bool = True
) -> tuple[list[str], list[Tensor | None]]:
    """
    Helper function to get the current model parameters and parameter values.

    Parameters
    ----------
    model : ExactGP
        The model object
    transformed : bool, default True
        If the parameters must be transformed from the raw parameters used by
        the model to human-readable ones.

    Returns
    -------
    parameter_names, parameters : tuple of list of str and list of (Tensor or None)
        A tuple containing the parameter names and their values
    """
    names, values = [], []

    for qualified_name, raw_param in model.named_parameters():
        names.append(qualified_name)

        if not transformed:
            # Report the raw parameter as-is, converted to a scalar or list.
            values.append(
                raw_param.tolist() if raw_param.numel() > 1 else raw_param.item()
            )
            continue

        # Walk down the module tree to the submodule that owns this parameter.
        owner_path, leaf_name = qualified_name.rsplit(".", 1)
        owner = model
        for part in owner_path.split("."):
            owner = getattr(owner, part)

        value = raw_param
        if leaf_name.startswith("raw_"):
            readable = leaf_name.replace("raw_", "")
            if hasattr(owner, readable):
                # The module exposes the transformed value as a property.
                value = getattr(owner, readable)
            else:
                # Fall back to manually applying the registered constraint (if any).
                constraint = owner._constraints.get(leaf_name, None)
                if constraint is not None:
                    value = constraint.transform(raw_param)

        # Convert the value to a Python float or list, as appropriate.
        values.append(value.item() if value.numel() == 1 else value.tolist())

    return names, values
# [docs] — Sphinx source-link artifact from doc-page extraction (commented out: bare "[docs]" would raise NameError)
def create_model_parameters_string(
parameter_names: list[str], parameters: list[Tensor | None]
) -> str:
"""
Helper function to print the model parameters generated by `model_parameters()`
Parameters
----------
parameter_names : list of string
The parameter name strings
parameters : list of Tensor and/or None
The parameter values
Returns
-------
return : str
The string to print
"""
parameter_strings = []
for parameter_name, parameter in zip(parameter_names, parameters):
parameter_strings.append(
f"Transformed parameter name: {parameter_name:42} value = {parameter}\n"
)
return "".join(parameter_strings)
# [docs] — Sphinx source-link artifact from doc-page extraction (commented out: bare "[docs]" would raise NameError)
def train_model(
    train_x: torch.Tensor,
    train_y: torch.Tensor,
    training_conf: TrainingConf,
    test_x: torch.Tensor = None,
    test_y: torch.Tensor = None,
) -> tuple[
    ExactGP, GaussianLikelihood | MultitaskGaussianLikelihood, dict[str, list[float]]
]:
    """
    Train the model using the training data.

    Parameters
    -----------
    train_x : torch.Tensor
        The input training data
    train_y : torch.Tensor
        The output training data
    training_conf : TrainingConf
        Configuration dataclass containing the training specifications
    test_x : torch.Tensor, optional
        The input test data, used for validation during the training loop
    test_y : torch.Tensor, optional
        The output test data, used for validation during the training loop

    Returns
    --------
    model : ExactGP
        The trained model
    likelihood : Likelihood
        The likelihood of the trained model
    parameter_dict : dict
        Dictionary mapping parameter names to their values after training
    """
    logger.info("Defining the model specifications.")

    # Load the training specifications
    learning_iterations = training_conf.learning_iterations
    botorch = training_conf.botorch
    debug = training_conf.debug
    optimizer_conf = training_conf.optimizer

    # Define likelihood and model
    likelihood_class = get_likelihood(training_conf.likelihood)
    model_class = get_model(training_conf.model)

    likelihood = define_likelihood(training_conf.likelihood, likelihood_class, train_x)
    model = define_model(training_conf.model, model_class, train_x, train_y, likelihood)

    parameter_names, parameters = model_parameters(model)
    logger.info(
        f"Parameters before training: \n{create_model_parameters_string(parameter_names, parameters)}"
    )

    # Training in double precision
    model.double()
    likelihood.double()

    # Define the marginal likelihood
    mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)

    logger.info("Start training the model.")
    # Disable GPyTorch's fast approximations so exact computations are used
    # throughout the fit; debug mode is controlled by the configuration.
    with (
        gpytorch.settings.debug(debug),
        gpytorch.settings.fast_computations(
            covar_root_decomposition=False, log_prob=False, solves=False
        ),
    ):
        if botorch:
            # Delegate hyperparameter fitting to BoTorch.
            fit_gpytorch_mll(mll)
        else:
            # Optimize model hyperparameters with a torch optimizer.
            optimizer = define_optimizer(model, optimizer_conf)
            training_loop(
                train_x,
                train_y,
                model,
                mll,
                optimizer,
                learning_iterations,
                debug,
                test_x,
                test_y,
            )

    parameter_names, parameters = model_parameters(model)
    logger.info(
        f"Parameters after training: \n{create_model_parameters_string(parameter_names, parameters)}"
    )
    parameter_dict = dict(zip(parameter_names, parameters))

    return model, likelihood, parameter_dict