import pickle
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import rotate, gaussian_filter
from tqdm.auto import tqdm
#define your image in the variable img
#....
#....
#....
#img =
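# One possible sketch (an assumption, not the required image): a simple 64x64 block
# phantom. Replace it with any grayscale image you prefer.
n = 64
img = np.zeros((n, n))
img[16:48, 24:40] = 1.0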
plt.figure()
plt.imshow(img, cmap = 'gray')
#compute the system matrix and save it in sys_mat
#....
#....
#....
#sys_mat =
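# One possible sketch (an assumption, not the required construction): a parallel-beam
# system matrix. Each row corresponds to one (angle, detector-bin) pair and is obtained
# by rotating a column-indicator image; 60 angles over 180 degrees are assumed here.
n_angles = 60
angles = np.linspace(0.0, 180.0, n_angles, endpoint=False)
rows = []
for a in tqdm(angles):
    for j in range(n):
        col = np.zeros((n, n))
        col[:, j] = 1.0
        # weights of all pixels contributing to detector bin j at angle a
        rows.append(rotate(col, -a, reshape=False, order=1).ravel())
sys_mat = np.array(rows)  # shape (n_angles * n, n * n)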
#compute the sinogram and save it in sino
#sino =
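# With a system matrix available, the clean sinogram is a single matrix-vector product
# (this sketch reuses 'sys_mat' and 'img' from the assumptions above).
sino = (sys_mat @ img.ravel()).reshape(n_angles, n)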
#compute the noisy and blurry sinogram and save in sino_noise_blur
#....
#....
#....
#sino_noise_blur =
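# One possible sketch (noise level and blur width are assumptions): smooth the sinogram
# with a Gaussian filter and add white Gaussian noise.
sino_noise_blur = gaussian_filter(sino, sigma=1.0)
sino_noise_blur = sino_noise_blur + 0.01 * sino.max() * np.random.randn(*sino.shape)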
#visualize the sinogram
plt.figure()
plt.imshow(sino_noise_blur)
def error(x, y):
"""Relative error."""
return np.max(np.abs(x - y) / (np.maximum(1e-8, np.abs(x) + np.abs(y))))
def gradient(f, x, df=None, h=1e-5):
"""Numerical gradient of f at x."""
grad = np.zeros_like(x)
with np.nditer(x, flags=['multi_index'], op_flags=['readwrite']) as it:
while not it.finished:
# evaluate function at x +/- h
ix = it.multi_index
value = x[ix]
x[ix] = value + h
pos = f(x)
x[ix] = value - h
neg = f(x)
x[ix] = value
# compute the partial derivative with centered formula
grad[ix] = (pos - neg) if df is None else np.sum((pos - neg) * df)
grad[ix] /= 2 * h
it.iternext()
return grad
def affine_forward(x, w):
"""Forward pass for a fully-connected affine layer.
Input x has shape (N, d_1, ..., d_k) and contains a minibatch of N samples, where each sample x[i] has shape
(d_1, ..., d_k). We will reshape each input into a vector of dimension D = d_1 * ... * d_k, and then transform it
to an output vector of dimension M (if you follow the paper, then M = D).
Input:
- x: An array containing input data, of shape (N, d_1, ..., d_k)
- w: An array of weights, of shape (D, M)
Return:
- out: Output, of shape (N, M)
    - cache: Tuple (x, w), stored for the backward pass
"""
out = None
###########################################################################
# 1) Implement the affine forward pass. Store the result in 'out'. Note #
# that you will need to reshape the input into rows. #
# ----------------------------------------------------------------------- #
# START OF YOUR CODE #
###########################################################################
###########################################################################
# END OF YOUR CODE #
###########################################################################
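    # One possible sketch (used only if the block above is left empty): flatten each
    # sample into a row of length D = d_1 * ... * d_k and multiply by the weights.
    if out is None:
        out = x.reshape(x.shape[0], -1) @ w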
cache = (x, w)
return out, cache
num_inputs = 2
input_shape = 4, 5, 6
output_dim = 3
input_size = num_inputs * np.prod(input_shape)
weight_size = output_dim * np.prod(input_shape)
x = np.linspace(-0.1, 0.5, input_size).reshape(num_inputs, *input_shape)
w = np.linspace(-0.2, 0.3, weight_size).reshape(np.prod(input_shape), output_dim)
out, _ = affine_forward(x, w)
out_expected = np.array([
[1.79834967, 1.80660132, 1.81485297],
[3.55553199, 3.6141327, 3.67273342]])
print('Testing affine_forward...')
print(f'Output difference (expected <1e-5): {error(out, out_expected):.2e}')
def affine_backward(dout, cache):
"""Backward pass for an affine layer.
Input:
- dout: Upstream derivative, of shape (N, M)
- cache: Tuple of:
- x: Input data, of shape (N, d_1, ... d_k)
- w: Weights, of shape (D, M)
Return:
- dx: Gradient with respect to x, of shape (N, d1, ..., d_k)
- dw: Gradient with respect to w, of shape (D, M)
"""
x, w = cache
dx, dw = None, None
###########################################################################
# 2) Implement the affine backward pass. #
# ----------------------------------------------------------------------- #
# START OF YOUR CODE #
###########################################################################
###########################################################################
# END OF YOUR CODE #
###########################################################################
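    # One possible sketch (used only if the block above is left empty): gradients of
    # the affine map out = reshape(x) @ w.
    if dx is None and dw is None:
        dx = (dout @ w.T).reshape(x.shape)
        dw = x.reshape(x.shape[0], -1).T @ dout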
return dx, dw
np.random.seed(231)
x = np.random.randn(10, 2, 3)
w = np.random.randn(6, 5)
b = np.random.randn(5)
dout = np.random.randn(10, 5)
dx_expected = gradient(lambda x: affine_forward(x, w)[0], x, dout)
dw_expected = gradient(lambda w: affine_forward(x, w)[0], w, dout)
_, cache = affine_forward(x, w)
dx, dw = affine_backward(dout, cache)
print('Testing affine_backward...')
print(f'dx difference (expected <1e-5): {error(dx, dx_expected):.2e}')
print(f'dw difference (expected <1e-5): {error(dw, dw_expected):.2e}')
def affine_forward_nontrainable(x, w):
"""Forward pass for a fully-connected affine layer.
Input x has shape (N, d_1', ..., d_k') and contains a minibatch of N samples, where each sample x[i] has shape
(d_1', ..., d_k'). We will reshape each input into a vector of dimension D' = d_1' * ... * d_k', and then transform it
to an output vector of dimension M'.
    Input:
    - x: An array containing input data, of shape (N, d_1', ..., d_k')
    - w: An array of fixed (non-trainable) weights, of shape (D', M')
    Return:
    - out: Output, of shape (N, M')
    - cache: Tuple (x, w), stored for the backward pass
"""
out = None
###########################################################################
    # 3) Implement the affine forward pass. Store the result in 'out'. Note  #
# that you will need to reshape the input into rows. #
# ----------------------------------------------------------------------- #
# START OF YOUR CODE #
###########################################################################
###########################################################################
# END OF YOUR CODE #
###########################################################################
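    # One possible sketch (used only if the block above is left empty): identical to
    # the trainable forward pass, except that w is a fixed (non-learnable) matrix.
    if out is None:
        out = x.reshape(x.shape[0], -1) @ w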
    cache = (x, w)
return out, cache
num_inputs = 2
input_shape = 4, 5, 6
output_dim = 3
input_size = num_inputs * np.prod(input_shape)
weight_size = output_dim * np.prod(input_shape)
x = np.linspace(-0.1, 0.5, input_size).reshape(num_inputs, *input_shape)
w = np.linspace(-0.2, 0.3, weight_size).reshape(np.prod(input_shape), output_dim)
out, _ = affine_forward_nontrainable(x,w)
out_expected = np.array([
[1.79834967, 1.80660132, 1.81485297],
[3.55553199, 3.6141327, 3.67273342]])
print('Testing affine_forward_nontrainable...')
print(f'Output difference (expected <1e-5): {error(out, out_expected):.2e}')
def affine_backward_nontrainable(dout, cache):
"""Backward pass for an affine layer.
Input:
    - dout: Upstream derivative, of shape (N, M')
    - cache: Tuple of:
      - x: Input data, of shape (N, d_1', ..., d_k')
      - w: Fixed (non-trainable) weights, of shape (D', M')
Return:
- dx: Gradient with respect to x, of shape (N, d1', ..., d_k')
"""
    x, w = cache
dx = None
###########################################################################
    # 4) Implement the affine backward pass.                                  #
# ----------------------------------------------------------------------- #
# START OF YOUR CODE #
###########################################################################
###########################################################################
# END OF YOUR CODE #
###########################################################################
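    # One possible sketch (used only if the block above is left empty): only dx is
    # needed, since w is not trainable.
    if dx is None:
        dx = (dout @ w.T).reshape(x.shape)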
return dx
np.random.seed(231)
x = np.random.randn(10, 2, 3)
w = np.random.randn(6, 5)
b = np.random.randn(5)
dout = np.random.randn(10, 5)
dx_expected = gradient(lambda x: affine_forward_nontrainable(x, w)[0], x, dout)
_, cache = affine_forward_nontrainable(x, w)
dx = affine_backward_nontrainable(dout, cache)
print('Testing affine_backward_nontrainable...')
print(f'dx difference (expected <1e-5): {error(dx, dx_expected):.2e}')
def MSE_loss(x, y):
"""Loss and gradient of MSE.
Input:
- x: Input data, of shape (N, M') where x[i,:] is the output recon of the ith input
- y: ground truth of shape (N,M') where y[i] is the ground truth recon of the ith input
Return:
- loss: Scalar giving the loss
- dx: Gradient of the loss with respect to x (N,M')
"""
loss, dx = None, None
###########################################################################
    # 5) Implement the MSE loss and its gradient.                             #
# ----------------------------------------------------------------------- #
# START OF YOUR CODE #
###########################################################################
###########################################################################
# END OF YOUR CODE #
###########################################################################
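    # One possible sketch (used only if the block above is left empty): mean squared
    # error averaged over the batch, with its analytic gradient. The exact
    # normalization (per sample vs. per element) is a design choice.
    if loss is None:
        N = x.shape[0]
        diff = x - y
        loss = np.sum(diff ** 2) / N
        dx = 2.0 * diff / N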
return loss, dx
np.random.seed(231)
num_outputs, num_inputs = 50, 50
x = 0.001 * np.random.randn(num_inputs, num_outputs)
y = 0.001 * np.random.randn(num_inputs, num_outputs)  # real-valued targets, same shape as x
dx_expected = gradient(lambda x: MSE_loss(x, y)[0], x)
loss, dx = MSE_loss(x, y)
print('Testing MSE_loss...')
print(f'dx difference (expected <1e-5): {error(dx, dx_expected):.2e}')
class Network:
"""A fully-connected neural network.
    A two-layer fully-connected network (without activation function) with MSE loss that uses a modular layer design.
    We assume an input dimension of D and a hidden dimension equal to D; the network outputs a reconstruction.
The architecture is of the form: affine -> affine_nontrainable -> MSE.
Note that this class does not implement gradient descent; instead, it will interact with a separate Solver object
that is responsible for performing the optimization.
Learnable parameters of the model are stored in self.params dictionary, which maps parameter names to numpy arrays.
"""
def __init__(self, input_dim=60*64, hidden_dim=60*64, num_outputs=64*64, weight_scale=1e-3, reg=0):
"""Initialize a new network.
Input:
- input_dim: Size of the input (here: size of (flattened) sinogram)
- hidden_dim: Size of the hidden layer (here: equal to input_dim)
- num_outputs: Size of the output (here: size of (flattened) reconstruction)
- weight_scale: Standard deviation for random initialization of the weights
- reg: L2 regularization strength (here: we don't use it, so can just be left set to zero)
"""
self.params = {}
self.reg = reg
#######################################################################
# 6) Initialize the weights. Weights should be initialized #
# from a Gaussian centered at 0 with standard deviation equal to #
# 'weight_scale'. All weights should be stored in the dictionary #
# 'self.params', with first layer weights using the key 'W1'. #
# ------------------------------------------------------------------- #
# START OF YOUR CODE #
#######################################################################
#######################################################################
# END OF YOUR CODE #
#######################################################################
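        # One possible sketch (an assumption about the intended design): a single
        # trainable weight matrix 'W1' for the first layer. The second layer is
        # non-trainable, so nothing else is stored in self.params here.
        if 'W1' not in self.params:
            self.params['W1'] = weight_scale * np.random.randn(input_dim, hidden_dim)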
def loss(self, X, y=None):
"""Loss and gradient for a minibatch of data.
Inputs:
- X: Array of input data, of shape (N, d_1, ..., d_k)
- y: Ground truth reconstruction, of shape (N, M')
Returns:
If y is None, then run a test-time forward pass of the model, and return:
- recon: Array of shape (N, M'), where recon[i] is the (flattened) reconstructed image
If y is not None, then run a training-time forward and backward pass, and return:
- loss: Scalar loss
- grads: Dictionary with the same keys as self.params, mapping parameter names to gradients of the loss
"""
recon = None
#######################################################################
# 7) Implement the forward pass, computing the reconstruction for 'X' #
# and storing it in the recon variable. #
# ------------------------------------------------------------------- #
# START OF YOUR CODE #
#######################################################################
#######################################################################
# END OF YOUR CODE #
#######################################################################
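        # One possible sketch (assumptions: the trainable layer filters the sinogram,
        # and the global 'sys_mat' defined earlier in the notebook serves as the fixed
        # weight of the non-trainable backprojection layer).
        if recon is None:
            h, cache1 = affine_forward(X, self.params['W1'])
            recon, cache2 = affine_forward_nontrainable(h, sys_mat)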
if y is None: # then assume we are in test mode
return recon
loss, grads = 0, {}
#######################################################################
# 8) Implement the backward pass for the two-layer net. Store the #
# loss in the loss variable and gradients in the 'grads' dictionary. #
        # Compute the data loss using MSE, and make sure that 'grads[k]'     #
# holds the gradients for 'self.params[k]'. #
# #
# ------------------------------------------------------------------- #
# START OF YOUR CODE #
#######################################################################
#######################################################################
# END OF YOUR CODE #
#######################################################################
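        # One possible sketch (used only if the block above is left empty): MSE data
        # loss on the reconstruction, then backprop through the fixed layer and the
        # trainable layer; only 'W1' receives a gradient.
        if not grads:
            loss, drecon = MSE_loss(recon, y)
            dh = affine_backward_nontrainable(drecon, cache2)
            _, grads['W1'] = affine_backward(dh, cache1)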
return loss, grads
def sgd(w, dw, config=None):
"""Stochastic gradient descent."""
if config is None:
config = {}
config.setdefault('learning_rate', 1e-2)
###########################################################################
# 9) Implement the vanilla stochastic gradient descent update formula. #
# ----------------------------------------------------------------------- #
# START OF YOUR CODE #
###########################################################################
###########################################################################
# END OF YOUR CODE #
###########################################################################
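    # One possible sketch: the vanilla SGD update, w <- w - learning_rate * dw.
    w = w - config['learning_rate'] * dw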
return w, config
class Solver:
"""Solver for training classification models.
Accepts both training data and validation labels so it can periodically check classification accuracy to watch out
for overfitting.
To train a model, you will first construct a Solver instance, passing the model, dataset, and various options
(learning rate, batch size, etc) to the constructor. You will then call the train() method to run the optimization
procedure and train the model.
After the train() method returns, model.params will contain the parameters that performed best on the validation
set over the course of training. In addition, the instance variable solver.loss_history will contain a list of all
losses encountered during training and the instance variables solver.train_acc_history and solver.val_acc_history
will be lists of the accuracies of the model on the training and validation set at each epoch.
Example usage might look something like this:
data = {
'X_train': # training data
        'y_train': # training ground-truth reconstructions
        'X_val': # validation data
        'y_val': # validation ground-truth reconstructions
}
model = MyAwesomeModel(hidden_size=100, reg=10)
solver = Solver(
model, data,
optim_config={'learning_rate': 1e-3},
lr_decay=0.95,
num_epochs=10,
batch_size=100,
print_every=100)
solver.train()
A Solver works on a model object that must conform to the following API:
- model.params must be a dictionary mapping string parameter names to numpy arrays containing parameter values.
    - model.loss(X, y) must be a function that computes training-time loss and gradients, and test-time
      reconstructions, with the following inputs and outputs:
      Input:
      - X: Array giving a minibatch of input data of shape (N, d_1, ..., d_k)
      - y: Array of ground-truth reconstructions, of shape (N, M'), where y[i] is the target for X[i].
      Return:
      If y is None, run a test-time forward pass, and return:
      - recon: Array of shape (N, M'), where recon[i] is the (flattened) reconstruction for X[i]
If y is not None, run a training-time forward and backward pass, and return:
- loss: Scalar giving the loss
- grads: Dictionary with the same keys as self.params mapping parameter names to gradients of the loss
"""
def __init__(self, model, data, **kwargs):
"""Solver constructor.
Required arguments:
- model: A model object conforming to the API described above
- data: A dictionary of training and validation data containing:
          'X_train': Array, shape (N_train, d_1, ..., d_k) of training inputs (sinograms)
          'X_val': Array, shape (N_val, d_1, ..., d_k) of validation inputs (sinograms)
          'y_train': Array, shape (N_train, M') of ground-truth reconstructions for the training inputs
          'y_val': Array, shape (N_val, M') of ground-truth reconstructions for the validation inputs
Optional arguments:
        - optim_config: A dictionary containing hyperparameters that will be passed to the chosen update rule. Each
          update rule requires different hyperparameters (see, e.g., the 'sgd' function above), but all update rules
          require a 'learning_rate' parameter, so that should always be present.
- lr_decay: A scalar for learning rate decay; after each epoch the learning rate is multiplied by this value
- batch_size: Size of minibatches used to compute loss and gradient during training
- num_epochs: The number of epochs to run for during training
- print_every: Integer; training losses will be printed every print_every iterations
- verbose: Boolean; if set to false then no output will be printed during training
- num_train_samples: Number of training samples used to check training accuracy; default is 1000; set to None
to use entire training set
- num_val_samples: Number of validation samples to use to check val accuracy; default is None, which uses the
entire validation set
- checkpoint_name: If not None, then save model checkpoints here every epoch
"""
self.model = model
self.X_train = data['X_train']
self.y_train = data['y_train']
self.X_val = data['X_val']
self.y_val = data['y_val']
# unpack keyword arguments
self.optim_config = kwargs.pop('optim_config', {})
self.lr_decay = kwargs.pop('lr_decay', 1.0)
self.batch_size = kwargs.pop('batch_size', 100)
self.num_epochs = kwargs.pop('num_epochs', 10)
self.num_train_samples = kwargs.pop('num_train_samples', 1000)
self.num_val_samples = kwargs.pop('num_val_samples', None)
self.checkpoint_name = kwargs.pop('checkpoint_name', None)
self.print_every = kwargs.pop('print_every', 10)
self.verbose = kwargs.pop('verbose', True)
# throw an error if there are extra keyword arguments
if len(kwargs) > 0:
extra = ', '.join(f'"{k}"' for k in list(kwargs.keys()))
raise ValueError(f'Unrecognized arguments {extra}')
# make sure the update rule exists, then replace the string name with the actual function
self.update_rule = sgd
self._reset()
def _reset(self):
"""Book-keeping variables for optimization; Do not call manually."""
# set up some variables for book-keeping
self.epoch = 0
self.best_val_acc = 0
self.best_params = {}
self.loss_history = []
self.train_acc_history = []
self.val_acc_history = []
# make a deep copy of the optim_config for each parameter
self.optim_configs = {}
for p in self.model.params:
d = {k: v for k, v in self.optim_config.items()}
self.optim_configs[p] = d
def _step(self):
"""Perform a single gradient update; called by "train" method."""
# make a minibatch of training data
num_train = self.X_train.shape[0]
batch_mask = np.random.choice(num_train, self.batch_size)
X_batch = self.X_train[batch_mask]
y_batch = self.y_train[batch_mask]
# compute loss and gradient
loss, grads = self.model.loss(X_batch, y_batch)
self.loss_history.append(loss)
# perform a parameter update
for p, w in self.model.params.items():
dw = grads[p]
config = self.optim_configs[p]
next_w, next_config = self.update_rule(w, dw, config)
self.model.params[p] = next_w
self.optim_configs[p] = next_config
def _save_checkpoint(self):
if self.checkpoint_name is None:
return
checkpoint = {
'model': self.model,
'update_rule': self.update_rule,
'lr_decay': self.lr_decay,
'optim_config': self.optim_config,
'batch_size': self.batch_size,
'num_train_samples': self.num_train_samples,
'num_val_samples': self.num_val_samples,
'epoch': self.epoch,
'loss_history': self.loss_history,
'train_acc_history': self.train_acc_history,
'val_acc_history': self.val_acc_history,
}
filename = f'{self.checkpoint_name}_epoch_{self.epoch}.pkl'
if self.verbose:
print(f'Saving checkpoint to "{filename}"')
with open(filename, 'wb') as f:
pickle.dump(checkpoint, f)
def train(self):
"""Perform optimization to train the model."""
num_train = self.X_train.shape[0]
iterations_per_epoch = max(num_train // self.batch_size, 1)
num_iterations = self.num_epochs * iterations_per_epoch
for t in tqdm(range(num_iterations)):
self._step()
# print training loss
if self.verbose and t % self.print_every == 0:
print(f'(Iteration {t + 1} / {num_iterations}) loss: {self.loss_history[-1]}')
# at the end of every epoch, increment the epoch counter and decay the learning rate
epoch_end = (t + 1) % iterations_per_epoch == 0
if epoch_end:
self.epoch += 1
for k in self.optim_configs:
self.optim_configs[k]['learning_rate'] *= self.lr_decay
# data = {
# 'X_train':
# 'y_train':
# 'X_val':
# 'y_val':
# }
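# One possible sketch (an assumption about the data, reusing 'n' and 'sys_mat' from
# the sketches above): random block phantoms as ground truth, with their noise-free
# sinograms as network inputs.
def make_pair(rng):
    im = np.zeros((n, n))
    r0, c0 = rng.integers(4, n - 20, size=2)
    im[r0:r0 + 16, c0:c0 + 16] = 1.0
    return sys_mat @ im.ravel(), im.ravel()
rng = np.random.default_rng(0)
pairs = [make_pair(rng) for _ in range(120)]
X_all = np.array([p[0] for p in pairs])
y_all = np.array([p[1] for p in pairs])
data = {
    'X_train': X_all[:100], 'y_train': y_all[:100],
    'X_val': X_all[100:], 'y_val': y_all[100:],
}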
input_size = 60 * 64    # flattened sinogram size (matches the Network default)
hidden_size = 60 * 64   # hidden dimension equal to the input dimension
num_outputs = 64 * 64   # flattened reconstruction size (matches the Network default)
model = Network(input_size, hidden_size, num_outputs)
solver = None
print(model.params)
###############################################################################
# 10) Use a 'Solver' instance to train the network. #
# --------------------------------------------------------------------------- #
# START OF YOUR CODE #
###############################################################################
###############################################################################
# END OF YOUR CODE #
###############################################################################
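# One possible sketch (used only if the block above is left empty; all hyperparameters
# are rough, untuned assumptions).
if solver is None:
    solver = Solver(
        model, data,
        optim_config={'learning_rate': 1e-6},
        lr_decay=0.95,
        num_epochs=10,
        batch_size=10,
        print_every=10)
    solver.train()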
fig, ax = plt.subplots(figsize=(8,4))
ax.plot(solver.loss_history, '.', alpha=0.5)
ax.set_xlabel('Iteration')
ax.set_ylabel('Training Loss')
# y = model.loss(....)
# y_im = y.reshape(...)
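# One possible sketch: run a test-time forward pass on the first validation sinogram
# and reshape the flattened reconstruction back into a 64x64 image (assumed size).
y = model.loss(data['X_val'][:1])
y_im = y.reshape(64, 64)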
plt.imshow(y_im)
plt.colorbar()