Source code for ltsm.data_reader.csv_reader
import os
import pandas as pd
import logging
from pathlib import Path
from ltsm.common.base_reader import BaseReader

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
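
# With this configuration, log lines look like (timestamp illustrative):
# 2024-01-01 12:00:00,000 - INFO - Dropping column 'x' as it does not contain numeric data.
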
class CSVReader(BaseReader):
"""
Represents a CSV Reader object for processing time-series data.
This class reads .csv files, fills missing values using linear interpolation, and drops any invalid columns.
It assumes that the .csv file's columns represent time-series data, while each row corresponds to a data
instance or feature.
Attributes:
module_id (str): The identifier for the base reader objects.
data_path (str): The file path where the .csv file is located.
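
    Example:
        Illustrative usage; the path below is hypothetical.

        >>> reader = CSVReader("data/sample.csv")
        >>> df = reader.fetch()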
"""
module_id = "csv"

    def __init__(self, data_path: str):
        """
        Initializes the CSVReader class.

        Args:
            data_path (str): The file path where the .csv file is located.
        """
        super().__init__()
        self.data_path = data_path

    def fetch(self) -> pd.DataFrame:
        """
        Fetches data from the .csv file specified at data_path.

        Reads the .csv file, fills missing values, and drops non-numeric columns.

        Returns:
            pd.DataFrame: The data from the .csv file as a DataFrame.
        """
        # Check that the file exists
        if not Path(self.data_path).is_file():
            raise FileNotFoundError(f"File not found at the specified path: {self.data_path}")

        # Read the data and promote the first row to column labels
        try:
            loaded_data = pd.read_csv(self.data_path, header=None)
            loaded_data.columns = loaded_data.iloc[0]
            loaded_data = loaded_data[1:]
            loaded_data.reset_index(drop=True, inplace=True)

            # Transpose the data if each time-series sequence is stored in a
            # column; assume this is the case when there are more rows than columns
            if loaded_data.shape[1] < loaded_data.shape[0]:
                if loaded_data.shape[1] > 1:
                    # Drop the first column, which holds the time-series indices
                    loaded_data = loaded_data.drop(columns=[loaded_data.columns[0]])
                loaded_data = loaded_data.T
                loaded_data.index.name = None
                loaded_data.columns.name = None
                loaded_data.columns = range(len(loaded_data.columns))
        except pd.errors.EmptyDataError:
            raise ValueError(f"CSV file at {self.data_path} is empty.")
        except pd.errors.ParserError:
            raise ValueError(f"Failed to parse CSV file at {self.data_path}.")
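
        # At this point each row of loaded_data is assumed to hold one
        # time-series sequence (e.g. an 8-column, 1000-row file becomes
        # 7 rows of length 1000 after the index column is dropped)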

        for col in loaded_data.columns:
            if not pd.api.types.is_float_dtype(loaded_data[col]):
                # Try to convert the column to a numeric dtype
                try:
                    loaded_data[col] = pd.to_numeric(loaded_data[col])
                except (ValueError, TypeError):
                    # Drop columns that cannot be converted to numeric data
                    logging.info(f"Dropping column '{col}' as it does not contain numeric data.")
                    loaded_data.drop(columns=col, inplace=True)

        # Convert float-like index labels to integer labels where possible
        def convert_float_index(index):
            try:
                # Convert the label to a float, then to an integer if it is a whole number
                float_val = float(index)
                int_val = int(float_val)
                # Keep the integer form only if it is equivalent to the float
                return int_val if float_val == int_val else index
            except ValueError:
                # If conversion fails, return the original label
                return index

        # Apply the conversion to each index label
        loaded_data.index = loaded_data.index.map(convert_float_index)
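        # e.g. an index of ["0.0", "1.0", "foo"] now reads [0, 1, "foo"]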

        # Fill NA values through linear interpolation along each row
        def fillna(row):
            if row.isna().any():
                return row.interpolate(method='linear', limit_direction='both')
            return row

        loaded_data = loaded_data.apply(fillna, axis=1)
        return loaded_data
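

# Minimal usage sketch on a synthetic single-column CSV; the helper name,
# temporary file, and "value" header are illustrative. The middle NaN should
# be filled to 2.0 by linear interpolation in fetch().
def _demo_interpolation():
    import tempfile

    with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as f:
        f.write("value\n1.0\nNaN\n3.0\n")
        path = f.name
    try:
        print(CSVReader(path).fetch())
    finally:
        os.remove(path)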


if __name__ == '__main__':
    input_folder = './datasets/DK/'
    output_folder = './datasets/DK_transformed/'
    os.makedirs(output_folder, exist_ok=True)
    # Transform each .csv in input_folder and write the processed frame out
    for fname in os.listdir(input_folder):
        if fname.endswith('.csv'):
            df = CSVReader(os.path.join(input_folder, fname)).fetch()
            df.to_csv(os.path.join(output_folder, fname))