import os, glob, sys import torch from torchvision.transforms.functional import normalize import numpy as np import cv2 from modules.processing import StableDiffusionProcessingImg2Img from comfy_extras.chainner_models import model_loading import comfy.model_management as model_management import comfy.utils import folder_paths import scripts.reactor_version from scripts.reactor_faceswap import ( FaceSwapScript, get_models, get_current_faces_model, analyze_faces, half_det_size ) from scripts.reactor_logger import logger from reactor_utils import ( batch_tensor_to_pil, batched_pil_to_tensor, tensor_to_pil, img2tensor, tensor2img, save_face_model, load_face_model, download ) from reactor_log_patch import apply_logging_patch from r_facelib.utils.face_restoration_helper import FaceRestoreHelper from r_basicsr.utils.registry import ARCH_REGISTRY import scripts.r_archs.codeformer_arch models_dir = folder_paths.models_dir REACTOR_MODELS_PATH = os.path.join(models_dir, "reactor") FACE_MODELS_PATH = os.path.join(REACTOR_MODELS_PATH, "faces") if not os.path.exists(REACTOR_MODELS_PATH): os.makedirs(REACTOR_MODELS_PATH) if not os.path.exists(FACE_MODELS_PATH): os.makedirs(FACE_MODELS_PATH) dir_facerestore_models = os.path.join(models_dir, "facerestore_models") os.makedirs(dir_facerestore_models, exist_ok=True) folder_paths.folder_names_and_paths["facerestore_models"] = ([dir_facerestore_models], folder_paths.supported_pt_extensions) def get_facemodels(): models_path = os.path.join(FACE_MODELS_PATH, "*") models = glob.glob(models_path) models = [x for x in models if x.endswith(".safetensors")] return models def get_restorers(): models_path = os.path.join(models_dir, "facerestore_models/*") models = glob.glob(models_path) models = [x for x in models if x.endswith(".pth")] if len(models) == 0: fr_urls = [ "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/GFPGANv1.3.pth", "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/GFPGANv1.4.pth", "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/codeformer-v0.1.0.pth" ] for model_url in fr_urls: model_name = os.path.basename(model_url) model_path = os.path.join(dir_facerestore_models, model_name) download(model_url, model_path, model_name) models = glob.glob(models_path) models = [x for x in models if x.endswith(".pth")] return models def get_model_names(get_models): models = get_models() names = ["none"] for x in models: names.append(os.path.basename(x)) return names def model_names(): models = get_models() return {os.path.basename(x): x for x in models} class reactor: @classmethod def INPUT_TYPES(s): return { "required": { "enabled": ("BOOLEAN", {"default": True, "label_off": "OFF", "label_on": "ON"}), "input_image": ("IMAGE",), "swap_model": (list(model_names().keys()),), "facedetection": (["retinaface_resnet50", "retinaface_mobile0.25", "YOLOv5l", "YOLOv5n"],), "face_restore_model": (get_model_names(get_restorers),), "face_restore_visibility": ("FLOAT", {"default": 1, "min": 0.1, "max": 1, "step": 0.05}), "codeformer_weight": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1, "step": 0.05}), "detect_gender_input": (["no","female","male"], {"default": "no"}), "detect_gender_source": (["no","female","male"], {"default": "no"}), "input_faces_index": ("STRING", {"default": "0"}), "source_faces_index": ("STRING", {"default": "0"}), "console_log_level": ([0, 1, 2], {"default": 1}), }, "optional": { "source_image": ("IMAGE",), "face_model": ("FACE_MODEL",), } } RETURN_TYPES = ("IMAGE","FACE_MODEL") FUNCTION = "execute" CATEGORY = "ReActor" def __init__(self): self.face_helper = None def restore_face( self, input_image, face_restore_model, face_restore_visibility, codeformer_weight, facedetection ): result = input_image if face_restore_model != "none" and not model_management.processing_interrupted(): logger.status(f"Restoring with {face_restore_model}") model_path = folder_paths.get_full_path("facerestore_models", face_restore_model) device = model_management.get_torch_device() if "codeformer" in face_restore_model.lower(): codeformer_net = ARCH_REGISTRY.get("CodeFormer")( dim_embd=512, codebook_size=1024, n_head=8, n_layers=9, connect_list=["32", "64", "128", "256"], ).to(device) checkpoint = torch.load(model_path)["params_ema"] codeformer_net.load_state_dict(checkpoint) facerestore_model = codeformer_net.eval() else: sd = comfy.utils.load_torch_file(model_path, safe_load=True) facerestore_model = model_loading.load_state_dict(sd).eval() facerestore_model.to(device) if self.face_helper is None: self.face_helper = FaceRestoreHelper(1, face_size=512, crop_ratio=(1, 1), det_model=facedetection, save_ext='png', use_parse=True, device=device) image_np = 255. * result.cpu().numpy() total_images = image_np.shape[0] out_images = np.ndarray(shape=image_np.shape) for i in range(total_images): cur_image_np = image_np[i,:, :, ::-1] original_resolution = cur_image_np.shape[0:2] if facerestore_model is None or self.face_helper is None: return result self.face_helper.clean_all() self.face_helper.read_image(cur_image_np) self.face_helper.get_face_landmarks_5(only_center_face=False, resize=640, eye_dist_threshold=5) self.face_helper.align_warp_face() restored_face = None for idx, cropped_face in enumerate(self.face_helper.cropped_faces): cropped_face_t = img2tensor(cropped_face / 255., bgr2rgb=True, float32=True) normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True) cropped_face_t = cropped_face_t.unsqueeze(0).to(device) try: with torch.no_grad(): output = facerestore_model(cropped_face_t, w=codeformer_weight)[0] if "codeformer" in face_restore_model.lower() else facerestore_model(cropped_face_t)[0] restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1)) del output torch.cuda.empty_cache() except Exception as error: print(f'\tFailed inference for CodeFormer: {error}', file=sys.stderr) restored_face = tensor2img(cropped_face_t, rgb2bgr=True, min_max=(-1, 1)) if face_restore_visibility < 1: restored_face = cropped_face * (1 - face_restore_visibility) + restored_face * face_restore_visibility restored_face = restored_face.astype('uint8') self.face_helper.add_restored_face(restored_face) self.face_helper.get_inverse_affine(None) restored_img = self.face_helper.paste_faces_to_input_image() restored_img = restored_img[:, :, ::-1] if original_resolution != restored_img.shape[0:2]: restored_img = cv2.resize(restored_img, (0, 0), fx=original_resolution[1]/restored_img.shape[1], fy=original_resolution[0]/restored_img.shape[0], interpolation=cv2.INTER_LINEAR) self.face_helper.clean_all() out_images[i] = restored_img restored_img_np = np.array(out_images).astype(np.float32) / 255.0 restored_img_tensor = torch.from_numpy(restored_img_np) result = restored_img_tensor return result def execute(self, enabled, input_image, swap_model, detect_gender_source, detect_gender_input, source_faces_index, input_faces_index, console_log_level, face_restore_model, face_restore_visibility, codeformer_weight, facedetection, source_image=None, face_model=None): apply_logging_patch(console_log_level) if not enabled: return (input_image,face_model) elif source_image is None and face_model is None: logger.error("Please provide 'source_image' or `face_model`") return (input_image,face_model) if face_model == "none": face_model = None script = FaceSwapScript() pil_images = batch_tensor_to_pil(input_image) if source_image is not None: source = tensor_to_pil(source_image) else: source = None p = StableDiffusionProcessingImg2Img(pil_images) script.process( p=p, img=source, enable=True, source_faces_index=source_faces_index, faces_index=input_faces_index, model=swap_model, swap_in_source=True, swap_in_generated=True, gender_source=detect_gender_source, gender_target=detect_gender_input, face_model=face_model, ) result = batched_pil_to_tensor(p.init_images) if face_model is None: current_face_model = get_current_faces_model() face_model_to_provide = current_face_model[0] if (current_face_model is not None and len(current_face_model) > 0) else face_model else: face_model_to_provide = face_model result = reactor.restore_face(self,result,face_restore_model,face_restore_visibility,codeformer_weight,facedetection) return (result,face_model_to_provide) class LoadFaceModel: @classmethod def INPUT_TYPES(s): return { "required": { "face_model": (get_model_names(get_facemodels),), } } RETURN_TYPES = ("FACE_MODEL",) FUNCTION = "load_model" CATEGORY = "ReActor" def load_model(self, face_model): self.face_model = face_model self.face_models_path = FACE_MODELS_PATH if self.face_model != "none": face_model_path = os.path.join(self.face_models_path, self.face_model) out = load_face_model(face_model_path) else: out = None return (out, ) class SaveFaceModel: def __init__(self): self.output_dir = FACE_MODELS_PATH @classmethod def INPUT_TYPES(s): return { "required": { "save_mode": ("BOOLEAN", {"default": True, "label_off": "OFF", "label_on": "ON"}), "face_model_name": ("STRING", {"default": "default"}), "select_face_index": ("INT", {"default": 0, "min": 0}), }, "optional": { "image": ("IMAGE",), "face_model": ("FACE_MODEL",), } } RETURN_TYPES = () FUNCTION = "save_model" OUTPUT_NODE = True CATEGORY = "ReActor" def save_model(self, save_mode, face_model_name, select_face_index, image=None, face_model=None, det_size=(640, 640)): if save_mode and image is not None: source = tensor_to_pil(image) source = cv2.cvtColor(np.array(source), cv2.COLOR_RGB2BGR) apply_logging_patch(1) logger.status("Building Face Model...") face_model_raw = analyze_faces(source, det_size) if len(face_model_raw) == 0: det_size_half = half_det_size(det_size) face_model_raw = analyze_faces(source, det_size_half) try: face_model = face_model_raw[select_face_index] except: logger.error("No face(s) found") return face_model_name logger.status("--Done!--") if save_mode and (face_model != "none" or face_model is not None): face_model_path = os.path.join(self.output_dir, face_model_name + ".safetensors") save_face_model(face_model,face_model_path) if image is None and face_model is None: logger.error("Please provide `face_model` or `image`") return face_model_name class RestoreFace: @classmethod def INPUT_TYPES(s): return { "required": { "image": ("IMAGE",), "facedetection": (["retinaface_resnet50", "retinaface_mobile0.25", "YOLOv5l", "YOLOv5n"],), "model": (get_model_names(get_restorers),), "visibility": ("FLOAT", {"default": 1, "min": 0.0, "max": 1, "step": 0.05}), "codeformer_weight": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1, "step": 0.05}), }, } RETURN_TYPES = ("IMAGE",) FUNCTION = "execute" CATEGORY = "ReActor" def __init__(self): self.face_helper = None def execute(self, image, model, visibility, codeformer_weight, facedetection): result = reactor.restore_face(self,image,model,visibility,codeformer_weight,facedetection) return (result,) import numpy as np from ultralytics import YOLO from PIL import Image # Load a pretrained YOLOv8n model model = YOLO(task='detect', model='/root/autodl-tmp/ComfyUI/custom_nodes/comfyui-reactor-node/yolov8m_200e.pt') class Mynode_2: @classmethod def INPUT_TYPES(s): return { "required": { "input_image": ("IMAGE",), "source_image": ("IMAGE",), }, "optional": { } } CATEGORY = "ReActor" RETURN_TYPES = ("IMAGE",) FUNCTION = "method" def method(self, input_image, source_image): input_image_tmp = input_image.squeeze() # Pytorch张量转PIL对象 input_image_pil = Image.fromarray( np.clip(255. * input_image_tmp.cpu().numpy(), 0, 255).astype(np.uint8)).convert('RGBA') # Run inference on an image results = model.predict(source=input_image_pil, conf=0.5) # View results tmp = results[0].boxes.shape judge_face = tmp[0] print(judge_face) if judge_face == 0: # 等于0就是没检测出脸 return (input_image,) else: enabled = True swap_model = "inswapper_128.onnx" facedetection = "retinaface_resnet50" face_restore_model = "GFPGANv1.4.pth" face_restore_visibility = 1 codeformer_weight = 0.5 detect_gender_input = "no" detect_gender_source = "no" input_faces_index = "0" source_faces_index = "0" console_log_level = 1 class_reactor = reactor() change_face_img, face_model = class_reactor.execute(enabled, input_image, swap_model, detect_gender_source, detect_gender_input, source_faces_index, input_faces_index, console_log_level, face_restore_model, face_restore_visibility, codeformer_weight, facedetection, source_image=source_image, face_model=None) return (change_face_img,) NODE_CLASS_MAPPINGS = { "ReActorFaceSwap": reactor, "ReActorLoadFaceModel": LoadFaceModel, "ReActorSaveFaceModel": SaveFaceModel, "ReActorRestoreFace": RestoreFace, "face_detect": Mynode_2, } NODE_DISPLAY_NAME_MAPPINGS = { "ReActorFaceSwap": "ReActor - Fast Face Swap", "ReActorLoadFaceModel": "Load Face Model", "ReActorSaveFaceModel": "Save Face Model", "ReActorRestoreFace": "Restore Face", "face_detect": "face_detect - Reactor", }