Exercise 2 - Template¶

In [ ]:
import pickle
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import rotate, gaussian_filter
from tqdm.auto import tqdm

Define the image of an ideal point source¶

  • decide on an image size (e.g. 64x64 pixels)
  • ideal point source (PET or SPECT imaging): set the activity to zero everywhere but in one pixel
In [ ]:
#define your image in the variable img
#....
#....
#....
#img = 
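One possible sketch (not necessarily the intended solution): a 64x64 grid with a single hot pixel placed slightly off-center. The size and position are assumptions you are free to change.
In [ ]:
# Minimal sketch (assumption: 64x64 grid, single hot pixel slightly off-center)
img_size = 64
img = np.zeros((img_size, img_size))
img[40, 24] = 1.0  # all activity concentrated in one pixel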
In [ ]:
plt.figure()
plt.imshow(img, cmap = 'gray')

Compute system matrix and sinogram¶

  • note that for PET/SPECT imaging the system matrix is built in the same way as the system matrices we computed in the tutorials for transmission imaging (see also the lecture slides)
  • decide on the number of projection angles (e.g. 60 angles $\in [0, 180)$)
  • iterate over all angles and all projections to compute the rows of the system matrix
In [ ]:
#compute the system matrix and save it in sys_mat
#....
#....
#....
#sys_mat = 
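One possible sketch of the system matrix, built by rotating an indicator image of each detector bin with scipy's rotate. The 60 angles and the linear interpolation are assumptions; any consistent discretization works.
In [ ]:
# Sketch: one row per (angle, detector bin) pair; the entries are the pixel weights
# obtained by rotating an indicator image of that detector bin.
n_ang = 60
angles = np.linspace(0, 180, n_ang, endpoint=False)
n_pix = img.shape[0]

sys_mat = np.zeros((n_ang * n_pix, n_pix * n_pix))
for i, ang in enumerate(tqdm(angles)):
    for r in range(n_pix):
        strip = np.zeros((n_pix, n_pix))
        strip[:, r] = 1.0                                 # one detector bin at angle 0
        row = rotate(strip, ang, reshape=False, order=1)  # the same bin at angle ang
        sys_mat[i * n_pix + r] = row.ravel()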
  • compute the (noise- and artifact-free) sinogram using the system matrix and the image
In [ ]:
#compute the sinogram and save it in sino
#sino = 
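With the system matrix in hand, the ideal forward projection is a single matrix-vector product; a sketch assuming the variables from the cells above:
In [ ]:
# Sketch: ideal sinogram = system matrix applied to the flattened image
sino = sys_mat @ img.ravel()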
  • add noise to the sinogram: which statistics describe the noise in PET imaging?
  • add blur to the sinogram: reshape sino so that you can visualize it as a 2D sinogram, then run a blurring filter over it (you can choose the amount of blur)
  • for both noise and blur you can use functions from Python libraries (a possible sketch follows the next cell)
In [ ]:
#compute the noisy and blurry sinogram and save in sino_noise_blur
#....
#....
#....
#sino_noise_blur = 
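A possible sketch for the degraded sinogram: PET counts are Poisson distributed, so one can scale the ideal sinogram to a chosen count level, draw Poisson samples, and then blur the 2D sinogram with a Gaussian filter. The count level, the blur width and the reshape to (n_ang, n_pix) follow the conventions assumed in the sketches above.
In [ ]:
# Sketch: Poisson (counting) noise followed by a Gaussian blur of the 2D sinogram
counts = 500.0                                   # assumed count level; feel free to change
sino_2d = sino.reshape(n_ang, n_pix)             # rows: angles, columns: detector bins
sino_noisy = np.random.poisson(sino_2d * counts).astype(float)
sino_noise_blur = gaussian_filter(sino_noisy, sigma=1.0)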
In [ ]:
#visualize the sinogram
plt.figure()
plt.imshow(sino_noise_blur)

Define the building blocks for your neural network¶

In [ ]:
def error(x, y):
    """Relative error."""
    return np.max(np.abs(x - y) / (np.maximum(1e-8, np.abs(x) + np.abs(y))))
In [ ]:
def gradient(f, x, df=None, h=1e-5):
    """Numerical gradient of f at x."""
    grad = np.zeros_like(x)
    with np.nditer(x, flags=['multi_index'], op_flags=['readwrite']) as it:
        while not it.finished:
            # evaluate function at x +/- h
            ix = it.multi_index
            value = x[ix]
            x[ix] = value + h
            pos = f(x)
            x[ix] = value - h
            neg = f(x)
            x[ix] = value
            # compute the partial derivative with centered formula
            grad[ix] = (pos - neg) if df is None else np.sum((pos - neg) * df)
            grad[ix] /= 2 * h
            it.iternext()
    return grad
  • define the forward pass for the first layer of the network: fully connected without bias
In [ ]:
def affine_forward(x, w):
    """Forward pass for a fully-connected affine layer.
    Input x has shape (N, d_1, ..., d_k) and contains a minibatch of N samples, where each sample x[i] has shape
    (d_1, ..., d_k). We will reshape each input into a vector of dimension D = d_1 * ... * d_k, and then transform it
    to an output vector of dimension M (if you follow the paper, then M = D).
    Input:
    - x: An array containing input data, of shape (N, d_1, ..., d_k)
    - w: An array of weights, of shape (D, M)
    
    Return:
    - out: Output, of shape (N, M)
    - cache: Tuple (x, w)
    """
    out = None
    ###########################################################################
    # 1) Implement the affine forward pass. Store the result in 'out'. Note   #
    # that you will need to reshape the input into rows.                      #
    # ----------------------------------------------------------------------- #
    #     START OF YOUR CODE                                                  #
    ###########################################################################
    ###########################################################################
    #     END OF YOUR CODE                                                    #
    ###########################################################################
    cache = (x, w)
    return out, cache
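The operation the blank above asks for is essentially "flatten each sample, then multiply by the weights". A small self-contained demo of that operation (the *_demo names are only for illustration):
In [ ]:
# Demo of 'reshape into rows, then matrix multiply' on toy data
x_demo = np.arange(12.0).reshape(2, 2, 3)                # N=2 samples of shape (2, 3), so D=6
w_demo = np.ones((6, 4))                                 # maps D=6 -> M=4
out_demo = x_demo.reshape(x_demo.shape[0], -1) @ w_demo  # shape (2, 4)
print(out_demo.shape)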
In [ ]:
num_inputs = 2
input_shape = 4, 5, 6
output_dim = 3
input_size = num_inputs * np.prod(input_shape)
weight_size = output_dim * np.prod(input_shape)
x = np.linspace(-0.1, 0.5, input_size).reshape(num_inputs, *input_shape)
w = np.linspace(-0.2, 0.3, weight_size).reshape(np.prod(input_shape), output_dim)
out, _ = affine_forward(x, w)
out_expected = np.array([
    [1.79834967, 1.80660132, 1.81485297],
    [3.55553199, 3.6141327,  3.67273342]])
print('Testing affine_forward...')
print(f'Output difference (expected <1e-5): {error(out, out_expected):.2e}')
  • define the backward pass for the first layer
In [ ]:
def affine_backward(dout, cache):
    """Backward pass for an affine layer.
    Input:
    - dout: Upstream derivative, of shape (N, M)
    - cache: Tuple of:
      - x: Input data, of shape (N, d_1, ... d_k)
      - w: Weights, of shape (D, M)
      
    Return:
    - dx: Gradient with respect to x, of shape (N, d_1, ..., d_k)
    - dw: Gradient with respect to w, of shape (D, M)
   
    """
    x, w = cache
    dx, dw = None, None
    ###########################################################################
    # 2) Implement the affine backward pass.                                  #
    # ----------------------------------------------------------------------- #
    #     START OF YOUR CODE                                                  #
    ###########################################################################
    ###########################################################################
    #     END OF YOUR CODE                                                    #
    ###########################################################################
    return dx, dw
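For the backward pass, dx is the upstream gradient pushed back through w (and reshaped to the input shape), and dw pairs the flattened inputs with the upstream gradient. A toy demo, again with hypothetical *_demo names:
In [ ]:
# Demo of the affine backward computations on toy data
x_demo = np.arange(12.0).reshape(2, 2, 3)
w_demo = np.ones((6, 4))
dout_demo = np.ones((2, 4))                                   # upstream gradient, shape (N, M)
dx_demo = (dout_demo @ w_demo.T).reshape(x_demo.shape)        # back to the input shape (N, 2, 3)
dw_demo = x_demo.reshape(x_demo.shape[0], -1).T @ dout_demo   # shape (D, M) = (6, 4)
print(dx_demo.shape, dw_demo.shape)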
In [ ]:
np.random.seed(231)
x = np.random.randn(10, 2, 3)
w = np.random.randn(6, 5)
dout = np.random.randn(10, 5)
dx_expected = gradient(lambda x: affine_forward(x, w)[0], x, dout)
dw_expected = gradient(lambda w: affine_forward(x, w)[0], w, dout)
_, cache = affine_forward(x, w)
dx, dw = affine_backward(dout, cache)
print('Testing affine_backward...')
print(f'dx difference (expected <1e-5): {error(dx, dx_expected):.2e}')
print(f'dw difference (expected <1e-5): {error(dw, dw_expected):.2e}')
  • define the forward pass for the second layer: this layer has no trainable weights, but uses the weights from the system matrix (sys_mat)
In [ ]:
def affine_forward_nontrainable(x, w):
    """Forward pass for a fully-connected affine layer without trainable weights.
    Input x has shape (N, d_1', ..., d_k') and contains a minibatch of N samples, where each sample x[i] has shape
    (d_1', ..., d_k'). We will reshape each input into a vector of dimension D' = d_1' * ... * d_k', and then transform it
    to an output vector of dimension M'.
    Input:
    - x: An array containing input data, of shape (N, d_1', ..., d_k')
    - w: An array of fixed (nontrainable) weights, of shape (D', M'); here these come from the system matrix

    Return:
    - out: Output, of shape (N, M')
    - cache: Tuple (x, w)
    """
    out = None
    ###########################################################################
    # 3) Implement the affine forward pass. Store the result in 'out'. Note   #
    # that you will need to reshape the input into rows.                      #
    # ----------------------------------------------------------------------- #
    #     START OF YOUR CODE                                                  #
    ###########################################################################
    ###########################################################################
    #     END OF YOUR CODE                                                    #
    ###########################################################################
    cache = (x, w)
    return out, cache
In [ ]:
num_inputs = 2
input_shape = 4, 5, 6
output_dim = 3
input_size = num_inputs * np.prod(input_shape)
weight_size = output_dim * np.prod(input_shape)
x = np.linspace(-0.1, 0.5, input_size).reshape(num_inputs, *input_shape)
w = np.linspace(-0.2, 0.3, weight_size).reshape(np.prod(input_shape), output_dim)
out, _ = affine_forward_nontrainable(x, w)
out_expected = np.array([
    [1.79834967, 1.80660132, 1.81485297],
    [3.55553199, 3.6141327,  3.67273342]])
print('Testing affine_forward_nontrainable...')
print(f'Output difference (expected <1e-5): {error(out, out_expected):.2e}')
  • define the backward pass for the second (nontrainable) layer
In [ ]:
def affine_backward_nontrainable(dout, cache):
    """Backward pass for an affine layer.
    Input:
    - dout: Upstream derivative, of shape (N, R)
    - cache: Tuple of:
      - x: Input data, of shape (N, d_1', ..., d_k')
      
    Return:
    - dx: Gradient with respect to x, of shape (N, d1', ..., d_k')
    
   
    """
    x = cache
    dx = None
    ###########################################################################
    # 4) Implement the affine backward pass.                                  #
    # ----------------------------------------------------------------------- #
    #     START OF YOUR CODE                                                  #
    ###########################################################################
    ###########################################################################
    #     END OF YOUR CODE                                                    #
    ###########################################################################
    return dx
In [ ]:
np.random.seed(231)
x = np.random.randn(10, 2, 3)
w = np.random.randn(6, 5)
dout = np.random.randn(10, 5)
dx_expected = gradient(lambda x: affine_forward_nontrainable(x, w)[0], x, dout)
_, cache = affine_forward_nontrainable(x, w)
dx = affine_backward_nontrainable(dout, cache)
print('Testing affine_backward_nontrainable...')
print(f'dx difference (expected <1e-5): {error(dx, dx_expected):.2e}')
  • define the loss function (e.g. Mean Squared Error) and its gradient
In [ ]:
def MSE_loss(x, y):
    """Loss and gradient of MSE.
    Input:
    - x: Input data, of shape (N, M') where x[i,:] is the output recon of the ith input
    - y: ground truth of shape (N,M') where y[i] is the ground truth recon of the ith input
    Return:
    - loss: Scalar giving the loss
    - dx: Gradient of the loss with respect to x (N,M')
    """
    loss, dx = None, None
    ###########################################################################
    # 5) Implement the MSE loss and its gradient.                             #
    # ----------------------------------------------------------------------- #
    #     START OF YOUR CODE                                                  #
    ###########################################################################
    ###########################################################################
    #     END OF YOUR CODE                                                    #
    ###########################################################################
    return loss, dx
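A toy demo of a mean-squared-error loss and the matching gradient. The 1/size normalization is just one convention; the numerical check below only requires that loss and gradient are consistent.
In [ ]:
# Demo: MSE loss and its gradient on toy data
x_demo = np.array([[1.0, 2.0], [3.0, 4.0]])
y_demo = np.zeros((2, 2))
loss_demo = np.mean((x_demo - y_demo) ** 2)
dx_demo = 2.0 * (x_demo - y_demo) / x_demo.size   # d(mean((x - y)**2))/dx
print(loss_demo, dx_demo)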
In [ ]:
np.random.seed(231)
num_outputs, num_inputs = 50, 50
x = 0.001 * np.random.randn(num_inputs, num_outputs)
y = 0.001 * np.random.randn(num_inputs, num_outputs)
dx_expected = gradient(lambda x: MSE_loss(x, y)[0], x)
loss, dx = MSE_loss(x, y)
print('Testing MSE_loss...')
print(f'dx difference (expected <1e-5): {error(dx, dx_expected):.2e}')
  • insert the code for initialisation of the weights, forward and backward pass through the network
In [ ]:
class Network:
    """A fully-connected neural network.
    A two-layer fully-connected network (without activation functions) with an MSE loss that uses a modular layer
    design. We assume an input dimension of D and a hidden dimension of D; the output is a (flattened) reconstruction.
    The architecture is of the form: affine -> affine_nontrainable -> MSE.
    Note that this class does not implement gradient descent; instead, it will interact with a separate Solver object
    that is responsible for performing the optimization.
    Learnable parameters of the model are stored in self.params dictionary, which maps parameter names to numpy arrays.
    """
    def __init__(self, input_dim=60*64, hidden_dim=60*64, num_outputs=64*64, weight_scale=1e-3, reg=0):
        """Initialize a new network.
        Input:
        - input_dim: Size of the input (here: size of (flattened) sinogram)
        - hidden_dim: Size of the hidden layer (here: equal to input_dim)
        - num_outputs: Size of the output (here: size of (flattened) reconstruction)
        - weight_scale: Standard deviation for random initialization of the weights
        - reg: L2 regularization strength (here: not used, so it can be left at zero)
        """
        self.params = {}
        self.reg = reg
        #######################################################################
        # 6) Initialize the weights. Weights should be initialized            #
        # from a Gaussian centered at 0 with standard deviation equal to      #
        # 'weight_scale'. All weights should be stored in the dictionary      # 
        # 'self.params', with first layer weights using the key 'W1'.         #
        # ------------------------------------------------------------------- #
        #     START OF YOUR CODE                                              #
        #######################################################################
        #######################################################################
        #     END OF YOUR CODE                                                #
        #######################################################################
    def loss(self, X, y=None):
        """Loss and gradient for a minibatch of data.
        Inputs:
        - X: Array of input data, of shape (N, d_1, ..., d_k)
        - y: Ground truth reconstruction, of shape (N, M')
        Returns:
        If y is None, then run a test-time forward pass of the model, and return:
        - recon: Array of shape (N, M'), where recon[i] is the (flattened) reconstructed image
        If y is not None, then run a training-time forward and backward pass, and return:
        - loss: Scalar loss
        - grads: Dictionary with the same keys as self.params, mapping parameter names to gradients of the loss
        """
        recon = None
        #######################################################################
        # 7) Implement the forward pass, computing the reconstruction for 'X' #
        # and storing it in the recon variable.                               #
        # ------------------------------------------------------------------- #
        #     START OF YOUR CODE                                              #
        #######################################################################
        #######################################################################
        #     END OF YOUR CODE                                                #
        #######################################################################
        if y is None:  # then assume we are in test mode
            return recon
        loss, grads = 0, {}
        #######################################################################
        # 8) Implement the backward pass for the two-layer net. Store the     #
        # loss in the loss variable and gradients in the 'grads' dictionary.  #
        # Compute the data loss using MSE, and make sure that 'grads[k]'      #
        # holds the gradients for 'self.params[k]'.                           #
        #                                                                     #
        # ------------------------------------------------------------------- #
        #     START OF YOUR CODE                                              #
        #######################################################################
        #######################################################################
        #     END OF YOUR CODE                                                #
        #######################################################################
        return loss, grads
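To keep an overview, here is one way the building blocks can be chained inside Network.loss, written as pseudocode comments. It assumes W1 is stored under self.params['W1'] and that the fixed weights of the second layer come from the system matrix, as stated above; adapt it to your own implementation.
In [ ]:
# Pseudocode sketch of the forward/backward chain (not the required implementation):
# h, cache1 = affine_forward(X, W1)                       # trainable layer
# recon, cache2 = affine_forward_nontrainable(h, w_fix)   # fixed weights from sys_mat
# loss, drecon = MSE_loss(recon, y)
# dh = affine_backward_nontrainable(drecon, cache2)
# dX, dW1 = affine_backward(dh, cache1)                   # grads['W1'] = dW1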
  • Implement the stochastic gradient update formula
In [ ]:
def sgd(w, dw, config=None):
    """Stochastic gradient descent."""
    if config is None:
        config = {}
    config.setdefault('learning_rate', 1e-2)
    ###########################################################################
    # 9) Implement the vanilla stochastic gradient descent update formula.    #
    # ----------------------------------------------------------------------- #
    #     START OF YOUR CODE                                                  #
    ###########################################################################
    ###########################################################################
    #     END OF YOUR CODE                                                    #
    ###########################################################################
    return w, config
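A toy demo of the vanilla update the blank above asks for: step the weights a small amount against the gradient.
In [ ]:
# Demo: one vanilla SGD step on toy data
w_demo = np.array([1.0, 2.0])
dw_demo = np.array([0.5, -0.5])
lr = 1e-2
w_demo = w_demo - lr * dw_demo   # move against the gradient
print(w_demo)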
In [ ]:
class Solver:
    """Solver for training classification models.
    Accepts both training data and validation labels so it can periodically check classification accuracy to watch out
    for overfitting.
    To train a model, you will first construct a Solver instance, passing the model, dataset, and various options
    (learning rate, batch size, etc) to the constructor. You will then call the train() method to run the optimization
    procedure and train the model.
    After the train() method returns, model.params will contain the parameters obtained at the end of training. In
    addition, the instance variable solver.loss_history will contain a list of all losses encountered during training.
    Example usage might look something like this:
    data = {
      'X_train': # training data
      'y_train': # training labels
      'X_val': # validation data
      'y_val': # validation labels
    }
    model = MyAwesomeModel(hidden_size=100, reg=10)
    solver = Solver(
        model, data,
        optim_config={'learning_rate': 1e-3},
        lr_decay=0.95,
        num_epochs=10,
        batch_size=100,
        print_every=100)
    solver.train()
    A Solver works on a model object that must conform to the following API:
    - model.params must be a dictionary mapping string parameter names to numpy arrays containing parameter values.
    - model.loss(X, y) must be a function that computes the training-time loss and gradients, and the test-time
      reconstruction, with the following inputs and outputs:
      Input:
      - X: Array giving a minibatch of input data of shape (N, d_1, ..., d_k)
      - y: Array of ground-truth reconstructions of shape (N, M'), where y[i] is the ground truth for X[i].
      Return:
      If y is None, run a test-time forward pass, and return:
      - recon: Array of shape (N, M'), where recon[i] is the (flattened) reconstruction for X[i]
      If y is not None, run a training-time forward and backward pass, and return:
      - loss: Scalar giving the loss
      - grads: Dictionary with the same keys as self.params mapping parameter names to gradients of the loss
    """
    def __init__(self, model, data, **kwargs):
        """Solver constructor.
        Required arguments:
        - model: A model object conforming to the API described above
        - data: A dictionary of training and validation data containing:
          'X_train': Array, shape (N_train, d_1, ..., d_k) of training inputs (here: sinograms)
          'X_val': Array, shape (N_val, d_1, ..., d_k) of validation inputs
          'y_train': Array, shape (N_train, M') of ground-truth reconstructions for the training inputs
          'y_val': Array, shape (N_val, M') of ground-truth reconstructions for the validation inputs
        Optional arguments:
        - optim_config: A dictionary containing hyperparameters that will be passed to the update rule (here: sgd,
          defined above). All update rules require a 'learning_rate' parameter, so that should always be present
        - lr_decay: A scalar for learning rate decay; after each epoch the learning rate is multiplied by this value
        - batch_size: Size of minibatches used to compute loss and gradient during training
        - num_epochs: The number of epochs to run for during training
        - print_every: Integer; training losses will be printed every print_every iterations
        - verbose: Boolean; if set to false then no output will be printed during training
        - num_train_samples: Number of training samples used to check training accuracy; default is 1000; set to None
          to use entire training set
        - num_val_samples: Number of validation samples to use to check val accuracy; default is None, which uses the
          entire validation set
        - checkpoint_name: If not None, then save model checkpoints here every epoch
        """
        self.model = model
        self.X_train = data['X_train']
        self.y_train = data['y_train']
        self.X_val = data['X_val']
        self.y_val = data['y_val']
        # unpack keyword arguments
        self.optim_config = kwargs.pop('optim_config', {})
        self.lr_decay = kwargs.pop('lr_decay', 1.0)
        self.batch_size = kwargs.pop('batch_size', 100)
        self.num_epochs = kwargs.pop('num_epochs', 10)
        self.num_train_samples = kwargs.pop('num_train_samples', 1000)
        self.num_val_samples = kwargs.pop('num_val_samples', None)
        self.checkpoint_name = kwargs.pop('checkpoint_name', None)
        self.print_every = kwargs.pop('print_every', 10)
        self.verbose = kwargs.pop('verbose', True)
        # throw an error if there are extra keyword arguments
        if len(kwargs) > 0:
            extra = ', '.join(f'"{k}"' for k in list(kwargs.keys()))
            raise ValueError(f'Unrecognized arguments {extra}')
        # use plain stochastic gradient descent (defined above) as the update rule
        self.update_rule = sgd
        self._reset()
    def _reset(self):
        """Book-keeping variables for optimization; Do not call manually."""
        # set up some variables for book-keeping
        self.epoch = 0
        self.best_val_acc = 0
        self.best_params = {}
        self.loss_history = []
        self.train_acc_history = []
        self.val_acc_history = []
        # make a deep copy of the optim_config for each parameter
        self.optim_configs = {}
        for p in self.model.params:
            d = {k: v for k, v in self.optim_config.items()}
            self.optim_configs[p] = d
    def _step(self):
        """Perform a single gradient update; called by "train" method."""
        # make a minibatch of training data
        num_train = self.X_train.shape[0]
        batch_mask = np.random.choice(num_train, self.batch_size)
        X_batch = self.X_train[batch_mask]
        y_batch = self.y_train[batch_mask]
        # compute loss and gradient
        loss, grads = self.model.loss(X_batch, y_batch)
        self.loss_history.append(loss)
        # perform a parameter update
        for p, w in self.model.params.items():
            dw = grads[p]
            config = self.optim_configs[p]
            next_w, next_config = self.update_rule(w, dw, config)
            self.model.params[p] = next_w
            self.optim_configs[p] = next_config
    def _save_checkpoint(self):
        if self.checkpoint_name is None:
            return
        checkpoint = {
            'model': self.model,
            'update_rule': self.update_rule,
            'lr_decay': self.lr_decay,
            'optim_config': self.optim_config,
            'batch_size': self.batch_size,
            'num_train_samples': self.num_train_samples,
            'num_val_samples': self.num_val_samples,
            'epoch': self.epoch,
            'loss_history': self.loss_history,
            'train_acc_history': self.train_acc_history,
            'val_acc_history': self.val_acc_history,
        }
        filename = f'{self.checkpoint_name}_epoch_{self.epoch}.pkl'
        if self.verbose:
            print(f'Saving checkpoint to "{filename}"')
        with open(filename, 'wb') as f:
            pickle.dump(checkpoint, f)
    def train(self):
        """Perform optimization to train the model."""
        num_train = self.X_train.shape[0]
        iterations_per_epoch = max(num_train // self.batch_size, 1)
        num_iterations = self.num_epochs * iterations_per_epoch
        for t in tqdm(range(num_iterations)):
            self._step()
            # print training loss
            if self.verbose and t % self.print_every == 0:
                print(f'(Iteration {t + 1} / {num_iterations}) loss: {self.loss_history[-1]}')
            # at the end of every epoch, increment the epoch counter and decay the learning rate
            epoch_end = (t + 1) % iterations_per_epoch == 0
            if epoch_end:
                self.epoch += 1
                for k in self.optim_configs:
                    self.optim_configs[k]['learning_rate'] *= self.lr_decay
  • Create the data set: as a first try you can just set the training data (key 'X_train') to an array containing the (flattened) sinogram you computed above and the ground truth (key 'y_train') to an array containing the flattened image of the ideal point source. You can use the same arrays for 'X_val' and 'y_val' (a minimal example is sketched after the cell below).
  • Of course, this is not the proper way to train a network; nowadays we have far more computational resources than were available for the 1991 paper! If you like, you can create more ground-truth images and corresponding sinograms, and split them into training, validation and test data.
In [ ]:
# data = {
#       'X_train': 
#       'y_train': 
#       'X_val': 
#       'y_val': 
#     }
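A minimal single-sample data set as described above, assuming sino_noise_blur and img from the earlier cells; each sample is flattened into one row.
In [ ]:
# Sketch: one training sample, reused for validation
data = {
    'X_train': sino_noise_blur.reshape(1, -1),
    'y_train': img.reshape(1, -1),
    'X_val': sino_noise_blur.reshape(1, -1),
    'y_val': img.reshape(1, -1),
}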
  • set the correct sizes for input, hidden layer and output
  • initialize the solver and start the training.
In [ ]:
input_size =    # size of the flattened sinogram (number of angles x number of detector bins)
hidden_size =   # here: chosen equal to input_size
num_outputs =   # size of the flattened reconstruction (number of image pixels)
model = Network(input_size, hidden_size, num_outputs)
solver = None
print(model.params)
###############################################################################
# 10) Use a 'Solver' instance to train the network.                           #
# --------------------------------------------------------------------------- #
#     START OF YOUR CODE                                                      #
###############################################################################
###############################################################################
#     END OF YOUR CODE                                                        #
###############################################################################
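One possible training setup, mirroring the usage example in the Solver docstring; the learning rate, epoch count and batch size are only assumed starting points.
In [ ]:
# Sketch: train on the single-sample data set defined above (tune hyperparameters as needed)
solver = Solver(model, data,
                optim_config={'learning_rate': 1e-2},
                num_epochs=200, batch_size=1, print_every=50)
solver.train()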
  • plot the loss using the code below - is it decreasing? If not, play with the learning rate.
In [ ]:
fig, ax = plt.subplots(figsize=(8,4))
ax.plot(solver.loss_history, '.', alpha=0.5)
ax.set_xlabel('Iteration')
ax.set_ylabel('Training Loss')
pass
  • get the output of the trained network (evaluate the model.loss function on the sinogram without providing a ground truth)
  • visualize the output of the network - what does your reconstruction look like?
  • hint: the output is flattened, so first reshape it to the image dimensions.
In [ ]:
# y = model.loss(....)
# y_im = y.reshape(...)
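A sketch of the evaluation step, assuming the single-sample data set and the 64x64 image size used above:
In [ ]:
# Sketch: test-time forward pass (no ground truth given) and reshape to image size
y = model.loss(data['X_val'])   # returns the flattened reconstruction, shape (N, M')
y_im = y.reshape(64, 64)        # assumes a single sample and a 64x64 image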
In [ ]:
plt.imshow(y_im)
plt.colorbar()
In [ ]: