Source code for ltsm.data_reader.monash_reader
import numpy as np
import pandas as pd
from distutils.util import strtobool
from datetime import datetime
from ltsm.common.base_reader import BaseReader
[docs]
class MonashReader(BaseReader):
    """Reader for TSF-style time-series text files.

    The parser understands three kinds of non-blank lines:

    * ``@``-prefixed metadata (``@attribute``, ``@frequency``, ``@horizon``,
      ``@missing``, ``@equallength`` and the ``@data`` marker),
    * ``#``-prefixed comment lines, which are ignored,
    * data rows of the form ``attr_1:...:attr_n:v1,v2,...`` where ``?``
      marks a missing observation.

    NOTE: ``strtobool`` is imported by this file from ``distutils``, which
    was removed from the standard library in Python 3.12 (PEP 632).
    """

    module_id = "monash"

    def __init__(self, data_path):
        """Remember which file to read.

        Args:
            data_path: Path of the file that :meth:`fetch` will parse.
        """
        super().__init__()
        self.data_path = data_path

    def fetch(self):
        """Parse the file and return its series with missing values removed.

        Returns:
            list: One ``float32`` NumPy array per data row, in file order.

        Raises:
            Exception: Propagated from :meth:`_tsf_to_dataframe` when the
                file is malformed.
        """
        df, *_ = self._tsf_to_dataframe(self.data_path)

        timeseries = []
        for ts in df["series_value"]:
            # Missing observations are stored as the *string* "NaN", which
            # makes the series object-dtype; np.isnan() rejects such input
            # with a TypeError. Coercing to float64 first turns the
            # placeholder into a real NaN that can be masked out.
            values = np.asarray(ts, dtype=np.float64)
            timeseries.append(values[~np.isnan(values)].astype(np.float32))
        return timeseries

    @staticmethod
    def _parse_series_values(series_text, replace_missing_vals_with):
        """Convert the comma separated value part of a data row.

        Args:
            series_text: Text after the last ``:`` of a data row.
            replace_missing_vals_with: Object stored in place of each ``?``.

        Returns:
            The pandas array backing a ``pd.Series`` of the parsed values.

        Raises:
            ValueError: If the series is empty or a token is not a number.
            Exception: If every value of the series is missing.
        """
        # str.split never returns an empty list, so emptiness has to be
        # detected on the text itself. ValueError is what float("") raised
        # for this input before, so callers catching it keep working.
        if not series_text.strip():
            raise ValueError(
                "A given series should contains a set of comma separated numeric values. At least one numeric value should be there in a series. Missing values should be indicated with ? symbol"
            )

        tokens = series_text.split(",")
        # Count the "?" markers themselves rather than occurrences of the
        # replacement value, which may legitimately equal real data
        # (e.g. replacing missing values with 0.0 in an all-zero series).
        missing_count = sum(1 for token in tokens if token == "?")
        if missing_count == len(tokens):
            raise Exception(
                "All series values are missing. A given series should contains a set of comma separated numeric values. At least one numeric value should be there in a series."
            )

        numeric_series = [
            replace_missing_vals_with if token == "?" else float(token)
            for token in tokens
        ]
        return pd.Series(numeric_series).array

    @staticmethod
    def _parse_attribute_value(raw_value, col_type):
        """Convert one attribute field of a data row to its declared type.

        Only ``numeric`` (parsed with ``int``), ``string`` and ``date``
        (``%Y-%m-%d %H-%M-%S``) are supported.

        Raises:
            Exception: If ``col_type`` is not one of the supported types.
            ValueError: If the text cannot be converted.
        """
        if col_type == "numeric":
            return int(raw_value)
        if col_type == "string":
            return str(raw_value)
        if col_type == "date":
            return datetime.strptime(raw_value, "%Y-%m-%d %H-%M-%S")
        raise Exception("Invalid attribute type.")

    def _tsf_to_dataframe(self, data_path: str,
                          replace_missing_vals_with="NaN",
                          value_column_name="series_value"):
        """Parse a TSF-style file into a DataFrame plus its metadata.

        Args:
            data_path: Path of the file; it is opened with cp1252 encoding.
            replace_missing_vals_with: Object stored for each ``?`` value.
            value_column_name: Name of the DataFrame column that receives
                the per-row series.

        Returns:
            tuple: ``(loaded_data, frequency, forecast_horizon,
            contain_missing_values, contain_equal_length)``. The metadata
            entries are ``None`` when the matching tag is absent.

        Raises:
            Exception: If the file is empty, lacks attributes, the ``@data``
                tag or data rows, or contains a malformed line.
            ValueError: If a value or attribute cannot be converted.
        """
        col_names = []
        col_types = []
        all_data = {}
        all_series = []
        line_count = 0
        frequency = None
        forecast_horizon = None
        contain_missing_values = None
        contain_equal_length = None
        found_data_tag = False
        found_data_section = False

        with open(data_path, "r", encoding="cp1252") as file:
            for raw_line in file:
                line = raw_line.strip()
                if not line:
                    continue
                line_count += 1

                if line.startswith("@"):  # Meta-data
                    if line.startswith("@data"):
                        if not col_names:
                            raise Exception(
                                "Missing attribute section. Attribute section must come before data."
                            )
                        found_data_tag = True
                        continue

                    line_content = line.split(" ")
                    if line.startswith("@attribute"):
                        # Attributes carry both a name and a type.
                        if len(line_content) != 3:
                            raise Exception("Invalid meta-data specification.")
                        col_names.append(line_content[1])
                        col_types.append(line_content[2])
                        continue

                    # Every other tag carries exactly one value.
                    if len(line_content) != 2:
                        raise Exception("Invalid meta-data specification.")
                    if line.startswith("@frequency"):
                        frequency = line_content[1]
                    elif line.startswith("@horizon"):
                        forecast_horizon = int(line_content[1])
                    elif line.startswith("@missing"):
                        contain_missing_values = bool(
                            strtobool(line_content[1])
                        )
                    elif line.startswith("@equallength"):
                        contain_equal_length = bool(
                            strtobool(line_content[1])
                        )
                    continue

                if line.startswith("#"):  # Comment line
                    continue

                # Anything else is a data row.
                if not col_names:
                    raise Exception(
                        "Missing attribute section. Attribute section must come before data."
                    )
                if not found_data_tag:
                    raise Exception("Missing @data tag.")

                if not found_data_section:
                    # First data row: create one value list per attribute.
                    found_data_section = True
                    for col in col_names:
                        all_data[col] = []

                full_info = line.split(":")
                if len(full_info) != (len(col_names) + 1):
                    raise Exception("Missing attributes/values in series.")

                all_series.append(
                    self._parse_series_values(
                        full_info[-1], replace_missing_vals_with
                    )
                )
                for raw_value, col_name, col_type in zip(
                    full_info, col_names, col_types
                ):
                    all_data[col_name].append(
                        self._parse_attribute_value(raw_value, col_type)
                    )

        if line_count == 0:
            raise Exception("Empty file.")
        if not col_names:
            raise Exception("Missing attribute section.")
        if not found_data_section:
            raise Exception("Missing series information under data section.")

        all_data[value_column_name] = all_series
        loaded_data = pd.DataFrame(all_data)

        return (
            loaded_data,
            frequency,
            forecast_horizon,
            contain_missing_values,
            contain_equal_length,
        )