Chinaunix

标题: flask_restful token 反签名失败 [打印本页]

作者: venux    时间: 2018-08-03 17:47
标题: flask_restful token 反签名失败
------models--------文件

#coding=utf8
from app import db,login_manager
from config import Config
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import SignatureExpired,BadSignature

class User(UserMixin,db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key = True)
    username = db.Column(db.String(32), index = True)
    password_hash = db.Column(db.String(12)
    email = db.Column(db.String(30))


    def generate_auth_token(self, expiration = 3600):
        s = Serializer('mystr',expires_in = expiration)
        return s.dumps({ 'id': self.id })

    @staticmethod
    def verify_auth_token(token):
        print token
        s = Serializer('mystr')
        try:
            data = s.loads(token)
        except SignatureExpired:
            print "过期"
            return None # valid token, but expired
        except BadSignature as bs:
            print bs
            return None # invalid token
        user = User.query.get(data['id'])
        return user





--------api  view----------------文件

#coding:utf-8
from flask import abort,url_for
from flask_restful import reqparse,Resource,abort,fields, marshal_with
from . import api
from flask_httpauth import HTTPBasicAuth,HTTPTokenAuth
from app.utils import Fix_Url,serialize
from app.account.models import User
# ...


# auth = HTTPBasicAuth()
# 认证相关
auth = HTTPTokenAuth(scheme="token"
# auth = HTTPTokenAuth()
# TOKENS = {
#     "fejiasdfhu",
#     "fejiuufjeh"
# }


@auth.verify_token
def verify_token(token):
    res=User.verify_auth_token(token)
    print res,"123"
    if res:
        return True



# user=User.query.all()

users=[ serialize(user) for user in User.query.all()]
def abort_if_todo_doesnt_exist(id):

    user = list(filter(lambda t:t['id']==int(id),users))

    if not user:
        abort(404, message="Todo {} doesn't exist".format(id))
    else:
        return user[0]


class GetToken(Resource):
    def __init__(self):
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('username', type = str, required = True,
            help = 'No username  provided', location=['json','form','args'])
        self.reqparse.add_argument('password', type = str, required = True,
            help = 'No user password provided', location=['json','form','args'])
        super(GetToken, self).__init__()
    def post(self):
        """Can be execute when receive HTTP Method `POST`."""

        args = self.reqparse.parse_args()
        user = User.query.filter_by(username=args['username']).first()

        # Check the args['password'] whether as same as user.password.
        if user.verify_password(args['password']):
            # serializer object will be saved the token period of time.

            token=user.generate_auth_token()
            print type(token)
            return {'token': token}
        else:
            abort(401)


# # 操作(put / get / delete)单一资源Todo
# shows a single todo item and lets you delete a todo item
class UserApi(Resource):
    decorators = [auth.login_required]
    def __init__(self):
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('Authorization', type = str, location = ['headers'])
        self.reqparse.add_argument('id', type = int, location = ['json','form','args','headers'])
        self.reqparse.add_argument('username', type = str, location = ['json','form','args'])
        self.reqparse.add_argument('email', type = str, location = ['json','form','args'])
        super(UserApi, self).__init__()
    user_fields = {
        'id': fields.Integer,
        'username': fields.String,
        'email': fields.String,
        # 'password_hash': fields.FormattedString('********'),
    }



    @marshal_with(user_fields)
   def get(self, id):
        print self.reqparse.parse_args()
        
        token=self.reqparse.parse_args().get('Authorization').strip("token "
        vres=verify_token(token)
        print vres
        print "abcd"
        if vres:
            print "hehe"
            return abort_if_todo_doesnt_exist(id)
        else:
            print "haha"


    def delete(self, id):
        task=abort_if_todo_doesnt_exist(id)
        users.remove(task)
        return users, 204

    def put(self, id):
        args = self.reqparse.parse_args()

        user=abort_if_todo_doesnt_exist(id)
        if args.has_key('id'):
            abort(500, message="user's  ID  can not be change"
        for key in args.keys():
            if key =='id' or not args[key]:
                args.pop(key)
        user.update(args)
        return user, 201



# 设置路由
api.add_resource(GetToken, '/token',endpoint='get_token')
api.add_resource(UserListApi, '/users',endpoint='users')
api.add_resource(UserApi, '/users/<int:id>',endpoint='user')



我通过curl -H "Content-Type: application/json" -d '{"username":"wjx","password":"123"}' -i -X POST http://127.0.0.1:5000/api/v1.0/token  获取到token
然后到项目python manage shell 里面用User.verify_auth_token()去验证token都没有问题

但是通过访问http://127.0.0.1:5000/api/v1.0/users/2  通过heders 传Authorization参数去api view 里面去验证就回报反签名失败的错误Signature 'YkDiicE28E3j-y7i80HbVMGG4LR6XEFsAl6OTofMlps' does not match


查了一天都没找到办法

作者: venux    时间: 2018-08-10 14:41
和content_type有关,而且verify_token不需要在函数中调用,框架会自动调用执行




欢迎光临 Chinaunix (http://bbs.chinaunix.net/) Powered by Discuz! X3.2