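This code inserts data into an existing SQL table that already contains the previous day's data (09/10/23); the table's primary key is a column named "id". When I try to insert data for yesterday and the preceding days, e.g. by changing the API parameters to 06/10/23-09/10/23, nothing is inserted, because the 09/10/23 data already exists. I have tried several of the solutions posted here, but none of them worked. Essentially I need a loop that iterates over the rows: if the primary key already exists in the table, skip the row and ignore the integrity error; if it does not, insert the entry. Any help would be much appreciated.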
### Libraries ###
import json
import requests
import pandas as pd
from sqlalchemy import create_engine, exc
from datetime import datetime, timedelta
from urllib.parse import quote_plus
import pyodbc
### API Call ###
# Calculate the date range for the previous day
previous_day = datetime.now() - timedelta(days=1)
date_from = previous_day.strftime('%Y-%m-%dT00:00:00')
date_to = previous_day.strftime('%Y-%m-%dT23:59:59')
# API credentials
url = f"XXXX"
payload = {}
headers = {'Authorization': 'XXXX'}
# API request
response = requests.request("GET", url, headers=headers, data=payload)
# Check if the request was successful
if response.status_code == 200:
    # Parse the JSON response into a DataFrame
    data = response.json()
    # Flatten the nested dictionaries into columns
    df = pd.json_normalize(data['data'])
    # Reduce 'charging_periods' to the tariff id of its first entry
    def extract_tariff_id(charging_periods):
        if isinstance(charging_periods, list) and len(charging_periods) > 0:
            # Get the first dictionary in the list
            first_entry = charging_periods[0]
            # Extract 'tariff_id' from the first dictionary
            return first_entry.get('tariff_id', 'Unknown Tariff ID')
        return 'Unknown Tariff ID'
    # Derive the new 'tariff_id' column from 'charging_periods'
    df['tariff_id'] = df['charging_periods'].apply(extract_tariff_id)
    # Drop the original 'charging_periods' column
    df.drop(columns=['charging_periods'], inplace=True)
    ### Azure DB ###
    # Parameters
    server = 'XXXX'
    database = 'XXXX'
    username = 'XXXX'
    password = 'XXXX'
    driver = 'XXXX'
    # ODBC connection string, shared by pyodbc and SQLAlchemy
    conn_str = f'DRIVER={{XXXX}};SERVER={server};DATABASE={database};UID={username};PWD={password}'
    # Create a connection to the Azure SQL Database
    conn = pyodbc.connect(conn_str)
    # Create a cursor
    cursor = conn.cursor()
    # Engine (mini SQL environment); the ODBC string has to be URL-encoded
    engine = create_engine(f"mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}")
    # Insert to SQL
    df.to_sql('TABLE_NAME', engine, if_exists='append', index=False, schema='ev')
    # Commit the changes and close the connection
    conn.commit()
    conn.close()
    # Print tabular format and data types
    print('Success!')
    print(df)
    df.info()
else:
    print(f"API request failed with status code: {response.status_code}")
# Remove Duplicates
for i in range(len(df)):
    try:
        # Insert one row at a time so a duplicate only rejects that row
        df.iloc[i:i+1].to_sql(name="TABLE_NAME", con=engine, if_exists='append', index=False, schema='ev')
    except exc.IntegrityError:
        pass
If you just want to avoid duplicates, you could try catching the error and carrying on?
for _, row in df.iterrows():
    try:
        # Convert the row into a one-row DataFrame
        single_row_df = pd.DataFrame([row])
        single_row_df.to_sql('TABLE_NAME', engine, if_exists='append', index=False, schema='ev')
    except exc.IntegrityError:
        # This means the row already exists, so we skip it
        pass
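If you'd rather not lean on exceptions at all, another way to get the same result is to read the ids that are already in the table and append only the rows that are missing. Below is a minimal sketch of that idea. The helper name `append_new_rows`, the `sessions` table, and the in-memory SQLite database are all stand-ins I made up so the snippet runs on its own; with your setup you would pass `engine` instead of the SQLite connection and add `schema='ev'` to the read and write calls.

```python
import sqlite3

import pandas as pd


def append_new_rows(df, con, table, key="id"):
    """Append only the rows of `df` whose `key` is not yet in `table`.

    Hypothetical helper for illustration; returns the number of rows written.
    """
    # Keys already stored in the target table
    existing = pd.read_sql_query(f"SELECT {key} FROM {table}", con)[key]
    # Drop repeats inside the batch, then rows already present in the table
    new_rows = df.drop_duplicates(subset=key)
    new_rows = new_rows[~new_rows[key].isin(existing)]
    if not new_rows.empty:
        new_rows.to_sql(table, con, if_exists="append", index=False)
    return len(new_rows)


# Stand-in database: an in-memory SQLite table that already holds id 3
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE sessions (id INTEGER PRIMARY KEY, tariff_id TEXT)")
con.execute("INSERT INTO sessions VALUES (3, 'T-3')")
con.commit()

batch = pd.DataFrame({"id": [1, 2, 3], "tariff_id": ["T-1", "T-2", "T-3"]})
written = append_new_rows(batch, con, "sessions")
print(written)  # number of rows actually written
```

Whichever variant you pick, it has to run instead of the bulk `df.to_sql(...)` call, not after it: the bulk call raises on the first duplicate, so a loop placed further down the script is never reached. Note also that this version is not safe against another process inserting the same id between the read and the write.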
Option 2:
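Or the feature you're looking for is INSERT IGNORE (a MySQL feature). Neither the SQLAlchemy ORM nor pandas supports it, so you would have to convert the dataset to a list of dictionaries, iterate over it, fill in an INSERT IGNORE INTO table_name statement for each row, and execute it with SQLAlchemy's text() construct. Maybe check what the equivalent is in Azure SQL; I'm not 100% sure, since I've never used Azure SQL.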
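As far as I know, T-SQL has no INSERT IGNORE, but an `INSERT ... SELECT ... WHERE NOT EXISTS` statement gives the same "skip it if the key is already there" behaviour. Here is a rough sketch of the iterate-and-insert idea using that statement. The helper `insert_if_absent` and the `sessions` table are hypothetical, and I'm using the standard-library `sqlite3` module as a stand-in so it runs anywhere; pyodbc uses the same `?` placeholders, and I'd expect the statement to work on Azure SQL too, but I haven't run it there.

```python
import sqlite3


def insert_if_absent(con, table, rows, key="id"):
    """Insert each dict in `rows` unless its `key` value is already in `table`.

    Hypothetical helper for illustration; `table` and the dict keys are
    pasted into the SQL text, so they must come from trusted code.
    """
    cur = con.cursor()
    for row in rows:
        cols = list(row)
        sql = (
            f"INSERT INTO {table} ({', '.join(cols)}) "
            f"SELECT {', '.join('?' for _ in cols)} "
            f"WHERE NOT EXISTS (SELECT 1 FROM {table} WHERE {key} = ?)"
        )
        # Column values first, then the key value for the NOT EXISTS check
        cur.execute(sql, [row[c] for c in cols] + [row[key]])
    con.commit()


# Stand-in database: an in-memory SQLite table that already holds id 3
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE sessions (id INTEGER PRIMARY KEY, tariff_id TEXT)")
con.execute("INSERT INTO sessions VALUES (3, 'T-3')")
con.commit()

# With a DataFrame, rows would be df.to_dict("records")
rows = [
    {"id": 1, "tariff_id": "T-1"},
    {"id": 2, "tariff_id": "T-2"},
    {"id": 3, "tariff_id": "changed"},  # duplicate key, should be skipped
]
insert_if_absent(con, "sessions", rows)
stored = con.execute("SELECT id, tariff_id FROM sessions ORDER BY id").fetchall()
print(stored)
```

The values go in as bound parameters, so only the table and column names are interpolated into the statement. The existing row for id 3 is left untouched rather than updated; if you need an update-or-insert instead, that is a different statement (MERGE in T-SQL).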