# Source code for ltsm.data_provider.data_factory

import os
import torch
import numpy as np
import pandas as pd
from ltsm.data_reader import reader_dict
from ltsm.data_provider.data_splitter import SplitterByTimestamp
from ltsm.data_provider.tokenizer import processor_dict
from ltsm.data_provider.dataset import TSDataset,  TSPromptDataset, TSTokenDataset

from typing import Tuple, List, Union, Dict
import logging

# Configure root logging for the messages this module emits: INFO level,
# one "<time> - <level> - <message>" line per record. Per the logging docs,
# basicConfig does nothing if the root logger already has handlers.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

class DatasetFactory:
    """
    A factory class for time-series datasets.

    Reads each source file, splits it into train/validation/test sequences,
    scales them with the configured processor, attaches the matching prompt
    data, and wraps the results into pyTorch datasets.
    """

    def __init__(
        self,
        data_paths: List[str],
        prompt_data_path: str,
        data_processing: str,
        seq_len: int,
        pred_len: int,
        train_ratio: float,
        val_ratio: float,
        model: Union[str, None] = None,
        scale_on_train: bool = False,
        downsample_rate: int = 10,
        split_test_sets: bool = True,
        do_anomaly: bool = False,
    ):
        """
        Initializes the DatasetFactory with the given arguments.

        Args:
            data_paths (List[str]): A list of file paths where the source data is stored.
            prompt_data_path (str): The file path to the prompt data folder.
            data_processing (str): The module ID of the processor in processor_dict.
            seq_len (int): The number of timesteps used in the input sequence.
            pred_len (int): The number of timesteps the model should predict for the output sequence.
            train_ratio (float): The training set ratio.
            val_ratio (float): The validation set ratio.
            model (str, optional): The model name. Options include 'LTSM', 'LTSM_WordPrompt',
                and 'LTSM_Tokenizer'. A name containing 'WordPrompt' switches prompt loading to
                data-source indices; a name containing 'Tokenizer' selects TSTokenDataset.
                None is treated as an empty name.
            scale_on_train (bool): Indicates whether the datasets should be scaled based on the training data.
            downsample_rate (int): The downsampling rate for training and validation datasets.
            split_test_sets (bool): Indicates whether the test sets should be saved separately by data_path.
            do_anomaly (bool): Indicates anomaly-detection data. It is passed to the splitter, the
                processor and the datasets, and the last split sequence is treated as the label sequence.
        """
        self.data_paths = data_paths
        self.prompt_data_path = prompt_data_path
        self.model = model
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.scale_on_train = scale_on_train
        self.downsample_rate = downsample_rate
        self.split_test_sets = split_test_sets
        self.do_anomaly = do_anomaly

        # Initialize dataset splitter
        self.splitter = SplitterByTimestamp(
            seq_len,
            pred_len,
            train_ratio,
            val_ratio
        )

        # Initialize the data preprocessor
        self.processor = processor_dict[data_processing]()

    def fetch(self, data_path: str) -> pd.DataFrame:
        """
        Retrieves data from the filesystem at the specified data_path.

        Selects the appropriate BaseReader implementation based on the file's
        extension or location (files inside a directory named 'monash' use the
        'monash' reader).

        Args:
            data_path (str): The file path to the source data.

        Returns:
            pd.DataFrame: A Pandas DataFrame containing the data at data_path.

        Raises:
            KeyError: If no reader is registered for the file's extension.
        """
        # If data path is in monash directory, use monash reader
        dir_name = os.path.split(os.path.dirname(data_path))[-1]
        if dir_name == 'monash':
            return reader_dict[dir_name](data_path).fetch()

        # Otherwise dispatch on the file extension without its leading dot
        ext = os.path.splitext(data_path)[-1]
        return reader_dict[ext[1:]](data_path).fetch()

    def __get_prompt(self, prompt_data_path: str, data_name: str, idx_file_name: str) -> List[np.float64]:
        """
        Private helper function to load one prompt data file.

        Tries '<prompt_path>.csv', '<prompt_path>.pth.tar' and '<prompt_path>.npz'
        in that order.

        Args:
            prompt_data_path (str): The path to the directory where the prompt data files are stored.
            data_name (str): The file path of the data source.
            idx_file_name (str): The row label corresponding to the data the prompt file was generated from.

        Returns:
            List[np.float64]: The raw prompt values, or an empty list if no file was
            found or it could not be read (the error is logged).
        """
        # Prompt file name replaces '/' in row labels with '-'
        idx_file_name = idx_file_name.replace("/", "-")

        if os.path.split(os.path.dirname(data_name))[-1] == "monash":
            # Monash: <prompt_dir>/<dataset name without .tsf>/T<row label + 1>_prompt
            prompt_name = data_name.split("/")[-1].replace(".tsf", "")
            prompt_path = os.path.join(
                prompt_data_path, prompt_name, "T" + str(int(idx_file_name) + 1) + "_prompt"
            )
        else:
            # CSV and other: <prompt_dir>/<parent dir>/<file stem>_<row label>_prompt.
            # A bare file name (no '/') has no parent component and uses the stem alone.
            parts = data_name.split('/')
            stem = parts[-1].split('.')[0]
            prompt_name = parts[-2] + '/' + stem if len(parts) > 1 else stem
            prompt_path = os.path.join(prompt_data_path, prompt_name + '_' + str(idx_file_name) + "_prompt")

        try:
            # Check for the existence of the prompt file in different formats
            if os.path.exists(prompt_path + '.csv'):
                prompt_path += '.csv'
                logging.debug(f"Prompt file {prompt_path} exists")
                prompt_data = pd.read_csv(prompt_path)
                prompt_data.columns = prompt_data.columns.astype(int)
            elif os.path.exists(prompt_path + '.pth.tar'):
                prompt_path += '.pth.tar'
                # NOTE(review): torch.load unpickles the file; only load prompt files from trusted sources.
                prompt_data = torch.load(prompt_path)
            elif os.path.exists(prompt_path + '.npz'):
                prompt_path += '.npz'
                # Close the NpzFile handle once the array has been read.
                with np.load(prompt_path) as loaded_data:
                    # 'data' should match the key saved in prompt_generate_split.py
                    prompt_data = pd.DataFrame(loaded_data['data'])
            else:
                logging.error(f"Prompt file {prompt_path} does not exist in any supported format")
                return []

            # The loaded prompt is expected to be a single row, e.g. (1, 133);
            # the width is decided in prompt_generate_split.py.
            prompt_series = prompt_data.T[0]
            # to_numpy() keeps numpy scalar elements (np.float64), matching element-wise .iloc access.
            return list(prompt_series.to_numpy())
        except Exception as e:
            # Best effort: an unreadable prompt is reported and treated as missing.
            logging.error(e)
            return []

    def loadPrompts(
        self, data_path: str, prompt_data_path: str, buff: List[Union[int, str]]
    ) -> Tuple[List[List[np.float64]], List[Union[int, str]]]:
        """
        Loads the prompt data for every row label of one data source.

        Args:
            data_path (str): The file path to the source data.
            prompt_data_path (str): The file path to the directory where the prompt data files are stored.
            buff (List[Union[int, str]]): The list of row labels of the data.

        Returns:
            List[List[np.float64]]: One prompt per row label in buff, in order. The label
                'anomaly' is skipped (it gets no entry). In WordPrompt mode, each entry is
                [index of data_path in self.data_paths]. Rows whose prompt could not be
                read get an empty list.
            List[Union[int, str]]: The row labels whose prompt data could not be read.
        """
        prompt_data = []
        missing = []
        model_name = self.model or ""

        if "WordPrompt" in model_name:
            # The prompt is the index of the data source, identical for every instance;
            # look it up once (only when there are instances, as before).
            if len(buff) > 0:
                class_idx = self.data_paths.index(data_path)
                prompt_data = [[class_idx] for _ in buff]
        else:
            for instance_idx in buff:
                if instance_idx == 'anomaly':
                    # The anomaly label has no prompt data and gets no entry, so the result
                    # lines up with buff only if 'anomaly' is its last label (TODO confirm
                    # against the splitter's ordering).
                    continue
                instance_prompt = self.__get_prompt(prompt_data_path, data_path, str(instance_idx))

                # If no prompt is loaded for this instance, save instance_idx to missing list
                if len(instance_prompt) == 0:
                    missing.append(instance_idx)
                    logging.info(f"Prompt data for index {instance_idx} of data path {data_path} cannot be read. Skipping this row.")

                prompt_data.append(instance_prompt)

        return prompt_data, missing

    def createTorchDS(
        self,
        data: List[np.ndarray],
        prompt_data: List[List[np.float64]],
        downsample_rate: int,
        do_anomaly: bool,
    ) -> 'Union[TSDataset, None]':
        """
        Creates a pyTorch Dataset from a list of sequences and a list of their corresponding prompts.

        Args:
            data (List[np.ndarray]): A list of sequences.
            prompt_data (List[List[np.float64]]): A list of prompts.
            downsample_rate (int): The downsampling rate.
            do_anomaly (bool): Passed through to the dataset constructor.

        Returns:
            TSDataset or None: A TSTokenDataset when the model name contains 'Tokenizer',
            otherwise a TSPromptDataset. Returns None when data or prompt_data is empty.
        """
        if len(data) == 0 or len(prompt_data) == 0:
            return None

        dataset_cls = TSTokenDataset if "Tokenizer" in (self.model or "") else TSPromptDataset
        return dataset_cls(
            data=data,
            prompt=prompt_data,
            seq_len=self.seq_len,
            pred_len=self.pred_len,
            downsample_rate=downsample_rate,
            do_anomaly=do_anomaly
        )

    @staticmethod
    def _drop_missing(items, buff, missing):
        """Return the entries of items whose row label in buff (paired by position) is not in missing."""
        return [item for item, instance_idx in zip(items, buff) if instance_idx not in missing]

    @staticmethod
    def _attach_labels(seqs):
        """
        Pair each value of every sequence with the label sequence stored last in seqs.

        Returns a list of lists of (value, label) tuples. The label sequence itself is
        dropped, and an empty input yields an empty list.
        """
        if len(seqs) == 0:
            return []
        labels = seqs[-1]
        return [list(zip(seq, labels)) for seq in seqs[:-1]]

    @staticmethod
    def _first_shape(seqs):
        """Return the .shape of the first sequence in seqs, or None if seqs is empty (used for logging)."""
        return seqs[0].shape if len(seqs) > 0 else None

    def getDatasets(self) -> 'Tuple[TSDataset, TSDataset, List[TSDataset]]':
        """
        Loads, splits, and scales the time-series data, loads the prompts, and creates
        TSDatasets for training, validation, and testing.

        Rows whose prompt cannot be read from any of the train/val/test prompt folders
        are dropped from all three splits.

        Returns:
            Tuple[TSDataset, TSDataset, List[TSDataset]]: The training dataset, the
            validation dataset, and a list of test datasets. Training and validation
            combine all data sources into a single dataset each. Test data yields one
            dataset per data source when split_test_sets is True, otherwise a single
            combined dataset. Any of these is None if createTorchDS received empty data.
        """
        train_data, val_data, test_data = [], [], []
        train_prompt_data, val_prompt_data, test_prompt_data = [], [], []
        test_ds_list = []

        for data_path in self.data_paths:
            # Step 0: Read data, the output is a list of 1-d time-series
            df_data = self.fetch(data_path)

            # Step 1: Get train, val, and test splits
            sub_train_data, sub_val_data, sub_test_data, buff = self.splitter.get_csv_splits(df_data, self.do_anomaly)

            # Step 2: Scale the datasets. We fit on the whole sequence by default.
            # To fit on the train sequence only, set scale_on_train=True
            sub_train_data, sub_val_data, sub_test_data = self.processor.process(
                raw_data=df_data.to_numpy(),
                train_data=sub_train_data,
                val_data=sub_val_data,
                test_data=sub_test_data,
                fit_train_only=self.scale_on_train,
                do_anomaly=self.do_anomaly
            )
            logging.info(f"Data {data_path} has been split into train, val, test sets with the following shapes: {self._first_shape(sub_train_data)}, {self._first_shape(sub_val_data)}, {self._first_shape(sub_test_data)}")

            # Step 2.5: Load prompt for each instance
            missing = set()

            # Train Prompt
            train_prompts, tmp = self.loadPrompts(data_path, self.prompt_data_path + '/train', buff)
            missing.update(tmp)

            # Validation Prompt
            val_prompts, tmp = self.loadPrompts(data_path, self.prompt_data_path + '/val', buff)
            missing.update(tmp)

            # Test Prompt
            # NOTE(review): test prompts are read from the '/train' folder, unlike the
            # train/val splits above. Confirm this is intentional before changing it.
            sub_test_prompt_data, tmp = self.loadPrompts(data_path, self.prompt_data_path + '/train', buff)
            missing.update(tmp)

            # Only maintain instances that are able to load prompt data for training, validation, and testing
            sub_train_data = self._drop_missing(sub_train_data, buff, missing)
            sub_val_data = self._drop_missing(sub_val_data, buff, missing)
            sub_test_data = self._drop_missing(sub_test_data, buff, missing)
            train_prompts = self._drop_missing(train_prompts, buff, missing)
            val_prompts = self._drop_missing(val_prompts, buff, missing)
            sub_test_prompt_data = self._drop_missing(sub_test_prompt_data, buff, missing)

            if self.do_anomaly:
                # The last sequence of each split holds the labels; pair them with every value.
                sub_train_data = self._attach_labels(sub_train_data)
                sub_val_data = self._attach_labels(sub_val_data)
                sub_test_data = self._attach_labels(sub_test_data)

            train_prompt_data.extend(train_prompts)
            val_prompt_data.extend(val_prompts)
            train_data.extend(sub_train_data)
            val_data.extend(sub_val_data)

            if self.split_test_sets:
                # Create a Torch dataset for each sub test dataset
                test_ds_list.append(self.createTorchDS(sub_test_data, sub_test_prompt_data, 1, self.do_anomaly))
            else:
                test_data.extend(sub_test_data)
                test_prompt_data.extend(sub_test_prompt_data)

        # Step 3: Create Torch datasets (samplers)
        train_ds = self.createTorchDS(train_data, train_prompt_data, self.downsample_rate, self.do_anomaly)

        if os.path.split(os.path.dirname(self.data_paths[0]))[-1] == "monash":
            # Monash validation data uses a fixed downsample rate of 54 (origin not documented here).
            val_ds = self.createTorchDS(val_data, val_prompt_data, 54, self.do_anomaly)
        else:
            val_ds = self.createTorchDS(val_data, val_prompt_data, self.downsample_rate, self.do_anomaly)

        if not self.split_test_sets:
            test_ds_list.append(self.createTorchDS(test_data, test_prompt_data, 1, self.do_anomaly))

        return train_ds, val_ds, test_ds_list