+ $ pip install flask-security flask-sqlalchemy
+
+
+Two-factor Application
+~~~~~~~~~~~~~~~~~~~~~~
+
+The following code sample illustrates how to get started as quickly as
+possible using SQLAlchemy:
+
+::
+
+ from flask import Flask, current_app, render_template
+ from flask_sqlalchemy import SQLAlchemy
+ from flask_security import Security, SQLAlchemyUserDatastore, \
+ UserMixin, RoleMixin, login_required
+
+
+ # At top of file
+ from flask_mail import Mail
+
+
+ # Convenient references
+ from werkzeug.datastructures import MultiDict
+ from werkzeug.local import LocalProxy
+
+
+ _security = LocalProxy(lambda: current_app.extensions['security'])
+
+ _datastore = LocalProxy(lambda: _security.datastore)
+
+ # Create app
+ app = Flask(__name__)
+ app.config['DEBUG'] = True
+ app.config['SECRET_KEY'] = 'super-secret'
+ app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://'
+
+ app.config['SECURITY_TWO_FACTOR_ENABLED_METHODS'] = ['mail',
+ 'google_authenticator'] # 'sms' also valid but requires an sms provider
+ app.config["SECURITY_TWO_FACTOR"] = True
+ app.config['SECURITY_TWO_FACTOR_RESCUE_MAIL'] = 'put_your_mail@gmail.com'
+ app.config['SECURITY_TWO_FACTOR_URI_SERVICE_NAME'] = 'put_your_app_name'
+
+ # Create database connection object
+ db = SQLAlchemy(app)
+
+ # Define models
+ roles_users = db.Table('roles_users',
+ db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
+ db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))
+
+ class Role(db.Model, RoleMixin):
+ id = db.Column(db.Integer(), primary_key=True)
+ name = db.Column(db.String(80), unique=True)
+ description = db.Column(db.String(255))
+
+ class User(db.Model, UserMixin):
+ id = db.Column(db.Integer, primary_key=True)
+ email = db.Column(db.String(255), unique=True)
+ password = db.Column(db.String(255))
+ active = db.Column(db.Boolean())
+ confirmed_at = db.Column(db.DateTime())
+ roles = db.relationship('Role', secondary=roles_users,
+ backref=db.backref('users', lazy='dynamic'))
+ phone_number = db.Column(db.String(15))
+ two_factor_primary_method = db.Column(db.String(140))
+ totp_secret = db.Column(db.String(16))
+
+ # Setup Flask-Security
+ user_datastore = SQLAlchemyUserDatastore(db, User, Role)
+ security = Security(app, user_datastore)
+
+ mail = Mail(app)
+
+ # Create a user to test with
+ @app.before_first_request
+ def create_user():
+ db.create_all()
+ user_datastore.create_user(email='gal@lp.com', password='password', username='gal',
+ totp_secret=None, two_factor_primary_method=None)
+ db.session.commit()
+
+ # Views
+ @app.route('/')
+ @login_required
+ def home():
+ return render_template('index.html')
+
+ if __name__ == '__main__':
+ app.run()
diff --git a/flask_security/core.py b/flask_security/core.py
index 597aa00e..2c19db2e 100644
--- a/flask_security/core.py
+++ b/flask_security/core.py
@@ -12,6 +12,7 @@
"""
from datetime import datetime
+import sys
import pkg_resources
from flask import current_app, render_template
@@ -26,15 +27,17 @@
from werkzeug.datastructures import ImmutableList
from werkzeug.local import LocalProxy
-from .forms import ChangePasswordForm, ConfirmRegisterForm, \
- ForgotPasswordForm, LoginForm, PasswordlessLoginForm, RegisterForm, \
- ResetPasswordForm, SendConfirmationForm
-from .utils import _
+from .forms import LoginForm, ConfirmRegisterForm, RegisterForm, \
+ ForgotPasswordForm, ChangePasswordForm, ResetPasswordForm, \
+ SendConfirmationForm, PasswordlessLoginForm, TwoFactorVerifyCodeForm, \
+ TwoFactorSetupForm, TwoFactorChangeMethodVerifyPasswordForm,\
+ TwoFactorRescueForm
from .utils import config_value as cv
-from .utils import get_config, hash_data, localize_callback, send_mail, \
- string_types, url_for_security, verify_and_update_password, verify_hash
+from .utils import _, get_config, hash_data, localize_callback, string_types, \
+ url_for_security, verify_hash, send_mail, verify_and_update_password
from .views import create_blueprint
+
# Convenient references
_security = LocalProxy(lambda: current_app.extensions['security'])
@@ -84,23 +87,34 @@
'CHANGE_PASSWORD_TEMPLATE': 'security/change_password.html',
'SEND_CONFIRMATION_TEMPLATE': 'security/send_confirmation.html',
'SEND_LOGIN_TEMPLATE': 'security/send_login.html',
+ 'TWO_FACTOR_VERIFY_CODE_TEMPLATE':
+ 'security/two_factor_verify_code.html',
+ 'TWO_FACTOR_CHOOSE_METHOD_TEMPLATE':
+ 'security/two_factor_choose_method.html',
+ 'TWO_FACTOR_CHANGE_METHOD_PASSWORD_CONFIRMATION_TEMPLATE':
+ 'security/two_factor_change_method_password_confirmation.html',
'CONFIRMABLE': False,
'REGISTERABLE': False,
'RECOVERABLE': False,
'TRACKABLE': False,
'PASSWORDLESS': False,
'CHANGEABLE': False,
+ 'TWO_FACTOR': False,
'SEND_REGISTER_EMAIL': True,
'SEND_PASSWORD_CHANGE_EMAIL': True,
'SEND_PASSWORD_RESET_EMAIL': True,
'SEND_PASSWORD_RESET_NOTICE_EMAIL': True,
'LOGIN_WITHIN': '1 days',
+ 'TWO_FACTOR_GOOGLE_AUTH_VALIDITY': 0,
+ 'TWO_FACTOR_MAIL_VALIDITY': 1,
+ 'TWO_FACTOR_SMS_VALIDITY': 5,
'CONFIRM_EMAIL_WITHIN': '5 days',
'RESET_PASSWORD_WITHIN': '5 days',
'LOGIN_WITHOUT_CONFIRMATION': False,
'EMAIL_SENDER': LocalProxy(lambda: current_app.config.get(
'MAIL_DEFAULT_SENDER', 'no-reply@localhost'
)),
+ 'TWO_FACTOR_RESCUE_MAIL': 'no-reply@localhost',
'TOKEN_AUTHENTICATION_KEY': 'auth_token',
'TOKEN_AUTHENTICATION_HEADER': 'Authentication-Token',
'TOKEN_MAX_AGE': None,
@@ -116,10 +130,12 @@
'EMAIL_SUBJECT_PASSWORDLESS': _('Login instructions'),
'EMAIL_SUBJECT_PASSWORD_NOTICE': _('Your password has been reset'),
'EMAIL_SUBJECT_PASSWORD_CHANGE_NOTICE': _(
- 'Your password has been changed'),
+ 'Your password has been changed'),
'EMAIL_SUBJECT_PASSWORD_RESET': _('Password reset instructions'),
'EMAIL_PLAINTEXT': True,
'EMAIL_HTML': True,
+ 'EMAIL_SUBJECT_TWO_FACTOR': 'Two-factor Login',
+ 'EMAIL_SUBJECT_TWO_FACTOR_RESCUE': 'Two-factor Rescue',
'USER_IDENTITY_ATTRIBUTES': ['email'],
'PASSWORD_SCHEMES': [
'bcrypt',
@@ -138,6 +154,14 @@
],
'DEPRECATED_HASHING_SCHEMES': ['hex_md5'],
'DATETIME_FACTORY': datetime.utcnow,
+ 'TWO_FACTOR_ENABLED_METHODS': ['mail', 'google_authenticator', 'sms'],
+ 'TWO_FACTOR_URI_SERVICE_NAME': 'service_name',
+ 'TWO_FACTOR_SMS_SERVICE': 'Dummy',
+ 'TWO_FACTOR_SMS_SERVICE_CONFIG': {
+ 'ACCOUNT_SID': None,
+ 'AUTH_TOKEN': None,
+ 'PHONE_NUMBER': None,
+ }
}
#: Default Flask-Security messages
@@ -217,6 +241,23 @@
_('Please log in to access this page.'), 'info'),
'REFRESH': (
_('Please reauthenticate to access this page.'), 'info'),
+ 'TWO_FACTOR_INVALID_TOKEN': (
+ _('Invalid Token'), 'error'),
+ 'TWO_FACTOR_LOGIN_SUCCESSFUL': (
+ _('Your token has been confirmed'), 'success'),
+ 'TWO_FACTOR_CHANGE_METHOD_SUCCESSFUL': (
+ _('You successfully changed your two-factor method.'),
+ 'success'),
+ 'TWO_FACTOR_PASSWORD_CONFIRMATION_DONE': (
+ _('You successfully confirmed password'), 'success'),
+ 'TWO_FACTOR_PASSWORD_CONFIRMATION_NEEDED': (
+ _('Password confirmation is needed in order to access page'),
+ 'error'),
+ 'TWO_FACTOR_PERMISSION_DENIED': (
+ _('You currently do not have permissions to access this page'),
+ 'error'),
+ 'TWO_FACTOR_METHOD_NOT_AVAILABLE': (
+ _('Marked method is not valid'), 'error'),
}
_default_forms = {
@@ -228,6 +269,11 @@
'change_password_form': ChangePasswordForm,
'send_confirmation_form': SendConfirmationForm,
'passwordless_login_form': PasswordlessLoginForm,
+ 'two_factor_verify_code_form': TwoFactorVerifyCodeForm,
+ 'two_factor_setup_form': TwoFactorSetupForm,
+ 'two_factor_change_method_verify_password_form':
+ TwoFactorChangeMethodVerifyPasswordForm,
+ 'two_factor_rescue_form': TwoFactorRescueForm
}
@@ -493,8 +539,12 @@ class Security(object):
:param anonymous_user: class to use for anonymous user
"""
- def __init__(self, app=None, datastore=None, register_blueprint=True,
+ def __init__(self,
+ app=None,
+ datastore=None,
+ register_blueprint=True,
**kwargs):
+
self.app = app
self._datastore = datastore
self._register_blueprint = register_blueprint
@@ -508,7 +558,10 @@ def __init__(self, app=None, datastore=None, register_blueprint=True,
register_blueprint=register_blueprint,
**kwargs)
- def init_app(self, app, datastore=None, register_blueprint=None, **kwargs):
+ def init_app(self,
+ app, datastore=None,
+ register_blueprint=None,
+ **kwargs):
"""Initializes the Flask-Security extension for the specified
application and datastore implementation.
@@ -557,8 +610,51 @@ def _register_i18n():
if state.cli_roles_name:
app.cli.add_command(roles, state.cli_roles_name)
+ # configuration mismatch check
+ if cv('TWO_FACTOR', app=app) is True and\
+ len(cv('TWO_FACTOR_ENABLED_METHODS',
+ app=app)) < 1: # pragma: no cover
+
+ raise ValueError()
+
+ config_value = cv('TWO_FACTOR', app=app)
+ if config_value: # pragma: no cover
+ self.check_two_factor_modules('onetimepass',
+ 'TWO_FACTOR', config_value)
+ self.check_two_factor_modules('pyqrcode',
+ 'TWO_FACTOR', config_value)
+
+ config_value = cv('TWO_FACTOR_SMS_SERVICE', app=app)
+
+ if config_value == 'Twilio': # pragma: no cover
+ self.check_two_factor_modules('twilio',
+ 'TWO_FACTOR_SMS_SERVICE',
+ config_value)
+
return state
+ def check_two_factor_modules(self, module,
+ config_name,
+ config_value): # pragma: no cover
+ PY3 = sys.version_info[0] == 3
+ if PY3:
+ from importlib.util import find_spec
+ module_exists = find_spec(module)
+
+ else:
+ import imp
+ try:
+ imp.find_module(module)
+ module_exists = True
+ except ImportError:
+ module_exists = False
+
+ if not module_exists:
+ raise ValueError('{} is required for {} = {}'
+ .format(module, config_name, config_value))
+
+ return module_exists
+
def render_template(self, *args, **kwargs):
return render_template(*args, **kwargs)
diff --git a/flask_security/datastore.py b/flask_security/datastore.py
index c8109eb8..9e93f9f9 100644
--- a/flask_security/datastore.py
+++ b/flask_security/datastore.py
@@ -228,6 +228,7 @@ class SQLAlchemyUserDatastore(SQLAlchemyDatastore, UserDatastore):
"""A SQLAlchemy datastore implementation for Flask-Security that assumes the
use of the Flask-SQLAlchemy extension.
"""
+
def __init__(self, db, user_model, role_model):
SQLAlchemyDatastore.__init__(self, db)
UserDatastore.__init__(self, user_model, role_model)
@@ -272,11 +273,13 @@ class SQLAlchemySessionUserDatastore(SQLAlchemyUserDatastore,
"""A SQLAlchemy datastore implementation for Flask-Security that assumes the
use of the flask_sqlalchemy_session extension.
"""
+
def __init__(self, session, user_model, role_model):
class PretendFlaskSQLAlchemyDb(object):
""" This is a pretend db object, so we can just pass in a session.
"""
+
def __init__(self, session):
self.session = session
@@ -300,6 +303,7 @@ class MongoEngineUserDatastore(MongoEngineDatastore, UserDatastore):
"""A MongoEngine datastore implementation for Flask-Security that assumes
the use of the Flask-MongoEngine extension.
"""
+
def __init__(self, db, user_model, role_model):
MongoEngineDatastore.__init__(self, db)
UserDatastore.__init__(self, user_model, role_model)
@@ -351,6 +355,7 @@ class PeeweeUserDatastore(PeeweeDatastore, UserDatastore):
:param role_model: A role model class definition
:param role_link: A model implementing the many-to-many user-role relation
"""
+
def __init__(self, db, user_model, role_model, role_link):
PeeweeDatastore.__init__(self, db)
UserDatastore.__init__(self, user_model, role_model)
diff --git a/flask_security/forms.py b/flask_security/forms.py
index 5e974ac0..cfab7a36 100644
--- a/flask_security/forms.py
+++ b/flask_security/forms.py
@@ -12,16 +12,18 @@
import inspect
-from flask import Markup, current_app, flash, request
+from flask import Markup, current_app, request
+from flask import session
+from wtforms import BooleanField, Field, HiddenField, PasswordField, \
+ StringField, SubmitField, ValidationError, validators, RadioField
from flask_login import current_user
from flask_wtf import FlaskForm as BaseForm
from speaklater import make_lazy_gettext
-from wtforms import BooleanField, Field, HiddenField, PasswordField, \
- StringField, SubmitField, ValidationError, validators
from .confirmable import requires_confirmation
from .utils import _, _datastore, config_value, get_message, hash_password, \
- localize_callback, url_for_security, validate_redirect_url
+ localize_callback, url_for_security, validate_redirect_url, do_flash
+from .twofactor import verify_totp
lazy_gettext = make_lazy_gettext(lambda: localize_callback)
@@ -37,7 +39,10 @@
'retype_password': _('Retype Password'),
'new_password': _('New Password'),
'change_password': _('Change Password'),
- 'send_login_link': _('Send Login Link')
+ 'send_login_link': _('Send Login Link'),
+ 'verify_password': _('Verify Method'),
+ 'change_method': _('Change Method'),
+ 'phone': _('Phone Number'),
}
@@ -136,7 +141,7 @@ class NextFormMixin():
def validate_next(self, field):
if field.data and not validate_redirect_url(field.data):
field.data = ''
- flash(*get_message('INVALID_REDIRECT'))
+ do_flash(*get_message('INVALID_REDIRECT'))
raise ValidationError(get_message('INVALID_REDIRECT')[0])
@@ -298,3 +303,104 @@ def validate(self):
self.password.errors.append(get_message('PASSWORD_IS_THE_SAME')[0])
return False
return True
+
+
+class TwoFactorSetupForm(Form, UserEmailFormMixin):
+ """The Two-factor token validation form"""
+
+ setup = RadioField('Available Methods',
+ choices=[('mail', 'Set Up Using Mail'),
+ ('google_authenticator',
+ 'Set Up Using Google Authenticator'),
+ ('sms', 'Set Up Using SMS')])
+ phone = StringField(get_form_field_label('phone'))
+ submit = SubmitField(get_form_field_label('sumbit'))
+
+ def __init__(self, *args, **kwargs):
+ super(TwoFactorSetupForm, self).__init__(*args, **kwargs)
+
+ def validate(self):
+ if 'setup' not in self.data or self.data['setup']\
+ not in config_value('TWO_FACTOR_ENABLED_METHODS'):
+ do_flash(*get_message('TWO_FACTOR_METHOD_NOT_AVAILABLE'))
+ return False
+
+ return True
+
+
+class TwoFactorVerifyCodeForm(Form, UserEmailFormMixin):
+ """The Two-factor token validation form"""
+
+ code = StringField(get_form_field_label('code'))
+ submit = SubmitField(get_form_field_label('submit code'))
+
+ def __init__(self, *args, **kwargs):
+ super(TwoFactorVerifyCodeForm, self).__init__(*args, **kwargs)
+
+ def validate(self):
+ if 'email' in session:
+ self.user = _datastore.find_user(email=session['email'])
+ elif 'password_confirmed' in session:
+ self.user = current_user
+ else:
+ return False
+ # codes sent by sms or mail will be valid for another window cycle
+ if session['primary_method'] == 'google_authenticator':
+ self.window = config_value('TWO_FACTOR_GOOGLE_AUTH_VALIDITY')
+ elif session['primary_method'] == 'mail':
+ self.window = config_value('TWO_FACTOR_MAIL_VALIDITY')
+ elif session['primary_method'] == 'sms':
+ self.window = config_value('TWO_FACTOR_SMS_VALIDITY')
+ else:
+ return False
+
+ # verify entered token with user's totp secret
+ if not verify_totp(token=self.code.data,
+ totp_secret=session['totp_secret'],
+ window=self.window):
+ do_flash(*get_message('TWO_FACTOR_INVALID_TOKEN'))
+ return False
+
+ return True
+
+
+class TwoFactorChangeMethodVerifyPasswordForm(Form, PasswordFormMixin):
+ """The default change password form"""
+
+ submit = SubmitField(get_form_field_label('verify_password'))
+
+ def validate(self):
+ if not super(TwoFactorChangeMethodVerifyPasswordForm,
+ self).validate():
+ do_flash(*get_message('INVALID_PASSWORD'))
+ return False
+ self.user = current_user
+ if not self.user.verify_and_update_password(self.password.data):
+ self.password.errors.append(get_message('INVALID_PASSWORD')[0])
+ return False
+
+ return True
+
+
+class TwoFactorRescueForm(Form, UserEmailFormMixin):
+ """The Two-factor Rescue validation form"""
+
+ help_setup = RadioField('Trouble Accessing Your Account?',
+ choices=[('lost_device',
+ 'Can not access mobile device?'),
+ ('no_mail_access',
+ 'Can not access mail account?')])
+ submit = SubmitField(get_form_field_label('submit'))
+
+ def __init__(self, *args, **kwargs):
+ super(TwoFactorRescueForm, self).__init__(*args, **kwargs)
+
+ def validate(self):
+
+ self.user = _datastore.find_user(email=session['email'])
+
+ if 'primary_method' not in session or 'totp_secret' not in session:
+ do_flash(*get_message('TWO_FACTOR_PERMISSION_DENIED'))
+ return False
+
+ return True
diff --git a/flask_security/signals.py b/flask_security/signals.py
index 53fdf1b2..27895ef2 100644
--- a/flask_security/signals.py
+++ b/flask_security/signals.py
@@ -17,6 +17,8 @@
user_confirmed = signals.signal("user-confirmed")
+user_two_factored = signals.signal("user-two-factored")
+
confirm_instructions_sent = signals.signal("confirm-instructions-sent")
login_instructions_sent = signals.signal("login-instructions-sent")
@@ -25,5 +27,7 @@
password_changed = signals.signal("password-changed")
+two_factor_method_changed = signals.signal("two-factor-method-changed")
+
reset_password_instructions_sent = signals.signal(
"password-reset-instructions-sent")
diff --git a/flask_security/templates/security/email/two_factor_instructions.html b/flask_security/templates/security/email/two_factor_instructions.html
new file mode 100644
index 00000000..c9ad7cb6
--- /dev/null
+++ b/flask_security/templates/security/email/two_factor_instructions.html
@@ -0,0 +1,4 @@
+Welcome {{ user.username }}!
+
+You can log into your account using the following code: {{ token }}
+
diff --git a/flask_security/templates/security/email/two_factor_instructions.txt b/flask_security/templates/security/email/two_factor_instructions.txt
new file mode 100644
index 00000000..e177828e
--- /dev/null
+++ b/flask_security/templates/security/email/two_factor_instructions.txt
@@ -0,0 +1,4 @@
+Welcome {{ user.username }}!
+
+You can log into your account using the following code: {{ token }}
+
diff --git a/flask_security/templates/security/email/two_factor_rescue.html b/flask_security/templates/security/email/two_factor_rescue.html
new file mode 100644
index 00000000..238dc403
--- /dev/null
+++ b/flask_security/templates/security/email/two_factor_rescue.html
@@ -0,0 +1 @@
+ {{ user.email }} can not access mail account
diff --git a/flask_security/templates/security/email/two_factor_rescue.txt b/flask_security/templates/security/email/two_factor_rescue.txt
new file mode 100644
index 00000000..8b5a6158
--- /dev/null
+++ b/flask_security/templates/security/email/two_factor_rescue.txt
@@ -0,0 +1,2 @@
+{{ user.email }} can not access mail account
+
diff --git a/flask_security/templates/security/two_factor_change_method_password_confirmation.html b/flask_security/templates/security/two_factor_change_method_password_confirmation.html
new file mode 100644
index 00000000..5f7b92d2
--- /dev/null
+++ b/flask_security/templates/security/two_factor_change_method_password_confirmation.html
@@ -0,0 +1,9 @@
+{% from "security/_macros.html" import render_field_with_errors, render_field %}
+{% include "security/_messages.html" %}
+Please Enter Your Password
+
\ No newline at end of file
diff --git a/flask_security/templates/security/two_factor_choose_method.html b/flask_security/templates/security/two_factor_choose_method.html
new file mode 100644
index 00000000..afd89d62
--- /dev/null
+++ b/flask_security/templates/security/two_factor_choose_method.html
@@ -0,0 +1,32 @@
+{% from "security/_macros.html" import render_field_with_errors, render_field, render_field_no_label %}
+{% include "security/_messages.html" %}
+Two-factor authentication adds an extra layer of security to your account
+In addition to your username and password, you'll need to use a code that we will send you
+
+
+{% include "security/_menu.html" %}
\ No newline at end of file
diff --git a/flask_security/templates/security/two_factor_verify_code.html b/flask_security/templates/security/two_factor_verify_code.html
new file mode 100644
index 00000000..030116a7
--- /dev/null
+++ b/flask_security/templates/security/two_factor_verify_code.html
@@ -0,0 +1,21 @@
+{% from "security/_macros.html" import render_field_with_errors, render_field %}
+{% include "security/_messages.html" %}
+Two-factor Authentication
+Please enter your authentication code
+
+
+{% include "security/_menu.html" %}
\ No newline at end of file
diff --git a/flask_security/translations/flask_security.pot b/flask_security/translations/flask_security.pot
index a6df1467..4f294fe8 100644
--- a/flask_security/translations/flask_security.pot
+++ b/flask_security/translations/flask_security.pot
@@ -316,3 +316,30 @@ msgstr ""
msgid "You can confirm your email through the link below:"
msgstr ""
+#: flask_security/core.py:244
+msgid "Invalid Token"
+msgstr ""
+
+#: flask_security/core.py:246
+msgid "Your token has been confirmed"
+msgstr ""
+
+#: flask_security/core.py:248
+msgid "You successfully changed your two-factor method."
+msgstr ""
+
+#: flask_security/core.py:251
+msgid "You successfully confirmed password"
+msgstr ""
+
+#: flask_security/core.py:253
+msgid "Password confirmation is needed in order to access page"
+msgstr ""
+
+#: flask_security/core.py:256
+msgid "You currently do not have permissions to access this page"
+msgstr ""
+
+#: flask_security/core.py:259
+msgid "Marked method is not valid"
+msgstr ""
diff --git a/flask_security/twofactor.py b/flask_security/twofactor.py
new file mode 100644
index 00000000..2fc50de5
--- /dev/null
+++ b/flask_security/twofactor.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+"""
+ flask_security.two_factor
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ Flask-Security two_factor module
+
+ :copyright: (c) 2016 by Gal Stainfeld, at Emedgene
+"""
+
+import os
+import base64
+from passlib.totp import TOTP
+
+import onetimepass
+from flask import current_app as app, session
+from werkzeug.local import LocalProxy
+
+from .utils import send_mail, config_value, get_message, do_flash,\
+ SmsSenderFactory, login_user
+from .signals import user_two_factored, two_factor_method_changed
+
+# Convenient references
+_security = LocalProxy(lambda: app.extensions['security'])
+
+_datastore = LocalProxy(lambda: _security.datastore)
+
+
+def send_security_token(user, method, totp_secret):
+ """Sends the security token via email for the specified user.
+ :param user: The user to send the code to
+ :param method: The method in which the code will be sent
+ ('mail' or 'sms') at the moment
+ :param totp_secret: a unique shared secret of the user
+ """
+ token_to_be_sent = get_totp_password(totp_secret)
+ if method == 'mail':
+ send_mail(config_value('EMAIL_SUBJECT_TWO_FACTOR'),
+ user.email,
+ 'two_factor_instructions',
+ user=user,
+ token=token_to_be_sent)
+ elif method == 'sms':
+ msg = "Use this code to log in: %s" % token_to_be_sent
+ from_number = config_value('TWO_FACTOR_SMS_SERVICE_CONFIG')[
+ 'PHONE_NUMBER']
+ if 'phone_number' in session:
+ to_number = session['phone_number']
+ else:
+ to_number = user.phone_number
+ sms_sender = SmsSenderFactory.createSender(
+ config_value('TWO_FACTOR_SMS_SERVICE'))
+ sms_sender.send_sms(from_number=from_number,
+ to_number=to_number, msg=msg)
+
+ elif method == 'google_authenticator':
+ # password are generated automatically in the google authenticator app
+ pass
+
+
+def get_totp_uri(username, totp_secret):
+ """ Generate provisioning url for use with the qrcode
+ scanner built into the app
+ :param username: username of the current user
+ :param totp_secret: a unique shared secret of the user
+ :return:
+ """
+ tp = TOTP(totp_secret)
+ service_name = config_value('TWO_FACTOR_URI_SERVICE_NAME')
+ return tp.to_uri(username + '@' + service_name, service_name)
+
+
+def verify_totp(token, totp_secret, window=0):
+ """ Verifies token for specific user_totp
+ :param token - token to be check against user's secret
+ :param totp_secret - a unique shared secret of the user
+ :param window - optional, compensate for clock skew, number of
+ intervals to check on each side of the current time.
+ (default is 0 - only check the current clock time)
+ :return:
+ """
+ return onetimepass.valid_totp(token, totp_secret, window=window)
+
+
+def get_totp_password(totp_secret):
+ """Get time-based one-time password on the basis of given secret and time
+ :param totp_secret - a unique shared secret of the user
+ """
+ return onetimepass.get_totp(totp_secret)
+
+
+def generate_totp():
+ return base64.b32encode(os.urandom(10)).decode('utf-8')
+
+
+def complete_two_factor_process(user):
+ """clean session according to process (login or changing two-factor method)
+ and perform action accordingly
+ :param user - user's to update in database and log in if necessary
+ """
+ totp_secret_changed = user.totp_secret != session['totp_secret']
+ if totp_secret_changed or user.two_factor_primary_method\
+ != session['primary_method']:
+ user.totp_secret = session['totp_secret']
+ user.two_factor_primary_method = session['primary_method']
+
+ if 'phone_number' in session:
+ user.phone_number = session['phone_number']
+ del session['phone_number']
+
+ _datastore.put(user)
+
+ del session['primary_method']
+ del session['totp_secret']
+
+ # if we are changing two-factor method
+ if 'password_confirmed' in session:
+ del session['password_confirmed']
+ do_flash(*get_message('TWO_FACTOR_CHANGE_METHOD_SUCCESSFUL'))
+ two_factor_method_changed.send(app._get_current_object(),
+ user=user)
+
+ # if we are logging in for the first time
+ else:
+ del session['email']
+ del session['has_two_factor']
+ do_flash(*get_message('TWO_FACTOR_LOGIN_SUCCESSFUL'))
+ user_two_factored.send(app._get_current_object(), user=user)
+ login_user(user)
+ return
diff --git a/flask_security/utils.py b/flask_security/utils.py
index c56174e7..385ad252 100644
--- a/flask_security/utils.py
+++ b/flask_security/utils.py
@@ -8,7 +8,7 @@
:copyright: (c) 2012 by Matt Wright.
:license: MIT, see LICENSE for more details.
"""
-
+import abc
import base64
import hashlib
import hmac
@@ -515,3 +515,53 @@ def _on(app, **data):
yield reset_requests
finally:
reset_password_instructions_sent.disconnect(_on)
+
+
+class SmsSenderBaseClass(object):
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self):
+ pass
+
+ @abc.abstractmethod
+ def send_sms(self, from_number, to_number, msg):
+ """ Abstract method for sensing sms messages"""
+ return
+
+
+class DummySmsSender(SmsSenderBaseClass):
+ def send_sms(self, from_number, to_number, msg):
+ return
+
+
+class SmsSenderFactory(object):
+ senders = {
+
+ 'Dummy': DummySmsSender
+ }
+
+ @classmethod
+ def createSender(cls, name, *args, **kwargs):
+ return cls.senders[name](*args, **kwargs)
+
+
+try:
+ from twilio.rest import Client
+
+ class TwilioSmsSender(SmsSenderBaseClass):
+ def __init__(self):
+ self.account_sid = config_value(
+ 'TWO_FACTOR_SMS_SERVICE_CONFIG')['ACCOUNT_SID']
+ self.auth_token = config_value(
+ 'TWO_FACTOR_SMS_SERVICE_CONFIG')['AUTH_TOKEN']
+
+ def send_sms(self, from_number, to_number, msg):
+ client = Client(self.account_sid, self.auth_token)
+ client.messages.create(
+ to=to_number,
+ from_=from_number,
+ body=msg,)
+
+ SmsSenderFactory.senders['Twilio'] = TwilioSmsSender
+except:
+ pass
diff --git a/flask_security/views.py b/flask_security/views.py
index 25807fbf..c2aca97c 100644
--- a/flask_security/views.py
+++ b/flask_security/views.py
@@ -9,11 +9,12 @@
:license: MIT, see LICENSE for more details.
"""
-from flask import Blueprint, after_this_request, current_app, jsonify, \
- redirect, request
+from flask import current_app, redirect, request, jsonify, \
+ after_this_request, Blueprint, session, abort
from flask_login import current_user
from werkzeug.datastructures import MultiDict
from werkzeug.local import LocalProxy
+import pyqrcode
from .changeable import change_user_password
from .confirmable import confirm_email_token_status, confirm_user, \
@@ -23,11 +24,12 @@
from .recoverable import reset_password_token_status, \
send_reset_password_instructions, update_password
from .registerable import register_user
-from .utils import url_for_security as url_for
-from .utils import config_value, do_flash, get_message, \
- get_post_login_redirect, get_post_logout_redirect, \
- get_post_register_redirect, get_url, login_user, logout_user, \
- slash_url_suffix
+from .utils import config_value, do_flash, get_url, get_post_login_redirect, \
+ get_post_register_redirect, get_message, login_user, logout_user, \
+ url_for_security as url_for, slash_url_suffix, send_mail,\
+ get_post_logout_redirect
+from .twofactor import send_security_token, generate_totp, \
+ complete_two_factor_process, get_totp_uri
# Convenient references
_security = LocalProxy(lambda: current_app.extensions['security'])
@@ -91,6 +93,8 @@ def login():
def logout():
"""View function which handles a logout request."""
+ if config_value('TWO_FACTOR') is True and 'password_confirmed' in session:
+ del session['password_confirmed']
if current_user.is_authenticated:
logout_user()
@@ -129,6 +133,7 @@ def register():
redirect_url = get_post_register_redirect()
return redirect(redirect_url)
+
return _render_json(form, include_auth_token=True)
if request.is_json:
@@ -198,7 +203,7 @@ def send_confirmation():
send_confirmation_instructions(form.user)
if not request.is_json:
do_flash(*get_message('CONFIRMATION_REQUEST',
- email=form.user.email))
+ email=form.user.email))
if request.is_json:
return _render_json(form)
@@ -259,7 +264,7 @@ def forgot_password():
send_reset_password_instructions(form.user)
if not request.is_json:
do_flash(*get_message('PASSWORD_RESET_REQUEST',
- email=form.user.email))
+ email=form.user.email))
if request.is_json:
return _render_json(form, include_user=False)
@@ -334,6 +339,316 @@ def change_password():
)
+@anonymous_user_required
+def two_factor_login():
+ """View function for two-factor authentication login"""
+ # if we already validated email&password, there is no need to do it again
+ form_class = _security.login_form
+
+ if request.is_json:
+ form = form_class(MultiDict(request.get_json()))
+ else:
+ form = form_class()
+
+ # if user's email&password approved
+ if form.validate_on_submit():
+ user = form.user
+ session['email'] = user.email
+ # if user's two-factor properties are not configured
+
+ if user.two_factor_primary_method is None or\
+ user.totp_secret is None:
+ session['has_two_factor'] = False
+ if not request.is_json:
+ return redirect(url_for('two_factor_setup_function'))
+
+ # if user's two-factor properties are configured
+ else:
+ session['has_two_factor'] = True
+ session['primary_method'] = user.two_factor_primary_method
+ session['totp_secret'] = user.totp_secret
+ send_security_token(user=user,
+ method=user.two_factor_primary_method,
+ totp_secret=user.totp_secret)
+ if not request.is_json:
+ return redirect(url_for('two_factor_token_validation'))
+
+ if request.is_json:
+ return _render_json(form, include_user=False)
+
+ return _security.render_template(config_value('LOGIN_USER_TEMPLATE'),
+ login_user_form=form,
+ **_ctx('login'))
+
+
+def two_factor_setup_function():
+ """View function for two-factor setup during login process"""
+
+ # user's email&password not approved or we are
+ # logged in and didn't validate password
+ form_class = _security.two_factor_setup_form
+
+ if request.is_json:
+ form = form_class(MultiDict(request.get_json()))
+ else:
+ form = form_class()
+
+ if 'password_confirmed' not in session:
+
+ if 'email' not in session or 'has_two_factor' not in session:
+ if not request.is_json:
+ do_flash(*get_message('TWO_FACTOR_PERMISSION_DENIED'))
+ return redirect(get_url(_security.login_url))
+ else:
+ m, c = get_message('TWO_FACTOR_PERMISSION_DENIED')
+ form._errors = m
+ return _render_json(form)
+
+ # user's email&password approved and
+ # two-factor properties were configured before
+ if session['has_two_factor'] is True:
+ if not request.is_json:
+ do_flash(*get_message('TWO_FACTOR_PERMISSION_DENIED'))
+ return redirect(url_for('two_factor_token_validation'))
+ else:
+ m, c = get_message('TWO_FACTOR_PERMISSION_DENIED')
+ form._errors = m
+ return _render_json(form)
+
+ user = _datastore.find_user(email=session['email'])
+ else:
+ user = current_user
+
+ if form.validate_on_submit():
+ # totp and primary_method are added to
+ # session to flag the user's temporary choice
+ session['totp_secret'] = generate_totp()
+ session['primary_method'] = form['setup'].data
+ if len(form.data['phone']) > 0:
+ session['phone_number'] = form.data['phone']
+ send_security_token(user=user, method=session['primary_method'],
+ totp_secret=session['totp_secret'])
+ code_form = _security.two_factor_verify_code_form()
+ if not request.is_json:
+ return _security.render_template(
+ config_value('TWO_FACTOR_CHOOSE_METHOD_TEMPLATE'),
+ two_factor_setup_form=form,
+ two_factor_verify_code_form=code_form,
+ choices=config_value(
+ 'TWO_FACTOR_ENABLED_METHODS'),
+ chosen_method=session['primary_method'],
+ **_ctx('two_factor_setup_function'))
+
+ if request.is_json:
+ return _render_json(form, include_user=False)
+
+ code_form = _security.two_factor_verify_code_form()
+ return _security.render_template(
+ config_value('TWO_FACTOR_CHOOSE_METHOD_TEMPLATE'),
+ two_factor_setup_form=form,
+ two_factor_verify_code_form=code_form,
+ choices=config_value(
+ 'TWO_FACTOR_ENABLED_METHODS'),
+ **_ctx('two_factor_setup_function'))
+
+
+def two_factor_token_validation():
+ """View function for two-factor token validation during login process"""
+ # if we are in login process and not changing current two-factor method
+
+ form_class = _security.two_factor_verify_code_form
+
+ if request.is_json:
+ form = form_class(MultiDict(request.get_json()))
+ else:
+ form = form_class()
+
+ if 'password_confirmed' not in session:
+ # user's email&password not approved or we are logged in
+ # and didn't validate password
+ if 'has_two_factor' not in session:
+ if not request.is_json:
+ do_flash(*get_message('TWO_FACTOR_PERMISSION_DENIED'))
+ return redirect(get_url(_security.login_url))
+ else:
+ m, c = get_message('TWO_FACTOR_PERMISSION_DENIED')
+ form._errors = m
+ return _render_json(form)
+ # make sure user has or has chosen a two-factor
+ # method before we try to validate
+ if 'totp_secret' not in session or 'primary_method' not in session:
+ if not request.is_json:
+ do_flash(*get_message('TWO_FACTOR_PERMISSION_DENIED'))
+ return redirect(url_for('two_factor_setup_function'))
+ else:
+ m, c = get_message('TWO_FACTOR_PERMISSION_DENIED')
+ form._errors = m
+ return _render_json(form)
+
+ if form.validate_on_submit():
+ complete_two_factor_process(form.user)
+ after_this_request(_commit)
+ if not request.is_json:
+ return redirect(get_post_login_redirect())
+
+ if request.is_json:
+ form.user = current_user
+ return _render_json(form)
+
+ # if we were trying to validate a new method
+ if 'password_confirmed' in session or session['has_two_factor'] is False:
+ setup_form = _security.two_factor_setup_form()
+
+ return _security.render_template(
+ config_value('TWO_FACTOR_CHOOSE_METHOD_TEMPLATE'),
+ two_factor_setup_form=setup_form,
+ two_factor_verify_code_form=form,
+ choices=config_value(
+ 'TWO_FACTOR_ENABLED_METHODS'),
+ **_ctx('two_factor_setup_function'))
+
+ # if we were trying to validate an existing method
+ else:
+
+ rescue_form = _security.two_factor_rescue_form()
+
+ return _security.render_template(
+ config_value('TWO_FACTOR_VERIFY_CODE_TEMPLATE'),
+ two_factor_rescue_form=rescue_form,
+ two_factor_verify_code_form=form,
+ problem=None,
+ **_ctx('two_factor_token_validaion'))
+
+
+@anonymous_user_required
+def two_factor_rescue_function():
+ """ Function that handles a situation where user can't
+ enter his two-factor validation code"""
+ # user's email&password yet to be approved
+
+ form_class = _security.two_factor_rescue_form
+
+ if request.is_json:
+ form = form_class(MultiDict(request.get_json()))
+ else:
+ form = form_class()
+
+ if 'email' not in session:
+ if not request.is_json:
+ do_flash(*get_message('TWO_FACTOR_PERMISSION_DENIED'))
+ return redirect(get_url(_security.login_url))
+ else:
+ m, c = get_message('TWO_FACTOR_PERMISSION_DENIED')
+ form._errors = m
+ return _render_json(form, include_user=False)
+ # user's email&password approved and two-factor properties
+ # were not configured
+ if 'totp_secret' not in session or 'primary_method' not in session:
+ if not request.is_json:
+ do_flash(*get_message('TWO_FACTOR_PERMISSION_DENIED'))
+ return redirect(get_url(_security.login_url))
+
+ else:
+ m, c = get_message('TWO_FACTOR_PERMISSION_DENIED')
+ form._errors = m
+ return _render_json(form, include_user=False)
+
+ problem = None
+ if form.validate_on_submit():
+ problem = form.data['help_setup']
+ # if the problem is that user can't access his device, w
+ # e send him code through mail
+ if problem == 'lost_device':
+ send_security_token(user=form.user, method='mail',
+ totp_secret=form.user.totp_secret)
+ # send app provider a mail message regarding trouble
+ elif problem == 'no_mail_access':
+ send_mail(config_value('EMAIL_SUBJECT_TWO_FACTOR_RESCUE'),
+ config_value('TWO_FACTOR_RESCUE_MAIL'),
+ 'two_factor_rescue',
+ user=form.user)
+ else:
+ return "", 404
+
+ if request.is_json:
+ return _render_json(form, include_user=False)
+
+ code_form = _security.two_factor_verify_code_form()
+ return _security.render_template(
+ config_value('TWO_FACTOR_VERIFY_CODE_TEMPLATE'),
+ two_factor_verify_code_form=code_form,
+ two_factor_rescue_form=form,
+ rescue_mail=config_value(
+ 'TWO_FACTOR_RESCUE_MAIL'),
+ problem=str(problem),
+ **_ctx('two_factor_token_validation'))
+
+
+@login_required
+def two_factor_password_confirmation():
+ """View function which handles a change two-factor method request."""
+ form_class = _security.two_factor_change_method_verify_password_form
+
+ if request.is_json:
+ form = form_class(MultiDict(request.get_json()))
+ else:
+ form = form_class()
+
+ if form.validate_on_submit():
+ session['password_confirmed'] = True
+ if not request.is_json:
+ do_flash(get_message('TWO_FACTOR_PASSWORD_CONFIRMATION_DONE'))
+ return redirect(url_for('two_factor_setup_function'))
+
+ else:
+ m, c = get_message('TWO_FACTOR_PASSWORD_CONFIRMATION_DONE')
+ form._errors = m
+ return _render_json(form)
+
+ if request.is_json:
+ form.user = current_user
+ return _render_json(form)
+
+ return _security.render_template(
+ config_value(
+ 'TWO_FACTOR_CHANGE_METHOD_PASSWORD_CONFIRMATION_TEMPLATE'),
+ two_factor_change_method_verify_password_form=form,
+ **_ctx('two_factor_change_method_password_confirmation'))
+
+
+def two_factor_qrcode():
+ return generate_qrcode()
+
+
+def generate_qrcode():
+ if 'google_authenticator' not in\
+ config_value('TWO_FACTOR_ENABLED_METHODS'):
+ return abort(404)
+ if 'primary_method' not in session or\
+ session['primary_method'] != 'google_authenticator' \
+ or 'totp_secret' not in session:
+ return abort(404)
+
+ if 'email' in session:
+ email = session['email']
+ elif 'password_confirmed' in session:
+ email = current_user.email
+ else:
+ return abort(404)
+
+ name = email.split('@')[0]
+ totp = session['totp_secret']
+ url = pyqrcode.create(get_totp_uri(name, totp))
+ from io import BytesIO
+ stream = BytesIO()
+ url.svg(stream, scale=3)
+ return stream.getvalue(), 200, {
+ 'Content-Type': 'image/svg+xml',
+ 'Cache-Control': 'no-cache, no-store, must-revalidate',
+ 'Pragma': 'no-cache',
+ 'Expires': '0'}
+
+
def create_blueprint(state, import_name):
"""Creates the security extension blueprint"""
@@ -351,6 +666,32 @@ def create_blueprint(state, import_name):
bp.route(state.login_url + slash_url_suffix(state.login_url,
''),
endpoint='token_login')(token_login)
+
+ elif state.two_factor:
+ tf_setup_function = 'two_factor_setup_function'
+ tf_token_validation = 'two_factor_token_validation'
+ tf_qrcode = 'two_factor_qrcode'
+ tf_rescue_function = 'two_factor_rescue_function'
+ tf_pass_validation = 'two_factor_password_confirmation'
+ bp.route(state.login_url,
+ methods=['GET', 'POST'],
+ endpoint='login')(two_factor_login)
+ bp.route('/' + slash_url_suffix('/', tf_setup_function),
+ methods=['GET', 'POST'],
+ endpoint=tf_setup_function)(two_factor_setup_function)
+ bp.route('/' + slash_url_suffix('/', tf_token_validation),
+ methods=['GET', 'POST'],
+ endpoint=tf_token_validation)(two_factor_token_validation)
+ bp.route('/' + slash_url_suffix('/', tf_qrcode),
+ endpoint=tf_qrcode)(two_factor_qrcode)
+ bp.route('/' + slash_url_suffix('/', tf_rescue_function),
+ methods=['GET', 'POST'],
+ endpoint=tf_rescue_function)(two_factor_rescue_function)
+ bp.route(state.change_url + slash_url_suffix(
+ state.change_url, tf_pass_validation),
+ methods=['GET', 'POST'],
+ endpoint=tf_pass_validation)(two_factor_password_confirmation)
+
else:
bp.route(state.login_url,
methods=['GET', 'POST'],
diff --git a/pytest.ini b/pytest.ini
index ba53d747..97268d11 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,2 +1,2 @@
[pytest]
-addopts = -xrs --cov flask_security --cov-report term-missing --pep8 --flakes --translations --cache-clear
+addopts = -xrs --cov flask_security --cov-report term-missing --pep8 --flakes --cache-clear
diff --git a/scripts/release.py b/scripts/release.py
index 0aae4f0e..be200d61 100755
--- a/scripts/release.py
+++ b/scripts/release.py
@@ -111,12 +111,14 @@ def build_and_upload():
def fail(message, *args):
- print >> sys.stderr, 'Error:', message % args
+ import sys
+ sys.stderr.write('Error:' + message % args)
sys.exit(1)
def info(message, *args):
- print >> sys.stderr, message % args
+ import sys
+ sys.stderr.write('Error:' + message % args)
def get_git_tags():
diff --git a/setup.py b/setup.py
index 03c171da..7336f84c 100755
--- a/setup.py
+++ b/setup.py
@@ -8,24 +8,27 @@
tests_require = [
'Flask-CLI>=0.4.0',
- 'Flask-Mongoengine>=0.7.0',
+ 'Flask-Mongoengine>=0.9.5',
'Flask-Peewee>=0.6.5',
- 'Flask-SQLAlchemy>=1.0',
- 'bcrypt>=1.0.2',
+ 'Flask-SQLAlchemy>=2.4',
+ 'bcrypt>=3.1',
+ 'msgcheck>=2.9',
'check-manifest>=0.25',
'coverage>=4.0',
'isort>=4.2.2',
'mock>=1.3.0',
- 'mongoengine>=0.10.0',
- 'pony>=0.7.1',
+ 'mongoengine>=0.12.0',
+ 'onetimepass>=1.0.1',
+ 'pony>=0.7.4',
'pydocstyle>=1.0.0',
'pytest-cache>=1.0',
'pytest-cov>=2.4.0',
'pytest-flakes>=1.0.1',
'pytest-pep8>=1.0.6',
- 'pytest-translations>=2.0.0',
'pytest>=3.3.0',
- 'sqlalchemy>=0.8.0',
+ 'pyqrcode>=1.2',
+ 'sqlalchemy>=1.1.0',
+
]
extras_require = {
@@ -46,14 +49,16 @@
]
install_requires = [
- 'Flask>=0.11',
- 'Flask-Login>=0.3.0',
- 'Flask-Mail>=0.7.3',
- 'Flask-Principal>=0.3.3',
- 'Flask-WTF>=0.13.1',
+ 'Flask>=1.0.2',
+ 'Flask-Login>=0.4.1',
+ 'Flask-Mail>=0.9.1',
+ 'Flask-Principal>=0.4.0',
+ 'Flask-WTF>=0.14.2',
'Flask-BabelEx>=0.9.3',
- 'itsdangerous>=0.21',
+ 'itsdangerous>=1.1.0',
'passlib>=1.7',
+ 'pyqrcode>=1.2',
+ 'onetimepass>=1.0.1',
]
packages = find_packages()
@@ -89,6 +94,7 @@
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
+ 'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Development Status :: 4 - Beta',
diff --git a/tests/conftest.py b/tests/conftest.py
index 3c0f11a9..270f7ab6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -46,21 +46,28 @@ def app(request):
app.config['TESTING'] = True
app.config['LOGIN_DISABLED'] = False
app.config['WTF_CSRF_ENABLED'] = False
+ app.config['SECURITY_TWO_FACTOR_SMS_SERVICE'] = 'test'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECURITY_PASSWORD_SALT'] = 'salty'
for opt in ['changeable', 'recoverable', 'registerable',
- 'trackable', 'passwordless', 'confirmable']:
+ 'trackable', 'passwordless', 'confirmable', 'two_factor']:
app.config['SECURITY_' + opt.upper()] = opt in request.keywords
- if 'settings' in request.keywords:
- for key, value in request.keywords['settings'].kwargs.items():
+ pytest_major = int(pytest.__version__.split('.')[0])
+ if pytest_major >= 4:
+ marker_getter = request.node.get_closest_marker
+ else:
+ marker_getter = request.keywords.get
+ settings = marker_getter('settings')
+ babel = marker_getter('babel')
+ if settings is not None:
+ for key, value in settings.kwargs.items():
app.config['SECURITY_' + key.upper()] = value
mail = Mail(app)
- if 'babel' not in request.keywords or \
- request.keywords['babel'].args[0]:
+ if babel is None or babel.args[0]:
babel = Babel(app)
app.babel = babel
app.json_encoder = JSONEncoder
@@ -168,6 +175,10 @@ class User(db.Document, UserMixin):
password = db.StringField(required=False, max_length=255)
last_login_at = db.DateTimeField()
current_login_at = db.DateTimeField()
+ two_factor_primary_method = db.StringField(
+ max_length=255)
+ totp_secret = db.StringField(max_length=255)
+ phone_number = db.StringField(max_length=255)
last_login_ip = db.StringField(max_length=100)
current_login_ip = db.StringField(max_length=100)
login_count = db.IntField()
@@ -208,6 +219,9 @@ class User(db.Model, UserMixin):
username = db.Column(db.String(255))
password = db.Column(db.String(255))
last_login_at = db.Column(db.DateTime())
+ two_factor_primary_method = db.Column(db.String(255), nullable=True)
+ totp_secret = db.Column(db.String(255), nullable=True)
+ phone_number = db.Column(db.String(255), nullable=True)
current_login_at = db.Column(db.DateTime())
last_login_ip = db.Column(db.String(100))
current_login_ip = db.Column(db.String(100))
@@ -270,6 +284,9 @@ class User(Base, UserMixin):
password = Column(String(255))
last_login_at = Column(DateTime())
current_login_at = Column(DateTime())
+ two_factor_primary_method = Column(String(255), nullable=True)
+ totp_secret = Column(String(255), nullable=True)
+ phone_number = Column(String(255), nullable=True)
last_login_ip = Column(String(100))
current_login_ip = Column(String(100))
login_count = Column(Integer)
@@ -316,6 +333,9 @@ class User(db.Model, UserMixin):
password = TextField(null=True)
last_login_at = DateTimeField(null=True)
current_login_at = DateTimeField(null=True)
+ two_factor_primary_method = TextField(null=True)
+ totp_secret = TextField(null=True)
+ phone_number = TextField(null=True)
last_login_ip = TextField(null=True)
current_login_ip = TextField(null=True)
login_count = IntegerField(null=True)
@@ -325,13 +345,14 @@ class User(db.Model, UserMixin):
class UserRoles(db.Model):
""" Peewee does not have built-in many-to-many support, so we have to
create this mapping class to link users to roles."""
- user = ForeignKeyField(User, related_name='roles')
- role = ForeignKeyField(Role, related_name='users')
+ user = ForeignKeyField(User, backref='roles')
+ role = ForeignKeyField(Role, backref='users')
name = property(lambda self: self.role.name)
description = property(lambda self: self.role.description)
with app.app_context():
for Model in (Role, User, UserRoles):
+ Model.drop_table()
Model.create_table()
def tear_down():
@@ -363,6 +384,9 @@ class User(db.Entity):
password = Optional(str, nullable=True)
last_login_at = Optional(datetime)
current_login_at = Optional(datetime)
+ two_factor_primary_method = Optional(str, nullable=True)
+ totp_secret = Optional(str, nullable=True)
+ phone_number = Optional(str, nullable=True)
last_login_ip = Optional(str)
current_login_ip = Optional(str)
login_count = Optional(int)
diff --git a/tests/test_changeable.py b/tests/test_changeable.py
index 89482f45..f2f09f0e 100644
--- a/tests/test_changeable.py
+++ b/tests/test_changeable.py
@@ -82,7 +82,6 @@ def on_password_changed(app, user):
assert get_message('PASSWORD_CHANGE') in response.data
assert b'Home Page' in response.data
assert len(recorded) == 1
- assert len(outbox) == 1
assert "Your password has been changed" in outbox[0].html
# Test leading & trailing whitespace not stripped
diff --git a/tests/test_hashing.py b/tests/test_hashing.py
index 46786cf6..8f0ed690 100644
--- a/tests/test_hashing.py
+++ b/tests/test_hashing.py
@@ -10,7 +10,7 @@
from utils import authenticate, init_app_with_options
from passlib.hash import pbkdf2_sha256, django_pbkdf2_sha256, plaintext
-from flask_security.utils import encrypt_password, verify_password, get_hmac
+from flask_security.utils import hash_password, verify_password, get_hmac
def test_verify_password_bcrypt_double_hash(app, sqlalchemy_datastore):
@@ -20,7 +20,7 @@ def test_verify_password_bcrypt_double_hash(app, sqlalchemy_datastore):
'SECURITY_PASSWORD_SINGLE_HASH': False,
})
with app.app_context():
- assert verify_password('pass', encrypt_password('pass'))
+ assert verify_password('pass', hash_password('pass'))
def test_verify_password_bcrypt_single_hash(app, sqlalchemy_datastore):
@@ -30,7 +30,7 @@ def test_verify_password_bcrypt_single_hash(app, sqlalchemy_datastore):
'SECURITY_PASSWORD_SINGLE_HASH': True,
})
with app.app_context():
- assert verify_password('pass', encrypt_password('pass'))
+ assert verify_password('pass', hash_password('pass'))
def test_verify_password_single_hash_list(app, sqlalchemy_datastore):
@@ -44,7 +44,7 @@ def test_verify_password_single_hash_list(app, sqlalchemy_datastore):
})
with app.app_context():
# double hash
- assert verify_password('pass', encrypt_password('pass'))
+ assert verify_password('pass', hash_password('pass'))
assert verify_password('pass', pbkdf2_sha256.hash(get_hmac('pass')))
# single hash
assert verify_password('pass', django_pbkdf2_sha256.hash('pass'))
@@ -59,7 +59,7 @@ def test_verify_password_backward_compatibility(app, sqlalchemy_datastore):
})
with app.app_context():
# double hash
- assert verify_password('pass', encrypt_password('pass'))
+ assert verify_password('pass', hash_password('pass'))
# single hash
assert verify_password('pass', plaintext.hash('pass'))
diff --git a/tests/test_misc.py b/tests/test_misc.py
index 81dc6fce..a50b7076 100644
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -147,6 +147,16 @@ def test_addition_identity_attributes(app, sqlalchemy_datastore):
assert b'Hello matt@lp.com' in response.data
+def test_passwordless_and_two_factor_configuration_mismatch(
+ app,
+ sqlalchemy_datastore):
+ with pytest.raises(ValueError):
+ init_app_with_options(app, sqlalchemy_datastore, **{
+ 'SECURITY_TWO_FACTOR': True,
+ 'SECURITY_TWO_FACTOR_ENABLED_METHODS': []
+ })
+
+
def test_flash_messages_off(app, sqlalchemy_datastore, get_message):
init_app_with_options(app, sqlalchemy_datastore, **{
'SECURITY_FLASH_MESSAGES': False
diff --git a/tests/test_recoverable.py b/tests/test_recoverable.py
index b202ad33..167e130f 100644
--- a/tests/test_recoverable.py
+++ b/tests/test_recoverable.py
@@ -158,6 +158,7 @@ def test_expired_reset_token(client, get_message):
def test_reset_token_deleted_user(app, client, get_message,
sqlalchemy_datastore):
with capture_reset_password_requests() as requests:
+
client.post(
'/reset',
data=dict(
diff --git a/tests/test_two_factor.py b/tests/test_two_factor.py
new file mode 100644
index 00000000..9c61c962
--- /dev/null
+++ b/tests/test_two_factor.py
@@ -0,0 +1,284 @@
+# -*- coding: utf-8 -*-
+"""
+ test_two_factor
+ ~~~~~~~~~~~~~~~~~
+
+ two_factor tests
+"""
+
+import onetimepass
+import pytest
+
+from utils import logout
+from flask_security.utils import SmsSenderBaseClass, SmsSenderFactory
+
+pytestmark = pytest.mark.two_factor()
+
+
+class SmsTestSender(SmsSenderBaseClass):
+ SmsSenderBaseClass.messages = []
+ SmsSenderBaseClass.count = 0
+
+ def __init__(self):
+ super(SmsTestSender, self).__init__()
+
+ def send_sms(self, from_number, to_number, msg):
+ SmsSenderBaseClass.messages.append(msg)
+ SmsSenderBaseClass.count += 1
+ return
+
+ def get_count(self):
+ return SmsSenderBaseClass.count
+
+
+SmsSenderFactory.senders['test'] = SmsTestSender
+
+
+class TestMail():
+
+ # def __init__(self):
+ # self.count = 0
+ # self.msg = ""
+
+ def send(self, msg):
+ if not self.msg:
+ self.msg = ""
+ if not self.count:
+ self.count = 0
+ self.msg = msg
+ self.count += 1
+
+
+def assert_flashes(client, expected_message, expected_category='message'):
+ with client.session_transaction() as session:
+ try:
+ category, message = session['_flashes'][0]
+ except KeyError:
+ raise AssertionError('nothing flashed')
+ assert expected_message in message
+ assert expected_category == category
+
+
+def test_two_factor_two_factor_setup_function_anonymous(app, client):
+
+ # trying to pick method without doing earlier stage
+ data = dict(setup="mail")
+ response = client.post('/two_factor_setup_function/', data=data)
+ assert response.status_code == 302
+ flash_message = 'You currently do not have permissions to access this page'
+ assert_flashes(client, flash_message, expected_category='error')
+
+
+def test_two_factor_flag(app, client):
+ # trying to verify code without going through two-factor
+ # first login function
+ wrong_code = b'000000'
+ response = client.post('/two_factor_token_validation/',
+ data=dict(code=wrong_code),
+ follow_redirects=True)
+
+ message = b'You currently do not have permissions to access this page'
+ assert message in response.data
+
+ # Test login using invalid email
+ data = dict(email="nobody@lp.com", password="password")
+ response = client.post('/login', data=data, follow_redirects=True)
+ assert b'Specified user does not exist' in response.data
+ json_data = '{"email": "nobody@lp.com", "password": "password"}'
+ response = client.post('/login',
+ data=json_data,
+ headers={'Content-Type': 'application/json'},
+ follow_redirects=True)
+ assert b'Specified user does not exist' in response.data
+
+ # Test login using valid email and invalid password
+ data = dict(email="gal@lp.com", password="wrong_pass")
+ response = client.post('/login', data=data, follow_redirects=True)
+ assert b'Invalid password' in response.data
+ json_data = '{"email": "gal@lp.com", "password": "wrong_pass"}'
+ response = client.post('/login', data=json_data,
+ headers={'Content-Type': 'application/json'},
+ follow_redirects=True)
+ assert b'Invalid password' in response.data
+
+ # Test two-factor authentication first login
+ data = dict(email="matt@lp.com", password="password")
+ response = client.post('/login', data=data, follow_redirects=True)
+ message = b'Two-factor authentication adds an extra layer of security'
+ assert message in response.data
+ response = client.post('/two_factor_setup_function/',
+ data=dict(setup="not_a_method"),
+ follow_redirects=True)
+ assert b'Marked method is not valid' in response.data
+
+ # try non-existing setup on setup page (using json)
+ json_data = '{"setup": "not_a_method"}'
+ response = client.post('/two_factor_setup_function/',
+ data=json_data,
+ headers={'Content-Type': 'application/json'},
+ follow_redirects=True)
+ assert b'"response": {}' in response.data
+
+ json_data = '{"setup": "mail"}'
+ response = client.post('/two_factor_setup_function/',
+ data=json_data,
+ headers={'Content-Type': 'application/json'},
+ follow_redirects=True)
+
+ # Test for sms in process of valid login
+ sms_sender = SmsSenderFactory.createSender('test')
+ json_data = '{"email": "gal@lp.com", "password": "password"}'
+ response = client.post('/login', data=json_data,
+ headers={'Content-Type': 'application/json'},
+ follow_redirects=True)
+ assert b'"code": 200' in response.data
+ assert sms_sender.get_count() == 1
+
+ code = sms_sender.messages[0].split()[-1]
+
+ # submit bad token to two_factor_token_validation
+ response = client.post('/two_factor_token_validation/',
+ data=dict(code=wrong_code))
+ assert b'Invalid Token' in response.data
+
+ # sumbit right token and show appropriate response
+ response = client.post('/two_factor_token_validation/',
+ data=dict(code=code),
+ follow_redirects=True)
+ assert b'Your token has been confirmed' in response.data
+
+ # try confirming password with a wrong one
+ response = client.post('/change/two_factor_password_confirmation',
+ data=dict(password=""),
+ follow_redirects=True)
+ assert b'Invalid password' in response.data
+
+ # try confirming password with a wrong one + json
+ json_data = '{"password": "wrong_password"}'
+ response = client.post('/change/two_factor_password_confirmation',
+ data=json_data, headers={
+ 'Content-Type': 'application/json'},
+ follow_redirects=True)
+
+ assert response.jdata['meta']['code'] == 400
+
+ # Test change two_factor password confirmation view to mail
+ password = 'password'
+ response = client.post('/change/two_factor_password_confirmation',
+ data=dict(password=password), follow_redirects=True)
+
+ assert b'You successfully confirmed password' in response.data
+ message = b'Two-factor authentication adds an extra layer of security'
+ assert message in response.data
+
+ # change method (from sms to mail)
+ setup_data = dict(setup='mail')
+ testMail = TestMail()
+ testMail.msg = ""
+ testMail.count = 0
+ app.extensions['mail'] = testMail
+ response = client.post('/two_factor_setup_function/',
+ data=setup_data, follow_redirects=True)
+ msg = b'To complete logging in, please enter the code sent to your mail'
+ assert msg in response.data
+
+ code = testMail.msg.body.split()[-1]
+ # sumbit right token and show appropriate response
+ response = client.post('/two_factor_token_validation/',
+ data=dict(code=code),
+ follow_redirects=True)
+ assert b'You successfully changed your two-factor method' in response.data
+
+ # Test change two_factor password confirmation view to google authenticator
+ password = 'password'
+ response = client.post('/change/two_factor_password_confirmation',
+ data=dict(password=password), follow_redirects=True)
+ assert b'You successfully confirmed password' in response.data
+ message = b'Two-factor authentication adds an extra layer of security'
+ assert message in response.data
+ setup_data = dict(setup='google_authenticator')
+ response = client.post('/two_factor_setup_function/',
+ data=setup_data, follow_redirects=True)
+
+ assert b'Open Google Authenticator on your device' in response.data
+ qrcode_page_response = client.get('/two_factor_qrcode/', data=setup_data,
+ follow_redirects=True)
+ print(qrcode_page_response)
+ assert b'svg' in qrcode_page_response.data
+
+ logout(client)
+
+ # Test for google_authenticator (test)
+ json_data = '{"email": "gal2@lp.com", "password": "password"}'
+ response = client.post('/login', data=json_data,
+ headers={'Content-Type': 'application/json'},
+ follow_redirects=True)
+ totp_secret = u'RCTE75AP2GWLZIFR'
+ code = str(onetimepass.get_totp(totp_secret))
+ response = client.post('/two_factor_token_validation/',
+ data=dict(code=code),
+ follow_redirects=True)
+ assert b'Your token has been confirmed' in response.data
+
+ logout(client)
+
+ # Test two-factor authentication first login
+ data = dict(email="matt@lp.com", password="password")
+ response = client.post('/login', data=data, follow_redirects=True)
+ message = b'Two-factor authentication adds an extra layer of security'
+ assert message in response.data
+
+ # check availability of qrcode page when this option is not picked
+ qrcode_page_response = client.get(
+ '/two_factor_qrcode/', follow_redirects=False)
+ assert qrcode_page_response.status_code == 404
+
+ # check availability of qrcode page when this option is picked
+ setup_data = dict(setup='google_authenticator')
+ response = client.post('/two_factor_setup_function/',
+ data=setup_data, follow_redirects=True)
+ assert b'Open Google Authenticator on your device' in response.data
+
+ qrcode_page_response = client.get('/two_factor_qrcode/', data=setup_data,
+ follow_redirects=True)
+ print(qrcode_page_response)
+ assert b'svg' in qrcode_page_response.data
+
+ # check appearence of setup page when sms picked and phone number entered
+ sms_sender = SmsSenderFactory.createSender('test')
+ data = dict(setup='sms', phone="+111111111111")
+ response = client.post('/two_factor_setup_function/',
+ data=data, follow_redirects=True)
+ assert b'To Which Phone Number Should We Send Code To' in response.data
+ assert sms_sender.get_count() == 2
+ code = sms_sender.messages[1].split()[-1]
+
+ response = client.post('/two_factor_token_validation/',
+ data=dict(code=code),
+ follow_redirects=True)
+ assert b'Your token has been confirmed' in response.data
+
+ logout(client)
+
+ # check when two_factor_rescue function should not appear
+ rescue_data_json = '{"help_setup": "lost_device"}'
+ response = client.post('/two_factor_rescue_function/',
+ data=rescue_data_json,
+ headers={'Content-Type': 'application/json'})
+ assert b'"code": 400' in response.data
+
+ # check when two_factor_rescue function should appear
+ data = dict(email="gal2@lp.com", password="password")
+ response = client.post('/login', data=data, follow_redirects=True)
+ assert b'Please enter your authentication code' in response.data
+ rescue_data = dict(help_setup='lost_device')
+ response = client.post('/two_factor_rescue_function/',
+ data=rescue_data, follow_redirects=True)
+ message = b'The code for authentication was sent to your email address'
+ assert message in response.data
+ rescue_data = dict(help_setup='no_mail_access')
+ response = client.post('/two_factor_rescue_function/',
+ data=rescue_data, follow_redirects=True)
+ message = (b'A mail was sent to us in order' +
+ b' to reset your application account')
+ assert message in response.data
diff --git a/tests/utils.py b/tests/utils.py
index b2726393..ce60e67d 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -10,7 +10,7 @@
from flask import json
from flask_security import Security
-from flask_security.utils import encrypt_password
+from flask_security.utils import hash_password
_missing = object
@@ -48,26 +48,34 @@ def create_roles(ds):
def create_users(ds, count=None):
- users = [('matt@lp.com', 'matt', 'password', ['admin'], True),
- ('joe@lp.com', 'joe', 'password', ['editor'], True),
- ('dave@lp.com', 'dave', 'password', ['admin', 'editor'], True),
- ('jill@lp.com', 'jill', 'password', ['author'], True),
- ('tiya@lp.com', 'tiya', 'password', [], False),
- ('gene@lp.com', 'gene', 'password', [], True),
- ('jess@lp.com', 'jess', None, [], True)]
+ users = [('matt@lp.com', 'matt', 'password', ['admin'], True, None, None),
+ ('joe@lp.com', 'joe', 'password', ['editor'], True, None, None),
+ ('dave@lp.com', 'dave', 'password', [
+ 'admin', 'editor'], True, None, None),
+ ('jill@lp.com', 'jill', 'password', ['author'], True, None, None),
+ ('tiya@lp.com', 'tiya', 'password', [], False, None, None),
+ ('jess@lp.com', 'jess', None, [], True, None, None),
+ ('gal@lp.com', 'gal', 'password', [
+ 'admin'], True, 'sms', u'RCTE75AP2GWLZIFR'),
+ ('gal2@lp.com', 'gal2', 'password', ['admin'], True,
+ 'google_authenticator', u'RCTE75AP2GWLZIFR'),
+ ('gal3@lp.com', 'gal3', 'password', [
+ 'admin'], True, 'mail', u'RCTE75AP2GWLZIFR'),
+ ('gene@lp.com', 'gene', 'password', [], True, None, None)]
count = count or len(users)
for u in users[:count]:
pw = u[2]
if pw is not None:
- pw = encrypt_password(pw)
+ pw = hash_password(pw)
roles = [ds.find_or_create_role(rn) for rn in u[3]]
ds.commit()
- user = ds.create_user(
- email=u[0],
- username=u[1],
- password=pw,
- active=u[4])
+ user = ds.create_user(email=u[0],
+ username=u[1],
+ password=pw,
+ active=u[4],
+ two_factor_primary_method=u[5],
+ totp_secret=u[6])
ds.commit()
for role in roles:
ds.add_role_to_user(user, role)
diff --git a/tox.ini b/tox.ini
index 64f71f5b..b1a659a1 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
[tox]
-envlist = py27, py34, py35, pypy
+envlist = py27, py34, py35, py36, py37, pypy
[testenv]
commands =