Facebook BigQuery 连接

问题描述 投票:0回答:1

我创建了一个从广告柜中检索数据的函数。结果,我在 BigQuery 中获得了生成的架构,但在“预览”选项卡中显示消息“没有可显示的数据”

使用下面插入的查询,我获取了数据。但我还是想增加数据量

你能告诉我我哪里做错了吗

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from datetime import datetime, date, timedelta
import requests
import logging
import json
import base64
from facebook_business.api import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adaccountuser import AdAccountUser
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.campaign import Campaign

logger = logging.getLogger()

schema_exchange_rate = [
    bigquery.SchemaField("date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("currencies", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("rate", "FLOAT", mode="REQUIRED")
]

schema_facebook_stat = [
    bigquery.SchemaField("date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("ad_id", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("ad_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("adset_id", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("adset_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("campaign_id", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("campaign_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("clicks", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("impressions", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("reach", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("spend", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("cpm", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("ctr", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField("frequency", "NUMERIC", mode="REQUIRED"),
    bigquery.SchemaField('cost_per_action_type', 'RECORD', mode='REPEATED',
        fields=(bigquery.SchemaField('action_type', 'STRING'),
                bigquery.SchemaField('value', 'NUMERIC'))),
    bigquery.SchemaField('conversions', 'RECORD', mode='REPEATED',
        fields=(bigquery.SchemaField('action_type', 'STRING'),
                bigquery.SchemaField('value', 'NUMERIC'))),
    bigquery.SchemaField('actions', 'RECORD', mode='REPEATED',
        fields=(bigquery.SchemaField('action_type', 'STRING'),
                bigquery.SchemaField('value', 'NUMERIC')))

]

clustering_fields_facebook = ['campaign_id', 'campaign_name']

def exist_dataset_table(client, table_id, dataset_id, project_id, schema, clustering_fields=None):

    try:
        dataset_ref = "{}.{}".format(project_id, dataset_id)
        client.get_dataset(dataset_ref)  # Make an API request.

    except NotFound:
        dataset_ref = "{}.{}".format(project_id, dataset_id)
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "US"
        dataset = client.create_dataset(dataset)  # Make an API request.
        logger.info("Created dataset {}.{}".format(client.project, dataset.dataset_id))

    try:
        table_ref = "{}.{}.{}".format(project_id, dataset_id, table_id)
        client.get_table(table_ref)  # Make an API request.

    except NotFound:

        table_ref = "{}.{}.{}".format(project_id, dataset_id, table_id)

        table = bigquery.Table(table_ref, schema=schema)

        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="date"
        )

        if clustering_fields is not None:
            table.clustering_fields = clustering_fields

        table = client.create_table(table)  # Make an API request.
        logger.info("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

    return 'ok'


def insert_rows_bq(client, table_id, dataset_id, project_id, data):

    table_ref = "{}.{}.{}".format(project_id, dataset_id, table_id)
    table = client.get_table(table_ref)

    resp = client.insert_rows_json(
        json_rows = data,
        table = table_ref,
    )

    logger.info("Success uploaded to table {}".format(table.table_id))


def get_facebook_data(event, context):

    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    bigquery_client = bigquery.Client()

    if 'date' in event['attributes']:
        yesterday = event['attributes']['date'].strftime('%Y-%m-%d')
    else:
        yesterday = date.today() - timedelta(1)

    if pubsub_message == 'get_currency':

        table_id = event['attributes']['table_id']
        dataset_id = event['attributes']['dataset_id']
        project_id = event['attributes']['project_id']

        api_key = event['attributes']['api_key']
        from_currency = event['attributes']['from_currency']
        to_currency = event['attributes']['to_currency']
        source = from_currency+to_currency

        cur_source = []

        params = {'access_key': api_key,
                   'currencies': to_currency,
                   'source': from_currency,
                   'date': yesterday.strftime("%Y-%m-%d")
                  }

        url = 'http://api.currencylayer.com/historical'


        try:
            r = requests.get(url, params=params)
        except requests.exceptions.RequestException as e:
            logger.error('request to currencylayer error: {}').format(e)
            return e

        if r.json()["success"] is True:

            exist_dataset_table(bigquery_client, table_id, dataset_id, project_id, schema_exchange_rate)

            cur_source.append({'date': yesterday.strftime("%Y-%m-%d"),
                               'currencies' : source,
                               'rate' : r.json()['quotes'][source]
            })

            insert_rows_bq(bigquery_client, table_id, dataset_id, project_id, cur_source)
        else:
            logger.error('request to currencylayer error: {}').format(r.json()['error']['info'])

        return 'ok'

    elif pubsub_message == 'get_facebook':

        table_id = event['attributes']['table_id']
        dataset_id = event['attributes']['dataset_id']
        project_id = event['attributes']['project_id']

        app_id = event['attributes']['app_id']
        app_secret = event['attributes']['app_secret']
        access_token = event['attributes']['access_token']
        account_id = event['attributes']['account_id']

        try:
            FacebookAdsApi.init(app_id, app_secret, access_token)

            account = AdAccount('act_'+str(account_id))
            insights = account.get_insights(fields=[
                    AdsInsights.Field.account_id,
                    AdsInsights.Field.campaign_id,
                    AdsInsights.Field.campaign_name,
                    AdsInsights.Field.adset_name,
                    AdsInsights.Field.adset_id,
                    AdsInsights.Field.ad_name,
                    AdsInsights.Field.ad_id,
                    AdsInsights.Field.spend,
                    AdsInsights.Field.impressions,
                    AdsInsights.Field.reach,
                    AdsInsights.Field.clicks,
                    AdsInsights.Field.actions,
                    AdsInsights.Field.conversions,
                    AdsInsights.Field.cpm,
                    AdsInsights.Field.ctr,
                    AdsInsights.Field.cost_per_action_type
            ], params={
                'level': 'ad',
                'time_range': {
                    'since':  yesterday.strftime("%Y-%m-%d"),
                    'until': yesterday.strftime("%Y-%m-%d")
                },'time_increment': 1
            })

        except Exception as e:
                logger.info(e)
                print(e)
                raise

        fb_source = []

        for index, item in enumerate(insights):

            actions = []
            conversions = []
            cost_per_action_type = []

            if 'actions' in item:
                for i, value in enumerate(item['actions']):
                    actions.append({'action_type' : value['action_type'], 'value': value['value']})

            if 'conversions' in item:
                for i, value in enumerate(item['conversions']):
                    conversions.append({'action_type' : value['action_type'], 'value': value['value']})

            if 'cost_per_action_type' in item:
                for i, value in enumerate(item['cost_per_action_type']):
                    cost_per_action_type.append({'action_type' : value['action_type'], 'value': value['value']})

            fb_source.append({'date': item['date_start'],
                               'ad_id' : item['ad_id'],
                               'ad_name' : item['ad_name'],
                               'adset_id' : item['adset_id'],
                               'adset_name' : item['adset_name'],
                               'campaign_id' : item['campaign_id'],
                               'campaign_name' : item['campaign_name'],
                               'clicks' : item['clicks'],
                               'impressions' : item['impressions'],
                               'reach' : item['reach'],
                               'spend' : item['spend'],
                               'conversions' : conversions,
                               'actions' : actions,
                               'cpm' : item['cpm'],
                               'ctr' : item['ctr'],
                               'cost_per_action_type' : cost_per_action_type
                            })


        if exist_dataset_table(bigquery_client, table_id, dataset_id, project_id, schema_facebook_stat, clustering_fields_facebook) == 'ok':

            insert_rows_bq(bigquery_client, table_id, dataset_id, project_id, fb_source)

            return 'ok'

仅当我请求此数据时一切正常

FacebookAdsApi.init(app_id, app_secret, access_token)

            account = AdAccount('act_'+str(account_id))
            insights = account.get_insights(fields=[
                    AdsInsights.Field.account_id,
                    AdsInsights.Field.campaign_id,
                    AdsInsights.Field.campaign_name,
                    AdsInsights.Field.adset_name,
                    AdsInsights.Field.adset_id,
                    AdsInsights.Field.ad_name,
                    AdsInsights.Field.ad_id,
                    AdsInsights.Field.spend,
                    AdsInsights.Field.impressions,
                    AdsInsights.Field.clicks,
                    AdsInsights.Field.actions,
                    AdsInsights.Field.conversions
python python-3.x facebook-graph-api google-bigquery facebook-python-business-sdk
1个回答
0
投票

BigQuery 可能会在这里给您一个错误:

resp = client.insert_rows_json(
        json_rows = data,
        table = table_ref,
    )

看看 resp 变量中返回的内容。它没有在任何地方被检查,这就是为什么您可能没有看到错误。我最近实现了这段代码,并遇到了发送类型与表上列类型的问题。

© www.soinside.com 2019 - 2024. All rights reserved.