我对加密还比较陌生,但我写了一个类,可以把 DataFrame(数据帧)加密后写入 SQLite 文件;另外还有一个与之对应的类,用来解密同一张 SQLite 表。
import sqlite3
import pandas as pd
from cryptography.fernet import Fernet
import base64
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import getpass
import sys
class Data_Encryptor_SQLite:
    """Encrypt every cell of a DataFrame and append the rows to a SQLite table.

    All work is done by the constructor: it prompts for a password, reads the
    salt file, derives a Fernet key with PBKDF2-HMAC-SHA256, creates the table
    if it does not exist and inserts one encrypted row per DataFrame row.

    NOTE: the table is created with ``IF NOT EXISTS`` and rows are appended,
    so rows written by an earlier run (possibly with a different password or
    salt) stay in the table and cannot be decrypted with the current key.
    """

    def __init__(self, db_name, df: pd.DataFrame, table_name):
        """Prompt for the password, then encrypt ``df`` into ``table_name``.

        Args:
            db_name: Path of the SQLite database file.
            df: DataFrame whose cells are stringified and encrypted.
            table_name: Target table; treated as a single SQL identifier.

        Raises:
            ValueError: If an empty password is entered.
        """
        self.db_name = db_name
        self.df = df
        self.table_name = table_name
        self.salt = None
        self.conn = None
        self.password = getpass.getpass(prompt = "Enter password to lock: ").encode()
        # Validate before opening the database so a rejected password does
        # not leave an open connection behind.
        if not self.password:
            raise ValueError("Password is required.")
        self.get_salt()
        self.hash_password()
        self.conn = sqlite3.connect(db_name)
        try:
            self.create_table()
            self.encrypt_and_store_dataframe()
        finally:
            # Closing an already-closed sqlite3 connection is harmless; this
            # covers a failure in create_table().
            self.conn.close()

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    def get_salt(self):
        """Load the raw salt bytes from the key file into ``self.salt``."""
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()

    def encrypt_data(self, data):
        """Encrypt the string ``data`` and return the token as a string."""
        return self.cipher.encrypt(data.encode()).decode()

    def hash_password(self):
        """Derive the Fernet key from password + salt and build the cipher."""
        # Salt, hash, length and iteration count must be identical on the
        # decrypting side, otherwise a different key is derived.
        kdf = PBKDF2HMAC(algorithm=hashes.SHA256(),
                         length=32,
                         salt=self.salt,
                         iterations=100000)
        key = base64.urlsafe_b64encode(kdf.derive(self.password))
        self.cipher = Fernet(key)

    def create_table(self):
        """Create the target table (all TEXT columns) if it does not exist."""
        # Identifiers are quoted so names with spaces, keywords or
        # non-string labels still produce valid SQL.
        columns_def = ', '.join(
            f"{self._quote_identifier(col)} TEXT" for col in self.df.columns
        )
        create_table_sql = (
            f"CREATE TABLE IF NOT EXISTS "
            f"{self._quote_identifier(self.table_name)} ({columns_def})"
        )
        self.conn.execute(create_table_sql)
        self.conn.commit()

    def encrypt_and_store_dataframe(self):
        """Encrypt each row, insert it, commit, and close the connection."""
        try:
            # iterrows() is kept on purpose: it decides how each value is
            # stringified, and changing it would change the stored text.
            encrypted_rows = [
                [self.encrypt_data(str(val)) for val in row]
                for _, row in self.df.iterrows()
            ]
            if encrypted_rows:
                placeholders = ','.join('?' for _ in self.df.columns)
                insert_sql = (
                    f"INSERT INTO {self._quote_identifier(self.table_name)} "
                    f"VALUES ({placeholders})"
                )
                self.conn.executemany(insert_sql, encrypted_rows)
            self.conn.commit()
        finally:
            self.conn.close()
        print('encrypted table complete')
class Data_Decryptor_SQLite:
    """Read an encrypted SQLite table and decrypt it into a DataFrame.

    The constructor prompts for the password, reads the salt file, derives
    the Fernet key with the same PBKDF2 parameters used for encryption, and
    stores the decrypted table in ``self.df``.

    NOTE: every value in the table must have been encrypted with the same
    password and salt; otherwise Fernet rejects it (InvalidToken).
    """

    def __init__(self, db_name, table_name):
        """Prompt for the password, then decrypt ``table_name`` into ``self.df``.

        Args:
            db_name: Path of the SQLite database file.
            table_name: Table to read; treated as a single SQL identifier.

        Raises:
            ValueError: If an empty password is entered.
        """
        self.db_name = db_name
        self.table_name = table_name
        self.salt = None
        self.df = None
        self.conn = None
        # The original prompt said "lock", copied from the encryptor.
        self.password = getpass.getpass(prompt = "Enter password to unlock: ").encode()
        # Validate before opening the database so a rejected password does
        # not leave an open connection behind.
        if not self.password:
            raise ValueError("Password is required.")
        self.get_salt()
        self.hash_password()
        self.conn = sqlite3.connect(db_name)
        self.fetch_and_decrypt_dataframe()

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    def get_salt(self):
        """Load the raw salt bytes from the key file into ``self.salt``."""
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()

    def decrypt_data(self, data):
        """Decrypt the token string ``data`` and return the plaintext string."""
        return self.cipher.decrypt(data.encode()).decode()

    def hash_password(self):
        """Derive the Fernet key from password + salt and build the cipher."""
        # These parameters must match the encrypting side exactly.
        kdf = PBKDF2HMAC(algorithm=hashes.SHA256(),
                         length=32,
                         salt=self.salt,
                         iterations=100000)
        key = base64.urlsafe_b64encode(kdf.derive(self.password))
        self.cipher = Fernet(key)

    def fetch_and_decrypt_dataframe(self):
        """Fetch all rows, decrypt every cell into ``self.df``, close the connection."""
        try:
            cursor = self.conn.cursor()
            cursor.execute(
                f"SELECT * FROM {self._quote_identifier(self.table_name)}"
            )
            column_names = [d[0] for d in cursor.description]
            decrypted_data = [
                [self.decrypt_data(encrypted_val) for encrypted_val in row]
                for row in cursor.fetchall()
            ]
            self.df = pd.DataFrame(decrypted_data, columns=column_names)
        finally:
            # Close even when decryption fails, so the database handle is
            # not left open.
            self.conn.close()
加密部分运行正常;我把同样的代码复制到了解密类中,但解密时始终遇到下面这个错误:
cryptography.fernet.InvalidToken
解决方案如下:
`
class Data_Encryptor_SQLite:
    """Encrypt a DataFrame cell by cell and append it to a SQLite table.

    The constructor prompts for a password, reads the salt file, derives a
    Fernet key via PBKDF2-HMAC-SHA256, creates the table if needed and
    inserts the encrypted rows. Each database step opens and closes its own
    connection.

    NOTE: rows are appended to an existing table; values left over from a
    run with a different password or salt cannot be decrypted with this key.
    """

    def __init__(self, db_name, df: pd.DataFrame, table_name):
        """Prompt for the password, then encrypt ``df`` into ``table_name``.

        Args:
            db_name: Path of the SQLite database file.
            df: DataFrame whose cells are stringified and encrypted.
            table_name: Target table; treated as a single SQL identifier.

        Raises:
            ValueError: If an empty password is entered.
        """
        self.db_name = db_name
        self.df = df
        self.table_name = table_name
        self.password = getpass.getpass(prompt="Enter password to lock: ").encode()
        self.salt = None
        self.conn = None
        if not self.password:
            raise ValueError("Password is required.")
        self.get_salt()
        self.hash_password()
        self.create_table()
        self.encrypt_and_store_dataframe()

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    def get_salt(self):
        """Load the raw salt bytes from the key file into ``self.salt``."""
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()

    def encrypt_data(self, data):
        """Encrypt the string ``data`` and return the token as a string."""
        return self.cipher.encrypt(data.encode()).decode()

    def hash_password(self):
        """Derive the Fernet key from the password and build the cipher."""
        kdf = self.get_kdf()
        key = base64.urlsafe_b64encode(kdf.derive(self.password))
        self.cipher = Fernet(key)

    def get_kdf(self):
        """Return a fresh PBKDF2 instance (a KDF object can derive only once)."""
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=100000,
        )

    def create_table(self):
        """Create the target table (all TEXT columns) if it does not exist."""
        # Identifiers are quoted so names with spaces, keywords or
        # non-string labels still produce valid SQL.
        columns_def = ', '.join(
            f"{self._quote_identifier(col)} TEXT" for col in self.df.columns
        )
        create_table_sql = (
            f"CREATE TABLE IF NOT EXISTS "
            f"{self._quote_identifier(self.table_name)} ({columns_def})"
        )
        self.conn = sqlite3.connect(self.db_name)
        try:
            self.conn.execute(create_table_sql)
            self.conn.commit()
        finally:
            self.conn.close()

    def encrypt_and_store_dataframe(self):
        """Encrypt each row and insert it using a dedicated connection."""
        # Encrypt before touching the database. iterrows() is kept because
        # it decides how each value is stringified.
        encrypted_rows = [
            [self.encrypt_data(str(val)) for val in row]
            for _, row in self.df.iterrows()
        ]
        self.conn = sqlite3.connect(self.db_name)
        try:
            if encrypted_rows:
                placeholders = ','.join('?' for _ in self.df.columns)
                insert_sql = (
                    f"INSERT INTO {self._quote_identifier(self.table_name)} "
                    f"VALUES ({placeholders})"
                )
                self.conn.executemany(insert_sql, encrypted_rows)
            self.conn.commit()
        finally:
            self.conn.close()
        print('Encrypted table complete')
class Data_Decryptor_SQLite:
    """Decrypt an encrypted SQLite table into a pandas DataFrame.

    The constructor prompts for the password, reads the salt file, derives
    the Fernet key with the same PBKDF2 parameters used for encryption, and
    stores the decrypted table in ``self.df``.

    NOTE: every value in the table must have been encrypted with the same
    password and salt; otherwise Fernet rejects it (InvalidToken).
    """

    def __init__(self, db_name, table_name):
        """Prompt for the password, then decrypt ``table_name`` into ``self.df``.

        Args:
            db_name: Path of the SQLite database file.
            table_name: Table to read; treated as a single SQL identifier.

        Raises:
            ValueError: If an empty password is entered.
        """
        self.db_name = db_name
        self.table_name = table_name
        # The original prompt said "lock", copied from the encryptor.
        self.password = getpass.getpass(prompt="Enter password to unlock: ").encode()
        self.salt = None
        self.df = None
        self.conn = None
        if not self.password:
            raise ValueError("Password is required.")
        self.get_salt()
        self.hash_password()
        self.fetch_and_decrypt_dataframe()

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + str(name).replace('"', '""') + '"'

    def get_salt(self):
        """Load the raw salt bytes from the key file into ``self.salt``."""
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()

    def decrypt_data(self, data):
        """Decrypt the token string ``data`` and return the plaintext string."""
        return self.cipher.decrypt(data.encode()).decode()

    def hash_password(self):
        """Derive the Fernet key from the password and build the cipher."""
        kdf = self.get_kdf()
        key = base64.urlsafe_b64encode(kdf.derive(self.password))
        self.cipher = Fernet(key)

    def get_kdf(self):
        """Return a fresh PBKDF2 instance; parameters must match the encryptor."""
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=100000,
        )

    def fetch_and_decrypt_dataframe(self):
        """Read all rows with a dedicated connection and decrypt into ``self.df``."""
        self.conn = sqlite3.connect(self.db_name)
        try:
            cursor = self.conn.cursor()
            cursor.execute(
                f"SELECT * FROM {self._quote_identifier(self.table_name)}"
            )
            column_names = [d[0] for d in cursor.description]
            decrypted_data = [
                [self.decrypt_data(encrypted_val) for encrypted_val in row]
                for row in cursor.fetchall()
            ]
            self.df = pd.DataFrame(decrypted_data, columns=column_names)
        finally:
            # Close even when decryption fails, so the database handle is
            # not left open.
            self.conn.close()
# Example usage
# encryptor = Data_Encryptor_SQLite('example.db', df, 'encrypted_table')
# decryptor = Data_Decryptor_SQLite('example.db', 'encrypted_table')
`