#!/usr/bin/env python3
import ast
import glob
import itertools
import os
from collections import Counter
from typing import Any

import numpy as np
import pandas as pd
from sklearn import preprocessing
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, make_scorer
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit
from sklearn.naive_bayes import GaussianNB
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.utils import resample

DIR: str = os.path.dirname(os.path.realpath(__file__))
IN_DIR: str = DIR + '/metrics'
OUT_DIR: str = DIR + '/models'


def clean_output():
    filelist = glob.glob(OUT_DIR + '/models.csv')
    for f in filelist:
        os.remove(f)


def make_a_lot_of_architectures(min_n: int, max_n: int, step: int, depth: int):
    """Generate hidden-layer size candidates: ints for depth 1, tuples otherwise."""
    if depth == 1:
        return list(range(min_n, max_n + 1, step))
    return list(itertools.product(*([list(range(min_n, max_n + 1, step))] * depth)))


def get_classifiers() -> list[tuple[Any, dict[str, list]]]:
    return [
        (GaussianNB(), {}),
        (SVC(), {
            'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
            'gamma': ['scale', 'auto']
        }),
        (DecisionTreeClassifier(), {
            'criterion': ['gini', 'entropy'],
            'splitter': ['best', 'random']
        }),
        (RandomForestClassifier(), {
            'criterion': ['gini', 'entropy'],
            'max_features': ['sqrt', 'log2'],
            'class_weight': ['balanced', 'balanced_subsample']
        }),
        (MLPClassifier(), {
            'max_iter': [500000],
            'hidden_layer_sizes': make_a_lot_of_architectures(5, 100, 5, 1)
                + make_a_lot_of_architectures(15, 100, 15, 2)
                + make_a_lot_of_architectures(20, 100, 20, 3),
            'activation': ['identity', 'logistic', 'tanh', 'relu'],
            'solver': ['lbfgs', 'sgd', 'adam'],
            'learning_rate': ['constant', 'invscaling', 'adaptive']
        })
    ]


def balance_classes_with_upscaling(X, y, random_state: int) -> tuple[Any, Any]:
    # Identify the minority class (labels are assumed to be 0 and 1)
    counts = Counter(y)
    minority_class = 0 if counts[0] < counts[1] else 1
    majority_class = 1 - minority_class
    X_minority = X[y == minority_class, :]
    Y_minority = y[y == minority_class]
    X_majority = X[y == majority_class, :]
    Y_majority = y[y == majority_class]
    # Resample the minority class with replacement until it matches the majority
    minority_idxs = resample(list(range(len(X_minority))), replace=True,
                             n_samples=counts[majority_class], random_state=random_state)
    X_minority_resampled = np.array([X_minority[i, :] for i in minority_idxs])
    Y_minority_resampled = np.array([Y_minority[i] for i in minority_idxs])
    X = np.concatenate([X_minority_resampled, X_majority])
    y = np.concatenate([Y_minority_resampled, Y_majority])
    return X, y


def perform_grid_search(X, y, classifiers, n_splits: int, random_state: int) -> pd.DataFrame:
    # Balance classes by upsampling the minority class before the search
    X_upscaled, y_upscaled = balance_classes_with_upscaling(X, y, random_state)
    dfs: list[pd.DataFrame] = []
    # With a fixed random_state the splitter yields identical splits for every
    # classifier, so scores are comparable across classifiers.
    # See: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html
    sss = StratifiedShuffleSplit(n_splits=n_splits, train_size=0.8, random_state=random_state)
    for classifier, grid in classifiers:
        clf = GridSearchCV(classifier, grid, cv=sss, n_jobs=-1, refit='f1', verbose=4, scoring={
            'precision': make_scorer(precision_score, average='binary', zero_division=0),
            'accuracy': make_scorer(accuracy_score),
            'recall': make_scorer(recall_score, average='binary'),
            'f1': make_scorer(f1_score, average='binary')
        })
        clf.fit(X_upscaled, y_upscaled)
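        # cv_results_ holds one row per hyper-parameter combination; tagging
        # each row with the classifier's class name lets the reporting steps
        # in main() and find_best_and_save() group results by classifier.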
        df_classifier = pd.DataFrame(clf.cv_results_)
        df_classifier['classifier'] = type(classifier).__name__
        dfs.append(df_classifier)
        print(type(classifier).__name__ + " done")
    return pd.concat(dfs, ignore_index=True)


def load_dataset() -> tuple[Any, Any]:
    df = pd.read_csv(IN_DIR + '/feature_vectors_labeled.csv')
    df = df.drop('class_name', axis=1)
    X = df.iloc[:, :-1].to_numpy()
    y = df.iloc[:, -1].to_numpy()
    # Standardize features to zero mean and unit variance
    scaler = preprocessing.StandardScaler().fit(X)
    X_scaled = scaler.transform(X)
    return X_scaled, y


def find_best_and_save(df: pd.DataFrame):
    df_classifiers = df[df.columns.drop(list(df.filter(regex='^(mean_)|(std_)|(rank_)|(param_)')))]
    df_classifiers = df_classifiers.rename(columns={
        "split0_test_precision": "precision",
        "split0_test_accuracy": "accuracy",
        "split0_test_recall": "recall",
        "split0_test_f1": "f1"
    })
    # Keep the single best configuration per classifier, ranked by accuracy
    df_best = df_classifiers \
        .sort_values(['classifier', 'accuracy'], ascending=[True, False]) \
        .groupby('classifier') \
        .head(1) \
        .reset_index(drop=True)
    df_best.to_csv(OUT_DIR + '/best.csv', index=False)
    # Prepare for formatting; 'params' holds dicts when the search just ran and
    # their string repr when reloaded from CSV, so parse only in the latter case
    df_best['params'] = df_best['params'] \
        .apply(lambda p: p if isinstance(p, dict) else ast.literal_eval(p)) \
        .apply(lambda p: ", ".join(f"`{k}`: {v}" for k, v in p.items()))
    metrics = ['precision', 'accuracy', 'recall', 'f1']
    df_best.loc[:, metrics] = df_best.loc[:, metrics].round(decimals=4)
    df_best = df_best.reindex(
        ['classifier', 'params'] +
        [x for x in df_best.columns if x in metrics],
        axis=1)
    print(df_best.to_markdown(index=False))


def main():
    # Rerun the grid search when no results exist yet, or when the user asks to
    if not os.path.exists(OUT_DIR + '/models.csv') \
            or input("Run grid search again? (y/[n]): ").lower() == "y":
        clean_output()
        X, y = load_dataset()
        df = perform_grid_search(X, y, get_classifiers(), 1, 0xDEADB017)
        df.to_csv(OUT_DIR + '/models.csv', index=False)
        # df.to_csv(OUT_DIR + '/models_eu.csv', index=False, sep=';')
    else:
        df = pd.read_csv(OUT_DIR + '/models.csv')
    # Print a per-classifier Markdown table of all evaluated configurations
    for clazz in set(df['classifier']):
        dfc = df.loc[df.classifier == clazz, :].copy()
        dfc = dfc[dfc.columns.drop(list(dfc.filter(regex='^(mean_)|(std_)|(rank_)|(params$)|(classifier$)')))]
        dfc = dfc.rename(columns={
            "split0_test_precision": "precision",
            "split0_test_accuracy": "accuracy",
            "split0_test_recall": "recall",
            "split0_test_f1": "f1"
        })
        # Move the param_* columns to the front, then drop their prefix
        dfc = dfc.reindex(
            [x for x in dfc.columns if x.startswith('param_')] +
            [x for x in dfc.columns if not x.startswith('param_')],
            axis=1)
        dfc = dfc.rename(columns={c: c.replace('param_', '') for c in dfc.columns})
        # Drop all-NaN columns (parameters this classifier's grid does not use)
        dfc = dfc.loc[:, dfc.notna().any(axis=0)]
        print(clazz)
        print(dfc.head(100).to_markdown(index=False))
        print()
    find_best_and_save(df)


if __name__ == '__main__':
    main()
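# Usage note: the script expects metrics/feature_vectors_labeled.csv next to it
# (last column = binary label; the 'class_name' column is dropped), writes
# models/models.csv and models/best.csv, and prints Markdown result tables.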