import pandas as pd
import polars as pl
import numpy as np
import json
import gc
import folium
import html
import gradio as gr
from matplotlib import pyplot as plt
import seaborn as sns
import xgboost as xgb
from xgboost import plot_importance
from bs4 import BeautifulSoup
import plotly.express as px
import plotly.graph_objects as go
import plotly.figure_factory as ff
from plotly.subplots import make_subplots
import plotly.io as pio
from statsmodels.graphics.tsaplots import plot_pacf, plot_acf
from statsmodels.tsa.stattools import kpss, adfuller
from bertopic import BERTopic
from collections import defaultdict
color_pal = sns.color_palette("tab10")

impute_cols = [
    'MeanTemp', 'MinTemp', 'MaxTemp', 'DewPoint',
    'Percipitation', 'WindSpeed', 'MaxSustainedWind',
    'Gust', 'Rain', 'SnowDepth', 'SnowIce',
]

def convert_schema_to_polars(schema):
    pl_schema = {}
    for k, v in schema.items():
        if v == "String":
            pl_schema[k] = pl.String
        elif v == "Float64":
            pl_schema[k] = pl.Float64
        elif v == "Int64":
            pl_schema[k] = pl.Int64
    return pl_schema

def create_datetime(data, dt_col, format="%m/%d/%Y %I:%M:%S %p"):
    # df type is either pandas or polars
    df_type = "pandas" if isinstance(data, pd.DataFrame) else "polars"
    if "datetime" in str(data[dt_col].dtype).lower():
        return data
    if df_type == "pandas":
        data[dt_col] = pd.to_datetime(data[dt_col], format=format)
    elif df_type == "polars":
        data = data.with_columns(
            pl.col(dt_col).str.strptime(pl.Date, format=format).cast(pl.Datetime)
        )
    return data
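
# Example usage (sketch): create_datetime works for either backend and is a no-op
# when the column is already a datetime type. The file path below is hypothetical.
#   df_pd = create_datetime(pd.read_csv("data/311_sample.csv"), "Created Date")
#   df_pl = create_datetime(pl.read_csv("data/311_sample.csv"), "Created Date")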
def create_seasons(data, dt_col="Datetime", out_col="Season", prefix=""):
    df_type = "pandas" if isinstance(data, pd.DataFrame) else "polars"
    out_col = prefix + out_col
    spring_start = pd.to_datetime("2018-3-20", format="%Y-%m-%d").dayofyear
    summer_start = pd.to_datetime("2018-6-21", format="%Y-%m-%d").dayofyear
    autumn_start = pd.to_datetime("2018-9-22", format="%Y-%m-%d").dayofyear
    winter_start = pd.to_datetime("2018-12-21", format="%Y-%m-%d").dayofyear
    if df_type == "pandas":
        def map_season(date):
            if date.dayofyear < spring_start or date.dayofyear >= winter_start:
                return "Winter"
            elif date.dayofyear >= spring_start and date.dayofyear < summer_start:
                return "Spring"
            elif date.dayofyear >= summer_start and date.dayofyear < autumn_start:
                return "Summer"
            elif date.dayofyear >= autumn_start and date.dayofyear < winter_start:
                return "Autumn"
        data[out_col] = data[dt_col].apply(map_season)
        return data
    elif df_type == "polars":
        def map_season(date):
            if date.timetuple().tm_yday < spring_start or date.timetuple().tm_yday >= winter_start:
                return "Winter"
            elif date.timetuple().tm_yday >= spring_start and date.timetuple().tm_yday < summer_start:
                return "Spring"
            elif date.timetuple().tm_yday >= summer_start and date.timetuple().tm_yday < autumn_start:
                return "Summer"
            elif date.timetuple().tm_yday >= autumn_start and date.timetuple().tm_yday < winter_start:
                return "Autumn"
        data = data.with_columns(
            pl.col(dt_col).map_elements(map_season, return_dtype=pl.String).alias(out_col)
        )
        return data

def create_weekend(data, dt_col="Datetime", out_col="is_weekend", prefix=""):
    df_type = "pandas" if isinstance(data, pd.DataFrame) else "polars"
    out_col = prefix + out_col
    if df_type == "pandas":
        # pandas weekday: Monday=0 ... Sunday=6
        data[out_col] = (data[dt_col].dt.weekday.isin([5, 6])).astype(np.int8)
    elif df_type == "polars":
        # polars weekday: Monday=1 ... Sunday=7
        data = data.with_columns(
            pl.col(dt_col).dt.weekday().is_in([6, 7]).cast(pl.Int8).alias(out_col)
        )
    return data
def create_holidays(data, dt_col="Datetime", out_col="is_holiday", prefix=""):
    df_type = "pandas" if isinstance(data, pd.DataFrame) else "polars"
    out_col = prefix + out_col
    # New Year's Day is the only holiday left out, as I expect it may have its own effect
    HOLIDAYS = [
        pd.to_datetime("2016-01-18"), pd.to_datetime("2016-02-15"),
        pd.to_datetime("2016-05-30"), pd.to_datetime("2016-07-04"), pd.to_datetime("2016-09-05"),
        pd.to_datetime("2016-10-10"), pd.to_datetime("2016-11-11"), pd.to_datetime("2016-11-24"),
        # Christmas is variable (depends on which day is the observed holiday vs. Christmas Day itself)
        pd.to_datetime("2016-12-24"), pd.to_datetime("2016-12-25"), pd.to_datetime("2016-12-26"),
        pd.to_datetime("2017-01-16"), pd.to_datetime("2017-02-20"),
        pd.to_datetime("2017-05-29"), pd.to_datetime("2017-07-04"), pd.to_datetime("2017-09-04"),
        pd.to_datetime("2017-10-09"), pd.to_datetime("2017-11-10"), pd.to_datetime("2017-11-23"),
        pd.to_datetime("2017-12-24"), pd.to_datetime("2017-12-25"),
        pd.to_datetime("2018-01-15"), pd.to_datetime("2018-02-19"),
        pd.to_datetime("2018-05-28"), pd.to_datetime("2018-07-04"), pd.to_datetime("2018-09-03"),
        pd.to_datetime("2018-10-08"), pd.to_datetime("2018-11-12"), pd.to_datetime("2018-11-22"),
        pd.to_datetime("2018-12-24"), pd.to_datetime("2018-12-25"),
    ]
    if df_type == "pandas":
        data[out_col] = (data[dt_col].isin(HOLIDAYS)).astype(np.int8)
    elif df_type == "polars":
        data = data.with_columns(
            pl.col(dt_col).dt.datetime().is_in(HOLIDAYS).cast(pl.Int8).alias(out_col)
        )
    return data
def build_temporal_features(data, dt_col, prefix=""):
    df_type = "pandas" if isinstance(data, pd.DataFrame) else "polars"
    if df_type == "pandas" and data.index.name == dt_col:
        data = data.reset_index()
    if df_type == "pandas":
        data[prefix+"Year"] = data[dt_col].dt.year.astype(np.int16)
        data[prefix+"Month"] = data[dt_col].dt.month.astype(np.int8)
        data[prefix+"Day"] = data[dt_col].dt.day.astype(np.int8)
        data[prefix+"DayOfYear"] = data[dt_col].dt.dayofyear.astype(np.int16)
        data[prefix+"DayOfWeek"] = data[dt_col].dt.dayofweek.astype(np.int8)
    else:
        data = data.with_columns(**{
            prefix+"Year": pl.col(dt_col).dt.year().cast(pl.Int16),
            prefix+"Month": pl.col(dt_col).dt.month().cast(pl.Int8),
            prefix+"Day": pl.col(dt_col).dt.day().cast(pl.Int8),
            prefix+"DayOfYear": pl.col(dt_col).dt.ordinal_day().cast(pl.Int16),
            prefix+"DayOfWeek": pl.col(dt_col).dt.weekday().cast(pl.Int8),
        })
    data = create_seasons(data, dt_col, prefix=prefix)
    data = create_weekend(data, dt_col, prefix=prefix)
    data = create_holidays(data, dt_col, prefix=prefix)
    return data
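
# Example usage (sketch): adds Year/Month/Day/DayOfYear/DayOfWeek plus Season,
# is_weekend and is_holiday to any frame with a datetime column.
#   demo = pd.DataFrame({"Datetime": pd.date_range("2018-01-01", periods=7, freq="D")})
#   demo = build_temporal_features(demo, "Datetime")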
def agg_and_merge_historical(curr_df, hist_df, col, agg_cols=[], ops=["mean", "max", "min"]):
    merge_dict = {}
    for agg_col in agg_cols:
        describe_tb = hist_df.groupby(col)[agg_col].describe().reset_index()
        if col not in merge_dict:
            merge_dict[col] = describe_tb[col].values
        for op in ops:
            merge_col_name = "historical_" + col + "_" + op + "_" + agg_col
            if op == "mean":
                merge_dict[merge_col_name] = describe_tb["mean"].values
            elif op == "max":
                merge_dict[merge_col_name] = describe_tb["max"].values
            elif op == "min":
                merge_dict[merge_col_name] = describe_tb["min"].values
            elif op == "median":
                merge_dict[merge_col_name] = describe_tb["50%"].values
            elif op == "std":
                merge_dict[merge_col_name] = describe_tb["std"].values
    merge_df = pd.merge(curr_df, pd.DataFrame(merge_dict), on=col, how="left")
    return merge_df
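
# Example usage (sketch): assumes both frames share a "Month" column and that
# hist_df carries a "MeanTemp" column; ops must be a subset of
# {"mean", "max", "min", "median", "std"}.
#   merged = agg_and_merge_historical(curr_df, hist_df, col="Month",
#                                     agg_cols=["MeanTemp"], ops=["mean", "min", "max"])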
def map_vals(data, cols=["Latitude", "Longitude"], label_cols=[], color="red", submap=None, weight=3, radius=1, sample_size=10000, start_loc=[42.1657, -74.9481], zoom_start=6):
    # Copy the column list so extending it below does not mutate the default argument
    cols = list(cols)
    df_type = "pandas" if isinstance(data, pd.DataFrame) or isinstance(data, pd.Series) else "polars"
    fig = folium.Figure(height=500, width=750)
    if submap is None:
        map_nyc = folium.Map(
            location=start_loc,
            zoom_start=zoom_start,
            tiles='cartodbpositron',
            zoom_control=False,
            scrollWheelZoom=False,
            dragging=False
        )
    else:
        map_nyc = submap
    cols.extend(label_cols)
    if df_type == "pandas":
        for idx, row in data.loc[:, cols].dropna().sample(sample_size).iterrows():
            label = ""
            lat, long = row.iloc[0], row.iloc[1]
            for i, label_col in enumerate(label_cols):
                label += label_col + ": " + str(row.iloc[2+i]) + "\n"
            label_params = {"popup": label, "tooltip": label} if len(label_cols) > 0 else {}
            folium.CircleMarker(location=[lat, long], radius=radius, weight=weight, color=color, fill_color=color, fill_opacity=0.7, **label_params).add_to(map_nyc)
    else:
        for row in data[:, cols].drop_nulls().sample(sample_size).rows():
            label = ""
            lat, long = row[0], row[1]
            for i, label_col in enumerate(label_cols):
                label += label_col + ": " + str(row[2+i]) + "\n"
            label_params = {"popup": label, "tooltip": label} if len(label_cols) > 0 else {}
            folium.CircleMarker(location=[lat, long], radius=radius, weight=weight, color=color, fill_color=color, fill_opacity=0.7, **label_params).add_to(map_nyc)
    fig.add_child(map_nyc)
    return fig, map_nyc
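
# Example usage (sketch): plot a random sample of request locations on a Folium map;
# the frame and the "Complaint Type" label column here are assumptions.
#   fig, nyc_map = map_vals(service_data_pd, cols=["Latitude", "Longitude"],
#                           label_cols=["Complaint Type"], sample_size=1000)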
def find_variable_data(soup, curr_var="Created Date"):
    src = "<!doctype html>"
    # HTML and head start
    src += "<html lang=\"en\">"
    src += str(soup.find("head"))
    # Body -> content -> container -> row -> variable
    src += "<body style=\"background-color: var(--table-odd-background-fill); padding-top: 20px;\">"
    src += "<div class=\"content\" style=\"padding-left: 150px; padding-right: 150px; border: 0px !important; \">"
    # src += "<div class=\"container\">"
    src += "<div class=\"section-items\" style=\"background-color: white;\">"
    # src += "<div class=\"row spacing\">"
    variables_html = soup.find_all("div", class_="variable")
    for var_html in variables_html:
        if var_html.text[:len(curr_var)] == curr_var:
            parent = var_html.parent
            parent['style'] = "border: 0px"
            src += str(parent)
            break
    src += "</div></div>"
    # Scripts
    for script in soup.find_all("script"):
        src += str(script)
    # End
    src += "</body>"
    src += "</html>"
    # src = BeautifulSoup(src, 'html.parser').prettify()
    src_doc = html.escape(src)
    iframe = f'<iframe width="100%" height="1200px" srcdoc="{src_doc}" frameborder="0"></iframe>'
    return iframe, src_doc
def plot_autocorr(data, col, apply=None):
    time_series = data.loc[:, col].to_frame().copy()
    if apply:
        time_series[col] = time_series[col].apply(apply)
    fig, ax = plt.subplots(2, 1, figsize=(12, 8))
    _ = plot_acf(time_series[col], lags=30, ax=ax[0])
    _ = plot_pacf(time_series[col], lags=30, method="ols-adjusted", ax=ax[1])
    _ = plt.suptitle(f"{col}", y=0.95)
    return fig

def adf_test(timeseries):
    dftest = adfuller(timeseries, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic', 'p-value', 'Lags Used', 'Number of Observations Used'])
    dfoutput['Number of Observations Used'] = dfoutput['Number of Observations Used'].astype(np.int64)
    for key, value in dftest[4].items():
        dfoutput['Critical Value (%s)' % key] = value
    return dfoutput

def kpss_test(timeseries):
    kpsstest = kpss(timeseries, regression='ct')
    kpss_output = pd.Series(kpsstest[0:3], index=['Test Statistic', 'p-value', 'Lags Used'])
    for key, value in kpsstest[3].items():
        kpss_output['Critical Value (%s)' % key] = value
    return kpss_output

def test_stationary(data, var):
    adf_df = adf_test(data[var].dropna())
    kpss_df = kpss_test(data[var].dropna())
    result_df = adf_df.to_frame(name="Augmented-Dickey-Fuller")
    result_df["KPSS Test"] = kpss_df
    def pass_hypothesis(col):
        test_stat, p_val = col.iloc[0], col.iloc[1]
        one_p, five_p, ten_p = col.iloc[4], col.iloc[5], col.iloc[6]
        if col.name == "KPSS Test":
            if test_stat < one_p and p_val < 0.01:
                color_fmt = ["background-color: #fc5749; font-weight: bold; color: black"]
            elif test_stat < five_p and p_val < 0.05:
                color_fmt = ["background-color: #F88379; font-weight: bold; color: black"]
            elif test_stat < ten_p and p_val < 0.1:
                color_fmt = ["background-color: #ff9f96; font-weight: bold; color: black"]
            else:
                color_fmt = ["background-color: green; font-weight: bold; color: black"]
        else:
            if test_stat < one_p and p_val < 0.01:
                color_fmt = ["background-color: green; font-weight: bold; color: black"]
            elif test_stat < five_p and p_val < 0.05:
                color_fmt = ["background-color: greenyellow; font-weight: bold; color: black"]
            elif test_stat < ten_p and p_val < 0.1:
                color_fmt = ["background-color: lightgreen; font-weight: bold; color: black"]
            else:
                color_fmt = ["background-color: #fc5749; font-weight: bold; color: black"]
        color_fmt.extend(['' for _ in col[1:]])
        return color_fmt
    result_df.loc["Lags Used", :] = result_df.loc["Lags Used", :].astype(np.int32)
    return result_df.style.apply(pass_hypothesis)
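
# Example usage (sketch): run both stationarity tests on the daily target and get back
# a colour-coded pandas Styler (assumes a frame like the one from build_service_data).
#   styled = test_stationary(sd_df, "Target")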
def plot_timeseries(data, var, data_name="My", all_vars=[], height=800, width=600, start_date="2017-12-31", end_date="2018-12-31"):
    if var == "":
        return gr.update()
    fig = go.Figure()
    fig.add_trace(
        go.Scatter(
            x=data.index,
            y=data[var],
            name=var,
            customdata=np.dstack((data["Season"].to_numpy(), data.reset_index()["Datetime"].dt.day_name().to_numpy(), data["is_holiday"].astype(bool).to_numpy()))[0],
            hovertemplate='<br>value:%{y:.3f} <br>Season: %{customdata[0]} <br>Weekday: %{customdata[1]} <br>Is Holiday: %{customdata[2]}',
        )
    )
    fig.update_layout(
        autosize=True,
        title=f"{data_name} Time Series by {var}",
        xaxis_title='Date',
        yaxis_title=var,
        hovermode='x unified',
    )
    fig.update_layout(
        autosize=True,
        xaxis=dict(
            rangeselector=dict(
                buttons=list([
                    dict(count=7, label="1w", step="day", stepmode="backward"),
                    dict(count=21, label="3w", step="day", stepmode="backward"),
                    dict(count=1, label="1m", step="month", stepmode="backward"),
                    dict(count=6, label="6m", step="month", stepmode="backward"),
                    dict(count=1, label="1y", step="year", stepmode="backward"),
                    dict(step="all")
                ])
            ),
            rangeslider=dict(
                visible=True,
            ),
            type="date",
            range=(start_date, end_date),
        ),
    )
    return fig
def plot_bivariate(data, x, y, subset=None, trendline=True):
    title = f"Scatterplot of {x} vs. {y}"
    if subset == "None" or subset is None:
        subset = None
        height = 450
    else:
        subset_title = subset.replace(" String", "")
        title += f" By {subset_title}"
        if subset_title in ["Season", "Year"]:
            height = 450
        else:
            height = 800
    if trendline:
        trendline = "ols"
    else:
        trendline = None
    # Special case to view categorical features
    if x in ["Agency", "Borough", "Descriptor"]:
        if x == "Agency":
            prefix = "AG"
        elif x == "Borough":
            prefix = "Borough"
        else:
            prefix = "DG"
        categories = [col for col in data.columns if prefix in col]
        melt_df = pd.melt(data, id_vars=["Target"], value_vars=categories)
        fig = px.scatter(
            melt_df,
            x="value",
            y="Target",
            trendline=trendline,
            facet_col="variable",
            facet_col_wrap=4,
            facet_col_spacing=0.05,
            title=title
        )
        height = 800
    else:
        fig = px.scatter(
            data,
            x=x, y=y,
            trendline=trendline,
            facet_col=subset,
            facet_col_wrap=4,
            facet_col_spacing=0.05,
            title=title
        )
    fig.update_layout(
        autosize=True,
        height=height,
    )
    return fig
def plot_seasonality(data, x, y, show_box=True, show_outliers=False):
    title = f"{y} by {x}"
    if show_box:
        if show_outliers:
            points = "outliers"
        else:
            points = "all"
        fig = px.box(data, x=x, y=y, points=points, title=title, facet_col_wrap=4, facet_col_spacing=0.05)
    else:
        fig = px.strip(data, x=x, y=y, title=title, facet_col_wrap=4, facet_col_spacing=0.05)
    fig.update_layout(
        autosize=True,
        height=600,
    )
    return fig
def build_service_data(filename):
    # Loading the data directly with polars leads to errors:
    # some rows end up missing for an unknown reason.
    # FIX: load with pandas, then convert to polars
    service_data_pd = pd.read_csv(filename)
    # Quick test to assure the unique key is in fact unique
    assert service_data_pd["Unique Key"].nunique() == len(service_data_pd)
    # Load from the pandas DataFrame
    service_data_pd["Incident Zip"] = service_data_pd["Incident Zip"].astype("string")
    service_data_pd["BBL"] = service_data_pd["BBL"].astype("string")
    service_data = pl.DataFrame(service_data_pd)
    # Clear some RAM
    del service_data_pd
    gc.collect()
    drop_cols = [
        "Unique Key", "Agency Name", "Location Type", "Incident Zip",
        "Incident Address", "Street Name", "Cross Street 1",
        "Cross Street 2", "Intersection Street 1", "Intersection Street 2",
        "Address Type", "City", "Landmark", "Facility Type",
        "Status", "Due Date", "Resolution Description",
        "Resolution Action Updated Date", "Community Board",
        "BBL", "X Coordinate (State Plane)", "Y Coordinate (State Plane)",
        "Open Data Channel Type", "Park Facility Name", "Park Borough",
        "Vehicle Type", "Taxi Company Borough", "Taxi Pick Up Location",
        "Bridge Highway Name", "Bridge Highway Direction", "Road Ramp",
        "Bridge Highway Segment", "Location", "Created Year"
    ]
    # Drop columns and create the date variables
    service_data = service_data.drop(drop_cols)
    service_data = create_datetime(service_data, "Created Date")
    service_data = create_datetime(service_data, "Closed Date")
    # Group by date to get the number of created tickets (used as the target)
    sd_grouped = service_data.rename({"Created Date": "Datetime"}).group_by("Datetime").agg(
        pl.len().alias("Target"),
    ).sort(by="Datetime")
    # Calculate the number of closed tickets.
    # The mean created-to-closed difference is used to repair bad Closed Dates:
    # mean_diff = service_data.with_columns(
    #     diff_created_closed = pl.col("Closed Date") - pl.col("Created Date")
    # ).filter((pl.col("Closed Date").dt.year() >= 2016) & (pl.col("Closed Date").dt.year() < 2020))["diff_created_closed"].mean().days
    # Mean diff precalculated from the above: roughly 13 days
    mean_diff = 13
    # Create a new Closed Date with errors filled using the mean difference above
    service_data = service_data.with_columns(
        Closed_Date_New = pl.when(pl.col("Created Date") - pl.col("Closed Date") > pl.duration(days=1))
            .then(pl.col("Created Date") + pl.duration(days=mean_diff))
            .otherwise(pl.col("Closed Date")).fill_null(pl.col("Created Date") + pl.duration(days=mean_diff))
    )
    # Only count tickets whose created date is on or before the (repaired) closed date,
    # so that future information is not accidentally leaked across other points in our data
    closed_tickets = service_data.group_by(["Closed_Date_New", "Created Date"]) \
        .agg((pl.when(pl.col("Created Date") <= pl.col("Closed_Date_New")).then(1).otherwise(0)).sum().alias("count")) \
        .sort("Closed_Date_New") \
        .filter((pl.col("Closed_Date_New").dt.year() >= 2016) & (pl.col("Closed_Date_New").dt.year() < 2019)) \
        .group_by("Closed_Date_New").agg(pl.col("count").sum().alias("num_closed_tickets"))
    # Keep the closed-ticket counts as num_closed_tickets
    ct_df = closed_tickets.with_columns(
        pl.col("num_closed_tickets")
    )
    # Concat the new column into our data
    sd_df = pl.concat([sd_grouped, ct_df.drop("Closed_Date_New")], how="horizontal")
    assert len(sd_grouped) == len(ct_df)
    # CATEGORICAL FEATURE MAPPING
    # MAPPING FOR BOROUGH
    Borough_Map = {
        "Unspecified": "OTHER",
        "2017": "OTHER",
        None: "OTHER",
        "2016": "OTHER"
    }
    service_data = service_data.with_columns(
        pl.col("Borough").replace(Borough_Map)
    )
    # MAPPING FOR AGENCY
    # This mapping was done manually
    Agency_Map = {
        "NYPD": "Security", "HPD": "Buildings", "DOT": "Transportation",
        "DSNY": "Environment & Sanitation", "DEP": "Environment & Sanitation",
        "DOB": "Buildings", "DOE": "Buildings", "DPR": "Parks",
        "DOHMH": "Health", "DOF": "Other", "DHS": "Security",
        "TLC": "Transportation", "HRA": "Other", "DCA": "Other",
        "DFTA": "Other", "EDC": "Other", "DOITT": "Other", "OMB": "Other",
        "DCAS": "Other", "NYCEM": "Other", "ACS": "Other", "3-1-1": "Other",
        "TAX": "Other", "DCP": "Other", "DORIS": "Other", "FDNY": "Other",
        "TAT": "Other", "COIB": "Other", "CEO": "Other", "MOC": "Other",
    }
    service_data = service_data.with_columns(
        pl.col("Agency").replace(Agency_Map).alias("AG")  # AG is shorthand for Agency Groups
    )
    # Mapping for Descriptor using BERTopic
    # Use the unique Descriptor values as the documents, as a numpy array
    # (polars input is not supported); we only care about the unique values
    descriptor_docs = service_data["Descriptor"].unique().to_numpy()
    # Build our topic mapping using the pretrained BERTopic model
    # Load the model and get its predictions
    topic_model = BERTopic.load("models/BERTopic")
    topics, probs = topic_model.transform(descriptor_docs)
    # Visualize if wanted
    # topic_model.visualize_barchart(list(range(-1, 6, 1)))
    # Create a topic-ID-to-name map
    topic_df = topic_model.get_topic_info()
    topic_id_map = {row["Topic"]: row["Name"][2:] for _, row in topic_df.iterrows()}
    topic_id_map[-1] = topic_id_map[-1][1:]  # Fix for the -1 (outlier) topic case
    # For each document (descriptor string) get its topic name
    doc_to_topic_map = defaultdict(str)
    for topic_id, doc in zip(topics, descriptor_docs):
        topic = topic_id_map[topic_id]
        doc_to_topic_map[doc] = topic
    service_data = service_data.with_columns(
        pl.col("Descriptor").replace(doc_to_topic_map).alias("DG")  # DG is shorthand for Descriptor Groups
    )
    # One-hot encode the categorical features
    cat_features = ["AG", "Borough", "DG"]
    service_data = service_data.to_dummies(columns=cat_features)
    # Group by date and create our category feature vector
    cat_df = service_data.rename({"Created Date": "Datetime"}).group_by("Datetime").agg(
        # Categorical feature sums
        pl.col('^AG_.*$').sum(),
        pl.col('^Borough_.*$').sum(),
        pl.col('^DG_.*$').sum(),
    ).sort(by="Datetime")
    # Concat our category features to our current dataframe
    sd_df = pl.concat([sd_df, cat_df.drop("Datetime")], how="horizontal")
    # Now that our dataframe is significantly reduced in size,
    # we can finally convert back to a pandas dataframe,
    # as pandas is usable across more python packages
    sd_df = sd_df.to_pandas()
    # Set index to datetime
    sd_df = sd_df.set_index("Datetime")
    # NOTE: append 7 extra rows, mirroring the 7 forecast rows added to the weather data.
    # These 7 rows are the final prediction window: their Target is null,
    # indicating it still needs to be predicted.
    preds_df = pd.DataFrame({'Datetime': pd.date_range(start=sd_df.index[-1], periods=8, freq='D')})[1:]
    sd_df = pd.concat([sd_df, preds_df.set_index("Datetime")], axis=0)
    return sd_df
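
# Example usage (sketch, hypothetical path): returns a daily frame indexed by Datetime
# with Target, num_closed_tickets and the one-hot AG_/Borough_/DG_ count columns; the
# last 7 rows are the prediction window with a null Target.
#   sd_df = build_service_data("data/311_service_requests.csv")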
# Build all weather data from file
def build_weather_data(filename):
    # Use pandas to read the file
    weather_data = pd.read_csv(filename)
    # Aggregate Year, Month, Day into a datetime object,
    # because the 311 data uses datetime
    weather_data["Datetime"] = weather_data["Year"].astype("str") + "-" + weather_data["Month"].astype("str") + "-" + weather_data["Day"].astype("str")
    weather_data = create_datetime(weather_data, "Datetime", format="%Y-%m-%d")
    # LOCALIZE
    # Pre-recorded min/max coordinates from the service data (so we don't need it again)
    lat_min = 40.49804421521046
    lat_max = 40.91294056699566
    long_min = -74.25521082506387
    long_max = -73.70038354802529
    # Create the conditions for location matching
    mincon_lat = weather_data["Latitude"] >= lat_min
    maxcon_lat = weather_data["Latitude"] <= lat_max
    mincon_long = weather_data["Longitude"] >= long_min
    maxcon_long = weather_data["Longitude"] <= long_max
    # Localize our data to match the service data
    wd_localized = weather_data.loc[mincon_lat & maxcon_lat & mincon_long & maxcon_long]
    drop_cols = [
        "USAF",
        "WBAN",
        "StationName",
        "State",
        "Latitude",
        "Longitude"
    ]
    wd_localized = wd_localized.drop(columns=drop_cols)
    # AGGREGATE
    # Map columns to their aggregation method
    mean_cols = [
        'MeanTemp',
        'DewPoint',
        'Percipitation',
        'WindSpeed',
        'Gust',
        'SnowDepth',
    ]
    min_cols = [
        'MinTemp'
    ]
    max_cols = [
        'MaxTemp',
        'MaxSustainedWind'
    ]
    round_cols = [
        'Rain',
        'SnowIce'
    ]
    # Perform the aggregation
    mean_df = wd_localized.groupby("Datetime")[mean_cols].mean()
    min_df = wd_localized.groupby("Datetime")[min_cols].min()
    max_df = wd_localized.groupby("Datetime")[max_cols].max()
    round_df = wd_localized.groupby("Datetime")[round_cols].mean().round().astype(np.int8)
    wd_full = pd.concat([mean_df, min_df, max_df, round_df], axis=1)
    # Add seasonal features
    wd_full = build_temporal_features(wd_full, "Datetime")
    wd_full["Season"] = wd_full["Season"].astype("category")
    wd_full = wd_full.set_index("Datetime")
    # We will calculate the imputation for the next 7 days after 12/31/2018,
    # along with the 49 missing days.
    # This will act as our "weather forecast"
    time_steps = 49 + 7
    # Columns to impute
    impute_cols = [
        'MeanTemp', 'MinTemp', 'MaxTemp', 'DewPoint',
        'Percipitation', 'WindSpeed', 'MaxSustainedWind',
        'Gust', 'Rain', 'SnowDepth', 'SnowIce',
    ]
    # Which imputation strategy to use for each variable
    mean_vars = ["WindSpeed", "MaxSustainedWind", "Gust", "SnowDepth"]
    min_vars = ["SnowIce", "MeanTemp", "MinTemp", "MaxTemp", "DewPoint", "Percipitation"]
    max_vars = ["Rain"]
    # Use the imputer defined below to create the imputed data
    preds_mean = impute_missing_weather(wd_full, strategy="mean", time_steps=time_steps, impute_cols=mean_vars)
    preds_min = impute_missing_weather(wd_full, strategy="min", time_steps=time_steps, impute_cols=min_vars)
    preds_max = impute_missing_weather(wd_full, strategy="max", time_steps=time_steps, impute_cols=max_vars)
    all_preds = pd.concat([preds_mean, preds_min, preds_max], axis=1)
    all_preds = build_temporal_features(all_preds.loc[:, impute_cols], "Datetime")
    all_preds = all_preds.set_index("Datetime")
    wd_curr = wd_full.loc[wd_full["Year"] >= 2016]
    wd_df = pd.concat([wd_full, all_preds], axis=0, join="outer")
    time_vars = ["Year", "Month", "Day", "DayOfWeek", "DayOfYear", "is_weekend", "is_holiday", "Season"]
    # Drop the temporal columns (the original call was missing the assignment)
    wd_df = wd_df.drop(columns=time_vars)
    return wd_df
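
# Example usage (sketch, hypothetical path): returns the localized, daily-aggregated
# weather frame, where the trailing 49 + 7 days are naive seasonal imputations that
# act as the "weather forecast".
#   wd_df = build_weather_data("data/weather_nyc.csv")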
class MyNaiveImputer():
    def __init__(self, data, time_steps=49, freq="D"):
        self.data = data.reset_index().copy()
        start_date = self.data["Datetime"].max() + pd.Timedelta(days=1)
        end_date = start_date + pd.Timedelta(days=time_steps-1)
        missing_range = pd.date_range(start_date, end_date, freq=freq)
        self.missing_df = pd.DataFrame(missing_range, columns=["Datetime"])
        self.missing_df = build_temporal_features(self.missing_df, "Datetime")

    def impute(self, col, by="DayOfYear", strategy="mean"):
        def naive_impute_by(val, impute_X, data, by=by, strategy=strategy):
            if strategy.lower() == "mean":
                func = pd.core.groupby.DataFrameGroupBy.mean
            elif strategy.lower() == "median":
                func = pd.core.groupby.DataFrameGroupBy.median
            elif strategy.lower() == "max":
                func = pd.core.groupby.DataFrameGroupBy.max
            elif strategy.lower() == "min":
                func = pd.core.groupby.DataFrameGroupBy.min
            grouped = func(data.groupby(by)[impute_X])
            return grouped[val]
        return self.missing_df["DayOfYear"].apply(naive_impute_by, args=(col, self.data, by, strategy))

    def impute_all(self, cols, by="DayOfYear", strategy="mean"):
        output_df = self.missing_df.copy()
        for col in cols:
            output_df[col] = self.impute(col, by, strategy)
        return output_df

def impute_missing_weather(data, strategy="mean", time_steps=7, impute_cols=impute_cols):
    final_imputer = MyNaiveImputer(data, time_steps=time_steps)
    preds = final_imputer.impute_all(impute_cols, strategy=strategy).set_index("Datetime")
    return preds
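
# Example usage (sketch): impute the next 7 days of selected columns with the
# historical per-DayOfYear statistic (assumes a frame like wd_full above, i.e. a
# Datetime index plus a DayOfYear column).
#   future_means = impute_missing_weather(wd_full, strategy="mean", time_steps=7,
#                                         impute_cols=["MeanTemp", "WindSpeed"])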
def get_feature_importance(data, target, split_date="01-01-2016", print_score=False):
    import torch
    device = "cuda" if torch.cuda.is_available() else "cpu"
    train = data.loc[data.index <= pd.to_datetime(split_date)]
    test = data.loc[data.index > pd.to_datetime(split_date)]
    if type(target) == str:
        X_train, X_test = train.drop(columns=target), test.drop(columns=target)
        y_train, y_test = train[target], test[target]
    else:
        X_train, X_test = train, test
        y_train, y_test = target.loc[train.index], target.loc[test.index]
        target = str(target.name)
    if 'int' in y_train.dtype.name:
        # Use a binary classifier
        metric = "logloss"
        model = xgb.XGBClassifier(
            base_score=0.25,
            n_estimators=500,
            early_stopping_rounds=50,
            objective='binary:logistic',
            device=device,
            max_depth=3,
            learning_rate=0.01,
            enable_categorical=True,
            eval_metric="logloss",
            importance_type="gain",
            random_state=22,
        )
    else:
        metric = "MAPE"
        model = xgb.XGBRegressor(
            n_estimators=500,
            early_stopping_rounds=50,
            objective='reg:squarederror',
            device=device,
            max_depth=3,
            learning_rate=0.01,
            enable_categorical=True,
            eval_metric="mape",
            importance_type="gain",
            random_state=22,
        )
    _ = model.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose=False
    )
    fig, ax = plt.subplots()
    ax = plot_importance(model, title=f"Feature Importance for {target}", ax=ax)
    if print_score:
        best_score = str(round(100 * model.best_score, 2)) + "%"
        print(f"Best {metric}: {best_score}")
    return fig, model
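
# Example usage (sketch): quick XGBoost fit to inspect gain-based feature importance
# (assumes a merged daily frame indexed by Datetime that contains a "Target" column).
#   fig, model = get_feature_importance(merged_df, "Target",
#                                       split_date="2018-04-01", print_score=True)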
def corr_with_lag(data, target_col, covar, lags=[1], method="pearson"):
    data_lagged = pd.DataFrame()
    data_lagged["Target"] = data[target_col]
    for lag in lags:
        new_col = f"lag_{lag}D"
        data_lagged[new_col] = data[covar].shift(lag)
    return data_lagged.dropna().corr(method=method)

def plot_correlations(data, target, covar, lags=[0, 1, 2, 3, 4, 5, 6, 7, 10, 14, 18, 21], method="pearson"):
    df_corr = corr_with_lag(data, target, covar, lags, method)
    mask = np.triu(np.ones_like(df_corr, dtype=bool))
    z_dim, x_dim = len(df_corr.to_numpy()), len(df_corr.columns)
    y_dim = x_dim
    fig = ff.create_annotated_heatmap(
        z=df_corr.mask(mask).to_numpy(),
        x=df_corr.columns.tolist(),
        y=df_corr.columns.tolist(),
        colorscale=px.colors.diverging.RdBu,
        zmin=-1,
        zmax=1,
        ygap=2,
        xgap=2,
        name="",
        customdata=np.full((x_dim, y_dim, z_dim), covar),
        hovertemplate='%{customdata[0]}<br>%{x} to %{y}<br>Correlation: %{z:.4f}',
        showscale=True
    )
    fig.update_layout(
        title_text=f"Correlation Heatmap of Lagged {covar}",
        title_x=0.5,
        height=600,
        xaxis_showgrid=False,
        yaxis_showgrid=False,
        xaxis_zeroline=False,
        yaxis_zeroline=False,
        yaxis_autorange='reversed',
        template='plotly_white'
    )
    # fig.update_annotations(font=dict(color="black"))
    for i in range(len(fig.layout.annotations)):
        if fig.layout.annotations[i].text == 'nan':
            fig.layout.annotations[i].text = ""
        else:
            corr_i = round(float(fig.layout.annotations[i].text), 3)
            fig.layout.annotations[i].text = corr_i
            if (corr_i > 0.2 and corr_i < 0.5) or (corr_i < -0.2 and corr_i > -0.5):
                fig.layout.annotations[i].font.color = "white"
    return fig
def plot_all_correlations(data, data_name="weather", method="pearson", width=1392, height=600):
    if data_name == "weather":
        covars = ["MeanTemp", "MinTemp", "MaxTemp", 'DewPoint', 'Percipitation', 'WindSpeed', 'Gust', 'MaxSustainedWind', "SnowDepth", "SnowIce", "Rain", "Target"]
    elif data_name == "service":
        covars = [
            "num_closed_tickets",
            # Agency Group counts
            'AG_Buildings', 'AG_Environment & Sanitation', 'AG_Health',
            'AG_Parks', 'AG_Security', 'AG_Transportation',
            'AG_Other',
            # Borough counts
            'Borough_BRONX', 'Borough_BROOKLYN', 'Borough_MANHATTAN',
            'Borough_QUEENS', 'Borough_STATEN ISLAND',
            'Borough_OTHER',
            # Descriptor Group counts
            'DG_damaged_sign_sidewalk_missing',
            'DG_english_emergency_spanish_chinese',
            'DG_exemption_commercial_tax_business',
            'DG_license_complaint_illegal_violation', 'DG_noise_animal_truck_dead',
            'DG_odor_food_air_smoke', 'DG_order_property_inspection_condition',
            'DG_water_basin_litter_missed', "Target"
        ]
    df_corr = data.loc[:, covars].corr(method=method)
    mask = np.triu(np.ones_like(df_corr, dtype=bool))
    fig = ff.create_annotated_heatmap(
        z=df_corr.mask(mask).to_numpy(),
        x=df_corr.columns.tolist(),
        y=df_corr.columns.tolist(),
        colorscale=px.colors.diverging.RdBu,
        zmin=-1,
        zmax=1,
        ygap=2,
        xgap=2,
        name="",
        hovertemplate='%{x}-%{y} <br>Correlation: %{z:.4f}',
        showscale=True
    )
    fig.update_layout(
        title_text=f"Correlation Heatmap of {data_name.title()} Variables & Target",
        title_x=0.5,
        height=height,
        width=width,
        xaxis_showgrid=False,
        yaxis_showgrid=False,
        xaxis_zeroline=False,
        yaxis_zeroline=False,
        yaxis_autorange='reversed',
        template='plotly_white'
    )
    fig.update_annotations(font=dict(color="black"))
    for i in range(len(fig.layout.annotations)):
        if fig.layout.annotations[i].text == 'nan':
            fig.layout.annotations[i].text = ""
        else:
            corr_i = round(float(fig.layout.annotations[i].text), 3)
            fig.layout.annotations[i].text = corr_i
            if corr_i > 0.5 or corr_i < -0.5:
                fig.layout.annotations[i].font.color = "white"
    return fig
def plot_gust_interpolation(data):
    fig, ax = plt.subplots(2, 2, figsize=(15, 12))
    data["Gust_lin"].plot(ax=ax[0][0], color=color_pal[0], title="linear")
    data["Gust_spline3"].plot(ax=ax[0][1], color=color_pal[1], title="spline3")
    data["Gust_spline5"].plot(ax=ax[1][0], color=color_pal[2], title="spline5")
    data["Gust_quad"].plot(ax=ax[1][1], color=color_pal[3], title="quadratic")
    curr_fig = plt.gcf()
    plt.close()
    return curr_fig

def plot_train_split(train, val):
    fig, ax = plt.subplots(figsize=(15, 5))
    train["Target"].plot(label="Training Set", ax=ax)
    val["Target"].plot(label="Validation Set", ax=ax)
    ax.axvline('2018-04-01', color='black', ls='--')
    ax.legend()
    ax.set_title("Train Test Split (2018-04-01)")
    curr_fig = plt.gcf()
    plt.close()
    return curr_fig

def plot_predictions(train, val, preds):
    fig, ax = plt.subplots(figsize=(16, 5))
    train["Target"].plot(label="Training Set", ax=ax)
    val["Target"].plot(label="Validation Set", ax=ax)
    val["Prediction"] = preds
    val["Prediction"].plot(label="Prediction", ax=ax)
    ax.axvline('2018-04-01', color='black', ls='--')
    ax.legend()
    ax.set_title("Model Prediction for 311 Call Volume")
    curr_fig = plt.gcf()
    plt.close()
    return curr_fig

def plot_final_feature_importance(model):
    fig, ax = plt.subplots(figsize=(12, 6))
    ax = plot_importance(model, max_num_features=20, title="Feature Importance for 311 Service Calls", ax=ax)
    curr_fig = plt.gcf()
    plt.close()
    return curr_fig
def predict_recurse(dataset, test, model, features_to_impute=['Target_L1D', 'Target_Diff7D', 'Target_Diff14D'], last_feature='Target_L6D'):
    n_steps = len(test)
    merged_data = pd.concat([dataset[-14:], test], axis=0)
    all_index = merged_data.index
    X_test = test.drop(columns="Target")
    sd = -6  # Starting point for filling the next value
    # For each step, get the prediction for that day
    for i in range(n_steps - 1):
        pred = model.predict(X_test)[i]
        # For the three lag/diff features, compute the new value from this prediction
        X_test.loc[all_index[sd+i], features_to_impute[0]] = pred
        X_test.loc[all_index[sd+i], features_to_impute[1]] = pred - merged_data.loc[all_index[sd+i-7], features_to_impute[1]]
        X_test.loc[all_index[sd+i], features_to_impute[2]] = pred - merged_data.loc[all_index[sd+i-14], features_to_impute[2]]
        # On the last iteration, also compute the Lag6D value
        if i == 5:
            X_test.loc[all_index[sd+i], last_feature] = pred - merged_data.loc[all_index[sd+i-6], last_feature]
    final_preds = model.predict(X_test)
    return final_preds
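
# Example usage (sketch): recursively predict a 7-day horizon, feeding each day's
# prediction back into the lag/diff features; `dataset` and `test` are assumed to be
# feature frames that already contain the engineered Target_L*/Target_Diff* columns.
#   future_preds = predict_recurse(dataset, test, model)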