"""Extract function definitions from the Python files inside ZIP archives.

Every ZIP archive found in ``IN_DIR`` is scanned for ``*.py`` members; each
``def`` found in them is unparsed back to source text and stored, one row per
function, in a GZIP-compressed parquet file ``<archive name>.pq`` in
``OUT_DIR``. Archives whose output file already exists are skipped, so an
interrupted run can simply be restarted.
"""
import ast
import multiprocessing
import os.path
import typing
import zipfile
from typing import Optional

import pandas as pd
from fastparquet import write
from tqdm import tqdm

PWD = os.path.dirname(__file__)
IN_DIR = os.path.join(PWD, "download")
OUT_DIR = os.path.join(PWD, "functions")

# Column layout of every record/frame produced by this module.
_RECORD_COLUMNS = ["zip_filename", "py_filename", "source", "success", "error"]


def _function_records(
    content, filename: str, zip_name: str
) -> typing.List[typing.Dict[str, typing.Any]]:
    """Return one record per ``def`` in *content*, plus an error record on failure.

    Functions collected before a failure are kept; the failure itself is
    reported on stdout and appended as a record with ``success=False``.
    """
    records: typing.List[typing.Dict[str, typing.Any]] = []
    try:
        # The source is handed to the parser undecoded so that it can honour
        # a PEP 263 encoding declaration or a UTF-8 BOM itself.
        tree = ast.parse(content, filename=filename)
        for node in ast.walk(tree):
            # Only plain ``def`` nodes are collected (ast.FunctionDef).
            if isinstance(node, ast.FunctionDef):
                records.append({
                    "zip_filename": zip_name,
                    "py_filename": filename,
                    "source": ast.unparse(node),
                    "success": True,
                    "error": None,
                })
    except Exception as e:
        # Best effort: one unparsable file must not abort the whole archive.
        print(f"project '{zip_name}': error parsing '{filename}': {e}")
        records.append({
            "zip_filename": zip_name,
            "py_filename": filename,
            "source": "",
            "success": False,
            "error": str(e),
        })
    return records


def read_functions(content, filename: str, zip_name: str) -> Optional[pd.DataFrame]:
    """Parse one Python source file and return its functions as a DataFrame.

    Args:
        content: Source of the file; raw ``bytes`` as read from the archive
            (a ``str`` is accepted as well).
        filename: Name of the file, used in records and error messages.
        zip_name: Name of the archive the file came from, stored per record.

    Returns:
        A frame with the columns ``zip_filename``, ``py_filename``,
        ``source``, ``success`` and ``error``: one row per function
        definition, and one extra row with ``success=False`` if the file
        could not be processed. The frame has zero rows when the file
        contains no function definitions.
    """
    records = _function_records(content, filename, zip_name)
    return pd.DataFrame.from_records(records, columns=_RECORD_COLUMNS)


def read_zip_file(zip_file: str) -> Optional[str]:
    """Extract all functions from one ZIP archive into a parquet file.

    The result is written to ``OUT_DIR/<basename of zip_file>.pq``. It is
    first written to a temporary sibling file and then renamed, so the final
    path only ever holds a completely written file.

    Args:
        zip_file: Path of the archive to process.

    Returns:
        *zip_file* on success, ``None`` if the archive could not be
        processed (the error is reported on stdout).
    """
    out_path = os.path.join(OUT_DIR, os.path.basename(zip_file) + ".pq")
    tmp_path = out_path + ".tmp"
    records: typing.List[typing.Dict[str, typing.Any]] = []
    try:
        with zipfile.ZipFile(zip_file, "r") as zip_ref:
            info_list = [
                info for info in zip_ref.infolist()
                if info.filename.endswith(".py")
            ]
            for info in tqdm(info_list, desc=os.path.basename(zip_file),
                             ncols=0, position=None, leave=True):
                content = zip_ref.read(info.filename)
                records.extend(
                    _function_records(content, info.filename, zip_file)
                )
        # Build the frame once instead of concatenating per file.
        df = pd.DataFrame.from_records(records, columns=_RECORD_COLUMNS)
        write(tmp_path, df, compression="GZIP")
        os.replace(tmp_path, out_path)
        return zip_file
    except Exception as e:
        print(f"error processing '{zip_file}': {e}")
        # Best-effort removal of a partially written temporary file.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return None


def read_clones(zip_dir: str, num_processes: int = 192) -> None:
    """Process every not-yet-converted ZIP archive in *zip_dir* in parallel.

    Args:
        zip_dir: Directory that is scanned (non-recursively) for archives.
        num_processes: Upper bound on the number of worker processes; the
            pool never has more workers than there are archives to process.
    """
    zip_files = []
    for a_file in tqdm(os.listdir(zip_dir), desc="Scan dir"):
        path = os.path.join(zip_dir, a_file)
        out_path = os.path.join(OUT_DIR, os.path.basename(path) + ".pq")
        # Cheap existence test first; is_zipfile has to open the file.
        if not os.path.isfile(out_path) and zipfile.is_zipfile(path):
            zip_files.append(path)

    if not zip_files:
        return

    workers = max(1, min(num_processes, len(zip_files)))
    with multiprocessing.Pool(processes=workers) as pool:
        results = pool.imap_unordered(read_zip_file, zip_files)
        for _ in tqdm(results, desc="Read ZIPs", unit="item",
                      total=len(zip_files), position=None, leave=True):
            # Drain the iterator: leaving the ``with`` block terminates the
            # pool, so all results must be consumed first. This also drives
            # the progress bar.
            pass


def main():
    """Create the output directory if needed and convert all archives in ``IN_DIR``."""
    os.makedirs(OUT_DIR, exist_ok=True)
    read_clones(IN_DIR)


if __name__ == "__main__":
    main()