from re import match

import pandas as pd

from model import Model


class Data:
    """Container for input and output data.

    Attributes set while parsing:
        seq: input sequence, stripped and upper-cased.
        trg: substitution input; a string in 'SVS' mode, a list of tokens otherwise.
        mode: running mode, one of 'SVS', 'DMS' or 'MUT'.
        sub: single-column ('0') DataFrame of substitutions written as X#Y.
        out: DataFrame with the substitutions ('0') and a column named after
            the model; presumably filled in by ``Model.run_model`` -- confirm
            against the model module.
        out_buffer: path the CSV copy of the results is written to.
    """

    # initialise empty model as static class member for efficiency
    model = Model()

    # residue letters enumerated as replacements in deep mutational scanning mode
    _RESIDUES = "ACDEFGHIKLMNPQRSTVWY"
    # anchored with \Z so the WHOLE token has to match, not just a prefix
    _POSITION_PATTERN = r'\d+\Z'
    _SUBSTITUTION_PATTERN = r'[A-Z]\d+[A-Z]\Z'

    def parse_seq(self, src: str):
        """Parse the input sequence.

        Stores the stripped, upper-cased sequence in ``self.seq``.

        Raises:
            RuntimeError: if a character is not in the model alphabet.
        """
        self.seq = src.strip().upper()
        # validate the normalised sequence, not the raw input, so that
        # lower-case letters and surrounding whitespace are accepted
        if not all(x in self.model.alphabet for x in self.seq):
            raise RuntimeError("Unrecognised characters in sequence")

    def parse_sub(self, trg: str):
        """Parse the input substitutions and identify the running mode.

        Modes:
            'SVS': a single string as long as the sequence; every position
                where the two differ becomes a substitution.
            'DMS': whitespace-separated 1-based positions; every other letter
                of ``_RESIDUES`` is substituted at each position.
            'MUT': whitespace-separated substitutions of the form X#Y.

        Raises:
            RuntimeError: if the input is empty or matches no mode.
            IndexError: if a 'DMS' position lies outside the sequence.
        """
        self.mode = None
        self.sub = list()
        self.trg = trg.strip().upper()
        tokens = self.trg.split()

        # an empty input would otherwise be classified as 'DMS' with no rows
        if not tokens:
            raise RuntimeError("Unrecognised running mode; wrong inputs?")

        # identify running mode
        if len(tokens) == 1 and len(tokens[0]) == len(self.seq):
            # if single string of same length as sequence, seq vs seq mode
            self.mode = 'SVS'
            for resi, (old, new) in enumerate(zip(self.seq, self.trg), 1):
                if old != new:
                    self.sub.append(f"{old}{resi}{new}")
        else:
            self.trg = tokens
            if all(match(self._POSITION_PATTERN, x) for x in tokens):
                # if all strings are numbers, deep mutational scanning mode
                self.mode = 'DMS'
                # dict.fromkeys drops repeated positions but keeps input order
                for resi in dict.fromkeys(map(int, tokens)):
                    # position 0 would otherwise wrap around to the last residue
                    if not 1 <= resi <= len(self.seq):
                        raise IndexError(
                            f"Position {resi} is outside the sequence "
                            f"(valid range 1-{len(self.seq)})")
                    old = self.seq[resi - 1]
                    for new in self._RESIDUES.replace(old, ''):
                        self.sub.append(f"{old}{resi}{new}")
            elif all(match(self._SUBSTITUTION_PATTERN, x) for x in tokens):
                # if all strings are of the form X#Y, single substitution mode
                self.mode = 'MUT'
                self.sub = self.trg
            else:
                raise RuntimeError("Unrecognised running mode; wrong inputs?")

        self.sub = pd.DataFrame(self.sub, columns=['0'])

    def __init__(self, src: str, trg: str, model_name: str, scoring_strategy: str, out_file):
        """Initialise data.

        Args:
            src: input sequence.
            trg: substitutions (see ``parse_sub`` for the accepted forms).
            model_name: name of the model to score with.
            scoring_strategy: stored as-is on the instance.
            out_file: object whose ``name`` attribute is the path the CSV
                output is written to.
        """
        # if model has changed, load new model; it is stored on the class so
        # that later instances reuse it instead of reloading it every time
        # NOTE(review): the model is shared by all instances -- confirm this is
        # acceptable if requests with different models can run concurrently
        if Data.model.model_name != model_name:
            Data.model = Model(model_name)
        # always record the name: it is used as the score column below
        self.model_name = model_name

        self.parse_seq(src)
        self.parse_sub(trg)
        self.scoring_strategy = scoring_strategy
        self.out = pd.DataFrame(self.sub, columns=['0', self.model_name])
        self.out_buffer = out_file.name

    def parse_output(self) -> str:
        """Format output data for visualisation.

        Sorts the results according to the running mode, writes them (rounded
        to two decimals) as CSV to ``self.out_buffer`` and returns them as a
        styled HTML table.
        """
        if self.mode == 'MUT':
            # if single substitution mode, sort by score
            self.out = self.out.sort_values(self.model_name, ascending=False)
        elif self.mode == 'DMS':
            # if deep mutational scanning mode, sort by residue and score,
            # then lay the positions side by side (two columns per position)
            ranked = (self.out
                      .assign(resi=self.out['0'].str.extract(r'(\d+)', expand=False).astype(int))
                      .sort_values(['resi', self.model_name], ascending=[True, False]))
            # group by position instead of slicing fixed chunks of 19 rows, so
            # positions with a different number of substitutions (residue not
            # in _RESIDUES) keep all their rows; shorter panels are NaN-padded
            panels = [group.drop(columns='resi').reset_index(drop=True)
                      for _, group in ranked.groupby('resi')]
            self.out = pd.concat(panels, axis=1)
            self.out = self.out.set_axis(range(self.out.shape[1]), axis='columns')

        # save to temporary file to be downloaded
        self.out.round(2).to_csv(self.out_buffer, index=False)

        return (self.out.style
                .format(lambda x: f'{x:.2f}' if isinstance(x, float) else x)
                .hide(axis=0)
                .hide(axis=1)
                .background_gradient(cmap="RdYlGn", vmax=8, vmin=-8)
                .to_html(justify='center'))

    def calculate(self):
        """Run model and parse output.

        Passes this object to ``self.model.run_model`` and returns the HTML
        produced by ``parse_output``.
        """
        self.model.run_model(self)
        return self.parse_output()