import os
import pickle
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional, Set

import nltk
import numpy as np
import requests
import tensorflow as tf
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard
from tensorflow.keras.layers import (
    Bidirectional,
    Dense,
    Dropout,
    Embedding,
    Layer,
    LayerNormalization,
    LSTM,
    MultiHeadAttention,
)
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.sequence import pad_sequences
# TFAutoModelForCausalLM (rather than the bare AutoModel) so the loaded GPT-2
# exposes .logits and accepts the TF tensors used in TextGenerationCallback.
from transformers import AutoTokenizer, TFAutoModelForCausalLM

# Environment configuration
gpus = tf.config.list_physical_devices("GPU")
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("Dynamic memory growth enabled for all GPUs.")
    except RuntimeError as e:
        print(f"Error while enabling dynamic memory growth: {e}")

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
tf.keras.mixed_precision.set_global_policy('float32')

nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')

ZAPISZ_KATALOG = "mozgi"
KATALOG_LOGOW = "logs"
directory = "test"
log_dir = Path('logs')

tf.keras.backend.clear_session()
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))


class TextProcessor:

    class PositionalEncoding(Layer):
        def __init__(self, d_model, **kwargs):
            super().__init__(**kwargs)
            self.d_model = d_model

        def get_angles(self, position, i):
            # 1 / 10000^(2*(i//2)/d_model), computed with TF ops so the layer
            # also traces correctly in graph mode.
            angle_rates = 1.0 / tf.pow(
                10000.0, (2.0 * tf.floor(i / 2.0)) / tf.cast(self.d_model, tf.float32)
            )
            return position * angle_rates

        def call(self, inputs):
            seq_len = tf.shape(inputs)[1]
            angle_rads = self.get_angles(
                position=tf.cast(tf.range(seq_len)[:, tf.newaxis], tf.float32),
                i=tf.cast(tf.range(self.d_model)[tf.newaxis, :], tf.float32),
            )
            sines = tf.sin(angle_rads[:, 0::2])
            cosines = tf.cos(angle_rads[:, 1::2])
            pos_encoding = tf.concat([sines, cosines], axis=-1)
            return inputs + pos_encoding

    class WrappedMultiHeadAttention(Layer):
        def __init__(self, num_heads, d_model, rate=0.2, **kwargs):
            super().__init__(**kwargs)
            self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=d_model, dropout=rate)

        def call(self, inputs):
            # Self-attention: the same tensor serves as query and as key/value.
            return self.attention(inputs, inputs)

    class TransformerBlock(Layer):
        def __init__(self, num_heads, d_model, dff, rate=0.2, **kwargs):
            super().__init__(**kwargs)
            self.attention = TextProcessor.WrappedMultiHeadAttention(num_heads, d_model, rate)
            self.ffn = Sequential([
                Dense(dff, activation='relu'),
                Dense(d_model),
            ])
            self.layernorm1 = LayerNormalization(epsilon=1e-6)
            self.layernorm2 = LayerNormalization(epsilon=1e-6)
            self.dropout1 = Dropout(rate)
            self.dropout2 = Dropout(rate)
            self.pos_encoding = TextProcessor.PositionalEncoding(d_model)

        def call(self, inputs, training):
            # Positional encoding, then two residual sub-layers (attention + FFN).
            inputs = self.pos_encoding(inputs)
            attn_output = self.attention(inputs)
            attn_output = self.dropout1(attn_output, training=training)
            out1 = self.layernorm1(inputs + attn_output)
            ffn_output = self.ffn(out1)
            ffn_output = self.dropout2(ffn_output, training=training)
            return self.layernorm2(out1 + ffn_output)
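    # TransformerBlock is defined here but never used by create_and_train_model
    # below. A minimal sketch, assuming a functional-API model (the tensor names
    # are illustrative, not part of this script), of how it could sit on top of
    # the embedding layer; d_model must match the embedding width so the
    # residual additions line up:
    #
    #   token_ids = tf.keras.Input(shape=(None,), dtype=tf.int32)
    #   x = Embedding(vocab_size, 128)(token_ids)          # (batch, seq, 128)
    #   x = TextProcessor.TransformerBlock(num_heads=4, d_model=128, dff=512)(x, training=True)
    #   probs = Dense(vocab_size, activation='softmax')(x[:, -1, :])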
    class TextGenerationCallback(tf.keras.callbacks.Callback):
        def __init__(self, tokenizer, input_sequence_length, model_name, model, temperature=1.0):
            super().__init__()
            self.tokenizer = tokenizer
            self.input_sequence_length = input_sequence_length
            self.model_name = model_name
            self.model = model
            self.temperature = temperature
            self.generated_text_interval = 5
            self.seed_texts = [
                "Why is Python popular?",
                "What is AI?",
                "Explain neural networks",
                "Why is data important?",
            ]
            self.current_seed_text_index = 0

        def on_epoch_end(self, epoch, logs=None):
            if epoch % self.generated_text_interval == 0:
                seed_text = self.seed_texts[self.current_seed_text_index]
                self.current_seed_text_index = (self.current_seed_text_index + 1) % len(self.seed_texts)
                generated_text = self.generate_text(seed_text, self.temperature, self.input_sequence_length)
                print(f"\nText generated by model '{self.model_name}' after epoch {epoch + 1}:\n{generated_text}\n")

        def generate_text(self, seed_text, temperature=1.0, num_words=50):
            # Temperature sampling, one token at a time; expects a causal-LM model
            # whose outputs expose .logits. Joining decoded tokens with spaces is
            # approximate for BPE vocabularies.
            result = []
            for _ in range(num_words):
                encoded_text = self.tokenizer.encode(seed_text, return_tensors='tf')
                predictions = self.model(encoded_text)
                predictions = predictions.logits[:, -1, :] / temperature
                predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
                seed_text += self.tokenizer.decode([predicted_id])
                result.append(self.tokenizer.decode([predicted_id]))
            return ' '.join(result)
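    # TextGenerationCallback is never attached to a model in this script. A hedged
    # sketch of how it could be wired into training, assuming a causal-LM whose
    # outputs expose .logits (e.g. the GPT-2 model loaded by load_models below);
    # `processor` is a hypothetical TextProcessor instance:
    #
    #   sampler = TextProcessor.TextGenerationCallback(
    #       tokenizer=processor.gpt2_tokenizer,
    #       input_sequence_length=50,
    #       model_name=processor.model_name,
    #       model=processor.gpt2_model,
    #       temperature=0.8,
    #   )
    #   model.fit(X_train, y_train, callbacks=[sampler])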
    def __init__(
        self,
        directory: str,
        oov_token: str = '',
        glove_file: Optional[str] = None,
        gpt2_model_dir: str = 'gpt2',
        model_name: str = 'gpt2',
        input_sequence_length: int = 100,
        output_sequence_length: int = 100,
        batch_size: int = 32,
        lowercase: bool = False,
        handle_numbers: bool = True,
        handle_special_characters: bool = False,
        handle_stop_words: bool = True,
        lemmatize: bool = True,
        handle_python_code: bool = True,
        lstm_units: int = 128,
        dropout_rate: float = 0.2,
        epochs: int = 100,
        learning_rate: float = 0.00001,
        amsgrad: bool = True,
        kernel_regularizer: float = 0.001,
        recurrent_regularizer: float = 0.001,
        bias_regularizer: float = 0.001,
        num_difficult_sequences: int = 50,
        stop_words: Optional[Set[str]] = None,
        log_dir: Optional[str] = 'logs',
    ):
        self.oov_token = oov_token
        self.directory = directory
        self.glove_file = glove_file
        self.gpt2_model_dir = Path(gpt2_model_dir)
        self.model_name = model_name
        self.input_sequence_length = input_sequence_length
        self.output_sequence_length = output_sequence_length
        self.batch_size = batch_size
        self.lowercase = lowercase
        self.handle_numbers = handle_numbers
        self.handle_special_characters = handle_special_characters
        self.handle_stop_words = handle_stop_words
        self.lemmatize = lemmatize
        self.handle_python_code = handle_python_code
        self.lstm_units = lstm_units
        self.dropout_rate = dropout_rate
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.amsgrad = amsgrad
        self.kernel_regularizer = kernel_regularizer
        self.recurrent_regularizer = recurrent_regularizer
        self.bias_regularizer = bias_regularizer
        self.num_difficult_sequences = num_difficult_sequences
        self.stop_words = set(stopwords.words('english')) if stop_words is None else stop_words
        self.tokenizer = None
        self.embedding_matrix = None
        self.vocab_size = 0
        self.model = None
        self.processed_texts = []
        self.log_dir = log_dir
        self.glove_model = None
        self.gpt2_model = None
        self.gpt2_tokenizer = None
        self.load_models()

    def create_tokenizer(self, texts: List[str]) -> None:
        if not texts:
            raise ValueError("The list of texts is empty or None.")
        # Base GPT-2 byte-pair tokenizer; note that load_models() downloads by
        # self.model_name, while this stays pinned to "gpt2".
        self.tokenizer = AutoTokenizer.from_pretrained("gpt2")
        self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
        print("Tokenization finished. Number of unique tokens:", len(self.tokenizer.get_vocab()))

    def load_models(self):
        print("Loading GloVe model...")
        self.glove_model = self.load_glove_model()
        print("GloVe model loaded.")
        print("Loading GPT-2 model...")
        if not self.gpt2_model_dir.exists():
            print(f"GPT-2 model ({self.model_name}) is not available locally. Downloading...")
            # self.model_name must be a valid Hugging Face model id here;
            # main() prompts for it interactively.
            self.gpt2_model = TFAutoModelForCausalLM.from_pretrained(self.model_name)
            self.gpt2_tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.gpt2_model.save_pretrained(self.gpt2_model_dir)
            self.gpt2_tokenizer.save_pretrained(self.gpt2_model_dir)
        else:
            self.load_gpt2_model()
        print("GPT-2 model loaded.")

    def download_file(self, url, save_path):
        response = requests.get(url, stream=True)
        total_length = response.headers.get('content-length')
        if total_length is None:
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        else:
            dl = 0
            total_length = int(total_length)
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        dl += len(chunk)
                        f.write(chunk)
                        # Simple 50-character progress bar.
                        done = int(50 * dl / total_length)
                        print("\r[%s%s]" % ('=' * done, ' ' * (50 - done)), end='')

    def load_glove_model(self):
        glove_file = "glove.6B.100d.txt"
        if not os.path.exists(glove_file):
            print(f"File {glove_file} not found. Starting download...")
            try:
                url = "http://nlp.stanford.edu/data/glove.6B.zip"
                with tempfile.NamedTemporaryFile(delete=False) as tmp_zip:
                    self.download_file(url, tmp_zip.name)
                with zipfile.ZipFile(tmp_zip.name) as zf:
                    zf.extractall('.')
                glove_file = 'glove.6B.100d.txt'
                print("GloVe file downloaded and extracted.")
            except Exception as e:
                print(f"Error while downloading or extracting the GloVe file: {e}")
                return None
        glove_model = {}
        with open(glove_file, 'r', encoding='utf-8') as f:
            for line in f:
                split_line = line.split()
                word = split_line[0]
                embedding = np.array([float(val) for val in split_line[1:]])
                glove_model[word] = embedding
        return glove_model

    def load_gpt2_model(self):
        try:
            # Load from the local directory populated by load_models().
            self.gpt2_model = TFAutoModelForCausalLM.from_pretrained(self.gpt2_model_dir)
            self.gpt2_tokenizer = AutoTokenizer.from_pretrained(self.gpt2_model_dir)
            print("Standard GPT-2 model loaded successfully.")
        except Exception as e:
            print(f"Error while loading the standard GPT-2 model: {e}")

    def preprocess_text(self, text_input):
        if isinstance(text_input, bytes):
            text = text_input.decode('utf-8')
        elif isinstance(text_input, tf.Tensor):
            text = text_input.numpy().decode('utf-8')
        else:
            text = text_input
        tokens = word_tokenize(text)
        if self.lowercase:
            tokens = [token.lower() for token in tokens]
        if self.lemmatize:
            tokens = [lemmatizer.lemmatize(token) for token in tokens]
        if self.handle_stop_words:
            tokens = [token for token in tokens if token not in self.stop_words]
        return ' '.join(tokens)

    def create_embedding_matrix(self, vocab_size, embedding_dim=100):
        embedding_matrix = np.zeros((vocab_size, embedding_dim))
        missed_embeddings = 0
        # Tokens absent from GloVe fall back to the mean embedding vector.
        all_embeddings = np.stack(list(self.glove_model.values()))
        mean_embedding = np.mean(all_embeddings, axis=0)
        for word, idx in self.tokenizer.get_vocab().items():
            embedding_vector = self.glove_model.get(word)
            if embedding_vector is not None:
                embedding_matrix[idx] = embedding_vector
            else:
                missed_embeddings += 1
                embedding_matrix[idx] = mean_embedding
        print(f"Number of tokens without an available embedding vector: {missed_embeddings}")
        return embedding_matrix
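    # GPT-2's byte-pair vocabulary marks word-initial tokens with 'Ġ' (e.g. 'Ġcat'),
    # so the direct dictionary lookup in create_embedding_matrix misses most GloVe
    # entries and falls back to the mean vector. A hedged pre-lookup normalisation
    # (an assumption, not something this script does):
    #
    #   clean = word.lstrip('Ġ').lower()
    #   embedding_vector = self.glove_model.get(clean)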
    def create_sequences(self):
        processed_texts, _ = self._load_and_preprocess_files(self.directory, ['.txt'])
        self.create_tokenizer(processed_texts)
        vocab_size = len(self.tokenizer.get_vocab())
        embedding_matrix = self.create_embedding_matrix(vocab_size)
        sequences = []
        # Build n-gram prefixes: the first i tokens predict token i + 1
        # (each prefix includes its target token as the last element).
        for text in processed_texts:
            encoded = self.tokenizer.encode(text)
            for i in range(1, len(encoded)):
                sequences.append(encoded[:i + 1])
        max_sequence_len = max(len(seq) for seq in sequences)
        sequences = np.array(pad_sequences(sequences, maxlen=max_sequence_len, padding='pre'))
        X, y = sequences[:, :-1], sequences[:, -1]
        # Targets stay as integer ids; the model trains with sparse categorical
        # cross-entropy, since one-hot encoding a ~50k-token vocabulary would
        # exhaust memory.
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
        return X_train, X_val, y_train, y_val, embedding_matrix, vocab_size, max_sequence_len

    def _load_and_preprocess_files(self, directory, file_formats):
        processed_texts = []
        word_counts = {}
        if not os.path.isdir(directory):
            raise FileNotFoundError(f"Error: the given path '{directory}' is not a directory.")
        files = [
            f for f in os.listdir(directory)
            if os.path.isfile(os.path.join(directory, f)) and any(f.endswith(fmt) for fmt in file_formats)
        ]
        if not files:
            raise FileNotFoundError("No files of the requested format in the directory.")
        for file in files:
            file_path = os.path.join(directory, file)
            with open(file_path, "r", encoding='utf-8') as f:
                lines = f.readlines()
            if not lines:
                print(f"File {file} is empty.")
                continue
            for line in lines:
                processed_line = self.preprocess_text(line)
                processed_texts.append(processed_line)
                word_counts[file] = word_counts.get(file, 0) + len(processed_line.split())
            print(f"Processed file: {file}, word count: {word_counts[file]}")
        if not processed_texts:
            raise ValueError("No processed texts. Please check the directory contents.")
        print(f"Number of processed texts: {len(processed_texts)}")
        return processed_texts, word_counts

    def create_and_train_model(self):
        X_train, X_val, y_train, y_val, embedding_matrix, vocab_size, max_sequence_len = self.create_sequences()
        model = Sequential()
        model.add(Embedding(vocab_size, 100, weights=[embedding_matrix],
                            input_length=max_sequence_len - 1, trainable=False))
        model.add(Bidirectional(LSTM(self.lstm_units)))
        model.add(Dropout(self.dropout_rate))
        model.add(Dense(vocab_size, activation='softmax'))
        # Sparse loss matches the integer next-token targets from create_sequences.
        model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        model.summary()
        log_dir = os.path.join(KATALOG_LOGOW, self.model_name)
        tensorboard_callback = TensorBoard(log_dir=log_dir)
        early_stopping_callback = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
        model.fit(
            X_train, y_train,
            epochs=self.epochs,
            batch_size=self.batch_size,
            validation_data=(X_val, y_val),
            callbacks=[tensorboard_callback, early_stopping_callback],
        )
        self.model = model
        self.save_model_and_tokenizer()

    def save_model_and_tokenizer(self):
        if not os.path.exists(ZAPISZ_KATALOG):
            os.makedirs(ZAPISZ_KATALOG)
        self.model.save(f'{ZAPISZ_KATALOG}/{self.model_name}.h5')
        with open(f'{ZAPISZ_KATALOG}/{self.model_name}_tokenizer.pkl', 'wb') as handle:
            pickle.dump(self.tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)
        print("Model and tokenizer saved.")


def main():
    print("Welcome to AI Code Generator!")
    directory = "test"
    # Must be a valid Hugging Face GPT-2 id (e.g. 'gpt2') for the first-run
    # download in load_models(); it is also used as the save name.
    model_name = input("Enter a model name: ")
    processor = TextProcessor(
        directory=directory,
        model_name=model_name,
        input_sequence_length=100,
        output_sequence_length=100,
        epochs=10,
    )
    processor.create_and_train_model()
    print("Model created and trained successfully!")


if __name__ == "__main__":
    main()
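# A minimal reload-and-predict sketch (assumptions: training completed via main()
# and "my_model" is a hypothetical name entered at the prompt):
#
#   model = tf.keras.models.load_model(f"{ZAPISZ_KATALOG}/my_model.h5")
#   with open(f"{ZAPISZ_KATALOG}/my_model_tokenizer.pkl", "rb") as handle:
#       tokenizer = pickle.load(handle)
#   seq = tokenizer.encode("What is AI?")
#   padded = pad_sequences([seq], maxlen=model.input_shape[1], padding='pre')
#   next_id = int(np.argmax(model.predict(padded), axis=-1)[0])
#   print(tokenizer.decode([next_id]))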