File size: 3,790 Bytes
d6a54fa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
import subprocess
import sys
import argparse
import time

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
RESET = "\033[0m"

GB = 1024 * 1024 * 1024

def check_and_install_zipnn():
    try:
        import zipnn
    except ImportError:
        print("zipnn not found. Installing...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "zipnn"])
        import zipnn


def decompress_file(input_file, delete=False, force=False, hf_cache=False):
    import zipnn

    if not input_file.endswith(".znn"):
        raise ValueError("Input file does not have the '.znn' suffix")

    if os.path.exists(input_file):
        decompressed_path = input_file[:-4]
        if not force and os.path.exists(decompressed_path):

            user_input = (
                input(f"{decompressed_path} already exists; overwrite (y/n)? ").strip().lower()
            )

            if user_input not in ("yes", "y"):
                print(f"Skipping {input_file}...")
                return
        print(f"Decompressing {input_file}...")

        output_file = input_file[:-4]
        zpn = zipnn.ZipNN(is_streaming=True)

        file_size_before = 0
        file_size_after = 0
        start_time = time.time()
        with open(input_file, "rb") as infile, open(output_file, "wb") as outfile:
            d_data = b""
            chunk = infile.read()
            file_size_before = len(chunk)
            d_data += zpn.decompress(chunk)
            file_size_after = len(d_data)
            outfile.write(d_data)
            print(f"Decompressed {input_file} to {output_file}")
        end_time = time.time() - start_time

        print(
            f"{GREEN}Back to original size: {file_size_after/GB:.02f}GB size before decompression: {file_size_before/GB:.02f}GB, time: {end_time:.02f}{RESET}"
            )


        if delete and not hf_cache:
            print(f"Deleting {input_file}...")
            os.remove(input_file)

        if hf_cache:
            # If the file is in the Hugging Face cache, fix the symlinks
            print(f"{YELLOW}Reorganizing Hugging Face cache...{RESET}")
            try:
                snapshot_path = os.path.dirname(input_file)
                blob_name = os.path.join(snapshot_path, os.readlink(input_file))
                os.rename(output_file, blob_name)
                os.symlink(blob_name, output_file)
                
                if os.path.exists(input_file):
                    os.remove(input_file)
            except Exception as e:
                raise Exception(f"Error reorganizing Hugging Face cache: {e}")

    else:
        print(f"Error: The file {input_file} does not exist.")


if __name__ == "__main__":
    check_and_install_zipnn()

    parser = argparse.ArgumentParser(description="Enter a file path to decompress.")
    parser.add_argument("input_file", type=str, help="Specify the path to the file to decompress.")
    parser.add_argument(
        "--delete",
        action="store_true",
        help="A flag that triggers deletion of a single compressed file instead of decompression",
    )
    parser.add_argument(
        "--force", action="store_true", help="A flag that forces overwriting when decompressing."
    )
    parser.add_argument(
        "--hf_cache",
        action="store_true",
        help="A flag that indicates if the file is in the Hugging Face cache.",
    )
    args = parser.parse_args()
    optional_kwargs = {}
    if args.delete:
        optional_kwargs["delete"] = args.delete
    if args.force:
        optional_kwargs["force"] = args.force
    if args.hf_cache:
        optional_kwargs["hf_cache"] = args.hf_cache

    decompress_file(args.input_file, **optional_kwargs)