# `imports` is assumed to re-export the third-party names used below
# (torch, numpy as np, pandas as pd, os, datasets, OrderedDict, Optional,
# List, Tuple, Any, Dataset, AutoTokenizer, train_test_split).
from imports import *
from utils import normalize, replace_all


class NerFeatures(object):
    """Tensorized features for a single NER example."""

    def __init__(self, input_ids, token_type_ids, attention_mask, valid_ids, labels, label_masks):
        self.input_ids = torch.as_tensor(input_ids, dtype=torch.long)
        self.labels = torch.as_tensor(labels, dtype=torch.long)
        self.token_type_ids = torch.as_tensor(token_type_ids, dtype=torch.long)
        self.attention_mask = torch.as_tensor(attention_mask, dtype=torch.long)
        self.valid_ids = torch.as_tensor(valid_ids, dtype=torch.long)
        self.label_masks = torch.as_tensor(label_masks, dtype=torch.long)


class NerOutput(OrderedDict):
    """Model output container supporting both dict-style and attribute-style access."""

    loss: Optional[torch.FloatTensor] = torch.FloatTensor([0.0])
    tags: Optional[List[int]] = []
    cls_metrics: Optional[List[int]] = []

    def __getitem__(self, k):
        if isinstance(k, str):
            inner_dict = {k: v for (k, v) in self.items()}
            return inner_dict[k]
        else:
            return self.to_tuple()[k]

    def __setattr__(self, name, value):
        if name in self.keys() and value is not None:
            super().__setitem__(name, value)
        super().__setattr__(name, value)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        super().__setattr__(key, value)

    def to_tuple(self) -> Tuple[Any]:
        return tuple(self[k] for k in self.keys())


class NerDataset(Dataset):
    def __init__(self, features: List[NerFeatures], device: str = 'cpu'):
        self.examples = features
        self.device = device

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, index):
        return {key: val.to(self.device) for key, val in self.examples[index].__dict__.items()}


# Build the sentiment dataset (train/test DatasetDict) with torch-formatted tensors.
def sentiment_dataset(path_folder, train_file_name, test_file_name):
    def extract(path):
        data = pd.read_csv(os.path.join(path), encoding="utf-8").dropna()
        # The label is the argmax over the three sentiment probability columns.
        label = [np.argmax(i) for i in data[["negative", "positive", "neutral"]].values.astype(float)]
        # text = data["text"].apply(lambda x: x.replace("_", " "))
        text = data["text"]  # .apply(lambda x: normalize(x))
        return text, label

    x_train, y_train = extract(os.path.join(path_folder, train_file_name))
    x_test, y_test = extract(os.path.join(path_folder, test_file_name))
    train_set = datasets.Dataset.from_pandas(pd.DataFrame(data=zip(x_train, y_train), columns=['text', 'label']))
    test_set = datasets.Dataset.from_pandas(pd.DataFrame(data=zip(x_test, y_test), columns=['text', 'label']))
    custom_dt = datasets.DatasetDict({'train': train_set, 'test': test_set})
    tokenizer = AutoTokenizer.from_pretrained('wonrax/phobert-base-vietnamese-sentiment', use_fast=False)

    def tokenize(batch):
        return tokenizer(list(batch['text']), padding=True, truncation=True)

    custom_tokenized = custom_dt.map(tokenize, batched=True, batch_size=None)
    custom_tokenized.set_format('torch', columns=["input_ids", 'token_type_ids', "attention_mask", "label"])
    return custom_tokenized


# Support function for the NER task: build token/tag <-> index maps.
def get_dict_map(data, mode="token"):
    if mode == "token":
        vocab = list(set([j[0] for i in data for j in i]))
    else:
        vocab = list(set([j[1] for i in data for j in i]))
    idx2tok = {idx: tok for idx, tok in enumerate(vocab)}
    tok2idx = {tok: idx for idx, tok in enumerate(vocab)}
    return tok2idx, idx2tok


# Read a token/label CSV into a list of sentences; empty ("nan") rows separate sentences.
def read_csv_to_ner_data(path):
    data = pd.read_csv(path, encoding="utf-8")
    tok = list(data["token"])
    tok = [replace_all(i) for i in tok]
    lab = list(data["label"])
    token = []
    label = []
    tmp = []
    tmp_ = []
    for i, txt in enumerate(tok):
        if str(txt) != "nan":
            tmp.append(txt)
            tmp_.append(lab[i])
        else:
            token.append(tmp)
            label.append(tmp_)
            tmp = []
            tmp_ = []
    data = []
    tmp = []
    for i, sent in enumerate(token):
        for j, tok in enumerate(sent):
            tmp.append([tok, label[i][j]])
        data.append(tmp)
        tmp = []
    return data


# Build NER features for PhoBERT: align word-level tags with subword positions.
def feature_for_phobert(data, tokenizer, max_seq_len: int = 256, use_crf: bool = False) -> List[NerFeatures]:
    features = []
    # args = parse_arguments()
    # The tag map is rebuilt from the first file found in ./data/topic.
    path = os.path.abspath("./data/topic")
    file_name = os.listdir(path)[0]
    df = read_csv_to_ner_data(os.path.join(path, file_name))
    tag2idx, idx2tag = get_dict_map(df, 'tag')
    for tokens in data:
        if tokens == []:
            continue
        tag_ids = [tag2idx[i[1]] for i in tokens]
        seq_len = len(tokens)
        sentence = ' '.join([tok[0] for tok in tokens])
        encoding = tokenizer(sentence, padding='max_length', truncation=True, max_length=max_seq_len)
        subwords = tokenizer.tokenize(sentence)
        valid_ids = np.zeros(len(encoding.input_ids), dtype=int)
        label_marks = np.zeros(len(encoding.input_ids), dtype=int)
        valid_labels = np.ones(len(encoding.input_ids), dtype=int) * -100
        i = 1
        for idx, subword in enumerate(subwords):  # subwords[:max_seq_len-2]
            # Skip continuation pieces: PhoBERT marks non-final subwords with "@@".
            if idx != 0 and subwords[idx - 1].endswith("@@"):
                continue
            if use_crf:
                valid_ids[i - 1] = idx + 1
            else:
                valid_ids[idx + 1] = 1
                valid_labels[idx + 1] = tag_ids[i - 1]
            i += 1
        if max_seq_len >= seq_len:
            label_padding_size = (max_seq_len - seq_len)
            label_marks[:seq_len] = [1] * seq_len
            tag_ids.extend([0] * label_padding_size)
        else:
            tag_ids = tag_ids[:max_seq_len]
            label_marks[:-2] = [1] * (max_seq_len - 2)
            tag_ids[-2:] = [0] * 2
        if use_crf and label_marks[0] == 0:
            # A CRF layer cannot start a sequence on a masked position.
            print(f"{sentence} - {tag_ids} have mark == 0 at index 0!")
            break
        items = {key: val for key, val in encoding.items()}
        items['labels'] = tag_ids if use_crf else valid_labels
        items['valid_ids'] = valid_ids
        items['label_masks'] = label_marks if use_crf else valid_ids
        features.append(NerFeatures(**items))
        for k, v in items.items():
            assert len(v) == max_seq_len, f"Expected length of {k} is {max_seq_len} but got {len(v)}"
    return features


# Create the NER (topic) train/test datasets.
def topic_dataset(path_folder, file_name, tokenizer, use_crf=True):
    data = read_csv_to_ner_data(os.path.join(path_folder, file_name))
    train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)
    # token2idx, idx2token = get_dict_map(train_data + test_data, 'token')
    # tag2idx, idx2tag = get_dict_map(data, 'tag')
    train_set = NerDataset(feature_for_phobert(train_data, tokenizer=tokenizer, use_crf=use_crf))
    test_set = NerDataset(feature_for_phobert(test_data, tokenizer=tokenizer, use_crf=use_crf))
    return train_set, test_set
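

# Hypothetical usage sketch (not part of the original module): shows how the two
# dataset builders above might be wired together. The folder layout, file names,
# and the 'vinai/phobert-base' tokenizer are assumptions for illustration only.
if __name__ == "__main__":
    # Sentiment data: CSVs with "text", "negative", "positive", "neutral" columns.
    sentiment_data = sentiment_dataset("./data/sentiment", "train.csv", "test.csv")
    print(sentiment_data)

    # Topic/NER data: a CSV of "token"/"label" rows, sentences separated by blank rows.
    phobert_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base", use_fast=False)
    topic_file = os.listdir("./data/topic")[0]
    train_set, test_set = topic_dataset("./data/topic", topic_file,
                                        tokenizer=phobert_tokenizer, use_crf=True)
    print(len(train_set), len(test_set))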