File size: 2,680 Bytes
749d1d8 285612d 749d1d8 285612d ed3130d 749d1d8 32235fd 285612d 32235fd 749d1d8 1b56724 4469d41 1b56724 285612d cdbb4c0 b65cbe6 285612d cdbb4c0 285612d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
import os
import time
from datetime import datetime, timedelta
import pandas as pd
import schedule
from datasets import DatasetDict, load_dataset, Dataset
from huggingface_hub import login
from utilities.data_collator import merge_and_filter_data
from utilities.my_logger import setup_logger
from utilities.readme_update import update_readme
# Dataset coordinates, read from the environment at import time. Indexing
# os.environ directly means a missing variable raises KeyError immediately.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
# Repo id passed to load_dataset / push_to_hub / update_readme below.
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
# NOTE(review): not referenced anywhere in the visible code of this file; confirm whether it is still needed.
dataset_readme_path = "README.md"
# Authenticate with Hugging Face using an auth token; the same token is reused
# explicitly when pushing the dataset in main().
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)
# Module-level logger shared by every function in this file.
logger = setup_logger(__name__)
def get_dataset():
    """Load the existing dataset from the Hugging Face hub, or start a new one.

    Returns:
        The ``DatasetDict`` stored under ``dataset_name`` on the hub, with the
        pandas artifact column ``__index_level_0__`` removed when the ``train``
        split carries it. If ``load_dataset`` raises ``FileNotFoundError``, an
        empty ``DatasetDict`` is returned so the caller can populate it from
        scratch.
    """
    # Keep the try body to the single call that signals "repo not there yet".
    # NOTE(review): `ignore_verifications` is deprecated in newer `datasets`
    # releases in favour of `verification_mode`; confirm against the pinned version.
    try:
        dataset = load_dataset(dataset_name, download_mode="reuse_cache_if_exists", ignore_verifications=True)
    except FileNotFoundError:
        logger.warning("Creating new dataset")
        return DatasetDict()

    logger.debug("Loading existing dataset")
    # A repo may exist without a 'train' split (main() already allows for that),
    # so guard the lookup instead of letting it raise KeyError.
    if "train" in dataset and "__index_level_0__" in dataset["train"].column_names:
        dataset = dataset.remove_columns(["__index_level_0__"])
    return dataset
def main():
    """Run one update cycle: load, merge, refresh the README, and push.

    Steps, in order:
      1. Load the current hub dataset via ``get_dataset``.
      2. Hand the existing ``train`` rows (or an empty frame when there is no
         ``train`` split) to ``merge_and_filter_data`` and store its result as
         the new ``train`` split.
      3. Call ``update_readme`` with the row-count difference for today.
      4. Push the whole ``DatasetDict`` to the hub with ``auth_token``.

    Returns nothing; any exception from the helpers propagates to the caller.
    """
    today = f"{datetime.now():%Y-%m-%d}"
    logger.warning(f"Running main function for date: {today}")

    hub_dataset = get_dataset()

    # Historic rows, if any; an empty frame stands in for a brand-new dataset.
    if 'train' in hub_dataset:
        previous_df = hub_dataset['train'].to_pandas()
    else:
        previous_df = pd.DataFrame()

    # The helper is expected to return historic + latest rows combined
    # (behaviour lives in utilities.data_collator; verify there).
    merged_df = merge_and_filter_data(old_df=previous_df)
    hub_dataset['train'] = Dataset.from_pandas(merged_df, preserve_index=False)

    # Net change in row count; reported in the README and the log line below.
    added_rows = len(merged_df) - len(previous_df)
    update_readme(
        dataset_name=dataset_name,
        subreddit=subreddit,
        latest_date=today,
        new_rows=added_rows,
    )
    logger.info(f"Adding {added_rows} rows for {today}.")

    # Publish the updated splits to the Hugging Face hub.
    logger.debug(f"Pushing data for {today} to the Hugging Face hub")
    hub_dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {today} to the Hugging Face Hub")
def schedule_daily_task():
    """
    Register `main` as a once-a-day job and run the scheduler loop forever.

    The job is registered for '05:00' and the function then polls
    `schedule.run_pending()` once per second. It never returns normally; an
    exception raised by the job propagates out of the loop.
    """
    # For a quick manual trial, swap the fixed time for "one minute from now":
    # run_at = (datetime.now() + timedelta(minutes=1)).time().strftime('%H:%M')
    run_at = '05:00'
    logger.info(f'Scheduling tasks to run every day at: {run_at}')

    daily_job = schedule.every().day.at(run_at)
    daily_job.do(main)

    # Poll loop: wake up every second and fire whatever is due.
    while True:
        schedule.run_pending()
        time.sleep(1)
# Script entry point: start the blocking scheduler loop only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    schedule_daily_task()
|