kcelia commited on
Commit
4fdac74
1 Parent(s): 49a1dd4

chore: update

Browse files
Files changed (2) hide show
  1. app.py +56 -47
  2. utils.py +4 -4
app.py CHANGED
@@ -27,38 +27,39 @@ time.sleep(3)
27
  # pylint: disable=c-extension-no-member,invalid-name
28
 
29
 
30
- def is_nan(inputs) -> bool:
31
  """
32
- Check if the input is NaN.
33
 
34
  Args:
35
- inputs (any): The input to be checked.
36
 
37
  Returns:
38
- bool: True if the input is NaN or empty, False otherwise.
39
  """
40
- return inputs is None or (inputs is not None and len(inputs) < 1)
41
 
42
 
 
 
43
  # def fill_in_fn(default_disease: str, *checkbox_symptoms: Tuple[str]) -> Dict:
44
  # """
45
- # Fill in the gr.CheckBoxGroup list with the predefined symptoms of a selected default disease.
46
-
47
  # Args:
48
- # default_disease (str): The default disease
49
- # *checkbox_symptoms (Tuple[str]): Tuple of selected symptoms
50
-
51
  # Returns:
52
  # dict: The updated gr.CheckBoxesGroup.
53
  # """
 
 
54
  # df = pd.read_csv(TRAINING_FILENAME)
55
  # df_filtred = df[df[TARGET_COLUMNS[1]] == default_disease]
56
  # symptoms = pretty_print(df_filtred.columns[df_filtred.eq(1).any()].to_list())
57
-
58
  # if any(lst for lst in checkbox_symptoms if lst):
59
  # for sublist in checkbox_symptoms:
60
  # symptoms.extend(sublist)
61
-
62
  # return {box: symptoms for box in check_boxes}
63
 
64
 
@@ -67,7 +68,7 @@ def get_user_symptoms_from_checkboxgroup(checkbox_symptoms: List) -> np.array:
67
  Convert the user symptoms into a binary vector representation.
68
 
69
  Args:
70
- checkbox_symptoms (list): A list of user symptoms.
71
 
72
  Returns:
73
  np.array: A binary vector representing the user's symptoms.
@@ -76,7 +77,7 @@ def get_user_symptoms_from_checkboxgroup(checkbox_symptoms: List) -> np.array:
76
  KeyError: If a provided symptom is not recognized as a valid symptom.
77
 
78
  """
79
- symptoms_vector = {key: 0 for key in valid_columns}
80
  for pretty_symptom in checkbox_symptoms:
81
  original_symptom = "_".join((pretty_symptom.lower().split(" ")))
82
  if original_symptom not in symptoms_vector.keys():
@@ -130,10 +131,10 @@ def key_gen_fn(user_symptoms: List[str]) -> Dict:
130
  """
131
  clean_directory()
132
 
133
- if is_nan(user_symptoms):
134
  print("Error: Please submit your symptoms or select a default disease.")
135
  return {
136
- error_box2: gr.update(visible=True, value="Please submit your symptoms first"),
137
  }
138
 
139
  # Generate a random user ID
@@ -142,7 +143,6 @@ def key_gen_fn(user_symptoms: List[str]) -> Dict:
142
 
143
  client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{user_id}")
144
  client.load()
145
- print("CLIENT_LOADED")
146
 
147
  # Creates the private and evaluation keys on the client side
148
  client.generate_private_and_evaluation_keys()
@@ -175,7 +175,7 @@ def encrypt_fn(user_symptoms: np.ndarray, user_id: str) -> None:
175
  user_id (user): The current user's ID
176
  """
177
 
178
- if is_nan(user_id) or is_nan(user_symptoms):
179
  print("Error in encryption step: Provide your symptoms and generate the evaluation keys.")
180
  return {
181
  error_box3: gr.update(
@@ -192,7 +192,7 @@ def encrypt_fn(user_symptoms: np.ndarray, user_id: str) -> None:
192
 
193
  encrypted_quantized_user_symptoms = client.quantize_encrypt_serialize(user_symptoms)
194
  assert isinstance(encrypted_quantized_user_symptoms, bytes)
195
- encrypted_input_path = KEYS_DIR / f"{user_id}/encrypted_symptoms"
196
 
197
  with encrypted_input_path.open("wb") as f:
198
  f.write(encrypted_quantized_user_symptoms)
@@ -213,11 +213,11 @@ def send_input_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
213
  """Send the encrypted data and the evaluation key to the server.
214
 
215
  Args:
216
- user_id (int): The current user's ID
217
- user_symptoms (numpy.ndarray): The user symptoms
218
  """
219
 
220
- if is_nan(user_id) or is_nan(user_symptoms):
221
  return {
222
  error_box4: gr.update(
223
  visible=True,
@@ -227,7 +227,7 @@ def send_input_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
227
  }
228
 
229
  evaluation_key_path = KEYS_DIR / f"{user_id}/evaluation_key"
230
- encrypted_input_path = KEYS_DIR / f"{user_id}/encrypted_symptoms"
231
 
232
  if not evaluation_key_path.is_file():
233
  print(
@@ -252,7 +252,7 @@ def send_input_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
252
  # Define the data and files to post
253
  data = {
254
  "user_id": user_id,
255
- "filter": user_symptoms,
256
  }
257
 
258
  files = [
@@ -268,18 +268,20 @@ def send_input_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
268
  files=files,
269
  ) as response:
270
  print(f"Sending Data: {response.ok=}")
271
- return {error_box4: gr.update(visible=False),
272
-
273
- next_step_tab3: gr.update(visible=True),srv_resp_send_data_box: "Data sent",}
 
 
274
 
275
 
276
  def run_fhe_fn(user_id: str) -> Dict:
277
- """Send the encrypted input as well as the evaluation key to the server.
278
 
279
  Args:
280
  user_id (int): The current user's ID.
281
  """
282
- if is_nan(user_id): # or is_nan(user_symptoms):
283
  return {
284
  error_box5: gr.update(
285
  visible=True,
@@ -292,8 +294,6 @@ def run_fhe_fn(user_id: str) -> Dict:
292
  "user_id": user_id,
293
  }
294
 
295
- # Trigger the FHE execution on the encrypted previously sent
296
-
297
  url = SERVER_URL + "run_fhe"
298
 
299
  with requests.post(
@@ -302,7 +302,13 @@ def run_fhe_fn(user_id: str) -> Dict:
302
  ) as response:
303
  if not response.ok:
304
  return {
305
- error_box5: gr.update(visible=True, value="Please wait."),
 
 
 
 
 
 
306
  fhe_execution_time_box: gr.update(visible=True),
307
  }
308
  else:
@@ -311,7 +317,7 @@ def run_fhe_fn(user_id: str) -> Dict:
311
  return {
312
  error_box5: gr.update(visible=False),
313
  fhe_execution_time_box: gr.update(value=f"{response.json()} seconds"),
314
- next_step_tab4: gr.update(visible=True)
315
  }
316
 
317
 
@@ -319,11 +325,11 @@ def get_output_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
319
  """Retreive the encrypted data from the server.
320
 
321
  Args:
322
- user_id (int): The current user's ID
323
- user_symptoms (numpy.ndarray): The user symptoms
324
  """
325
 
326
- if is_nan(user_id) or is_nan(user_symptoms):
327
  return {
328
  error_box6: gr.update(
329
  visible=True,
@@ -360,14 +366,14 @@ def decrypt_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
360
  """Dencrypt the data on the `Client Side`.
361
 
362
  Args:
363
- user_id (int): The current user's ID
364
- user_symptoms (numpy.ndarray): The user symptoms
365
 
366
  Returns:
367
  Decrypted output
368
  """
369
 
370
- if is_nan(user_id) or is_nan(user_symptoms):
371
  return {
372
  error_box7: gr.update(
373
  visible=True,
@@ -377,7 +383,6 @@ def decrypt_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
377
  }
378
 
379
  # Get the encrypted output path
380
-
381
  encrypted_output_path = CLIENT_DIR / f"{user_id}_encrypted_output"
382
 
383
  if not encrypted_output_path.is_file():
@@ -398,6 +403,7 @@ def decrypt_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
398
  # Retrieve the client API
399
  client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{user_id}")
400
  client.load()
 
401
  # Deserialize, decrypt and post-process the encrypted output
402
  output = client.deserialize_decrypt_dequantize(encrypted_output)
403
 
@@ -408,7 +414,7 @@ def decrypt_fn(user_id: str, user_symptoms: np.ndarray) -> Dict:
408
 
409
 
410
  def reset_fn():
411
- """Clear all the box outputs."""
412
 
413
  clean_directory()
414
 
@@ -455,9 +461,7 @@ if __name__ == "__main__":
455
 
456
  clean_directory()
457
 
458
- (X_train, X_test), (y_train, y_test) = load_data()
459
-
460
- valid_columns = X_train.columns.to_list()
461
 
462
  with gr.Blocks(css=CSS) as demo:
463
 
@@ -512,6 +516,8 @@ if __name__ == "__main__":
512
 
513
  error_box1 = gr.Textbox(label="Error", visible=False)
514
 
 
 
515
  # Default disease, picked from the dataframe
516
  # disease_box = gr.Dropdown(list(sorted(set(df_test["prognosis"]))),
517
  # label="Disease:")
@@ -535,7 +541,8 @@ if __name__ == "__main__":
535
  <p align="center">
536
  <img width="80%" height="20%" src="https://raw.githubusercontent.com/kcelia/Img/main/Go-To-Step2.png">
537
  </p>
538
- """, visible=False
 
539
  )
540
 
541
  submit_button.click(
@@ -628,7 +635,8 @@ if __name__ == "__main__":
628
  <p align="center">
629
  <img width="80%" height="20%" src="https://raw.githubusercontent.com/kcelia/Img/main/Go-To-Step3.png">
630
  </p>
631
- """, visible=False
 
632
  )
633
 
634
  send_input_btn.click(
@@ -652,7 +660,8 @@ if __name__ == "__main__":
652
  <p align="center">
653
  <img width="80%" height="20%" src="https://raw.githubusercontent.com/kcelia/Img/main/Go-to-Step4.png">
654
  </p>
655
- """, visible=False
 
656
  )
657
  run_fhe_btn.click(
658
  run_fhe_fn,
 
27
  # pylint: disable=c-extension-no-member,invalid-name
28
 
29
 
30
+ def is_none(obj) -> bool:
31
  """
32
+ Check if the object is None.
33
 
34
  Args:
35
+ obj (any): The input to be checked.
36
 
37
  Returns:
38
+ bool: True if the object is None or empty, False otherwise.
39
  """
40
+ return all((obj is None, (obj is not None and len(obj) < 1)))
41
 
42
 
43
+ # <!> This function has been paused due to UI issues.
44
+
45
  # def fill_in_fn(default_disease: str, *checkbox_symptoms: Tuple[str]) -> Dict:
46
  # """
47
+ # Fill in the gr.CheckBoxGroup list with predefined symptoms of a selected default disease.
 
48
  # Args:
49
+ # default_disease (str): The default selected disease
50
+ # *checkbox_symptoms (Tuple[str]): Existing checked symptoms
 
51
  # Returns:
52
  # dict: The updated gr.CheckBoxesGroup.
53
  # """
54
+ #
55
+ # # Figure out the symptoms of the disease, selected by the user
56
  # df = pd.read_csv(TRAINING_FILENAME)
57
  # df_filtred = df[df[TARGET_COLUMNS[1]] == default_disease]
58
  # symptoms = pretty_print(df_filtred.columns[df_filtred.eq(1).any()].to_list())
59
+ # # Check if there are existing symptoms, in the CheckbBxGroup list
60
  # if any(lst for lst in checkbox_symptoms if lst):
61
  # for sublist in checkbox_symptoms:
62
  # symptoms.extend(sublist)
 
63
  # return {box: symptoms for box in check_boxes}
64
 
65
 
 
68
  Convert the user symptoms into a binary vector representation.
69
 
70
  Args:
71
+ checkbox_symptoms (List): A list of user symptoms.
72
 
73
  Returns:
74
  np.array: A binary vector representing the user's symptoms.
 
77
  KeyError: If a provided symptom is not recognized as a valid symptom.
78
 
79
  """
80
+ symptoms_vector = {key: 0 for key in valid_symptoms}
81
  for pretty_symptom in checkbox_symptoms:
82
  original_symptom = "_".join((pretty_symptom.lower().split(" ")))
83
  if original_symptom not in symptoms_vector.keys():
 
131
  """
132
  clean_directory()
133
 
134
+ if is_none(user_symptoms):
135
  print("Error: Please submit your symptoms or select a default disease.")
136
  return {
137
+ error_box2: gr.update(visible=True, value="Please submit your symptoms first!."),
138
  }
139
 
140
  # Generate a random user ID
 
143
 
144
  client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{user_id}")
145
  client.load()
 
146
 
147
  # Creates the private and evaluation keys on the client side
148
  client.generate_private_and_evaluation_keys()
 
175
  user_id (user): The current user's ID
176
  """
177
 
178
+ if is_none(user_id) or is_none(user_symptoms):
179
  print("Error in encryption step: Provide your symptoms and generate the evaluation keys.")
180
  return {
181
  error_box3: gr.update(
 
192
 
193
  encrypted_quantized_user_symptoms = client.quantize_encrypt_serialize(user_symptoms)
194
  assert isinstance(encrypted_quantized_user_symptoms, bytes)
195
+ encrypted_input_path = KEYS_DIR / f"{user_id}/encrypted_input"
196
 
197
  with encrypted_input_path.open("wb") as f:
198
  f.write(encrypted_quantized_user_symptoms)
 
213
  """Send the encrypted data and the evaluation key to the server.
214
 
215
  Args:
216
+ user_id (str): The current user's ID
217
+ user_symptoms (np.ndarray): The user symptoms
218
  """
219
 
220
+ if is_none(user_id) or is_none(user_symptoms):
221
  return {
222
  error_box4: gr.update(
223
  visible=True,
 
227
  }
228
 
229
  evaluation_key_path = KEYS_DIR / f"{user_id}/evaluation_key"
230
+ encrypted_input_path = KEYS_DIR / f"{user_id}/encrypted_input"
231
 
232
  if not evaluation_key_path.is_file():
233
  print(
 
252
  # Define the data and files to post
253
  data = {
254
  "user_id": user_id,
255
+ "input": user_symptoms,
256
  }
257
 
258
  files = [
 
268
  files=files,
269
  ) as response:
270
  print(f"Sending Data: {response.ok=}")
271
+ return {
272
+ error_box4: gr.update(visible=False),
273
+ next_step_tab3: gr.update(visible=True),
274
+ srv_resp_send_data_box: "Data sent",
275
+ }
276
 
277
 
278
  def run_fhe_fn(user_id: str) -> Dict:
279
+ """Send the encrypted input and the evaluation key to the server.
280
 
281
  Args:
282
  user_id (int): The current user's ID.
283
  """
284
+ if is_none(user_id):
285
  return {
286
  error_box5: gr.update(
287
  visible=True,
 
294
  "user_id": user_id,
295
  }
296
 
 
 
297
  url = SERVER_URL + "run_fhe"
298
 
299
  with requests.post(
 
302
  ) as response:
303
  if not response.ok:
304
  return {
305
+ error_box5: gr.update(
306
+ visible=True,
307
+ value=(
308
+ "An error occurred on the Server Side. "
309
+ "Please check connectivity and data transmission."
310
+ ),
311
+ ),
312
  fhe_execution_time_box: gr.update(visible=True),
313
  }
314
  else:
 
317
  return {
318
  error_box5: gr.update(visible=False),
319
  fhe_execution_time_box: gr.update(value=f"{response.json()} seconds"),
320
+ next_step_tab4: gr.update(visible=True),
321
  }
322
 
323
 
 
325
  """Retreive the encrypted data from the server.
326
 
327
  Args:
328
+ user_id (str): The current user's ID
329
+ user_symptoms (np.ndarray): The user symptoms
330
  """
331
 
332
+ if is_none(user_id) or is_none(user_symptoms):
333
  return {
334
  error_box6: gr.update(
335
  visible=True,
 
366
  """Dencrypt the data on the `Client Side`.
367
 
368
  Args:
369
+ user_id (str): The current user's ID
370
+ user_symptoms (np.ndarray): The user symptoms
371
 
372
  Returns:
373
  Decrypted output
374
  """
375
 
376
+ if is_none(user_id) or is_none(user_symptoms):
377
  return {
378
  error_box7: gr.update(
379
  visible=True,
 
383
  }
384
 
385
  # Get the encrypted output path
 
386
  encrypted_output_path = CLIENT_DIR / f"{user_id}_encrypted_output"
387
 
388
  if not encrypted_output_path.is_file():
 
403
  # Retrieve the client API
404
  client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{user_id}")
405
  client.load()
406
+
407
  # Deserialize, decrypt and post-process the encrypted output
408
  output = client.deserialize_decrypt_dequantize(encrypted_output)
409
 
 
414
 
415
 
416
  def reset_fn():
417
+ """Reset the space and clear all the box outputs."""
418
 
419
  clean_directory()
420
 
 
461
 
462
  clean_directory()
463
 
464
+ (X_train, X_test), (y_train, y_test), valid_symptoms = load_data()
 
 
465
 
466
  with gr.Blocks(css=CSS) as demo:
467
 
 
516
 
517
  error_box1 = gr.Textbox(label="Error", visible=False)
518
 
519
+ # <!> This part has been paused due to UI issues.
520
+
521
  # Default disease, picked from the dataframe
522
  # disease_box = gr.Dropdown(list(sorted(set(df_test["prognosis"]))),
523
  # label="Disease:")
 
541
  <p align="center">
542
  <img width="80%" height="20%" src="https://raw.githubusercontent.com/kcelia/Img/main/Go-To-Step2.png">
543
  </p>
544
+ """,
545
+ visible=False,
546
  )
547
 
548
  submit_button.click(
 
635
  <p align="center">
636
  <img width="80%" height="20%" src="https://raw.githubusercontent.com/kcelia/Img/main/Go-To-Step3.png">
637
  </p>
638
+ """,
639
+ visible=False,
640
  )
641
 
642
  send_input_btn.click(
 
660
  <p align="center">
661
  <img width="80%" height="20%" src="https://raw.githubusercontent.com/kcelia/Img/main/Go-to-Step4.png">
662
  </p>
663
+ """,
664
+ visible=False,
665
  )
666
  run_fhe_btn.click(
667
  run_fhe_fn,
utils.py CHANGED
@@ -1,7 +1,7 @@
1
  import os
2
  import shutil
3
  from pathlib import Path
4
- from typing import Any, List, Tuple
5
 
6
  import numpy
7
  import pandas
@@ -87,7 +87,7 @@ def get_disease_name(encoded_prediction: int, file_name: str = TRAINING_FILENAME
87
  return disease_name
88
 
89
 
90
- def load_data() -> Tuple[pandas.DataFrame, pandas.DataFrame, numpy.ndarray]:
91
  """
92
  Return the data
93
 
@@ -95,7 +95,7 @@ def load_data() -> Tuple[pandas.DataFrame, pandas.DataFrame, numpy.ndarray]:
95
  None
96
 
97
  Return:
98
- Tuple[pandas.DataFrame, pandas.DataFrame, numpy.ndarray]: The train and testing set.
99
 
100
 
101
  """
@@ -113,7 +113,7 @@ def load_data() -> Tuple[pandas.DataFrame, pandas.DataFrame, numpy.ndarray]:
113
  y_test = df_test[TARGET_COLUMNS[0]]
114
  X_test = df_test.drop(columns=TARGET_COLUMNS, axis=1, errors="ignore")
115
 
116
- return (X_train, X_test), (y_train, y_test)
117
 
118
 
119
  def load_model(X_train: pandas.DataFrame, y_train: numpy.ndarray):
 
1
  import os
2
  import shutil
3
  from pathlib import Path
4
+ from typing import List, Tuple, Union
5
 
6
  import numpy
7
  import pandas
 
87
  return disease_name
88
 
89
 
90
+ def load_data() -> Union[Tuple[pandas.DataFrame, numpy.ndarray], List]:
91
  """
92
  Return the data
93
 
 
95
  None
96
 
97
  Return:
98
+ The train, testing set and valid symptoms.
99
 
100
 
101
  """
 
113
  y_test = df_test[TARGET_COLUMNS[0]]
114
  X_test = df_test.drop(columns=TARGET_COLUMNS, axis=1, errors="ignore")
115
 
116
+ return (X_train, X_test), (y_train, y_test), X_train.columns.to_list()
117
 
118
 
119
  def load_model(X_train: pandas.DataFrame, y_train: numpy.ndarray):