import json import os import random from zipfile import ZipFile import zipfile import gradio as gr from config import * import shutil from PIL import Image#上传保存遮罩和比较遮罩width height能否和章节图片匹配的时候用 from taskMap import * def load_config(): return get_variables() class configData(): def __init__(self): self.data = load_config() def update(self): self.data = load_config() print(self.data) configData = configData()#复用实例 # 解压缩函数 def unzip_file(file_path, extract_path): with zipfile.ZipFile(file_path, 'r') as zip_ref: zip_ref.extractall(extract_path) # 上传并解压缩函数 #upload_and_unzip(file:"文件",extract_path:"解压路径")->"void 成功提示": def upload_and_unzip(file_obj, extract_path): print("执行上传解压函数") ##这样写如果新的多,那么替换为多的一方,如果新的少,也只会替换已经存在的部分--->符合预期想法 with ZipFile(file_obj.name) as zfile: zfile.extractall(extract_path) return "File uploaded and extracted successfully to "+extract_path #构造函数获取当前目录file list信息-不放config进行返回是为了避免引入旧值 def update_file_info(root_dir): info = {} for root, dirs, files in os.walk(root_dir): info[root] = { "directories": dirs, "files": files } for dir_name in dirs: update_file_info(os.path.join(root, dir_name)) return info def random_chapter(): # for root, info in configData.data['file_info'].items(): #手动更新避免引入旧值-成功 file_info = update_file_info(configData.data['current_dir']) configData.data['file_info'] = file_info for root, info in configData.data['file_info'].items(): if root.startswith(configData.data['manga_abs_dir']): for directory in info["directories"]: if any(file.endswith('.jpg') or file.endswith('.png') for file in os.listdir(os.path.join(root, directory))): chapter_path = os.path.join(root, directory) # 删除章节目录 print("章节目录是:",chapter_path) #获取标题 # 将路径转换为标题 folders = chapter_path.split(os.sep) if len(folders) >= 2: chapter_title = " ".join([folders[-2], folders[-1]]) # 获取倒数第二个和倒数第一个目录名作为标题 else: chapter_title = chapter_path # 如果目录层级不足2层,直接使用路径作为标题 # 获取标题 return chapter_title,chapter_path#返回章节标题和章节目录给任务-1批量顺序上传原始图片到服务器manga. return None,None def list_files(): return configData.data['file_info'] def save_mask(mask_file, output_dir): #上传遮罩保存到mask下面,名称随意 # 检查遮罩文件是否存在 # 构造保存文件的路径-保证linux和windows都能转义 save_path = os.path.join(output_dir, '0.jpg') # 创建目录(如果目录不存在) os.makedirs(output_dir, exist_ok=True) # 使用with open方式保存文件 mask_file_img = mask_file #这个是Image.open后的对象 mask_file_img.save(save_path) return "文件保存成功到:" + str(save_path) def update_config_json(newConfig:str): newConfig = json.loads(newConfig) #转换str为dict with open(configData.data['current_config_json'], "w") as file: file.write(json.dumps(newConfig, indent=4, ensure_ascii=False)) # 指定indent参数来保持JSON格式 #返回新的json文件内容 with open(configData.data['current_config_json'], "r") as f: content =f.read() # 读取文件内容 return content def is_asp_task_valid()->bool: # 启动状态检测 -> # 1config允许定时任务 2 mask目录下齐备 3 manga_all目录下有可用素材 4 mask的width height和素材匹配 # 5应当将config写入文件而不是py随时可以修改,以防ck账号被封等情况下不发送->同时删除不可用账号 -全部space中账号反馈不可用的时候停止定时任务 # 6应该避免任务扎堆进行,对ocr space造成负担 if configData.data['allow_scheduler'] == False: print("当前配置不允许定时任务,停止定时任务") return False chapter_title ,cur_chapter= random_chapter() if cur_chapter == None: print("当前没有可用章节了,停止定时任务") return False mask_file_path = os.path.join(configData.data['mask_dir'], '0.jpg') if not os.path.exists(mask_file_path): print("遮罩文件不存在,停止定时任务") return False # 读取遮罩图片长宽比较当前章节下面图片的长宽 mask_image = Image.open(mask_file_path) random_chapter_image_name = random.choice(os.listdir(cur_chapter)) random_chapter_image_path = os.path.join(cur_chapter, random_chapter_image_name) random_chapter_image = Image.open(random_chapter_image_path) if mask_image.size != random_chapter_image.size: print("遮罩大小和章节随机一个图片的大小不匹配,停止定时任务") return False return True #定时任务流程函数集合 ->返回None说明非正常执行情况 def run_asp_task(): if not is_asp_task_valid(): return None #遍历bili_spaces中的space url并执行日常任务 for space_url in configData.data["bili_spaces"]: process_task_list("0 1 2",space_url) #实现gradio接受上传压缩文件,解压到manga_all目录下面 with gr.Blocks() as mangaManager: #上传文件选择列表 file_selected = gr.File(label="待上传压缩章节",file_types=['.zip', '.tar', '.gz']) #上传zip的解压保存路径或mask的地址(必选) 根据需要选择 output_dir_gr = gr.Dropdown(label="上传zip的解压保存路径或mask的地址(必选)", choices=configData.data['default_values']) #压缩包上传按钮 file_upload_btn = gr.Button("开始上传") #获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的 someResult = gr.Textbox(label="获取按钮返回信息", type="text") # 设置按钮点击事件(调用上传解压函数,将压缩包内容解压到指定目录 默认是manga_all下面) file_upload_btn.click(fn=upload_and_unzip, inputs=[file_selected, output_dir_gr],outputs=someResult) #设置按钮查看json类型的listFiles信息 filesInfoBtn = gr.Button("查看files信息") filesInfoBtn.click(fn=list_files,outputs=someResult) #设置mask上传遮罩按钮保存遮罩到mask目录-因为直接重启space利用docker上传会触发定时任务,也许造成不好结果 # mask_selected = gr.inputs.Image(label="待上传遮罩", type='pil') mask_selected = gr.components.Image(label="待上传遮罩", type='pil')#代替inputs的新写法 markUploadBtn = gr.Button("上传遮罩mask") markUploadBtn.click(fn=save_mask,inputs=[mask_selected, output_dir_gr],outputs=someResult) #设置按钮关联新config输入进行更新操作 config_update_text = gr.Textbox(placeholder="输入新的JSON数据")#str类型 config_update_btn = gr.Button("更新config.json数据") config_update_btn.click(fn=update_config_json, inputs=[config_update_text], outputs=someResult) # 定义一个函数来处理输入的步骤转list然后执行,返回执行结果->taskResult/None # gradio输入0 1 2 3就行,会自动转"0 1 2 3"然后给process_task_list处理 def process_task_list(input_task_str:str = None,baseUrl:str =None): if not is_asp_task_valid(): print("任务基本条件不符合") return None if baseUrl is None: print("baseUrl不存在") return None if input_task_str is None: print("input_task_str不存在") return None chapter_title,cur_chapter_path = random_chapter() task_results = {}#保存任务执行结果 #临时变量记录是否上传了章节-上传了需要删除,而且只能在for循环任务结束后删除 manga_has_uploaded = False print(input_task_str)# "a b c" - >['a', 'b', 'c'] task_list = input_task_str.split(' ') for task in task_list: if task in task_functions: task_func = task_functions[task]#func task_param = tasks_params[task]#dict if "baseUrl" in task_param: task_param["baseUrl"] = baseUrl if "mask_path" in task_param: #使用后遮罩不删,否则维护成本太高 task_param["mask_path"] = configData.data['mask_abs_dir'] if "manga_path" in task_param: task_param["manga_path"] = cur_chapter_path#上传一个章节作为本次素材,保存到space的manga下面 print("上传了章节:",cur_chapter_path,"下面提交后删除") manga_has_uploaded = True #执行了这个任务就要删除该章节 if "bili_meta" in task_param: bili_meta_data["title"] = chapter_title task_param["bili_meta"] = bili_meta_data result = task_func(**task_param)#执行对应函数获取结果 task_results[task] = "success" if result is None else result#保存对应函数执行结果 else: print("该执行流程函数不存在") return None#立即推出循环结束函数 if manga_has_uploaded is True: #如果上传章节任务得到了执行,那么删除 shutil.rmtree(cur_chapter_path) # 递归地删除指定路径的目录及其所有内容,包括文件和子目录 #删除应该在返回的时候删 return task_results # 创建一个输入块,接受str+空格类型的输入,比如设置默认值3只查看结果output.mp4 # 将按钮添加到任务管理器中 with gr.Blocks() as taskManager: # 任务需要 1:查看指定space的output video状态 2: 手动修改执行流程->比如上传后是发送还是查看output效果 # 3:可以添加按钮跳转路由 task_list_text = gr.components.Textbox(type="text", label="输入任务列表元素,用空格分隔(一般用来执首尾比如查看output状态)", lines=5) baseSpaceUrl = gr.components.Textbox(type="text", label="指定手动任务的执行容器地址", lines=3) # 获取到返回的结果-可以是上传的,可以是查看files信息,也可以是别的 someResult = gr.components.Textbox(label="获取按钮返回信息", type="text") # 创建一个按钮块来触发处理函数 taskBtn = gr.Button("处理列表") taskBtn.click(fn=process_task_list,inputs=[task_list_text, baseSpaceUrl],outputs=someResult) new_text = gr.components.Textbox(placeholder="敬请期待") # str类型