import os
import pickle
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional, Set

import nltk
import numpy as np
import requests
import tensorflow as tf
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping
from tensorflow.keras.layers import (Layer, Bidirectional, Dense, LayerNormalization,
                                     Dropout, Embedding, LSTM, MultiHeadAttention)
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.sequence import pad_sequences
# TFAutoModelForCausalLM (rather than the original AutoModel) so that the TF
# tensors and `.logits` access in TextGenerationCallback work as written.
from transformers import AutoTokenizer, TFAutoModelForCausalLM

# Environment configuration
gpus = tf.config.list_physical_devices("GPU")
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("Dynamic memory growth enabled for all GPUs.")
    except RuntimeError as e:
        print(f"Error while enabling dynamic memory growth: {e}")

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
tf.keras.mixed_precision.set_global_policy('float32')

nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')

ZAPISZ_KATALOG = "mozgi"  # directory where trained models are saved
KATALOG_LOGOW = "logs"    # directory for TensorBoard logs
directory = "test"
log_dir = Path('logs')

tf.keras.backend.clear_session()
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
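
# TextProcessor bundles the whole pipeline: loading and preprocessing text
# files, GloVe embeddings, a GPT-2 tokenizer, and training of an LSTM
# language model.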
class TextProcessor:
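    # Sinusoidal positional encoding (as in "Attention Is All You Need"):
    # sines of the even-index angles and cosines of the odd-index angles are
    # concatenated and added to the input embeddings.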
    class PositionalEncoding(Layer):
        def __init__(self, d_model, **kwargs):
            super().__init__(**kwargs)
            self.d_model = d_model

        def get_angles(self, position, i):
            angles = 1 / np.power(10000, (2 * (i // 2)) / np.float32(self.d_model))
            return position * angles

        def call(self, inputs):
            # NOTE: the NumPy calls below assume eager execution with a
            # concrete sequence length; they will not trace inside a
            # tf.function graph.
            position = tf.shape(inputs)[1]
            angle_rads = self.get_angles(
                position=np.arange(position)[:, np.newaxis],
                i=np.arange(self.d_model)[np.newaxis, :]
            )
            sines = np.sin(angle_rads[:, 0::2])
            cosines = np.cos(angle_rads[:, 1::2])
            pos_encoding = np.concatenate([sines, cosines], axis=-1)
            pos_encoding = tf.cast(pos_encoding, dtype=tf.float32)
            return inputs + pos_encoding
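
    # Thin wrapper so self-attention (query = key = value = inputs) can be
    # called as a single-argument layer inside TransformerBlock.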
    class WrappedMultiHeadAttention(Layer):
        def __init__(self, num_heads, d_model, rate=0.2, **kwargs):
            super().__init__(**kwargs)
            self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=d_model, dropout=rate)

        def call(self, inputs):
            return self.attention(inputs, inputs)
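
    # Post-norm transformer encoder block: self-attention followed by a
    # position-wise feed-forward network, each with dropout, a residual
    # connection, and layer normalization.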
    class TransformerBlock(Layer):
        def __init__(self, num_heads, d_model, dff, rate=0.2, **kwargs):
            super().__init__(**kwargs)
            self.attention = TextProcessor.WrappedMultiHeadAttention(num_heads, d_model, rate)
            self.ffn = Sequential([
                Dense(dff, activation='relu'),
                Dense(d_model)
            ])
            self.layernorm1 = LayerNormalization(epsilon=1e-6)
            self.layernorm2 = LayerNormalization(epsilon=1e-6)
            self.dropout1 = Dropout(rate)
            self.dropout2 = Dropout(rate)
            self.pos_encoding = TextProcessor.PositionalEncoding(d_model)

        def call(self, inputs, training):
            inputs = self.pos_encoding(inputs)
            attn_output = self.attention(inputs)
            attn_output = self.dropout1(attn_output, training=training)
            out1 = self.layernorm1(inputs + attn_output)
            ffn_output = self.ffn(out1)
            ffn_output = self.dropout2(ffn_output, training=training)
            return self.layernorm2(out1 + ffn_output)
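
    # Keras callback that samples text from a separately supplied causal LM
    # every few epochs, so generation quality can be eyeballed during training.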
    class TextGenerationCallback(tf.keras.callbacks.Callback):
        def __init__(self, tokenizer, input_sequence_length, model_name, model, temperature=1.0):
            super().__init__()
            self.tokenizer = tokenizer
            self.input_sequence_length = input_sequence_length
            self.model_name = model_name
            self.model = model
            self.temperature = temperature
            self.generated_text_interval = 5
            self.seed_texts = ["Why is Python popular?", "What is AI?", "Explain neural networks", "Why is data important?"]
            self.current_seed_text_index = 0

        def on_epoch_end(self, epoch, logs=None):
            if epoch % self.generated_text_interval == 0:
                seed_text = self.seed_texts[self.current_seed_text_index]
                self.current_seed_text_index = (self.current_seed_text_index + 1) % len(self.seed_texts)
                generated_text = self.generate_text(seed_text, self.temperature, self.input_sequence_length)
                print(f"\nText generated by model '{self.model_name}' after epoch {epoch + 1}:\n{generated_text}\n")
        def generate_text(self, seed_text, temperature=1.0, num_words=50):
            result = []
            for _ in range(num_words):
                encoded_text = self.tokenizer.encode(seed_text, return_tensors='tf')
                predictions = self.model(encoded_text)
                predictions = predictions.logits[:, -1, :] / temperature
                predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
                seed_text += self.tokenizer.decode([predicted_id])
                result.append(self.tokenizer.decode([predicted_id]))
            return ' '.join(result)

    def __init__(
        self,
        directory: str,
        oov_token: str = '<OOV>',
        glove_file: str = None,
        gpt2_model_dir: str = 'gpt2',
        model_name: str = 'gpt2',
        input_sequence_length: int = 100,
        output_sequence_length: int = 100,
        batch_size: int = 32,
        lowercase: bool = False,
        handle_numbers: bool = True,
        handle_special_characters: bool = False,
        handle_stop_words: bool = True,
        lemmatize: bool = True,
        handle_python_code: bool = True,
        lstm_units: int = 128,
        dropout_rate: float = 0.2,
        epochs: int = 100,
        learning_rate: float = 0.00001,
        amsgrad: bool = True,
        kernel_regularizer: float = 0.001,
        recurrent_regularizer: float = 0.001,
        bias_regularizer: float = 0.001,
        num_difficult_sequences: int = 50,
        stop_words: Optional[Set[str]] = None,
        log_dir: Optional[str] = 'logs',
    ):
        self.oov_token = oov_token
        self.directory = directory
        self.glove_file = glove_file
        self.gpt2_model_dir = Path(gpt2_model_dir)
        self.model_name = model_name
        self.input_sequence_length = input_sequence_length
        self.output_sequence_length = output_sequence_length
        self.batch_size = batch_size
        self.lowercase = lowercase
        self.handle_numbers = handle_numbers
        self.handle_special_characters = handle_special_characters
        self.handle_stop_words = handle_stop_words
        self.lemmatize = lemmatize
        self.handle_python_code = handle_python_code
        self.lstm_units = lstm_units
        self.dropout_rate = dropout_rate
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.amsgrad = amsgrad
        self.kernel_regularizer = kernel_regularizer
        self.recurrent_regularizer = recurrent_regularizer
        self.bias_regularizer = bias_regularizer
        self.num_difficult_sequences = num_difficult_sequences
        self.stop_words = set(stopwords.words('english')) if stop_words is None else stop_words
        self.tokenizer = None
        self.embedding_matrix = None
        self.vocab_size = 0
        self.model = None
        self.processed_texts = []
        self.log_dir = log_dir
        self.glove_model = None
        self.gpt2_model = None
        self.gpt2_tokenizer = None
        self.load_models()

    def create_tokenizer(self, texts: List[str]) -> None:
        if not texts:
            raise ValueError("The list of texts is empty or None.")
        # The texts are only validated here; the pretrained GPT-2 vocabulary is used as-is.
        self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
        self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        print("Tokenization finished. Number of unique tokens:", len(self.tokenizer.get_vocab()))

    def load_models(self):
        print("Loading GloVe model...")
        self.glove_model = self.load_glove_model()
        print("GloVe model loaded.")
        print("Loading GPT-2 model...")
        if not Path(self.gpt2_model_dir).exists():
            print(f"GPT-2 model ({self.model_name}) is not available locally. Downloading...")
            self.gpt2_model = TFAutoModelForCausalLM.from_pretrained(self.model_name)
            self.gpt2_tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.gpt2_model.save_pretrained(self.gpt2_model_dir)
            self.gpt2_tokenizer.save_pretrained(self.gpt2_model_dir)
        else:
            self.load_gpt2_model()
        print("GPT-2 model loaded.")
    def download_file(self, url, save_path):
        response = requests.get(url, stream=True)
        total_length = response.headers.get('content-length')
        if total_length is None:
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        else:
            dl = 0
            total_length = int(total_length)
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        dl += len(chunk)
                        f.write(chunk)
                        done = int(50 * dl / total_length)
                        print("\r[%s%s]" % ('=' * done, ' ' * (50 - done)), end='')
    def load_glove_model(self):
        glove_file = "glove.6B.100d.txt"
        if not os.path.exists(glove_file):
            print(f"File {glove_file} not found. Starting download...")
            try:
                url = "http://nlp.stanford.edu/data/glove.6B.zip"
                with tempfile.NamedTemporaryFile(delete=False) as tmp_zip:
                    self.download_file(url, tmp_zip.name)
                with zipfile.ZipFile(tmp_zip.name) as zf:
                    zf.extractall('.')
                glove_file = 'glove.6B.100d.txt'
                print("GloVe file downloaded and extracted.")
            except Exception as e:
                print(f"Error while downloading or extracting the GloVe file: {e}")
                return None
        glove_model = {}
        with open(glove_file, 'r', encoding='utf-8') as f:
            for line in f:
                split_line = line.split()
                word = split_line[0]
                embedding = np.array([float(val) for val in split_line[1:]])
                glove_model[word] = embedding
        return glove_model

    def load_gpt2_model(self):
        try:
            # Load from the local directory saved earlier rather than re-downloading.
            self.gpt2_model = TFAutoModelForCausalLM.from_pretrained(self.gpt2_model_dir)
            self.gpt2_tokenizer = AutoTokenizer.from_pretrained(self.gpt2_model_dir)
            print("Standard GPT-2 model loaded successfully.")
        except Exception as e:
            print(f"Error while loading the standard GPT-2 model: {e}")
    def preprocess_text(self, text_input):
        if isinstance(text_input, bytes):
            text = text_input.decode('utf-8')
        elif isinstance(text_input, tf.Tensor):
            text = text_input.numpy().decode('utf-8')
        else:
            text = text_input
        tokens = word_tokenize(text)
        if self.lowercase:
            tokens = [token.lower() for token in tokens]
        if self.lemmatize:
            tokens = [lemmatizer.lemmatize(token) for token in tokens]
        if self.handle_stop_words:
            tokens = [token for token in tokens if token not in self.stop_words]
        # Note: handle_numbers, handle_special_characters and handle_python_code
        # are accepted by __init__ but not applied here.
        return ' '.join(tokens)
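
    # Builds a (vocab_size, 100) matrix of GloVe vectors indexed by GPT-2 token
    # id; tokens with no GloVe entry fall back to the mean GloVe embedding.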
    def create_embedding_matrix(self, vocab_size, embedding_dim=100):
        embedding_matrix = np.zeros((vocab_size, embedding_dim))
        missed_embeddings = 0
        all_embeddings = np.stack(list(self.glove_model.values()))
        mean_embedding = np.mean(all_embeddings, axis=0)
        for word, idx in self.tokenizer.get_vocab().items():
            # GPT-2 BPE marks a leading space with 'Ġ'; strip it so the GloVe lookup can match.
            embedding_vector = self.glove_model.get(word.lstrip('Ġ'))
            if embedding_vector is not None:
                embedding_matrix[idx] = embedding_vector
            else:
                missed_embeddings += 1
                embedding_matrix[idx] = mean_embedding
        print(f"Number of words without an available embedding vector: {missed_embeddings}")
        return embedding_matrix
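
    # Turns each preprocessed text into next-token training pairs: every prefix
    # of the encoded sequence predicts the token that follows it.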
    def create_sequences(self):
        processed_texts, _ = self._load_and_preprocess_files(self.directory, ['.txt'])
        self.create_tokenizer(processed_texts)
        vocab_size = len(self.tokenizer.get_vocab())
        embedding_matrix = self.create_embedding_matrix(vocab_size)
        sequences = []
        for text in processed_texts:
            encoded = self.tokenizer.encode(text)
            for i in range(1, len(encoded)):
                # Prefix of length i + 1: the first i tokens are the input, the last is the target.
                sequences.append(encoded[:i + 1])
        max_sequence_len = max(len(seq) for seq in sequences)
        sequences = np.array(pad_sequences(sequences, maxlen=max_sequence_len, padding='pre'))
        X, y = sequences[:, :-1], sequences[:, -1]
        y = tf.keras.utils.to_categorical(y, num_classes=vocab_size)
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
        return X_train, X_val, y_train, y_val, embedding_matrix, vocab_size, max_sequence_len
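
    # Reads every file with a matching extension in `directory`, preprocesses
    # it line by line, and tracks a per-file word count.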
    def _load_and_preprocess_files(self, directory, file_formats):
        processed_texts = []
        word_counts = {}
        if not os.path.isdir(directory):
            raise FileNotFoundError(f"Error: the given path '{directory}' is not a directory.")
        files = [f for f in os.listdir(directory)
                 if os.path.isfile(os.path.join(directory, f)) and any(f.endswith(fmt) for fmt in file_formats)]
        if not files:
            raise FileNotFoundError("No files of the requested format in the directory.")
        for file in files:
            file_path = os.path.join(directory, file)
            with open(file_path, "r", encoding='utf-8') as f:
                lines = f.readlines()
            if not lines:
                print(f"File {file} is empty.")
                continue
            for line in lines:
                processed_line = self.preprocess_text(line)
                processed_texts.append(processed_line)
                word_count = len(processed_line.split())
                word_counts[file] = word_counts.get(file, 0) + word_count
            print(f"Processed file: {file}, word count: {word_counts[file]}")
        if not processed_texts:
            raise ValueError("No processed texts. Please check the directory contents.")
        print(f"Number of processed texts: {len(processed_texts)}")
        return processed_texts, word_counts
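
    # Builds and trains the language model proper: frozen GloVe embeddings
    # feeding a bidirectional LSTM with a softmax over the GPT-2 vocabulary.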
    def create_and_train_model(self):
        X_train, X_val, y_train, y_val, embedding_matrix, vocab_size, max_sequence_len = self.create_sequences()
        model = Sequential()
        model.add(Embedding(vocab_size, 100, weights=[embedding_matrix], input_length=max_sequence_len - 1, trainable=False))
        model.add(Bidirectional(LSTM(self.lstm_units)))
        model.add(Dropout(self.dropout_rate))
        model.add(Dense(vocab_size, activation='softmax'))
        # Use the configured learning rate and amsgrad flag instead of the bare 'adam' default.
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.learning_rate, amsgrad=self.amsgrad)
        model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])
        model.summary()
        log_dir = os.path.join(KATALOG_LOGOW, self.model_name)
        tensorboard_callback = TensorBoard(log_dir=log_dir)
        early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
        model.fit(X_train, y_train, epochs=self.epochs, batch_size=self.batch_size,
                  validation_data=(X_val, y_val), callbacks=[tensorboard_callback, early_stopping_callback])
        self.model = model
        self.save_model_and_tokenizer()
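
    # Persists the trained Keras model as HDF5 and pickles the tokenizer beside it.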
    def save_model_and_tokenizer(self):
        os.makedirs(ZAPISZ_KATALOG, exist_ok=True)
        self.model.save(f'{ZAPISZ_KATALOG}/{self.model_name}.h5')
        with open(f'{ZAPISZ_KATALOG}/{self.model_name}_tokenizer.pkl', 'wb') as handle:
            pickle.dump(self.tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)
        print("Model and tokenizer saved.")


def main():
    print("Welcome to AI Code Generator!")
    directory = "test"
    model_name = input("Enter a model name: ")
    processor = TextProcessor(
        directory=directory,
        model_name=model_name,
        input_sequence_length=100,
        output_sequence_length=100,
        epochs=10,
    )
    processor.create_and_train_model()
    print("Model created and trained successfully!")


if __name__ == "__main__":
    main()