import numpy as np
import torch
import argparse
import random
import json
from ltsm.common.base_training_pipeline import BaseTrainingPipeline, TrainingConfig
from ltsm.data_provider.data_loader import HF_Dataset, HF_Timestamp_Dataset
from ltsm.models.utils import print_trainable_parameters
from ltsm.models import model_dict
from transformers import (
Trainer,
TrainingArguments,
PretrainedConfig
)
class StatisticalTrainingPipeline(BaseTrainingPipeline):
"""
A pipeline for managing the training and evaluation process of a machine learning model.
Attributes:
args (argparse.Namespace): Arguments containing training configuration and hyperparameters.
model_manager (ModelManager): An instance responsible for creating, managing, and optimizing the model.
"""
def __init__(self, config: TrainingConfig, **kwargs):
"""
Initializes the TrainingPipeline with given arguments and a model manager.
Args:
args (argparse.Namespace): Contains training settings such as output directory, batch size,
learning rate, and other hyperparameters.
"""
super().__init__(config, **kwargs)
self.training_args = TrainingArguments(
output_dir=config.train_params["output_dir"],
per_device_train_batch_size=config.train_params["batch_size"],
per_device_eval_batch_size=config.train_params["batch_size"],
evaluation_strategy="steps",
num_train_epochs=config.train_params["train_epochs"],
fp16=False,
save_steps=100,
eval_steps=25,
logging_steps=5,
learning_rate=config.train_params["learning_rate"],
gradient_accumulation_steps=config.train_params["gradient_accumulation_steps"],
save_total_limit=10,
remove_unused_columns=False,
push_to_hub=False,
load_best_model_at_end=True,
)
def run(self):
"""
Runs the training and evaluation process for the model.
The process includes:
- Logging config.train_params["ration and training arguments.
- Creating a model with the model manager.
- Setting up training and evaluation parameters.
- Loading and formatting training and evaluation datasets.
- Training the model and saving metrics and state.
- Evaluating the model on test datasets and logging metrics.
"""
        self.log_info(repr(self.config))
train_dataset, eval_dataset, test_datasets, _ = self.get_datasets()
if self.config.train_params["model"] == "Informer":
train_dataset, eval_dataset = HF_Timestamp_Dataset(train_dataset), HF_Timestamp_Dataset(eval_dataset)
else:
            train_dataset, eval_dataset = HF_Dataset(train_dataset), HF_Dataset(eval_dataset)
print_trainable_parameters(self.model)
trainer = Trainer(
model=self.model,
args=self.training_args,
data_collator=self.collate_fn if self.collate_fn else BaseTrainingPipeline.default_collate_fn,
compute_metrics=self.compute_metrics if self.compute_metrics else BaseTrainingPipeline.default_compute_metrics,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
tokenizer=None,
optimizers=(self.optimizer, self.scheduler),
)
        # Override the Trainer's loss computation and prediction step with the pipeline's implementations
if not self.config.train_params["eval"]:
trainer.compute_loss = self.compute_loss if self.compute_loss else BaseTrainingPipeline.default_compute_loss
trainer.prediction_step = self.prediction_step if self.prediction_step else BaseTrainingPipeline.default_prediction_step
train_results = trainer.train()
trainer.save_model()
trainer.log_metrics("train", train_results.metrics)
trainer.save_metrics("train", train_results.metrics)
trainer.save_state()
        # Testing settings
        trainer.compute_loss = self.compute_loss if self.compute_loss else BaseTrainingPipeline.default_compute_loss
        trainer.prediction_step = self.prediction_step if self.prediction_step else BaseTrainingPipeline.default_prediction_step
        for test_dataset in test_datasets:
            if self.config.train_params["model"] == "Informer":
                test_ds = HF_Timestamp_Dataset(test_dataset)
            else:
                test_ds = HF_Dataset(test_dataset)
            metrics = trainer.evaluate(test_ds)
            trainer.log_metrics("Test", metrics)
            trainer.save_metrics("Test", metrics)
def get_args() -> TrainingConfig:
parser = argparse.ArgumentParser(description='LTSM')
# Load JSON config file
parser.add_argument('--config', type=str, help='Path to JSON configuration file')
# Basic Config
parser.add_argument('--model_id', type=str, default='test_run', help='model id')
parser.add_argument('--model_name_or_path', type=str, default="gpt2-medium", help='model name')
parser.add_argument('--seed', type=int, default=2024, help='random seed')
parser.add_argument('--device', type=str, default="cuda:0")
parser.add_argument('--checkpoints', type=str, default='./checkpoints/')
# Data Settings
parser.add_argument('--data', help='dataset type')
parser.add_argument('--data_path', nargs='+', default='dataset/weather.csv', help='data files')
parser.add_argument('--test_data_path_list', nargs='+', help='test data file')
parser.add_argument('--prompt_data_path', type=str, default='./weather.csv', help='prompt data file')
parser.add_argument('--data_processing', type=str, default="standard_scaler", help='data processing method')
parser.add_argument('--train_ratio', type=float, default=0.7, help='train data ratio')
parser.add_argument('--val_ratio', type=float, default=0.1, help='validation data ratio')
    parser.add_argument('--do_anomaly', action='store_true', help='do anomaly detection')
# Forecasting Settings
parser.add_argument('--seq_len', type=int, default=336, help='input sequence length')
parser.add_argument('--pred_len', type=int, default=96, help='prediction sequence length')
parser.add_argument('--prompt_len', type=int, default=133, help='prompt sequence length')
# Model Settings
parser.add_argument('--lora', action="store_true", help='use lora')
parser.add_argument('--lora_dim', type=int, default=128, help='dimension of lora')
parser.add_argument('--gpt_layers', type=int, default=3, help='number of gpt layers')
parser.add_argument('--d_model', type=int, default=1024, help='dimension of model')
parser.add_argument('--n_heads', type=int, default=16, help='number of heads')
parser.add_argument('--d_ff', type=int, default=512, help='dimension of fcn')
parser.add_argument('--dropout', type=float, default=0.2, help='dropout')
parser.add_argument('--enc_in', type=int, default=1, help='encoder input size')
parser.add_argument('--dec_in', type=int, default=7, help='decoder input size')
parser.add_argument('--c_out', type=int, default=862, help='output size')
parser.add_argument('--patch_size', type=int, default=16, help='patch size')
parser.add_argument('--pretrain', type=int, default=1, help='is pretrain')
parser.add_argument('--local_pretrain', type=str, default="None", help='local pretrain weight')
parser.add_argument('--freeze', type=int, default=1, help='is model weight frozen')
    parser.add_argument('--model', type=str, default='model', help='model name, options: [LTSM, LTSM_WordPrompt, LTSM_Tokenizer, DLinear, PatchTST, Informer]')
parser.add_argument('--stride', type=int, default=8, help='stride')
parser.add_argument('--tmax', type=int, default=10, help='tmax')
parser.add_argument('--embed', type=str, default='timeF',
help='time features encoding, options:[timeF, fixed, learned]')
parser.add_argument('--activation', type=str, default='gelu', help='activation')
    parser.add_argument('--output_attention', action='store_true', help='whether to output attention in encoder')
parser.add_argument('--do_predict', action='store_true', help='whether to predict unseen future data')
parser.add_argument('--moving_avg', type=int, default=25, help='window size of moving average')
parser.add_argument('--factor', type=int, default=1, help='attn factor')
parser.add_argument('--distil', action='store_false',
help='whether to use distilling in encoder, using this argument means not using distilling',
default=True)
parser.add_argument('--e_layers', type=int, default=2, help='num of encoder layers')
parser.add_argument('--d_layers', type=int, default=1, help='num of decoder layers')
parser.add_argument('--embed_type', type=int, default=0, help='0: default 1: value embedding + temporal embedding + positional embedding 2: value embedding + temporal embedding 3: value embedding + positional embedding 4: value embedding')
parser.add_argument('--freq', type=str, default='h',
help='freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h')
# Training Settings
parser.add_argument('--eval', type=int, default=0, help='evaluation')
    parser.add_argument('--itr', type=int, default=1, help='number of experiment repetitions')
parser.add_argument('--output_dir', type=str, default='output/ltsm_train_lr0005/', help='output directory')
parser.add_argument('--downsample_rate', type=int, default=100, help='downsample rate')
parser.add_argument('--llm_layers', type=int, default=32)
parser.add_argument('--decay_fac', type=float, default=0.75, help='decay factor')
parser.add_argument('--learning_rate', type=float, default=0.0001, help='learning rate')
parser.add_argument('--batch_size', type=int, default=512, help='batch size')
parser.add_argument('--num_workers', type=int, default=10, help='number of workers')
parser.add_argument('--train_epochs', type=int, default=1, help='number of epochs')
parser.add_argument('--lradj', type=str, default='type1', help='learning rate adjustment type')
parser.add_argument('--patience', type=int, default=3, help='early stopping patience')
parser.add_argument('--gradient_accumulation_steps', type=int, default=64, help='gradient accumulation steps')
# PatchTST
parser.add_argument('--fc_dropout', type=float, default=0.05, help='fully connected dropout')
parser.add_argument('--head_dropout', type=float, default=0.0, help='head dropout')
parser.add_argument('--patch_len', type=int, default=16, help='patch length')
parser.add_argument('--padding_patch', default='end', help='None: None; end: padding on the end')
parser.add_argument('--revin', type=int, default=1, help='RevIN; True 1 False 0')
parser.add_argument('--affine', type=int, default=0, help='RevIN-affine; True 1 False 0')
parser.add_argument('--subtract_last', type=int, default=0, help='0: subtract mean; 1: subtract last')
parser.add_argument('--decomposition', type=int, default=0, help='decomposition; True 1 False 0')
parser.add_argument('--kernel_size', type=int, default=25, help='decomposition-kernel')
parser.add_argument('--individual', type=int, default=0, help='individual head; True 1 False 0')
args, unknown = parser.parse_known_args()
# Split arguments into model and training parameters
args_dict = vars(args)
train_params_keys = TrainingConfig.train_params.keys()
train_params = {k: v for k, v in args_dict.items() if k in train_params_keys}
model_params = {k: v for k, v in args_dict.items() if k not in train_params_keys}
if hasattr(args, "config") and args.config:
with open(args.config, 'r') as f:
config_dict = json.load(f)
train_params.update(config_dict["train_params"])
model_params.update(config_dict["model_config"])
# Create the pretrained config from model parameters
if train_params["model"] in model_dict:
config_class = model_dict[train_params["model"]].config_class
config = config_class.from_dict(model_params)
else:
config = PretrainedConfig.from_dict(model_params)
train_config = TrainingConfig(config, **train_params)
return train_config
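
# Example of the JSON layout that get_args expects from --config (a sketch only; the exact keys
# under "train_params" and "model_config" depend on TrainingConfig and the chosen model):
#
# {
#     "train_params": {"model": "LTSM", "batch_size": 256, "learning_rate": 0.0001},
#     "model_config": {"d_model": 1024, "n_heads": 16, "dropout": 0.2}
# }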
def seed_all(fixed_seed):
    """Seed the Python, PyTorch, and NumPy RNGs for reproducibility."""
    random.seed(fixed_seed)
    torch.manual_seed(fixed_seed)
    np.random.seed(fixed_seed)
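
# Minimal usage sketch (an assumption, not part of the original module): wiring get_args, seed_all,
# and the pipeline together. It assumes the seed ends up among the training parameters of the
# TrainingConfig returned by get_args; falls back to the argparse default otherwise.
if __name__ == "__main__":
    train_config = get_args()
    seed_all(train_config.train_params.get("seed", 2024))
    pipeline = StatisticalTrainingPipeline(train_config)
    pipeline.run()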