44 KiB
使用 Flask 的基于令牌的认证
原文:https://realpython.com/token-based-authentication-with-flask/
本教程采用测试优先的方法,使用 JSON Web 令牌(jwt)在 Flask 应用程序中实现基于令牌的认证。
更新:
- 08/04/2017 :为 PyBites 挑战赛重构路线处理程序。
目标
本教程结束时,您将能够…
- 讨论使用 jwt 与会话和 cookies 进行身份验证的优势
- 用 JWTs 实现用户认证
- 必要时将用户令牌列入黑名单
- 编写测试来创建和验证 jwt 和用户认证
- 实践测试驱动的开发
免费奖励: 点击此处获得免费的 Flask + Python 视频教程,向您展示如何一步一步地构建 Flask web 应用程序。
简介
JSON Web 令牌(或 JWTs)提供了一种从客户端向服务器传输信息的方式,这是一种安全的无状态的方式。
在服务器上,jwt 是通过使用秘密密钥对用户信息进行签名而生成的,然后安全地存储在客户机上。这种形式的身份验证与现代的单页面应用程序配合得很好。有关这方面的更多信息,以及使用 JWTs 与会话和基于 cookie 的身份验证的优缺点,请查看以下文章:
- 饼干 vs 代币:权威指南
- 令牌认证与 cookie
- 在 Flask 中会话是如何工作的?
**注意:**请记住,由于 JWT 是由签名的,而不是加密的,它不应该包含像用户密码这样的敏感信息。
开始使用
理论够了,开始实现一些代码吧!
项目设置
首先克隆项目样板文件,然后创建一个新的分支:
$ git clone https://github.com/realpython/flask-jwt-auth.git
$ cd flask-jwt-auth
$ git checkout tags/1.0.0 -b jwt-auth
创建并激活 virtualenv 并安装依赖项:
$ python3.6 -m venv env
$ source env/bin/activate
(env)$ pip install -r requirements.txt
这是可选的,但是创建一个新的 Github 存储库并更新 remote 是个好主意:
(env)$ git remote set-url origin <newurl>
数据库设置
让我们设置 Postgres。
注意:如果你在苹果电脑上,看看的 Postgres 应用。
一旦本地 Postgres 服务器运行,从psql创建两个新的数据库,它们与您的项目名称同名:
(env)$ psql # create database flask_jwt_auth; CREATE DATABASE # create database flask_jwt_auth_test; CREATE DATABASE # \q
注意:根据您的 Postgres 版本,上述创建数据库的命令可能会有一些变化。检查 Postgres 文档中的正确命令。
在应用数据库迁移之前,我们需要更新位于 project/server/config.py 中的配置文件。简单更新一下database_name:
database_name = 'flask_jwt_auth'
在终端中设置环境变量:
(env)$ export APP_SETTINGS="project.server.config.DevelopmentConfig"
更新project/tests/test _ _ config . py中的以下测试:
class TestDevelopmentConfig(TestCase):
def create_app(self):
app.config.from_object('project.server.config.DevelopmentConfig')
return app
def test_app_is_development(self):
self.assertTrue(app.config['DEBUG'] is True)
self.assertFalse(current_app is None)
self.assertTrue(
app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
)
class TestTestingConfig(TestCase):
def create_app(self):
app.config.from_object('project.server.config.TestingConfig')
return app
def test_app_is_testing(self):
self.assertTrue(app.config['DEBUG'])
self.assertTrue(
app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
)
运行它们以确保它们仍然通过:
(env)$ python manage.py test
您应该看到:
test_app_is_development (test__config.TestDevelopmentConfig) ... ok
test_app_is_production (test__config.TestProductionConfig) ... ok
test_app_is_testing (test__config.TestTestingConfig) ... ok
----------------------------------------------------------------------
Ran 3 tests in 0.007s
OK
迁移
在“服务器”目录中添加一个 models.py 文件:
# project/server/models.py
import datetime
from project.server import app, db, bcrypt
class User(db.Model):
""" User Model for storing user related details """
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
email = db.Column(db.String(255), unique=True, nullable=False)
password = db.Column(db.String(255), nullable=False)
registered_on = db.Column(db.DateTime, nullable=False)
admin = db.Column(db.Boolean, nullable=False, default=False)
def __init__(self, email, password, admin=False):
self.email = email
self.password = bcrypt.generate_password_hash(
password, app.config.get('BCRYPT_LOG_ROUNDS')
).decode()
self.registered_on = datetime.datetime.now()
self.admin = admin
在上面的代码片段中,我们定义了一个基本的用户模型,它使用 Flask-Bcrypt 扩展来散列密码。
安装 psycopg2 连接到 Postgres:
(env)$ pip install psycopg2==2.6.2
(env)$ pip freeze > requirements.txt
在 manage.py 内更改-
from project.server import app, db
到
from project.server import app, db, models
应用迁移:
(env)$ python manage.py create_db
(env)$ python manage.py db init
(env)$ python manage.py db migrate
健全性检查
成功了吗?
(env)$ psql # \c flask_jwt_auth You are now connected to database "flask_jwt_auth" as user "michael.herman". # \d List of relations Schema | Name | Type | Owner --------+-----------------+----------+----------
public | alembic_version | table | postgres public | users | table | postgres public | users_id_seq | sequence | postgres (3 rows)
JWT 设置
身份验证工作流的工作方式如下:
- 客户端提供电子邮件和密码,发送给服务器
- 然后,服务器验证电子邮件和密码是否正确,并使用一个身份验证令牌进行响应
- 客户端存储令牌,并将其与所有后续请求一起发送给 API
- 服务器解码令牌并验证它
这个循环重复进行,直到令牌过期或被撤销。在后一种情况下,服务器会发出一个新的令牌。
令牌本身分为三个部分:
- 页眉
- 有效载荷
- 签名
我们将更深入地研究有效负载,但是如果您有兴趣,您可以从 JSON Web Tokens 的文章中阅读关于每个部分的更多内容。
要在我们的应用程序中使用 JSON Web 令牌,请安装 PyJWT 包:
(env)$ pip install pyjwt==1.4.2
(env)$ pip freeze > requirements.txt
编码令牌
将下面的方法添加到项目/服务器/模型. py 中的User()类中:
def encode_auth_token(self, user_id):
"""
Generates the Auth Token
:return: string
"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
'iat': datetime.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
不要忘记添加导入:
import jwt
因此,给定一个用户 id,这个方法从有效负载和在 config.py 文件中设置的密钥创建并返回一个令牌。负载是我们添加关于令牌的元数据和关于用户的信息的地方。这些信息通常被称为 JWT 声称的。我们利用以下“声明”:
exp:令牌到期日期iat:令牌生成的时间sub:令牌的主题(它标识的用户)
秘密密钥必须是随机的,并且只能在服务器端访问。使用 Python 解释器生成密钥:
>>> import os
>>> os.urandom(24)
b"\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"
将密钥设置为环境变量:
(env)$ export SECRET_KEY="\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"
将此键添加到项目/服务器/配置文件中BaseConfig()类内的SECRET_KEY:
SECRET_KEY = os.getenv('SECRET_KEY', 'my_precious')
更新project/tests/test _ _ config . py中的测试,以确保变量设置正确:
def test_app_is_development(self):
self.assertFalse(app.config['SECRET_KEY'] is 'my_precious')
self.assertTrue(app.config['DEBUG'] is True)
self.assertFalse(current_app is None)
self.assertTrue(
app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
)
class TestTestingConfig(TestCase):
def create_app(self):
app.config.from_object('project.server.config.TestingConfig')
return app
def test_app_is_testing(self):
self.assertFalse(app.config['SECRET_KEY'] is 'my_precious')
self.assertTrue(app.config['DEBUG'])
self.assertTrue(
app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
)
在继续之前,让我们为用户模型编写一个快速的单元测试。将以下代码添加到“项目/测试”中名为 test_user_model.py 的新文件中:
# project/tests/test_user_model.py
import unittest
from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase
class TestUserModel(BaseTestCase):
def test_encode_auth_token(self):
user = User(
email='test@test.com',
password='test'
)
db.session.add(user)
db.session.commit()
auth_token = user.encode_auth_token(user.id)
self.assertTrue(isinstance(auth_token, bytes))
if __name__ == '__main__':
unittest.main()
进行测试。他们都应该通过。
解码令牌
类似地,要解码一个令牌,将下面的方法添加到User()类中:
@staticmethod
def decode_auth_token(auth_token):
"""
Decodes the auth token
:param auth_token:
:return: integer|string
"""
try:
payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'
我们需要对每个 API 请求的 auth 令牌进行解码,并验证其签名,以确保用户的真实性。为了验证auth_token,我们使用了与编码令牌相同的SECRET_KEY。
如果auth_token有效,我们从有效载荷的sub索引中获取用户 id。如果无效,可能有两种例外情况:
- 过期签名:当令牌过期后被使用时,它抛出一个
ExpiredSignatureError异常。这意味着有效载荷的exp字段中指定的时间已经过期。 - 无效令牌:当提供的令牌不正确或格式不正确时,就会引发一个
InvalidTokenError异常。
**注意:**我们使用了一个静态方法,因为它与类的实例无关。
向 test_user_model.py 添加一个测试:
def test_decode_auth_token(self):
user = User(
email='test@test.com',
password='test'
)
db.session.add(user)
db.session.commit()
auth_token = user.encode_auth_token(user.id)
self.assertTrue(isinstance(auth_token, bytes))
self.assertTrue(User.decode_auth_token(auth_token) == 1)
确保在继续之前通过测试。
**注意:**我们稍后将通过将无效令牌列入黑名单来处理它们。
路线设置
现在,我们可以使用测试优先的方法来配置授权路由:
/auth/register/auth/login/auth/logout/auth/user
首先在“项目/服务器”中创建一个名为“auth”的新文件夹。然后,在“auth”内添加两个文件, init。py 和视图。最后,将以下代码添加到 views.py :
# project/server/auth/views.py
from flask import Blueprint, request, make_response, jsonify
from flask.views import MethodView
from project.server import bcrypt, db
from project.server.models import User
auth_blueprint = Blueprint('auth', __name__)
要在应用程序中注册新的蓝图,请将以下内容添加到项目/服务器/init 的底部。py :
from project.server.auth.views import auth_blueprint
app.register_blueprint(auth_blueprint)
现在,在“project/tests”中添加一个名为 test_auth.py 的新文件来保存我们对这个蓝图的所有测试:
# project/tests/test_auth.py
import unittest
from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase
class TestAuthBlueprint(BaseTestCase):
pass
if __name__ == '__main__':
unittest.main()
注册路线
从一个测试开始:
def test_registration(self):
""" Test for user registration """
with self.client:
response = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully registered.')
self.assertTrue(data['auth_token'])
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 201)
确保添加导入:
import json
进行测试。您应该会看到以下错误:
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
现在,让我们编写通过测试的代码。将以下内容添加到project/server/auth/views . py中:
class RegisterAPI(MethodView):
"""
User Registration Resource
"""
def post(self):
# get the post data
post_data = request.get_json()
# check if user already exists
user = User.query.filter_by(email=post_data.get('email')).first()
if not user:
try:
user = User(
email=post_data.get('email'),
password=post_data.get('password')
)
# insert the user
db.session.add(user)
db.session.commit()
# generate the auth token
auth_token = user.encode_auth_token(user.id)
responseObject = {
'status': 'success',
'message': 'Successfully registered.',
'auth_token': auth_token.decode()
}
return make_response(jsonify(responseObject)), 201
except Exception as e:
responseObject = {
'status': 'fail',
'message': 'Some error occurred. Please try again.'
}
return make_response(jsonify(responseObject)), 401
else:
responseObject = {
'status': 'fail',
'message': 'User already exists. Please Log in.',
}
return make_response(jsonify(responseObject)), 202
# define the API resources
registration_view = RegisterAPI.as_view('register_api')
# add Rules for API Endpoints
auth_blueprint.add_url_rule(
'/auth/register',
view_func=registration_view,
methods=['POST']
)
这里,我们注册了一个新用户,并为进一步的请求生成了一个新的 auth token,我们将它发送回客户端。
运行测试以确保它们全部通过:
Ran 6 tests in 0.132s
OK
接下来,让我们再添加一个测试,以确保在用户已经存在的情况下注册失败:
def test_registered_with_already_registered_user(self):
""" Test registration with already registered email"""
user = User(
email='joe@gmail.com',
password='test'
)
db.session.add(user)
db.session.commit()
with self.client:
response = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(
data['message'] == 'User already exists. Please Log in.')
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 202)
在进入下一条路线之前,再次进行测试。一切都会过去。
登录路线
再次,从一个测试开始。为了验证登录 API,让我们测试两种情况:
- 注册用户登录
- 非注册用户登录
注册用户登录
def test_registered_user_login(self):
""" Test for login of registered-user login """
with self.client:
# user registration
resp_register = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json',
)
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.'
)
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# registered user login
response = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully logged in.')
self.assertTrue(data['auth_token'])
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 200)
在这个测试用例中,注册用户试图登录,正如所料,我们的应用程序应该允许这样做。
进行测试。他们应该失败。现在编写代码:
class LoginAPI(MethodView):
"""
User Login Resource
"""
def post(self):
# get the post data
post_data = request.get_json()
try:
# fetch the user data
user = User.query.filter_by(
email=post_data.get('email')
).first()
auth_token = user.encode_auth_token(user.id)
if auth_token:
responseObject = {
'status': 'success',
'message': 'Successfully logged in.',
'auth_token': auth_token.decode()
}
return make_response(jsonify(responseObject)), 200
except Exception as e:
print(e)
responseObject = {
'status': 'fail',
'message': 'Try again'
}
return make_response(jsonify(responseObject)), 500
不要忘记将类转换成视图函数:
# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')
# add Rules for API Endpoints
auth_blueprint.add_url_rule(
'/auth/register',
view_func=registration_view,
methods=['POST']
)
auth_blueprint.add_url_rule(
'/auth/login',
view_func=login_view,
methods=['POST']
)
再次运行测试。他们通过了吗?他们应该。在所有测试通过之前,不要继续前进。
非注册用户登录
添加测试:
def test_non_registered_user_login(self):
""" Test for login of non-registered user """
with self.client:
response = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'User does not exist.')
self.assertTrue(response.content_type == 'application/json')
self.assertEqual(response.status_code, 404)
在这种情况下,一个未注册的用户试图登录,正如所料,我们的应用程序不应该允许这样做。
运行测试,然后更新代码:
class LoginAPI(MethodView):
"""
User Login Resource
"""
def post(self):
# get the post data
post_data = request.get_json()
try:
# fetch the user data
user = User.query.filter_by(
email=post_data.get('email')
).first()
if user and bcrypt.check_password_hash(
user.password, post_data.get('password')
):
auth_token = user.encode_auth_token(user.id)
if auth_token:
responseObject = {
'status': 'success',
'message': 'Successfully logged in.',
'auth_token': auth_token.decode()
}
return make_response(jsonify(responseObject)), 200
else:
responseObject = {
'status': 'fail',
'message': 'User does not exist.'
}
return make_response(jsonify(responseObject)), 404
except Exception as e:
print(e)
responseObject = {
'status': 'fail',
'message': 'Try again'
}
return make_response(jsonify(responseObject)), 500
我们改变了什么?测试通过了吗?邮件正确但密码不正确怎么办?会发生什么?为此写一个测试!
用户状态路线
为了获得当前登录用户的用户详细信息,auth 令牌必须与请求一起在报头中发送。
从一个测试开始:
def test_user_status(self):
""" Test for user status """
with self.client:
resp_register = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
response = self.client.get(
'/auth/status',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_register.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['data'] is not None)
self.assertTrue(data['data']['email'] == 'joe@gmail.com')
self.assertTrue(data['data']['admin'] is 'true' or 'false')
self.assertEqual(response.status_code, 200)
测试应该会失败。现在,在处理程序类中,我们应该:
- 提取身份验证令牌并检查其有效性
- 从有效负载中获取用户 id 并获得用户详细信息(当然,如果令牌有效的话)
class UserAPI(MethodView):
"""
User Resource
"""
def get(self):
# get the auth token
auth_header = request.headers.get('Authorization')
if auth_header:
auth_token = auth_header.split(" ")[1]
else:
auth_token = ''
if auth_token:
resp = User.decode_auth_token(auth_token)
if not isinstance(resp, str):
user = User.query.filter_by(id=resp).first()
responseObject = {
'status': 'success',
'data': {
'user_id': user.id,
'email': user.email,
'admin': user.admin,
'registered_on': user.registered_on
}
}
return make_response(jsonify(responseObject)), 200
responseObject = {
'status': 'fail',
'message': resp
}
return make_response(jsonify(responseObject)), 401
else:
responseObject = {
'status': 'fail',
'message': 'Provide a valid auth token.'
}
return make_response(jsonify(responseObject)), 401
因此,如果令牌有效且未过期,我们将从令牌的有效负载中获取用户 id,然后使用它从数据库中获取用户数据。
**注意:**我们仍然需要检查令牌是否被列入黑名单。我们很快就会谈到这一点。
确保添加:
user_view = UserAPI.as_view('user_api')
并且:
auth_blueprint.add_url_rule(
'/auth/status',
view_func=user_view,
methods=['GET']
)
测试应该通过:
Ran 10 tests in 0.240s
OK
还有一条路要走!
注销路由测试
测试有效注销:
def test_valid_logout(self):
""" Test for logout before token expires """
with self.client:
# user registration
resp_register = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json',
)
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.')
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# user login
resp_login = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data_login = json.loads(resp_login.data.decode())
self.assertTrue(data_login['status'] == 'success')
self.assertTrue(data_login['message'] == 'Successfully logged in.')
self.assertTrue(data_login['auth_token'])
self.assertTrue(resp_login.content_type == 'application/json')
self.assertEqual(resp_login.status_code, 200)
# valid token logout
response = self.client.post(
'/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'success')
self.assertTrue(data['message'] == 'Successfully logged out.')
self.assertEqual(response.status_code, 200)
在第一个测试中,我们注册了一个新用户,让他们登录,然后尝试在令牌过期之前让他们注销。
测试无效注销:
def test_invalid_logout(self):
""" Testing logout after the token expires """
with self.client:
# user registration
resp_register = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json',
)
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.')
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# user login
resp_login = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data_login = json.loads(resp_login.data.decode())
self.assertTrue(data_login['status'] == 'success')
self.assertTrue(data_login['message'] == 'Successfully logged in.')
self.assertTrue(data_login['auth_token'])
self.assertTrue(resp_login.content_type == 'application/json')
self.assertEqual(resp_login.status_code, 200)
# invalid token logout
time.sleep(6)
response = self.client.post(
'/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(
data['message'] == 'Signature expired. Please log in again.')
self.assertEqual(response.status_code, 401)
像上一个测试一样,我们注册一个用户,让他们登录,然后尝试让他们注销。在这种情况下,令牌无效,因为它已经过期。
添加导入:
import time
现在,代码必须:
- 验证身份验证令牌
- 将令牌列入黑名单(当然,如果有效的话)
在编写路由处理程序之前,让我们为黑名单令牌创建一个新模型…
黑名单
将以下代码添加到项目/服务器/模型. py 中:
class BlacklistToken(db.Model):
"""
Token Model for storing JWT tokens
"""
__tablename__ = 'blacklist_tokens'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
token = db.Column(db.String(500), unique=True, nullable=False)
blacklisted_on = db.Column(db.DateTime, nullable=False)
def __init__(self, token):
self.token = token
self.blacklisted_on = datetime.datetime.now()
def __repr__(self):
return '<id: token: {}'.format(self.token)
然后创建并应用迁移。完成后,您的数据库应该包含以下表格:
Schema | Name | Type | Owner --------+-------------------------+----------+----------
public | alembic_version | table | postgres public | blacklist_tokens | table | postgres public | blacklist_tokens_id_seq | sequence | postgres public | users | table | postgres public | users_id_seq | sequence | postgres (5 rows)
这样,我们可以添加注销处理程序…
注销路由处理程序
更新视图:
class LogoutAPI(MethodView):
"""
Logout Resource
"""
def post(self):
# get auth token
auth_header = request.headers.get('Authorization')
if auth_header:
auth_token = auth_header.split(" ")[1]
else:
auth_token = ''
if auth_token:
resp = User.decode_auth_token(auth_token)
if not isinstance(resp, str):
# mark the token as blacklisted
blacklist_token = BlacklistToken(token=auth_token)
try:
# insert the token
db.session.add(blacklist_token)
db.session.commit()
responseObject = {
'status': 'success',
'message': 'Successfully logged out.'
}
return make_response(jsonify(responseObject)), 200
except Exception as e:
responseObject = {
'status': 'fail',
'message': e
}
return make_response(jsonify(responseObject)), 200
else:
responseObject = {
'status': 'fail',
'message': resp
}
return make_response(jsonify(responseObject)), 401
else:
responseObject = {
'status': 'fail',
'message': 'Provide a valid auth token.'
}
return make_response(jsonify(responseObject)), 403
# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')
user_view = UserAPI.as_view('user_api')
logout_view = LogoutAPI.as_view('logout_api')
# add Rules for API Endpoints
auth_blueprint.add_url_rule(
'/auth/register',
view_func=registration_view,
methods=['POST']
)
auth_blueprint.add_url_rule(
'/auth/login',
view_func=login_view,
methods=['POST']
)
auth_blueprint.add_url_rule(
'/auth/status',
view_func=user_view,
methods=['GET']
)
auth_blueprint.add_url_rule(
'/auth/logout',
view_func=logout_view,
methods=['POST']
)
更新导入:
from project.server.models import User, BlacklistToken
当用户注销时,令牌不再有效,因此我们将其添加到黑名单中。
**注意:**通常,较大的应用程序有办法不时更新列入黑名单的令牌,以便系统不会用完有效令牌。
运行测试:
Ran 12 tests in 6.418s
OK
重构
最后,我们需要确保令牌没有被列入黑名单,就在令牌被解码之后- decode_auth_token() -在注销和用户状态路由中。
首先,让我们为注销路由编写一个测试:
def test_valid_blacklisted_token_logout(self):
""" Test for logout after a valid token gets blacklisted """
with self.client:
# user registration
resp_register = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json',
)
data_register = json.loads(resp_register.data.decode())
self.assertTrue(data_register['status'] == 'success')
self.assertTrue(
data_register['message'] == 'Successfully registered.')
self.assertTrue(data_register['auth_token'])
self.assertTrue(resp_register.content_type == 'application/json')
self.assertEqual(resp_register.status_code, 201)
# user login
resp_login = self.client.post(
'/auth/login',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
data_login = json.loads(resp_login.data.decode())
self.assertTrue(data_login['status'] == 'success')
self.assertTrue(data_login['message'] == 'Successfully logged in.')
self.assertTrue(data_login['auth_token'])
self.assertTrue(resp_login.content_type == 'application/json')
self.assertEqual(resp_login.status_code, 200)
# blacklist a valid token
blacklist_token = BlacklistToken(
token=json.loads(resp_login.data.decode())['auth_token'])
db.session.add(blacklist_token)
db.session.commit()
# blacklisted valid token logout
response = self.client.post(
'/auth/logout',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_login.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
self.assertEqual(response.status_code, 401)
在这个测试中,我们在注销路由命中之前将令牌列入黑名单,这使得我们的有效令牌不可用。
更新导入:
from project.server.models import User, BlacklistToken
测试应该会失败,并出现以下异常:
psycopg2.IntegrityError: duplicate key value violates unique constraint "blacklist_tokens_token_key"
DETAIL: Key (token)=(eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE0ODUyMDgyOTUsImlhdCI6MTQ4NTIwODI5MCwic3ViIjoxfQ.D9annoyh-VwpI5RY3blaSBX4pzK5UJi1H9dmKg2DeLQ) already exists.
现在更新decode_auth_token函数,以便在解码后立即处理已经列入黑名单的令牌,并使用适当的消息进行响应。
@staticmethod
def decode_auth_token(auth_token):
"""
Validates the auth token
:param auth_token:
:return: integer|string
"""
try:
payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
if is_blacklisted_token:
return 'Token blacklisted. Please log in again.'
else:
return payload['sub']
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'
最后,将check_blacklist()函数添加到BlacklistToken类中的项目/服务器/模型. py 中:
@staticmethod
def check_blacklist(auth_token):
# check whether auth token has been blacklisted
res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
if res:
return True
else:
return False
在运行测试之前,更新test_decode_auth_token将 bytes 对象转换成一个字符串:
def test_decode_auth_token(self):
user = User(
email='test@test.com',
password='test'
)
db.session.add(user)
db.session.commit()
auth_token = user.encode_auth_token(user.id)
self.assertTrue(isinstance(auth_token, bytes))
self.assertTrue(User.decode_auth_token(
auth_token.decode("utf-8") ) == 1)
运行测试:
Ran 13 tests in 9.557s
OK
以类似的方式,为用户状态路由再添加一个测试。
def test_valid_blacklisted_token_user(self):
""" Test for user status with a blacklisted valid token """
with self.client:
resp_register = self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json'
)
# blacklist a valid token
blacklist_token = BlacklistToken(
token=json.loads(resp_register.data.decode())['auth_token'])
db.session.add(blacklist_token)
db.session.commit()
response = self.client.get(
'/auth/status',
headers=dict(
Authorization='Bearer ' + json.loads(
resp_register.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
self.assertEqual(response.status_code, 401)
与上一个测试类似,我们在用户状态路由命中之前将令牌列入黑名单。
最后一次运行测试:
Ran 14 tests in 10.206s
OK
代码气味
最后看一下 test_auth.py 。注意到重复的代码了吗?例如:
self.client.post(
'/auth/register',
data=json.dumps(dict(
email='joe@gmail.com',
password='123456'
)),
content_type='application/json',
)
这种情况出现了八次。要修复此问题,请在文件顶部添加以下助手:
def register_user(self, email, password):
return self.client.post(
'/auth/register',
data=json.dumps(dict(
email=email,
password=password
)),
content_type='application/json',
)
现在,在任何需要注册用户的地方,您都可以呼叫助手:
register_user(self, 'joe@gmail.com', '123456')
登录一个用户怎么样?自己重构它。还能重构什么?下面评论。
重构
对于 PyBites 挑战,让我们重构一些代码来纠正添加到 GitHub repo 中的一个问题。首先向 test_auth.py 添加以下测试:
def test_user_status_malformed_bearer_token(self):
""" Test for user status with malformed bearer token"""
with self.client:
resp_register = register_user(self, 'joe@gmail.com', '123456')
response = self.client.get(
'/auth/status',
headers=dict(
Authorization='Bearer' + json.loads(
resp_register.data.decode()
)['auth_token']
)
)
data = json.loads(response.data.decode())
self.assertTrue(data['status'] == 'fail')
self.assertTrue(data['message'] == 'Bearer token malformed.')
self.assertEqual(response.status_code, 401)
本质上,如果Authorization头的格式不正确,就会抛出一个错误——例如,Bearer和令牌值之间没有空格。运行测试以确保它们失败,然后更新project/server/auth/views . py中的UserAPI类:
class UserAPI(MethodView):
"""
User Resource
"""
def get(self):
# get the auth token
auth_header = request.headers.get('Authorization')
if auth_header:
try:
auth_token = auth_header.split(" ")[1]
except IndexError:
responseObject = {
'status': 'fail',
'message': 'Bearer token malformed.'
}
return make_response(jsonify(responseObject)), 401
else:
auth_token = ''
if auth_token:
resp = User.decode_auth_token(auth_token)
if not isinstance(resp, str):
user = User.query.filter_by(id=resp).first()
responseObject = {
'status': 'success',
'data': {
'user_id': user.id,
'email': user.email,
'admin': user.admin,
'registered_on': user.registered_on
}
}
return make_response(jsonify(responseObject)), 200
responseObject = {
'status': 'fail',
'message': resp
}
return make_response(jsonify(responseObject)), 401
else:
responseObject = {
'status': 'fail',
'message': 'Provide a valid auth token.'
}
return make_response(jsonify(responseObject)), 401
最后一次测试。
结论
在本教程中,我们经历了使用 JSON Web 令牌向 Flask 应用程序添加身份验证的过程。回到本教程开头的目标。你能把每一个都付诸行动吗?你学到了什么?
下一步是什么?客户端怎么样?查看使用 Angular 的基于令牌的认证,将 Angular 添加到组合中。
要了解如何使用 Flask 从头构建一个完整的 web 应用程序,请查看我们的视频系列:
免费奖励: 点击此处获得免费的 Flask + Python 视频教程,向您展示如何一步一步地构建 Flask web 应用程序。
欢迎在下面的评论中分享你的评论、问题或建议。完整的代码可以在 flask-jwt-auth 存储库中找到。
干杯!********