Optimizing API calls in parallel

Problem description

We have the Python code below. It makes 2 kinds of API calls: a) get all customers - get_all_customers() b) get the details of a specific customer - get_specifc_customer_details()

Currently (b) get_specifc_customer_details runs sequentially. Is there any way we can use Spark parallelism so that the per-customer calls run in parallel? Something like the below, or any better way of making the API calls from Spark.

maintain all customers in a dataframe df, separate rows

df = df.withColumn(cust_status_1, <invoke api pass specific customer >) \
       .withColumn(cust_status_2, <invoke api pass specific customer >)

finally, df will have 3 columns -- customer_id, cust_status_1, cust_status_2
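
An untested sketch of what we have in mind: a single UDF returns both status fields, so each customer needs only one request, reusing get_all_customers() and the same endpoint as in the original code below.

import requests
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Both status fields come back from one UDF call per customer.
status_schema = StructType([
    StructField("cust_status_1", StringType()),
    StructField("cust_status_2", StringType()),
])

@F.udf(returnType=status_schema)
def fetch_customer_status(customer_id):
    url = f"https://test.url.com/api/v1/customers/{customer_id}"
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    }
    cust = requests.get(url, headers=headers).json()["customer"]
    return (cust["status_1"], cust["status_2"])

df = spark.createDataFrame([[c] for c in get_all_customers()], ["customer_id"])

result_df = (
    df.repartition(8)  # spread the rows so the API calls run concurrently
      .withColumn("status", fetch_customer_status("customer_id"))
      .select("customer_id", "status.cust_status_1", "status.cust_status_2")
)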

Original code:

import requests

def get_all_customers():
    url = "https://test.url.com/api/v1/customers?status=Customer"
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    }
    customer_id_list = []
    response = requests.request("GET", url, headers=headers)
    for cust in response.json()["customers"]:
        customer_id_list.append(cust["amagi_id"])

    return customer_id_list

def get_specifc_customer_details(customer_id_list):

    customer_status_overall = []
    for customer_id in customer_id_list:
        url = f"https://test.url.com/api/v1/customers/{customer_id}"
        headers = {
            'Content-Type': 'application/json; charset=utf-8',
            'Authorization': 'Bearer Token'
        }
        # Sequential: one blocking GET per customer
        response = requests.request("GET", url, headers=headers)

        cust_status_1 = response.json()["customer"]["status_1"]
        cust_status_2 = response.json()["customer"]["status_2"]
        customer_status_overall.append((customer_id, cust_status_1, cust_status_2))

    # spark and schema are defined elsewhere in our job
    customer_status_df = spark.createDataFrame(customer_status_overall, schema)
    return customer_status_df
1 Answer

I think this should be possible. You can even chain the API calls one after another.

import json
from pyspark import SparkContext, SQLContext
import requests
from pyspark.sql import Row

sc = SparkContext('local')
sqlContext = SQLContext(sc)


data1 = [
    [1],
    [2],
    [3],
    [4],
]
columns1 = ["postId"]

df1 = sqlContext.createDataFrame(data=data1, schema=columns1)
print("Given dataframe")
df1.show(n=100, truncate=False)

## Repartition into a suitable number of partitions so the rows are processed in parallel.
## Increase the number of partitions if you still get OOM errors.
df1 = df1.repartition(4).cache()

def my_method(post_id):
    # The API endpoint
    url = "https://jsonplaceholder.typicode.com/posts/"
    # Query-string parameters for the request
    payload = {"id": [post_id], "userId": 1}
    # A GET request to the API
    response = requests.get(url, params=payload)
    if response.status_code == 200:
        return response.json()
    else:
        return ""


def mapPartition_inference(partitioned_rows):

    print("Within Map partition")
    result_array_list = []
    print("For loop below")
    for row in partitioned_rows:
        result = my_method(row["postId"])
        print(result)
        result_array_list.append(Row(json.dumps(result))) ## wrap the result in the Row()
    print("result array list")
    print(result_array_list)
    return iter(result_array_list)

result_df = df1.rdd.mapPartitions(mapPartition_inference).toDF(["api_response"])


print("Result DF")
result_df.show(n=30, truncate=False)

Output:

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|api_response                                                                                                                                                                                                                                                                                |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{"userId": 1, "id": 1, "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"}]|
|[{"userId": 1, "id": 3, "title": "ea molestias quasi exercitationem repellat qui ipsa sit aut", "body": "et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut"}]         |
|[{"userId": 1, "id": 2, "title": "qui est esse", "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"}]              |
|[{"userId": 1, "id": 4, "title": "eum et est occaecati", "body": "ullam et saepe reiciendis voluptatem adipisci\nsit amet autem assumenda provident rerum culpa\nquis hic commodi nesciunt rem tenetur doloremque ipsam iure\nquis sunt voluptatem rerum illo velit"}]                      |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
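
Applied to your customer API, the same mapPartitions pattern would look roughly like the sketch below (untested, assuming the endpoint, headers and JSON fields from your question): get_all_customers() runs once on the driver, and the per-customer calls are spread across partitions. The repartition count of 8 is only an illustration; tune it to your customer count and API rate limits.

def fetch_status_partition(rows):
    # One requests.Session per partition so the HTTP connection is reused
    # for all customers handled by that partition.
    session = requests.Session()
    session.headers.update({
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': 'Bearer Token'
    })
    for row in rows:
        customer_id = row["customer_id"]
        url = f"https://test.url.com/api/v1/customers/{customer_id}"
        cust = session.get(url).json()["customer"]
        yield (customer_id, cust["status_1"], cust["status_2"])

# First API call runs once on the driver; the per-customer calls run inside the partitions.
customer_ids = get_all_customers()
df_ids = sqlContext.createDataFrame([[c] for c in customer_ids], ["customer_id"])

customer_status_df = (
    df_ids.repartition(8)
          .rdd.mapPartitions(fetch_status_partition)
          .toDF(["customer_id", "cust_status_1", "cust_status_2"])
)
customer_status_df.show(truncate=False)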