我创建了一个从广告柜中检索数据的函数。结果,我在 BigQuery 中获得了生成的架构,但在“预览”选项卡中显示消息“没有可显示的数据”
使用下面插入的查询,我获取了数据。但我还是想增加数据量
你能告诉我我哪里做错了吗
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from datetime import datetime, date, timedelta
import requests
import logging
import json
import base64
from facebook_business.api import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.adobjects.adaccountuser import AdAccountUser
from facebook_business.adobjects.adsinsights import AdsInsights
from facebook_business.adobjects.campaign import Campaign
logger = logging.getLogger()
schema_exchange_rate = [
bigquery.SchemaField("date", "DATE", mode="REQUIRED"),
bigquery.SchemaField("currencies", "STRING", mode="REQUIRED"),
bigquery.SchemaField("rate", "FLOAT", mode="REQUIRED")
]
schema_facebook_stat = [
bigquery.SchemaField("date", "DATE", mode="REQUIRED"),
bigquery.SchemaField("ad_id", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("ad_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("adset_id", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("adset_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("campaign_id", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("campaign_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("clicks", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("impressions", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("reach", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("spend", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("cpm", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("ctr", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField("frequency", "NUMERIC", mode="REQUIRED"),
bigquery.SchemaField('cost_per_action_type', 'RECORD', mode='REPEATED',
fields=(bigquery.SchemaField('action_type', 'STRING'),
bigquery.SchemaField('value', 'NUMERIC'))),
bigquery.SchemaField('conversions', 'RECORD', mode='REPEATED',
fields=(bigquery.SchemaField('action_type', 'STRING'),
bigquery.SchemaField('value', 'NUMERIC'))),
bigquery.SchemaField('actions', 'RECORD', mode='REPEATED',
fields=(bigquery.SchemaField('action_type', 'STRING'),
bigquery.SchemaField('value', 'NUMERIC')))
]
clustering_fields_facebook = ['campaign_id', 'campaign_name']
def exist_dataset_table(client, table_id, dataset_id, project_id, schema, clustering_fields=None):
try:
dataset_ref = "{}.{}".format(project_id, dataset_id)
client.get_dataset(dataset_ref) # Make an API request.
except NotFound:
dataset_ref = "{}.{}".format(project_id, dataset_id)
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "US"
dataset = client.create_dataset(dataset) # Make an API request.
logger.info("Created dataset {}.{}".format(client.project, dataset.dataset_id))
try:
table_ref = "{}.{}.{}".format(project_id, dataset_id, table_id)
client.get_table(table_ref) # Make an API request.
except NotFound:
table_ref = "{}.{}.{}".format(project_id, dataset_id, table_id)
table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="date"
)
if clustering_fields is not None:
table.clustering_fields = clustering_fields
table = client.create_table(table) # Make an API request.
logger.info("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
return 'ok'
def insert_rows_bq(client, table_id, dataset_id, project_id, data):
table_ref = "{}.{}.{}".format(project_id, dataset_id, table_id)
table = client.get_table(table_ref)
resp = client.insert_rows_json(
json_rows = data,
table = table_ref,
)
logger.info("Success uploaded to table {}".format(table.table_id))
def get_facebook_data(event, context):
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
bigquery_client = bigquery.Client()
if 'date' in event['attributes']:
yesterday = event['attributes']['date'].strftime('%Y-%m-%d')
else:
yesterday = date.today() - timedelta(1)
if pubsub_message == 'get_currency':
table_id = event['attributes']['table_id']
dataset_id = event['attributes']['dataset_id']
project_id = event['attributes']['project_id']
api_key = event['attributes']['api_key']
from_currency = event['attributes']['from_currency']
to_currency = event['attributes']['to_currency']
source = from_currency+to_currency
cur_source = []
params = {'access_key': api_key,
'currencies': to_currency,
'source': from_currency,
'date': yesterday.strftime("%Y-%m-%d")
}
url = 'http://api.currencylayer.com/historical'
try:
r = requests.get(url, params=params)
except requests.exceptions.RequestException as e:
logger.error('request to currencylayer error: {}').format(e)
return e
if r.json()["success"] is True:
exist_dataset_table(bigquery_client, table_id, dataset_id, project_id, schema_exchange_rate)
cur_source.append({'date': yesterday.strftime("%Y-%m-%d"),
'currencies' : source,
'rate' : r.json()['quotes'][source]
})
insert_rows_bq(bigquery_client, table_id, dataset_id, project_id, cur_source)
else:
logger.error('request to currencylayer error: {}').format(r.json()['error']['info'])
return 'ok'
elif pubsub_message == 'get_facebook':
table_id = event['attributes']['table_id']
dataset_id = event['attributes']['dataset_id']
project_id = event['attributes']['project_id']
app_id = event['attributes']['app_id']
app_secret = event['attributes']['app_secret']
access_token = event['attributes']['access_token']
account_id = event['attributes']['account_id']
try:
FacebookAdsApi.init(app_id, app_secret, access_token)
account = AdAccount('act_'+str(account_id))
insights = account.get_insights(fields=[
AdsInsights.Field.account_id,
AdsInsights.Field.campaign_id,
AdsInsights.Field.campaign_name,
AdsInsights.Field.adset_name,
AdsInsights.Field.adset_id,
AdsInsights.Field.ad_name,
AdsInsights.Field.ad_id,
AdsInsights.Field.spend,
AdsInsights.Field.impressions,
AdsInsights.Field.reach,
AdsInsights.Field.clicks,
AdsInsights.Field.actions,
AdsInsights.Field.conversions,
AdsInsights.Field.cpm,
AdsInsights.Field.ctr,
AdsInsights.Field.cost_per_action_type
], params={
'level': 'ad',
'time_range': {
'since': yesterday.strftime("%Y-%m-%d"),
'until': yesterday.strftime("%Y-%m-%d")
},'time_increment': 1
})
except Exception as e:
logger.info(e)
print(e)
raise
fb_source = []
for index, item in enumerate(insights):
actions = []
conversions = []
cost_per_action_type = []
if 'actions' in item:
for i, value in enumerate(item['actions']):
actions.append({'action_type' : value['action_type'], 'value': value['value']})
if 'conversions' in item:
for i, value in enumerate(item['conversions']):
conversions.append({'action_type' : value['action_type'], 'value': value['value']})
if 'cost_per_action_type' in item:
for i, value in enumerate(item['cost_per_action_type']):
cost_per_action_type.append({'action_type' : value['action_type'], 'value': value['value']})
fb_source.append({'date': item['date_start'],
'ad_id' : item['ad_id'],
'ad_name' : item['ad_name'],
'adset_id' : item['adset_id'],
'adset_name' : item['adset_name'],
'campaign_id' : item['campaign_id'],
'campaign_name' : item['campaign_name'],
'clicks' : item['clicks'],
'impressions' : item['impressions'],
'reach' : item['reach'],
'spend' : item['spend'],
'conversions' : conversions,
'actions' : actions,
'cpm' : item['cpm'],
'ctr' : item['ctr'],
'cost_per_action_type' : cost_per_action_type
})
if exist_dataset_table(bigquery_client, table_id, dataset_id, project_id, schema_facebook_stat, clustering_fields_facebook) == 'ok':
insert_rows_bq(bigquery_client, table_id, dataset_id, project_id, fb_source)
return 'ok'
仅当我请求此数据时一切正常
FacebookAdsApi.init(app_id, app_secret, access_token)
account = AdAccount('act_'+str(account_id))
insights = account.get_insights(fields=[
AdsInsights.Field.account_id,
AdsInsights.Field.campaign_id,
AdsInsights.Field.campaign_name,
AdsInsights.Field.adset_name,
AdsInsights.Field.adset_id,
AdsInsights.Field.ad_name,
AdsInsights.Field.ad_id,
AdsInsights.Field.spend,
AdsInsights.Field.impressions,
AdsInsights.Field.clicks,
AdsInsights.Field.actions,
AdsInsights.Field.conversions
BigQuery 可能会在这里给您一个错误:
resp = client.insert_rows_json(
json_rows = data,
table = table_ref,
)
看看 resp 变量中返回的内容。它没有在任何地方被检查,这就是为什么您可能没有看到错误。我最近实现了这段代码,并遇到了发送类型与表上列类型的问题。