- 论坛徽章:
- 3
|
------models--------文件
#coding=utf8
from app import db,login_manager
from config import Config
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from itsdangerous import SignatureExpired,BadSignature
class User(UserMixin,db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key = True)
username = db.Column(db.String(32), index = True)
password_hash = db.Column(db.String(12 )
email = db.Column(db.String(30))
def generate_auth_token(self, expiration = 3600):
s = Serializer('mystr',expires_in = expiration)
return s.dumps({ 'id': self.id })
@staticmethod
def verify_auth_token(token):
print token
s = Serializer('mystr')
try:
data = s.loads(token)
except SignatureExpired:
print "过期"
return None # valid token, but expired
except BadSignature as bs:
print bs
return None # invalid token
user = User.query.get(data['id'])
return user
--------api view----------------文件
#coding:utf-8
from flask import abort,url_for
from flask_restful import reqparse,Resource,abort,fields, marshal_with
from . import api
from flask_httpauth import HTTPBasicAuth,HTTPTokenAuth
from app.utils import Fix_Url,serialize
from app.account.models import User
# ...
# auth = HTTPBasicAuth()
# 认证相关
auth = HTTPTokenAuth(scheme="token"
# auth = HTTPTokenAuth()
# TOKENS = {
# "fejiasdfhu",
# "fejiuufjeh"
# }
@auth.verify_token
def verify_token(token):
res=User.verify_auth_token(token)
print res,"123"
if res:
return True
# user=User.query.all()
users=[ serialize(user) for user in User.query.all()]
def abort_if_todo_doesnt_exist(id):
user = list(filter(lambda t:t['id']==int(id),users))
if not user:
abort(404, message="Todo {} doesn't exist".format(id))
else:
return user[0]
class GetToken(Resource):
def __init__(self):
self.reqparse = reqparse.RequestParser()
self.reqparse.add_argument('username', type = str, required = True,
help = 'No username provided', location=['json','form','args'])
self.reqparse.add_argument('password', type = str, required = True,
help = 'No user password provided', location=['json','form','args'])
super(GetToken, self).__init__()
def post(self):
"""Can be execute when receive HTTP Method `POST`."""
args = self.reqparse.parse_args()
user = User.query.filter_by(username=args['username']).first()
# Check the args['password'] whether as same as user.password.
if user.verify_password(args['password']):
# serializer object will be saved the token period of time.
token=user.generate_auth_token()
print type(token)
return {'token': token}
else:
abort(401)
# # 操作(put / get / delete)单一资源Todo
# shows a single todo item and lets you delete a todo item
class UserApi(Resource):
decorators = [auth.login_required]
def __init__(self):
self.reqparse = reqparse.RequestParser()
self.reqparse.add_argument('Authorization', type = str, location = ['headers'])
self.reqparse.add_argument('id', type = int, location = ['json','form','args','headers'])
self.reqparse.add_argument('username', type = str, location = ['json','form','args'])
self.reqparse.add_argument('email', type = str, location = ['json','form','args'])
super(UserApi, self).__init__()
user_fields = {
'id': fields.Integer,
'username': fields.String,
'email': fields.String,
# 'password_hash': fields.FormattedString('********'),
}
@marshal_with(user_fields)
def get(self, id):
print self.reqparse.parse_args()
token=self.reqparse.parse_args().get('Authorization').strip("token "
vres=verify_token(token)
print vres
print "abcd"
if vres:
print "hehe"
return abort_if_todo_doesnt_exist(id)
else:
print "haha"
def delete(self, id):
task=abort_if_todo_doesnt_exist(id)
users.remove(task)
return users, 204
def put(self, id):
args = self.reqparse.parse_args()
user=abort_if_todo_doesnt_exist(id)
if args.has_key('id'):
abort(500, message="user's ID can not be change"
for key in args.keys():
if key =='id' or not args[key]:
args.pop(key)
user.update(args)
return user, 201
# 设置路由
api.add_resource(GetToken, '/token',endpoint='get_token')
api.add_resource(UserListApi, '/users',endpoint='users')
api.add_resource(UserApi, '/users/<int:id>',endpoint='user')
我通过curl -H "Content-Type: application/json" -d '{"username":"wjx","password":"123"}' -i -X POST http://127.0.0.1:5000/api/v1.0/token 获取到token
然后到项目python manage shell 里面用User.verify_auth_token()去验证token都没有问题
但是通过访问http://127.0.0.1:5000/api/v1.0/users/2 通过heders 传Authorization参数去api view 里面去验证就回报反签名失败的错误Signature 'YkDiicE28E3j-y7i80HbVMGG4LR6XEFsAl6OTofMlps' does not match
查了一天都没找到办法
|
|