import json
import os
import subprocess
import sys

import pandas as pd
import pydriller
from dotenv import dotenv_values

from Database import Database
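
# Pipeline overview (summary of the code below): RefactoringMiner writes a JSON
# report per repository, parse_json_output() keeps the method-level refactorings
# from that report, and create_project_dataframe() uses PyDriller to pair each
# refactored method's before-source with its after-source for insertion into the DB.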
class RefactorAnalysis:
    def __init__(self, input_path="", output_path=""):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        if input_path == "":
            self.input_path = os.path.join(base_dir, "data", "refactoring-toy-example")
        else:
            self.input_path = input_path
        if output_path == "":
            self.output_path = os.path.join(base_dir, "output_ref", "output.json")
        else:
            self.output_path = output_path
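
    # RefactoringMiner ships a launcher script under executable/.../bin; the
    # "-a <repo> -json <file>" invocation analyzes all commits and writes a JSON
    # report (flags as in the RefactoringMiner CLI bundled with this project).
    # Note that os.chdir() below changes the process-wide working directory;
    # main() resets it before each repository.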
    def generate_refactor_details(self):
        ref_miner_bin = os.path.abspath("executable/RefactoringMiner/bin")
        command = ["sh", "RefactoringMiner", "-a", self.input_path, "-json", self.output_path]
        try:
            os.chdir(ref_miner_bin)
            shell_result = subprocess.run(command, capture_output=True, text=True)
            shell_result.check_returncode()
        except subprocess.CalledProcessError as error:
            print(error)
            sys.exit(1)
        except Exception as e:
            print(e)
            return 1
        return 0
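
    # The report read below follows RefactoringMiner's JSON layout; only the
    # keys this parser actually touches are sketched here:
    #   {"commits": [{"sha1": "...",
    #                 "refactorings": [{"type": "Extract Method",
    #                                   "leftSideLocations": [{"filePath": "...",
    #                                                          "startLine": 1,
    #                                                          "endLine": 10}]}]}]}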
    def parse_json_output(self):
        # Keep only method-level refactorings (e.g. Extract Method, Rename Method).
        with open(self.output_path) as f:
            json_output = json.load(f)
        dict_output = {}
        for obj in json_output["commits"]:
            if len(obj["refactorings"]) == 0:
                continue
            changes = []
            se_lines = []
            ref_type = None
            for ref in obj["refactorings"]:
                if "Method" not in ref["type"]:
                    continue
                ref_type = ref["type"]
                for location in ref["leftSideLocations"]:
                    changes.append(location["filePath"])
                    se_lines.append((location["startLine"], location["endLine"]))
            if not changes:
                continue
            dict_output[obj["sha1"]] = {
                "paths": changes,
                "ref_start_end": se_lines,
                "ref_type": ref_type,
            }
        return dict_output
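
    # Matching strategy: a method counts as refactored when its PyDriller
    # [start_line, end_line] span encloses the left-side span reported by
    # RefactoringMiner. Each row pairs the method body before the commit
    # (method_refactored) with the body at the same lines after it (meth_rf_neg).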
    def create_project_dataframe(self):
        df = pd.DataFrame(columns=["commit", "refactoring_type", "filename", "meth_rf_neg", "method_refactored"])
        parse_output_dict = self.parse_json_output()
        commits_to_analyze = list(parse_output_dict.keys())
        for commit in pydriller.Repository(self.input_path, only_commits=commits_to_analyze).traverse_commits():
            ref_list = parse_output_dict.get(commit.hash)
            ref_path_name = [str(p).split("/")[-1] for p in ref_list["paths"]]
            for cf in commit.modified_files:
                try:
                    index_ref = ref_path_name.index(cf.filename)
                except ValueError:
                    continue
                if len(cf.changed_methods) == 0:
                    continue
                # changed_methods narrows the candidate set; without it we would
                # have to scan every method in the file.
                for cm in cf.changed_methods:
                    if cm.start_line <= ref_list["ref_start_end"][index_ref][0] and cm.end_line >= ref_list["ref_start_end"][index_ref][1]:
                        method_source_code = self.__split_and_extract_methods(cf.source_code_before, cm.start_line, cm.end_line)
                        method_source_code_neg = self.__split_and_extract_methods(cf.source_code, cm.start_line, cm.end_line)
                        df_row = {
                            "commit": commit.hash,
                            "refactoring_type": ref_list["ref_type"],
                            "filename": cf.filename,
                            "meth_rf_neg": method_source_code_neg,
                            "method_refactored": method_source_code,
                        }
                        df.loc[len(df)] = df_row
        return df

    def __split_and_extract_methods(self, source_code, start_line, end_line):
        # Line numbers are 1-based, so shift the slice start down by one.
        source_code_lines = str(source_code).splitlines()
        return "\n".join(source_code_lines[start_line - 1:end_line])
def main():
    if not os.path.exists("data/repos/"):
        try:
            print("Starting repo download")
            repo_script = subprocess.run(["python", "repo_download.py"], capture_output=True, text=True)
            repo_script.check_returncode()
        except subprocess.CalledProcessError as err:
            print(err)
            sys.exit(1)
        print("Repo Download Completed")
    lst_repos = next(os.walk("data/repos/"))[1]
    print(len(lst_repos))
    cwd = os.path.dirname(os.path.abspath(__file__))
    columns = ["commit", "refactoring_type", "filename", "meth_rf_neg", "method_refactored"]
    final_df = pd.DataFrame(columns=columns)
    database = Database(dotenv_values(".env")["COLLECTION_NAME"])
    count = 1
    batch_size = 5
    for idx, repo in enumerate(lst_repos):
        os.chdir(cwd)
        try:
            ref_obj = RefactorAnalysis(
                os.path.abspath(os.path.join("data/repos", repo)),
                os.path.abspath(os.path.join("output_ref", repo + ".json")),
            )
            # ref_obj.generate_refactor_details()  # left disabled, as in the
            # original; the JSON reports are assumed to exist under output_ref/.
            df = ref_obj.create_project_dataframe()
        except Exception as e:
            print(e)
            continue
        final_df = pd.concat([final_df, df], ignore_index=True)
        if count == batch_size or idx == len(lst_repos) - 1:
            print("Inserting into DB", idx)
            database.insert_docs(final_df.to_dict(orient="records"))
            final_df = pd.DataFrame(columns=columns)
            count = 1
        else:
            count += 1


if __name__ == "__main__":
    main()
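
# Example usage on a single repository (paths are illustrative, not from the
# original script):
#   ra = RefactorAnalysis(os.path.abspath("data/repos/my-repo"),
#                         os.path.abspath("output_ref/my-repo.json"))
#   ra.generate_refactor_details()
#   df = ra.create_project_dataframe()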