# Source code for ltsm.data_reader.npy_database_reader
import taosws
import pandas as pd
import os
import numpy as np
from datetime import timedelta
# Change these to your own deployment settings.
datapath = "time_series_preds_upload"  # folder containing the .npy files to upload
output_folder = 'time_series_preds_download'  # folder where tables are downloaded back to .npy
database = "time_series_preds"  # TDengine database name
user = "root"  # NOTE(review): credentials hardcoded — consider environment variables
password = "taosdata"
insert_batch_size = 100  # rows per INSERT statement when uploading
# create_connection() function to connect to the database. (change host and port to your own)
def create_connection(host='35.153.211.255', port=6041):
    """Open a websocket connection to the TDengine server.

    Uses the module-level ``user``/``password`` credentials.

    Args:
        host: TDengine server address.
        port: taosAdapter websocket port (default 6041).

    Returns:
        An open ``taosws`` connection object.

    Raises:
        Exception: re-raised from ``taosws.connect`` so callers can abort.
    """
    try:
        conn = taosws.connect(
            user=user,
            password=password,
            host=host,
            port=port,
        )
        print(f"Connected to {host}:{port} successfully.")
        return conn
    except Exception as err:
        print(f"Failed to connect to {host}:{port}, ErrMessage: {err}")
        raise err
# setup_database() function to create a new database if it doesn't exist.
def setup_database(conn, database):
    """Create ``database`` on the server if it is not already present.

    Args:
        conn: open taosws connection.
        database: name of the database to create.

    Raises:
        Exception: re-raised from the cursor so the caller can abort.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {database}")
        print(f"Database {database} set up successfully.")
    except Exception as err:
        print(f"Error setting up database: {err}")
        raise err
# setup_tables() function to create a table sized to the NPY data width.
def setup_tables(conn, database, table_name, df):
    """Drop and recreate ``table_name`` to match ``df``'s column count.

    The schema is ``ts TIMESTAMP`` followed by one FLOAT column per
    DataFrame column, named `0`, `1`, ... — NPY arrays carry no column
    names, so the column index is used (backticks allow numeric names).

    Args:
        conn: open taosws connection.
        database: database that holds the table.
        table_name: table to (re)create; any existing table is dropped first.
        df: DataFrame whose width determines the schema.

    Raises:
        Exception: re-raised from the cursor so the caller can abort.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(f"USE {database}")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
        schema_columns = ["ts TIMESTAMP"]
        schema_columns.extend(f"`{i}` FLOAT" for i in range(df.shape[1]))
        schema = f"({', '.join(schema_columns)})"
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} {schema}")
        print(f"Table {table_name} set up successfully with schema: {schema}")
    except Exception as err:
        print(f"Error setting up database or table {table_name}: {err}")
        raise err
# insert_data_from_npy() function to insert data from NPY files into tables.
def insert_data_from_npy(conn, database, npy_file, table_name, batch_size=insert_batch_size):
    """Load ``npy_file`` and insert its rows into ``table_name``.

    The destination table is (re)created via :func:`setup_tables`. Each row
    gets a synthetic timestamp — the current time for the first row,
    increasing by one second per row. Rows are buffered and flushed in
    batches of ``batch_size`` per INSERT statement.

    Args:
        conn: open taosws connection.
        database: target database.
        npy_file: path to the ``.npy`` file to load.
        table_name: destination table (dropped and recreated).
        batch_size: number of rows per INSERT statement.

    Raises:
        Exception: re-raised from loading or the cursor so the caller can abort.

    NOTE(review): values are interpolated directly into the SQL string.
    That is acceptable for trusted numeric NPY data, but not safe for
    untrusted input — the connector's parameter binding would be safer.
    """
    try:
        cursor = conn.cursor()
        df = pd.DataFrame(np.load(npy_file))
        # Create (or recreate) the destination table to match df's width.
        setup_tables(conn, database, table_name, df)
        cursor.execute(f"USE {database}")
        current_time = pd.Timestamp.now()  # timestamp for the first row
        rows = []
        for _, row in df.iterrows():
            values = [f"'{current_time.strftime('%Y-%m-%d %H:%M:%S')}'"]
            current_time += timedelta(seconds=1)  # 1-second spacing per row
            for col in df.columns:
                value = row[col]
                if pd.isna(value):
                    values.append("NULL")
                elif isinstance(value, str):
                    values.append(f"'{value}'")
                elif isinstance(value, bool):
                    values.append("true" if value else "false")
                else:
                    values.append(str(value))
            rows.append(f"({', '.join(values)})")
            # Flush a full batch.
            if len(rows) >= batch_size:
                insert_query = f"INSERT INTO {table_name} VALUES " + ", ".join(rows)
                print(f"Inserting batch of {batch_size} rows")
                cursor.execute(insert_query)
                rows = []
        # Flush any remaining partial batch.
        if rows:
            insert_query = f"INSERT INTO {table_name} VALUES " + ", ".join(rows)
            print(f"Inserting final batch of {len(rows)} rows")
            cursor.execute(insert_query)
        print(f"Data from {npy_file} inserted into table {table_name} successfully.")
    except Exception as err:
        print(f"Error inserting data from {npy_file} into {table_name}: {err}")
        raise err
# retrieve_data_to_npy() function to retrieve data from a table and save it to a NPY file.
def retrieve_data_to_npy(conn, database, table_name, output_file):
    """Fetch every row of ``table_name`` and save the data as ``output_file``.

    The first result column is the synthetic ``ts`` timestamp added at
    insert time; when pandas recognizes it as a datetime column it is
    dropped, so the saved array contains only the original data columns.

    Args:
        conn: open taosws connection.
        database: database holding the table.
        table_name: table to read.
        output_file: destination ``.npy`` path.

    Raises:
        Exception: re-raised from the cursor or file I/O so the caller can abort.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(f"USE {database}")
        cursor.execute(f"SELECT * FROM {table_name}")
        data = cursor.fetchall()
        df = pd.DataFrame(data)
        # Drop the leading timestamp column added by insert_data_from_npy.
        if 0 in df.columns and pd.api.types.is_datetime64_any_dtype(df[0]):
            df.drop(columns=[0], inplace=True)
        np.save(output_file, df.to_numpy())
        print(f"Data from {table_name} saved to {output_file}.")
        print(f"{table_name}:{df}")
    except Exception as err:
        print(f"Error retrieving data from {table_name}: {err}")
        raise err
# Example usage: upload every NPY file found in `datapath` into its own
# table, then download each table back into `output_folder`.
if __name__ == "__main__":
    conn = create_connection()
    if conn:
        try:
            setup_database(conn, database)
            # One table per .npy file, named after the file's basename.
            npy_files = [os.path.join(datapath, name) for name in os.listdir(datapath) if name.endswith('.npy')]
            tables = [os.path.splitext(os.path.basename(path))[0] for path in npy_files]
            for npy_file, table_name in zip(npy_files, tables):
                insert_data_from_npy(conn, database, npy_file, table_name)
            if not os.path.exists(output_folder):
                os.makedirs(output_folder)
            # Round-trip: pull each table back out to verify the upload.
            for table_name in tables:
                retrieve_data_to_npy(conn, database, table_name, os.path.join(output_folder, f"{table_name}.npy"))
        finally:
            conn.close()