198 lines
6.9 KiB
Python
Executable file
198 lines
6.9 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import os
|
|
import pandas as pd
|
|
import glob
|
|
import itertools
|
|
from collections import Counter
|
|
import numpy as np
|
|
|
|
from sklearn import preprocessing
|
|
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, make_scorer
|
|
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit
|
|
from sklearn.utils import shuffle
|
|
|
|
from sklearn.neural_network import MLPClassifier
|
|
from sklearn.naive_bayes import GaussianNB
|
|
from sklearn.svm import SVC
|
|
from sklearn.tree import DecisionTreeClassifier
|
|
from sklearn.ensemble import RandomForestClassifier
|
|
from sklearn.utils import resample
|
|
|
|
# Absolute directory that contains this script (symlinks resolved by realpath).
DIR: str = os.path.dirname(os.path.realpath(__file__))
# Directory the labeled feature vectors are read from.
IN_DIR: str = f'{DIR}/metrics'
# Directory the grid-search result tables are written to.
OUT_DIR: str = f'{DIR}/models'
|
|
|
|
|
|
def clean_output():
    """Remove the saved grid-search table ``models.csv`` from ``OUT_DIR``.

    Nothing happens when the file does not exist, because the glob then
    yields no paths.
    """
    for stale_path in glob.glob(OUT_DIR + '/models.csv'):
        os.remove(stale_path)
|
|
|
|
|
|
def make_a_lot_of_architectures(min_n: int, max_n: int, step: int, depth: int):
    """Enumerate hidden-layer layouts built from a range of layer widths.

    Args:
        min_n: Smallest layer width.
        max_n: Largest layer width (inclusive when reachable with ``step``).
        step: Increment between consecutive widths.
        depth: Number of hidden layers.

    Returns:
        For ``depth == 1`` a list of ints (one width per candidate);
        otherwise a list of ``depth``-tuples covering every combination of
        widths (the Cartesian product of the width range with itself).
    """
    widths = list(range(min_n, max_n + 1, step))

    if depth == 1:
        return widths

    # One copy of the width list per layer; product() expands all combinations.
    return list(itertools.product(*itertools.repeat(widths, depth)))
|
|
|
|
|
|
def get_classifiers() -> list[tuple[any, dict[str, list[str]]]]:
    """Build the estimators to tune together with their parameter grids.

    Returns:
        A list of ``(estimator, grid)`` pairs. Each estimator is a freshly
        constructed, unfitted scikit-learn classifier; each grid maps a
        parameter name to the list of values to try. ``GaussianNB`` has an
        empty grid. The ``hidden_layer_sizes`` entry of the MLP grid mixes
        plain ints (one hidden layer) with tuples (two and three layers).
    """
    svc_grid = {
        'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
        'gamma': ['scale', 'auto'],
    }

    tree_grid = {
        'criterion': ['gini', 'entropy'],
        'splitter': ['best', 'random'],
    }

    forest_grid = {
        'criterion': ['gini', 'entropy'],
        'max_features': ['sqrt', 'log2'],
        'class_weight': ['balanced', 'balanced_subsample'],
    }

    # Candidate network shapes: 1, 2 and 3 hidden layers with coarser width
    # steps as the depth grows.
    one_layer = make_a_lot_of_architectures(5, 100, 5, 1)
    two_layers = make_a_lot_of_architectures(15, 100, 15, 2)
    three_layers = make_a_lot_of_architectures(20, 100, 20, 3)

    mlp_grid = {
        'max_iter': [500000],
        'hidden_layer_sizes': one_layer + two_layers + three_layers,
        'activation': ['identity', 'logistic', 'tanh', 'relu'],
        'solver': ['lbfgs', 'sgd', 'adam'],
        'learning_rate': ['constant', 'invscaling', 'adaptive'],
    }

    return [
        (GaussianNB(), {}),
        (SVC(), svc_grid),
        (DecisionTreeClassifier(), tree_grid),
        (RandomForestClassifier(), forest_grid),
        (MLPClassifier(), mlp_grid),
    ]
|
|
|
|
|
|
def balance_classes_with_upscaling(X: np.ndarray, y: np.ndarray, random_state: int) -> tuple[np.ndarray, np.ndarray]:
    """Balance a binary dataset by upsampling the less frequent class.

    The labels are expected to be ``0`` and ``1``. Rows of the less frequent
    class are drawn with replacement until there are as many of them as rows
    of the other class; the original minority rows are replaced by that
    resampled set, while the majority rows are kept unchanged.

    Args:
        X: Feature matrix, indexed by row along the first axis.
        y: Label vector aligned with the rows of ``X``.
        random_state: Seed forwarded to ``sklearn.utils.resample``.

    Returns:
        ``(X_balanced, y_balanced)`` with the resampled minority rows first,
        followed by the majority rows.
    """
    counts = Counter(y)

    # On a tie class 1 is treated as the minority; it is then still resampled
    # with replacement, to its own size.
    minority_class = 0 if counts[0] < counts[1] else 1
    majority_class = 1 - minority_class

    minority_mask = y == minority_class
    majority_mask = y == majority_class
    X_minority, y_minority = X[minority_mask], y[minority_mask]

    # Resample row positions (not the rows themselves) so that features and
    # labels are picked with exactly the same indices.
    picked = resample(np.arange(len(X_minority)), replace=True,
                      n_samples=counts[majority_class], random_state=random_state)

    # Fancy indexing gathers the chosen rows in one vectorized step.
    X_balanced = np.concatenate([X_minority[picked], X[majority_mask]])
    y_balanced = np.concatenate([y_minority[picked], y[majority_mask]])

    return X_balanced, y_balanced
|
|
|
|
|
|
def perform_grid_search(X, y, classifiers, n_splits: int, random_state: int) -> pd.DataFrame:
    """Run a grid search for every classifier and collect all result tables.

    Args:
        X: Feature matrix.
        y: Binary label vector aligned with ``X``.
        classifiers: Iterable of ``(estimator, param_grid)`` pairs.
        n_splits: Number of stratified shuffle splits used for evaluation.
        random_state: Seed used both for class balancing and for the splitter.

    Returns:
        One DataFrame holding the ``cv_results_`` of every search, with an
        extra ``classifier`` column containing the estimator's class name.
    """
    # Balance the classes by upsampling the minority class.
    # NOTE(review): balancing happens before the train/test split, so copies
    # of the same minority row can land on both sides of a split -- confirm
    # this is intended.
    X_balanced, y_balanced = balance_classes_with_upscaling(X, y, random_state)

    # The splitter is seeded with an integer, so every search below is
    # evaluated on the same splits.
    # see: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html
    splitter = StratifiedShuffleSplit(n_splits=n_splits, train_size=0.8, random_state=random_state)

    result_frames: list[pd.DataFrame] = []

    for estimator, param_grid in classifiers:
        estimator_name = type(estimator).__name__

        scoring = {
            'precision': make_scorer(precision_score, average='binary', zero_division=0),
            'accuracy': make_scorer(accuracy_score),
            'recall': make_scorer(recall_score, average='binary'),
            'f1': make_scorer(f1_score, average='binary'),
        }

        # refit='f1': the final model is refitted with the best-F1 parameters.
        search = GridSearchCV(estimator, param_grid, cv=splitter, n_jobs=-1,
                              refit='f1', verbose=4, scoring=scoring)
        search.fit(X_balanced, y_balanced)

        frame = pd.DataFrame(search.cv_results_)
        frame['classifier'] = estimator_name
        result_frames.append(frame)

        print(estimator_name + " done")

    return pd.concat(result_frames, ignore_index=True)
|
|
|
|
|
|
def load_dataset() -> tuple[any, any]:
    """Load the labeled feature vectors and standardize the features.

    Reads ``feature_vectors_labeled.csv`` from ``IN_DIR``, discards the
    ``class_name`` column, and treats the last remaining column as the label
    and all columns before it as features.

    Returns:
        ``(X_scaled, y)``: the features after ``StandardScaler`` fitted on the
        whole feature matrix, and the label vector, both as NumPy arrays.
    """
    frame = pd.read_csv(IN_DIR + '/feature_vectors_labeled.csv').drop('class_name', axis=1)

    features = frame.iloc[:, :-1].to_numpy()
    labels = frame.iloc[:, -1].to_numpy()

    standardized = preprocessing.StandardScaler().fit(features).transform(features)

    return (standardized, labels)
|
|
|
|
|
|
def _format_params(params) -> str:
    """Render a parameter set as ``"`name`: value, ..."`` for the report table.

    ``params`` is a dict when the results come straight from the grid search
    and the string form of a dict when they were read back from CSV. Strings
    are parsed with ``ast.literal_eval`` (never ``eval``); a string that is
    not a plain literal is returned unchanged.
    """
    import ast  # local import: only needed for values read back from CSV

    if isinstance(params, str):
        try:
            params = ast.literal_eval(params)
        except (ValueError, SyntaxError):
            return params
    return ", ".join(f"`{name}`: {value}" for name, value in params.items())


def find_best_and_save(df: pd.DataFrame):
    """Pick the most accurate configuration per classifier, save and print it.

    The aggregate (``mean_``/``std_``/``rank_``) and per-parameter
    (``param_``) columns are dropped, the scores of the first split are
    renamed to plain metric names, and for every classifier the row with the
    highest ``accuracy`` is kept. The selection is written to
    ``OUT_DIR/best.csv`` and then printed as a Markdown table with the
    metrics rounded to four decimals.

    Args:
        df: Concatenated grid-search results with ``params``, ``classifier``
            and ``split0_test_*`` columns. ``params`` may hold dicts or their
            string representation.
    """
    # The group anchors every prefix at the start of the column name.
    dropped_columns = df.filter(regex=r'^(mean_|std_|rank_|param_)').columns
    df_classifiers = df.drop(columns=dropped_columns).rename(columns={
        "split0_test_precision": "precision",
        "split0_test_accuracy": "accuracy",
        "split0_test_recall": "recall",
        "split0_test_f1": "f1",
    })

    # After sorting, the first row of each group is its most accurate one.
    df_best = (
        df_classifiers
        .sort_values(['classifier', 'accuracy'], ascending=[True, False])
        .groupby('classifier')
        .head(1)
        .reset_index(drop=True)
    )

    os.makedirs(OUT_DIR, exist_ok=True)
    df_best.to_csv(OUT_DIR + '/best.csv', index=False)

    # Prepare for formatting
    df_best['params'] = df_best['params'].apply(_format_params)

    metrics = ['precision', 'accuracy', 'recall', 'f1']
    df_best[metrics] = df_best[metrics].round(decimals=4)
    df_best = df_best.reindex(
        ['classifier', 'params'] + [col for col in df_best.columns if col in metrics],
        axis=1)

    print(df_best.to_markdown(index=False))
|
|
|
|
|
|
def main():
    """Run (or reload) the grid search, print per-classifier tables, save the best.

    The grid search is executed when no ``models.csv`` exists in ``OUT_DIR``
    yet, or when the user answers ``y`` to the prompt; otherwise the cached
    results are loaded from ``models.csv``. For every classifier a Markdown
    table of up to 100 parameter combinations and their first-split scores is
    printed, then ``find_best_and_save`` is called on the full result table.
    """
    models_path = OUT_DIR + '/models.csv'

    # Without cached results there is nothing to reload, so the search must
    # run; the user is only asked when a cached file is available.
    if not os.path.exists(models_path) \
            or input("Run again grid search? (y/[n]): ").strip().lower() == "y":
        clean_output()
        os.makedirs(OUT_DIR, exist_ok=True)

        X, y = load_dataset()
        df = perform_grid_search(X, y, get_classifiers(), 1, 0xDEADB017)

        df.to_csv(models_path, index=False)
    else:
        df = pd.read_csv(models_path)

    # unique() keeps the order of first appearance, so the output order is
    # the same on every run.
    for clazz in df['classifier'].unique():
        dfc = df.loc[df.classifier == clazz, :].copy()

        # The group anchors every alternative at the start of the column name.
        dropped_columns = dfc.filter(regex=r'^(mean_|std_|rank_|params$|classifier$)').columns
        dfc = dfc.drop(columns=dropped_columns).rename(columns={
            "split0_test_precision": "precision",
            "split0_test_accuracy": "accuracy",
            "split0_test_recall": "recall",
            "split0_test_f1": "f1",
        })

        # Show the parameter columns first, then the scores.
        param_columns = [col for col in dfc.columns if col.startswith('param_')]
        other_columns = [col for col in dfc.columns if not col.startswith('param_')]
        dfc = dfc.reindex(param_columns + other_columns, axis=1)
        dfc = dfc.rename(columns={col: col.removeprefix('param_') for col in param_columns})

        # Parameters belonging to other classifiers are entirely NaN here.
        dfc = dfc.loc[:, dfc.notna().any(axis=0)]

        print(clazz)
        print(dfc.head(100).to_markdown(index=False))
        print()

    find_best_and_save(df)
|
|
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|