import os
from datetime import datetime
import json

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset, DownloadMode
from huggingface_hub import login

from utilities.data_processing import data_processing
from utilities.my_logger import setup_logger
from utilities.praw_downloader import praw_downloader
from utilities.praw_processor import preprocess_praw_data

# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"

frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)
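
# Illustrative only: an example of the environment this script expects, with
# placeholder values (not taken from the original project).
#
#   export SUBREDDIT=askreddit
#   export USERNAME=your-hf-username
#   export FREQUENCY=daily
#   export HF_TOKEN=hf_xxxxxxxxxxxxxxxx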

# Dummy row for when we create a new repo; make sure to put everything in a list
dummy_data = {
    "id": ['id'],
    "content": ["This is a sample post content. Just for demonstration purposes!"],
    "poster": ["sampleUser123"],
    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
    "flair": ["Discussion"],
    "title": ["Sample Post Title: How to Use Hugging Face?"],
    "score": [457],
    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    "updated": [False],
    "new": [False],
    "nsfw": [False]
}


def load_or_create_dataset():
    """
    Loads an existing dataset from the Hugging Face hub or creates a new one if it doesn't exist.

    This function attempts to load a dataset specified by 'dataset_name'. If the dataset is not found,
    it creates a new dataset with 'dummy_data', pushes it to the Hugging Face hub, and then reloads it.
    After reloading, the dummy data is removed from the dataset.

    Returns:
        dataset (DatasetDict): The loaded or newly created dataset.

    Raises:
        FileNotFoundError: If the dataset cannot be loaded or created.
    """
    # Load the existing dataset from the Hugging Face hub or create a new one
    try:
        logger.debug(f"Trying to download {dataset_name}")
        dataset = load_dataset(dataset_name, download_mode=DownloadMode.FORCE_REDOWNLOAD)
        logger.debug("Loading existing dataset")
    except FileNotFoundError:
        logger.warning("Creating new dataset")

        # Creating Initial Repo
        dataset = DatasetDict()
        dataset['train'] = Dataset.from_dict(dummy_data)
        dataset.push_to_hub(repo_id=dataset_name, token=auth_token)

        # Pulling from Initial Repo
        dataset = load_dataset(dataset_name)

        # Remove dummy data
        del dataset['train']

    return dataset
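

# Minimal usage sketch (illustrative, never called by this module; assumes the
# hub repo already holds a 'train' split): the stored split is typically pulled
# into pandas before merging with freshly downloaded submissions.
def _example_existing_split_to_pandas():
    return load_or_create_dataset()['train'].to_pandas()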


def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merges two dataframes, sorts them by 'date_utc', and marks new IDs.

    The function drops the 'new' and 'updated' bookkeeping columns from the old dataframe, concatenates
    the old and new dataframes, sorts the result by 'date_utc', runs it through data_processing, and
    finally marks rows that appear in the new dataframe but not in the old dataframe as 'new'.

    Args:
    - old_df (pd.DataFrame): The original dataframe.
    - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe.

    Returns:
    - pd.DataFrame: The merged, sorted, and marked dataframe.
    """
    # Drop bookkeeping columns from the stored frame (note: mutates old_df in place)
    old_df.drop(columns=['new', 'updated'], inplace=True)

    # Concatenate old and new dataframes, sort by 'date_utc', and reset index
    df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)

    # Process data accordingly
    df = data_processing(df)

    # Identify new rows (present in new_df but not in old_df)
    df['new'] = df['id'].apply(lambda x: x in set(new_df['id']) - set(old_df['id']))

    return df
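

# Illustrative sketch of the 'new' flag computed above: an id is marked when it
# appears in the freshly downloaded frame but not in the previously stored one.
# (Toy placeholder values; data_processing and the real schema are not involved.)
def _example_new_flag():
    old_ids = pd.Series(["a1", "a2"])
    new_ids = pd.Series(["a2", "b3"])
    combined = pd.DataFrame({"id": ["a1", "a2", "b3"]})
    combined["new"] = combined["id"].apply(lambda x: x in set(new_ids) - set(old_ids))
    return combined  # only the 'b3' row has new == True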


def remove_filtered_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes rows from the DataFrame where the 'id' is present in filter_ids.json.

    :param df: Input DataFrame to be filtered.
    :return: DataFrame with rows containing IDs present in filter_ids.json removed.
    """
    # Load filter IDs from JSON file
    with open('filter_ids.json', 'r') as file:
        filter_ids = json.load(file)

    # Remove the rows with IDs present in filter_ids
    filtered_df = df[~df['id'].isin(filter_ids)]
    logger.info(f"Filtered {len(df) - len(filtered_df)} rows from the DataFrame")

    return filtered_df
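

# filter_ids.json is assumed here to be a flat JSON array of post ids, e.g.
# ["abc123", "def456"]. A minimal sketch of the filtering step on placeholder
# data (illustrative only, never called by this module):
def _example_remove_filtered_rows():
    df = pd.DataFrame({"id": ["abc123", "zzz999"], "title": ["hidden", "kept"]})
    filter_ids = ["abc123"]  # stands in for the ids loaded from filter_ids.json
    return df[~df["id"].isin(filter_ids)]  # keeps only the 'zzz999' row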


def get_latest_data():
    """Downloads the latest submissions via praw_downloader and preprocesses them into a DataFrame."""
    submissions = praw_downloader()
    df = preprocess_praw_data(submissions=submissions)
    return df
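

# A minimal sketch of how the helpers above could be chained, assuming the hub
# repo already holds a 'train' split. This is illustrative only and is not the
# module's actual entry point.
def _example_pipeline():
    dataset = load_or_create_dataset()
    old_df = dataset['train'].to_pandas()
    new_df = get_latest_data()
    df = merge_data(old_df, new_df)
    df = remove_filtered_rows(df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)
    dataset.push_to_hub(repo_id=dataset_name, token=auth_token)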