0

SQLAlchemy を使用したすべての SQL 操作のメソッドを使用して、各テーブルにクラス モデルを使用します。

pgp_sym_encrypt と pgp_sym_decrypt を PostgreSQL の拡張子でインストールします

CREATE EXTENSION IF NOT EXISTS pgcrypto WITH SCHEMA my_database;

SQLAlchemy ドキュメントの助けを借りて my クラス User を使用した後、PostgreSQL のパッケージ pgcrypto の機能である pgp_sym_encrypt を使用してデータを暗号化します (バージョン 13.4 を使用します)。

セキュリティのために、postgreSQL のネイティブ pgp_sym_encrypt および pgp_sym_decrypt ネイティブ関数を使用したいユーザーのデータを暗号化します。psycopg2でプレーンテキストクエリを使用する前に。SQLAlchemy ORM を使用しているため、Flask-SQLALchemy 拡張機能を使用しています。

私は解決策を見つけました: https://docs.sqlalchemy.org/en/14/core/custom_types.html#applying-sql-level-bind-result-processing

しかし、どれもクエリ内で使用する例を示していません

実際、私のコードは次のとおりです。

import uuid
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, String, DateTime, func, type_coerce, TypeDecorator
from sqlalchemy.dialects.postgresql import UUID, BYTEA
from instance.config import ENCRYPTION_KEY

db = SQLAlchemy()


class PGPString(TypeDecorator):
    impl = BYTEA

    cache_ok = True

    def __init__(self, passphrase):
        super(PGPString, self).__init__()
        self.passphrase = passphrase

    def bind_expression(self, bindvalue):
        # convert the bind's type from PGPString to
        # String, so that it's passed to psycopg2 as is without
        # a dbapi.Binary wrapper
        bindvalue = type_coerce(bindvalue, String)
        return func.pgp_sym_encrypt(bindvalue, self.passphrase)

    def column_expression(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase)
    
    
class User(db.Model):
    __tablename__ = "user"
    __table_args__ = {'schema': 'private'}
    id = Column('id', UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    sexe = Column("sexe", String)
    username = Column("username", PGPString(ENCRYPTION_KEY), unique=True)
    firstname = Column("firstname", PGPString(ENCRYPTION_KEY), unique=True)
    lastname = Column("lastname", PGPString(ENCRYPTION_KEY), unique=True)
    email = Column("email", PGPString(ENCRYPTION_KEY), unique=True)
    password = Column("password", PGPString(ENCRYPTION_KEY), unique=True)
    registration_date = Column("registration_date", DateTime)
    validation_date = Column("validation_date", DateTime)
    role = Column("role", String)

    @classmethod
    def create_user(cls, **kw):
        """
        Create an User
        """
        obj = cls(**kw)
        db.session.add(obj)
        db.session.commit()

create_user メソッドを使用すると、ユーザー データのデータが暗号化されます。

しかし、私のクラスで、このメソッドでユーザー名、電子メール、またはその他のデータをチェックして、ユーザーが存在するかどうかをチェックするメソッドを追加した場合:

class User(db.Model)
    ....
    @classmethod
    def check_user_exist(cls, **kwargs):
        """
        Check if user exist
        :param kwargs:
        :return:
        """
        exists = db.session.query(cls).with_entities(cls.username).filter_by(**kwargs).first() is not None
        return exists

これにより、次のクエリが生成されます。

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role

FROM private."user";

WHERE private."user".username = pgp_sym_encrypt(username, ENCRYPTION_KEY)
  AND private."user".email = pgp_sym_encrypt(email, ENCRYPTION_KEY)
  AND private."user".password = pgp_sym_encrypt(password, ENCRYPTION_KEY);

このクエリが返さNoneれるのは、実際の ENCRYPTION_KEY で pgp_sym_encrypt を使用すると、データが新しい方法で暗号化されるためです。

https://www.postgresql.org/docs/13/pgcrypto.html#id-1.11.7.34.8 https://en.wikipedia.org/wiki/Pretty_Good_Privacy#/media/File:PGP_diagram.svg

PGPString(TypeDecorator):pgp_sym_decrypt を使用して SQLAlchemy ドキュメント関数を変更し、以下のクエリのように暗号化された変数ではなく、列を復号化する方法は次のとおりです。

SELECT private."user".id AS private_user_id,
       private."user".sexe AS private_user_sexe,
       pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY)  AS private_user_username,
       pgp_sym_decrypt(private."user".firstname, ENCRYPTION_KEY) AS private_user_firstname,
       pgp_sym_decrypt(private."user".lastname, ENCRYPTION_KEY)  AS private_user_lastname,
       pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)     AS private_user_email,
       pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY)  AS private_user_password,
       private."user".registration_date AS private_user_registration_date,
       private."user".validation_date   AS private_user_validation_date,
       private."user".role              AS private_user_role
FROM private."user"
WHERE
      pgp_sym_decrypt(private."user".username, ENCRYPTION_KEY) = 'username'
    AND pgp_sym_decrypt(private."user".email, ENCRYPTION_KEY)  = 'email'
    AND   pgp_sym_decrypt(private."user".password, ENCRYPTION_KEY) = 'password';

通常、私は今のところ Python と SQLAlchemy でうまくやっていますが、これは本当に私のスキルを超えており、私にはまったく知られていない機能に影響するため、この関数を変更する方法が本当にわかりません。

とりあえずテキスト機能を使ってますが、これはイマイチです

たとえば、ユーザー名から ID を取得する別の方法 (暗号化されたデータ)

    @classmethod
    def get_id(cls, username):
        query = """
        SELECT private.user.id
        FROM private."user"
        WHERE pgp_sym_decrypt(private."user".username, :ENCRYPTION_KEY) = :username;
        """
        q = db.session.query(cls).from_statement(text(query)).params(ENCRYPTION_KEY=ENCRYPTION_KEY,
                                                                     username=username).first()
        return q.id

助けてくれてありがとう!よろしくお願いします

4

1 に答える 1

0

を追加して解決:

class PGPString(TypeDecorator):
    (...)
    def pgp_sym_decrypt(self, col):
        return func.pgp_sym_decrypt(col, self.passphrase, type_=String)

PGPString に次のように使用します。

class User(db.session):
    (...)
    @classmethod
    def check_user_exist(cls, **kw):
        exists = db.session.query(cls).with_entities(cls.username)\
            .filter(func.pgp_sym_decrypt(User.username, ENCRYPTION_KEY, type_=String) == kw['username'])\
            .filter(func.pgp_sym_decrypt(User.email, ENCRYPTION_KEY, type_=String) == kw['email'])\
            .filter(func.pgp_sym_decrypt(User.password, ENCRYPTION_KEY, type_=String) == kw['password'])\
            .first() is not None
    return exist

sqlalchemy チームに感謝: https://github.com/sqlalchemy/sqlalchemy/discussions/7268

于 2021-11-05T15:24:10.803 に答える