ml/as2_Maggioni_Claudio/deliverable/run_task2.py

import os
import urllib.request as http
from zipfile import ZipFile

import numpy as np
import tensorflow as tf
from PIL import Image
from tensorflow.keras import utils
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import img_to_array


def load_keras_model(filename):
    """
    Loads a compiled Keras model saved with models.save_model.

    :param filename: string, path to the file storing the model.
    :return: the model.
    """
    model = load_model(filename)
    return model


def load_rps(download=False, path='rps', reduction_factor=1):
    """
    Downloads the rps dataset and returns the training and test sets.

    Example of usage:

    >>> (x_train, y_train), (x_test, y_test) = load_rps()

    :param download: bool, default is False but for the first call should be True.
    :param path: str, subdirectory in which the images should be downloaded, default is 'rps'.
    :param reduction_factor: int, factor of reduction of the dataset (len = old_len // reduction_factor).
    :return: the images and labels split into training and test sets.
    """
    url = 'https://drive.switch.ch/index.php/s/xjXhuYDUzoZvL02/download'
    classes = ('rock', 'paper', 'scissors')
    rps_dir = os.path.abspath(path)
    filename = os.path.join(rps_dir, 'data.zip')

    if not os.path.exists(rps_dir) and not download:
        raise ValueError("Dataset not in the path. You should call this function with `download=True` the first time.")

    if download:
        os.makedirs(rps_dir, exist_ok=True)
        print(f"Downloading rps images in {rps_dir} (may take a couple of minutes)")
        path, msg = http.urlretrieve(url, filename)
        with ZipFile(path, 'r') as zip_ref:
            zip_ref.extractall(rps_dir)
        os.remove(filename)

    train_dir, test_dir = os.path.join(rps_dir, 'train'), os.path.join(rps_dir, 'test')

    print("Loading training set...")
    x_train, y_train = load_images_with_label(train_dir, classes)
    x_train, y_train = x_train[::reduction_factor], y_train[::reduction_factor]
    print("Loaded %d images for training" % len(y_train))

    print("Loading test set...")
    x_test, y_test = load_images_with_label(test_dir, classes)
    x_test, y_test = x_test[::reduction_factor], y_test[::reduction_factor]
    print("Loaded %d images for testing" % len(y_test))

    return (x_train, y_train), (x_test, y_test)


def load_images(path):
    img_files = os.listdir(path)
    imgs = []
    for i in img_files:
        if i.endswith('.jpg'):
            # load the image (here you might want to resize the img to save memory)
            imgs.append(Image.open(os.path.join(path, i)).copy())
    return imgs


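def load_images_resized(path, size=(224, 224)):
    # Variant of load_images, sketched here as an optional memory saver
    # (an assumption, not used by the rest of this script): resize each
    # image while loading, as the comment in load_images suggests.
    imgs = []
    for i in os.listdir(path):
        if i.endswith('.jpg'):
            with Image.open(os.path.join(path, i)) as img:
                imgs.append(img.resize(size))
    return imgs

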
def load_images_with_label(path, classes):
    imgs, labels = [], []
    for c in classes:
        # iterate over all the files in the class subfolder
        c_imgs = load_images(os.path.join(path, c))
        imgs.extend(c_imgs)
        labels.extend([c] * len(c_imgs))
    return imgs, labels


if __name__ == '__main__':
    model_aug = load_keras_model("nn_task2/nn_task2_aug.h5")
    model_noaug = load_keras_model("nn_task2/nn_task2_noaug.h5")

    # Resize the input images to the 224x224 input size expected by the models
    resize = lambda x: [e.resize((224, 224)) for e in x]

    def process(x):
        # Convert each image to a BGR array and subtract the per-channel means
        x_n = resize(x)
        for i in range(len(x)):
            bgr = img_to_array(x_n[i])[..., ::-1]
            mean = [103.939, 116.779, 123.68]
            bgr -= mean
            x_n[i] = bgr
        return x_n
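    # The manual BGR conversion and mean subtraction above match the ImageNet
    # "caffe"-style preprocessing used by VGG-like backbones. As a minimal
    # alternative sketch (an assumption, not part of the original pipeline),
    # the same transform can be expressed with Keras' own helper:
    def process_alt(x):
        resized = np.stack([img_to_array(e.resize((224, 224))) for e in x])
        # vgg16.preprocess_input converts RGB->BGR and subtracts the same means
        return tf.keras.applications.vgg16.preprocess_input(resized)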
    _, (x_test, y_test) = load_rps(download=not os.path.exists("rps"))
    x_test_n = tf.convert_to_tensor(process(x_test))

    # Map the string labels to integers and one-hot encode them for evaluation
    MAP = {'scissors': 0, 'paper': 1, 'rock': 2}
    print(MAP)
    mapfunc = np.vectorize(lambda x: MAP[x])
    y_test_n = utils.to_categorical(mapfunc(y_test), 3)
    print(np.shape(y_test_n))

    y_pred_task1 = model_noaug.predict(x_test_n)
    y_pred_task2 = model_aug.predict(x_test_n)

    # Evaluate the misclassification error on the test set
    assert y_test_n.shape == y_pred_task1.shape
    acc = model_noaug.evaluate(x_test_n, y_test_n)[1]  # [loss, accuracy]; assumes accuracy is the compiled metric
    print("Accuracy model task 2 (no augmentation):", acc)

    assert y_test_n.shape == y_pred_task2.shape
    acc = model_aug.evaluate(x_test_n, y_test_n)[1]  # [loss, accuracy]; assumes accuracy is the compiled metric
    print("Accuracy model task 2 (with augmentation):", acc)