import os
import pprint as pp
from collections import OrderedDict, defaultdict
import json
import diff_viewer
import pandas as pd
import streamlit as st
from datasets import load_dataset, get_dataset_config_names
# Deployment configuration, all read from the Streamlit secrets store.
# Dataset identifier handed to `load_dataset` to fetch the per-operation check examples.
CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT = st.secrets["CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT"]
# Dataset identifier handed to `load_dataset` to fetch the processing logs.
LOGS_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT = st.secrets["LOGS_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT"]
# Token passed as `use_auth_token` on every `load_dataset` call in this file.
HF_API_TOKEN = st.secrets["HF_API_TOKEN"]
# Markers searched for in log lines; the token following a marker is taken as
# the name of the operation whose statistics come next in the log.
OPERATION_TYPES = [
"Applied filter",
"Applied deduplication function",
"Applied map function",
]
# When a loaded check dataset has exactly this many rows, the page warns that
# only a subset of the modified / filtered examples is shown.
MAX_LEN_DS_CHECKS = st.secrets["MAX_LEN_DS_CHECKS"]
def get_ds(config):
    """Load the check dataset for ``config`` and return its ``"train"`` split.

    Args:
        config: Name of the dataset configuration to load.

    Returns:
        The ``"train"`` split of the loaded dataset.
    """
    return load_dataset(
        CHECK_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT,
        config,
        use_auth_token=HF_API_TOKEN,
        trust_remote_code=True,
    )["train"]
def next_idx(idx: int):
    """Return the index after ``idx``, wrapping to 0 past the last example.

    The wrap-around length is the size of the dataset stored under
    ``st.session_state["ds"]``.
    """
    return (idx + 1) % len(st.session_state["ds"])
def previous_idx(idx: int):
    """Return the index before ``idx``, wrapping to the last example below 0.

    The wrap-around length is the size of the dataset stored under
    ``st.session_state["ds"]``.
    """
    return (idx - 1) % len(st.session_state["ds"])
def on_click_next():
    """Advance both displayed example indices by one (with wrap-around)."""
    for state_key in ("idx_1", "idx_2"):
        st.session_state[state_key] = next_idx(st.session_state[state_key])
def on_click_previous():
    """Move both displayed example indices back by one (with wrap-around)."""
    for state_key in ("idx_1", "idx_2"):
        st.session_state[state_key] = previous_idx(st.session_state[state_key])
def on_ds_change(config):
    """Load the dataset for ``config`` and reset the session state around it.

    Stores the dataset, points the first index at example 0 and the second at
    example 1 (or 0 when the dataset has at most one row), and records the
    configuration name and the dataset length.
    """
    dataset = get_ds(config)
    n_docs = len(dataset)

    st.session_state["ds"] = dataset
    st.session_state["idx_1"] = 0
    if n_docs > 1:
        st.session_state["idx_2"] = 1
    else:
        st.session_state["idx_2"] = 0
    st.session_state["ds_check_config"] = config
    st.session_state["ds_max_docs"] = n_docs
def get_log_stats_df(raw_log, operation_types=None):
    """Parse per-operation statistics out of a raw processing log.

    The log is scanned line by line. A line containing one of the
    ``operation_types`` markers starts a new operation, whose name is the
    second space-separated token after the marker. Lines containing one of the
    four metric names are attached to the most recently started operation,
    again taking the second space-separated token after the name as the value.

    Args:
        raw_log: Full log text, lines separated by ``"\\n"``.
        operation_types: Markers that introduce an operation. Defaults to the
            module-level ``OPERATION_TYPES``.

    Returns:
        A ``pandas.DataFrame`` with one row per operation (in order of first
        appearance) and the columns ``Order``, ``Name``, the two sample
        counts, the two size columns and the two computed "% removed" columns.

    Raises:
        ValueError: If a marker is not followed by a value, if a metric shows
            up before any operation, if a metric is reported twice for the
            same operation, or if an operation lacks one of the metrics.
    """
    if operation_types is None:
        operation_types = OPERATION_TYPES
    operation_types = list(operation_types)

    metric_names = [
        "Initial number of samples",
        "Final number of samples",
        "Initial size in bytes",
        "Final size in bytes",
    ]

    metric_dict = defaultdict(dict)
    current_operation = None
    order = 0
    for line in raw_log.split("\n"):
        # Metrics are checked before operation markers, as a single line may
        # in principle match several of them.
        for marker in metric_names + operation_types:
            if marker not in line:
                continue
            tokens = line.split(marker)[1].split(" ")
            if len(tokens) < 2:
                raise ValueError(f"No value found after {marker!r} in log line: {line!r}")
            value = tokens[1]

            if marker in operation_types:
                current_operation = value
                metric_dict[current_operation]["Order"] = order
                order += 1
                continue

            if current_operation is None:
                raise ValueError(
                    f"Metric {marker!r} found before any operation in log line: {line!r}"
                )
            if marker in metric_dict[current_operation]:
                # Explicit raise instead of `assert`, which is stripped under -O.
                raise ValueError(
                    f"operation_name: {current_operation}\n\nvalue: {value}\n\n"
                    f"metric_dict: {pp.pformat(dict(metric_dict))} \n\n"
                    f"metric_name: {marker} \n\nline: {line}"
                )
            metric_dict[current_operation][marker] = value

    columns = ["Order", "Name"] + metric_names
    data = OrderedDict((column, []) for column in columns)
    for name, metrics in metric_dict.items():
        missing = [metric for metric in metric_names if metric not in metrics]
        if missing:
            raise ValueError(f"Operation {name!r} is missing metrics in the log: {missing}")
        for column in columns:
            data[column].append(name if column == "Name" else metrics[column])

    df = pd.DataFrame(data)
    # NOTE(review): the size columns are only relabelled as "(GB)"; no unit
    # conversion happens here - confirm the logged values are already in GB.
    df = df.rename(
        columns={
            "Initial size in bytes": "Initial size (GB)",
            "Final size in bytes": "Final size (GB)",
        }
    )

    # Raw columns keep the string values from the log; only the derived
    # percentages are numeric.
    initial_samples = df["Initial number of samples"].astype(float)
    final_samples = df["Final number of samples"].astype(float)
    df["% samples removed"] = (initial_samples - final_samples) / initial_samples * 100

    initial_size = df["Initial size (GB)"].astype(float)
    final_size = df["Final size (GB)"].astype(float)
    df["Size (GB) % removed"] = (initial_size - final_size) / initial_size * 100
    return df
def get_logs_stats(raw_log):
    """Show the parsed log statistics, falling back to a filtered raw log.

    When the statistics table cannot be built, the exception is written to
    the page followed by the log lines that contain ``"INFO - __main__"`` and
    mention neither ``"Examples of"`` nor ``"Examples n°"``.
    """
    try:
        st.dataframe(get_log_stats_df(raw_log))
    except Exception as e:
        # UI boundary: surface the parsing problem instead of crashing the page.
        st.write(e)
        st.write("Subset of the logs:")
        subcontent = []
        for log_line in raw_log.split("\n"):
            if "INFO - __main__" not in log_line:
                continue
            if "Examples of" in log_line or "Examples n°" in log_line:
                continue
            subcontent.append(log_line)
        st.write(subcontent)
def meta_component(idx_key: str = "idx_1"):
    """Show the ``"meta"`` field of the example selected by ``idx_key``.

    Args:
        idx_key: Session-state key holding the index of the example to show.

    Nothing is rendered when the example has no ``"meta"`` field.
    """
    # Fetch the row once and use it for both the check and the display, so
    # the meta shown always belongs to the example that `idx_key` points at
    # (previously the display always used "idx_1").
    example = st.session_state["ds"][st.session_state[idx_key]]
    if "meta" not in example:
        return
    with st.expander("See meta field of the example"):
        st.write(example["meta"])
def filter_page():
    """Render two consecutive filtered-out examples side by side.

    The example index comes from a number input; the second column shows the
    following example (wrapping around at the end of the dataset). Both
    indices are written to ``st.session_state`` under ``"idx_1"``/``"idx_2"``.
    """
    import html  # stdlib; only needed here to escape dataset text

    index_example = st.number_input(
        "Index of the chosen example",
        min_value=0,
        max_value=st.session_state["ds_max_docs"] - 1,
        value=0,
        step=1,
    )
    st.session_state["idx_1"] = index_example
    st.session_state["idx_2"] = next_idx(index_example)
    idx_1 = st.session_state["idx_1"]
    idx_2 = st.session_state["idx_2"]
    text_1 = st.session_state["ds"][idx_1]["text"]
    text_2 = st.session_state["ds"][idx_2]["text"]

    # NOTE(review): the HTML markup of the literals in this function was lost
    # in the reviewed copy (unterminated strings); the tags below are a
    # reconstruction - confirm against the deployed version.
    st.markdown(
        "<h1 style='text-align: center'>Some examples of filtered out texts</h1>",
        unsafe_allow_html=True,
    )

    # The previous/next navigation buttons (callbacks `on_click_previous` and
    # `on_click_next`) are disabled; navigation goes through the index input.

    col_1, col_2 = st.columns(2)
    examples = (
        (col_1, "idx_1", idx_1, text_1),
        (col_2, "idx_2", idx_2, text_2),
    )
    for column, idx_key, idx, text in examples:
        with column:
            st.subheader(f"Example n°{idx}")
            meta_component(idx_key=idx_key)
            # Escape the dataset text before embedding it in HTML so that any
            # markup it contains is displayed literally instead of rendered.
            text_show = html.escape(text).replace("\n", "<br>")
            st.markdown(f"<div>{text_show}</div>", unsafe_allow_html=True)
def dedup_or_cleaning_page():
    """Render the before/after view of one modified example.

    The example index comes from a number input and is written to
    ``st.session_state`` (``"idx_1"``, with ``"idx_2"`` set to the following
    index). The page shows a diff between the example's ``"old_text"`` and
    ``"text"`` fields, its meta field, and an expander with both full texts.
    """
    import html  # stdlib; only needed here to escape dataset text

    index_example = st.number_input(
        "Index of the chosen example",
        min_value=0,
        max_value=st.session_state["ds_max_docs"] - 1,
        value=0,
        step=1,
    )
    st.session_state["idx_1"] = index_example
    st.session_state["idx_2"] = next_idx(index_example)

    # The previous/next navigation buttons (callbacks `on_click_previous` and
    # `on_click_next`) are disabled; navigation goes through the index input.

    # Fetch the row once; both fields come from the same example.
    example = st.session_state["ds"][st.session_state["idx_1"]]
    text = example["text"]
    old_text = example["old_text"]

    st.markdown("Changes applied", unsafe_allow_html=True)
    col_text_1, col_text_2 = st.columns(2)
    with col_text_1:
        st.subheader("Old text")
    with col_text_2:
        st.subheader("New text")
    diff_viewer.diff_viewer(old_text=old_text, new_text=text, lang="none")
    meta_component(idx_key="idx_1")

    with st.expander("See full old and new texts of the example"):
        # NOTE(review): the HTML markup of the literals below was lost in the
        # reviewed copy (unterminated strings); "<br>" and "<div>" are a
        # reconstruction - confirm against the deployed version.
        # The dataset text is escaped before being embedded in HTML so that
        # any markup it contains is displayed literally instead of rendered.
        text_show = html.escape(text).replace("\n", "<br>")
        old_text_show = html.escape(old_text).replace("\n", "<br>")

        col_1, col_2 = st.columns(2)
        with col_1:
            st.subheader("Old text")
            st.markdown(f"<div>{old_text_show}</div>", unsafe_allow_html=True)
        with col_2:
            st.subheader("New text")
            st.markdown(f"<div>{text_show}</div>", unsafe_allow_html=True)
# Streamlit page: runs top to bottom on every rerun.
st.set_page_config(
    page_title="Dataset explorer",
    page_icon=":hugging_face:",
    layout="wide",
)
st.write("The purpose of this application is to sequentially view the changes made to a dataset.")

col_option_clean, col_option_ds = st.columns(2)

# The available configuration names are read from a local JSON file.
with open("dataset_configs.json", "r") as f:
    CHECK_CONFIGS = json.load(f)

# Each name has the form "<cleaning version>_dsname_<dataset>_operation_<check>";
# index the names by cleaning version, then by dataset.
CLEANING_VERSIONS = set()
dataset_names = defaultdict(set)
checks_names = defaultdict(lambda: defaultdict(set))
for check_config in CHECK_CONFIGS:
    cleaning_version, check_config = check_config.split("_dsname_")
    dataset_name, checks_name = check_config.split("_operation_")
    CLEANING_VERSIONS.add(cleaning_version)
    dataset_names[cleaning_version].add(dataset_name)
    checks_names[cleaning_version][dataset_name].add(checks_name)

# Cleaning versions are offered in reverse sorted order, datasets in sorted order.
option_clean = col_option_clean.selectbox(
    "Select the cleaning version",
    sorted(CLEANING_VERSIONS, reverse=True),
)
option_ds = col_option_ds.selectbox(
    "Select the dataset",
    sorted(dataset_names[option_clean]),
)

# Statistics parsed from the processing log of the selected version/dataset.
ds_log = load_dataset(
    LOGS_DATASET_DIR_PATH_BEFORE_CLEAN_SELECT,
    f"{option_clean}_dsname_{option_ds}",
    use_auth_token=HF_API_TOKEN,
    trust_remote_code=True,
)
log = ds_log["train"][0]["log"]
get_logs_stats(raw_log=log)

option_check = st.selectbox(
    "Select the operation applied to inspect",
    sorted(checks_names[option_clean][option_ds]),
)
ds_check_config = f"{option_clean}_dsname_{option_ds}_operation_{option_check}"

# Reload the examples only on first run or when the selection changed.
if "ds" not in st.session_state or ds_check_config != st.session_state["ds_check_config"]:
    on_ds_change(ds_check_config)

if len(st.session_state["ds"]) == MAX_LEN_DS_CHECKS:
    st.warning(
        f"Note: only a subset of size {MAX_LEN_DS_CHECKS} of the modified / filtered examples can be shown in this application"
    )

with st.expander("See details of the available checks"):
    st.write(st.session_state["ds"])

# Checks whose name contains "_filter_" get the filter view; all others get
# the old/new text comparison view.
if "_filter_" in option_check:
    filter_page()
else:
    dedup_or_cleaning_page()