geekdoc-python-zh/docs/realpython/token-based-authentication-...

44 KiB
Raw Permalink Blame History

使用 Flask 的基于令牌的认证

原文:https://realpython.com/token-based-authentication-with-flask/

本教程采用测试优先的方法,使用 JSON Web 令牌(jwt)在 Flask 应用程序中实现基于令牌的认证。

更新:

目标

本教程结束时,您将能够…

  1. 讨论使用 jwt 与会话和 cookies 进行身份验证的优势
  2. 用 JWTs 实现用户认证
  3. 必要时将用户令牌列入黑名单
  4. 编写测试来创建和验证 jwt 和用户认证
  5. 实践测试驱动的开发

免费奖励: 点击此处获得免费的 Flask + Python 视频教程,向您展示如何一步一步地构建 Flask web 应用程序。

Remove ads

简介

JSON Web 令牌(或 JWTs)提供了一种从客户端向服务器传输信息的方式,这是一种安全的无状态的方式。

在服务器上jwt 是通过使用秘密密钥对用户信息进行签名而生成的,然后安全地存储在客户机上。这种形式的身份验证与现代的单页面应用程序配合得很好。有关这方面的更多信息,以及使用 JWTs 与会话和基于 cookie 的身份验证的优缺点,请查看以下文章:

  1. 饼干 vs 代币:权威指南
  2. 令牌认证与 cookie
  3. 在 Flask 中会话是如何工作的?

**注意:**请记住,由于 JWT 是由签名的,而不是加密的,它不应该包含像用户密码这样的敏感信息。

开始使用

理论够了,开始实现一些代码吧!

项目设置

首先克隆项目样板文件,然后创建一个新的分支:

$ git clone https://github.com/realpython/flask-jwt-auth.git
$ cd flask-jwt-auth
$ git checkout tags/1.0.0 -b jwt-auth

创建并激活 virtualenv 并安装依赖项:

$ python3.6 -m venv env
$ source env/bin/activate
(env)$ pip install -r requirements.txt

这是可选的,但是创建一个新的 Github 存储库并更新 remote 是个好主意:

(env)$ git remote set-url origin <newurl>

数据库设置

让我们设置 Postgres。

注意:如果你在苹果电脑上,看看的 Postgres 应用

一旦本地 Postgres 服务器运行,从psql创建两个新的数据库,它们与您的项目名称同名:

(env)$  psql #  create  database  flask_jwt_auth; CREATE  DATABASE #  create  database  flask_jwt_auth_test; CREATE  DATABASE #  \q

注意:根据您的 Postgres 版本,上述创建数据库的命令可能会有一些变化。检查 Postgres 文档中的正确命令。

在应用数据库迁移之前,我们需要更新位于 project/server/config.py 中的配置文件。简单更新一下database_name:

database_name = 'flask_jwt_auth'

在终端中设置环境变量:

(env)$ export APP_SETTINGS="project.server.config.DevelopmentConfig"

更新project/tests/test _ _ config . py中的以下测试:

class TestDevelopmentConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.DevelopmentConfig')
        return app

    def test_app_is_development(self):
        self.assertTrue(app.config['DEBUG'] is True)
        self.assertFalse(current_app is None)
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
        )

class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
        )

运行它们以确保它们仍然通过:

(env)$ python manage.py test

您应该看到:

test_app_is_development (test__config.TestDevelopmentConfig) ... ok
test_app_is_production (test__config.TestProductionConfig) ... ok
test_app_is_testing (test__config.TestTestingConfig) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.007s

OK

Remove ads

迁移

在“服务器”目录中添加一个 models.py 文件:

# project/server/models.py

import datetime

from project.server import app, db, bcrypt

class User(db.Model):
    """ User Model for storing user related details """
    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    email = db.Column(db.String(255), unique=True, nullable=False)
    password = db.Column(db.String(255), nullable=False)
    registered_on = db.Column(db.DateTime, nullable=False)
    admin = db.Column(db.Boolean, nullable=False, default=False)

    def __init__(self, email, password, admin=False):
        self.email = email
        self.password = bcrypt.generate_password_hash(
            password, app.config.get('BCRYPT_LOG_ROUNDS')
        ).decode()
        self.registered_on = datetime.datetime.now()
        self.admin = admin

在上面的代码片段中,我们定义了一个基本的用户模型,它使用 Flask-Bcrypt 扩展来散列密码。

安装 psycopg2 连接到 Postgres:

(env)$ pip install psycopg2==2.6.2
(env)$ pip freeze > requirements.txt

manage.py 内更改-

from project.server import app, db

from project.server import app, db, models

应用迁移:

(env)$ python manage.py create_db
(env)$ python manage.py db init
(env)$ python manage.py db migrate

健全性检查

成功了吗?

(env)$  psql #  \c  flask_jwt_auth You  are  now  connected  to  database  "flask_jwt_auth"  as  user  "michael.herman". #  \d List  of  relations Schema  |  Name  |  Type  |  Owner --------+-----------------+----------+----------
  public  |  alembic_version  |  table  |  postgres public  |  users  |  table  |  postgres public  |  users_id_seq  |  sequence  |  postgres (3  rows)

JWT 设置

身份验证工作流的工作方式如下:

  • 客户端提供电子邮件和密码,发送给服务器
  • 然后,服务器验证电子邮件和密码是否正确,并使用一个身份验证令牌进行响应
  • 客户端存储令牌,并将其与所有后续请求一起发送给 API
  • 服务器解码令牌并验证它

这个循环重复进行,直到令牌过期或被撤销。在后一种情况下,服务器会发出一个新的令牌。

令牌本身分为三个部分:

  • 页眉
  • 有效载荷
  • 签名

我们将更深入地研究有效负载,但是如果您有兴趣,您可以从 JSON Web Tokens 的文章中阅读关于每个部分的更多内容。

要在我们的应用程序中使用 JSON Web 令牌,请安装 PyJWT 包:

(env)$ pip install pyjwt==1.4.2
(env)$ pip freeze > requirements.txt

Remove ads

编码令牌

将下面的方法添加到项目/服务器/模型. py 中的User()类中:

def encode_auth_token(self, user_id):
    """
 Generates the Auth Token
 :return: string
 """
    try:
        payload = {
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
            'iat': datetime.datetime.utcnow(),
            'sub': user_id
        }
        return jwt.encode(
            payload,
            app.config.get('SECRET_KEY'),
            algorithm='HS256'
        )
    except Exception as e:
        return e

不要忘记添加导入:

import jwt

因此,给定一个用户 id这个方法从有效负载和在 config.py 文件中设置的密钥创建并返回一个令牌。负载是我们添加关于令牌的元数据和关于用户的信息的地方。这些信息通常被称为 JWT 声称的。我们利用以下“声明”:

  • exp:令牌到期日期
  • iat:令牌生成的时间
  • sub:令牌的主题(它标识的用户)

秘密密钥必须是随机的,并且只能在服务器端访问。使用 Python 解释器生成密钥:

>>> import os
>>> os.urandom(24)
b"\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"

将密钥设置为环境变量:

(env)$ export SECRET_KEY="\xf9'\xe4p(\xa9\x12\x1a!\x94\x8d\x1c\x99l\xc7\xb7e\xc7c\x86\x02MJ\xa0"

将此键添加到项目/服务器/配置文件BaseConfig()类内的SECRET_KEY:

SECRET_KEY = os.getenv('SECRET_KEY', 'my_precious')

更新project/tests/test _ _ config . py中的测试,以确保变量设置正确:

def test_app_is_development(self):
    self.assertFalse(app.config['SECRET_KEY'] is 'my_precious')
    self.assertTrue(app.config['DEBUG'] is True)
    self.assertFalse(current_app is None)
    self.assertTrue(
        app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth'
    )

class TestTestingConfig(TestCase):
    def create_app(self):
        app.config.from_object('project.server.config.TestingConfig')
        return app

    def test_app_is_testing(self):
        self.assertFalse(app.config['SECRET_KEY'] is 'my_precious')
        self.assertTrue(app.config['DEBUG'])
        self.assertTrue(
            app.config['SQLALCHEMY_DATABASE_URI'] == 'postgresql://postgres:@localhost/flask_jwt_auth_test'
        )

在继续之前,让我们为用户模型编写一个快速的单元测试。将以下代码添加到“项目/测试”中名为 test_user_model.py 的新文件中:

# project/tests/test_user_model.py

import unittest

from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase

class TestUserModel(BaseTestCase):

    def test_encode_auth_token(self):
        user = User(
            email='test@test.com',
            password='test'
        )
        db.session.add(user)
        db.session.commit()
        auth_token = user.encode_auth_token(user.id)
        self.assertTrue(isinstance(auth_token, bytes))

if __name__ == '__main__':
    unittest.main()

进行测试。他们都应该通过。

解码令牌

类似地,要解码一个令牌,将下面的方法添加到User()类中:

@staticmethod
def decode_auth_token(auth_token):
    """
 Decodes the auth token
 :param auth_token:
 :return: integer|string
 """
    try:
        payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
        return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'

我们需要对每个 API 请求的 auth 令牌进行解码,并验证其签名,以确保用户的真实性。为了验证auth_token,我们使用了与编码令牌相同的SECRET_KEY

如果auth_token有效,我们从有效载荷的sub索引中获取用户 id。如果无效可能有两种例外情况:

  1. 过期签名:当令牌过期后被使用时,它抛出一个ExpiredSignatureError异常。这意味着有效载荷的exp字段中指定的时间已经过期。
  2. 无效令牌:当提供的令牌不正确或格式不正确时,就会引发一个InvalidTokenError异常。

**注意:**我们使用了一个静态方法,因为它与类的实例无关。

test_user_model.py 添加一个测试:

def test_decode_auth_token(self):
    user = User(
        email='test@test.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    auth_token = user.encode_auth_token(user.id)
    self.assertTrue(isinstance(auth_token, bytes))
    self.assertTrue(User.decode_auth_token(auth_token) == 1)

确保在继续之前通过测试。

**注意:**我们稍后将通过将无效令牌列入黑名单来处理它们。

Remove ads

路线设置

现在,我们可以使用测试优先的方法来配置授权路由:

  • /auth/register
  • /auth/login
  • /auth/logout
  • /auth/user

首先在“项目/服务器”中创建一个名为“auth”的新文件夹。然后在“auth”内添加两个文件 init。py视图。最后,将以下代码添加到 views.py :

# project/server/auth/views.py

from flask import Blueprint, request, make_response, jsonify
from flask.views import MethodView

from project.server import bcrypt, db
from project.server.models import User

auth_blueprint = Blueprint('auth', __name__)

要在应用程序中注册新的蓝图,请将以下内容添加到项目/服务器/init 的底部。py :

from project.server.auth.views import auth_blueprint
app.register_blueprint(auth_blueprint)

现在在“project/tests”中添加一个名为 test_auth.py 的新文件来保存我们对这个蓝图的所有测试:

# project/tests/test_auth.py

import unittest

from project.server import db
from project.server.models import User
from project.tests.base import BaseTestCase

class TestAuthBlueprint(BaseTestCase):
    pass

if __name__ == '__main__':
    unittest.main()

注册路线

从一个测试开始:

def test_registration(self):
    """ Test for user registration """
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully registered.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 201)

确保添加导入:

import json

进行测试。您应该会看到以下错误:

raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

现在,让我们编写通过测试的代码。将以下内容添加到project/server/auth/views . py中:

class RegisterAPI(MethodView):
    """
 User Registration Resource
 """

    def post(self):
        # get the post data
        post_data = request.get_json()
        # check if user already exists
        user = User.query.filter_by(email=post_data.get('email')).first()
        if not user:
            try:
                user = User(
                    email=post_data.get('email'),
                    password=post_data.get('password')
                )

                # insert the user
                db.session.add(user)
                db.session.commit()
                # generate the auth token
                auth_token = user.encode_auth_token(user.id)
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully registered.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 201
            except Exception as e:
                responseObject = {
                    'status': 'fail',
                    'message': 'Some error occurred. Please try again.'
                }
                return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'User already exists. Please Log in.',
            }
            return make_response(jsonify(responseObject)), 202

# define the API resources
registration_view = RegisterAPI.as_view('register_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)

这里,我们注册了一个新用户,并为进一步的请求生成了一个新的 auth token我们将它发送回客户端。

运行测试以确保它们全部通过:

Ran 6 tests in 0.132s

OK

接下来,让我们再添加一个测试,以确保在用户已经存在的情况下注册失败:

def test_registered_with_already_registered_user(self):
    """ Test registration with already registered email"""
    user = User(
        email='joe@gmail.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    with self.client:
        response = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'User already exists. Please Log in.')
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 202)

在进入下一条路线之前,再次进行测试。一切都会过去。

Remove ads

登录路线

再次,从一个测试开始。为了验证登录 API让我们测试两种情况:

  1. 注册用户登录
  2. 非注册用户登录

注册用户登录

def test_registered_user_login(self):
    """ Test for login of registered-user login """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.'
        )
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # registered user login
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged in.')
        self.assertTrue(data['auth_token'])
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 200)

在这个测试用例中,注册用户试图登录,正如所料,我们的应用程序应该允许这样做。

进行测试。他们应该失败。现在编写代码:

class LoginAPI(MethodView):
    """
 User Login Resource
 """
    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
              ).first()
            auth_token = user.encode_auth_token(user.id)
            if auth_token:
                responseObject = {
                    'status': 'success',
                    'message': 'Successfully logged in.',
                    'auth_token': auth_token.decode()
                }
                return make_response(jsonify(responseObject)), 200
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500

不要忘记将类转换成视图函数:

# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/login',
    view_func=login_view,
    methods=['POST']
)

再次运行测试。他们通过了吗?他们应该。在所有测试通过之前,不要继续前进。

非注册用户登录

添加测试:

def test_non_registered_user_login(self):
    """ Test for login of non-registered user """
    with self.client:
        response = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'User does not exist.')
        self.assertTrue(response.content_type == 'application/json')
        self.assertEqual(response.status_code, 404)

在这种情况下,一个未注册的用户试图登录,正如所料,我们的应用程序不应该允许这样做。

运行测试,然后更新代码:

class LoginAPI(MethodView):
    """
 User Login Resource
 """
    def post(self):
        # get the post data
        post_data = request.get_json()
        try:
            # fetch the user data
            user = User.query.filter_by(
                email=post_data.get('email')
            ).first()
            if user and bcrypt.check_password_hash(
                user.password, post_data.get('password')
            ):
                auth_token = user.encode_auth_token(user.id)
                if auth_token:
                    responseObject = {
                        'status': 'success',
                        'message': 'Successfully logged in.',
                        'auth_token': auth_token.decode()
                    }
                    return make_response(jsonify(responseObject)), 200
            else:
                responseObject = {
                    'status': 'fail',
                    'message': 'User does not exist.'
                }
                return make_response(jsonify(responseObject)), 404
        except Exception as e:
            print(e)
            responseObject = {
                'status': 'fail',
                'message': 'Try again'
            }
            return make_response(jsonify(responseObject)), 500

我们改变了什么?测试通过了吗?邮件正确但密码不正确怎么办?会发生什么?为此写一个测试!

用户状态路线

为了获得当前登录用户的用户详细信息auth 令牌必须与请求一起在报头中发送。

从一个测试开始:

def test_user_status(self):
    """ Test for user status """
    with self.client:
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        response = self.client.get(
            '/auth/status',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['data'] is not None)
        self.assertTrue(data['data']['email'] == 'joe@gmail.com')
        self.assertTrue(data['data']['admin'] is 'true' or 'false')
        self.assertEqual(response.status_code, 200)

测试应该会失败。现在,在处理程序类中,我们应该:

  • 提取身份验证令牌并检查其有效性
  • 从有效负载中获取用户 id 并获得用户详细信息(当然,如果令牌有效的话)
class UserAPI(MethodView):
    """
 User Resource
 """
    def get(self):
        # get the auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            auth_token = auth_header.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                user = User.query.filter_by(id=resp).first()
                responseObject = {
                    'status': 'success',
                    'data': {
                        'user_id': user.id,
                        'email': user.email,
                        'admin': user.admin,
                        'registered_on': user.registered_on
                    }
                }
                return make_response(jsonify(responseObject)), 200
            responseObject = {
                'status': 'fail',
                'message': resp
            }
            return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 401

因此,如果令牌有效且未过期,我们将从令牌的有效负载中获取用户 id然后使用它从数据库中获取用户数据。

**注意:**我们仍然需要检查令牌是否被列入黑名单。我们很快就会谈到这一点。

确保添加:

user_view = UserAPI.as_view('user_api')

并且:

auth_blueprint.add_url_rule(
    '/auth/status',
    view_func=user_view,
    methods=['GET']
)

测试应该通过:

Ran 10 tests in 0.240s

OK

还有一条路要走!

Remove ads

注销路由测试

测试有效注销:

def test_valid_logout(self):
    """ Test for logout before token expires """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'success')
        self.assertTrue(data['message'] == 'Successfully logged out.')
        self.assertEqual(response.status_code, 200)

在第一个测试中,我们注册了一个新用户,让他们登录,然后尝试在令牌过期之前让他们注销。

测试无效注销:

def test_invalid_logout(self):
    """ Testing logout after the token expires """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # invalid token logout
        time.sleep(6)
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(
            data['message'] == 'Signature expired. Please log in again.')
        self.assertEqual(response.status_code, 401)

像上一个测试一样,我们注册一个用户,让他们登录,然后尝试让他们注销。在这种情况下,令牌无效,因为它已经过期。

添加导入:

import time

现在,代码必须:

  1. 验证身份验证令牌
  2. 将令牌列入黑名单(当然,如果有效的话)

在编写路由处理程序之前,让我们为黑名单令牌创建一个新模型…

黑名单

将以下代码添加到项目/服务器/模型. py 中:

class BlacklistToken(db.Model):
    """
 Token Model for storing JWT tokens
 """
    __tablename__ = 'blacklist_tokens'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    token = db.Column(db.String(500), unique=True, nullable=False)
    blacklisted_on = db.Column(db.DateTime, nullable=False)

    def __init__(self, token):
        self.token = token
        self.blacklisted_on = datetime.datetime.now()

    def __repr__(self):
        return '<id: token: {}'.format(self.token)

然后创建并应用迁移。完成后,您的数据库应该包含以下表格:

Schema  |  Name  |  Type  |  Owner --------+-------------------------+----------+----------
public  |  alembic_version  |  table  |  postgres public  |  blacklist_tokens  |  table  |  postgres public  |  blacklist_tokens_id_seq  |  sequence  |  postgres public  |  users  |  table  |  postgres public  |  users_id_seq  |  sequence  |  postgres (5  rows)

这样,我们可以添加注销处理程序…

注销路由处理程序

更新视图:

class LogoutAPI(MethodView):
    """
 Logout Resource
 """
    def post(self):
        # get auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            auth_token = auth_header.split(" ")[1]
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                # mark the token as blacklisted
                blacklist_token = BlacklistToken(token=auth_token)
                try:
                    # insert the token
                    db.session.add(blacklist_token)
                    db.session.commit()
                    responseObject = {
                        'status': 'success',
                        'message': 'Successfully logged out.'
                    }
                    return make_response(jsonify(responseObject)), 200
                except Exception as e:
                    responseObject = {
                        'status': 'fail',
                        'message': e
                    }
                    return make_response(jsonify(responseObject)), 200
            else:
                responseObject = {
                    'status': 'fail',
                    'message': resp
                }
                return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 403

# define the API resources
registration_view = RegisterAPI.as_view('register_api')
login_view = LoginAPI.as_view('login_api')
user_view = UserAPI.as_view('user_api')
logout_view = LogoutAPI.as_view('logout_api')

# add Rules for API Endpoints
auth_blueprint.add_url_rule(
    '/auth/register',
    view_func=registration_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/login',
    view_func=login_view,
    methods=['POST']
)
auth_blueprint.add_url_rule(
    '/auth/status',
    view_func=user_view,
    methods=['GET']
)
auth_blueprint.add_url_rule(
    '/auth/logout',
    view_func=logout_view,
    methods=['POST']
)

更新导入:

from project.server.models import User, BlacklistToken

当用户注销时,令牌不再有效,因此我们将其添加到黑名单中。

**注意:**通常,较大的应用程序有办法不时更新列入黑名单的令牌,以便系统不会用完有效令牌。

运行测试:

Ran 12 tests in 6.418s

OK

Remove ads

重构

最后,我们需要确保令牌没有被列入黑名单,就在令牌被解码之后- decode_auth_token() -在注销和用户状态路由中。

首先,让我们为注销路由编写一个测试:

def test_valid_blacklisted_token_logout(self):
    """ Test for logout after a valid token gets blacklisted """
    with self.client:
        # user registration
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json',
        )
        data_register = json.loads(resp_register.data.decode())
        self.assertTrue(data_register['status'] == 'success')
        self.assertTrue(
            data_register['message'] == 'Successfully registered.')
        self.assertTrue(data_register['auth_token'])
        self.assertTrue(resp_register.content_type == 'application/json')
        self.assertEqual(resp_register.status_code, 201)
        # user login
        resp_login = self.client.post(
            '/auth/login',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        data_login = json.loads(resp_login.data.decode())
        self.assertTrue(data_login['status'] == 'success')
        self.assertTrue(data_login['message'] == 'Successfully logged in.')
        self.assertTrue(data_login['auth_token'])
        self.assertTrue(resp_login.content_type == 'application/json')
        self.assertEqual(resp_login.status_code, 200)
        # blacklist a valid token
        blacklist_token = BlacklistToken(
            token=json.loads(resp_login.data.decode())['auth_token'])
        db.session.add(blacklist_token)
        db.session.commit()
        # blacklisted valid token logout
        response = self.client.post(
            '/auth/logout',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_login.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
        self.assertEqual(response.status_code, 401)

在这个测试中,我们在注销路由命中之前将令牌列入黑名单,这使得我们的有效令牌不可用。

更新导入:

from project.server.models import User, BlacklistToken

测试应该会失败,并出现以下异常:

psycopg2.IntegrityError: duplicate key value violates unique constraint "blacklist_tokens_token_key"
DETAIL:  Key (token)=(eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE0ODUyMDgyOTUsImlhdCI6MTQ4NTIwODI5MCwic3ViIjoxfQ.D9annoyh-VwpI5RY3blaSBX4pzK5UJi1H9dmKg2DeLQ) already exists.

现在更新decode_auth_token函数,以便在解码后立即处理已经列入黑名单的令牌,并使用适当的消息进行响应。

@staticmethod
def decode_auth_token(auth_token):
    """
 Validates the auth token
 :param auth_token:
 :return: integer|string
 """
    try:
        payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
        is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
        if is_blacklisted_token:
            return 'Token blacklisted. Please log in again.'
        else:
            return payload['sub']
    except jwt.ExpiredSignatureError:
        return 'Signature expired. Please log in again.'
    except jwt.InvalidTokenError:
        return 'Invalid token. Please log in again.'

最后,将check_blacklist()函数添加到BlacklistToken类中的项目/服务器/模型. py 中:

@staticmethod
def check_blacklist(auth_token):
    # check whether auth token has been blacklisted
    res = BlacklistToken.query.filter_by(token=str(auth_token)).first()
    if res:
        return True  
    else:
        return False

在运行测试之前,更新test_decode_auth_token将 bytes 对象转换成一个字符串:

def test_decode_auth_token(self):
    user = User(
        email='test@test.com',
        password='test'
    )
    db.session.add(user)
    db.session.commit()
    auth_token = user.encode_auth_token(user.id)
    self.assertTrue(isinstance(auth_token, bytes))
    self.assertTrue(User.decode_auth_token(
        auth_token.decode("utf-8") ) == 1)

运行测试:

Ran 13 tests in 9.557s

OK

以类似的方式,为用户状态路由再添加一个测试。

def test_valid_blacklisted_token_user(self):
    """ Test for user status with a blacklisted valid token """
    with self.client:
        resp_register = self.client.post(
            '/auth/register',
            data=json.dumps(dict(
                email='joe@gmail.com',
                password='123456'
            )),
            content_type='application/json'
        )
        # blacklist a valid token
        blacklist_token = BlacklistToken(
            token=json.loads(resp_register.data.decode())['auth_token'])
        db.session.add(blacklist_token)
        db.session.commit()
        response = self.client.get(
            '/auth/status',
            headers=dict(
                Authorization='Bearer ' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'Token blacklisted. Please log in again.')
        self.assertEqual(response.status_code, 401)

与上一个测试类似,我们在用户状态路由命中之前将令牌列入黑名单。

最后一次运行测试:

Ran 14 tests in 10.206s

OK

Remove ads

代码气味

最后看一下 test_auth.py 。注意到重复的代码了吗?例如:

self.client.post(
    '/auth/register',
    data=json.dumps(dict(
        email='joe@gmail.com',
        password='123456'
    )),
    content_type='application/json',
)

这种情况出现了八次。要修复此问题,请在文件顶部添加以下助手:

def register_user(self, email, password):
    return self.client.post(
        '/auth/register',
        data=json.dumps(dict(
            email=email,
            password=password
        )),
        content_type='application/json',
    )

现在,在任何需要注册用户的地方,您都可以呼叫助手:

register_user(self, 'joe@gmail.com', '123456')

登录一个用户怎么样?自己重构它。还能重构什么?下面评论。

重构

对于 PyBites 挑战,让我们重构一些代码来纠正添加到 GitHub repo 中的一个问题。首先向 test_auth.py 添加以下测试:

def test_user_status_malformed_bearer_token(self):
    """ Test for user status with malformed bearer token"""
    with self.client:
        resp_register = register_user(self, 'joe@gmail.com', '123456')
        response = self.client.get(
            '/auth/status',
            headers=dict(
                Authorization='Bearer' + json.loads(
                    resp_register.data.decode()
                )['auth_token']
            )
        )
        data = json.loads(response.data.decode())
        self.assertTrue(data['status'] == 'fail')
        self.assertTrue(data['message'] == 'Bearer token malformed.')
        self.assertEqual(response.status_code, 401)

本质上,如果Authorization头的格式不正确,就会抛出一个错误——例如,Bearer和令牌值之间没有空格。运行测试以确保它们失败,然后更新project/server/auth/views . py中的UserAPI类:

class UserAPI(MethodView):
    """
 User Resource
 """
    def get(self):
        # get the auth token
        auth_header = request.headers.get('Authorization')
        if auth_header:
            try:
                auth_token = auth_header.split(" ")[1]
            except IndexError:
                responseObject = {
                    'status': 'fail',
                    'message': 'Bearer token malformed.'
                }
                return make_response(jsonify(responseObject)), 401
        else:
            auth_token = ''
        if auth_token:
            resp = User.decode_auth_token(auth_token)
            if not isinstance(resp, str):
                user = User.query.filter_by(id=resp).first()
                responseObject = {
                    'status': 'success',
                    'data': {
                        'user_id': user.id,
                        'email': user.email,
                        'admin': user.admin,
                        'registered_on': user.registered_on
                    }
                }
                return make_response(jsonify(responseObject)), 200
            responseObject = {
                'status': 'fail',
                'message': resp
            }
            return make_response(jsonify(responseObject)), 401
        else:
            responseObject = {
                'status': 'fail',
                'message': 'Provide a valid auth token.'
            }
            return make_response(jsonify(responseObject)), 401

最后一次测试。

结论

在本教程中,我们经历了使用 JSON Web 令牌向 Flask 应用程序添加身份验证的过程。回到本教程开头的目标。你能把每一个都付诸行动吗?你学到了什么?

下一步是什么?客户端怎么样?查看使用 Angular 的基于令牌的认证,将 Angular 添加到组合中。

要了解如何使用 Flask 从头构建一个完整的 web 应用程序,请查看我们的视频系列:

免费奖励: 点击此处获得免费的 Flask + Python 视频教程,向您展示如何一步一步地构建 Flask web 应用程序。

欢迎在下面的评论中分享你的评论、问题或建议。完整的代码可以在 flask-jwt-auth 存储库中找到。

干杯!********