"""Pipeline for tokenizer-ltsm.
Task: Time Series Forecasting.
"""
import numpy as np
import torch
import argparse
import random
from torch import nn
from ltsm.data_provider.data_loader import HF_Dataset
from ltsm.data_provider.tokenizer.tokenizer_processor import TokenizerConfig, ChronosTokenizer
from ltsm.common.base_training_pipeline import BaseTrainingPipeline
from peft import get_peft_model, LoraConfig
from transformers import (
Trainer,
TrainingArguments,
PretrainedConfig
)
class TokenizerTrainingPipeline(BaseTrainingPipeline):
"""
A pipeline for managing the training and evaluation process of a machine learning model.
Attributes:
args (argparse.Namespace): Arguments containing training configuration and hyperparameters.
model_manager (ModelManager): An instance responsible for creating, managing, and optimizing the model.
"""
def __init__(self, config: PretrainedConfig, **kwargs):
"""
        Initializes the TokenizerTrainingPipeline with the given configuration and builds the Trainer settings.
Args:
config (PretrainedConfig): Contains training settings such as output directory, batch size,
learning rate, and other hyperparameters.
kwargs: Additional keyword arguments for BaseTrainingPipeline initialization.
"""
# TODO: Replace PretrainedConfig with TrainingConfig
super().__init__(config, **kwargs)
self.tokenizer = self.create_tokenizer()
# Training settings
self.training_args = TrainingArguments(
output_dir=config.output_dir,
per_device_train_batch_size=config.batch_size,
per_device_eval_batch_size=config.batch_size,
evaluation_strategy="steps",
num_train_epochs=config.train_epochs,
fp16=False,
save_steps=100,
eval_steps=25,
logging_steps=5,
learning_rate=config.learning_rate,
gradient_accumulation_steps=config.gradient_accumulation_steps,
save_total_limit=10,
remove_unused_columns=False,
push_to_hub=False,
load_best_model_at_end=True,
)
def create_tokenizer(self) -> ChronosTokenizer:
"""
Creates a tokenizer for the model based on the configuration settings.
The tokenizer is configured to handle input sequences, output sequences, and various
parameters related to the model's architecture and training process.
Returns:
ChronosTokenizer: An instance of the tokenizer configured for the model.
"""
        # The tokenizer covers the full window: input context plus prediction horizon.
        context_length = self.config.seq_len + self.config.pred_len
        prediction_length = self.config.pred_len
        n_tokens = 1024  # vocabulary size, including the special tokens below
        n_special_tokens = 2  # pad (id 0) and eos (id 1)
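        # MeanScaleUniformBins (as in Chronos) rescales each series by its mean absolute
        # value and quantizes the result into uniform bins over [low_limit, high_limit];
        # output_transform inverts this mapping back to real values.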
tokenizer_config = TokenizerConfig(
tokenizer_class="MeanScaleUniformBins",
tokenizer_kwargs=dict(low_limit=-3.0, high_limit=3.0),
n_tokens=n_tokens,
n_special_tokens=n_special_tokens,
pad_token_id=0,
eos_token_id=1,
use_eos_token=0,
model_type="causal",
context_length=context_length,
prediction_length=prediction_length,
num_samples=20,
temperature=1.0,
top_k=50,
top_p=1.0,
)
tokenizer = tokenizer_config.create_tokenizer()
def compute_loss(model, inputs, return_outputs=False):
"""
Computes the loss for model training.
Args:
model (torch.nn.Module): The model used for predictions.
inputs (dict): Input data and labels.
return_outputs (bool): If True, returns both loss and model outputs.
Returns:
torch.Tensor or tuple: The computed loss, and optionally the outputs.
"""
            outputs = model(inputs["input_data"])
            B, L, M, _ = outputs.shape
            # labels[:, 0] holds the tokenizer scale (see prediction_step), so only labels[:, 1:] are token ids.
            loss = nn.functional.cross_entropy(
                outputs.reshape(B * L, -1),
                inputs["labels"][:, 1:].long().reshape(B * L),
            )
            return (loss, outputs) if return_outputs else loss
@torch.no_grad()
def prediction_step(model, inputs, prediction_loss_only=False, ignore_keys=None):
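            """
            Runs a no-grad forward pass and maps predicted and target token ids back to
            real values with the tokenizer, then reports an MSE loss in the original scale.
            Args:
                model (torch.nn.Module): The wrapped model (its device is read via model.module).
                inputs (dict): Batch with "input_data" and "labels"; labels[:, 0] is the scale.
                prediction_loss_only (bool): Unused; kept for Trainer API compatibility.
                ignore_keys: Unused; kept for Trainer API compatibility.
            Returns:
                tuple: (loss, detokenized predictions, detokenized labels).
            """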
input_data = inputs["input_data"].to(model.module.device)
labels = inputs["labels"].to(model.module.device)
scale = labels[:,0]
labels = labels[:,1:]
outputs = model(input_data)
indices = torch.max(outputs, dim=-1).indices
output_value = tokenizer.output_transform(indices, scale)
label_value = tokenizer.output_transform(labels.unsqueeze(-1).long(), scale)
loss = nn.functional.mse_loss(output_value, label_value)
return (loss, output_value, label_value)
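        # Expose the tokenizer-aware closures so that run() can install them on the Trainer.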
self.compute_loss = compute_loss
self.prediction_step = prediction_step
return tokenizer
def run(self):
"""
Runs the training and evaluation process for the model.
The process includes:
        - Logging the configuration.
        - Loading the train, validation, and test datasets and wrapping them as HF_Dataset.
        - Building a Hugging Face Trainer with the pipeline's model, optimizer, and scheduler.
        - Overriding the Trainer's loss and prediction hooks with the tokenizer-aware versions.
        - Training the model (unless running in evaluation-only mode) and saving metrics and state.
        - Evaluating the model on each test dataset and logging metrics.
"""
self.log_info(self.config)
train_dataset, eval_dataset, test_datasets, _ = self.get_datasets()
        train_dataset, eval_dataset = HF_Dataset(train_dataset), HF_Dataset(eval_dataset)
trainer = Trainer(
model=self.model,
args=self.training_args,
data_collator=self.collate_fn,
compute_metrics=self.compute_metrics,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
tokenizer=None,
optimizers=(self.optimizer, self.scheduler),
)
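        # The optimizer and LR scheduler come from the base pipeline rather than the
        # Trainer's defaults.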
        # Override the Trainer's loss and prediction hooks with the tokenizer-aware versions
if not self.config.eval:
trainer.compute_loss = self.compute_loss
trainer.prediction_step = self.prediction_step
train_results = trainer.train()
trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()
        # Evaluate on each test dataset
for test_dataset in test_datasets:
trainer.compute_loss = self.compute_loss
trainer.prediction_step = self.prediction_step
test_dataset = HF_Dataset(test_dataset)
metrics = trainer.evaluate(test_dataset)
trainer.log_metrics("Test", metrics)
trainer.save_metrics("Test", metrics)
def tokenizer_get_args():
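    """
    Parses command-line arguments for the tokenizer training pipeline.
    Returns:
        PretrainedConfig: The parsed arguments wrapped in a configuration object.
    """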
parser = argparse.ArgumentParser(description='LTSM')
# Basic Config
parser.add_argument('--model_id', type=str, default='test_run', help='model id')
parser.add_argument('--model_name_or_path', type=str, default="gpt2-medium", help='model name')
parser.add_argument('--seed', type=int, default=2024, help='random seed')
parser.add_argument('--device', type=str, default="cuda:0")
parser.add_argument('--checkpoints', type=str, default='./checkpoints/')
# Data Settings
    parser.add_argument('--data_path', nargs='+', default=['dataset/weather.csv'], help='data files')
    parser.add_argument('--test_data_path_list', nargs='+', required=True, help='test data files')
parser.add_argument('--prompt_data_path', type=str, default='./weather.csv', help='prompt data file')
parser.add_argument('--data_processing', type=str, default="standard_scaler", help='data processing method')
parser.add_argument('--train_ratio', type=float, default=0.7, help='train data ratio')
parser.add_argument('--val_ratio', type=float, default=0.1, help='validation data ratio')
    parser.add_argument('--do_anomaly', action='store_true', help='do anomaly detection')
# Forecasting Settings
parser.add_argument('--seq_len', type=int, default=336, help='input sequence length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--prompt_len', type=int, default=133, help='prompt sequence length')
# Model Settings
parser.add_argument('--lora', action="store_true", help='use lora')
parser.add_argument('--lora_dim', type=int, default=128, help='dimension of lora')
parser.add_argument('--gpt_layers', type=int, default=3, help='number of gpt layers')
parser.add_argument('--d_model', type=int, default=1024, help='dimension of model')
parser.add_argument('--n_heads', type=int, default=16, help='number of heads')
parser.add_argument('--d_ff', type=int, default=512, help='dimension of fcn')
parser.add_argument('--dropout', type=float, default=0.2, help='dropout')
parser.add_argument('--enc_in', type=int, default=1, help='encoder input size')
parser.add_argument('--c_out', type=int, default=862, help='output size')
parser.add_argument('--patch_size', type=int, default=16, help='patch size')
parser.add_argument('--pretrain', type=int, default=1, help='is pretrain')
parser.add_argument('--local_pretrain', type=str, default="None", help='local pretrain weight')
parser.add_argument('--freeze', type=int, default=1, help='is model weight frozen')
    parser.add_argument('--model', type=str, default='model', help='model name, options: [LTSM, LTSM_WordPrompt, LTSM_Tokenizer]')
parser.add_argument('--stride', type=int, default=8, help='stride')
parser.add_argument('--tmax', type=int, default=10, help='tmax')
# Training Settings
parser.add_argument('--eval', type=int, default=0, help='evaluation')
parser.add_argument('--itr', type=int, default=1, help='experiments times')
parser.add_argument('--output_dir', type=str, default='output/ltsm_train_lr0005/', help='output directory')
parser.add_argument('--downsample_rate', type=int, default=100, help='downsample rate')
parser.add_argument('--llm_layers', type=int, default=32)
parser.add_argument('--decay_fac', type=float, default=0.75, help='decay factor')
parser.add_argument('--learning_rate', type=float, default=0.0001, help='learning rate')
parser.add_argument('--batch_size', type=int, default=512, help='batch size')
parser.add_argument('--num_workers', type=int, default=10, help='number of workers')
parser.add_argument('--train_epochs', type=int, default=1, help='number of epochs')
parser.add_argument('--lradj', type=str, default='type1', help='learning rate adjustment type')
parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
parser.add_argument('--gradient_accumulation_steps', type=int, default=64, help='gradient accumulation steps')
args, unknown = parser.parse_known_args()
config = PretrainedConfig.from_dict(vars(args))
if hasattr(args, "config") and args.config:
config.load(args.config)
return config
def tokenizer_seed_all(fixed_seed):
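    """
    Seeds the Python, PyTorch, and NumPy random number generators for reproducibility.
    Args:
        fixed_seed (int): Seed value applied to all generators.
    """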
random.seed(fixed_seed)
torch.manual_seed(fixed_seed)
np.random.seed(fixed_seed)
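

# Example entry point (a minimal sketch): wires the helpers above together. It assumes
# BaseTrainingPipeline needs only the parsed config; any additional keyword arguments
# (e.g. a model manager) would have to be supplied here.
if __name__ == "__main__":
    config = tokenizer_get_args()
    tokenizer_seed_all(config.seed)
    pipeline = TokenizerTrainingPipeline(config)
    pipeline.run()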