agentic_rag /
omkar334's picture
quantization, reduce chunksize
import re
from collections import OrderedDict, defaultdict
import pymupdf
def sort_text(chunks):
x_threshold = 300
left_column = []
right_column = []
for chunk in chunks:
if chunk["coordinates"][0] < x_threshold:
# Sort the chunks within each column based on the y-coordinate
left_column = sorted(left_column, key=lambda item: item["coordinates"][1])
right_column = sorted(right_column, key=lambda item: item["coordinates"][1])
sorted_text = left_column + right_column
return sorted_text
def majority_element(spans, param):
char_count = defaultdict(int)
for span in spans:
span_text = span["text"]
span_param = span[param] # Get the color or size for this span
char_count[span_param] += len(span_text) # Count characters
# Return the parameter value with the highest character count
return max(char_count, key=char_count.get, default=None)
def clean_text(text):
"""Cleans repeated text (OCR error)"""
words = text.split()
unique_words = OrderedDict.fromkeys(words)
cleaned_text = " ".join(unique_words)
cleaned_text = re.sub(r"\s+", " ", cleaned_text).strip()
return cleaned_text
def get_chunks(doc):
allchunks = []
# Page Iteration
for page_num in range(doc.page_count):
chunks = []
page = doc[page_num]
# Filter images (not needed)
blocks = [i for i in page.get_text("dict")["blocks"] if "image" not in i]
# Block Iteration
for block in blocks:
text = ""
spans = []
# Line iteration
for line in block["lines"]:
for span in line["spans"]:
# Only include text with a size greater than 9
if span["size"] > 9:
span_text = span["text"]
text += span_text + " "
spans.append(span) # Store the span for majority calculation
# Filter empty strings
if text.strip():
"text": clean_text(text.strip()),
"page": page_num,
"coordinates": [round(block["bbox"][0], 1), round(block["bbox"][1], 1)],
"color": majority_element(spans, "color"),
"size": majority_element(spans, "size"),
# Sort text according to column order
return allchunks
def process_activities(chunks):
"""Groups lines of 'Activity' together"""
# activities = []
i = 0
while i < len(chunks):
chunk = chunks[i]
if "Activity" in chunk["text"]:
activity = chunk.copy()
activity_size = chunks[i + 1]["size"] if i + 1 < len(chunks) else None
j = i + 1
while j < len(chunks) and chunks[j]["size"] == activity_size:
activity["text"] += "\n" + chunks[j]["text"]
j += 1
# Replace the range of chunks with the single activity chunk
chunks[i:j] = [activity]
# activities.append(activity)
i += 1
i += 1
return chunks
def index_pdf(path, buffer=False):
if buffer:
doc =, filetype="pdf")
doc =
chunks = get_chunks(doc)
chunks = process_activities(chunks)
print("--- pdf indexed")
return chunks