我在 Azure Functions 中继承了以下代码,我想确保这是否正确,并找出利用 Python Azure Functions v2 编程模型调用它的正确方法。
import os
import gnupg
import azure.functions as func
gpg = gnupg.GPG(gnupghome='/tmp')
class PGPDecryption:
def __init__(self, prikey):
self.fullkey = prikey
def decrypt(self, blob_data):
key_data = self.fullkey.strip()
gpg.import_keys(key_data)
decrypted_data = gpg.decrypt(blob_data)
return str(decrypted_data)
# Blob Trigger for input and Blob Output binding for decrypted file
@func.blob_input(name='inputBlob', path='input-container/{name}',
connection='AzureWebJobsStorage')
@func.blob_output(name='outputBlob', path='output-container/{name}.decrypted',
connection='AzureWebJobsStorage')
def main(inputBlob: func.InputStream, outputBlob: func.Out[func.InputStream], context:
func.Context):
prikey = '<Your PGP Private Key>' # Replace with your PGP private key
decryptor = PGPDecryption(prikey)
decrypted_data = decryptor.decrypt(inputBlob.read())
# Writing decrypted data to output binding
outputBlob.set(decrypted_data)
代码的目标是从 Azure Blob 获取文件并对其进行解密,然后将其放回另一个 Azure Blob 位置。 代码是否真的实现了这一点?我该如何调试?
代码实际上实现了这一点吗?我该如何调试?
要调试,您可以使用日志记录并测试代码(获取示例代码,您可以使用您的代码):
import azure.functions as func
import logging
app = func.FunctionApp()
@app.blob_trigger(arg_name="myblob", path="rithwiktest1",
connection="rithwiktest1")
@app.blob_output(arg_name='outputBlob', path='testrithwik2/test',
connection='rithwiktest1')
def blob_trigger(myblob: func.InputStream,outputBlob: func.Out[str]):
logging.info(f"Python blob trigger function processed blob"
f"Name: {myblob.name}"
f"Blob Size: {myblob.length} bytes")
outputBlob.set("Hello Rithwik Bojja")
在这里,当添加新的 blob 时,会触发这个天蓝色的 blob 触发器,然后在目标中创建一个新文件,如下所示:
您需要检查是否创建了新的 blob。