Source code for ltsm.data_pipeline.anormly_pipeline

"""Pipeline for Anormly Data Detection
    Main Difference from the LTSM : 
        - pred_len == seq_len
        - label is the anomaly label of input seq_len
        - loss is CE/BCE

"""

import numpy as np
import torch
import argparse
import random
from torch import nn
import json

from ltsm.data_provider.data_loader import HF_Dataset
from ltsm.common.base_training_pipeline import BaseTrainingPipeline

from sklearn.metrics import precision_score, recall_score, f1_score

from transformers import (
    Trainer,
    TrainingArguments,
    PretrainedConfig
)

def compute_loss(model, inputs, return_outputs=False):
    """
    Computes the loss for model training.

    Args:
        model (torch.nn.Module): The model used for predictions.
        inputs (dict): Input data and labels.
        return_outputs (bool): If True, returns both loss and model outputs.

    Returns:
        torch.Tensor or tuple: The computed loss, and optionally the outputs.
    """
    outputs = model(inputs["input_data"])  # output should be B, L, M
    labels = inputs["labels"]
    loss = nn.functional.cross_entropy(outputs, labels)
    return (loss, outputs) if return_outputs else loss
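For reference, a minimal shape check for compute_loss (a sketch: the dummy model and the (B, 2, L) logit layout are assumptions, not the LTSM model API; torch.nn.functional.cross_entropy expects class logits on dimension 1 and integer targets):

# Hedged example: dummy stand-in for the model, emitting (B, 2, L) logits.
dummy_model = lambda x: torch.randn(x.shape[0], 2, x.shape[-1])
batch = {
    "input_data": torch.randn(4, 16),        # B=4 sequences, seq_len=16
    "labels": torch.randint(0, 2, (4, 16)),  # per-step anomaly labels in {0, 1}
}
loss, outputs = compute_loss(dummy_model, batch, return_outputs=True)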
def compute_metrics(p):
    """
    Computes precision, recall, and F1 for anomaly predictions.

    Args:
        p: An EvalPrediction with `predictions` and `label_ids`.

    Returns:
        dict: Micro-averaged precision, recall, and F1 scores.
    """
    preds = p.predictions[0] if isinstance(p.predictions, tuple) else p.predictions
    preds = np.squeeze(preds)
    if preds.shape != p.label_ids.shape:
        label_ids = np.squeeze(p.label_ids)
    else:
        label_ids = p.label_ids
    # Threshold the per-step scores at 0.5 to obtain binary anomaly labels
    preds_class = (preds > 0.5).astype(int)
    return {
        "precision": precision_score(label_ids, preds_class, average="micro"),
        "recall": recall_score(label_ids, preds_class, average="micro"),
        "f1": f1_score(label_ids, preds_class, average="micro"),
    }
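A quick sanity check for compute_metrics (a sketch; the score array shapes are assumptions). The Trainer hands predictions to this callback in an EvalPrediction container:

from transformers import EvalPrediction

scores = np.random.rand(4, 16)             # per-step anomaly scores in [0, 1]
labels = np.random.randint(0, 2, (4, 16))  # ground-truth anomaly labels
metrics = compute_metrics(EvalPrediction(predictions=scores, label_ids=labels))
# metrics holds micro-averaged "precision", "recall", and "f1"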
class AnomalyTrainingPipeline(BaseTrainingPipeline):
    """
    A pipeline for managing the training and evaluation process of an
    anomaly-detection model.

    Attributes:
        config (PretrainedConfig): Training configuration and hyperparameters.
        training_args (TrainingArguments): Hugging Face training settings.
    """
    def __init__(self, config: PretrainedConfig, **kwargs):
        """
        Initializes the AnomalyTrainingPipeline with the given configuration.

        Args:
            config (PretrainedConfig): Contains training settings such as the
                output directory, batch size, learning rate, and other
                hyperparameters.
        """
        # TODO: Replace PretrainedConfig with TrainingConfig
        super().__init__(config, compute_loss=compute_loss, compute_metrics=compute_metrics, **kwargs)

        # Training settings
        self.training_args = TrainingArguments(
            output_dir=self.config.output_dir,
            per_device_train_batch_size=config.batch_size,
            per_device_eval_batch_size=config.batch_size,
            evaluation_strategy="steps",
            num_train_epochs=config.train_epochs,
            fp16=False,
            save_steps=100,
            eval_steps=25,
            logging_steps=5,
            learning_rate=config.learning_rate,
            gradient_accumulation_steps=config.gradient_accumulation_steps,
            save_total_limit=10,
            remove_unused_columns=False,
            push_to_hub=False,
            load_best_model_at_end=True,
        )
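The constructor only reads the fields below when building the TrainingArguments; a minimal sketch of instantiating the pipeline (the values are illustrative, and BaseTrainingPipeline may well require further fields, e.g. for the model and datasets):

config = PretrainedConfig.from_dict({
    "output_dir": "output/anomaly_run",  # illustrative path
    "batch_size": 32,
    "train_epochs": 10,
    "learning_rate": 1e-4,
    "gradient_accumulation_steps": 1,
})
pipeline = AnomalyTrainingPipeline(config)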
    def run(self):
        """
        Runs the training and evaluation process for the model.

        The process includes:
            - Logging the configuration and training arguments.
            - Loading and formatting the training and evaluation datasets.
            - Training the model and saving its metrics and state.
            - Evaluating the model on the test datasets and logging metrics.
        """
        self.log_info(self.config.to_dict())

        train_dataset, eval_dataset, test_datasets, _ = self.get_datasets()
        train_dataset, eval_dataset = HF_Dataset(train_dataset), HF_Dataset(eval_dataset)

        trainer = Trainer(
            model=self.model,
            args=self.training_args,
            data_collator=self.collate_fn,
            compute_metrics=self.compute_metrics,
            train_dataset=train_dataset,
            eval_dataset=eval_dataset,
            tokenizer=None,
            optimizers=(self.optimizer, self.scheduler),
        )

        # Overload the Trainer API with the anomaly-specific loss and prediction step
        if not self.config.eval:
            trainer.compute_loss = self.compute_loss
            trainer.prediction_step = self.prediction_step

            train_results = trainer.train()
            trainer.save_model()
            trainer.log_metrics("train", train_results.metrics)
            trainer.save_metrics("train", train_results.metrics)
            trainer.save_state()

        # Testing settings
        for test_dataset in test_datasets:
            trainer.compute_loss = self.compute_loss
            trainer.prediction_step = self.prediction_step
            test_dataset = HF_Dataset(test_dataset)
            metrics = trainer.evaluate(test_dataset)
            trainer.log_metrics("Test", metrics)
            trainer.save_metrics("Test", metrics)
def anomaly_get_args():
    """
    Parses the command line for a JSON config path and builds the training config.

    Returns:
        PretrainedConfig: Configuration assembled from the JSON file.
    """
    parser = argparse.ArgumentParser(description='LTSM')
    parser.add_argument('--config_path', type=str, required=True, help='config path')
    args, unknown = parser.parse_known_args()

    config_path = args.config_path
    with open(config_path, 'r') as f:
        config_dict = json.load(f)
    args = argparse.Namespace(**config_dict)

    # Anomaly mode: predict a label for every input step, so pred_len == seq_len
    if args.pred_len is None:
        args.pred_len = args.seq_len

    if 'output_dir_template' in config_dict:
        args.output_dir = config_dict['output_dir_template'].format(
            learning_rate=args.learning_rate,
            downsample_rate=args.downsample_rate,
            freeze=args.freeze,
            train_epochs=args.train_epochs,
            pred_len=args.pred_len,
        )

    config = PretrainedConfig.from_dict(vars(args))
    if hasattr(args, "config") and args.config:
        config.load(args.config)
    return config
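The JSON file supplies the hyperparameters referenced above; a hypothetical config (file name and values are illustrative only):

{
    "seq_len": 336,
    "pred_len": null,
    "eval": false,
    "batch_size": 32,
    "train_epochs": 10,
    "learning_rate": 1e-3,
    "downsample_rate": 20,
    "freeze": 0,
    "gradient_accumulation_steps": 1,
    "output_dir_template": "output/anomaly_lr{learning_rate}_down{downsample_rate}_freeze{freeze}_epoch{train_epochs}_pred{pred_len}"
}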
def anomaly_seed_all(fixed_seed):
    """Seeds the Python, PyTorch, and NumPy RNGs for reproducible runs."""
    random.seed(fixed_seed)
    torch.manual_seed(fixed_seed)
    np.random.seed(fixed_seed)
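Typical entry-point wiring for this module (a sketch; the seed field, the fallback value, and the __main__ layout are assumptions, not part of the pipeline API):

if __name__ == "__main__":
    config = anomaly_get_args()
    anomaly_seed_all(getattr(config, "seed", 2024))  # "seed" field is an assumption
    AnomalyTrainingPipeline(config).run()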