Source code for ltsm.data_provider.prompt_generator

import ipdb
import pandas as pd
import numpy as np
from pandas import read_csv, read_feather
import sys, os
import torch
from sklearn.preprocessing import StandardScaler

# Add the path to `tsfel` dynamically
tsfel_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../prompt_reader/stat_prompt"))
sys.path.append(tsfel_path)
import tsfel

[docs] def prompt_prune(pt): pt_dict = pt.to_dict() pt_keys = list(pt_dict.keys()) for key in pt_keys: if type(key) == type("abc") and key.startswith("0_FFT mean coefficient"): del pt[key] return pt
[docs] def prompt_generation_single(ts): """Generate prompt data for the input time-series data Args: ts (pd.Series): input time-series data """ cfg = tsfel.get_features_by_domain() prompt = tsfel.time_series_features_extractor(cfg, ts) prompt = prompt_prune(prompt) return prompt
[docs] def prompt_generation(ts, ts_name): """Generate prompt data for the input time-series data Args: ts (pd.DataFrame): input time-series data ts_name (str): name of the time-series data """ if ts.shape[1] == 1: return None else: column_name = [name.replace("/", "-") for name in list(ts.columns)] # column_name_map = {} # column_name = [] # for i, name in enumerate(ts.columns): # if not name.isnumeric(): # new_name = str(i) # else: # new_name = name # column_name.append(new_name) # column_name_map[name] = new_name prompt_buf_train = pd.DataFrame(np.zeros((133, ts.shape[1])), columns=column_name) prompt_buf_val = pd.DataFrame(np.zeros((133, ts.shape[1])), columns=column_name) prompt_buf_test = pd.DataFrame(np.zeros((133, ts.shape[1])), columns=column_name) for index, col in ts.T.iterrows(): if "ETT" in ts_name: ts_len = len(ts) t1, t2 = int(0.6*ts_len), int(0.6*ts_len) + int(0.2*ts_len) ts_train, ts_val, ts_test = col[:t1], col[t1:t2].reset_index(drop=True), col[t2:].reset_index(drop=True) else: ts_len = len(ts) t1, t2 = int(0.7 * ts_len), int(0.7 * ts_len) + int(0.1 * ts_len) ts_train, ts_val, ts_test = col[:t1], col[t1:t2].reset_index(drop=True), col[t2:].reset_index(drop=True) prompt_train = prompt_generation_single(ts_train) prompt_val = prompt_generation_single(ts_val) prompt_test = prompt_generation_single(ts_test) prompt_buf_train[index.replace("/", "-")] = prompt_train.T.values prompt_buf_val[index.replace("/", "-")] = prompt_val.T.values prompt_buf_test[index.replace("/", "-")] = prompt_test.T.values # new_index = column_name_map[index] # prompt_buf_train[new_index] = prompt_train.T.values # prompt_buf_val[new_index] = prompt_val.T.values # prompt_buf_test[new_index] = prompt_test.T.values prompt_buf_total = {"train": prompt_buf_train, "val": prompt_buf_val, "test": prompt_buf_test} print(prompt_buf_total) return prompt_buf_total
[docs] def prompt_save(prompt_buf, output_path, data_name, save_format="pth.tar", ifTest=False): """save prompts to three different files in the output path Args: prompt_buf (dict): dictionary containing prompts for train, val, and test splits output_path (str): path to save the prompt data data_name (str): name of the dataset save_format (str): format to save the prompt data ifTest (bool): if True, test if the saved prompt data is loaded back. Can be used during generating data. """ if prompt_buf["train"].shape[1] == 1: # ipdb.set_trace() return None # prompt_train_fname = os.path.join(prompt_train_data_dir, data_name + "_prompt.pth.tar") # prompt_train = prompt_buf["train"] # print("Export", prompt_train_fname, prompt_train.shape) # # prompt_val_fname = os.path.join(prompt_val_data_dir, data_name + "_prompt.pth.tar") # prompt_val = prompt_buf["val"] # torch.save(prompt_val, prompt_val_fname) # print("Export", prompt_val_fname, prompt_val.shape) # # prompt_test_fname = os.path.join(prompt_test_data_dir, data_name + "_prompt.pth.tar") # prompt_test = prompt_buf["test"] # torch.save(prompt_test, prompt_test_fname) # print("Export", prompt_test_fname, prompt_test.shape) else: for split in ["train", "val", "test"]: split_dir = os.path.join(output_path, split) for index, col in prompt_buf[split].T.iterrows(): file_name = f"{data_name}_{index}_prompt.{save_format}" file_path = os.path.join(split_dir, file_name) # print("split_dir", split_dir) # print("file_name", file_name) # print("file_path", file_path) prompt_data = col prompt_data.columns = [index] prompt_data = prompt_data.T print("Type of prompt data", type(prompt_data), "Shape of prompt data", prompt_data.shape) if save_format == "pth.tar": torch.save(prompt_data, file_path) elif save_format == "csv": prompt_data.to_csv(file_path, index=False) # use csv may result in some loss of precision elif save_format == "npz": np.savez(file_path, data=prompt_data.values, index=prompt_data.index, name=prompt_data.name) else: raise ValueError(f"Unsupported save format: {save_format}") if ifTest: if save_format == "pth.tar": load_data = torch.load(file_path) elif save_format == "csv": load_data = pd.read_csv(file_path) if isinstance(load_data, pd.DataFrame): load_data = load_data.squeeze() elif save_format == "npz": loaded = np.load(file_path) load_data = pd.Series(data=loaded["data"], index=loaded["index"], name=loaded["name"].item()) if isinstance(load_data, pd.DataFrame): load_data = load_data.squeeze() assert type(load_data) == type(prompt_data), f"Type mismatch: {type(load_data)} vs {type(prompt_data)}" # type should be pd.Series assert load_data.shape == prompt_data.shape, f"Shape mismatch: {load_data.shape} vs {prompt_data.shape}" assert load_data.index.equals(prompt_data.index), "Index mismatch" assert load_data.name == prompt_data.name, f"Series names mismatch: {load_data.name} vs {prompt_data.name}" assert np.allclose(load_data.values, prompt_data.values, rtol=1e-8, atol=1e-8), "Data values mismatch" if save_format != "csv": assert load_data.equals(prompt_data), f"Data mismatch: {load_data} vs {prompt_data}" print("All tests passed for", file_path) print("Export", file_path, prompt_data.shape)
[docs] def data_import(path, root_path, format="feather", anomaly=False): if format == "feather": data = read_feather(path) data_name = path.replace(root_path, "").replace(".feather", "") data_dir = data_name[0:data_name.rfind("/")] # ipdb.set_trace() data = data.value else: data = read_csv(path) data_name = path.replace(root_path, "").replace(".csv", "") data_dir = data_name[0:data_name.rfind("/")] if "date" in data.columns: data = data.drop("date", axis=1) if "anomaly" in data.columns: data = data.drop("anomaly", axis=1) print("Drop anomaly column") return data, data_name, data_dir
[docs] def create_data_dir(dir_name): if not os.path.exists(dir_name): os.mkdir(dir_name)
[docs] def prompt_generate_split(root_path: str, output_path: str, save_format:str, dataset_name: str = None, ifTest=False) -> None: """Generate prompt data for the input time-series data Args: root_path (str): path to the dataset output_path (str): path to save the prompt data save_format (str): format to save the prompt data dataset_name (str): name of the dataset ifTest (bool): if True, test if the saved prompt data is loaded back. Can be used during generating data. """ if not dataset_name: dataset_name = [name for name in os.listdir(root_path) if os.path.isdir(os.path.join(root_path, name))] if len(dataset_name) == 0: print("No dataset found in the root path.") sys.exit(0) dataset_fullname = [os.path.join(root_path, name) for name in dataset_name] data_path_buf = [] for dataset_dir in dataset_fullname: for root, dirs, files in os.walk(dataset_dir): for file_name in files: if file_name.endswith(".csv"): file_path = os.path.join(root, file_name) data_path_buf.append(file_path) print(data_path_buf) create_data_dir(output_path) # ipdb.set_trace() for path_idx, path in enumerate(data_path_buf): # print(path) data, data_name, data_dir = data_import(path, root_path, "csv") print("*****************Data Name: ", data_name) # print("Data Shape:", data.shape) if data.shape[0] < 20: print(path, "Skip too short time-series data.", data.shape) continue else: print("Import", path, "data shape", data.shape) create_data_dir(os.path.join(output_path, "train")) create_data_dir(os.path.join(output_path, "val")) create_data_dir(os.path.join(output_path, "test")) create_data_dir(os.path.join(output_path, "train", data_dir)) create_data_dir(os.path.join(output_path, "val", data_dir)) create_data_dir(os.path.join(output_path, "test", data_dir)) print(data) prompt_data_buf = prompt_generation(data, data_name) if prompt_data_buf is not None: prompt_save(prompt_data_buf, output_path, data_name, save_format, ifTest)
[docs] def load_data(data_path, save_format): """Load the prompt data in different format from the input path. The data should be pd.Series. Args: data_path: str, the input path save_format: str, the format of the data saved """ if save_format == "pth.tar": prompt_data = torch.load(data_path) elif save_format == "csv": prompt_data = pd.read_csv(data_path) if isinstance(prompt_data, pd.DataFrame): prompt_data = prompt_data.squeeze() elif save_format == "npz": loaded = np.load(data_path) prompt_data = pd.Series(data=loaded["data"], index=loaded["index"], name=loaded["name"].item()) if isinstance(prompt_data, pd.DataFrame): prompt_data = prompt_data.squeeze() return prompt_data
[docs] def save_data(data, data_path, save_format): """Save the final prompt data to the output path Args: data: pd.DataFrame, the final prompt data data_path: str, the output path save_format: str, the format to save the data """ if save_format == "pth.tar": torch.save(data, data_path) elif save_format == "csv": data.to_csv(data_path, index=False) elif save_format == "npz": np.savez(data_path, data=data.values, index=data.index, columns=data.columns)
[docs] def mean_std_export_ds(root_path, output_path, data_path_buf, normalize_param_fname, save_format="pth.tar"): """Export the mean and std of the prompt data to the output path Args: root_path: str, the root path of the input output_path: str, the output path data_path_buf: list, the list of the input path normalize_param_fname: str, the output path save_format: str, the format of the saved data """ prompt_data_buf = [] output_dir_buf = [] output_path_buf = [] for index, dataset_path in enumerate(data_path_buf): prompt_data = load_data(dataset_path, save_format) prompt_data = prompt_prune(prompt_data) prompt_data_buf.append(prompt_data) data_name = dataset_path.replace(root_path, "").replace(".csv", "") data_dir = data_name[0:data_name.rfind("/")] prompt_dir = os.path.join(output_path, data_dir) prompt_fname = os.path.join(output_path, data_name) # print(prompt_fname) output_dir_buf.append(prompt_dir) output_path_buf.append(prompt_fname) print("Import from {}".format(dataset_path), prompt_data.shape, type(prompt_data)) # ipdb.set_trace() prompt_data_all = pd.concat(prompt_data_buf, axis=1).T print(prompt_data_all) scaler = StandardScaler() scaler.fit(prompt_data_all) sc_mean = pd.DataFrame(scaler.mean_.reshape(1,-1), columns=prompt_data_all.keys()) sc_scale = pd.DataFrame(scaler.scale_.reshape(1,-1), columns=prompt_data_all.keys()) print({"mean": sc_mean, "scale": sc_scale}) print("Save the mean and std to {}".format(normalize_param_fname)) torch.save({"mean": sc_mean, "scale": sc_scale}, normalize_param_fname)
[docs] def standardscale_export(data_path_buf, params_fname, output_path, root_path, save_format="pth.tar"): """Export the standardized prompt data to the output path Args: data_path_buf: list, the list of the input path params_fname: str, the output path of the mean and std output_path: str, the output path of the standardized prompt data root_path: str, the root path of the input""" params = torch.load(params_fname) print("Load from {}".format(params_fname), type(params)) print(type(params["mean"]), type(params["scale"])) mean, std = params["mean"], params["scale"] scaler = StandardScaler() scaler.mean_ = mean scaler.scale_ = std # ipdb.set_trace() for index, dataset_path in enumerate(data_path_buf): prompt_data_raw = load_data(dataset_path, save_format) prompt_data_raw = prompt_prune(prompt_data_raw) prompt_data = scaler.transform(prompt_data_raw.values.reshape(1, -1)) prompt_data_array = prompt_data # print(prompt_data) prompt_data_array[np.isnan(prompt_data_array)] = 0 prompt_data_transform = pd.DataFrame(prompt_data_array, columns=prompt_data.keys()) # ipdb.set_trace() prompt_fname = dataset_path.replace(root_path, output_path) prompt_dir = prompt_fname[0:prompt_fname.rfind("/")] if not os.path.exists(prompt_dir): os.makedirs(prompt_dir) # prompt_data_tramsform: pd.DataFrame,(1,133), column is RandeIndex # torch.save(prompt_data_transform, prompt_fname) save_data(prompt_data_transform, prompt_fname, save_format) print("Save to {}".format(prompt_fname)) del prompt_data
[docs] def prompt_normalization_split(mode: str, save_format: str, root_path_train: str, output_path_train: str, root_path_val: str, output_path_val: str, root_path_test: str, output_path_test: str, dataset_root_path: str, dataset_name: str = None) -> None: """Normalize the prompt data for the input time-series data Args: mode (str): mode to run, "fit" or "transform" save_format (str): format to save the prompt data root_path_train (str): path to the train dataset output_path_train (str): path to save the train prompt data root_path_val (str): path to the val dataset output_path_val (str): path to save the val prompt data root_path_test (str): path to the test dataset output_path_test (str): path to save the test prompt data dataset_root_path (str): path to the dataset root dataset_name (str): name of the dataset """ ds_size = 50 if not dataset_name: dataset_name = [name for name in os.listdir(dataset_root_path) if os.path.isdir(os.path.join(dataset_root_path, name))] # since the params is a mid-state file, I didn't extend the file_format to the params file. data_path_buf = { "train": {"root_path": root_path_train, "output_path": output_path_train, "normalize_param_fname": os.path.join(output_path_train, f"normalization_params.pth.tar")}, "val": {"root_path": root_path_val, "output_path": output_path_val, "normalize_param_fname": os.path.join(output_path_val, f"normalization_params.pth.tar")}, "test": {"root_path": root_path_test, "output_path": output_path_test, "normalize_param_fname": os.path.join(output_path_test, f"normalization_params.pth.tar")}, } for split_name, data_path in data_path_buf.items(): root_path = data_path_buf[split_name]["root_path"] output_path = data_path_buf[split_name]["output_path"] normalize_param_fname = data_path_buf[split_name]["normalize_param_fname"] create_data_dir(output_path) dataset_fullname = [os.path.join(root_path, name) for name in dataset_name] data_path_buf_tmp = [] if mode == "fit": for dataset_dir in dataset_fullname: paths = os.listdir(dataset_dir) new_dataset = [os.path.join(dataset_dir, path) for path in paths] sample_idx = np.random.permutation(len(new_dataset))[:ds_size].astype(np.int64) # ipdb.set_trace() new_dataset = np.array(new_dataset)[sample_idx].tolist() data_path_buf_tmp.extend(new_dataset) else: for dataset_dir in dataset_fullname: paths = os.listdir(dataset_dir) new_dataset = [os.path.join(dataset_dir, path) for path in paths] data_path_buf_tmp.extend(new_dataset) if mode == "fit": mean_std_export_ds(root_path, output_path, data_path_buf_tmp, normalize_param_fname, save_format) else: # ipdb.set_trace() standardscale_export(data_path_buf_tmp, normalize_param_fname, output_path, root_path, save_format)