我正在编写一段代码,它从 Supabase 获取大约 100k 数据并将其插入到默认的 Django 数据库中,这花费了太多时间。请帮助我让它更快。
我正在使用两个 Supabase 数据库;一个包含数据,另一个是 Django 的默认数据库。
我也在这段代码中使用多线程。
from django.db import connections
from django.core.management.base import BaseCommand
import json
from concurrent.futures import ThreadPoolExecutor
from api.GAmodel import Organization,Client
from django.utils import timezone
from api.GoogleConsoleModel import GC_Search_AB_Queries, GC_Search_AB_Pages, GC_Search_AB_Country, GC_Search_AB_Device
class Command(BaseCommand):
help = 'Inserts JSON data into the ModelName model'
def handle(self, *args, **options):
start=timezone.now()
other_db_connection = connections['other']
# Define the SQL query to retrieve table names
sql_query = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name IN ('_airbyte_raw_search_analytics_by_query', '_airbyte_raw_search_analytics_by_page','_airbyte_raw_search_analytics_by_device','_airbyte_raw_search_analytics_by_country')
ORDER BY table_name;
"""
org_name="abc"
org_table,created_org=Organization.objects.get_or_create(company=org_name)
try:
with other_db_connection.cursor() as cursor:
cursor.execute(sql_query)
rows = cursor.fetchall()
# Extract table names from the query results
table_names = [row[0] for row in rows]
with ThreadPoolExecutor() as executor:
futures = []
for table_name in table_names:
futures.append(executor.submit(self.process_table, table_name, org_table))
# Wait for all futures to complete
for future in futures:
future.result()
except Exception as e:
print(f"Error occurred: {e}")
end = timezone.now()
print(f"Total time taken: {end-start}")
def process_table(self, table_name, org_table):
print(table_name)
data_query = f"SELECT * FROM {table_name}"
with connections['other'].cursor() as cursor:
cursor.execute(data_query)
print(f"Executing query for{table_name}")
rows = cursor.fetchall()
print("row Fetched!!!")
if table_name=="query":
gsc_data_SAQuery_list=[]
count=0
for row in rows:
if (count==100):
break
maindata=json.loads(row[1])
client_name = maindata["site_url"][12:][:-5]
data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
gsc_data_SAQuery_list.append(GC_Search_AB_Queries(
clientName = data,
date=maindata["date"],
query=maindata["query"],
clicks=maindata["clicks"],
position=maindata["position"],
site_url = maindata["site_url"],
impressions = maindata["impressions"]
))
count=count+1
try:
GC_Search_AB_Queries.objects.bulk_create(gsc_data_SAQuery_list)
print(f"Inserted Data of {table_name} ")
except:
print(f"Problem in the insertion of file: {table_name}")
elif table_name=="page":
gsc_data_SAPage_list=[]
count=0
for row in rows:
if (count==100):
break
maindata=json.loads(row[1])
client_name = maindata["site_url"][12:][:-5]
data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
gsc_data_SAPage_list.append(GC_Search_AB_Pages(
clientName = data,
date=maindata["date"],
page=maindata["page"],
clicks=maindata["clicks"],
position=maindata["position"],
site_url = maindata["site_url"],
impressions = maindata["impressions"]
))
count=count+1
try:
GC_Search_AB_Pages.objects.bulk_create(gsc_data_SAPage_list)
print(f"Inserted Data of {table_name} ")
except:
print(f"Problem in the insertion of file: {table_name}")
elif table_name=="device":
gsc_data_SADevice_list=[]
count=0
for row in rows:
if (count==100):
break
maindata=json.loads(row[1])
client_name = maindata["site_url"][12:][:-5]
data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
gsc_data_SADevice_list.append(GC_Search_AB_Device(
clientName = data,
date=maindata["date"],
device=maindata["device"],
site_url = maindata["site_url"],
))
count = count+1
try:
GC_Search_AB_Device.objects.bulk_create(gsc_data_SADevice_list)
print(f"Inserted Data of {table_name} ")
except:
print(f"Problem in the insertion of file: {table_name}")
elif table_name=="country":
gsc_data_SACountry_list=[]
count = 0
for row in rows:
if (count==100):
break
maindata=json.loads(row[1])
client_name = maindata["site_url"][12:][:-5]
data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
gsc_data_SACountry_list.append(GC_Search_AB_Country(
clientName = data,
ctr=maindata["ctr"],
date=maindata["date"],
clicks=maindata["clicks"],
country=maindata["country"],
position=maindata["position"],
site_url = maindata["site_url"]
))
count = count+1
try:
GC_Search_AB_Country.objects.bulk_create(gsc_data_SACountry_list)
print(f"Inserted Data of {table_name} ")
except:
print(f"Problem in the insertion of file: {table_name}")