MESReport / utils /mes_player_model.py
ChenyuRabbitLove's picture
fix: modify unregistered user adventure log
ce12623
raw
history blame
7.66 kB
import json
import logging
import hashlib
from dataclasses import dataclass, field, InitVar, asdict
from typing import List, Dict, Union
from datetime import datetime, date
import pandas as pd
from google.cloud import bigquery
@dataclass
class Player:
    """Campaign state for a single player.

    A player is created in one of two ways:

    * ``init=True``: a brand-new player. The group and its stage-1 partner
      are assigned, a first adventure log is added and, when
      ``available_achievements`` is given, ``rewards_status`` is populated
      with one un-completed / un-issued entry per achievement and then
      serialized to a JSON string.
    * ``init=False`` (default): an existing player rebuilt from stored data.
      The list-like fields are normalized to plain Python lists and the
      partner for the current campaign week is added if missing.

    ``rewards_status`` is held as a JSON string between operations (it has to
    be serialized before being inserted into BigQuery); methods that need the
    dict form deserialize it temporarily and serialize it again when done.
    """

    # Maps ``player_group // 10`` to the partner family of that group.
    _GROUP_BASE_PARTNERS = {
        0: "phoenix",
        1: "pegasus",
        2: "dragon",
        3: "griffin",
    }
    # Number of distinct groups produced by ``hash_user_id``.
    _GROUP_COUNT = 40
    # First day of the event; weekly partner stages are counted from here.
    _EVENT_START_DATE = date(2023, 12, 4)
    # Number of weekly partner stages (weeks 0..3 -> stages 1..4).
    _EVENT_WEEKS = 4

    player_backend_user_id: str
    player_nickname: Union[str, None] = field(default=None)
    player_group: Union[int, None] = field(default=None)
    partners: List[str] = field(default_factory=list)
    badges: List[str] = field(default_factory=list)
    rewards_status: Union[str, Dict[str, Dict[str, Union[bool, str]]]] = field(
        default_factory=dict
    )
    adventure_logs: List[str] = field(default_factory=list)
    total_active_days: int = field(default=0)
    total_gained_scores: int = field(default=0)
    total_finished_contents: int = field(default=0)
    finished_videos_before_campaign: List[str] = field(default_factory=list)
    created_at_date: date = field(default_factory=date.today)
    updated_at_date: date = field(default_factory=date.today)
    available_achievements: InitVar[list] = field(default=None)
    init: InitVar[bool] = field(default=False)

    def __post_init__(self, available_achievements: List, init: bool):
        """Finish construction for a new (``init=True``) or loaded player."""
        if init:
            # Newly created player: assign the group / first partner and
            # initialize rewards_status, which is not populated yet.
            self.assign_group()
            self.add_adventure_log(
                "你並未參與這次與狐貍貓的冒險,請在下次活動中參與冒險吧!"
            )
            if available_achievements is not None:
                # Tolerate a rewards_status that was passed in already
                # serialized; for the default dict this is a no-op.
                self.deserialize_rewards_status()
                self.rewards_status.update(
                    {
                        achievement_key: {
                            "is_completed": False,
                            "is_issued": False,
                        }
                        for achievement_key in available_achievements
                    }
                )
                self.serialize_rewards_status()
        else:
            # Stored rows may deliver repeated fields as array-like objects;
            # normalize them (and plain lists / None) to Python lists.
            self.badges = self._as_list(self.badges)
            self.partners = self._as_list(self.partners)
            self.adventure_logs = self._as_list(self.adventure_logs)
            self.finished_videos_before_campaign = self._as_list(
                self.finished_videos_before_campaign
            )
            self.assign_weekly_partner()

    @staticmethod
    def _as_list(values) -> list:
        """Return ``values`` as a plain list.

        Objects exposing ``tolist()`` are converted through it; ``None``
        becomes an empty list; any other iterable is copied with ``list``.
        """
        if values is None:
            return []
        if hasattr(values, "tolist"):
            return values.tolist()
        return list(values)

    # Serialization and deserialization concern the content type of
    # rewards_status: it has to be a JSON string before inserting into
    # BigQuery, and a dict while it is being read or modified.
    def serialize_rewards_status(self):
        """Convert ``rewards_status`` to a JSON string (no-op if already one)."""
        if not isinstance(self.rewards_status, str):
            self.rewards_status = json.dumps(self.rewards_status)

    def deserialize_rewards_status(self):
        """Convert ``rewards_status`` to a dict (no-op if it is not a string)."""
        if isinstance(self.rewards_status, str):
            self.rewards_status = json.loads(self.rewards_status)

    def add_partner(self, partner: str):
        """Add ``partner`` unless it is empty or already owned."""
        if partner and partner not in self.partners:
            self.partners.append(partner)
            logging.info(
                f"Player {self.player_backend_user_id} has a new partner: {partner}"
            )

    def add_badge(self, badge: str):
        """Add ``badge`` unless it is empty or already earned."""
        if badge and badge not in self.badges:
            self.badges.append(badge)
            logging.info(
                f"Player {self.player_backend_user_id} earned a new badge: {badge}"
            )

    def add_adventure_log(self, log: str):
        """Append ``log`` to the adventure logs (duplicates are allowed)."""
        self.adventure_logs.append(log)
        logging.info(
            f"Player {self.player_backend_user_id} has a new adventure log: {log}"
        )

    def update_total_gained_scores(self, gained_scores: int):
        """Overwrite the total gained scores with ``gained_scores``."""
        self.total_gained_scores = gained_scores

    def update_total_finished_contents(self, finished_contents_count: int):
        """Overwrite the total finished contents with the given count."""
        self.total_finished_contents = finished_contents_count

    def update_total_active_days(self, active_days: int):
        """Overwrite the total active days with ``active_days``."""
        self.total_active_days = active_days

    def update_rewards_status(
        self,
        key: str,
        value: bool,
        target: Union[str, List[str], tuple] = ("is_completed", "is_issued"),
    ):
        """Set one or more flags of the reward ``key`` to ``value``.

        Args:
            key: Achievement key inside ``rewards_status``.
            value: New value for the selected flag(s).
            target: A single flag name, or an iterable of flag names. By
                default both ``is_completed`` and ``is_issued`` are set.

        Raises:
            KeyError: If ``key`` is not present in ``rewards_status``.
        """
        flags = [target] if isinstance(target, str) else list(target)
        self.deserialize_rewards_status()
        try:
            reward = self.rewards_status[key]
            for flag in flags:
                reward[flag] = value
        finally:
            # Always return to the serialized form, even on KeyError.
            self.serialize_rewards_status()

    def hash_user_id(self):
        """Return a stable group number in ``[0, 40)`` derived from the user id."""
        hashed = hashlib.sha256(self.player_backend_user_id.encode()).hexdigest()
        return int(hashed, 16) % self._GROUP_COUNT

    def assign_group(self):
        """Assign the player's group and add that group's stage-1 partner."""
        self.player_group = self.hash_user_id()
        base_partner = self._GROUP_BASE_PARTNERS[self.player_group // 10]
        self.add_partner(f"{base_partner}_1")

    def assign_weekly_partner(self):
        """Add the partner stage matching the current campaign week.

        Does nothing outside the event weeks or when the player has no group.
        """
        weeks_elapsed = (date.today() - self._EVENT_START_DATE).days // 7
        if weeks_elapsed not in range(0, self._EVENT_WEEKS):
            return
        if self.player_group is None:
            # Without a group there is no partner family to pick from.
            return
        base_partner = self._GROUP_BASE_PARTNERS[self.player_group // 10]
        # +1 so that stages start from 1 rather than 0.
        self.add_partner(f"{base_partner}_{weeks_elapsed + 1}")

    def display_player_info(self):
        """Log the player's main attributes at INFO level."""
        logging.info(f"Player Backend User ID: {self.player_backend_user_id}")
        logging.info(f"Player Group: {self.player_group}")
        logging.info(f"Partners: {self.partners}")
        logging.info(f"Badges: {self.badges}")
        logging.info(f"Adventure Logs: {self.adventure_logs}")
        logging.info(f"Rewards Status: {self.rewards_status}")
        logging.info(f"Total Gained Scores: {self.total_gained_scores}")
        logging.info(f"Total Finished Contents: {self.total_finished_contents}")
        logging.info(f"Total Active Days: {self.total_active_days}")
        logging.info(f"Created At: {self.created_at_date}")
        logging.info(f"Updated At: {self.updated_at_date}")

    def get_incomplete_rewards(self):
        """Return the keys of all rewards whose ``is_completed`` flag is falsy."""
        self.deserialize_rewards_status()
        try:
            return [
                key
                for key, status in self.rewards_status.items()
                if not status["is_completed"]
            ]
        finally:
            self.serialize_rewards_status()

    @staticmethod
    def get_big_query_schema():
        """Return the BigQuery schema fields describing a player row.

        NOTE(review): ``player_nickname`` and
        ``finished_videos_before_campaign`` are dataclass fields (and appear
        in ``to_dict`` output) but have no column here -- confirm whether
        that is intentional.
        """
        return [
            bigquery.SchemaField("player_backend_user_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("player_group", "INTEGER", mode="NULLABLE"),
            bigquery.SchemaField("partners", "STRING", mode="REPEATED"),
            bigquery.SchemaField("badges", "STRING", mode="REPEATED"),
            bigquery.SchemaField("rewards_status", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("adventure_logs", "STRING", mode="REPEATED"),
            bigquery.SchemaField("total_active_days", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("total_gained_scores", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("total_finished_contents", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("created_at_date", "DATE", mode="REQUIRED"),
            bigquery.SchemaField("updated_at_date", "DATE", mode="REQUIRED"),
        ]

    @staticmethod
    def from_dict(series: pd.Series) -> "Player":
        """Build a ``Player`` from a Series whose index holds the field names.

        The Series is copied first so the caller's data is not shared.
        """
        data = series.copy()
        return Player(**data)

    def to_dict(self) -> Dict:
        """Return the player's fields as a dict with dates as ``YYYY-MM-DD``."""
        data = asdict(self)
        # Convert date objects to strings; values that are not dates
        # (e.g. already-formatted strings) are left untouched.
        for date_field in ["created_at_date", "updated_at_date"]:
            value = data.get(date_field)
            if value and isinstance(value, date):
                data[date_field] = value.strftime("%Y-%m-%d")
        return data