flask_restful: token signature verification fails

Posted 2018-08-03 17:47
------ models file ------

#coding=utf8
from app import db,login_manager
from config import Config
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import SignatureExpired,BadSignature

class User(UserMixin,db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key = True)
    username = db.Column(db.String(32), index = True)
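    # NOTE: werkzeug password hashes are much longer than 12 characters, so a
    # database that enforces column length may truncate or reject them.
    password_hash = db.Column(db.String(12))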
    email = db.Column(db.String(30))


    def generate_auth_token(self, expiration = 3600):
        s = Serializer('mystr',expires_in = expiration)
        return s.dumps({ 'id': self.id })

    @staticmethod
    def verify_auth_token(token):
        print token
        s = Serializer('mystr')
        try:
            data = s.loads(token)
        except SignatureExpired:
            print "expired"
            return None # valid token, but expired
        except BadSignature as bs:
            print bs
            return None # invalid token
        user = User.query.get(data['id'])
        return user





-------- api view file --------

#coding:utf-8
from flask import url_for
# abort comes from flask_restful only; it was previously also imported from flask and shadowed.
from flask_restful import reqparse, Resource, abort, fields, marshal_with
from . import api
from flask_httpauth import HTTPBasicAuth,HTTPTokenAuth
from app.utils import Fix_Url,serialize
from app.account.models import User
# ...


# auth = HTTPBasicAuth()
# Authentication setup
auth = HTTPTokenAuth(scheme="token")
# auth = HTTPTokenAuth()
# TOKENS = {
#     "fejiasdfhu",
#     "fejiuufjeh"
# }


@auth.verify_token
def verify_token(token):
    res=User.verify_auth_token(token)
    print res,"123"
    if res:
        return True



# user=User.query.all()

users=[ serialize(user) for user in User.query.all()]
def abort_if_todo_doesnt_exist(id):

    user = list(filter(lambda t:t['id']==int(id),users))

    if not user:
        abort(404, message="Todo {} doesn't exist".format(id))
    else:
        return user[0]


class GetToken(Resource):
    def __init__(self):
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('username', type = str, required = True,
            help = 'No username  provided', location=['json','form','args'])
        self.reqparse.add_argument('password', type = str, required = True,
            help = 'No user password provided', location=['json','form','args'])
        super(GetToken, self).__init__()
    def post(self):
        """Executed when an HTTP `POST` request is received."""

        args = self.reqparse.parse_args()
        user = User.query.filter_by(username=args['username']).first()

        # Check that the user exists and that args['password'] matches the stored password.
        if user is not None and user.verify_password(args['password']):
            # The serializer embeds the expiry time in the token itself.

            token=user.generate_auth_token()
            print type(token)
            return {'token': token}
        else:
            abort(401)


# Operations (put / get / delete) on a single Todo resource
# shows a single todo item and lets you delete a todo item
class UserApi(Resource):
    decorators = [auth.login_required]
    def __init__(self):
        self.reqparse = reqparse.RequestParser()
        self.reqparse.add_argument('Authorization', type = str, location = ['headers'])
        self.reqparse.add_argument('id', type = int, location = ['json','form','args','headers'])
        self.reqparse.add_argument('username', type = str, location = ['json','form','args'])
        self.reqparse.add_argument('email', type = str, location = ['json','form','args'])
        super(UserApi, self).__init__()
    user_fields = {
        'id': fields.Integer,
        'username': fields.String,
        'email': fields.String,
        # 'password_hash': fields.FormattedString('********'),
    }



    @marshal_with(user_fields)
    def get(self, id):
        print self.reqparse.parse_args()
        
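        # NOTE: str.strip() takes a set of characters, not a prefix, so this can also remove token characters.
        token=self.reqparse.parse_args().get('Authorization').strip("token ")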
        vres=verify_token(token)
        print vres
        print "abcd"
        if vres:
            print "hehe"
            return abort_if_todo_doesnt_exist(id)
        else:
            print "haha"


    def delete(self, id):
        task=abort_if_todo_doesnt_exist(id)
        users.remove(task)
        return users, 204

    def put(self, id):
        args = self.reqparse.parse_args()

        user=abort_if_todo_doesnt_exist(id)
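        # reqparse fills every declared argument (None when absent), so test the value, not the key.
        if args.get('id') is not None:
            abort(500, message="user's ID cannot be changed")
        # Iterate over a copy of the keys so entries can be removed safely.
        for key in list(args.keys()):
            if key == 'id' or not args[key]:
                args.pop(key)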
        user.update(args)
        return user, 201



# Register routes
api.add_resource(GetToken, '/token',endpoint='get_token')
api.add_resource(UserListApi, '/users',endpoint='users')
api.add_resource(UserApi, '/users/<int:id>',endpoint='user')



Running curl -H "Content-Type: application/json" -d '{"username":"wjx","password":"123"}' -i -X POST http://127.0.0.1:5000/api/v1.0/token returns a token.
Verifying that token with User.verify_auth_token() in the project's python manage shell works without any problem.

However, when I request http://127.0.0.1:5000/api/v1.0/users/2 and pass the token in the Authorization header so that the api view verifies it, signature verification fails with this error: Signature 'YkDiicE28E3j-y7i80HbVMGG4LR6XEFsAl6OTofMlps' does not match


I spent a whole day searching and could not find a solution.
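
One thing worth checking, offered as a possible contributing factor rather than a confirmed diagnosis: the `get` view in the post calls `.strip("token ")` on the Authorization header value. `str.strip` treats its argument as a set of characters to remove from both ends, not as a prefix. Tokens whose first segment is base64-encoded JSON begin with "eyJ", and "e" is one of the characters in "token ". The sketch below builds a hypothetical token of that shape (the header and payload contents are assumptions; the signature string is the one quoted in the post) and compares `strip` with removing the prefix explicitly.

```python
import base64
import json


def b64url(obj):
    # Unpadded URL-safe base64 of a JSON document, as used in JWS-style tokens.
    raw = json.dumps(obj).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


# Hypothetical token: header.payload.signature
token = ".".join([
    b64url({"alg": "HS256"}),
    b64url({"id": 2}),
    "YkDiicE28E3j-y7i80HbVMGG4LR6XEFsAl6OTofMlps",
])

header_value = "token " + token

# strip() removes any of the characters t, o, k, e, n and space from both ends.
stripped = header_value.strip("token ")

# Removing the prefix explicitly leaves the token untouched.
prefix = "token "
sliced = header_value[len(prefix):] if header_value.startswith(prefix) else header_value

print(token[:4], stripped[:4], sliced[:4])
```

Here `stripped` has lost the leading "e" of the token while `sliced` is identical to the original. A token damaged this way would fail signature verification in the view yet verify correctly when pasted into a shell.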

Reply posted 2018-08-10 14:41
It is related to content_type. Also, verify_token does not need to be called inside the view function; the framework calls it automatically.
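
To illustrate the second point of the reply, here is a minimal sketch of the callback pattern: the verification function is registered once, and a `login_required` wrapper calls it before the view runs, so the view never parses the header or calls the verifier itself. `MiniTokenAuth` is a hypothetical, heavily simplified stand-in written for this example. It is not flask_httpauth's implementation, and the views take a plain `headers` dict instead of a real request object.

```python
import functools


class MiniTokenAuth(object):
    """Hypothetical, simplified stand-in for a token-auth helper."""

    def __init__(self, scheme="token"):
        self.scheme = scheme
        self._verify = None

    def verify_token(self, func):
        # Register the callback; views do not call it directly.
        self._verify = func
        return func

    def _extract(self, headers):
        # Expect "<scheme> <token>"; split once on whitespace.
        parts = headers.get("Authorization", "").split(None, 1)
        if len(parts) != 2 or parts[0].lower() != self.scheme.lower():
            return None
        return parts[1]

    def login_required(self, view):
        @functools.wraps(view)
        def wrapper(headers, *args, **kwargs):
            token = self._extract(headers)
            if token is None or self._verify is None or not self._verify(token):
                return {"message": "Unauthorized"}, 401
            return view(headers, *args, **kwargs)
        return wrapper


auth = MiniTokenAuth(scheme="token")


@auth.verify_token
def verify_token(token):
    # Placeholder check; the post would call User.verify_auth_token(token) here.
    return token == "good-token"


@auth.login_required
def get_user(headers, user_id):
    # Runs only after the wrapper has accepted the token.
    return {"id": user_id}, 200
```

Calling `get_user({"Authorization": "token good-token"}, 2)` reaches the view, while a missing or rejected token gets a 401 from the wrapper. In the posted code, `decorators = [auth.login_required]` on `UserApi` already plays the role of the wrapper.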