import argparse
import logging
import os
import re
import typing
from dataclasses import dataclass
from typing import Optional

import coloredlogs
import nltk
import numpy as np
import pandas as pd
from gensim.corpora import Dictionary
from gensim.models import TfidfModel, LsiModel
from gensim.models.doc2vec import TaggedDocument, Doc2Vec
from gensim.similarities import SparseMatrixSimilarity
from nltk.corpus import stopwords

nltk.download('stopwords')

SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
IN_DATASET = os.path.join(SCRIPT_DIR, "data.csv")
DOC2VEC_MODEL = os.path.join(SCRIPT_DIR, "doc2vec_model.dat")

# using nltk stop words and example words for now
STOP_WORDS = set(stopwords.words('english')) \
    .union(['test', 'tests', 'main', 'this', 'self'])


def find_all(regex, word):
    """Return every lowercased match of `regex` in `word`."""
    matches = re.finditer(regex, word)
    return [m.group(0).lower() for m in matches]


# https://stackoverflow.com/a/29920015
def camel_case_split(word):
    return find_all(r'.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)', word)


def identifier_split(identifier):
    """Split an identifier on underscores, then on camelCase boundaries."""
    return [y for x in identifier.split("_") for y in camel_case_split(x)]


def comment_split(comment):
    return find_all(r'[A-Za-z0-9]+', comment)


def remove_stopwords(input_bow_list):
    return [word for word in input_bow_list if word not in STOP_WORDS]


def get_bow(data, split_f):
    # pandas represents missing CSV cells as float NaN
    if data is None or (isinstance(data, float) and np.isnan(data)):
        return []
    return remove_stopwords(split_f(data))


def pick_most_similar(corpus, query, num_features: int) -> list[tuple[int, float]]:
    """Return the (document position, similarity) pairs of the 5 best matches.

    `num_features` must be the dimensionality of the vectors in `corpus`:
    the dictionary size for bag-of-words and TF-IDF, the topic count for LSI.
    """
    index = SparseMatrixSimilarity(corpus, num_features=num_features)
    sims = index[query]
    pick_top = 5
    return sorted(enumerate(sims), key=lambda x: x[1], reverse=True)[:pick_top]


def print_results(indexes_scores: list[tuple[int, float]], df):
    print("\n===== RESULTS: =====")
    for idx, score in indexes_scores:
        # idx is a position in the corpus, which is built in row order,
        # so the row is looked up by position rather than by index label
        row = df.iloc[idx]
        comment = row["comment"]
        if not isinstance(comment, str):
            desc = ""
        else:
            # collapse runs of whitespace (including newlines) into one space
            comment = re.sub(r'\s+', ' ', comment)
            desc = "Description: {c}\n".format(c=comment)
            desc = (desc[:75] + '...\n') if len(desc) > 75 else desc
        print("\nSimilarity: {s:.2f}%".format(s=score * 100))
        print("Python {feat}: {name}\n{desc}File: {file}\nLine: {line}"
              .format(feat=row["type"], name=row["name"], desc=desc,
                      file=row["file"], line=row["line"]))


def build_doc2vec_model(corpus_list):
    """Train a Doc2Vec model on the corpus and save it to DOC2VEC_MODEL."""
    dvdocs = [TaggedDocument(text, [i]) for i, text in enumerate(corpus_list)]
    model = Doc2Vec(vector_size=100, epochs=100, sample=1e-5)
    model.build_vocab(dvdocs)
    model.train(dvdocs, total_examples=model.corpus_count, epochs=model.epochs)
    model.save(DOC2VEC_MODEL)
    return model


def load_data() -> pd.DataFrame:
    df = pd.read_csv(IN_DATASET, index_col=0)
    df["name_bow"] = df["name"].apply(lambda n: get_bow(n, identifier_split))
    df["comment_bow"] = df["comment"].apply(lambda c: get_bow(c, comment_split))
    return df


SparseVector = list[tuple[int, float]]
DenseVector = np.ndarray


def to_dense(vector: SparseVector, size: int) -> DenseVector:
    """Expand a sparse (index, value) vector into a dense array of length `size`.

    A sparse vector omits its zero entries, so the target length cannot be
    derived from len(vector) and has to be passed in.
    """
    dense = np.zeros(size)
    for idx, value in vector:
        dense[idx] = value
    return dense


@dataclass
class SearchResults:
    indexes_scores: list[tuple[int, float]]
    vectors: Optional[list[DenseVector]]
    query_vector: Optional[DenseVector]


def search(query: str, method: str, df: pd.DataFrame) -> SearchResults:
    corpus_list = []
    for _, row in df.iterrows():
        document_words = row["name_bow"] + row["comment_bow"]
        corpus_list.append(document_words)

    query_w = get_bow(query, comment_split)

    dictionary = None
    corpus_bow = None
    query_bow = None
    if method != "doc2vec":
        dictionary = Dictionary(corpus_list)
        corpus_bow = [dictionary.doc2bow(text) for text in corpus_list]
        query_bow = dictionary.doc2bow(query_w)

    if method == "tfidf":
        tfidf = TfidfModel(corpus_bow)
        results = pick_most_similar(tfidf[corpus_bow], tfidf[query_bow], len(dictionary))
        return SearchResults(results, None, None)
    elif method == "freq":
        results = pick_most_similar(corpus_bow, query_bow, len(dictionary))
        return SearchResults(results, None, None)
    elif method == "lsi":
        lsi = LsiModel(corpus_bow)
        # LSI vectors live in topic space, so their dimensionality is the
        # number of topics, not the dictionary size
        num_topics = lsi.num_topics
        corpus = typing.cast(list[SparseVector], lsi[corpus_bow])
        query_lsi = lsi[query_bow]
        results = pick_most_similar(corpus, query_lsi, num_topics)
        result_vectors: list[DenseVector] = [to_dense(corpus[idx], num_topics)
                                             for idx, _ in results]
        return SearchResults(results, result_vectors, to_dense(query_lsi, num_topics))
    elif method == "doc2vec":
        # reuse a previously trained model if one was saved
        if os.path.exists(DOC2VEC_MODEL):
            model = Doc2Vec.load(DOC2VEC_MODEL)
        else:
            model = build_doc2vec_model(corpus_list)

        dv_query = model.infer_vector(query_w)
        results = model.dv.most_similar([dv_query], topn=5)
        result_vectors = [model.infer_vector(corpus_list[idx]) for idx, _ in results]
        return SearchResults(results, result_vectors, dv_query)
    else:
        raise ValueError("method unknown")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("method", help="the method to compare similarities with", type=str)
    parser.add_argument("query", help="the query to search the corpus with", type=str)
    args = parser.parse_args()

    df = load_data()
    results = search(args.query, args.method, df)
    print_results(results.indexes_scores, df)


if __name__ == "__main__":
    # coloredlogs attaches the root handler, so format and level are set here;
    # logging.basicConfig() does nothing once the root logger has a handler
    coloredlogs.install(level=logging.INFO,
                        fmt='%(asctime)s : %(levelname)s : %(message)s')
    main()