Basic Usage
from flask import Flask
from flask import jsonify
from flask import request
from flask_jwt_extended import create_access_token
from flask_jwt_extended import get_jwt_identity
from flask_jwt_extended import jwt_required
from flask_jwt_extended import JWTManager
# Application object plus the Flask-JWT-Extended extension bound to it.
app = Flask(__name__)
# Example secret key only.  Change this!
app.config.update(JWT_SECRET_KEY="super-secret")
jwt = JWTManager(app)
@app.route("/login", methods=["POST"])
def login():
    """Authenticate the posted credentials and hand back a JWT.

    Reads ``username`` and ``password`` from the JSON request body.  Only the
    demo pair ``test`` / ``test`` is accepted; anything else gets a 401
    response carrying an error message.  On success the token generated by
    ``create_access_token()`` is returned, with the username as its identity.
    """
    credentials = request.json
    username = credentials.get("username")
    password = credentials.get("password")

    if username == "test" and password == "test":
        token = create_access_token(identity=username)
        return jsonify(access_token=token)

    return jsonify({"msg": "Bad username or password"}), 401
# Guard the route with jwt_required so that requests lacking a valid JWT are
# kicked out.
@app.route("/protected", methods=["GET"])
@jwt_required()
def protected():
    """Report which identity the presented JWT belongs to."""
    # get_jwt_identity() yields the identity stored in the token, i.e. the
    # username that login() passed to create_access_token().
    identity = get_jwt_identity()
    return jsonify(logged_in_as=identity), 200
if __name__ == "__main__":
    app.run()

Authorization: Bearer <access_token>
Automatic User Loading
In most web applications it is important to have access to the user who is accessing a protected route. We provide a couple of callback functions that make this seamless while working with JWTs.
The first is user_identity_loader(), which will convert any User object used to create a JWT into a JSON serializable format.
On the flip side, you can use user_lookup_loader() to automatically load your User object when a JWT is present in the request. The loaded user is available in your protected routes via current_user.
from hmac import compare_digest
from flask import Flask
from flask import jsonify
from flask import request
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import create_access_token
from flask_jwt_extended import current_user
from flask_jwt_extended import jwt_required
from flask_jwt_extended import JWTManager
app = Flask(__name__)
# One-shot configuration: example secret key, an SQLite URI with no file
# path, and SQLAlchemy modification tracking switched off.
app.config.update(
    JWT_SECRET_KEY="super-secret",  # Change this!
    SQLALCHEMY_DATABASE_URI="sqlite://",
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
jwt = JWTManager(app)
db = SQLAlchemy(app)
class User(db.Model):
    """Example user account stored through SQLAlchemy."""

    # Integer primary key; this is the value used as the JWT identity.
    id = db.Column(db.Integer, primary_key=True)
    # Unique login name.
    username = db.Column(db.Text, nullable=False, unique=True)
    # Display name returned by the /who_am_i route.
    full_name = db.Column(db.Text, nullable=False)

    # NOTE: In a real application make sure to properly hash and salt passwords
    def check_password(self, password):
        """Return True when ``password`` matches the demo password.

        Every user in this example shares the literal password
        ``"password"``.  The comparison uses ``compare_digest``.

        Args:
            password: Value supplied by the client; may be ``None`` or a
                non-string when the request body omits or mistypes the field.

        Returns:
            bool: ``True`` only for the exact string ``"password"``.
        """
        # compare_digest raises TypeError for None and for str values that
        # contain non-ASCII characters, so reject non-strings up front and
        # compare encoded bytes instead of raw str.
        if not isinstance(password, str):
            return False
        return compare_digest(password.encode("utf-8"), b"password")
# Identity serializer: this callback receives whatever object was passed in as
# the identity when creating a JWT and must turn it into a JSON serializable
# value.
@jwt.user_identity_loader
def user_identity_lookup(user):
    """Return the primary key of ``user`` for use as the JWT identity."""
    # NOTE(review): newer PyJWT releases appear to require a string subject --
    # confirm whether this integer id needs converting for your versions.
    primary_key = user.id
    return primary_key
# User loader: runs whenever a protected route is accessed.  It should return
# any python object on a successful lookup, or None when the lookup fails for
# any reason (for example when the user has been deleted from the database).
@jwt.user_lookup_loader
def user_lookup_callback(_jwt_header, jwt_data):
    """Load the ``User`` row whose id equals the token's ``sub`` claim.

    Args:
        _jwt_header: Unused.
        jwt_data: Mapping of the token's claims; ``sub`` holds the identity.

    Returns:
        The matching ``User``, or ``None`` when no row has that id.
    """
    user_id = jwt_data["sub"]
    matching_users = User.query.filter_by(id=user_id)
    return matching_users.one_or_none()
@app.route("/login", methods=["POST"])
def login():
    """Check the posted credentials against the database and issue a JWT.

    Looks the user up by ``username`` and verifies the password through
    ``User.check_password``.  A missing user or a failed check yields a 401
    response; otherwise an access token is returned.
    """
    payload = request.json
    username = payload.get("username")
    password = payload.get("password")

    account = User.query.filter_by(username=username).one_or_none()
    if account and account.check_password(password):
        # The SQLAlchemy user object itself is handed over as the identity.
        token = create_access_token(identity=account)
        return jsonify(access_token=token)

    return jsonify("Wrong username or password"), 401
@app.route("/who_am_i", methods=["GET"])
@jwt_required()
def protected():
    """Return the id, full name and username of the authenticated user."""
    # `current_user` gives access to the SQLAlchemy User object for the
    # presented JWT.
    profile = {
        "id": current_user.id,
        "full_name": current_user.full_name,
        "username": current_user.username,
    }
    return jsonify(**profile)
if __name__ == "__main__":
    # Create the tables and seed the example users inside an explicit
    # application context: recent Flask-SQLAlchemy releases raise a
    # RuntimeError when the database is used outside of one, and older
    # releases accept the context as well.
    with app.app_context():
        db.create_all()
        db.session.add(User(full_name="Bruce Wayne", username="batman"))
        db.session.add(User(full_name="Ann Takamaki", username="panther"))
        db.session.add(User(full_name="Jester Lavore", username="little_sapphire"))
        db.session.commit()

    app.run()
Storing Additional Data in JWTs
You may want to store additional information in the access token which you could later access in the protected views. This can be done using the additional_claims argument with the create_access_token() or create_refresh_token() functions. The claims can be accessed in a protected route via the get_jwt() function.
It is important to remember that JWTs are not encrypted and the contents of a JWT can be trivially decoded by anyone who has access to it. As such, you should never put any sensitive information in a JWT.
@app.route("/login", methods=["POST"])
def login():
    """Authenticate the demo user and issue a JWT carrying extra claims.

    Only the pair ``test`` / ``test`` is accepted; anything else gets a 401
    response.  The returned token includes the ``aud`` and ``foo`` claims.
    """
    body = request.json
    username = body.get("username")
    password = body.get("password")

    if username == "test" and password == "test":
        # The additional_claims argument can add custom claims or override
        # default claims in the JWT.
        extra = {"aud": "some_audience", "foo": "bar"}
        token = create_access_token(username, additional_claims=extra)
        return jsonify(access_token=token)

    return jsonify({"msg": "Bad username or password"}), 401
# In a protected view, get the claims you added to the jwt with the
# get_jwt() method
@app.route("/protected", methods=["GET"])
@jwt_required()
def protected():
claims = get_jwt()
return jsonify(foo=claims["foo"])Alternately you can use the additional_claims_loader() decorator to register a callback function that will be called whenever a new JWT is created, and return a dictionary of claims to add to that token. In the case that both additional_claims_loader() and the additional_claims argument are used, both results are merged together, with ties going to the data supplied by the additional_claims argument.
# Using the additional_claims_loader, we can specify a method that will be
# called when creating JWTs. The decorated method must take the identity
# we are creating a token for and return a dictionary of additional
# claims to add to the JWT.
@jwt.additional_claims_loader
def add_claims_to_access_token(identity):
return {
"aud": "some_audience",
"foo": "bar",
"upcase_name": identity.upper(),
}get_jwt 获取所有的内容,其中 sub 存储 identity 的内容,还会存储 additional_claims_loader 中配置的内容,如下面存储了 aud、foo、upcase—name。
{'fresh': False, 'iat': 1698634345, 'jti': '8e64197b-3042-4331-9c28-aad14b5f53d9', 'type': 'access', 'sub': 'test', 'nbf': 1698634345, 'exp': 1698634405, 'aud': 'some_audience', 'foo': 'bar', 'upcase_name': 'TEST'}

Partially protecting routes = 可选择鉴权
There may be cases where you want to use the same route regardless of if a JWT is present in the request or not. In these situations, you can use jwt_required() with the optional=True argument. This will allow the endpoint to be accessed regardless of if a JWT is sent in with the request.
If no JWT is present, get_jwt() and get_jwt_header() will return an empty dictionary. get_jwt_identity(), current_user, and get_current_user() will return None.
@app.route("/optionally_protected", methods=["GET"])
@jwt_required(optional=True)
def optionally_protected():
current_identity = get_jwt_identity()
if current_identity:
return jsonify(logged_in_as=current_identity)
else:
return jsonify(logged_in_as="anonymous user")JWT Locations
JWT Locations — flask-jwt-extended 4.5.3 documentation
主要是可以学习下如何通过 cookie 来进行验证
Changing Default Behaviors
@jwt.expired_token_loader
def my_expired_token_callback(jwt_header, jwt_payload):
return jsonify(code="dave", err="I can't let you do that"), 401There are all sorts of callbacks that can be defined to customize the behaviors of this extension. See the Configuring Flask-JWT-Extended API Documentation for a full list of callback functions that are available in this extension.
Custom Decorators
flask_jwt_extended.verify_jwt_in_request() can be used to build your own decorators. This is the same function used by flask_jwt_extended.jwt_required().
def admin_required():
    """Build a decorator that restricts a view to administrator tokens.

    The decorated view first has the request's JWT verified with
    ``verify_jwt_in_request()``, the same function ``jwt_required()`` uses.
    The view then runs only when the token carries a truthy
    ``is_administrator`` claim; otherwise a 403 JSON response is returned.

    Returns:
        A decorator to apply as ``@admin_required()``.
    """

    def wrapper(fn):
        @wraps(fn)
        def decorator(*args, **kwargs):
            verify_jwt_in_request()
            claims = get_jwt()
            # Use .get(): a valid token issued without the claim must be
            # refused with 403 rather than crash the view with a KeyError.
            if claims.get("is_administrator"):
                return fn(*args, **kwargs)
            return jsonify(msg="Admins only!"), 403

        return decorator

    return wrapper
@app.route("/login", methods=["POST"])
def login():
    """Issue a token for ``admin_user`` flagged as an administrator.

    No credentials are read from the request; every call receives a token
    whose ``is_administrator`` claim is ``True``.
    """
    admin_claims = {"is_administrator": True}
    token = create_access_token("admin_user", additional_claims=admin_claims)
    return jsonify(access_token=token)
@app.route("/protected", methods=["GET"])
@admin_required()
def protected():
    """Return a fixed JSON payload; guarded by ``admin_required()``."""
    payload = {"foo": "bar"}
    return jsonify(**payload)
后记
最后项目里还是没有使用这个功能,原因在于:
- 当前的系统不需要短时效验证,希望是一个长时效的 token。
- flask-jwt-extended 库默认采用 HS256 对称签名算法(HMAC-SHA256,只签名、不加密),过于简单。当前存在 jwt-tool 等工具,在密钥强度不足时可以破解出密钥并伪造该库生成的 token。因此 token 中不能存放重要数据,避免使用者数据泄露;也不能存放可伪造的数据,避免伪造身份获取数据库数据。
最终的方案是:
- 基于 user-id、uuid、datetime 生成一个 token,将这个 token 返回给前端。这个 token 不需要能反向解密出来。
- 将 token 通过 md5 或者 sha256 计算哈希值,写入到数据库中,可以建立索引来辅助查找。
- 用户携带 token 请求数据,将 token 哈希后的值与数据库中的数据进行比对,获得当前用户。
- 当用户忘记 token,则需要通过登录来重新生成 token 并写入到数据库中。
疑问:
- 如果多台设备进行登录,一个设备登录后,其他设备的 token 就被迫过期了。这应该不是期望的现象。