|
import os |
|
import subprocess |
|
import sys |
|
import argparse |
|
import time |
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
|
|
|
RED = "\033[91m" |
|
YELLOW = "\033[93m" |
|
GREEN = "\033[92m" |
|
RESET = "\033[0m" |
|
|
|
GB = 1024 * 1024 * 1024 |
|
|
|
def check_and_install_zipnn(): |
|
try: |
|
import zipnn |
|
except ImportError: |
|
print("zipnn not found. Installing...") |
|
subprocess.check_call([sys.executable, "-m", "pip", "install", "zipnn"]) |
|
import zipnn |
|
|
|
|
|
def decompress_file(input_file, delete=False, force=False, hf_cache=False): |
|
import zipnn |
|
|
|
if not input_file.endswith(".znn"): |
|
raise ValueError("Input file does not have the '.znn' suffix") |
|
|
|
if os.path.exists(input_file): |
|
decompressed_path = input_file[:-4] |
|
if not force and os.path.exists(decompressed_path): |
|
|
|
user_input = ( |
|
input(f"{decompressed_path} already exists; overwrite (y/n)? ").strip().lower() |
|
) |
|
|
|
if user_input not in ("yes", "y"): |
|
print(f"Skipping {input_file}...") |
|
return |
|
print(f"Decompressing {input_file}...") |
|
|
|
output_file = input_file[:-4] |
|
zpn = zipnn.ZipNN(is_streaming=True) |
|
|
|
file_size_before = 0 |
|
file_size_after = 0 |
|
start_time = time.time() |
|
with open(input_file, "rb") as infile, open(output_file, "wb") as outfile: |
|
d_data = b"" |
|
chunk = infile.read() |
|
file_size_before = len(chunk) |
|
d_data += zpn.decompress(chunk) |
|
file_size_after = len(d_data) |
|
outfile.write(d_data) |
|
print(f"Decompressed {input_file} to {output_file}") |
|
end_time = time.time() - start_time |
|
|
|
print( |
|
f"{GREEN}Back to original size: {file_size_after/GB:.02f}GB size before decompression: {file_size_before/GB:.02f}GB, time: {end_time:.02f}{RESET}" |
|
) |
|
|
|
|
|
if delete and not hf_cache: |
|
print(f"Deleting {input_file}...") |
|
os.remove(input_file) |
|
|
|
if hf_cache: |
|
|
|
print(f"{YELLOW}Reorganizing Hugging Face cache...{RESET}") |
|
try: |
|
snapshot_path = os.path.dirname(input_file) |
|
blob_name = os.path.join(snapshot_path, os.readlink(input_file)) |
|
os.rename(output_file, blob_name) |
|
os.symlink(blob_name, output_file) |
|
|
|
if os.path.exists(input_file): |
|
os.remove(input_file) |
|
except Exception as e: |
|
raise Exception(f"Error reorganizing Hugging Face cache: {e}") |
|
|
|
else: |
|
print(f"Error: The file {input_file} does not exist.") |
|
|
|
|
|
if __name__ == "__main__": |
|
check_and_install_zipnn() |
|
|
|
parser = argparse.ArgumentParser(description="Enter a file path to decompress.") |
|
parser.add_argument("input_file", type=str, help="Specify the path to the file to decompress.") |
|
parser.add_argument( |
|
"--delete", |
|
action="store_true", |
|
help="A flag that triggers deletion of a single compressed file instead of decompression", |
|
) |
|
parser.add_argument( |
|
"--force", action="store_true", help="A flag that forces overwriting when decompressing." |
|
) |
|
parser.add_argument( |
|
"--hf_cache", |
|
action="store_true", |
|
help="A flag that indicates if the file is in the Hugging Face cache.", |
|
) |
|
args = parser.parse_args() |
|
optional_kwargs = {} |
|
if args.delete: |
|
optional_kwargs["delete"] = args.delete |
|
if args.force: |
|
optional_kwargs["force"] = args.force |
|
if args.hf_cache: |
|
optional_kwargs["hf_cache"] = args.hf_cache |
|
|
|
decompress_file(args.input_file, **optional_kwargs) |
|
|