请注意: 尽管我在这里提到了 Azure Databricks,但我相信这本质上是一个 Python/GNUPG 问题,因此任何具有 Python/GNUPG 加密经验的人都可以回答。
我的 Azure Databricks 笔记本中有以下 Python 代码:
%python
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, lit
from pyspark.sql.types import StringType
import os
import gnupg
from azure.storage.blob import BlobServiceClient, BlobPrefix
import hashlib
from pyspark.sql import Row
from pyspark.sql.functions import collect_list
# Initialize Spark session
spark = SparkSession.builder.appName("DecryptData").getOrCreate()
storage_account_name = "mycontainer"
storage_account_key = "<redacted>"
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)
clientsDF = spark.read.table("myapp.internal.Clients")
row = clientsDF.first()
clientsLabel = row["Label"]
encryptedFilesSource = f"wasbs://{clientsLabel}@mycontainer.blob.core.windows.net/data/*"
decryptedDF = spark.sql(f"""
SELECT
REVERSE(SUBSTRING_INDEX(REVERSE(input_file_name()), '/', 1)) AS FileName,
REPLACE(value, '"', '[Q]') AS FileData,
'{clientsLabel}' as ClientLabel
FROM
read_files(
'{encryptedFilesSource}',
format => 'text',
wholeText => true
)
""")
decryptedDF.show()
decryptedDF = decryptedDF.select("FileData");
encryptedData = decryptedDF.first()['FileData']
def decrypt_pgp_data(encrypted_data, private_key_data, passphrase):
# Initialize GPG object
gpg = gnupg.GPG()
print("Loading private key...")
# Load private key
private_key = gpg.import_keys(private_key_data)
if private_key.count == 1:
keyid = private_key.fingerprints[0]
gpg.trust_keys(keyid, 'TRUST_ULTIMATE')
print("Private key loaded, attempting decryption...")
try:
decrypted_data = gpg.decrypt(encrypted_data, passphrase=passphrase, always_trust=True)
except Exception as e:
print("Error during decryption:", e)
return
print("Decryption finished and decrypted_data is of type: " + str(type(decrypted_data)))
if decrypted_data.ok:
print("Decryption successful!")
print("Decrypted Data:")
print(decrypted_data.data.decode())
else:
print("Decryption failed.")
print("Status:", decrypted_data.status)
print("Error:", decrypted_data.stderr)
print("Trust Level:", decrypted_data.trust_text)
print("Valid:", decrypted_data.valid)
private_key_data = '''-----BEGIN PGP PRIVATE KEY BLOCK-----
<redacted>
-----END PGP PRIVATE KEY BLOCK-----'''
passphrase = '<redacted>'
encrypted_data = b'encryptedData'
decrypt_pgp_data(encrypted_data, private_key_data, passphrase)
如您所见,我正在将 PGP 加密的文件从 Azure Blob 存储帐户容器读取到 Dataframe 中,然后通过使用的解密器函数发送第一行(稍后我将更改此笔记本以处理所有行) GNUPG。
运行时,它会在驱动程序日志中提供以下输出:
+--------------------+--------------------+-------+
| FileName| FileData| ClientLabel |
+--------------------+--------------------+-------+
| fizz.pgp|���mIj�h�#{... | acme|
+--------------------+--------------------+-------+
Decrypting: <redacted>
Loading private key...
WARNING:gnupg:gpg returned a non-zero error code: 2
Private key loaded, attempting decryption...
Decryption finished and decrypted_data is of type: <class 'gnupg.Crypt'>
Decryption failed.
Status: no data was provided
Error: gpg: no valid OpenPGP data found.
[GNUPG:] NODATA 1
[GNUPG:] NODATA 2
[GNUPG:] FAILURE decrypt 4294967295
gpg: decrypt_message failed: Unknown system error
Trust Level: None
Valid: False
任何人都可以找出解密失败的原因,或者帮助我排除故障以找出罪魁祸首吗?设置调试器不是一个选项,因为这是在笔记本内部发生的。我在想:
有人能发现我哪里出了问题吗?
问题不在于
python-gnupg
模块。
在下面的示例代码中,我们首先生成一个私钥,然后用它加密一些数据,然后将密钥和加密数据传递给您的
decrypt_pgp_data
函数。一切似乎都按预期进行;运行以下代码会导致:
gpg: keybox '/tmp/tmpng8xm_d_/pubring.kbx' created
gpg: /tmp/tmpng8xm_d_/trustdb.gpg: trustdb created
gpg: directory '/tmp/tmpng8xm_d_/openpgp-revocs.d' created
gpg: revocation certificate stored as '/tmp/tmpng8xm_d_/openpgp-revocs.d/8DF4D8326BAD790E37B75C8A66F05BDC77FAF5BE.rev'
gpg: checking the trustdb
gpg: marginals needed: 3 completes needed: 1 trust model: pgp
gpg: depth: 0 valid: 1 signed: 0 trust: 0-, 0q, 0n, 0m, 0f, 1u
Loading private key...
Private key loaded, attempting decryption...
Decryption finished and decrypted_data is of type: <class 'gnupg.Crypt'>
Decryption successful!
Decrypted Data:
This is a test
这表明问题在于您如何生成加密数据或私钥,但由于您没有在问题中显示该过程,因此很难诊断。
这是代码:
import os
import tempfile
import subprocess
import gnupg
def decrypt_pgp_data(encrypted_data, private_key_data, passphrase):
# Initialize GPG object
gpg = gnupg.GPG()
print("Loading private key...")
# Load private key
private_key = gpg.import_keys(private_key_data)
if private_key.count != 1:
raise ValueError("invalid private key")
keyid = private_key.fingerprints[0]
gpg.trust_keys(keyid, "TRUST_ULTIMATE")
print("Private key loaded, attempting decryption...")
try:
decrypted_data = gpg.decrypt(
encrypted_data, passphrase=passphrase, always_trust=True
)
except Exception as e:
print("Error during decryption:", e)
return
print(
"Decryption finished and decrypted_data is of type: "
+ str(type(decrypted_data))
)
if decrypted_data.ok:
print("Decryption successful!")
print("Decrypted Data:")
print(decrypted_data.data.decode())
else:
print("Decryption failed.")
print("Status:", decrypted_data.status)
print("Error:", decrypted_data.stderr)
print("Trust Level:", decrypted_data.trust_text)
print("Valid:", decrypted_data.valid)
passphrase = "secret passphrase"
# Create a temprary directory and use that as GNUPGHOME to avoid mucking
# about with our actual gpg configuration.
with tempfile.TemporaryDirectory() as gnupghome:
os.environ["GNUPGHOME"] = gnupghome
# Generate a new private key non-interactively
genkey = subprocess.Popen(["gpg", "--batch", "--gen-key"], stdin=subprocess.PIPE)
genkey.communicate(
input="\n".join(
[
"Key-Type: 1",
"Key-Length: 2048",
"Subkey-Type: 1",
"Subkey-Length: 2048",
"Name-Real: Example User",
"Name-Email: [email protected]",
"Expire-Date: 0",
f"Passphrase: {passphrase}",
]
).encode()
)
genkey.wait()
# Export the private key.
private_key_data = subprocess.check_output(
[
"gpg",
"--export-secret-key",
"-a",
"--pinentry-mode=loopback",
f"--passphrase={passphrase}",
"[email protected]",
]
)
# Encrypt a sample message with the private key.
encrypt = subprocess.Popen(
["gpg", "-ea", "-r", "[email protected]"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
encrypted_data, _ = encrypt.communicate(input="This is a test".encode())
encrypt.wait()
# Now we start with a new, empty GNUGPGHOME directory so that we're
# confident that we're successfully importing the private key rather than
# using a key already in our keystore.
with tempfile.TemporaryDirectory() as gnupghome:
os.environ["GNUPGHOME"] = gnupghome
decrypt_pgp_data(encrypted_data, private_key_data, passphrase)
当前的问题是,Python 没有任何可以执行 PGP 解密的现代模块/库,不依赖于已安装并可从 shell 访问的
gpg
本机二进制文件。
我最终编写了一个使用 PainlessGPG 的 Scala 笔记本,尽管我必须为所有 PainlessPGP 的传递依赖项创建一个自定义的“胖”(阴影)JAR,而这对于任何不擅长 Java 的开发人员来说都是不可行的。
TL;DR --> 不建议从 ADB 笔记本内部进行基于 Python 的解密。