from cherche import retrieve
from sentence_transformers import SentenceTransformer, util
from transformers import RobertaTokenizer, RobertaModel, EncoderDecoderModel
from config import classifier_class_mapping, config

import pandas as pd
import numpy as np
import pickle
import torch

# OneVsRestClassifier and RandomForestClassifier must be importable so that
# pickle can restore the classifier head in load_hw_classifier
from sklearn.multiclass import OneVsRestClassifier
from sklearn.ensemble import RandomForestClassifier

import spacy
# nlp = spacy.load("en_core_web_trf")
nlp = spacy.load("en_core_web_sm")


class wrappedTokenizer(RobertaTokenizer):
    def __call__(self, text_input):
        return self.tokenize(text_input)


def generate_index(db):
    db_cp = db.copy()
    index_list = []
    for id_, dirname in db_cp.values:
        index_list.append(
            {
                'id': id_,
                'library': dirname.lower()
            })
    return index_list


def load_db(db_metadata_path, db_constructor_path, db_params_path, exclusion_list_path):
    '''
    Function to load the dataframes and the exclusion list

    Params:
    db_metadata_path (string): the path to the db_metadata file
    db_constructor_path (string): the path to the db_constructor file
    db_params_path (string): the path to the db_params file
    exclusion_list_path (string): the path to a newline-separated exclusion list

    Output:
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library
    db_constructor (pandas dataframe): a dataframe containing the mapping of library names to valid constructors
    db_params (pandas dataframe): a dataframe containing the mapping of methods to their parameters
    ex_list (list): a list of keywords to exclude during keyword extraction
    '''
    db_metadata = pd.read_csv(db_metadata_path)
    db_metadata.dropna(inplace=True)

    db_constructor = pd.read_csv(db_constructor_path)
    db_constructor.dropna(inplace=True)

    db_params = pd.read_csv(db_params_path)
    db_params.dropna(inplace=True)

    with open(exclusion_list_path, 'r') as f:
        ex_list = f.read()
    ex_list = ex_list.split("\n")

    return db_metadata, db_constructor, db_params, ex_list


def load_retrieval_model_lexical(tokenizer_path, max_k, db_metadata):
    '''
    Function to load the BM25 model

    Params:
    tokenizer_path (string): the path to a tokenizer (can be a path to either a huggingface model or a local directory)
    max_k (int): the maximum number of returned sequences
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library

    Returns:
    retrieval_model: a retrieval model
    '''
    # generate index
    index_list = generate_index(db_metadata[['id', 'library']])

    # load model
    tokenizer = wrappedTokenizer.from_pretrained(tokenizer_path)
    retrieval_model = retrieve.BM25Okapi(
        key='id',
        on='library',
        documents=index_list,
        k=max_k,
        tokenizer=tokenizer
    )
    return retrieval_model


def load_retrieval_model_deep_learning(model_path, max_k, db_metadata):
    '''
    Function to load a deep learning-based retrieval model

    Params:
    model_path (string): the path to the model (can be a path to either a huggingface model or a local directory)
    max_k (int): the maximum number of returned sequences
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library

    Returns:
    retrieval_model: a retrieval model
    '''
    # generate index
    index_list = generate_index(db_metadata[['id', 'library']])

    # load model
    retrieval_model = retrieve.Encoder(
        key='id',
        on='library',
        encoder=SentenceTransformer(model_path).encode,
        k=max_k,
        path="../temp/dl.pkl"
    )
    retrieval_model = retrieval_model.add(documents=index_list)

    return retrieval_model


def load_generative_model_codebert(model_path):
    '''
    Function to load a generative model from a codebert checkpoint

    Params:
    model_path (string): path to the model (can be a path to either a huggingface model or a local directory)

    Returns:
    tokenizer: a huggingface tokenizer
    generative_model: a generative model to generate an API usage pattern given the library name as the input
    '''
    tokenizer = RobertaTokenizer.from_pretrained(model_path)
    generative_model = EncoderDecoderModel.from_pretrained(model_path)
    return tokenizer, generative_model
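
# The sketch below shows how the loaders above fit together. It is a usage
# illustration only: the file paths and the query string are hypothetical,
# and 'roberta-base' stands in for whatever checkpoint the config points to.
def _demo_retrieval():
    db_metadata, db_constructor, db_params, ex_list = load_db(
        'data/db_metadata.csv',      # hypothetical path
        'data/db_constructor.csv',   # hypothetical path
        'data/db_params.csv',        # hypothetical path
        'data/exclusion_list.txt'    # hypothetical path
    )
    retriever = load_retrieval_model_lexical('roberta-base', 5, db_metadata)
    # cherche retrievers are callable; each hit is a dict carrying the
    # 'id' key and a 'similarity' score, as consumed by retrieve_libraries below
    print(retriever('temperature sensor'))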
def get_metadata_library(predictions, db_metadata):
    '''
    Function to get the metadata of a library using the library unique id

    Params:
    predictions (list): a list of dictionaries containing the prediction details
    db_metadata: a dataframe containing metadata information about the library

    Returns:
    metadata_dict (dict): a dictionary where the key is the metadata type and the value is the metadata value
    '''
    predictions_cp = predictions.copy()
    for prediction_dict in predictions_cp:
        temp_db = db_metadata[db_metadata.id == prediction_dict.get('id')]
        assert(len(temp_db) == 1)

        prediction_dict['Sensor Type'] = temp_db.iloc[0]['cat'].capitalize()
        prediction_dict['Github URL'] = temp_db.iloc[0]['url']

        # prefer the description from the arduino library list; if not found, use the repo description
        if temp_db.iloc[0].desc_ardulib != 'nan':
            prediction_dict['Description'] = temp_db.iloc[0].desc_ardulib
        elif temp_db.iloc[0].desc_repo != 'nan':
            prediction_dict['Description'] = temp_db.iloc[0].desc_repo
        else:
            prediction_dict['Description'] = "Description not found"
    return predictions_cp


def id_to_libname(id_, db_metadata):
    '''
    Function to convert a library id to its library name

    Params:
    id_ (int): a unique library id
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library

    Returns:
    library_name (string): the library name that corresponds to the input id
    '''
    temp_db = db_metadata[db_metadata.id == id_]
    assert(len(temp_db) == 1)
    library_name = temp_db.iloc[0].library
    return library_name


def retrieve_libraries(retrieval_model, model_input, db_metadata):
    '''
    Function to retrieve a set of relevant libraries using a model based on the input query

    Params:
    retrieval_model: a model to perform retrieval
    model_input (string): an input query from the user
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library

    Returns:
    library_ids (list): a list of library unique ids
    library_names (list): a list of library names
    scores (list): a list of similarity scores
    '''
    results = retrieval_model(model_input)
    library_ids = [item.get('id') for item in results]
    scores = [item.get('similarity') for item in results]
    library_names = [id_to_libname(item, db_metadata) for item in library_ids]
    return library_ids, library_names, scores


def prepare_input_generative_model(library_ids, db_constructor):
    '''
    Function to prepare the input of the model to generate API usage patterns

    Params:
    library_ids (list): a list of library ids
    db_constructor (pandas dataframe): a dataframe containing the mapping of library names to valid constructors

    Returns:
    output_dict (dictionary): a dictionary where the key is a library id and the value is a list of valid inputs
    '''
    output_dict = {}
    for id_ in library_ids:
        temp_db = db_constructor[db_constructor.id == id_]
        output_dict[id_] = []
        for id__, library_name, methods, constructor in temp_db.values:
            output_dict[id_].append(
                f'{library_name} [SEP] {constructor}'
            )
    return output_dict


def generate_api_usage_patterns(generative_model, tokenizer, model_input, num_beams, num_return_sequences, max_length):
    '''
    Function to generate API usage patterns

    Params:
    generative_model: a huggingface model
    tokenizer: a huggingface tokenizer
    model_input (string): a string in the form of "library_name [SEP] constructor"
    num_beams (int): the beam width used for decoding
    num_return_sequences (int): how many API usage patterns are returned by the model
    max_length (int): the maximum length of the generated sequences

    Returns:
    api_usage_patterns (list): a list of API usage patterns
    '''
    model_input = tokenizer(model_input, return_tensors='pt').input_ids
    model_output = generative_model.generate(
        model_input,
        num_beams=num_beams,
        num_return_sequences=num_return_sequences,
        early_stopping=True,
        max_length=max_length
    )
    api_usage_patterns = tokenizer.batch_decode(
        model_output,
        skip_special_tokens=True
    )
    return api_usage_patterns
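
# Illustrative call to generate_api_usage_patterns. The checkpoint path and
# the "Adafruit_BME280 [SEP] ..." input are hypothetical examples of the
# "library_name [SEP] constructor" format the function expects.
def _demo_generation():
    tokenizer, generative_model = load_generative_model_codebert(
        'checkpoints/codebert-generator'  # hypothetical path
    )
    patterns = generate_api_usage_patterns(
        generative_model,
        tokenizer,
        'Adafruit_BME280 [SEP] Adafruit_BME280()',  # hypothetical library/constructor
        num_beams=5,
        num_return_sequences=3,
        max_length=128
    )
    # each pattern is a whitespace-separated sequence of API calls
    for pattern in patterns:
        print(pattern)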
def add_params(api_usage_patterns, db_params, library_id):
    '''
    Function to append parameter lists to the APIs in the generated usage patterns

    Params:
    api_usage_patterns (list): a list of generated API usage patterns
    db_params (pandas dataframe): a dataframe containing the mapping of methods to their parameters
    library_id (int): a unique library id

    Returns:
    processed_sequences (list): a list of API usage patterns with parameters, joined by [API-SEP]
    '''
    patterns_cp = api_usage_patterns.copy()
    processed_sequences = []
    for sequence in patterns_cp:
        sequence_list = sequence.split()
        if len(sequence_list) < 2:
            continue

        temp_list = []
        ref_obj = ''
        valid = True
        for api in sequence_list:
            temp_db = db_params[(db_params.id == library_id) & (db_params.methods == api.split(".")[-1])]

            # all APIs in a sequence must share the same receiver object
            if ref_obj == '':
                ref_obj = api.split(".")[0]

            if len(temp_db) > 0 and ref_obj == api.split(".")[0]:
                param = temp_db.iloc[0].params
                new_api = api + param
                temp_list.append(new_api)
            else:
                valid = False
                break

        if valid:
            processed_sequences.append("[API-SEP]".join(temp_list))
    return processed_sequences


def generate_api_usage_patterns_batch(generative_model, tokenizer, library_ids, db_constructor, db_params, num_beams, num_return_sequences, max_length):
    '''
    Function to generate API usage patterns in batch

    Params:
    generative_model: a huggingface model
    tokenizer: a huggingface tokenizer
    library_ids (list): a list of library ids
    db_constructor (pandas dataframe): a dataframe containing the mapping of library names to valid constructors
    db_params (pandas dataframe): a dataframe containing the mapping of methods to their parameters
    num_beams (int): the beam width used for decoding
    num_return_sequences (int): how many API usage patterns are returned by the model
    max_length (int): the maximum length of the generated sequences

    Returns:
    predictions (list): a list of dictionaries containing the api usage patterns, library name, and id
    '''
    input_generative_model_dict = prepare_input_generative_model(library_ids, db_constructor)

    predictions = []
    for id_ in input_generative_model_dict:
        temp_dict = {
            'id': id_,
            'library_name': None,
            'hw_config': None,
            'usage_patterns': {}
        }
        for input_generative_model in input_generative_model_dict.get(id_):
            api_usage_patterns = generate_api_usage_patterns(
                generative_model,
                tokenizer,
                input_generative_model,
                num_beams,
                num_return_sequences,
                max_length
            )

            temp = input_generative_model.split("[SEP]")
            library_name = temp[0].strip()
            constructor = temp[1].strip()

            assert(constructor not in temp_dict.get('usage_patterns'))
            api_usage_patterns = add_params(api_usage_patterns, db_params, id_)
            temp_dict['usage_patterns'][constructor] = api_usage_patterns

        assert(temp_dict.get('library_name') == None)
        temp_dict['library_name'] = library_name
        predictions.append(temp_dict)
    return predictions


# def generate_api_usage_patterns(generative_model, tokenizer, model_inputs, num_beams, num_return_sequences):
#     '''
#     Function to generate API usage patterns
#
#     Params:
#     generative_model: a huggingface model
#     tokenizer: a huggingface tokenizer
#     model_inputs (list): a list of "library_name [SEP] constructor" strings
#     num_beams (int): the beam width used for decoding
#     num_return_sequences (int): how many API usage patterns are returned by the model
#
#     Returns:
#     api_usage_patterns (list): a list of API usage patterns
#     '''
#     model_inputs = tokenizer(
#         model_inputs,
#         max_length=max_length,
#         padding='max_length',
#         return_tensors='pt',
#         truncation=True)
#
#     model_output = generative_model.generate(
#         **model_inputs,
#         num_beams=num_beams,
#         num_return_sequences=num_return_sequences
#     )
#     api_usage_patterns = tokenizer.batch_decode(
#         model_output,
#         skip_special_tokens=True
#     )
#     api_usage_patterns = [api_usage_patterns[i:i+num_return_sequences] for i in range(0, len(api_usage_patterns), num_return_sequences)]
#     return api_usage_patterns
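
# A small, self-contained sketch of what add_params does. The dataframe rows
# and the generated sequence below are made-up examples: each API in a
# sequence must keep the same receiver object, its parameter list is looked
# up in db_params, and the annotated calls are joined with [API-SEP].
def _demo_add_params():
    demo_db_params = pd.DataFrame({
        'id': [42, 42],                           # hypothetical library id
        'methods': ['begin', 'readTemperature'],  # hypothetical method names
        'params': ['(0x76)', '()']                # hypothetical parameter lists
    })
    patterns = ['bme.begin bme.readTemperature']
    # -> ['bme.begin(0x76)[API-SEP]bme.readTemperature()']
    print(add_params(patterns, demo_db_params, 42))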
def prepare_input_classification_model(id_, db_metadata):
    '''
    Function to get a feature for a classification model using a library id

    Params:
    id_ (int): a unique library id
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library

    Returns:
    feature (string): a feature used for the classification model input
    '''
    temp_db = db_metadata[db_metadata.id == id_]
    assert(len(temp_db) == 1)
    feature = temp_db.iloc[0].features
    return feature


def load_hw_classifier(model_path_classifier, model_path_classifier_head):
    '''
    Function to load a classifier model and classifier head

    Params:
    model_path_classifier (string): path to the classifier checkpoint (can be either a huggingface path or a local directory)
    model_path_classifier_head (string): path to the classifier head checkpoint (should be a local directory)

    Returns:
    classifier_model: a huggingface model
    classifier_head: a classifier model (can be either svm or rf)
    tokenizer: a huggingface tokenizer
    '''
    tokenizer = RobertaTokenizer.from_pretrained(model_path_classifier)
    classifier_model = RobertaModel.from_pretrained(model_path_classifier)
    with open(model_path_classifier_head, 'rb') as f:
        classifier_head = pickle.load(f)
    return classifier_model, classifier_head, tokenizer


def predict_hw_config(classifier_model, classifier_tokenizer, classifier_head, library_ids, db_metadata, max_length):
    '''
    Function to predict hardware configs

    Params:
    classifier_model: a huggingface model to convert a feature to a feature vector
    classifier_tokenizer: a huggingface tokenizer
    classifier_head: a classifier head
    library_ids (list): a list of library ids
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library
    max_length (int): max length of the tokenizer output

    Returns:
    prediction (list): a list of predictions
    '''
    features = [prepare_input_classification_model(id_, db_metadata) for id_ in library_ids]
    tokenized_features = classifier_tokenizer(
        features,
        max_length=max_length,
        padding='max_length',
        return_tensors='pt',
        truncation=True
    )
    with torch.no_grad():
        embedding_features = classifier_model(**tokenized_features).pooler_output.numpy()
    prediction = classifier_head.predict_proba(embedding_features).tolist()
    prediction = np.argmax(prediction, axis=1).tolist()
    prediction = [classifier_class_mapping.get(idx) for idx in prediction]
    return prediction


def initialize_all_components(config):
    '''
    Function to initialize all components of ArduProg

    Params:
    config (dict): a dictionary containing the configuration to initialize all components

    Returns:
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library
    db_constructor (pandas dataframe): a dataframe containing the mapping of library names to valid constructors
    db_params (pandas dataframe): a dataframe containing the mapping of methods to their parameters
    ex_list (list): a list of keywords to exclude during keyword extraction
    model_retrieval, model_generative: a huggingface model
    tokenizer_generative, tokenizer_classifier: a huggingface tokenizer
    model_classifier: a huggingface model
    classifier_head: a random forest model
    '''
    # load db
    db_metadata, db_constructor, db_params, ex_list = load_db(
        config.get('db_metadata_path'),
        config.get('db_constructor_path'),
        config.get('db_params_path'),
        config.get('exclusion_list_path')
    )

    # load model
    model_retrieval = load_retrieval_model_lexical(
        config.get('tokenizer_path_retrieval'),
        config.get('max_k'),
        db_metadata,
    )
    tokenizer_generative, model_generative = load_generative_model_codebert(config.get('model_path_generative'))
    model_classifier, classifier_head, tokenizer_classifier = load_hw_classifier(
        config.get('model_path_classifier'),
        config.get('classifier_head_path')
    )

    return db_metadata, db_constructor, db_params, ex_list, model_retrieval, model_generative, tokenizer_generative, model_classifier, classifier_head, tokenizer_classifier
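
# End-to-end usage sketch: initialize every component from the config dict
# imported from config.py above, then run a single query through the full
# pipeline. The query string is a hypothetical example.
def _demo_pipeline():
    (db_metadata, db_constructor, db_params, ex_list,
     model_retrieval, model_generative, tokenizer_generative,
     model_classifier, classifier_head, tokenizer_classifier) = initialize_all_components(config)

    predictions = make_predictions(
        'how do i read a dht22 sensor',  # hypothetical user query
        model_retrieval, model_generative, model_classifier,
        classifier_head, tokenizer_generative, tokenizer_classifier,
        db_metadata, db_constructor, db_params, ex_list, config
    )
    print(predictions)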
def make_predictions(input_query, model_retrieval, model_generative, model_classifier,
                     classifier_head, tokenizer_generative, tokenizer_classifier,
                     db_metadata, db_constructor, db_params, ex_list, config):
    '''
    Function to retrieve relevant libraries, generate API usage patterns, and predict the hw configs

    Params:
    input_query (string): a query from the user
    model_retrieval, model_generative, model_classifier: a huggingface model
    classifier_head: a random forest classifier
    tokenizer_generative, tokenizer_classifier: a huggingface tokenizer
    db_metadata (pandas dataframe): a dataframe containing metadata information about the library
    db_constructor (pandas dataframe): a dataframe containing the mapping of library names to valid constructors
    db_params (pandas dataframe): a dataframe containing the mapping of methods to their parameters
    ex_list (list): a list of keywords to exclude during keyword extraction
    config (dict): a dictionary containing the configuration to initialize all components

    Returns:
    predictions (list): a list of dictionaries containing the prediction details
    '''
    print("retrieve libraries")
    queries = extract_keywords(input_query.lower(), ex_list)
    temp_list = []
    for query in queries:
        temp_library_ids, temp_library_names, temp_scores = retrieve_libraries(model_retrieval, query, db_metadata)
        if len(temp_library_ids) > 0:
            for id_, name, score in zip(temp_library_ids, temp_library_names, temp_scores):
                temp_list.append((id_, name, score))

    library_ids = []
    library_names = []
    if len(temp_list) > 0:
        # keep the top max_k hits across all extracted keywords, ranked by similarity
        sorted_list = sorted(temp_list, key=lambda tup: tup[2], reverse=True)
        sorted_list = sorted_list[:config.get('max_k')]
        for item in sorted_list:
            library_ids.append(item[0])
            library_names.append(item[1])

    if len(library_ids) == 0:
        print("null libraries")
        return "null"

    print("generate usage patterns")
    predictions = generate_api_usage_patterns_batch(
        model_generative,
        tokenizer_generative,
        library_ids,
        db_constructor,
        db_params,
        config.get('num_beams'),
        config.get('num_return_sequences'),
        config.get('max_length_generate')
    )

    print("generate hw configs")
    hw_configs = predict_hw_config(
        model_classifier,
        tokenizer_classifier,
        classifier_head,
        library_ids,
        db_metadata,
        config.get('max_length')
    )

    for output_dict, hw_config in zip(predictions, hw_configs):
        output_dict['hw_config'] = hw_config

    print("finished the predictions")
    predictions = get_metadata_library(predictions, db_metadata)

    return predictions


def extract_series(x):
    '''
    Function to extract series-like tokens (alphanumeric mixes such as model numbers) from a keyword

    Params:
    x (string): a keyword candidate

    Returns:
    series (list): the alphanumeric tokens found in x, or [x] if none are found
    '''
    name = x.replace("-", " ").replace("_", " ")
    name = name.split()
    series = []
    for token in name:
        # keep tokens that mix letters and digits, e.g. model numbers like "bme280"
        if token.isalnum() and not(token.isalpha()) and not(token.isdigit()):
            series.append(token)
    if len(series) > 0:
        return series
    else:
        return [x]


def extract_keywords(query, ex_list):
    '''
    Function to extract keyword candidates from a user query using noun chunks

    Params:
    query (string): a query from the user
    ex_list (list): a list of keywords to exclude

    Returns:
    filtered_keyword_candidates (list): a list of extracted keywords, or [query] if none are found
    '''
    doc = nlp(query)
    keyword_candidates = []

    # extract keywords
    for chunk in doc.noun_chunks:
        temp_list = []
        for token in chunk:
            if token.text not in ex_list and token.pos_ not in ("DET", "PRON", "CCONJ", "NUM"):
                temp_list.append(token.text)
        if len(temp_list) > 0:
            keyword_candidates.append(" ".join(temp_list))

    filtered_keyword_candidates = []
    for keyword in keyword_candidates:
        temp_candidates = extract_series(keyword)
        for candidate in temp_candidates:
            if len(candidate.split()) > 1:
                # reduce multi-word candidates to the root of their noun chunk
                doc = nlp(candidate)
                for chunk in doc.noun_chunks:
                    filtered_keyword_candidates.append(chunk.root.text)
            else:
                filtered_keyword_candidates.append(candidate)

    if len(filtered_keyword_candidates) == 0:
        filtered_keyword_candidates.append(query)

    return filtered_keyword_candidates
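
# Quick illustration of the keyword-extraction helpers. The inputs below are
# made-up: extract_series splits out model-number tokens such as "bme280",
# and extract_keywords keeps noun-chunk roots after applying the exclusion list.
def _demo_extract_keywords():
    print(extract_series('adafruit-bme280'))  # -> ['bme280']
    print(extract_keywords('read the temperature from a bme280 sensor', ex_list=['sensor']))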