This repository has been archived on 2023-06-18. You can view files and clone it, but cannot push or open issues or pull requests.
ima02/train_classifiers.py

199 lines
6.9 KiB
Python
Executable File

#!/usr/bin/env python3
import os
import pandas as pd
import glob
import itertools
from collections import Counter
import numpy as np
from sklearn import preprocessing
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, make_scorer
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit
from sklearn.utils import shuffle
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.utils import resample
# Absolute directory that contains this script (symlinks resolved by realpath).
DIR: str = os.path.dirname(os.path.realpath(__file__))
# Directory holding the labeled feature vectors read by this script.
IN_DIR: str = f"{DIR}/metrics"
# Directory where grid-search results (models.csv, best.csv) are written.
OUT_DIR: str = f"{DIR}/models"
def clean_output():
    """Delete the ``models.csv`` file in ``OUT_DIR`` if one is present.

    The path is resolved through ``glob``, so a missing file simply yields
    no matches and nothing is removed.
    """
    for stale_path in glob.glob(OUT_DIR + '/models.csv'):
        os.remove(stale_path)
def make_a_lot_of_architectures(min_n: int, max_n: int, step: int, depth: int):
    """Enumerate candidate hidden-layer layouts.

    Layer widths are taken from ``range(min_n, max_n + 1, step)``.

    Args:
        min_n: smallest layer width.
        max_n: largest layer width (inclusive when reachable by ``step``).
        step: distance between consecutive widths.
        depth: number of hidden layers.

    Returns:
        For ``depth == 1`` a list of plain ints (one width per entry);
        otherwise a list of ``depth``-tuples covering every combination of
        widths (the Cartesian product).
    """
    layer_widths = list(range(min_n, max_n + 1, step))
    if depth == 1:
        return layer_widths
    # One copy of the width list per layer; product() combines them all.
    per_layer_choices = [layer_widths for _ in range(depth)]
    return list(itertools.product(*per_layer_choices))
def get_classifiers() -> list[tuple[any, dict[str, list[str]]]]:
    """Return the estimators to tune, each paired with its parameter grid.

    Every entry is an ``(estimator, grid)`` tuple where ``grid`` maps a
    hyper-parameter name to the list of values to try. ``GaussianNB`` has an
    empty grid, i.e. it is evaluated with its default settings only.
    """
    svc_grid = {
        'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
        'gamma': ['scale', 'auto'],
    }
    tree_grid = {
        'criterion': ['gini', 'entropy'],
        'splitter': ['best', 'random'],
    }
    forest_grid = {
        'criterion': ['gini', 'entropy'],
        'max_features': ['sqrt', 'log2'],
        'class_weight': ['balanced', 'balanced_subsample'],
    }

    # Candidate MLP layouts: one, two and three hidden layers.
    one_layer = make_a_lot_of_architectures(5, 100, 5, 1)
    two_layers = make_a_lot_of_architectures(15, 100, 15, 2)
    three_layers = make_a_lot_of_architectures(20, 100, 20, 3)
    mlp_grid = {
        'max_iter': [500000],
        'hidden_layer_sizes': one_layer + two_layers + three_layers,
        'activation': ['identity', 'logistic', 'tanh', 'relu'],
        'solver': ['lbfgs', 'sgd', 'adam'],
        'learning_rate': ['constant', 'invscaling', 'adaptive'],
    }

    return [
        (GaussianNB(), {}),
        (SVC(), svc_grid),
        (DecisionTreeClassifier(), tree_grid),
        (RandomForestClassifier(), forest_grid),
        (MLPClassifier(), mlp_grid),
    ]
def balance_classes_with_upscaling(X, y, random_state: int) -> tuple[np.ndarray, np.ndarray]:
    """Balance a binary dataset by oversampling the less frequent class.

    Rows of the minority class are drawn with replacement until there are as
    many of them as there are majority rows; majority rows are kept as-is.

    Args:
        X: 2-D NumPy array with one sample per row.
        y: 1-D NumPy array of labels aligned with ``X``; the labels are
            expected to be ``0`` and ``1``.
        random_state: seed forwarded to ``sklearn.utils.resample``.

    Returns:
        ``(X_balanced, y_balanced)`` with the resampled minority rows first,
        followed by all majority rows.
    """
    counts = Counter(y)
    # On a tie class 1 is treated as the minority and is still resampled
    # (with replacement) to the size of class 0.
    minority_class = 0 if counts[0] < counts[1] else 1
    majority_class = 1 - minority_class

    minority_mask = y == minority_class
    majority_mask = y == majority_class
    X_minority, y_minority = X[minority_mask, :], y[minority_mask]
    X_majority, y_majority = X[majority_mask, :], y[majority_mask]

    # Draw row positions (not rows) so X and y are picked consistently.
    picked = resample(
        list(range(len(X_minority))),
        replace=True,
        n_samples=counts[majority_class],
        random_state=random_state,
    )

    # Integer-array indexing gathers all picked rows in one vectorized step
    # instead of rebuilding the arrays row by row in Python.
    X_balanced = np.concatenate([X_minority[picked], X_majority])
    y_balanced = np.concatenate([y_minority[picked], y_majority])
    return X_balanced, y_balanced
def perform_grid_search(X, y, classifiers, n_splits: int, random_state: int) -> pd.DataFrame:
    """Run an exhaustive grid search for every classifier and collect the results.

    Args:
        X: feature matrix.
        y: labels aligned with ``X``.
        classifiers: iterable of ``(estimator, param_grid)`` pairs.
        n_splits: number of stratified shuffle splits used for validation.
        random_state: seed used both for class balancing and for the splitter.

    Returns:
        One DataFrame holding the ``cv_results_`` of all searches, with an
        extra ``classifier`` column containing the estimator's class name.
    """
    # Balance the classes by upsampling the minority class.
    # NOTE(review): upsampling happens before the train/test split, so copies
    # of the same minority row may land on both sides of a split -- confirm
    # this is intended.
    X_balanced, y_balanced = balance_classes_with_upscaling(X, y, random_state)

    # A single seeded splitter is shared by all searches so every classifier
    # is scored on the same partitions.
    # see: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html
    splitter = StratifiedShuffleSplit(n_splits=n_splits, train_size=0.8, random_state=random_state)
    scoring = {
        'precision': make_scorer(precision_score, average='binary', zero_division=0),
        'accuracy': make_scorer(accuracy_score),
        'recall': make_scorer(recall_score, average='binary'),
        'f1': make_scorer(f1_score, average='binary')
    }

    results: list[pd.DataFrame] = []
    for estimator, param_grid in classifiers:
        estimator_name = type(estimator).__name__
        search = GridSearchCV(estimator, param_grid, cv=splitter, n_jobs=-1,
                              refit='f1', verbose=4, scoring=scoring)
        search.fit(X_balanced, y_balanced)

        frame = pd.DataFrame(search.cv_results_)
        frame['classifier'] = estimator_name
        results.append(frame)
        print(estimator_name + " done")

    return pd.concat(results, ignore_index=True)
def load_dataset() -> tuple[any, any]:
    """Load the labeled feature vectors and standardize the features.

    Reads ``feature_vectors_labeled.csv`` from ``IN_DIR``, discards the
    ``class_name`` column, treats the last remaining column as the label and
    all others as features.

    Returns:
        ``(X_scaled, y)`` where ``X_scaled`` is the feature matrix transformed
        by a ``StandardScaler`` fitted on the whole dataset and ``y`` is the
        label array.
    """
    table = pd.read_csv(IN_DIR + '/feature_vectors_labeled.csv').drop('class_name', axis=1)
    features = table.iloc[:, :-1].to_numpy()
    labels = table.iloc[:, -1].to_numpy()
    # fit_transform == fit on the features, then transform the same features.
    return (preprocessing.StandardScaler().fit_transform(features), labels)
def _format_params(params) -> str:
    """Render a parameter mapping as comma-separated `` `name`: value `` pairs.

    ``params`` is either a dict (results straight from the grid search) or the
    textual form of that dict (results re-read from ``models.csv``).
    """
    import ast

    if isinstance(params, str):
        # Parse the literal without executing arbitrary code; the grids in
        # this file only contain strings, ints and tuples of ints.
        params = ast.literal_eval(params)
    return ", ".join(f"`{name}`: {value}" for name, value in params.items())


def find_best_and_save(df: pd.DataFrame):
    """Pick the most accurate configuration per classifier, save and print it.

    The best row of every classifier (highest ``split0_test_accuracy``) is
    written to ``best.csv`` in ``OUT_DIR`` and then printed as a Markdown
    table with the metrics rounded to four decimals.

    Args:
        df: concatenated grid-search results, either as produced by the
            search itself or as re-read from ``models.csv``.
    """
    # Drop aggregate and per-parameter columns. The alternation is grouped so
    # that ``^`` anchors every prefix, not just the first one.
    dropped = list(df.filter(regex=r'^(mean_|std_|rank_|param_)'))
    df_classifiers = df[df.columns.drop(dropped)].rename(columns={
        "split0_test_precision": "precision",
        "split0_test_accuracy": "accuracy",
        "split0_test_recall": "recall",
        "split0_test_f1": "f1"
    })

    # After sorting, the first row of each group is its most accurate one.
    df_best = (
        df_classifiers
        .sort_values(['classifier', 'accuracy'], ascending=[True, False])
        .groupby('classifier')
        .head(1)
        .reset_index(drop=True)
    )
    df_best.to_csv(OUT_DIR + '/best.csv', index=False)

    # Prepare for formatting
    df_best['params'] = df_best['params'].apply(_format_params)
    metrics = ['precision', 'accuracy', 'recall', 'f1']
    df_best.loc[:, metrics] = df_best.loc[:, metrics].round(decimals=4)
    metric_columns = [column for column in df_best.columns if column in metrics]
    df_best = df_best.reindex(['classifier', 'params'] + metric_columns, axis=1)
    print(df_best.to_markdown(index=False))
def main():
    """Run (or reload) the grid search, then print per-classifier result tables.

    The grid search is executed when no ``models.csv`` exists yet in
    ``OUT_DIR`` or when the user answers ``y`` at the prompt; otherwise the
    stored results are loaded. Afterwards one Markdown table per classifier
    is printed and the best configurations are saved and printed.
    """
    models_path = OUT_DIR + '/models.csv'

    # Without a stored result there is nothing to reload, so the search must
    # run; the prompt is only shown when a previous result exists.
    run_search = (
        not os.path.exists(models_path)
        or input("Run again grid search? (y/[n]): ").lower() == "y"
    )
    if run_search:
        clean_output()
        os.makedirs(OUT_DIR, exist_ok=True)
        X, y = load_dataset()
        df = perform_grid_search(X, y, get_classifiers(), 1, 0xDEADB017)
        df.to_csv(models_path, index=False)
    else:
        df = pd.read_csv(models_path)

    # Sorted so that the tables are printed in the same order on every run.
    for clazz in sorted(set(df['classifier'])):
        dfc = df.loc[df.classifier == clazz, :].copy()
        # Drop aggregate columns plus the `params` and `classifier` columns.
        dropped = list(dfc.filter(regex=r'^(mean_|std_|rank_)|^(params|classifier)$'))
        dfc = dfc[dfc.columns.drop(dropped)]
        dfc = dfc.rename(columns={
            "split0_test_precision": "precision",
            "split0_test_accuracy": "accuracy",
            "split0_test_recall": "recall",
            "split0_test_f1": "f1"
        })
        # Hyper-parameter columns first, then everything else.
        param_columns = [c for c in dfc.columns if c.startswith('param_')]
        other_columns = [c for c in dfc.columns if not c.startswith('param_')]
        dfc = dfc.reindex(param_columns + other_columns, axis=1)
        dfc = dfc.rename(columns={c: c.replace('param_', '') for c in dfc.columns})
        # Hide parameters that do not apply to this classifier (all-NaN columns).
        dfc = dfc.loc[:, dfc.notna().any(axis=0)]
        print(clazz)
        print(dfc.head(100).to_markdown(index=False))
        print()

    find_best_and_save(df)
if __name__ == '__main__':
main()