# yaodong
# 7/28/2016 - 4:11 PM
#
# encrypted_column.py

import json

from cryptography.fernet import Fernet
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.types import TypeDecorator, BINARY

# Key passed to ``Fernet`` when the cipher is first built (see
# ``EncryptedTypeBase.cipher_suite``). The value below is only a placeholder:
# Fernet expects a URL-safe base64-encoded 32-byte key, such as the one
# returned by ``Fernet.generate_key()``, so it must be replaced before use.
secret_key = 'change me!'

class EncryptedTypeBase(TypeDecorator):
    """Binary column type that stores JSON-serializable values encrypted.

    On the way into the database a value is serialized with ``json.dumps``,
    encoded as UTF-8 and encrypted with Fernet. On the way out the stored
    token is decrypted, decoded and parsed with ``json.loads``. ``None`` is
    passed through untouched in both directions.
    """

    impl = BINARY

    def process_bind_param(self, value, dialect):
        """Serialize and encrypt ``value`` before it is bound to a statement.

        :param value: JSON-serializable Python object, or ``None``.
        :param dialect: dialect in use (unused).
        :returns: the Fernet token as bytes, or ``None`` when ``value`` is
            ``None``.
        :raises TypeError: if ``value`` cannot be serialized by ``json.dumps``.
        """
        if value is None:
            return None
        plaintext = json.dumps(value).encode('utf-8')
        # Use the cipher via ``self`` rather than a module-level alias so the
        # class does not depend on names defined after it.
        return self.cipher_suite().encrypt(plaintext)

    def process_result_value(self, value, dialect):
        """Decrypt and deserialize a token loaded from the database.

        :param value: Fernet token read from the column, or ``None``.
        :param dialect: dialect in use (unused).
        :returns: the decoded Python object, or ``None`` when ``value`` is
            ``None``.
        :raises cryptography.fernet.InvalidToken: if the token cannot be
            decrypted with the configured key.
        """
        if value is None:
            return None
        plaintext = self.cipher_suite().decrypt(value)
        return json.loads(plaintext.decode('utf-8'))

    @classmethod
    def cipher_suite(cls):
        """Return the ``Fernet`` instance, building it on first use.

        The instance is created from the module-level ``secret_key`` and
        cached on the class, so a later change to ``secret_key`` has no
        effect once the cipher has been built.
        """
        if getattr(cls, '_cipher_suite', None) is None:
            cls._cipher_suite = Fernet(secret_key)
        return cls._cipher_suite

    @property
    def python_type(self):
        """Not supported for this type; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def process_literal_param(self, value, dialect):
        """Not supported: values are never rendered as inline SQL literals."""
        raise NotImplementedError()


# Associate the column type with MutableDict so that, per the SQLAlchemy
# mutable extension, in-place changes to a stored dict are detected by the
# ORM. ``as_mutable`` returns the type as an instance, so ``EncryptedType`` is
# an ``EncryptedTypeBase`` instance rather than a class.
EncryptedType = MutableDict.as_mutable(EncryptedTypeBase)