File size: 5,025 Bytes
f5ad77e
 
9d6ff9a
 
f5ad77e
 
 
9d6ff9a
f5ad77e
 
9d6ff9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f5ad77e
9d6ff9a
 
 
 
 
 
 
 
 
 
 
f5ad77e
 
9d6ff9a
 
 
 
f5ad77e
9d6ff9a
f5ad77e
 
9d6ff9a
f5ad77e
 
9d6ff9a
 
 
 
 
 
 
 
 
f5ad77e
 
9d6ff9a
 
 
 
 
 
 
 
 
f5ad77e
 
9d6ff9a
 
 
 
 
f5ad77e
9d6ff9a
 
 
 
 
 
f5ad77e
 
9d6ff9a
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import json
import os
import re
from datetime import datetime
from io import StringIO

import pandas as pd
import requests

from src.assets.text_content import REPO

def get_github_data():
    """
    Read and process data from CSV files hosted on GitHub. - https://github.com/clembench/clembench-runs

    Returns:
        github_data (dict): Dictionary containing:
            - "text": List of DataFrames for each version's textual leaderboard data.
            - "multimodal": List of DataFrames for each version's multimodal leaderboard data.
            - "date": Formatted date of the latest version in "DD Month YYYY" format.
        None: If the benchmark_runs.json index could not be fetched.
    """
    base_repo = REPO
    json_url = base_repo + "benchmark_runs.json"
    response = requests.get(json_url)

    # Check if the JSON file request was successful
    if response.status_code != 200:
        print(f"Failed to read JSON file: Status Code: {response.status_code}")
        # BUG FIX: the original returned a 4-tuple (None, None, None, None)
        # here while the success path returns a single dict, so a caller's
        # `if data is None` check never fired. Return a single None instead.
        return None

    json_data = response.json()
    versions = json_data['versions']

    # Sort version names - latest first (versions look like "v1.6"; strip
    # the leading "v" and compare numerically so "v1.10" > "v1.9").
    version_names = sorted(
        [ver['version'] for ver in versions],
        key=lambda v: float(v[1:]),
        reverse=True
    )
    print(f"Found {len(version_names)} versions from get_github_data(): {version_names}.")

    # Get Last updated date of the latest version
    latest_version = version_names[0]
    latest_date = next(
        ver['date'] for ver in versions if ver['version'] == latest_version
    )
    formatted_date = datetime.strptime(latest_date, "%Y/%m/%d").strftime("%d %b %Y")

    # Get Leaderboard data - for text-only + multimodal
    github_data = {}

    # Collect Dataframes
    text_dfs = []
    mm_dfs = []

    for version in version_names:
        # Collect CSV data in descending order of clembench-runs versions
        # Collect Text-only data
        text_url = f"{base_repo}{version}/results.csv"
        csv_response = requests.get(text_url)
        if csv_response.status_code == 200:
            df = pd.read_csv(StringIO(csv_response.text))
            df = process_df(df)
            df = df.sort_values(by=df.columns[1], ascending=False)  # Sort by clemscore column
            text_dfs.append(df)
        else:
            print(f"Failed to read Text-only leaderboard CSV file for version: {version}. Status Code: {csv_response.status_code}")

        # Collect Multimodal data (only published from version 1.6 onwards)
        if float(version[1:]) >= 1.6:
            mm_url = f"{base_repo}{version}_multimodal/results.csv"
            mm_response = requests.get(mm_url)
            if mm_response.status_code == 200:
                df = pd.read_csv(StringIO(mm_response.text))
                df = process_df(df)
                df = df.sort_values(by=df.columns[1], ascending=False)  # Sort by clemscore column
                mm_dfs.append(df)
            else:
                # BUG FIX: this failure message was previously attached to the
                # outer `if`, so it printed for every version < 1.6 (where no
                # multimodal run exists) and referenced the text-only
                # response's status code instead of the multimodal one.
                print(f"Failed to read multimodal leaderboard CSV file for version: {version}: Status Code: {mm_response.status_code}. Please ignore this message if multimodal results are not available for this version")

    github_data["text"] = text_dfs
    github_data["multimodal"] = mm_dfs
    github_data["date"] = formatted_date

    return github_data


def process_df(df: pd.DataFrame) -> pd.DataFrame:
    """
    Process dataframe:
    - Convert datatypes to sort by "float" instead of "str"
    - Remove repetition in model names
    - Update column names

    Args:
        df: Unprocessed Dataframe; column 0 holds model names, the rest are
            score columns, and columns from index 4 on are named
            "<gamename>, <metric>".

    Returns:
        df: Processed Dataframe (modified in place and returned).
    """

    # Convert column values to float, apart from the model names column;
    # non-numeric cells become NaN rather than raising.
    for col in df.columns[1:]:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Remove repetition in model names.
    # BUG FIX: the original passed regex=True, so the '.' in '-t0.0' matched
    # any character (e.g. '-t0X0' was also stripped); match the literal text.
    model_col = df.columns[0]
    df[model_col] = df[model_col].str.replace('-t0.0', '', regex=False)
    # BUG FIX: the original deduplicated with set(), whose iteration order is
    # arbitrary, so 'a--b' could come out as 'b--a' between runs.
    # dict.fromkeys() dedupes while preserving first-seen order.
    df[model_col] = df[model_col].apply(lambda x: '--'.join(dict.fromkeys(x.split('--'))))

    # Update column names: four fixed names, then "Gamename metric" built
    # from the "<gamename>, <metric>" headers (capitalize from 5th column on).
    custom_column_names = ['Model', 'Clemscore', '% Played', 'Quality Score']
    for col in df.columns[4:]:
        game, metric = (part.strip() for part in col.split(',', 1))
        custom_column_names.append(f"{game.capitalize()} {metric}")

    # Rename columns
    df.columns = custom_column_names

    return df


def query_search(df: pd.DataFrame, query: str) -> pd.DataFrame:
    """
    Filter the dataframe based on the search query.

    Args:
        df (pd.DataFrame): Unfiltered dataframe.
        query (str): A string of queries separated by ";". Matching is
            case-insensitive and literal (no regex metacharacters).
    Returns:
        pd.DataFrame: Filtered dataframe containing searched queries in the 'Model' column.
    """
    if not query.strip():  # Reset Dataframe if empty query is passed
        return df

    queries = [q.strip().lower() for q in query.split(';') if q.strip()]  # Normalize and split queries
    if not queries:  # e.g. query was only ";" — nothing to filter on
        return df

    # BUG FIX: the original fed raw user text into str.contains as a regex,
    # so a query like "(" raised re.error and "+" acted as a quantifier.
    # Escape each query so it is matched literally.
    pattern = '|'.join(re.escape(q) for q in queries)

    # Filter dataframe based on queries in 'Model' column; na=False drops
    # rows with missing model names instead of raising on NaN.
    return df[df['Model'].str.lower().str.contains(pattern, na=False)]