2023-10-11 11:59:07 +00:00
|
|
|
import argparse
|
2023-10-23 13:42:25 +00:00
|
|
|
import logging
|
2023-10-11 11:59:07 +00:00
|
|
|
import os
|
2023-10-23 13:42:25 +00:00
|
|
|
import re
|
2023-10-25 13:10:47 +00:00
|
|
|
import typing
|
2023-11-07 10:48:00 +00:00
|
|
|
from collections import defaultdict
|
2023-10-25 13:10:47 +00:00
|
|
|
from dataclasses import dataclass
|
|
|
|
from typing import Optional
|
2023-10-23 13:42:25 +00:00
|
|
|
|
|
|
|
import coloredlogs
|
2023-10-11 12:35:41 +00:00
|
|
|
import nltk
|
|
|
|
import numpy as np
|
2023-10-23 13:42:25 +00:00
|
|
|
import pandas as pd
|
2023-10-11 15:49:38 +00:00
|
|
|
from gensim.corpora import Dictionary
|
2023-10-23 13:42:25 +00:00
|
|
|
from gensim.models import TfidfModel, LsiModel
|
|
|
|
from gensim.models.doc2vec import TaggedDocument, Doc2Vec
|
|
|
|
from gensim.similarities import SparseMatrixSimilarity
|
|
|
|
from nltk.corpus import stopwords
|
2023-10-11 12:35:41 +00:00
|
|
|
|
2023-11-07 10:48:00 +00:00
|
|
|
# Fetch the NLTK 'stopwords' corpus at import time so that the
# stopwords.words('english') call used to build STOP_WORDS can load it.
# quiet=True is passed to keep the downloader's console output suppressed.
nltk.download('stopwords', quiet=True)
|
2023-10-11 11:59:07 +00:00
|
|
|
|
|
|
|
# Absolute path of the directory that contains this script; the data and model
# files below are resolved relative to it rather than to the working directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))

# CSV dataset that load_data() reads.
IN_DATASET = os.path.join(SCRIPT_DIR, "data.csv")

# File in which the trained Doc2Vec model is saved and from which it is reloaded.
DOC2VEC_MODEL = os.path.join(SCRIPT_DIR, "doc2vec_model.dat")
|
2023-10-11 11:59:07 +00:00
|
|
|
|
2023-10-25 13:10:47 +00:00
|
|
|
# Words ignored when building bags of words: NLTK's English stop words plus a
# provisional, hand-picked list of extra words (to be revisited).
STOP_WORDS = {
    *stopwords.words('english'),
    'test', 'tests', 'main', 'this', 'self', 'def', 'object', 'false', 'class', 'tuple', 'use', 'default',
    'none', 'dtype', 'true', 'function', 'returns', 'int', 'get', 'set', 'new', 'return', 'list', 'python',
    'numpy', 'type', 'name',
}
|
2023-10-11 12:35:41 +00:00
|
|
|
|
|
|
|
|
2023-11-07 10:48:00 +00:00
|
|
|
def find_all(regex: str, word: str, lower=True) -> list[str]:
    """Return the text of every non-overlapping match of ``regex`` in ``word``.

    :param regex: regular expression pattern to search for
    :param word: string to scan
    :param lower: when true (the default) each match is lower-cased
    :return: the full matched substrings (group 0), in order of occurrence
    """
    hits = [match.group(0) for match in re.finditer(regex, word)]
    if lower:
        return [hit.lower() for hit in hits]
    return hits
|
2023-10-11 12:35:41 +00:00
|
|
|
|
|
|
|
|
|
|
|
# Splitting regex taken from https://stackoverflow.com/a/29920015
def camel_case_split(word: str) -> list[str]:
    """Split a camelCase / PascalCase word into its lower-cased components.

    :param word: the word to split
    :return: the components in order, lower-cased (find_all's default)
    """
    boundary_pattern = (
        r'.+?(?:'
        r'(?<=[a-z])(?=[A-Z])'         # lower-case letter followed by an upper-case one
        r'|(?<=[A-Z])(?=[A-Z][a-z])'   # last capital of an upper-case run before a new word
        r'|$)'                         # or simply the end of the string
    )
    return find_all(boundary_pattern, word)
|
|
|
|
|
|
|
|
|
2023-11-07 10:48:00 +00:00
|
|
|
def identifier_split(identifier: str) -> list[str]:
    """Split an identifier into words on underscores and camel-case boundaries.

    :param identifier: a snake_case, camelCase or mixed identifier
    :return: the words, in order, as produced by camel_case_split
    """
    words: list[str] = []
    for chunk in identifier.split("_"):
        # Empty chunks (leading/trailing/double underscores) contribute nothing.
        words.extend(camel_case_split(chunk))
    return words
|
|
|
|
|
|
|
|
|
2023-11-07 10:48:00 +00:00
|
|
|
def comment_split(comment: str) -> list[str]:
    """Extract the alphabetic words of a comment, splitting camel-case words too.

    :param comment: free text, e.g. a docstring
    :return: the words found, in order, as produced by camel_case_split
    """
    words: list[str] = []
    # Case is preserved here (lower=False) so that camel_case_split can still
    # see the word boundaries of type names referenced in the comment.
    for token in find_all('[A-Za-z]+', comment, lower=False):
        words.extend(camel_case_split(token))
    return words
|
2023-10-11 12:35:41 +00:00
|
|
|
|
|
|
|
|
2023-11-07 10:48:00 +00:00
|
|
|
def remove_stopwords(input_bow_list: list[str]) -> list[str]:
    """Drop stop words and very short words from a list of words.

    :param input_bow_list: the words to filter
    :return: the words that are not in STOP_WORDS and are longer than two
        characters, in their original order (duplicates are kept)
    """
    kept: list[str] = []
    for word in input_bow_list:
        if word in STOP_WORDS:
            continue
        if len(word) <= 2:
            continue
        kept.append(word)
    return kept
|
2023-10-11 12:35:41 +00:00
|
|
|
|
|
|
|
|
2023-11-07 10:48:00 +00:00
|
|
|
def get_bow(data: Optional[float | str], split_f) -> list[str]:
|
2023-10-11 12:35:41 +00:00
|
|
|
if data is None or (type(data) == float and np.isnan(data)):
|
|
|
|
return []
|
|
|
|
return remove_stopwords(split_f(data))
|
|
|
|
|
2023-10-11 11:59:07 +00:00
|
|
|
|
2023-10-25 13:10:47 +00:00
|
|
|
def pick_most_similar(corpus, query, dictionary, pick_top: int = 5) -> list[tuple[int, float]]:
    """Rank the corpus documents by similarity to the query and keep the best ones.

    :param corpus: the corpus vectors to index
    :param query: the query vector, in the same representation as ``corpus``
    :param dictionary: only its length is used, as the number of features of the index
    :param pick_top: how many results to return (defaults to 5)
    :return: up to ``pick_top`` ``(document position, score)`` pairs, highest score first
    """
    index = SparseMatrixSimilarity(corpus, num_features=len(dictionary))
    sims = index[query]
    # sorted() is stable, so documents with equal scores keep their corpus order.
    ranked = sorted(enumerate(sims), key=lambda pair: pair[1], reverse=True)
    return ranked[:pick_top]
|
2023-10-16 14:36:25 +00:00
|
|
|
|
2023-10-16 13:10:45 +00:00
|
|
|
|
2023-10-23 13:42:25 +00:00
|
|
|
def print_results(indexes_scores: list[tuple[int, float]], df):
    """Print a human-readable summary of search results to stdout.

    :param indexes_scores: ``(row position, similarity score)`` pairs; the position
        is the 0-based position of the matching row in ``df``
    :param df: data frame providing the "comment", "type", "name", "file" and
        "line" columns of each result
    """
    print("\n===== RESULTS: =====")

    for idx, score in indexes_scores:
        # The indexes are positions in the corpus, not index labels, so the row
        # must be fetched positionally; .loc would pick the wrong row (or raise
        # KeyError) whenever the frame's index is not exactly 0..n-1.
        row = df.iloc[idx]

        comment = row["comment"]
        if isinstance(comment, str):
            # Collapse every run of whitespace (newlines included) into one space.
            comment = re.sub(r'\s+', ' ', comment)
            desc = "Description: {c}\n".format(c=comment)
            # Keep the description line short: cut it at 75 characters.
            desc = (desc[:75] + '...\n') if len(desc) > 75 else desc
        else:
            # Missing comments (e.g. NaN) produce no description line at all.
            desc = ""

        print("\nSimilarity: {s:2.02f}%".format(s=score * 100))
        print("Python {feat}: {name}\n{desc}File: {file}\nLine: {line}"
              .format(feat=row["type"], name=row["name"], desc=desc, file=row["file"], line=row["line"]))
|
|
|
|
|
|
|
|
|
|
|
|
def build_doc2vec_model(corpus_list):
    """Train a Doc2Vec model on the corpus, save it to DOC2VEC_MODEL and return it.

    :param corpus_list: list of documents, each a list of words; every document
        is tagged with its position in this list
    :return: the trained model
    """
    tagged_docs = []
    for position, words in enumerate(corpus_list):
        tagged_docs.append(TaggedDocument(words, [position]))

    doc2vec = Doc2Vec(vector_size=300, epochs=50, sample=0)
    doc2vec.build_vocab(tagged_docs)
    doc2vec.train(tagged_docs, total_examples=doc2vec.corpus_count, epochs=doc2vec.epochs)

    # Persist the model so that later runs can load it instead of retraining.
    doc2vec.save(DOC2VEC_MODEL)
    return doc2vec
|
2023-10-11 15:49:38 +00:00
|
|
|
|
2023-10-11 12:35:41 +00:00
|
|
|
|
2023-11-07 10:48:00 +00:00
|
|
|
def load_data(print_frequent=False) -> pd.DataFrame:
    """Read the dataset and add bag-of-words columns for names and comments.

    :param print_frequent: when true, also print the 100 most frequent words
        across both bag-of-words columns as ``count: word`` lines
    :return: the data frame read from IN_DATASET with two extra columns,
        "name_bow" and "comment_bow"
    """
    df = pd.read_csv(IN_DATASET, index_col=0)
    df["name_bow"] = df["name"].apply(lambda name: get_bow(name, identifier_split))
    df["comment_bow"] = df["comment"].apply(lambda comment: get_bow(comment, comment_split))

    if not print_frequent:
        return df

    # Count every word occurrence, names first and then comments.
    counts = defaultdict(int)
    for column in ("name_bow", "comment_bow"):
        for bag in df[column].tolist():
            for token in bag:
                counts[token] += 1

    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    for token, count in ranked[:100]:
        print(f"{count}: {token}")

    return df
|
2023-10-11 15:49:38 +00:00
|
|
|
|
2023-10-23 13:42:25 +00:00
|
|
|
|
2023-10-25 13:10:47 +00:00
|
|
|
# A sparse vector is a list of (index, value) pairs; indexes that are absent are zero.
SparseVector = list[tuple[int, float]]
# A dense vector is a NumPy array (np.ndarray is the type; np.array is only a factory function).
DenseVector = np.ndarray


def to_dense(vector: SparseVector, size: Optional[int] = None) -> DenseVector:
    """Expand a sparse ``(index, value)`` vector into a dense NumPy array.

    :param vector: the ``(index, value)`` pairs to expand
    :param size: length of the resulting array; when omitted the array is just
        long enough to hold the highest index present (empty for an empty vector)
    :return: an array holding each value at its index and ``0.0`` everywhere else
    :raises IndexError: if ``size`` is given and an index does not fit into it
    """
    entries = list(vector)
    if size is None:
        # Size by the highest index, not by the number of entries: a sparse
        # vector may omit zero-valued positions, in which case the two differ.
        size = max((idx for idx, _ in entries), default=-1) + 1

    dense = [0.0] * size
    for idx, value in entries:
        dense[idx] = value
    return np.array(dense)
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class SearchResults:
    """Result of one search() call: the ranked matches plus optional vectors."""

    # (document position, similarity score) pairs of the matches.
    indexes_scores: list[tuple[int, float]]
    # Dense vectors of the matched documents; None when the caller supplies none.
    vectors: Optional[list[DenseVector]]
    # Dense vector of the query; None when the caller supplies none.
    query_vector: Optional[DenseVector]

    def __init__(
        self,
        indexes_values: list[tuple[int, float]],
        vectors: Optional[list[DenseVector]],
        query_vector: Optional[DenseVector],
    ):
        # Hand-written constructor: the first parameter is called
        # ``indexes_values`` although it is stored as ``indexes_scores``.
        # @dataclass leaves an explicitly defined __init__ in place.
        self.indexes_scores, self.vectors, self.query_vector = indexes_values, vectors, query_vector
|
|
|
|
|
|
|
|
|
|
|
|
def search(query: str, method: str, df: pd.DataFrame) -> SearchResults:
    """Search the corpus for the documents most similar to a query.

    Each row of ``df`` is one document whose words are the concatenation of its
    "name_bow" and "comment_bow" lists. The query is split with comment_split
    and filtered like the documents.

    :param query: free-text query
    :param method: one of "tfidf", "freq", "lsi" or "doc2vec"
    :param df: data frame with "name_bow" and "comment_bow" columns
    :return: the ranked matches; for "lsi" and "doc2vec" also the vectors of the
        matched documents and of the query, for "tfidf" and "freq" those are None
    :raises ValueError: if ``method`` is not one of the supported names
    """
    # Fail fast: reject an unknown method before any corpus or model is built.
    if method not in ("tfidf", "freq", "lsi", "doc2vec"):
        raise ValueError("method unknown")

    corpus_list = [
        name_words + comment_words
        for name_words, comment_words in zip(df["name_bow"], df["comment_bow"])
    ]
    query_w = get_bow(query, comment_split)

    if method == "doc2vec":
        # Reuse the saved model when there is one; otherwise train (and save) it.
        if os.path.exists(DOC2VEC_MODEL):
            model = Doc2Vec.load(DOC2VEC_MODEL)
        else:
            model = build_doc2vec_model(corpus_list)

        dv_query = model.infer_vector(query_w)
        results = model.dv.most_similar([dv_query], topn=5)
        result_vectors = [model.infer_vector(corpus_list[idx]) for idx, _ in results]
        return SearchResults(results, result_vectors, dv_query)

    # The remaining methods all work on dictionary-based bag-of-words vectors.
    dictionary = Dictionary(corpus_list)
    corpus_bow = [dictionary.doc2bow(text) for text in corpus_list]
    query_bow = dictionary.doc2bow(query_w)

    if method == "tfidf":
        tfidf = TfidfModel(corpus_bow)
        return SearchResults(pick_most_similar(tfidf[corpus_bow], tfidf[query_bow], dictionary), None, None)

    if method == "freq":
        return SearchResults(pick_most_similar(corpus_bow, query_bow, dictionary), None, None)

    # method == "lsi"
    lsi = LsiModel(corpus_bow)
    corpus = typing.cast(list[SparseVector], lsi[corpus_bow])
    # Project the query once and reuse it for both the ranking and the result.
    query_lsi = lsi[query_bow]
    results = pick_most_similar(corpus, query_lsi, dictionary)
    result_vectors: list[DenseVector] = [to_dense(corpus[idx]) for idx, _ in results]
    return SearchResults(results, result_vectors, to_dense(query_lsi))
|
2023-10-11 11:59:07 +00:00
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point: parse the arguments, run the search, print the results.

    The positional ``method`` argument selects one search method, or "all" to
    run every method in turn; ``query`` is the text to search for.
    """
    methods = ["tfidf", "freq", "lsi", "doc2vec"]

    cli = argparse.ArgumentParser()
    cli.add_argument("method", help="the method to compare similarities with", type=str,
                     choices=methods + ["all"])
    cli.add_argument("query", help="the query to search the corpus with", type=str)
    cli.add_argument("-v", "--verbose", help="enable verbose logging", action='store_true')
    args = cli.parse_args()

    if args.verbose:
        coloredlogs.install()
        logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

    df = load_data()

    run_all = args.method == "all"
    selected = methods if run_all else [args.method]
    for method in selected:
        # The per-method header and trailing blank line are only printed in "all" mode.
        if run_all:
            print(f"Applying method {method}:")
        results = search(args.query, method, df)
        print_results(results.indexes_scores, df)
        if run_all:
            print()


if __name__ == "__main__":
    main()
|