import os
import time
from datetime import datetime, timedelta

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login

from my_logger import setup_logger
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe

# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-{subreddit}"
dataset_readme_path = "README.md"

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)


def update_readme(dataset_name, subreddit, date_to_fetch):
    """
    Build the markdown README (dataset card) text for the dataset.

    Args:
        dataset_name (str): Hub repository id of the dataset.
        subreddit (str): Subreddit whose submissions the dataset contains.
        date_to_fetch (datetime.date): Day whose data was just added; recorded in the change log.

    Returns:
        readme_text (str): The README markdown. Reads the START_DATE environment variable.
    """
    readme_text = f"""
# {dataset_name}

## Dataset Overview
The goal is to have an open dataset of `{subreddit}` submissions. This has been taken from the Pushshift API.

## Data Collection
This has been collected with sequential calls that follow the pagination of the pushshift request.

## Data Structure
- `all_days`: All the data after `{os.environ['START_DATE']}`

## Update Frequency
The dataset is updated daily and covers the period from `{os.environ['START_DATE']}` to two days ago.

## Attribution
Data sourced from the Pushshift API.

## Change Log
<details>
<summary>Click to expand</summary>

- **{datetime.now().strftime('%Y-%m-%d')}:** Added data for {date_to_fetch} to the 'all_days' split and saved as CSV

</details>
"""
    return readme_text


def _latest_date(dates):
    """Return the most recent day in `dates` as a `datetime.date`, or None if there is none."""
    # pd.to_datetime accepts ISO strings, date/datetime objects and datetime64 columns alike.
    # NOTE(review): assumes the 'date' column holds one of those, not raw epoch integers — confirm.
    latest = pd.to_datetime(dates).max()
    if pd.isna(latest):
        return None
    return latest.date()


def main(date_to_fetch):
    """
    Fetch one day of subreddit submissions, merge them into the hub dataset and push it.

    Args:
        date_to_fetch (datetime.date): The date to fetch subreddit data for

    Returns:
        most_recent_date (datetime.date): Most recent date covered by the dataset, i.e. the later of
            `date_to_fetch` and the newest `date` already stored in the hub's 'all_days' split.
    """
    # Load the existing dataset from the Hugging Face hub or create a new one
    try:
        dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True)
        logger.info("Loading existing dataset")
    except FileNotFoundError:
        logger.info("Creating new dataset")
        dataset = DatasetDict()

    # Earlier pushes may carry the pandas index as an extra column; strip it from 'all_days' only.
    if "all_days" in dataset and "__index_level_0__" in dataset["all_days"].column_names:
        dataset["all_days"] = dataset["all_days"].remove_columns(["__index_level_0__"])

    # Call get_subreddit_day with the calculated date
    logger.info(f"Fetching data for {date_to_fetch}")
    submissions = scrape_submissions_by_day(subreddit, str(date_to_fetch))
    df = submissions_to_dataframe(submissions)
    logger.info(f"Data fetched for {date_to_fetch}")

    # The day just fetched is the newest we know of, unless the hub already holds later data.
    most_recent_date = date_to_fetch

    # Append DataFrame to split 'all_days' or create new split
    if "all_days" in dataset:
        logger.info("Appending data to split 'all_days'")
        # Merge the new submissions
        old_data = dataset["all_days"].to_pandas()
        new_data = pd.concat([old_data, df], ignore_index=True)
        if "__index_level_0__" in new_data.columns:
            new_data = new_data.drop("__index_level_0__", axis=1)
        # Drop duplicates just in case (e.g. a day re-fetched after a restart)
        new_data = new_data.drop_duplicates(subset=["id"], keep="first")

        # Figure out dates when we restart: the hub may already be ahead of date_to_fetch
        old_data_most_recent_date = _latest_date(old_data["date"])
        if old_data_most_recent_date is not None:
            most_recent_date = max(old_data_most_recent_date, most_recent_date)

        if len(old_data) == len(new_data):
            # Nothing new was added, so skip the push and let the caller jump ahead.
            logger.warning("Data in hub is much more recent, using that next!")
            return most_recent_date

        # Convert back to dataset; drop_duplicates leaves gaps in the index, and keeping it
        # would re-create the '__index_level_0__' column stripped above.
        dataset["all_days"] = Dataset.from_pandas(new_data, preserve_index=False)
    else:
        logger.info("Creating new split 'all_days'")
        dataset["all_days"] = Dataset.from_pandas(df, preserve_index=False)

    # Log appending or creating split 'all'
    logger.info("Appended or created split 'all_days'")

    # Push the augmented dataset to the Hugging Face hub
    logger.info(f"Pushing data for {date_to_fetch} to the Hugging Face hub")
    readme_text = update_readme(dataset_name, subreddit, date_to_fetch)
    # NOTE(review): nothing here shows `.description` on a DatasetDict being uploaded, so the
    # README text may never reach the hub (dataset_readme_path is also unused) — confirm.
    dataset.description = readme_text
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub")
    return most_recent_date


def run_main_continuously():
    """
    Run `main` once per day, forever.

    Starts at the date in the START_DATE environment variable (format YYYY-MM-DD) and
    processes each day up to two days ago, advancing past whatever date `main` reports
    as most recent. Once caught up, it sleeps until the same time of day tomorrow that
    this function was first started, then continues.
    """
    start_date = datetime.strptime(os.environ["START_DATE"], "%Y-%m-%d").date()

    # Calculate the start time for running the main_function every day.
    start_time = datetime.now().time()

    while True:
        today = datetime.now().date()
        two_days_ago = today - timedelta(days=2)

        if start_date <= two_days_ago:
            logger.info(f"Running main function for date: {start_date}")
            most_recent_date = main(start_date)
            start_date = most_recent_date + timedelta(days=1)
        else:
            tomorrow = today + timedelta(days=1)
            now = datetime.now()
            start_of_tomorrow = datetime.combine(tomorrow, start_time)
            # Clamp at 0: if midnight passes between reading `today` and `now`, the
            # difference can be negative and time.sleep rejects negative values.
            wait_until_tomorrow = max(0.0, (start_of_tomorrow - now).total_seconds())
            logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds")
            time.sleep(wait_until_tomorrow)


if __name__ == '__main__':
    run_main_continuously()