Optimizing Supabase inserts


I am writing code that fetches roughly 100k rows from Supabase and inserts them into the default Django database, and it is taking far too long. Please help me make it faster.
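For context, this is the kind of chunked read I think I should be using instead of one big fetchall() (a rough sketch, untested; fetch_in_chunks and the chunk size are placeholders of mine, not part of my current code):

from django.db import connections

CHUNK_SIZE = 2000  # assumed batch size, to be tuned

def fetch_in_chunks(table_name, chunk_size=CHUNK_SIZE):
    # stream rows in chunks instead of holding ~100k rows in memory at once
    with connections['other'].cursor() as cursor:
        cursor.execute(f"SELECT * FROM {table_name}")
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            yield rows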

I am using two Supabase databases: one holds the data, the other is Django's default database.
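The two connections are configured in settings.py roughly like this (a sketch; hosts, names, and credentials are placeholders):

# settings.py (sketch: hosts and credentials are placeholders)
DATABASES = {
    'default': {  # Django's default database
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'postgres',
        'HOST': 'db.<project-a>.supabase.co',
        'USER': 'postgres',
        'PASSWORD': '...',
        'PORT': '5432',
    },
    'other': {  # Supabase database holding the raw Airbyte tables
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'postgres',
        'HOST': 'db.<project-b>.supabase.co',
        'USER': 'postgres',
        'PASSWORD': '...',
        'PORT': '5432',
    },
}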

I am also using multithreading in this code.
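As far as I understand, each ThreadPoolExecutor worker opens its own database connection, so each task probably needs to release it when done, roughly like this (untested sketch; worker is a hypothetical name):

from django.db import close_old_connections

def worker(table_name):
    try:
        ...  # per-table work goes here
    finally:
        # each thread gets its own connection; release it when the task ends
        close_old_connections()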

from django.db import connections
from django.core.management.base import BaseCommand
import json
from concurrent.futures import ThreadPoolExecutor
from api.GAmodel import Organization, Client
from django.utils import timezone
from api.GoogleConsoleModel import GC_Search_AB_Queries, GC_Search_AB_Pages, GC_Search_AB_Country, GC_Search_AB_Device

class Command(BaseCommand):
    help = 'Copies Google Search Console rows from the Supabase source into the default database'

    def handle(self, *args, **options):
        start = timezone.now()
        other_db_connection = connections['other']

        # Define the SQL query to retrieve table names
        sql_query = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_name IN ('_airbyte_raw_search_analytics_by_query', '_airbyte_raw_search_analytics_by_page','_airbyte_raw_search_analytics_by_device','_airbyte_raw_search_analytics_by_country')
            ORDER BY table_name;
        """
        org_name="abc"
        org_table,created_org=Organization.objects.get_or_create(company=org_name)

        try:
            with other_db_connection.cursor() as cursor:
                cursor.execute(sql_query)
                rows = cursor.fetchall()

            # Extract table names from the query results
            table_names = [row[0] for row in rows]

            with ThreadPoolExecutor() as executor:
                futures = []
                for table_name in table_names:
                    futures.append(executor.submit(self.process_table, table_name, org_table))
                # Wait for all futures to complete
                for future in futures:
                    future.result()

        except Exception as e:
            print(f"Error occurred: {e}")
        end = timezone.now()
        print(f"Total time taken: {end-start}")
            
    def process_table(self, table_name, org_table):
        print(table_name)
        # table_name comes from information_schema, so the interpolation is safe here
        data_query = f"SELECT * FROM {table_name}"

        with connections['other'].cursor() as cursor:
            cursor.execute(data_query)
            print(f"Executing query for {table_name}")
            rows = cursor.fetchall()
            print("Rows fetched!")
            
        if table_name=="query":
            gsc_data_SAQuery_list=[]
            count=0
            for row in rows:
                if (count==100):
                    break
                maindata=json.loads(row[1])
                client_name = maindata["site_url"][12:][:-5]
                
                data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
                gsc_data_SAQuery_list.append(GC_Search_AB_Queries(
                                clientName = data,
                                date=maindata["date"],
                                query=maindata["query"],
                                clicks=maindata["clicks"],
                                position=maindata["position"],
                                site_url = maindata["site_url"],
                                impressions = maindata["impressions"]
                            ))
                count=count+1
            
            try:
                GC_Search_AB_Queries.objects.bulk_create(gsc_data_SAQuery_list)
                print(f"Inserted Data of {table_name} ")
            except:
                print(f"Problem in the insertion of file: {table_name}")

        elif table_name=="page":
            gsc_data_SAPage_list=[]
            count=0
            for row in rows:
                if (count==100):
                    break
                maindata=json.loads(row[1])
                client_name = maindata["site_url"][12:][:-5]
                
                data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
                gsc_data_SAPage_list.append(GC_Search_AB_Pages(
                                clientName = data,
                                date=maindata["date"],
                                page=maindata["page"],
                                clicks=maindata["clicks"],
                                position=maindata["position"],
                                site_url = maindata["site_url"],
                                impressions = maindata["impressions"]
                            ))
                count=count+1

            try:
                GC_Search_AB_Pages.objects.bulk_create(gsc_data_SAPage_list)
                print(f"Inserted Data of {table_name} ")
            except:
                print(f"Problem in the insertion of file: {table_name}")

        elif table_name=="device":
            gsc_data_SADevice_list=[]
            count=0
            for row in rows:
                if (count==100):
                    break
                maindata=json.loads(row[1])
                client_name = maindata["site_url"][12:][:-5]
                data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
                gsc_data_SADevice_list.append(GC_Search_AB_Device(
                                clientName = data,
                                date=maindata["date"],
                                device=maindata["device"],
                                site_url = maindata["site_url"],
                            ))
                count = count+1
            try:
                GC_Search_AB_Device.objects.bulk_create(gsc_data_SADevice_list)
                print(f"Inserted Data of {table_name} ")
            except:
                print(f"Problem in the insertion of file: {table_name}")

        elif table_name=="country":
            gsc_data_SACountry_list=[]
            count = 0
            for row in rows:
                if (count==100):
                    break

                maindata=json.loads(row[1])
                client_name = maindata["site_url"][12:][:-5]
                data,created=Client.objects.get_or_create(organizationName=org_table,name=client_name)
                gsc_data_SACountry_list.append(GC_Search_AB_Country(
                                clientName = data,
                                ctr=maindata["ctr"],
                                date=maindata["date"],
                                clicks=maindata["clicks"],
                                country=maindata["country"],
                                position=maindata["position"],
                                site_url = maindata["site_url"]
                            ))
                count = count+1

            try:
                GC_Search_AB_Country.objects.bulk_create(gsc_data_SACountry_list)
                print(f"Inserted Data of {table_name} ")
            except:
                print(f"Problem in the insertion of file: {table_name}")
sql django optimization supabase insertion