import numpy from transformers import TokenClassificationPipeline,AutoTokenizer try: from transformers.utils import cached_file except: from transformers.file_utils import cached_path,hf_bucket_url cached_file=lambda x,y:os.path.join(x,y) if os.path.isdir(x) else cached_path(hf_bucket_url(x,y)) class BellmanFordTokenClassificationPipeline(TokenClassificationPipeline): def __init__(self,**kwargs): super().__init__(**kwargs) x=self.model.config.label2id y=[k for k in x if k.startswith("B-") or not (k.startswith("I-") or k.endswith("|root") or k.find("|l-")>0 or k.find("|r-")>0)] self.transition=numpy.full((len(x),len(x)),numpy.nan) for k,v in x.items(): for j in ["I-"+k[2:]] if k.startswith("B-") else [k]+y if k.startswith("I-") else y: self.transition[v,x[j]]=0 def check_model_type(self,supported_models): pass def postprocess(self,model_outputs,**kwargs): if "logits" not in model_outputs: return self.postprocess(model_outputs[0],**kwargs) m=model_outputs["logits"][0].numpy() e=numpy.exp(m-numpy.max(m,axis=-1,keepdims=True)) z=e/e.sum(axis=-1,keepdims=True) for i in range(m.shape[0]-1,0,-1): m[i-1]+=numpy.nanmax(m[i]+self.transition,axis=1) k=[numpy.nanargmax(m[0]+self.transition[0])] for i in range(1,m.shape[0]): k.append(numpy.nanargmax(m[i]+self.transition[k[-1]])) w=[{"entity":self.model.config.id2label[j],"start":s,"end":e,"score":z[i,j]} for i,((s,e),j) in enumerate(zip(model_outputs["offset_mapping"][0].tolist(),k)) if s0: self.left_arc[v]=0 elif k.find("|r-")>0: self.right_arc[v]=0 def postprocess(self,model_outputs,**kwargs): import torch if "logits" not in model_outputs: return self.postprocess(model_outputs[0],**kwargs) m=model_outputs["logits"][0].numpy() for i in range(m.shape[0]-1,0,-1): m[i-1]+=numpy.nanmax(m[i]+self.transition,axis=1) k=[numpy.nanargmax(m[0]+self.transition[0])] for i in range(1,m.shape[0]): k.append(numpy.nanargmax(m[i]+self.transition[k[-1]])) w=[{"entity":self.model.config.id2label[j],"start":s,"end":e} for i,((s,e),j) in enumerate(zip(model_outputs["offset_mapping"][0].tolist(),k)) if s0 and w[i-1]["end"]>w[i]["start"]: w[i-1]["end"]=max(w.pop(i)["end"],w[i-1]["end"]) elif p.startswith("B-"): t["entity_group"]=p[2:] else: t["entity_group"]=p d=[model_outputs["sentence"][t["start"]:t["end"]] for t in w] for i in range(len(d)-1,-1,-1): if d[i].startswith(" "): j=len(d[i])-len(d[i].lstrip()) d[i]=d[i].lstrip() w[i]["start"]+=j if d[i].endswith(" "): j=len(d[i])-len(d[i].rstrip()) d[i]=d[i].rstrip() w[i]["end"]-=j if d[i].strip()=="": d.pop(i) w.pop(i) v=self.oldtokenizer(d,add_special_tokens=False) e=self.model.get_input_embeddings().weight m=[] for x in v["input_ids"]: if x==[]: x=[self.tokenizer.unk_token_id] m.append(e[x,:].sum(axis=0)) m.append(e[self.tokenizer.sep_token_id,:]) m.append(e[self.tokenizer.pad_token_id,:]) m=torch.stack(m).to(self.device) k=list(range(len(d)+1)) e=[] with torch.no_grad(): for i in range(len(d)): e.append(self.model(inputs_embeds=torch.unsqueeze(m[k+list(range(i,len(d)))+[-1]*i,:],0)).logits[0,-len(d):,:]) e=torch.stack(e).cpu().numpy() for i in range(len(d)): for j in range(i): e[-j-1,-i-1],e[-i-1,-j-1]=e[-i-1,i-j]+self.left_arc,e[-i-1,i-j]+self.right_arc e[-i-1,-i-1]=e[-i-1,0]+self.root m,p=numpy.nanmax(e,axis=2),numpy.nanargmax(e,axis=2) h=self.chu_liu_edmonds(m) z=[i for i,j in enumerate(h) if i==j] if len(z)>1: k,h=z[numpy.nanargmax(m[z,z])],numpy.nanmin(m)-numpy.nanmax(m) m[:,z]+=[[0 if j in z and (i!=j or i==k) else h for i in z] for j in range(m.shape[0])] h=self.chu_liu_edmonds(m) q=[self.model.config.id2label[p[j,i]].split("|") for i,j in enumerate(h)] t=model_outputs["sentence"].replace("\n"," ") u="# text = "+t+"\n" for i,j in enumerate(d): u+="\t".join([str(i+1),j,"_",q[i][0],"_","_" if len(q[i])<3 else "|".join(q[i][1:-1]),str(0 if h[i]==i else h[i]+1),"root" if q[i][-1]=="root" else q[i][-1][2:],"_","_" if i+1