"""File operations for AutoGPT""" | |
from __future__ import annotations | |
import os | |
import os.path | |
from typing import Generator | |
import requests | |
from colorama import Back, Fore | |
from requests.adapters import HTTPAdapter, Retry | |
from autogpt.spinner import Spinner | |
from autogpt.utils import readable_file_size | |
from autogpt.workspace import WORKSPACE_PATH, path_in_workspace | |
LOG_FILE = "file_logger.txt" | |
LOG_FILE_PATH = WORKSPACE_PATH / LOG_FILE | |


def check_duplicate_operation(operation: str, filename: str) -> bool:
    """Check if the operation has already been performed on the given file

    Args:
        operation (str): The operation to check for
        filename (str): The name of the file to check for

    Returns:
        bool: True if the operation has already been performed on the file
    """
    log_content = read_file(LOG_FILE)
    log_entry = f"{operation}: {filename}\n"
    return log_entry in log_content


def log_operation(operation: str, filename: str) -> None:
    """Log the file operation to the file_logger.txt

    Args:
        operation (str): The operation to log
        filename (str): The name of the file the operation was performed on
    """
    log_entry = f"{operation}: {filename}\n"

    # Create the log file if it doesn't exist
    if not os.path.exists(LOG_FILE_PATH):
        with open(LOG_FILE_PATH, "w", encoding="utf-8") as f:
            f.write("File Operation Logger ")

    append_to_file(LOG_FILE, log_entry, should_log=False)


def split_file(
    content: str, max_length: int = 4000, overlap: int = 0
) -> Generator[str, None, None]:
    """
    Split text into chunks of a specified maximum length with a specified overlap
    between chunks.

    :param content: The input text to be split into chunks
    :param max_length: The maximum length of each chunk,
        default is 4000 (about 1k tokens)
    :param overlap: The number of overlapping characters between chunks,
        default is no overlap
    :return: A generator yielding chunks of text
    """
    start = 0
    content_length = len(content)

    while start < content_length:
        end = start + max_length
        if end + overlap < content_length:
            # Extend the chunk past `end` so the next chunk shares `overlap`
            # characters of context with it
            chunk = content[start : end + overlap]
        else:
            chunk = content[start:content_length]

            # Account for the case where the last chunk is shorter than the
            # overlap, so it has already been consumed by the previous chunk
            if len(chunk) <= overlap:
                break

        yield chunk
        start += max_length - overlap
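
# Note on chunk spacing: successive chunks start max_length - overlap characters
# apart, so with the defaults used by ingest_file (max_length=4000, overlap=200)
# each new chunk begins 3800 characters after the previous one and repeats the
# tail of it for context. For example, with small illustrative values,
#
#   list(split_file("abcdefghij", max_length=4, overlap=2))
#
# yields ["abcdef", "cdefgh", "efghij", "ghij"].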


def read_file(filename: str) -> str:
    """Read a file and return the contents

    Args:
        filename (str): The name of the file to read

    Returns:
        str: The contents of the file
    """
    try:
        filepath = path_in_workspace(filename)
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
        return content
    except Exception as e:
        return f"Error: {str(e)}"


def ingest_file(
    filename: str, memory, max_length: int = 4000, overlap: int = 200
) -> None:
    """
    Ingest a file by reading its content, splitting it into chunks with a specified
    maximum length and overlap, and adding the chunks to the memory storage.

    :param filename: The name of the file to ingest
    :param memory: An object with an add() method to store the chunks in memory
    :param max_length: The maximum length of each chunk, default is 4000
    :param overlap: The number of overlapping characters between chunks, default is 200
    """
    try:
        print(f"Working with file {filename}")
        content = read_file(filename)
        content_length = len(content)
        print(f"File length: {content_length} characters")

        chunks = list(split_file(content, max_length=max_length, overlap=overlap))

        num_chunks = len(chunks)
        for i, chunk in enumerate(chunks):
            print(f"Ingesting chunk {i + 1} / {num_chunks} into memory")
            memory_to_add = (
                f"Filename: {filename}\n" f"Content part#{i + 1}/{num_chunks}: {chunk}"
            )
            memory.add(memory_to_add)

        print(f"Done ingesting {num_chunks} chunks from {filename}.")
    except Exception as e:
        print(f"Error while ingesting file '{filename}': {str(e)}")


def write_to_file(filename: str, text: str) -> str:
    """Write text to a file

    Args:
        filename (str): The name of the file to write to
        text (str): The text to write to the file

    Returns:
        str: A message indicating success or failure
    """
    if check_duplicate_operation("write", filename):
        return "Error: File has already been updated."
    try:
        filepath = path_in_workspace(filename)
        directory = os.path.dirname(filepath)
        if not os.path.exists(directory):
            os.makedirs(directory)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(text)
        log_operation("write", filename)
        return "File written to successfully."
    except Exception as e:
        return f"Error: {str(e)}"


def append_to_file(filename: str, text: str, should_log: bool = True) -> str:
    """Append text to a file

    Args:
        filename (str): The name of the file to append to
        text (str): The text to append to the file
        should_log (bool): Whether to record the operation in the file logger

    Returns:
        str: A message indicating success or failure
    """
    try:
        filepath = path_in_workspace(filename)
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(text)

        if should_log:
            log_operation("append", filename)

        return "Text appended successfully."
    except Exception as e:
        return f"Error: {str(e)}"


def delete_file(filename: str) -> str:
    """Delete a file

    Args:
        filename (str): The name of the file to delete

    Returns:
        str: A message indicating success or failure
    """
    if check_duplicate_operation("delete", filename):
        return "Error: File has already been deleted."
    try:
        filepath = path_in_workspace(filename)
        os.remove(filepath)
        log_operation("delete", filename)
        return "File deleted successfully."
    except Exception as e:
        return f"Error: {str(e)}"


def search_files(directory: str) -> list[str]:
    """Search for files in a directory

    Args:
        directory (str): The directory to search in

    Returns:
        list[str]: A list of files found in the directory
    """
    found_files = []

    if directory in {"", "/"}:
        search_directory = WORKSPACE_PATH
    else:
        search_directory = path_in_workspace(directory)

    for root, _, files in os.walk(search_directory):
        for file in files:
            if file.startswith("."):
                continue
            relative_path = os.path.relpath(os.path.join(root, file), WORKSPACE_PATH)
            found_files.append(relative_path)

    return found_files


def download_file(url: str, filename: str) -> str:
    """Download a file from a URL into the workspace

    Args:
        url (str): URL of the file to download
        filename (str): Filename to save the file as

    Returns:
        str: A message indicating success or failure
    """
    safe_filename = path_in_workspace(filename)
    try:
        message = f"{Fore.YELLOW}Downloading file from {Back.LIGHTBLUE_EX}{url}{Back.RESET}{Fore.RESET}"
        with Spinner(message) as spinner:
            session = requests.Session()
            retry = Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
            adapter = HTTPAdapter(max_retries=retry)
            session.mount("http://", adapter)
            session.mount("https://", adapter)

            with session.get(url, allow_redirects=True, stream=True) as r:
                r.raise_for_status()
                total_size = int(r.headers.get("Content-Length", 0))
                downloaded_size = 0

                with open(safe_filename, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
                        downloaded_size += len(chunk)

                        # Update the progress message
                        progress = f"{readable_file_size(downloaded_size)} / {readable_file_size(total_size)}"
                        spinner.update_message(f"{message} {progress}")

            return f'Successfully downloaded and locally stored file: "{filename}"! (Size: {readable_file_size(total_size)})'
    except requests.HTTPError as e:
        return f"Got an HTTP Error whilst trying to download file: {e}"
    except Exception as e:
        return f"Error: {str(e)}"