"""Streamlit page that trains a model on Bitget spot candles and charts a projection.

The page lets the user pick a symbol, candle interval, date range, projection
horizon and epoch count. Pressing "Start" downloads the candles, trains the
model built by ``build_model`` and renders the price history, loss curves,
test-set fit and projected prices.
"""

import datetime
from itertools import cycle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from pybitget import Client
from sklearn.metrics import (
    r2_score,
)
from sklearn.model_selection import train_test_split  # noqa: F401  (kept: file-level import)

from utils.preprocess.preprocess_data import (
    build_model,
    create_dataset,
    normalize,
    preprocess,
    split_train_test,
    train_model,
)
from utils.preprocess.projections import project

client = Client(
    st.secrets["apikey"],
    st.secrets["password"],
    passphrase=st.secrets["passphrase"],
)


def get_symbols():
    """Return the ``symbol`` field of every entry reported by ``spot_get_symbols``."""
    data = client.spot_get_symbols()
    return [x["symbol"] for x in data["data"]]


def get_data(symbol, period, after, before):
    """Fetch up to 1000 spot candles for ``symbol`` and return them as a DataFrame.

    ``after`` and ``before`` are forwarded unchanged to
    ``client.spot_get_candle_data``; the caller in this file passes them as
    millisecond epoch strings.
    """
    # Log the actual request window (the original printed the global `end`).
    print(symbol, period, after, before)
    data = client.spot_get_candle_data(
        symbol=symbol,
        period=period,
        after=after,
        before=before,
        limit=1000,
    )["data"]
    return pd.DataFrame(data)


def _to_epoch_ms(day, at):
    """Return ``day`` combined with time ``at`` as a millisecond epoch string."""
    moment = datetime.datetime.combine(day, at)
    return str(int(moment.timestamp()) * 1000)


st.set_page_config(page_title="Keras Bitget predictions", page_icon="📈", layout="wide")
st.title("Crypto price prediction")

coin = st.selectbox("Select your symbol", options=get_symbols())
period = st.selectbox(
    "Select the interval",
    options=[
        "1min",
        "5min",
        "15min",
        "30min",
        "1h",
        "4h",
        "6h",
        "12h",
        "1day",
        "1week",
    ],
)

# Time of day attached to the selected dates when building request timestamps.
default_time = datetime.time(13, 0)

start = st.date_input(
    "Start date of the data",
    value=datetime.date(year=2022, month=1, day=1),
    max_value=datetime.datetime.now().date(),
)
end = st.date_input(
    "End date of the data",
    value=datetime.datetime.now().date(),
    max_value=datetime.datetime.now().date(),
)
days = st.slider(label="Days to project", min_value=1, max_value=30, step=1, value=20)
epochs = st.slider(
    label="Training Epochs", min_value=10, max_value=200, step=20, value=20
)

if st.button("Start"):
    if start > end:
        st.error("The start date must not be after the end date.")
        st.stop()

    data = get_data(
        coin,
        period,
        after=_to_epoch_ms(start, default_time),
        before=_to_epoch_ms(end, default_time),
    )
    if data.empty:
        st.error("No candle data was returned for the selected symbol and date range.")
        st.stop()

    closedf = preprocess(data)

    # --- Chart 1: open / close / high / low history -------------------------
    names = cycle(
        ["Stock Open Price", "Stock Close Price", "Stock High Price", "Stock Low Price"]
    )
    figp = px.line(
        closedf,
        x=closedf.Date,
        y=[closedf["open"], closedf["close"], closedf["high"], closedf["low"]],
        labels={"Date": "Date", "value": "Stock value"},
    )
    figp.update_layout(
        title_text="Stock analysis chart",
        font_size=15,
        font_color="black",
        legend_title_text="Stock Parameters",
    )
    figp.for_each_trace(lambda t: t.update(name=next(names)))
    figp.update_xaxes(showgrid=False)
    figp.update_yaxes(showgrid=False)
    st.plotly_chart(figp, use_container_width=True)

    # --- Dataset preparation ------------------------------------------------
    closedfsc, scaler = normalize(closedf=closedf)
    training_size = int(len(closedf) * 0.60)
    test_size = len(closedf) - training_size
    train_set, test_set = split_train_test(
        closedfsc=closedfsc, training_size=training_size, test_size=test_size
    )

    # Look-back window is half the projection horizon; never let it reach 0
    # (days == 1 would otherwise produce an empty window).
    time_step = max(1, days // 2)
    X_train, y_train = create_dataset(train_set, time_step)
    X_test, y_test = create_dataset(test_set, time_step)
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)

    # --- Training -----------------------------------------------------------
    model = build_model()
    st.write("Epoch Progress:")
    progress_bar = st.progress(0)

    def update_progress(epoch, history):
        """Advance the progress bar after each epoch (``epoch`` is zero-based)."""
        # Clamp so a callback firing past the requested epoch count cannot
        # push the bar beyond its maximum of 1.0.
        progress_bar.progress(min((epoch + 1) / epochs, 1.0))

    trained_model, train_losses, val_loss = train_model(
        model,
        X_train,
        y_train,
        X_test,
        y_test,
        epochs,
        progress_callback=update_progress,
    )
    st.write("Training Completed!")

    # --- Chart 2: loss curves -----------------------------------------------
    # Separate name so the `epochs` slider value is not shadowed.
    epoch_axis = list(range(len(train_losses)))
    trace_train = go.Scatter(
        x=epoch_axis,
        y=train_losses,
        mode="lines",
        name="Training Loss",
        line=dict(color="red"),
    )
    trace_val = go.Scatter(
        x=epoch_axis,
        y=val_loss,
        mode="lines",
        name="Validation Loss",
        line=dict(color="blue"),
    )
    layout = go.Layout(
        title="Training and Validation Loss",
        xaxis=dict(title="Epochs"),
        yaxis=dict(title="Loss"),
    )
    fig = go.Figure(data=[trace_train, trace_val], layout=layout)
    st.plotly_chart(fig, use_container_width=True)

    # --- Evaluation ---------------------------------------------------------
    train_predict = trained_model.predict(X_train)
    test_predict = trained_model.predict(X_test)
    train_predict = scaler.inverse_transform(train_predict)
    test_predict = scaler.inverse_transform(test_predict)
    original_ytrain = scaler.inverse_transform(y_train.reshape(-1, 1))
    original_ytest = scaler.inverse_transform(y_test.reshape(-1, 1))
    st.write(
        "Train data Accuracy score:",
        r2_score(original_ytrain, train_predict),
    )
    st.write(
        "Test data Accuracy score:",
        r2_score(original_ytest, test_predict),
    )

    # Use an explicit figure and close it afterwards so figures do not pile up
    # in pyplot's global registry across Streamlit reruns.
    mpl_fig, ax = plt.subplots(figsize=(16, 10))
    ax.plot(original_ytest)
    ax.plot(test_predict)
    ax.set_ylabel("Price")
    ax.set_title(coin + " Single Point Price Prediction")
    ax.legend(["Actual", "Predicted"])
    ax.tick_params(axis="x", labelcolor="w")
    st.pyplot(mpl_fig, use_container_width=True)
    plt.close(mpl_fig)

    # --- Projection ---------------------------------------------------------
    # Project with the same window the model was trained on (was hard-coded 15).
    # NOTE(review): assumes `project` accepts any window length — confirm.
    projected_data = project(
        time_step=time_step, test_data=test_set, model=trained_model, days=days
    )

    projected_prices = (
        scaler.inverse_transform(np.array(projected_data).reshape(-1, 1))
        .reshape(1, -1)
        .tolist()[0]
    )
    # Take the history from the normalized series, the same source the final
    # chart uses, so inverse_transform is applied to data the scaler produced.
    recent_prices = (
        scaler.inverse_transform(np.asarray(closedfsc)[-time_step:].reshape(-1, 1))
        .reshape(1, -1)
        .tolist()[0]
    )

    # Build two independent, equal-length columns: history padded with NaN on
    # the right, projection padded with NaN on the left. (The original bound
    # both names to one shared list, so both traces were identical.)
    last_original_days_value = recent_prices + [np.nan] * len(projected_prices)
    next_predicted_days_value = [np.nan] * len(recent_prices) + projected_prices

    new_pred_plot = pd.DataFrame(
        {
            "last_original_days_value": last_original_days_value,
            "next_predicted_days_value": next_predicted_days_value,
        }
    )

    # --- Chart 3: recent history vs projection ------------------------------
    names = cycle(
        [
            f"Last {time_step} days close price",
            f"Predicted next {days} days close price",
        ]
    )
    fig = px.line(
        new_pred_plot,
        x=new_pred_plot.index,
        y=[
            new_pred_plot["last_original_days_value"],
            new_pred_plot["next_predicted_days_value"],
        ],
        labels={"value": "Stock price", "index": "Timestamp"},
    )
    fig.update_layout(
        title_text=f"Compare last {time_step} days vs next {days} days",
        plot_bgcolor="white",
        font_size=15,
        font_color="black",
        legend_title_text="Close Price",
    )
    fig.for_each_trace(lambda t: t.update(name=next(names)))
    fig.update_xaxes(showgrid=False)
    fig.update_yaxes(showgrid=False)
    st.plotly_chart(fig, use_container_width=True)

    # --- Chart 4: full close history followed by the projection -------------
    lstmdf = closedfsc.tolist()
    lstmdf.extend((np.array(projected_data).reshape(-1, 1)).tolist())
    lstmdf = scaler.inverse_transform(lstmdf).reshape(1, -1).tolist()[0]

    names = cycle(["Close price"])
    fig = px.line(lstmdf, labels={"value": "Stock price", "index": "Timestamp"})
    fig.update_layout(
        title_text="Plotting whole closing stock price with prediction",
        plot_bgcolor="white",
        font_size=15,
        font_color="black",
        legend_title_text="Stock",
    )
    fig.for_each_trace(lambda t: t.update(name=next(names)))
    fig.update_xaxes(showgrid=False)
    fig.update_yaxes(showgrid=False)
    st.plotly_chart(fig, use_container_width=True)