我有以下形式的Pandas数据框:
Time Temperature Voltage Current
0.0 7.8 14 56
0.1 7.9 12 58
0.2 7.6 15 55
... So on for a few hundred thousand rows...
我需要尽快将数据批量插入PostgreSQL数据库。这是针对Django项目的,并且我目前正在使用ORM进行数据库操作和构建查询,但是如果有更有效的方法来完成任务,则可以提出建议。
我的数据模型如下所示:
class Data(models.Model):
time = models.DateTimeField(db_index=True)
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
parameter_value = models.FloatField()
所以Time
是row[0]
数据帧,然后对每个头列中,我抓住对应于它,使用报头作为值parameter
。因此row[0]
,示例表将Data
在我的数据库中生成3个对象:
Data(time=0.0, parameter="Temperature", parameter_value=7.8)
Data(time=0.0, parameter="Voltage", parameter_value=14)
Data(time=0.0, parameter="Current", parameter_value=56)
我们的应用程序允许用户解析以毫秒为单位的数据文件。因此,我们从单个文件中生成很多单独的数据对象。我当前的任务是改进解析器,使其更加高效,直到我们在硬件级别遇到I / O约束为止。
我当前的解决方案是遍历每一行,Data
为每一行创建一个对象,time + parameter + value
然后将该对象附加到数组中,这样我就可以Data.objects.bulk_create(all_data_objects)
通过Django。当然,我知道这是低效的,可能会有很多改进。
使用此代码:
# Convert DataFrame to dict
df_records = df.to_dict('records')
# Start empty dta array
all_data_objects = []
# Go through each row creating objects and appending to data array
for row in df_records:
for parameter, parameter_value in row.items():
if parameter != "Time":
all_data_objects.append(Data(
time=row["Time"],
parameter_value=parameter_value,
parameter=parameter))
# Commit data to Postgres DB
Data.objects.bulk_create(all_data)
当前,对于不包含DB插入操作(写入磁盘)的整个操作(即仅生成Data
对象数组),一个55mb的文件(生成大约600万个单个Data
对象)大约需要370秒。仅该df_records = df.to_dict('records')
线就需要83秒钟。使用time.time()
每个部分的两端并计算差值来测量时间。
我该如何改善这些时间?
如果您确实需要快速解决方案,建议您直接使用来哑表pandas
。
首先让我们为您的示例创建数据:
import pandas as pd
data = {
'Time': {0: 0.0, 1: 0.1, 2: 0.2},
'Temperature': {0: 7.8, 1: 7.9, 2: 7.6},
'Voltage': {0: 14, 1: 12, 2: 15},
'Current': {0: 56, 1: 58, 2: 55}
}
df = pd.DataFrame(data)
现在,您应该转换数据框,以使您具有所需的列melt
:
df = df.melt(["Time"], var_name="parameter", value_name="parameter_value")
此时,您应该将parameter
值映射到外部id
。我将params
举一个例子:
params = {"Temperature": 1, "Voltage": 2, "Current": 3}
df["parameter"] = df["parameter"].map(params)
此时,数据框将如下所示:
Time parameter parameter_value
0 0.0 1 7.8
1 0.1 1 7.9
2 0.2 1 7.6
3 0.0 2 14.0
4 0.1 2 12.0
5 0.2 2 15.0
6 0.0 3 56.0
7 0.1 3 58.0
8 0.2 3 55.0
现在,要使用 pandas 导出,您可以使用:
import sqlalchemy as sa
engine = sa.create_engine("use your connection data")
df.to_sql(name="my_table", con=engine, if_exists="append", index=False)
但是,当我使用它时,它的速度还不足以满足我们的要求。因此,我建议您使用cursor.copy_from
insted,因为速度更快:
from io import StringIO
output = StringIO()
df.to_csv(output, sep=';', header=False, index=False, columns=df.columns)
output.getvalue()
# jump to start of stream
output.seek(0)
# Insert df into postgre
connection = engine.raw_connection()
with connection.cursor() as cursor:
cursor.copy_from(output, "my_table", sep=';', null="NULL", columns=(df.columns))
connection.commit()
我们尝试了数百万次,这是使用PostgreSQL时最快的方法。
在尝试此方法后,出现错误
cursor.copy_from(output, "data", sep=';', null="NULL", columns=(df.columns))
。回溯显示为:Expected bytes or unicode string, got numpy.float64 instead
,我认为这是由于未在某处提供正确的数据值。(data
是我要插入的表的名称)。我以前没有使用过这种方法,您对这里发生的事情有一个了解吗?(此时,我已经走出了我的舒适区,以前从未使用过sqlalchemy或stringIO,因此在尝试学习的同时,我几乎复制/粘贴了您的代码段)我不太确定为什么会收到此错误,但似乎您有一些值作为numpy数字而不是字符串。您是否有可能在数据库中将其中一列定义为字符串?我建议您
df.to_sql
在执行cursor.copy_from
替代方法之前先尝试该选项。有可能该选项对您足够快。我可以发布一个示例示例,
df.to_csv()
如果有帮助,将在主体中执行我可能还缺少一些用于复制操作的参数,例如,我的标头不是参数的名称,而是ID。我该如何指定?
我不确定你的意思。您能否在数据应该存放的地方添加SQL表的定义?