Source code for ltsm.data_reader.train_database_reader

import taosws
import pandas as pd
from datetime import timedelta
import os

# Change these paths, database name, and credentials to match your own setup.
datapath = "train_data_upload"
output_folder = 'train_data_download'
database = "train_Data"
user = "root"
password = "taosdata"

# create_connection() function to connect to the database. (change host and port to your own)
def create_connection(host='35.153.211.255', port=6041):
    conn = None
    try:
        conn = taosws.connect(
            user=user,
            password=password,
            host=host,
            port=port,
        )
        print(f"Connected to {host}:{port} successfully.")
        return conn
    except Exception as err:
        print(f"Failed to connect to {host}:{port}, ErrMessage: {err}")
        raise err
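# --- Illustrative usage sketch (not part of the original module) -------------
# A minimal example of opening and closing a connection explicitly. The host
# and port below are placeholders, assuming a locally running TDengine
# websocket adapter; substitute your own endpoint.
def _example_create_connection():
    conn = create_connection(host="127.0.0.1", port=6041)
    try:
        cursor = conn.cursor()
        cursor.execute("SHOW DATABASES")  # simple round-trip to verify the session works
        print(cursor.fetchall())
    finally:
        conn.close()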
# setup_database() function to create a new database if it doesn't exist.
def setup_database(conn, database):
    try:
        cursor = conn.cursor()
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database}")
        print(f"Database {database} set up successfully.")
    except Exception as err:
        print(f"Error setting up database: {err}")
        raise err
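# --- Illustrative usage sketch (not part of the original module) -------------
# Shows that setup_database() is idempotent: calling it twice with the same
# name is harmless because it uses CREATE DATABASE IF NOT EXISTS. "demo_db"
# is a placeholder name used only for this sketch.
def _example_setup_database():
    conn = create_connection()
    try:
        setup_database(conn, "demo_db")
        setup_database(conn, "demo_db")  # second call is a no-op
    finally:
        conn.close()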
# setup_tables() function to create tables based on CSV column names and data types.
def setup_tables(conn, database, table_name, df):
    try:
        cursor = conn.cursor()
        cursor.execute(f"USE {database}")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
        columns = df.columns
        schema_columns = ["ts TIMESTAMP"]
        # Infer column types and set schema accordingly
        for column in columns:
            dtype = df[column].dtype
            # Backtick-quote every column name (spaces replaced by underscores) for consistency
            col_name = f"`{column.replace(' ', '_')}`"
            if pd.api.types.is_float_dtype(dtype):
                schema_columns.append(f"{col_name} FLOAT")
            elif pd.api.types.is_integer_dtype(dtype):
                schema_columns.append(f"{col_name} INT")
            elif pd.api.types.is_bool_dtype(dtype):
                schema_columns.append(f"{col_name} BOOL")
            elif pd.api.types.is_datetime64_any_dtype(dtype):
                schema_columns.append(f"{col_name} TIMESTAMP")
            else:
                # Treat as STRING for other types like object (text)
                schema_columns.append(f"{col_name} STRING")
        schema = f"({', '.join(schema_columns)})"
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} {schema}")
        print(f"Table {table_name} set up successfully with schema: {schema}")
    except Exception as err:
        print(f"Error setting up database or table {table_name}: {err}")
        raise err
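# --- Illustrative usage sketch (not part of the original module) -------------
# Builds a small in-memory DataFrame to show which TDengine column types
# setup_tables() infers: float -> FLOAT, int -> INT, bool -> BOOL,
# datetime -> TIMESTAMP, everything else -> STRING. "demo_table" is a
# placeholder table name for this sketch.
def _example_setup_tables():
    demo_df = pd.DataFrame({
        "temperature": [21.5, 22.0],   # FLOAT
        "reading count": [1, 2],       # INT (the space becomes an underscore)
        "active": [True, False],       # BOOL
        "label": ["a", "b"],           # STRING
    })
    conn = create_connection()
    try:
        setup_database(conn, database)
        setup_tables(conn, database, "demo_table", demo_df)
    finally:
        conn.close()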
# Debug output: prints the constants compiled into setup_tables.
print(setup_tables.__code__.co_consts)

# insert_data_from_csv() function to insert data from CSV files into tables.
def insert_data_from_csv(conn, database, csv_file, table_name):
    try:
        cursor = conn.cursor()
        df = pd.read_csv(csv_file)
        setup_tables(conn, database, table_name, df)
        cursor.execute(f"USE {database}")
        current_time = pd.Timestamp.now()  # Start with the current timestamp
        for _, row in df.iterrows():
            # Format the current timestamp and ensure uniqueness by incrementing it for each row
            values = [f"'{current_time.strftime('%Y-%m-%d %H:%M:%S')}'"]  # Current timestamp value
            current_time += timedelta(seconds=1)  # Increment timestamp by 1 second
            for col in df.columns:
                value = row[col]
                if pd.isna(value):
                    values.append("NULL")
                elif isinstance(value, str):
                    values.append(f"'{value}'")
                elif isinstance(value, bool):
                    values.append("true" if value else "false")
                else:
                    values.append(str(value))
            insert_query = f"INSERT INTO {table_name} VALUES({', '.join(values)})"
            print(f"Inserting data: {insert_query}")
            cursor.execute(insert_query)
        print(f"Data from {csv_file} inserted into table {table_name} successfully.")
    except Exception as err:
        print(f"Error inserting data from {csv_file} into {table_name}: {err}")
        raise err
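# --- Illustrative usage sketch (not part of the original module) -------------
# Inserts one CSV file into a table named after the file. "sample.csv" is a
# placeholder path; any CSV with a header row and column types covered by
# setup_tables() should work. Note that each row receives a synthetic,
# strictly increasing 'ts' timestamp generated at insert time.
def _example_insert_one_csv():
    conn = create_connection()
    try:
        setup_database(conn, database)
        insert_data_from_csv(conn, database, "sample.csv", "sample")
    finally:
        conn.close()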
# retrieve_data_to_csv() function to retrieve data from a table and save it to a CSV file.
def retrieve_data_to_csv(conn, database, table_name, output_file):
    try:
        cursor = conn.cursor()
        cursor.execute(f"USE {database}")
        cursor.execute(f"SELECT * FROM {table_name}")
        data = cursor.fetchall()
        # Get column names and remove 'ts'
        cursor.execute(f"DESCRIBE {table_name}")
        columns = [desc[0] for desc in cursor.fetchall()]
        # Remove 'ts' from the columns list if it exists
        if 'ts' in columns:
            ts_index = columns.index('ts')
            columns.remove('ts')
            # Remove the 'ts' column from each row in data
            data = [tuple(item for i, item in enumerate(row) if i != ts_index) for row in data]
        # Create the DataFrame with columns excluding 'ts'
        df = pd.DataFrame(data, columns=columns)
        df.to_csv(output_file, index=False)
        print(f"Data from {table_name} saved to {output_file}.")
    except Exception as err:
        print(f"Error retrieving data from {table_name}: {err}")
        raise err
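# --- Illustrative usage sketch (not part of the original module) -------------
# Dumps one table back to a CSV file. The synthetic 'ts' column added during
# insert is stripped, so the output should mirror the layout of the uploaded
# CSV. "sample" and "sample_out.csv" are placeholder names for this sketch.
def _example_retrieve_one_table():
    conn = create_connection()
    try:
        retrieve_data_to_csv(conn, database, "sample", "sample_out.csv")
    finally:
        conn.close()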
# Example usage
if __name__ == "__main__":
    conn = create_connection()
    if conn:
        try:
            setup_database(conn, database)
            csv_files = [os.path.join(datapath, f) for f in os.listdir(datapath) if f.endswith('.csv')]
            tables = [os.path.splitext(os.path.basename(csv_file))[0] for csv_file in csv_files]
            for csv_file, table_name in zip(csv_files, tables):
                insert_data_from_csv(conn, database, csv_file, table_name)
            if not os.path.exists(output_folder):
                os.makedirs(output_folder)
            for table_name in tables:
                output_file = os.path.join(output_folder, f"{table_name}.csv")
                retrieve_data_to_csv(conn, database, table_name, output_file)
        finally:
            conn.close()