venux 发表于 2018-08-03 17:47

flask_restful token 反签名失败

------models--------文件

#coding=utf8
from app import db,login_manager
from config import Config
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import SignatureExpired,BadSignature

class User(UserMixin,db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key = True)
    username = db.Column(db.String(32), index = True)
    password_hash = db.Column(db.String(128))
    email = db.Column(db.String(30))


    def generate_auth_token(self, expiration = 3600):
      s = Serializer('mystr',expires_in = expiration)
      return s.dumps({ 'id': self.id })

    @staticmethod
    def verify_auth_token(token):
      print token
      s = Serializer('mystr')
      try:
            data = s.loads(token)
      except SignatureExpired:
            print "过期"
            return None # valid token, but expired
      except BadSignature as bs:
            print bs
            return None # invalid token
      user = User.query.get(data['id'])
      return user





--------apiview----------------文件

#coding:utf-8
from flask import abort,url_for
from flask_restful import reqparse,Resource,abort,fields, marshal_with
from . import api
from flask_httpauth import HTTPBasicAuth,HTTPTokenAuth
from app.utils import Fix_Url,serialize
from app.account.models import User
# ...


# auth = HTTPBasicAuth()
# 认证相关
auth = HTTPTokenAuth(scheme="token")
# auth = HTTPTokenAuth()
# TOKENS = {
#   "fejiasdfhu",
#   "fejiuufjeh"
# }


@auth.verify_token
def verify_token(token):
    res=User.verify_auth_token(token)
    print res,"123"
    if res:
      return True



# user=User.query.all()

users=[ serialize(user) for user in User.query.all()]
def abort_if_todo_doesnt_exist(id):

    user = list(filter(lambda t:t['id']==int(id),users))

    if not user:
      abort(404, message="Todo {} doesn't exist".format(id))
    else:
      return user


class GetToken(Resource):
    def __init__(self):
      self.reqparse = reqparse.RequestParser()
      self.reqparse.add_argument('username', type = str, required = True,
            help = 'No usernameprovided', location=['json','form','args'])
      self.reqparse.add_argument('password', type = str, required = True,
            help = 'No user password provided', location=['json','form','args'])
      super(GetToken, self).__init__()
    def post(self):
      """Can be execute when receive HTTP Method `POST`."""

      args = self.reqparse.parse_args()
      user = User.query.filter_by(username=args['username']).first()

      # Check the args['password'] whether as same as user.password.
      if user.verify_password(args['password']):
            # serializer object will be saved the token period of time.

            token=user.generate_auth_token()
            print type(token)
            return {'token': token}
      else:
            abort(401)


# # 操作(put / get / delete)单一资源Todo
# shows a single todo item and lets you delete a todo item
class UserApi(Resource):
    decorators =
    def __init__(self):
      self.reqparse = reqparse.RequestParser()
      self.reqparse.add_argument('Authorization', type = str, location = ['headers'])
      self.reqparse.add_argument('id', type = int, location = ['json','form','args','headers'])
      self.reqparse.add_argument('username', type = str, location = ['json','form','args'])
      self.reqparse.add_argument('email', type = str, location = ['json','form','args'])
      super(UserApi, self).__init__()
    user_fields = {
      'id': fields.Integer,
      'username': fields.String,
      'email': fields.String,
      # 'password_hash': fields.FormattedString('********'),
    }



    @marshal_with(user_fields)
   def get(self, id):
      print self.reqparse.parse_args()
      
      token=self.reqparse.parse_args().get('Authorization').strip("token ")
      vres=verify_token(token)
      print vres
      print "abcd"
      if vres:
            print "hehe"
            return abort_if_todo_doesnt_exist(id)
      else:
            print "haha"


    def delete(self, id):
      task=abort_if_todo_doesnt_exist(id)
      users.remove(task)
      return users, 204

    def put(self, id):
      args = self.reqparse.parse_args()

      user=abort_if_todo_doesnt_exist(id)
      if args.has_key('id'):
            abort(500, message="user'sIDcan not be change")
      for key in args.keys():
            if key =='id' or not args:
                args.pop(key)
      user.update(args)
      return user, 201



# 设置路由
api.add_resource(GetToken, '/token',endpoint='get_token')
api.add_resource(UserListApi, '/users',endpoint='users')
api.add_resource(UserApi, '/users/<int:id>',endpoint='user')



我通过curl -H "Content-Type: application/json" -d '{"username":"wjx","password":"123"}' -i -X POST http://127.0.0.1:5000/api/v1.0/token获取到token
然后到项目python manage shell 里面用User.verify_auth_token()去验证token都没有问题

但是通过访问http://127.0.0.1:5000/api/v1.0/users/2通过heders 传Authorization参数去api view 里面去验证就回报反签名失败的错误Signature 'YkDiicE28E3j-y7i80HbVMGG4LR6XEFsAl6OTofMlps' does not match


查了一天都没找到办法

venux 发表于 2018-08-10 14:41

和content_type有关,而且verify_token不需要在函数中调用,框架会自动调用执行
页: [1]
查看完整版本: flask_restful token 反签名失败