import torch
import os
import functools
import time
import base64
import numpy as np
import safetensors.torch
import logging
from typing import Any, Callable, Dict
from modules.safe import unsafe_torch_load
from scripts.logging import logger


def load_state_dict(ckpt_path, location="cpu"):
    """Load a checkpoint from `ckpt_path` and return its state dict."""
    _, extension = os.path.splitext(ckpt_path)
    if extension.lower() == ".safetensors":
        state_dict = safetensors.torch.load_file(ckpt_path, device=location)
    else:
        state_dict = unsafe_torch_load(ckpt_path, map_location=torch.device(location))
    state_dict = get_state_dict(state_dict)
    logger.info(f"Loaded state_dict from [{ckpt_path}]")
    return state_dict
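
# A minimal usage sketch for `load_state_dict` (the checkpoint path below is
# hypothetical):
#
#     sd = load_state_dict("model.safetensors", location="cpu")
#     print(f"{len(sd)} tensors loaded")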


def get_state_dict(d):
    """Unwrap a checkpoint dict whose weights may be nested under "state_dict"."""
    return d.get("state_dict", d)


def ndarray_lru_cache(max_size: int = 128, typed: bool = False):
    """
    Decorator to enable caching for functions with numpy array arguments.
    Numpy arrays are mutable, and thus not directly usable as hash keys.

    The idea here is to wrap incoming arguments of type `np.ndarray` as
    `HashableNpArray` so that `lru_cache` can correctly handle `np.ndarray`
    arguments.

    `HashableNpArray` behaves exactly like `np.ndarray` except that
    `__hash__` and `__eq__` are overridden.
    """

    def decorator(func: Callable):
        """The actual decorator that accepts a function as input."""

        class HashableNpArray(np.ndarray):
            def __new__(cls, input_array):
                # The input array is an instance of ndarray.
                # The view makes the input array and the returned array share the same data.
                obj = np.asarray(input_array).view(cls)
                return obj

            def __eq__(self, other) -> bool:
                return np.array_equal(self, other)

            def __hash__(self):
                # Hash the bytes representing the data of the array.
                return hash(self.tobytes())

        @functools.lru_cache(maxsize=max_size, typed=typed)
        def cached_func(*args, **kwargs):
            """This function only accepts `HashableNpArray` as input params."""
            return func(*args, **kwargs)

        # Preserves the original function's __name__ and __doc__.
        @functools.wraps(func)
        def decorated_func(*args, **kwargs):
            """The decorated function that delegates to the original function."""

            def convert_item(item: Any):
                return HashableNpArray(item) if isinstance(item, np.ndarray) else item

            args = [convert_item(arg) for arg in args]
            kwargs = {k: convert_item(arg) for k, arg in kwargs.items()}
            return cached_func(*args, **kwargs)

        return decorated_func

    return decorator
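
# A minimal usage sketch for `ndarray_lru_cache` (the decorated function below is
# hypothetical and only illustrates the caching behaviour):
#
#     @ndarray_lru_cache(max_size=8)
#     def normalize(img: np.ndarray) -> np.ndarray:
#         return img.astype(np.float32) / 255.0
#
#     x = np.zeros((64, 64), dtype=np.uint8)
#     normalize(x)         # computed
#     normalize(x.copy())  # cache hit: equal contents hash to the same key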


def timer_decorator(func):
    """Time the decorated function and output the result to the debug logger."""
    if logger.level != logging.DEBUG:
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        duration = end_time - start_time
        # Only report functions that take a significant amount of time.
        if duration > 1e-3:
            logger.debug(f"{func.__name__} ran in: {duration} sec")
        return result

    return wrapper
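
# A minimal usage sketch for `timer_decorator` (the decorated function is
# hypothetical; timings are only emitted when `logger.level` is DEBUG at
# decoration time):
#
#     @timer_decorator
#     def detect_edges(img: np.ndarray) -> np.ndarray:
#         return np.abs(np.diff(img.astype(np.float32), axis=0))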


class TimeMeta(type):
    """Metaclass to record execution time on all methods of the child class."""

    def __new__(cls, name, bases, attrs):
        for attr_name, attr_value in attrs.items():
            if callable(attr_value):
                attrs[attr_name] = timer_decorator(attr_value)
        return super().__new__(cls, name, bases, attrs)
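
# A minimal usage sketch for `TimeMeta` (the class below is hypothetical; every
# callable attribute is wrapped with `timer_decorator` when the class is created):
#
#     class Preprocessor(metaclass=TimeMeta):
#         def run(self, img):
#             time.sleep(0.01)
#             return img
#
#     Preprocessor().run(None)  # logs "run ran in: ..." at DEBUG level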


# SVG support (optional; requires the svglib and reportlab packages).
svgsupport = False
try:
    import io

    from svglib.svglib import svg2rlg
    from reportlab.graphics import renderPM

    svgsupport = True
except ImportError:
    pass


def svg_preprocess(inputs: Dict, preprocess: Callable):
    """Convert a base64-encoded SVG image input to PNG, then run `preprocess`."""
    if not inputs:
        return None

    if inputs["image"].startswith("data:image/svg+xml;base64,") and svgsupport:
        svg_data = base64.b64decode(
            inputs["image"].replace("data:image/svg+xml;base64,", "")
        )
        drawing = svg2rlg(io.BytesIO(svg_data))
        png_data = renderPM.drawToString(drawing, fmt="PNG")
        encoded_string = base64.b64encode(png_data)
        base64_str = str(encoded_string, "utf-8")
        base64_str = "data:image/png;base64," + base64_str
        inputs["image"] = base64_str
    return preprocess(inputs)
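
# A minimal usage sketch for `svg_preprocess` (the data URL and the identity
# callback are hypothetical; in practice `preprocess` is the image component's
# own preprocessing function):
#
#     inputs = {"image": "data:image/svg+xml;base64,..."}
#     out = svg_preprocess(inputs, preprocess=lambda d: d)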


def get_unique_axis0(data):
    """Return the unique rows (along axis 0) of a 2D array."""
    arr = np.asanyarray(data)
    idxs = np.lexsort(arr.T)
    arr = arr[idxs]
    unique_idxs = np.empty(len(arr), dtype=np.bool_)
    unique_idxs[:1] = True
    unique_idxs[1:] = np.any(arr[:-1, :] != arr[1:, :], axis=-1)
    return arr[unique_idxs]
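
# A minimal usage sketch for `get_unique_axis0`:
#
#     pts = np.array([[0, 1], [0, 1], [2, 3]])
#     get_unique_axis0(pts)  # -> array([[0, 1], [2, 3]])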