Skip to content
Snippets Groups Projects

Update Aufgabe 2: Spectrometer Alignment/Laurids Radtke/spectrometer_alignment.py

Compare and
1 file
+ 97
0
Compare changes
  • Side-by-side
  • Inline
import h5py
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset, DataLoader
from timeit import default_timer as timer
class HDF5Dataset(Dataset):
    """In-memory dataset of (X, Y) pairs read from an HDF5 file.

    For every index ``i`` in ``range(sims)`` the arrays stored under
    ``/<i>/X`` and ``/<i>/Y`` are read. Samples whose ``X`` contains no
    positive value (checked after the optional normalisation) are treated as
    blank images and dropped. The remaining samples are stacked into the two
    tensors ``self.x`` and ``self.y``, both created on ``device``.

    Args:
        path: Path of the HDF5 file to read.
        device: Torch device on which the tensors are created.
        normalize: If true, rescale every ``X`` with ``MinMaxScaler``.
        sims: Number of leading entries (``0`` .. ``sims - 1``) to read. The
            default of 50000 keeps the previous hard-coded value, which the
            original author described as "only half the data".
    """

    def __init__(self, path, device, normalize, sims=50000):
        inputs = []
        targets = []
        with h5py.File(path, 'r') as h5:
            for key in range(sims):
                X = np.array(h5['/' + str(key) + '/X'])
                Y = np.array(h5['/' + str(key) + '/Y'])
                if normalize:
                    # NOTE(review): MinMaxScaler rescales each column of X to
                    # [0, 1] independently, not the image as a whole -
                    # confirm that this is the intended normalisation.
                    X = MinMaxScaler().fit_transform(X)
                if np.amax(X) > 0:  # ignore the blank images (no data)
                    inputs.append(X)
                    targets.append(Y)

        # Go through one contiguous ndarray first: building a tensor directly
        # from a Python list of ndarrays is a very slow code path in PyTorch.
        self.x = torch.tensor(
            np.asarray(inputs, dtype=np.float32),
            dtype=torch.float32,
            device=device,
        )  # inputs
        self.y = torch.tensor(
            np.asarray(targets, dtype=np.float32),
            dtype=torch.float32,
            device=device,
        )  # targets
        self.len = len(self.x)

    def __len__(self):
        """Return the number of samples kept in the dataset."""
        return self.len

    def __getitem__(self, index):
        """Return the ``(x, y)`` pair stored at ``index``."""
        return self.x[index], self.y[index]

    def get_splits(self, test_data_size):
        """Split the data into a leading train part and a trailing test part.

        Args:
            test_data_size: Number of samples taken from the end of the
                tensors as test data. If it exceeds the dataset size, all
                samples become test data.

        Returns:
            ``(train_data, test_data)``, each a list ``[x, y]`` of tensors.

        Raises:
            ValueError: If ``test_data_size`` is negative.
        """
        if test_data_size < 0:
            raise ValueError("test_data_size must be non-negative")
        # Use an explicit split index rather than negative slicing: with
        # test_data_size == 0, ``x[:-0]`` is empty and ``x[-0:]`` is
        # everything, which would swap the roles of the two splits.
        split = max(len(self.x) - test_data_size, 0)
        train_data = [self.x[:split], self.y[:split]]
        test_data = [self.x[split:], self.y[split:]]
        return train_data, test_data
def prepare_data(path, device, batch_size, test_data_size):
    """Load the HDF5 data and wrap it in a train and a test data loader.

    Prints "check" and the elapsed time of the loading step, then the
    elapsed time of the loader construction.

    Args:
        path: Path of the HDF5 file handed to ``HDF5Dataset``.
        device: Torch device on which the data tensors are created.
        batch_size: Batch size used by both loaders.
        test_data_size: Number of trailing samples reserved for testing.

    Returns:
        ``(train_dl, test_dl, test_data)``: the shuffled training loader, the
        unshuffled test loader (both yield ``(x, y)`` batches) and the raw
        ``[test_x, test_y]`` list returned by ``get_splits``.
    """
    start = timer()
    dataset = HDF5Dataset(path, device, normalize=True)
    train_data, test_data = dataset.get_splits(test_data_size)
    print("check")
    end = timer()
    print(end - start)

    start = timer()
    # get_splits returns plain [x, y] lists. Given such a list, DataLoader
    # sees a dataset of exactly two items (all inputs, all targets) instead
    # of one item per sample, so pair the tensors sample-wise first.
    train_set = torch.utils.data.TensorDataset(*train_data)
    test_set = torch.utils.data.TensorDataset(*test_data)
    train_dl = DataLoader(train_set, batch_size=batch_size, shuffle=True, pin_memory=False)
    test_dl = DataLoader(test_set, batch_size=batch_size, shuffle=False, pin_memory=False)
    end = timer()
    print(end - start)
    return train_dl, test_dl, test_data
# TODO: adapt to our situation
class Net(nn.Module):
    """Small CNN mapping a single-channel image to three output values.

    Two stages of (5x5 convolution -> ReLU -> 5x5 max-pooling) are followed
    by three fully connected layers (25 -> 120 -> 84 -> 3). ``fc1`` takes
    25 features, so the feature map after the second pooling must be 5x5;
    150x150 input images, for example, satisfy this.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 1, 5)  # 1 input channel, 1 output channel, 5x5 kernel
        self.pool = nn.MaxPool2d(5)  # 5x5 pooling window, shared by both stages
        self.conv2 = nn.Conv2d(1, 1, 5)
        self.fc1 = nn.Linear(5 * 5, 120)  # flattened 5x5 feature map
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 3)

    def forward(self, x):
        """Run the network on a batch of images.

        Args:
            x: Tensor of shape ``(batch, 1, H, W)`` or, without a channel
                axis, ``(batch, H, W)``.

        Returns:
            Tensor of shape ``(batch, 3)``.
        """
        # Batches of plain 2-D images arrive as (batch, H, W). Conv2d would
        # read that as one unbatched image with `batch` channels and reject
        # it, so insert the missing channel axis. A (1, H, W) input gives the
        # same (1, 3) result as before.
        if x.dim() == 3:
            x = x.unsqueeze(1)
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
# Run configuration.
path = "training_data_alignment.h5"
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
batch_size = 10
test_data_size = 1000

# The data tensors are created on `device`, so the model has to live there as
# well; otherwise feeding a batch to the network fails with a device mismatch
# on CUDA machines. On CPU-only machines `.to(device)` is a no-op.
net = Net().to(device)
train_dl, test_dl, test_data = prepare_data(path, device, batch_size, test_data_size)