vibhorag101's picture
minor changes
bf3ca0c
import locale
import gradio as gr
import pandas as pd
from pandas.tseries.offsets import DateOffset
from portfolio import calculate_portfolio_returns
from utils import get_all_mf_schemes_df, get_mf_scheme_data
# JavaScript handed to gr.Blocks(js=...) in create_ui: it reloads the page
# with the `__theme=dark` query parameter unless that parameter is already set.
js_func = """
function refresh() {
const url = new URL(window.location);
if (url.searchParams.get('__theme') !== 'dark') {
url.searchParams.set('__theme', 'dark');
window.location.href = url.href;
}
}
"""
# Select the Indian-English locale for the LC_MONETARY category.
# NOTE(review): presumably this raises locale.Error at import time when the
# 'en_IN' locale is not installed on the host -- confirm for the deployment image.
locale.setlocale(locale.LC_MONETARY, 'en_IN')
def _parse_date(value):
    """Return *value* as a ``pd.Timestamp``, or None if blank or unparseable."""
    if not value:
        return None
    try:
        parsed = pd.Timestamp(value)
    except (ValueError, TypeError):
        return None
    # pd.Timestamp can yield NaT for some inputs; treat that as "no date".
    return None if pd.isna(parsed) else parsed


def get_portfolio_report(*args):
    """Build the portfolio report from the flattened UI inputs.

    Positional layout of ``args``:
        0      period label ("YTD", "Custom", or "<n> month(s)/year(s)")
        1      start date text (custom start, or the inception date when the
               caller substituted it)
        2      custom end date text
        3      monthly SIP date selector
        4      SIP amount
        5      lumpsum (upfront) amount
        6      step-up percentage
        7      schemes DataFrame
        8..-2  alternating scheme name / weight values
        -1     flag: use the inception date as the start date

    Returns:
        The value returned by ``calculate_portfolio_returns`` on success, or
        a single message string when the inputs are incomplete or invalid.
        Every path returns exactly one value because the Calculate button is
        wired to a single output component.
    """
    (period, custom_start_date, custom_end_date, SIP_Date,
     sip_amount, lumpsum_amount, stepup, schemes_df) = args[:8]
    use_inception_date = args[-1]

    # Collect name/weight pairs; pairs with an empty name or a zero/empty
    # weight are ignored.
    scheme_name_and_weight = {}
    for i in range(8, len(args) - 1, 2):
        name, weight = args[i], args[i + 1]
        if name and weight:
            scheme_name_and_weight[name] = float(weight)

    if not scheme_name_and_weight:
        return "Please add at least one scheme."

    end_date = pd.Timestamp.now().floor('D')

    if use_inception_date:
        start_date = _parse_date(custom_start_date)
        if start_date is None:
            return "Inception date is not available for the selected schemes."
    elif period == "Custom":
        if not custom_start_date or not custom_end_date:
            return "Please provide both start and end dates for custom period."
        start_date = _parse_date(custom_start_date)
        end_date = _parse_date(custom_end_date)
        if start_date is None or end_date is None:
            return "Invalid custom date. Please use the YYYY-MM-DD format."
    elif period == "YTD":
        start_date = pd.Timestamp(f"{end_date.year}-01-01")
    elif not period:
        return "Please select a period, provide custom dates, or use the inception date."
    else:
        period_parts = period.split()
        if len(period_parts) < 2:
            return "Invalid period selected."
        try:
            count = int(period_parts[0])
        except ValueError:
            return "Invalid period selected."
        if 'year' in period_parts[1]:
            start_date = end_date - DateOffset(years=count)
        else:
            start_date = end_date - DateOffset(months=count)

    return calculate_portfolio_returns(
        scheme_name_and_weight,
        sip_amount,
        lumpsum_amount,
        stepup,
        start_date,
        end_date,
        SIP_Date,
        schemes_df,
    )
def quick_search_schemes(query, schemes_df):
    """Return up to 40 scheme names that contain *query*, ignoring case.

    Args:
        query: Text typed by the user; an empty value yields an empty list.
        schemes_df: DataFrame with a ``schemeName`` column.

    Returns:
        A list of at most 40 matching scheme names, in DataFrame order.
    """
    if not query:
        return []
    # regex=False: the query is user-typed text, so characters such as "(",
    # "+" or "[" must be matched literally instead of raising re.error.
    mask = schemes_df['schemeName'].str.contains(
        query, case=False, na=False, regex=False
    )
    return schemes_df.loc[mask, 'schemeName'].head(40).tolist()
def update_scheme_dropdown(query, schemes_df, key_up_data: gr.KeyUpData):
    """Refresh the dropdown choices from the text currently being typed.

    ``query`` is accepted to match the event inputs but is not read; the
    search text comes from ``key_up_data.input_value``.
    """
    typed_text = key_up_data.input_value
    return gr.update(
        choices=quick_search_schemes(typed_text, schemes_df),
        visible=True,
    )
def update_schemes_list(schemes_list, updated_data):
    """Extract ``(scheme name, weight)`` tuples from an edited schemes table.

    Rows are skipped when the scheme name is blank/NaN, when the row is the
    synthetic "Total" summary row, when the row is marked for deletion, or
    when the weight is missing or not a finite-parsable number.

    Args:
        schemes_list: Current list of schemes (not read; kept for the
            existing call signature).
        updated_data: DataFrame with "Scheme Name", "Weight (%)" and
            "Actions" columns.

    Returns:
        A new list of ``(scheme_name, weight_as_float)`` tuples.
    """
    new_schemes_list = []
    for _, row in updated_data.iterrows():
        scheme_name = row.get('Scheme Name')
        weight = row.get('Weight (%)')
        action = row.get('Actions')

        if pd.isna(scheme_name) or not scheme_name:
            continue
        # The summary row appended by the table builder is not a scheme.
        if scheme_name == 'Total':
            continue
        if weight is None or action == '🗑️':  # row marked for deletion
            continue

        try:
            weight_float = float(weight)
        except (TypeError, ValueError):
            # Weight is not convertible to a float; skip this row.
            continue
        if pd.isna(weight_float):
            # An empty numeric cell arrives as NaN; it is not a valid weight.
            continue

        new_schemes_list.append((scheme_name, weight_float))
    return new_schemes_list
def update_schemes_table(schemes_list):
    """Render the schemes list as a table with a trailing "Total" row.

    Each scheme row gets a delete marker in "Actions"; the final row holds
    the sum of the weights and carries a warning when that sum is above 100.
    """
    table = pd.DataFrame(schemes_list, columns=["Scheme Name", "Weight (%)"])
    table["Actions"] = "❌"

    weight_sum = table["Weight (%)"].sum()
    # Decide the summary row's action text up front instead of patching it in.
    summary_action = "⚠️ Exceeds 100%" if weight_sum > 100 else ""
    summary = pd.DataFrame(
        {
            "Scheme Name": ["Total"],
            "Weight (%)": [weight_sum],
            "Actions": [summary_action],
        }
    )
    return pd.concat([table, summary], ignore_index=True)
def add_scheme_to_list(schemes_list, scheme_name, weight):
    """Append a scheme to the list and return the refreshed UI values.

    Returns a 4-tuple: (schemes list, rendered table, dropdown value,
    slider value). When either input is empty or zero nothing is added and
    the inputs are echoed back; otherwise the inputs are reset to None / 0.
    """
    if not (scheme_name and weight):
        # Nothing to add: leave state and input widgets as they are.
        return schemes_list, update_schemes_table(schemes_list), scheme_name, weight

    extended = schemes_list + [(scheme_name, float(weight))]
    return extended, update_schemes_table(extended), None, 0
def update_schemes(schemes_list, updated_data):
    """Rebuild the schemes list from the user-edited table.

    Rows are ignored when the scheme name is blank/NaN, when the row is the
    "Total" summary row, or when the weight is missing, NaN, or cannot be
    converted to a float.

    Args:
        schemes_list: Current list of ``(name, weight)`` tuples; returned
            unchanged when the table yields nothing usable or an error occurs.
        updated_data: DataFrame with "Scheme Name" and "Weight (%)" columns.

    Returns:
        A 3-tuple ``(schemes_list, rendered_table, error_message)`` where
        ``error_message`` is None on success.
    """
    try:
        new_schemes_list = []
        for _, row in updated_data.iterrows():
            scheme_name = row.get('Scheme Name')
            weight = row.get('Weight (%)')

            # Blank rows added in the editable table must not become schemes.
            if pd.isna(scheme_name) or not scheme_name or scheme_name == 'Total':
                continue
            try:
                weight_float = float(weight)
            except (TypeError, ValueError):
                # Missing or non-numeric weight: skip just this row.
                continue
            if pd.isna(weight_float):
                # Empty numeric cells arrive as NaN and would poison the total.
                continue
            new_schemes_list.append((scheme_name, weight_float))

        if not new_schemes_list:
            return schemes_list, update_schemes_table(schemes_list), "No valid schemes found in the table."
        return new_schemes_list, update_schemes_table(new_schemes_list), None
    except Exception as e:
        # UI boundary: report the problem in the error box instead of raising.
        error_msg = f"Error updating schemes: {str(e)}"
        return schemes_list, update_schemes_table(schemes_list), error_msg
def prepare_inputs(period, custom_start, custom_end, SIP_Date, sip_amount, schemes_list, schemes_df):
    """Flatten the form values and scheme pairs into one positional list.

    The six fixed values come first, followed by each scheme's name and
    weight in order.
    """
    flattened_pairs = [
        item
        for name, weight in schemes_list
        for item in (name, weight)
    ]
    return [period, custom_start, custom_end, SIP_Date, sip_amount, schemes_df] + flattened_pairs
def handle_row_selection(schemes_list, evt: gr.SelectData, table_data):
    """Delete a scheme when its "Actions" cell is clicked.

    Returns ``(table, schemes_list)``. The inputs are returned unchanged
    unless the selected cell is in column 2 of a row other than the last
    (summary) row.
    """
    selection = evt.index
    if selection is None or len(selection) <= 1:
        return table_data, schemes_list

    # Only the third column ("Actions") triggers a deletion.
    if selection[1] != 2:
        return table_data, schemes_list

    row_index = selection[0]
    # The last row is the "Total" summary and cannot be removed.
    if not (row_index < len(table_data) - 1):
        return table_data, schemes_list

    remaining = table_data.drop(row_index).reset_index(drop=True)
    updated_schemes_list = []
    for _, row in remaining.iterrows():
        if row['Scheme Name'] != 'Total':
            updated_schemes_list.append((row['Scheme Name'], row['Weight (%)']))

    # Rebuilding the table recomputes the total row.
    return update_schemes_table(updated_schemes_list), updated_schemes_list
def create_ui():
    """Build the Gradio Blocks app for the SIP returns calculator.

    Loads the scheme catalogue once via ``get_all_mf_schemes_df``, lays out
    the input widgets, and wires the event handlers for searching, adding,
    editing and deleting schemes and for running the calculation.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    schemes_df = get_all_mf_schemes_df()

    with gr.Blocks(js=js_func) as demo:
        gr.Markdown("# Mutual Fund SIP Returns Calculator")

        with gr.Row():
            period = gr.Dropdown(choices=["YTD", "1 month","3 months","6 months","1 year", "3 years", "5 years", "7 years", "10 years","15 years","20 years", "Custom"], label="Select Period",value="YTD")
            custom_start_date = gr.Textbox(label="Custom Start Date (YYYY-MM-DD)", visible=False)
            custom_end_date = gr.Textbox(label="Custom End Date (YYYY-MM-DD)", visible=False)
            SIP_Date = gr.Dropdown(label="Monthly SIP Date", choices=["start","middle","end"],value="start")
            with gr.Column():
                use_inception_date = gr.Checkbox(label="Use Earliest Inception Date", value=False)
                inception_date_display = gr.Textbox(label="Earliest Inception Date", interactive=False)

        with gr.Row():
            sip_amount = gr.Number(label="SIP Amount (₹)")
            upfront_amount = gr.Number(label="Upfront Investment (₹)",value=0)
            stepup = gr.Number(label="Stepup %",value=0)

        # Per-session list of (scheme name, weight) tuples.
        schemes_list = gr.State([])

        with gr.Row():
            scheme_dropdown = gr.Dropdown(label="Select Scheme", choices=[], allow_custom_value=True, interactive=True)
            scheme_weight = gr.Slider(minimum=0, maximum=100, step=1, label="Scheme Weight (%)")
            add_button = gr.Button("Add Scheme")

        schemes_table = gr.Dataframe(
            headers=["Scheme Name", "Weight (%)", "Actions"],
            datatype=["str", "number", "str"],
            col_count=(3, "fixed"),
            label="Added Schemes",
            type="pandas",
            interactive=True
        )
        update_button = gr.Button("Update Schemes")
        error_message = gr.Textbox(label="Error", visible=False)
        calculate_button = gr.Button("Calculate Returns")
        result = gr.Textbox(label="Results",)
        # pie_chart = gr.Plot(label="Scheme Weightages")
        # final_value = gr.Number(label="Final Value (₹)", interactive=False)
        # total_investment = gr.Number(label="Total Investment (₹)", interactive=False)

        def update_custom_date_visibility(period):
            """Show the custom date boxes only for the "Custom" period."""
            return {custom_start_date: gr.update(visible=period=="Custom"),
                    custom_end_date: gr.update(visible=period=="Custom")}

        period.change(update_custom_date_visibility, inputs=[period], outputs=[custom_start_date, custom_end_date])

        scheme_dropdown.key_up(
            fn=update_scheme_dropdown,
            inputs=[scheme_dropdown, gr.State(schemes_df)],
            outputs=scheme_dropdown,
            queue=False,
            show_progress="hidden"
        )

        add_button.click(add_scheme_to_list,
                         inputs=[schemes_list, scheme_dropdown, scheme_weight],
                         outputs=[schemes_list, schemes_table, scheme_dropdown, scheme_weight])

        def update_schemes_and_show_error(schemes_list, updated_data):
            """Apply table edits and reveal the error box only on failure."""
            new_schemes_list, updated_table, error = update_schemes(schemes_list, updated_data)
            return (
                new_schemes_list,
                updated_table,
                gr.update(value=error, visible=bool(error))
            )

        update_button.click(
            update_schemes_and_show_error,
            inputs=[schemes_list, schemes_table],
            outputs=[schemes_list, schemes_table, error_message]
        )

        schemes_table.select(
            handle_row_selection,
            inputs=[schemes_list, schemes_table],
            outputs=[schemes_table, schemes_list]
        )

        def get_earliest_inception_date(schemes_list, schemes_df):
            """Return the inception date used as the portfolio start, as YYYY-MM-DD.

            The result is the maximum of the selected schemes' inception
            dates, or "" when no selected scheme is found in the catalogue.
            NOTE(review): max() presumably picks the first date on which
            every scheme exists -- confirm this is the intended meaning of
            "earliest".
            """
            inception_dates = []
            for scheme_name, _ in schemes_list:
                codes = schemes_df.loc[schemes_df['schemeName'] == scheme_name, 'schemeCode']
                if codes.empty:
                    # The dropdown allows custom values, so the name may not
                    # exist in the catalogue; skip it instead of raising
                    # IndexError on an empty lookup.
                    continue
                _, inception_date = get_mf_scheme_data(codes.values[0])
                inception_dates.append(inception_date)
            return max(inception_dates).strftime("%Y-%m-%d") if inception_dates else ""

        def update_inception_date(use_inception_date, schemes_list, schemes_df):
            """Fill and show the inception date box when the option is ticked."""
            if use_inception_date and schemes_list:
                earliest_inception_date = get_earliest_inception_date(schemes_list, schemes_df)
                return gr.update(value=earliest_inception_date, visible=True)
            else:
                return gr.update(value="", visible=False)

        use_inception_date.change(
            update_inception_date,
            inputs=[use_inception_date, schemes_list, gr.State(schemes_df)],
            outputs=inception_date_display
        )

        def prepare_inputs_with_inception(period, custom_start, custom_end, SIP_Date, sip_amount, upfront_amount,stepup, schemes_list, schemes_df, use_inception_date, inception_date_display):
            """Flatten the form values into the positional list the report expects."""
            inputs = [period, custom_start, custom_end, SIP_Date, sip_amount, upfront_amount, stepup, schemes_df]
            for name, weight in schemes_list:
                inputs.extend([name, weight])
            inputs.append(use_inception_date)  # Add use_inception_date to the inputs
            if use_inception_date and inception_date_display:
                inputs[1] = inception_date_display  # Replace custom_start with inception_date_display
            return inputs

        calculate_button.click(
            lambda *args: get_portfolio_report(*prepare_inputs_with_inception(*args)),
            inputs=[period, custom_start_date, custom_end_date, SIP_Date, sip_amount,upfront_amount,stepup,schemes_list, gr.State(schemes_df), use_inception_date, inception_date_display],
            outputs=[result]
            # outputs=[result, final_value, total_investment]
            # outputs=[result, pie_chart, final_value, total_investment]
        )

    return demo
# Build the Blocks app when the module is executed and start the Gradio
# server with debug mode enabled.
demo = create_ui()
demo.launch(debug=True)