Source code for ltsm.data_reader.npy_database_reader
import os
from datetime import timedelta

import numpy as np
import pandas as pd
import taosws
# Change these paths, database name, and credentials to match your own setup.
datapath = "time_series_preds_upload"
output_folder = 'time_series_preds_download'
database = "time_series_preds"
user = "root"
password = "taosdata"
insert_batch_size = 100
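# Note: the functions below assume each .npy file holds a numeric (typically
# 2-D) array: rows become table rows, columns become FLOAT columns. A
# compatible input file can be produced with NumPy, e.g. (sketch; the file
# name and shape are arbitrary placeholders):
#
#   np.save(os.path.join(datapath, "demo_preds.npy"),
#           np.random.rand(500, 8).astype(np.float32))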
# create_connection(): open a WebSocket connection to the TDengine server (change host and port to your own).
def create_connection(host='35.153.211.255', port=6041):
    try:
        conn = taosws.connect(
            user=user,
            password=password,
            host=host,
            port=port,
        )
        print(f"Connected to {host}:{port} successfully.")
        return conn
    except Exception as err:
        print(f"Failed to connect to {host}:{port}, ErrMessage: {err}")
        raise
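# Minimal usage sketch (assumes a TDengine server whose WebSocket adapter is
# reachable at the given host/port; the values below are placeholders):
#
#   conn = create_connection(host="localhost", port=6041)
#   try:
#       ...  # issue queries through conn.cursor()
#   finally:
#       conn.close()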
# setup_database(): create the database if it does not already exist.
def setup_database(conn, database):
    try:
        cursor = conn.cursor()
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database}")
        print(f"Database {database} set up successfully.")
    except Exception as err:
        print(f"Error setting up database: {err}")
        raise
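# Usage sketch (the database name is a placeholder):
#
#   conn = create_connection()
#   setup_database(conn, "my_timeseries_db")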
# setup_tables(): recreate a table for the NPY data, with a ts TIMESTAMP key plus one FLOAT column per array column.
def setup_tables(conn, database, table_name, df):
    try:
        cursor = conn.cursor()
        cursor.execute(f"USE {database}")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
        # NPY arrays are numeric, so every data column is declared FLOAT;
        # the leading ts column is the required TIMESTAMP primary key.
        schema_columns = ["ts TIMESTAMP"]
        for i in range(df.shape[1]):
            schema_columns.append(f"`{i}` FLOAT")
        schema = f"({', '.join(schema_columns)})"
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} {schema}")
        print(f"Table {table_name} set up successfully with schema: {schema}")
    except Exception as err:
        print(f"Error setting up database or table {table_name}: {err}")
        raise
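# Usage sketch: for a 3-column DataFrame the generated schema is
# (ts TIMESTAMP, `0` FLOAT, `1` FLOAT, `2` FLOAT). The sample data and table
# name below are placeholders:
#
#   df = pd.DataFrame(np.random.rand(10, 3))
#   setup_tables(conn, database, "demo_table", df)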
# insert_data_from_npy(): load an NPY file and insert its rows into a table in batches, keyed by synthetic one-second timestamps.
def insert_data_from_npy(conn, database, npy_file, table_name, batch_size=insert_batch_size):
    try:
        cursor = conn.cursor()
        data = np.load(npy_file)
        df = pd.DataFrame(data)
        print(f"input npy {table_name}:{df}")
        # Create table if needed
        setup_tables(conn, database, table_name, df)
        cursor.execute(f"USE {database}")
        current_time = pd.Timestamp.now()  # Start with the current timestamp
        rows = []
        for _, row in df.iterrows():
            values = [f"'{current_time.strftime('%Y-%m-%d %H:%M:%S')}'"]  # Current timestamp value
            current_time += timedelta(seconds=1)  # Increment timestamp by 1 second
            for col in df.columns:
                value = row[col]
                if pd.isna(value):
                    values.append("NULL")
                elif isinstance(value, str):
                    values.append(f"'{value}'")
                elif isinstance(value, bool):
                    values.append("true" if value else "false")
                else:
                    values.append(str(value))
            # Append the formatted row to the batch
            rows.append(f"({', '.join(values)})")
            # If batch size is reached, execute the batch insert
            if len(rows) >= batch_size:
                insert_query = f"INSERT INTO {table_name} VALUES " + ", ".join(rows)
                print(f"Inserting batch of {batch_size} rows")
                cursor.execute(insert_query)
                rows = []  # Reset the batch
        # Insert remaining rows if any
        if rows:
            insert_query = f"INSERT INTO {table_name} VALUES " + ", ".join(rows)
            print(f"Inserting final batch of {len(rows)} rows")
            cursor.execute(insert_query)
        print(f"Data from {npy_file} inserted into table {table_name} successfully.")
    except Exception as err:
        print(f"Error inserting data from {npy_file} into {table_name}: {err}")
        raise
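# Usage sketch (file path and table name are placeholders; the table is
# created automatically via setup_tables()):
#
#   insert_data_from_npy(conn, database,
#                        os.path.join(datapath, "demo_preds.npy"),
#                        "demo_preds")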
# retrieve_data_to_npy(): read a table back and save its values (minus the ts key) to an NPY file.
def retrieve_data_to_npy(conn, database, table_name, output_file):
    try:
        cursor = conn.cursor()
        cursor.execute(f"USE {database}")
        cursor.execute(f"SELECT * FROM {table_name}")
        data = cursor.fetchall()
        cursor.execute(f"DESCRIBE {table_name}")
        df = pd.DataFrame(data)
        if 0 in df.columns and pd.api.types.is_datetime64_any_dtype(df[0]):
            df.drop(columns=[0], inplace=True)
        data_array = df.to_numpy()
        np.save(output_file, data_array)
        print(f"Data from {table_name} saved to {output_file}.")
        print(f"{table_name}:{df}")
    except Exception as err:
        print(f"Error retrieving data from {table_name}: {err}")
        raise
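# Usage sketch (placeholders again; this round-trips the table written by
# insert_data_from_npy() back to disk). The saved array should match the
# shape of the original NPY input, though values are stored as FLOAT, so
# precision may differ from the source dtype:
#
#   retrieve_data_to_npy(conn, database, "demo_preds",
#                        os.path.join(output_folder, "demo_preds.npy"))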
# Example usage
if __name__ == "__main__":
    conn = create_connection()
    if conn:
        try:
            setup_database(conn, database)
            npy_files = [os.path.join(datapath, f) for f in os.listdir(datapath) if f.endswith('.npy')]
            tables = [os.path.splitext(os.path.basename(npy_file))[0] for npy_file in npy_files]
            for npy_file, table_name in zip(npy_files, tables):
                insert_data_from_npy(conn, database, npy_file, table_name)
            os.makedirs(output_folder, exist_ok=True)
            for table_name in tables:
                output_file = os.path.join(output_folder, f"{table_name}.npy")
                retrieve_data_to_npy(conn, database, table_name, output_file)
        finally:
            conn.close()