diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 316a9af5..5b7949be 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,7 @@ repos: - id: check-merge-conflict - id: fix-byte-order-marker - repo: https://github.com/asottile/pyupgrade - rev: v2.37.3 + rev: v2.38.0 hooks: - id: pyupgrade args: [--py37-plus] diff --git a/CHANGES.rst b/CHANGES.rst index f702fafd..5a7a3987 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -3,6 +3,17 @@ Flask-Security Changelog Here you can see the full list of changes between each Flask-Security release. +Version 5.0.2 +------------- + +Released September x, 2022 + +Fixes ++++++ +- (:issue:`673`) Role permissions backwards compatibility bug. For SQL based datastores + that use Flask-Security's models.fsqla_vx - there should be NO issues. If you declare + your own models - please see the 5.0.0 releases notes for required change. + Version 5.0.1 ------------- @@ -47,10 +58,11 @@ Deprecations - (:pr:`657`) The ability to pass in a json_encoder_cls as part of initialization has been removed since Flask 2.2 has deprecated and replaced that functionality. - (:pr:`655`) Flask has deprecated @before_first_request. This was used mostly in examples/quickstart. - These have been changed to use app.app_context() prior to running the app. FS itself used it in + These have been changed to use app.app_context() prior to running the app. Flask-Security itself used it in 2 places - to populate `_` in jinja globals if Babel wasn't initialized and to perform - various configuration sanity checks w.r.t. WTF CSRF. All FS templates have been converted - to use `_fsdomain` rather than ``_`` so FS no longer will populate ``_``. The configuration checks + various configuration sanity checks w.r.t. WTF CSRF. All Flask-Security templates have been converted + to use `_fsdomain` rather than ``_`` so Flask-Security no longer sets ``_`` into jinja2 globals. + The configuration checks have been moved to the end of Security::init_app() - so it is now imperative that `FlaskWTF::CSRFProtect()` be called PRIOR to initializing Flask-Security. - encrypt_password method has been removed. It has been deprecated since 2.0.2 @@ -134,9 +146,20 @@ Other: The key `field_errors` will contain the dict as specified by WTForms. Please note that starting with WTForms 3.0 form-level errors are supported and show up in the dict with the field name/key of "none". There are no changes to non-error related JSON responses. -- Permissions - The Role Model now stores permissions as a list, and requires that the underlying DB ORM map that to a supported - DB type. For SQLAlchemy, this is mapped to a comma separated string (as before). For Mongo, a ListField can be directly used. For - SQLAlchemy DBs the Column type (UnicodeText) didn't change so no data migration should be required. +- Permissions **THIS IS A BREAKING CHANGE**. The Role Model now stores permissions as a list, and requires that the underlying DB ORM map that to a supported + DB type. For SQLAlchemy, this is mapped to a comma separated string (as before). For + SQLAlchemy DBs the underlying Column type (UnicodeText) didn't change so no data migration should be required. + However, the ORM Column type did change and requires the following change to your model:: + + from flask_security import AsaList + from sqlalchemy.ext.mutable import MutableList + class Role(Base, RoleMixin): + ... + permissions = Column(MutableList.as_mutable(AsaList()), nullable=True) + ... + + If your application makes use of Flask-Security's models.fsqla_vX classes - no changes are required. + For Mongo, a ListField can be directly used. - CSRF - As mentioned above, it is now required that `FlaskWTF::CSRFProtect()`, if used, must be called PRIOR to initializing Flask-Security. - json_encoder_cls - As mentioned above - Flask-Security initialization on longer accepts overriding the json_encoder class. If this is required, update to Flask >=2.2 and implement Flask's JSONProvider interface. diff --git a/docs/models.rst b/docs/models.rst index 001f47ef..1f2a6145 100644 --- a/docs/models.rst +++ b/docs/models.rst @@ -121,7 +121,7 @@ Permissions If you want to protect endpoints with permissions, and assign permissions to roles that are then assigned to users, the ``Role`` model requires: -* ``permissions`` (list of string/UnicodeText, nullable) +* ``permissions`` (list of UnicodeText, nullable) WebAuthn ^^^^^^^^ diff --git a/docs/quickstart.rst b/docs/quickstart.rst index b8b2058f..a648a7c8 100644 --- a/docs/quickstart.rst +++ b/docs/quickstart.rst @@ -474,7 +474,7 @@ in 2 ways: * Complete app with in-memory/temporary DB (with little or no mocking). Look in the `Flask-Security repo`_ *examples* directory for actual code that implements the -first approach. +second approach which is much simpler and with an in-memory DB fairly fast. You also might want to set the following configurations in your conftest.py: diff --git a/examples/fsqlalchemy1/app.py b/examples/fsqlalchemy1/app.py index 2a2e0c35..371dde77 100644 --- a/examples/fsqlalchemy1/app.py +++ b/examples/fsqlalchemy1/app.py @@ -31,35 +31,8 @@ ) from flask_security.models import fsqla_v2 as fsqla -# Create app -app = Flask(__name__) -app.config["DEBUG"] = True -# generated using: secrets.token_urlsafe() -app.config["SECRET_KEY"] = "pf9Wkove4IKEAXvy-cQkeDPhv9Cb3Ag-wyJILbq_dFw" -app.config["SECURITY_PASSWORD_HASH"] = "argon2" -# argon2 uses double hashing by default - so provide key. -# For python3: secrets.SystemRandom().getrandbits(128) -app.config["SECURITY_PASSWORD_SALT"] = "146585145368132386173505678016728509634" - -# Take password complexity seriously -app.config["SECURITY_PASSWORD_COMPLEXITY_CHECKER"] = "zxcvbn" - -# Allow registration of new users without confirmation -app.config["SECURITY_REGISTERABLE"] = True - -app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get( - "SQLALCHEMY_DATABASE_URI", "sqlite://" -) -app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False - -# As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the -# underlying engine. This option makes sure that DB connections from the pool -# are still valid. Important for entire application since many DBaaS options -# automatically close idle connections. -app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {"pool_pre_ping": True} - # Create database connection object -db = SQLAlchemy(app) +db = SQLAlchemy() # Define models - for this example - we change the default table names fsqla.FsModels.set_db_info(db, user_table_name="myuser", role_table_name="myrole") @@ -81,122 +54,156 @@ class Blog(db.Model): text = Column(UnicodeText) -# Setup Flask-Security -user_datastore = SQLAlchemyUserDatastore(db, User, Role) -app.security = Security(app, user_datastore) - -# Setup Babel - not strictly necessary but since our virtualenv has Flask-Babel -# we need to initialize it -Babel(app) +# Create app +def create_app(): + app = Flask(__name__) + app.config["DEBUG"] = True + # generated using: secrets.token_urlsafe() + app.config["SECRET_KEY"] = "pf9Wkove4IKEAXvy-cQkeDPhv9Cb3Ag-wyJILbq_dFw" + app.config["SECURITY_PASSWORD_HASH"] = "argon2" + # argon2 uses double hashing by default - so provide key. + # For python3: secrets.SystemRandom().getrandbits(128) + app.config["SECURITY_PASSWORD_SALT"] = "146585145368132386173505678016728509634" + + # Take password complexity seriously + app.config["SECURITY_PASSWORD_COMPLEXITY_CHECKER"] = "zxcvbn" + + # Allow registration of new users without confirmation + app.config["SECURITY_REGISTERABLE"] = True + + app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get( + "SQLALCHEMY_DATABASE_URI", "sqlite://" + ) + app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + + # As of Flask-SQLAlchemy 2.4.0 it is easy to pass in options directly to the + # underlying engine. This option makes sure that DB connections from the pool + # are still valid. Important for entire application since many DBaaS options + # automatically close idle connections. + app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {"pool_pre_ping": True} + + # Setup Flask-Security + db.init_app(app) + user_datastore = SQLAlchemyUserDatastore(db, User, Role) + app.security = Security(app, user_datastore) + + # Setup Babel - not strictly necessary but since our virtualenv has Flask-Babel + # we need to initialize it + Babel(app) + + # Set this so unit tests can mock out. + app.blog_cls = Blog + + # Views + # Note that we always add @auth_required so that if a client isn't logged in + # we will get a proper '401' and redirected to login page. + @app.route("/") + @auth_required() + def home(): + return render_template_string("Hello {{ current_user.email }}") + + @app.route("/admin") + @auth_required() + @permissions_accepted("admin-read", "admin-write") + def admin(): + return render_template_string( + "Hello on admin page. Current user {} password is {}".format( + current_user.email, current_user.password + ) + ) -# Set this so unit tests can mock out. -app.blog_cls = Blog + @app.route("/ops") + @auth_required() + @roles_accepted("monitor") + def monitor(): + # Example of using just a role. Note that 'admin' can't access this + # since it doesn't have the 'monitor' role - even though it has + # all the permissions that the 'monitor' role has. + return render_template_string("Hello OPS") + + @app.route("/blog/", methods=["GET", "POST"]) + @auth_required() + @permissions_required("user-write") + def update_blog(bid): + # Yes caller has write permission - but do they OWN this blog? + blog = current_app.blog_cls.query.get(bid) + if not blog: + abort(404) + if current_user != blog.user: + abort(403) + return render_template_string("Yes, {{ current_user.email }} can update blog") + + @app.route("/myblogs", methods=["GET"]) + @auth_required() + @permissions_accepted("user-read") + def list_my_blogs(): + blogs = current_user.blogs + blist = "" + cnt = 0 + for blog in blogs: + blist += f" {blog.title}" + cnt += 1 + if not blogs: + abort(404) + return render_template_string(f"Found {cnt} of yours with titles {blist}") + + return app # Create users and roles (and first blog!) def create_users(): if current_app.testing: return - security = current_app.security - security.datastore.db.create_all() - security.datastore.find_or_create_role( - name="admin", - permissions={"admin-read", "admin-write", "user-read", "user-write"}, - ) - security.datastore.find_or_create_role( - name="monitor", permissions={"admin-read", "user-read"} - ) - security.datastore.find_or_create_role( - name="user", permissions={"user-read", "user-write"} - ) - security.datastore.find_or_create_role(name="reader", permissions={"user-read"}) - - if not security.datastore.find_user(email="admin@me.com"): - security.datastore.create_user( - email="admin@me.com", password=hash_password("password"), roles=["admin"] + with current_app.app_context(): + security = current_app.security + security.datastore.db.create_all() + security.datastore.find_or_create_role( + name="admin", + permissions={"admin-read", "admin-write", "user-read", "user-write"}, ) - if not security.datastore.find_user(email="ops@me.com"): - security.datastore.create_user( - email="ops@me.com", password=hash_password("password"), roles=["monitor"] + security.datastore.find_or_create_role( + name="monitor", permissions={"admin-read", "user-read"} ) - real_user = security.datastore.find_user(email="user@me.com") - if not real_user: - real_user = security.datastore.create_user( - email="user@me.com", password=hash_password("password"), roles=["user"] + security.datastore.find_or_create_role( + name="user", permissions={"user-read", "user-write"} ) - if not security.datastore.find_user(email="reader@me.com"): - security.datastore.create_user( - email="reader@me.com", password=hash_password("password"), roles=["reader"] + security.datastore.find_or_create_role(name="reader", permissions={"user-read"}) + + if not security.datastore.find_user(email="admin@me.com"): + security.datastore.create_user( + email="admin@me.com", + password=hash_password("password"), + roles=["admin"], + ) + if not security.datastore.find_user(email="ops@me.com"): + security.datastore.create_user( + email="ops@me.com", + password=hash_password("password"), + roles=["monitor"], + ) + real_user = security.datastore.find_user(email="user@me.com") + if not real_user: + real_user = security.datastore.create_user( + email="user@me.com", password=hash_password("password"), roles=["user"] + ) + if not security.datastore.find_user(email="reader@me.com"): + security.datastore.create_user( + email="reader@me.com", + password=hash_password("password"), + roles=["reader"], + ) + + # create initial blog + blog = current_app.blog_cls( + title="First Blog", text="my first blog is short", user=real_user ) - - # create initial blog - blog = app.blog_cls( - title="First Blog", text="my first blog is short", user=real_user - ) - security.datastore.db.session.add(blog) - security.datastore.db.session.commit() - print(f"First blog id {blog.id}") - - -# Views -# Note that we always add @auth_required so that if a client isn't logged in -# we will get a proper '401' and redirected to login page. -@app.route("/") -@auth_required() -def home(): - return render_template_string("Hello {{ current_user.email }}") - - -@app.route("/admin") -@auth_required() -@permissions_accepted("admin-read", "admin-write") -def admin(): - return render_template_string( - "Hello on admin page. Current user {} password is {}".format( - current_user.email, current_user.password - ) - ) - - -@app.route("/ops") -@auth_required() -@roles_accepted("monitor") -def monitor(): - # Example of using just a role. Note that 'admin' can't access this - # since it doesn't have the 'monitor' role - even though it has - # all the permissions that the 'monitor' role has. - return render_template_string("Hello OPS") - - -@app.route("/blog/", methods=["GET", "POST"]) -@auth_required() -@permissions_required("user-write") -def update_blog(bid): - # Yes caller has write permission - but do they OWN this blog? - blog = current_app.blog_cls.query.get(bid) - if not blog: - abort(404) - if current_user != blog.user: - abort(403) - return render_template_string("Yes, {{ current_user.email }} can update blog") - - -@app.route("/myblogs", methods=["GET"]) -@auth_required() -@permissions_accepted("user-read") -def list_my_blogs(): - blogs = current_user.blogs - blist = "" - cnt = 0 - for blog in blogs: - blist += f" {blog.title}" - cnt += 1 - if not blogs: - abort(404) - return render_template_string(f"Found {cnt} of yours with titles {blist}") + security.datastore.db.session.add(blog) + security.datastore.db.session.commit() + print(f"First blog id {blog.id}") if __name__ == "__main__": - with app.app_context(): + myapp = create_app() + with myapp.app_context(): create_users() - app.run(port=5003) + myapp.run(port=5003) diff --git a/examples/fsqlalchemy1/tests/conftest.py b/examples/fsqlalchemy1/tests/conftest.py index cb4eb881..42802c22 100644 --- a/examples/fsqlalchemy1/tests/conftest.py +++ b/examples/fsqlalchemy1/tests/conftest.py @@ -1,25 +1,19 @@ # Copyright 2019 by J. Christopher Wagner (jwag). All rights reserved. - -from unittest.mock import Mock import pytest -from .test_utils import WrapApp - - @pytest.fixture def myapp(): """ Create a wrapped flask app. This is used for unittests that want to mock out all - underlying singletons (such as DBs). + underlying singletons (e.g. blog). Assumes that app.security has been set. """ - from fsqlalchemy1.app import app, User, Role + from fsqlalchemy1.app import create_app + app = create_app() app.config["TESTING"] = True - bmock = Mock() - app.blog_cls = bmock - return WrapApp(app, User, Role, mocks={"blog_mock": bmock}) + return app diff --git a/examples/fsqlalchemy1/tests/test_api.py b/examples/fsqlalchemy1/tests/test_api.py index 51639dc7..8e6f4be7 100644 --- a/examples/fsqlalchemy1/tests/test_api.py +++ b/examples/fsqlalchemy1/tests/test_api.py @@ -1,33 +1,47 @@ -# Copyright 2019 by J. Christopher Wagner (jwag). All rights reserved. +# Copyright 2019-2022 by J. Christopher Wagner (jwag). All rights reserved. from fsqlalchemy1.app import Blog -from .test_utils import create_fake_user, set_current_user +from .test_utils import set_current_user def test_monitor_404(myapp): - user = create_fake_user(myapp.user_cls, roles=myapp.role_cls(name="basic")) - set_current_user(myapp.app, user) + ds = myapp.security.datastore + with myapp.app_context(): + ds.db.create_all() + + r1 = ds.create_role(name="basic") + ds.create_user(email="unittest@me.com", password="password", roles=[r1]) + ds.commit() + + set_current_user(myapp, ds, "unittest@me.com") # This requires "monitor" role - resp = myapp.test_client.get( + resp = myapp.test_client().get( "/ops", - headers={myapp.app.config["SECURITY_TOKEN_AUTHENTICATION_HEADER"]: "token"}, + headers={myapp.config["SECURITY_TOKEN_AUTHENTICATION_HEADER"]: "token"}, ) assert resp.status_code == 403 def test_blog_write(myapp): - user_role = myapp.role_cls(name="user", permissions={"user-read", "user-write"}) - user = create_fake_user(myapp.user_cls, roles=user_role) - set_current_user(myapp.app, user) + ds = myapp.security.datastore + with myapp.app_context(): + ds.db.create_all() + + r1 = ds.create_role(name="user", permissions={"user-read", "user-write"}) + user = ds.create_user(email="unittest@me.com", password="password", roles=[r1]) + + b1 = Blog(id=1, text="hi blog", user=user) + ds.put(b1) + ds.commit() + + set_current_user(myapp, ds, "unittest@me.com") - b1 = Blog(id=1, text="hi blog", user=user) - myapp.mocks["blog_mock"].query.get.return_value = b1 # This requires "user-write" permission - resp = myapp.test_client.post( + resp = myapp.test_client().post( "/blog/1", - headers={myapp.app.config["SECURITY_TOKEN_AUTHENTICATION_HEADER"]: "token"}, + headers={myapp.config["SECURITY_TOKEN_AUTHENTICATION_HEADER"]: "token"}, data=dict({"text": "A new blog"}), ) assert resp.status_code == 200 diff --git a/examples/fsqlalchemy1/tests/test_utils.py b/examples/fsqlalchemy1/tests/test_utils.py index 40e33064..ca2f0be6 100644 --- a/examples/fsqlalchemy1/tests/test_utils.py +++ b/examples/fsqlalchemy1/tests/test_utils.py @@ -1,39 +1,14 @@ # Copyright 2019 by J. Christopher Wagner (jwag). All rights reserved. -class WrapApp: - def __init__(self, app, user_cls=None, role_cls=None, mocks=None): - """Used to help create a app test fixture - with optionally passing in mocks""" - self.app = app - self.user_cls = user_cls - self.role_cls = role_cls - self.test_client = app.test_client() - self.mocks = mocks - - -def set_current_user(app, user): +def set_current_user(app, ds, email): """Set up so that when request is received, the token will cause 'user' to be made the current_user """ def token_cb(request): if request.headers.get("Authentication-Token") == "token": - return user + return ds.find_user(email=email) return app.security.login_manager.anonymous_user() app.security.login_manager.request_loader(token_cb) - - -def create_fake_user(user_cls, email="unittest@me.com", userid=1, roles=None): - """Create fake user optionally with roles""" - user = user_cls() - user.email = email - user.id = userid - user.password = "mypassword" - user.active = True - if roles: - if isinstance(roles, list): - user.roles = roles - else: - user.roles = [roles] - return user diff --git a/flask_security/__init__.py b/flask_security/__init__.py index a59b3952..f56604de 100644 --- a/flask_security/__init__.py +++ b/flask_security/__init__.py @@ -23,6 +23,7 @@ from .datastore import ( UserDatastore, SQLAlchemyUserDatastore, + AsaList, MongoEngineUserDatastore, PeeweeUserDatastore, PonyUserDatastore, diff --git a/flask_security/datastore.py b/flask_security/datastore.py index f49b3403..74ee42a1 100644 --- a/flask_security/datastore.py +++ b/flask_security/datastore.py @@ -35,6 +35,32 @@ def delete(self, model): raise NotImplementedError +try: + import sqlalchemy.types as types + + class AsaList(types.TypeDecorator): + # SQL-like DBs don't have a List type - so do that here by converting to a comma + # separate string. + impl = types.UnicodeText + + def process_bind_param(self, value, dialect): + # produce a string from an iterable + try: + return ",".join(value) + except TypeError: + return value + + def process_result_value(self, value, dialect): + if value: + return value.split(",") + return [] + +except ImportError: # pragma: no cover + + class AsaList: # type: ignore + pass + + class SQLAlchemyDatastore(Datastore): def commit(self): self.db.session.commit() diff --git a/flask_security/models/fsqla.py b/flask_security/models/fsqla.py index e9ca347b..685ad889 100644 --- a/flask_security/models/fsqla.py +++ b/flask_security/models/fsqla.py @@ -22,10 +22,11 @@ ForeignKey, ) from sqlalchemy.ext.declarative import declared_attr +from sqlalchemy.ext.mutable import MutableList from sqlalchemy.orm import relationship from sqlalchemy.sql import func -from flask_security import RoleMixin, UserMixin +from flask_security import AsaList, RoleMixin, UserMixin class FsModels: @@ -70,7 +71,9 @@ class FsRoleMixin(RoleMixin): name = Column(String(80), unique=True, nullable=False) description = Column(String(255)) # A comma separated list of strings - permissions = Column(UnicodeText, nullable=True) # type: ignore + permissions = Column( + MutableList.as_mutable(AsaList()), nullable=True # type: ignore + ) update_datetime = Column( type_=DateTime, nullable=False, diff --git a/flask_security/models/fsqla_v3.py b/flask_security/models/fsqla_v3.py index fb626cd8..d0ccf7eb 100644 --- a/flask_security/models/fsqla_v3.py +++ b/flask_security/models/fsqla_v3.py @@ -28,31 +28,12 @@ from sqlalchemy.ext.declarative import declared_attr from sqlalchemy.ext.mutable import MutableList from sqlalchemy.sql import func -import sqlalchemy.types as types from .fsqla_v2 import FsModels as FsModelsV2 from .fsqla_v2 import FsUserMixin as FsUserMixinV2 from .fsqla_v2 import FsRoleMixin as FsRoleMixinV2 -from flask_security import WebAuthnMixin - - -class AsaList(types.TypeDecorator): - # SQL-like DBs don't have a List type - so do that here by converting to a comma - # separate string. - impl = types.UnicodeText - - def process_bind_param(self, value, dialect): - # produce a string from an iterable - try: - return ",".join(value) - except TypeError: - return value - - def process_result_value(self, value, dialect): - if value: - return value.split(",") - return [] +from flask_security import AsaList, WebAuthnMixin class FsModels(FsModelsV2): @@ -60,9 +41,7 @@ class FsModels(FsModelsV2): class FsRoleMixin(FsRoleMixinV2): - permissions = Column( - MutableList.as_mutable(AsaList()), nullable=True # type: ignore - ) + pass class FsUserMixin(FsUserMixinV2): diff --git a/tests/conftest.py b/tests/conftest.py index f23d8774..b9076413 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -466,7 +466,7 @@ def sqlalchemy_session_setup(request, app, tmpdir, realdburl): Text, ForeignKey, ) - from flask_security.models.fsqla_v3 import AsaList + from flask_security import AsaList f, path = tempfile.mkstemp( prefix="flask-security-test-db", suffix=".db", dir=str(tmpdir) diff --git a/tests/test_common.py b/tests/test_common.py index 00059e1f..2b99bcf0 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -850,6 +850,8 @@ def test_verifying_token_from_version_3x(in_app_context): def test_change_token_uniquifier(app): + pytest.importorskip("sqlalchemy") + # make sure that existing token no longer works once we change the token uniquifier from sqlalchemy import Column, String from flask_sqlalchemy import SQLAlchemy @@ -901,6 +903,8 @@ class User(db.Model, fsqla.FsUserMixin): def test_null_token_uniquifier(app): + pytest.importorskip("sqlalchemy") + # If existing record has a null fs_token_uniquifier, should be set on first use. from sqlalchemy import Column, String from flask_sqlalchemy import SQLAlchemy diff --git a/tests/test_datastore.py b/tests/test_datastore.py index 90930d53..d61ae0d8 100644 --- a/tests/test_datastore.py +++ b/tests/test_datastore.py @@ -269,6 +269,15 @@ def test_permissions_types(app, datastore): t4 = ds.find_role("test4") assert {"read", "write"} == t4.get_permissions() + ds.create_role( + name="test5", + permissions={"read"}, + ) + ds.commit() + + t5 = ds.find_role("test5") + assert {"read"} == t5.get_permissions() + def test_modify_permissions(app, datastore): ds = datastore @@ -369,6 +378,7 @@ def test_modify_permissions_multi(app, datastore): def test_uuid(app, request, tmpdir, realdburl): """Test that UUID extension of postgresql works as a primary id for users""" + importorskip("sqlalchemy") import uuid from flask_sqlalchemy import SQLAlchemy from sqlalchemy import Boolean, Column, DateTime, Integer, ForeignKey, String @@ -556,3 +566,111 @@ def test_mf_recovery_codes(app, datastore): user = datastore.find_user(email="matt@lp.com") codes = datastore.mf_get_recovery_codes(user) assert codes == ["r1", "r3"] + + +def test_permissions_fsqla_v2(app): + importorskip("sqlalchemy") + # Make sure folks with fsqla_v2 work with new AsList column type + from sqlalchemy import insert + from flask_sqlalchemy import SQLAlchemy + from flask_security.models import fsqla_v2 as fsqla + from flask_security import Security + from flask_security import SQLAlchemyUserDatastore + + app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" + db = SQLAlchemy(app) + + fsqla.FsModels.set_db_info(db) + + class Role(db.Model, fsqla.FsRoleMixin): + pass + + class User(db.Model, fsqla.FsUserMixin): + pass + + with app.app_context(): + db.create_all() + meta_data = db.MetaData(bind=db.engine) + db.MetaData.reflect(meta_data) + role_table = meta_data.tables["role"] + + # Start by manually creating a role in the 4.1.x style + stmt = insert(role_table).values( + name="r1", description="r1 v41", permissions="read,write" + ) + with db.engine.connect() as conn: + conn.execute(stmt) + + ds = SQLAlchemyUserDatastore(db, User, Role) + app.security = Security(app, datastore=ds) + + with app.app_context(): + # Verify can read something written by 4.x + r1 = ds.find_role("r1") + assert r1.get_permissions() == {"read", "write"} + + ds.create_role(name="test5", permissions={"read"}) + ds.commit() + + t5 = ds.find_role("test5") + assert {"read"} == t5.get_permissions() + + +def test_permissions_41(request, app, realdburl): + importorskip("sqlalchemy") + # Check compatibility with 4.1 DB + from sqlalchemy import Column, insert + from flask_sqlalchemy import SQLAlchemy + from flask_security.models import fsqla_v2 as fsqla + from flask_security import Security + from flask_security import SQLAlchemyUserDatastore + from tests.conftest import _setup_realdb, _teardown_realdb + + if realdburl: + db_url, db_info = _setup_realdb(realdburl) + app.config["SQLALCHEMY_DATABASE_URI"] = db_url + else: + app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///:memory:" + + def tear_down(): + if realdburl: + db.drop_all() + _teardown_realdb(db_info) + + request.addfinalizer(tear_down) + + db = SQLAlchemy(app) + fsqla.FsModels.set_db_info(db) + + class Role(db.Model, fsqla.FsRoleMixin): + # permissions = Column(UnicodeText, nullable=True) # type: ignore + from flask_security import AsaList + from sqlalchemy.ext.mutable import MutableList + + # A comma separated list of strings + permissions = Column( + MutableList.as_mutable(AsaList()), nullable=True # type: ignore + ) + + class User(db.Model, fsqla.FsUserMixin): + pass + + with app.app_context(): + db.create_all() + meta_data = db.MetaData(bind=db.engine) + db.MetaData.reflect(meta_data) + role_table = meta_data.tables["role"] + + # Start by manually creating a role in the 4.1.x style + stmt = insert(role_table).values( + name="r1", description="r1 v41", permissions="read,write" + ) + with db.engine.connect() as conn: + conn.execute(stmt) + + ds = SQLAlchemyUserDatastore(db, User, Role) + app.security = Security(app, datastore=ds) + + with app.app_context(): + r1 = ds.find_role("r1") + assert r1.get_permissions() == {"read", "write"} diff --git a/tests/test_two_factor.py b/tests/test_two_factor.py index 509d5ee8..d1229513 100644 --- a/tests/test_two_factor.py +++ b/tests/test_two_factor.py @@ -1161,6 +1161,8 @@ def test_bad_sender(app, client, get_message): def test_replace_send_code(app, get_message): + pytest.importorskip("sqlalchemy") + # replace tf_send_code - and have it return an error to check that. from flask_sqlalchemy import SQLAlchemy from flask_security.models import fsqla_v2 as fsqla @@ -1290,6 +1292,8 @@ def test_setup_nofresh(app, client, get_message): @pytest.mark.settings(two_factor_enabled_methods=["email"]) def test_no_sms(app, get_message): + pytest.importorskip("sqlalchemy") + # Make sure that don't require tf_phone_number if SMS isn't an option. from sqlalchemy import ( Boolean, diff --git a/tests/test_unified_signin.py b/tests/test_unified_signin.py index 3139edc9..7fddcdc5 100644 --- a/tests/test_unified_signin.py +++ b/tests/test_unified_signin.py @@ -1659,6 +1659,8 @@ def test_bad_sender(app, client, get_message): @pytest.mark.registerable() def test_replace_send_code(app, get_message): + pytest.importorskip("sqlalchemy") + from flask_sqlalchemy import SQLAlchemy from flask_security.models import fsqla_v2 as fsqla from flask_security import Security, us_send_security_token