# Source listing metadata: 122 lines, 4.5 KiB, Python
import os
|
|
import pickle
|
|
import urllib.request as http
|
|
from zipfile import ZipFile
|
|
from tensorflow.keras import Sequential, applications
|
|
|
|
import tensorflow as tf
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
from tensorflow.keras import utils
|
|
|
|
from tensorflow.keras import layers as keras_layers
|
|
from tensorflow.keras import backend as K
|
|
from tensorflow.keras.datasets import cifar10
|
|
from tensorflow.keras.models import save_model, load_model
|
|
|
|
import torch
|
|
from keras.preprocessing.image import img_to_array, array_to_img
|
|
|
|
def load_keras_model(filename):
    """
    Restore a Keras model from disk.

    Thin wrapper that delegates to ``load_model``; intended for models
    saved with models.save_model.

    :param filename: string, path to the file storing the model.
    :return: the model returned by ``load_model``.
    """
    return load_model(filename)
|
|
|
|
def load_rps(download=False, path='rps', reduction_factor=1):
    """
    Loads the rps dataset (optionally downloading it first) and returns the
    training and test sets.

    Example of usage:

    >>> (x_train, y_train), (x_test, y_test) = load_rps()

    :param download: bool, default is False but for the first call should be True.
    :param path: str, subdirectory in which the images should be downloaded, default is 'rps'.
    :param reduction_factor: int, subsampling step: every `reduction_factor`-th
        sample of each set is kept (``[::reduction_factor]``). Must not be 0.
    :return: ``(x_train, y_train), (x_test, y_test)``; each ``x`` is the list of
        images and each ``y`` the list of class names ('rock', 'paper',
        'scissors') produced by ``load_images_with_label``.
    :raises ValueError: if `reduction_factor` is 0, or if the dataset directory
        does not exist and `download` is False.
    """
    url = 'https://drive.switch.ch/index.php/s/xjXhuYDUzoZvL02/download'
    classes = ('rock', 'paper', 'scissors')

    # Fail fast: a zero slice step would otherwise only blow up after the whole
    # dataset has been downloaded and loaded into memory.
    if reduction_factor == 0:
        raise ValueError("`reduction_factor` must be a non-zero integer.")

    rps_dir = os.path.abspath(path)
    archive = os.path.join(rps_dir, 'data.zip')
    if not os.path.exists(rps_dir) and not download:
        raise ValueError("Dataset not in the path. You should call this function with `download=True` the first time.")

    if download:
        os.makedirs(rps_dir, exist_ok=True)
        print(f"Downloading rps images in {rps_dir} (may take a couple of minutes)")
        try:
            http.urlretrieve(url, archive)
            # NOTE(review): the archive comes from a remote server; extractall
            # trusts its member list, so keep the URL pointing at a trusted host.
            with ZipFile(archive, 'r') as zip_ref:
                zip_ref.extractall(rps_dir)
        finally:
            # Remove the (possibly partial) archive even when the download or
            # the extraction fails, so a retry starts from a clean state.
            if os.path.exists(archive):
                os.remove(archive)

    train_dir = os.path.join(rps_dir, 'train')
    test_dir = os.path.join(rps_dir, 'test')

    print("Loading training set...")
    x_train, y_train = load_images_with_label(train_dir, classes)
    x_train, y_train = x_train[::reduction_factor], y_train[::reduction_factor]
    print("Loaded %d images for training" % len(y_train))

    print("Loading test set...")
    x_test, y_test = load_images_with_label(test_dir, classes)
    x_test, y_test = x_test[::reduction_factor], y_test[::reduction_factor]
    print("Loaded %d images for testing" % len(y_test))

    return (x_train, y_train), (x_test, y_test)
|
|
|
|
def load_images(path):
    """
    Loads every ``.jpg`` image located directly inside a directory.

    :param path: str, directory containing the images.
    :return: list of PIL images (in-memory copies), ordered by file name;
        empty if the directory holds no ``.jpg`` file.
    """
    imgs = []
    # os.listdir returns entries in arbitrary order; sorting keeps the result
    # (and any subsampling applied to it afterwards) reproducible across runs.
    for name in sorted(os.listdir(path)):
        if not name.endswith('.jpg'):
            continue
        # load the image (here you might want to resize the img to save memory)
        # The copy is detached from the file, so the handle can be closed
        # right away instead of being left to the garbage collector.
        with Image.open(os.path.join(path, name)) as img:
            imgs.append(img.copy())
    return imgs
|
|
|
|
def load_images_with_label(path, classes):
    """
    Loads the images of every class sub-folder together with their labels.

    :param path: str, directory containing one sub-folder per class.
    :param classes: iterable of str, names of the class sub-folders to read.
    :return: tuple ``(imgs, labels)`` of two parallel lists; ``labels[k]`` is the
        class name of the folder ``imgs[k]`` was loaded from.
    """
    # read each class folder once, in the order given by `classes`
    per_class = [(name, load_images(os.path.join(path, name))) for name in classes]
    all_imgs = [img for _, batch in per_class for img in batch]
    all_labels = [name for name, batch in per_class for _ in batch]
    return all_imgs, all_labels
|
|
|
|
if __name__ == '__main__':
    # Script entry point: evaluate the two saved task-2 models (trained with
    # and without augmentation) on the rps test set and print their accuracy.
    model_aug = load_keras_model("nn_task2/nn_task2_aug.h5")
    model_noaug = load_keras_model("nn_task2/nn_task2_noaug.h5")

    def resize(x):
        """Return the images of `x` resized to 224x224."""
        return [e.resize((224, 224)) for e in x]

    def process(x):
        """
        Resize the images and convert them to mean-centred arrays.

        :param x: list of PIL images.
        :return: list of arrays, one per image, with the channel axis reversed
            and a fixed per-channel mean subtracted.
        """
        # NOTE(review): these values match the per-channel (BGR) means used by
        # Keras' caffe-style `preprocess_input` -- confirm against training.
        mean = [103.939, 116.779, 123.68]
        processed = []
        for img in resize(x):
            # reverse the last axis (named `bgr` in the original code)
            bgr = img_to_array(img)[..., ::-1]
            # in-place subtraction keeps the dtype produced by img_to_array
            bgr -= mean
            processed.append(bgr)
        return processed

    # Download the dataset only when the local "rps" folder is missing.
    _, (x_test, y_test) = load_rps(download=not os.path.exists("rps"))
    x_test_n = tf.convert_to_tensor(process(x_test))

    # Class name -> integer index used for the one-hot targets.
    # NOTE(review): presumably this must match the encoding used at training
    # time -- confirm.
    MAP = {'scissors': 0, 'paper': 1, 'rock': 2}
    print(MAP)
    # Plain lookup instead of np.vectorize: same values, and it does not fail
    # on an empty label list.
    y_test_n = utils.to_categorical([MAP[label] for label in y_test], 3)

    print(np.shape(y_test_n))

    y_pred_task1 = model_noaug.predict(x_test_n)
    y_pred_task2 = model_aug.predict(x_test_n)

    # Evaluate the misclassification error on the test set
    # for example
    # Sanity check: the model output must have the same shape as the targets.
    assert y_test_n.shape == y_pred_task1.shape
    # evaluate() returns [loss, metric, ...]; index 1 is assumed to be the
    # accuracy the model was compiled with -- TODO confirm.
    acc = model_noaug.evaluate(x_test_n, y_test_n)[1]  # evaluate accuracy with proper function
    print("Accuracy model task 2 (no augmentation):", acc)

    assert y_test_n.shape == y_pred_task2.shape
    acc = model_aug.evaluate(x_test_n, y_test_n)[1]  # evaluate accuracy with proper function
    print("Accuracy model task 2 (with augmentation):", acc)
|