Python 密码学从加密的 SQLite 表中读取

问题描述 投票:0回答:1

我对加密有点陌生,但是我有一个类可以加密数据帧并写入 SQLite 文件。然后逆一个类来解密同一个 SQLite 表

import sqlite3
import pandas as pd
from cryptography.fernet import Fernet
import base64
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import getpass
import sys
        
class Data_Encryptor_SQLite:
    def __init__(self, db_name, df, table_name):
        self.db_name = db_name
        self.df = df
        self.table_name = table_name
        self.password = getpass.getpass(prompt = "Enter password to lock: ").encode()
        self.conn = sqlite3.connect(db_name)
        self.salt = None

        # Ensure password is provided
        if not self.password:
            raise ValueError("Password is required.")
        
        self.get_salt()
        # Generate key and Cipher from password 
        self.hash_password()
        # Create table and encrypt data
        self.create_table()
        self.encrypt_and_store_dataframe()
    
    def get_salt(self):
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()
        
        
    def encrypt_data(self, data):
        #Cipher Encryption from fernet key(data encoded into bytes) then decoded into a string to store in SQL 
        return self.cipher.encrypt(data.encode()).decode()

    def hash_password(self):

        def kdf():
            return PBKDF2HMAC(algorithm=hashes.SHA256(), 
                              length=32, 
                              salt=self.salt, 
                              iterations=100000)

        
        key = base64.urlsafe_b64encode(kdf().derive(self.password))
                 
        self.cipher = Fernet(key)
          

    def create_table(self):
        columns_def = ', '.join([f"{col} TEXT" for col in self.df.columns])
        create_table_sql = f"CREATE TABLE IF NOT EXISTS {self.table_name} ({columns_def})"
        self.conn.execute(create_table_sql)
        self.conn.commit()

    def encrypt_and_store_dataframe(self):
        for _, row in self.df.iterrows():
            # encrypted_row = [self.encryptor.encrypt_data(str(val)) for val in row]
            encrypted_row = [self.encrypt_data(str(val)) for val in row]
            insert_sql = f"INSERT INTO {self.table_name} VALUES ({','.join(['?' for _ in range(len(row))])})"
            self.conn.execute(insert_sql, encrypted_row)
        self.conn.commit()
        self.conn.close()
        
        print('encrypted table complete')


class Data_Decryptor_SQLite:
    def __init__(self, db_name, table_name):
        self.db_name = db_name
        self.table_name = table_name
        self.password = getpass.getpass(prompt = "Enter password to lock: ").encode()
        self.conn = sqlite3.connect(db_name)
        self.salt = None
        self.df = None

        # Ensure password is provided
        if not self.password:
            raise ValueError("Password is required.")
        
        self.get_salt()
        # Generate key and Cipher from password 
        self.hash_password()

        #Run over SQl table and return df decrypted 
        self.fetch_and_decrypt_dataframe()
        
            
    def get_salt(self):
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()
        
        
    def decrypt_data(self, data):
        #Cipher Encryption from fernet key(data encoded into bytes) then decoded into a string to store in SQL 
        return self.cipher.decrypt(data.encode()).decode()

    def hash_password(self):

        def kdf():
            return PBKDF2HMAC(algorithm=hashes.SHA256(), 
                              length=32, 
                              salt=self.salt, 
                              iterations=100000)

        
        key = base64.urlsafe_b64encode(kdf().derive(self.password))
        
        self.cipher = Fernet(key)
          
    
    def fetch_and_decrypt_dataframe(self):
        cursor = self.conn.cursor()
        cursor.execute(f"SELECT * FROM {self.table_name}")
        encrypted_data = cursor.fetchall()
        decrypted_data = []
        for row in encrypted_data:
            # decrypted_row = [self.encryptor.decrypt_data(encrypted_val) for encrypted_val in row]
            decrypted_row = [self.decrypt_data(encrypted_val) for encrypted_val in row]
            decrypted_data.append(decrypted_row)
        self.df = pd.DataFrame(decrypted_data, columns=[d[0] for d in cursor.description])
        self.conn.close()

加密工作完美,我已将代码复制到解密类,但是我遇到了相同的错误

cryptography.fernet.InvalidToken

python sqlite cryptography
1个回答
0
投票

解决方案如下:

`

class Data_Encryptor_SQLite:
    def __init__(self, db_name, df, table_name):
        self.db_name = db_name
        self.df = df
        self.table_name = table_name
        self.password = getpass.getpass(prompt="Enter password to lock: ").encode()
        self.salt = None

        # Ensure password is provided
        if not self.password:
            raise ValueError("Password is required.")

        self.get_salt()
        # Generate key and Cipher from password
        self.hash_password()
        # Create table and encrypt data
        self.create_table()
        self.encrypt_and_store_dataframe()

    def get_salt(self):
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()

    def encrypt_data(self, data):
        return self.cipher.encrypt(data.encode()).decode()

    def hash_password(self):
        kdf = self.get_kdf()
        key = base64.urlsafe_b64encode(kdf.derive(self.password))
        self.cipher = Fernet(key)

    def get_kdf(self):
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=100000,
        )

    def create_table(self):
        columns_def = ', '.join([f"{col} TEXT" for col in self.df.columns])
        create_table_sql = f"CREATE TABLE IF NOT EXISTS {self.table_name} ({columns_def})"
        self.conn = sqlite3.connect(self.db_name)
        self.conn.execute(create_table_sql)
        self.conn.commit()
        self.conn.close()

    def encrypt_and_store_dataframe(self):
        self.conn = sqlite3.connect(self.db_name)
        for _, row in self.df.iterrows():
            encrypted_row = [self.encrypt_data(str(val)) for val in row]
            insert_sql = f"INSERT INTO {self.table_name} VALUES ({','.join(['?' for _ in range(len(row))])})"
            self.conn.execute(insert_sql, encrypted_row)
        self.conn.commit()
        self.conn.close()
        print('Encrypted table complete')


class Data_Decryptor_SQLite:
    def __init__(self, db_name, table_name):
        self.db_name = db_name
        self.table_name = table_name
        self.password = getpass.getpass(prompt="Enter password to lock: ").encode()
        self.salt = None
        self.df = None

        # Ensure password is provided
        if not self.password:
            raise ValueError("Password is required.")

        self.get_salt()
        # Generate key and Cipher from password
        self.hash_password()

        # Run over SQL table and return decrypted DataFrame
        self.fetch_and_decrypt_dataframe()

    def get_salt(self):
        with open('locationof.key_file', 'rb') as file:
            self.salt = file.read()

    def decrypt_data(self, data):
        return self.cipher.decrypt(data.encode()).decode()

    def hash_password(self):
        kdf = self.get_kdf()
        key = base64.urlsafe_b64encode(kdf.derive(self.password))
        self.cipher = Fernet(key)

    def get_kdf(self):
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=self.salt,
            iterations=100000,
        )

    def fetch_and_decrypt_dataframe(self):
        self.conn = sqlite3.connect(self.db_name)
        cursor = self.conn.cursor()
        cursor.execute(f"SELECT * FROM {self.table_name}")
        encrypted_data = cursor.fetchall()
        decrypted_data = []
        for row in encrypted_data:
            decrypted_row = [self.decrypt_data(encrypted_val) for encrypted_val in row]
            decrypted_data.append(decrypted_row)
        self.df = pd.DataFrame(decrypted_data, columns=[d[0] for d in cursor.description])
        self.conn.close()

# Example usage
# encryptor = Data_Encryptor_SQLite('example.db', df, 'encrypted_table')
# decryptor = Data_Decryptor_SQLite('example.db', 'encrypted_table')

`

© www.soinside.com 2019 - 2024. All rights reserved.