(四) DRF认证, 权限, 节流

时间:2023-03-09 17:38:42
(四) DRF认证, 权限, 节流

一、Token 认证的来龙去脉

摘要

Token 是在服务端产生的。如果前端使用用户名/密码向服务端请求认证,服务端认证成功,那么在服务端会返回 Token 给前端。前端可以在每次请求的时候带上 Token 证明自己的合法地位

为什么要用 Token?

要回答这个问题很简单——因为它能解决问题!

可以解决哪些问题呢?

  1. Token 完全由应用管理,所以它可以避开同源策略

  2. Token 可以避免 CSRF 攻击

  3. Token 可以是无状态的,可以在多个服务间共享

Token 是在服务端产生的。如果前端使用用户名/密码向服务端请求认证,服务端认证成功,那么在服务端会返回 Token 给前端。前端可以在每次请求的时候带上 Token 证明自己的合法地位。如果这个 Token 在服务端持久化(比如存入数据库),那它就是一个永久的身份令牌。

时序图表示

使用 Token 的时序图如下:

1)登录

(四) DRF认证, 权限, 节流

2)业务请求

(四) DRF认证, 权限, 节流

关于token的详细信息,请参考链接:

https://blog.****.net/maxushan001/article/details/79222271

二、DRF 认证

DRF认证源码流程

https://www.cnblogs.com/derek1184405959/p/8712206.html

定义一个用户表和一个保存用户Token的表,models.py完整代码下:

from django.db import models
# 用户信息表
class UserInfo(models.Model):
username = models.CharField(max_length=, unique=True)
password = models.CharField(max_length=) type = models.SmallIntegerField(
choices=((, '普通用户'), (, 'VIP用户')),
default=
) # token
class Token(models.Model):
token = models.CharField(max_length=)
user = models.OneToOneField(to='UserInfo',on_delete=models.CASCADE)

token单独分一个表,是因为它是在原有用户表的功能扩展。不能对一个表无限的增加字段,否则会导致表原来越臃肿

在前后端分离的架构中,前端使用ajax请求发送给后端,它不能使用cookie/session。那么后端怎么知道这个用户是否登录了,是否是VIP用户呢?使用token就可以解决这个问题!

视图

def get_token_code(username):
"""
根据用户名和时间戳生成用户登陆成功的随机字符串
:param username: 字符串格式的用户名
:return: 字符串格式的Token
"""
import time
import hashlib
timestamp = str(time.time()) # 当前时间戳
m = hashlib.md5(bytes(username, encoding='utf8'))
m.update(bytes(timestamp, encoding='utf8')) # update必须接收一个bytes
return m.hexdigest() # 登陆视图
class LoginView(APIView):
"""
登陆检测视图
. 接收用户发过来(POST)的用户名和密码数据
. 校验用户名密码是否正确
- 成功就返回登陆成功(发Token)
- 失败就返回错误提示
""" def post(self, request): # POST请求
res = {"code": }
# 从post里面取数据
username = request.data.get("username")
password = request.data.get("password")
# 去数据库查询
user_obj = models.UserInfo.objects.filter(
username=username,
password=password,
).first()
if user_obj:
# 登陆成功
# 生成Token
token = get_token_code(username)
# 将token保存
# 用user=user_obj这个条件去Token表里查询
# 如果有记录就更新defaults里传的参数, 没有记录就用defaults里传的参数创建一条数据
models.Token.objects.update_or_create(defaults={"token": token}, user=user_obj)
# 将token返回给用户
res["token"] = token
else:
# 登录失败
res["code"] =
res["error"] = '用户名或密码错误'
return Response(res)

路由

from django.conf.urls import url
from app01 import views urlpatterns = [
url(r'login/$', views.LoginView.as_view()),
]

使用postman发送post登录

(四) DRF认证, 权限, 节流

查看返回结果,code为0表示登录成功,并返回一个token

(四) DRF认证, 权限, 节流

查看表app01_token,就会多一条记录

(四) DRF认证, 权限, 节流

定义一个认证类

在app01(应用名)目录下创建目录utils,在此目录下创建auth.py

from rest_framework.authentication import BaseAuthentication
from app01 import models
from rest_framework.exceptions import AuthenticationFailed class MyAuth(BaseAuthentication): def authenticate(self, request): # 必须要实现此方法
if request.method in ['POST', 'PUT', 'DELETE']:
token = request.data.get("token")
# 去数据库查询有没有这个token
token_obj = models.Token.objects.filter(token=token).first()
if token_obj:
# token_obj有2个属性,详见models.py中的Token。
# return后面的代码,相当于分别赋值。例如a=,b=2等同于a,b=,
# return多个值,返回一个元组
#在rest framework内部会将这两个字段赋值给request,以供后续操作使用
return token_obj.user, token # self.user, self.token = token_obj.user, token
else:
raise AuthenticationFailed('无效的token')
else:
return None, None

视图级别认证

修改views.py,完整代码如下:

from django.shortcuts import render, HttpResponse
from app01 import models
from app01 import app01_serializers # 导入自定义的序列化
from rest_framework.viewsets import ModelViewSet
from rest_framework.views import APIView
from rest_framework.response import Response
from app01.utils.auth import MyAuth # app01.utils.auth表示app01目录下的utils下的auth.py # Create your views here. # 生成Token的函数
def get_token_code(username):
"""
根据用户名和时间戳生成用户登陆成功的随机字符串
:param username: 字符串格式的用户名
:return: 字符串格式的Token
"""
import time
import hashlib
timestamp = str(time.time()) # 当前时间戳
m = hashlib.md5(bytes(username, encoding='utf8'))
m.update(bytes(timestamp, encoding='utf8')) # update必须接收一个bytes
return m.hexdigest() # 登陆视图
class LoginView(APIView):
"""
登陆检测视图
. 接收用户发过来(POST)的用户名和密码数据
. 校验用户名密码是否正确
- 成功就返回登陆成功(发Token)
- 失败就返回错误提示
""" def post(self, request): # POST请求
res = {"code": }
# 从post里面取数据
username = request.data.get("username")
password = request.data.get("password")
# 去数据库查询
user_obj = models.UserInfo.objects.filter(
username=username,
password=password,
).first()
if user_obj:
# 登陆成功
# 生成Token
token = get_token_code(username)
# 将token保存
# 用user=user_obj这个条件去Token表里查询
# 如果有记录就更新defaults里传的参数, 没有记录就用defaults里传的参数创建一条数据
models.Token.objects.update_or_create(defaults={"token": token}, user=user_obj)
# 将token返回给用户
res["token"] = token
else:
# 登录失败
res["code"] =
res["error"] = '用户名或密码错误'
return Response(res) class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
authentication_classes = [MyAuth, ] # 局部使用认证方法MyAuth

发送一个空的post请求,返回结果如下:

(四) DRF认证, 权限, 节流

发送一个错误的token

(四) DRF认证, 权限, 节流

返回结果:

(四) DRF认证, 权限, 节流

注意:这个信息是由raise AuthenticationFailed('无效的token')触发的。

如果想在MyAuth类-->authenticate方法-->代码else中触发别的信息,也同样需要定义raise

全局级别认证

要想让每一个视图都要认证,可以在settings.py中配置

REST_FRAMEWORK = {
# 表示app01-->utils下的auth.py里面的MyAuth类
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.auth.MyAuth", ]
}

修改views.py,注释掉CommentViewSet中的authentication_classes

再次测试上面的3种请求方式,效果同上!

三、DRF权限

权限源码流程

请参考链接:

http://www.cnblogs.com/derek1184405959/p/8722212.html

自定义一个权限类 has_permission

注意:当返回一个对象时,才会触发

什么对象呢?json对象!why?

在CommentViewSet视图中,它会返回一个json数据

http://127.0.0.1:8000/api/comment/  它会返回一个json列表

http://127.0.0.1:8000/api/comment/1 它会返回一个json对象。

当使用了权限类后,类中有has_permission,就有触发

举例:

在目录app01-->utils下面新建文件permission.py

"""
自定义的权限类
"""
from rest_framework.permissions import BasePermission class MyPermission(BasePermission):
def has_permission(self, request, view):
"""
判断该用户有没有权限
"""
# 判断用户是不是VIP用户
# 如果是VIP用户就返回True
# 如果是普通用户就返回False
print('我要进行自定义的权限判断啦....')
print(request)
print(request.user)
return True

视图级别配置

修改views.py,指定permission_classes

from django.shortcuts import render, HttpResponse
from app01 import models
from app01 import app01_serializers # 导入自定义的序列化
from rest_framework.viewsets import ModelViewSet
from rest_framework.views import APIView
from rest_framework.response import Response
from app01.utils.auth import MyAuth # app01.utils.auth表示app01目录下的utils下的auth.py
from app01.utils.permission import MyPermission # Create your views here. # 生成Token的函数
def get_token_code(username):
"""
根据用户名和时间戳生成用户登陆成功的随机字符串
:param username: 字符串格式的用户名
:return: 字符串格式的Token
"""
import time
import hashlib
timestamp = str(time.time()) # 当前时间戳
m = hashlib.md5(bytes(username, encoding='utf8'))
m.update(bytes(timestamp, encoding='utf8')) # update必须接收一个bytes
return m.hexdigest() # 登陆视图
class LoginView(APIView):
"""
登陆检测视图
. 接收用户发过来(POST)的用户名和密码数据
. 校验用户名密码是否正确
- 成功就返回登陆成功(发Token)
- 失败就返回错误提示
""" def post(self, request): # POST请求
res = {"code": }
# 从post里面取数据
username = request.data.get("username")
password = request.data.get("password")
# 去数据库查询
user_obj = models.UserInfo.objects.filter(
username=username,
password=password,
).first()
if user_obj:
# 登陆成功
# 生成Token
token = get_token_code(username)
# 将token保存
# 用user=user_obj这个条件去Token表里查询
# 如果有记录就更新defaults里传的参数, 没有记录就用defaults里传的参数创建一条数据
models.Token.objects.update_or_create(defaults={"token": token}, user=user_obj)
# 将token返回给用户
res["token"] = token
else:
# 登录失败
res["code"] =
res["error"] = '用户名或密码错误'
return Response(res) class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
# authentication_classes = [MyAuth, ] # 局部使用认证方法MyAuth
permission_classes = [MyPermission, ] # 局部使用权限方法

发送get请求

(四) DRF认证, 权限, 节流

查看Pycharm控制台输出:

我要进行自定义的权限判断啦....
<rest_framework.request.Request object at 0x000002576A780FD0>
None

发现用户为None,它没有触发has_permission

访问单个评论,返回单个json对象

(四) DRF认证, 权限, 节流

查看Pycharm控制台输出:

我要进行自定义的权限判断啦....
<rest_framework.request.Request object at 0x000002576A780FD0>
这是在自定义权限类中的has_object_permission

它触发了has_permission,并输出了一段话

注意:json对象中,它增加一个属性user,值为null。为什么会增加呢?

因为在源码中,Request有个user方法,加 @property。它对返回结果做了在再次封装!

详情,请参考上面的权限源码流程

普通用户

发送post请求,写一个正确的token,用zhang用户的token

(四) DRF认证, 权限, 节流

查看Pycharm控制台输出:

我要进行自定义的权限判断啦....
<rest_framework.request.Request object at 0x000002576A9852B0>
UserInfo object

此时得到了一个用户对象

修改permission.py,获取用户名以及用户类型

"""
自定义的权限类
"""
from rest_framework.permissions import BasePermission class MyPermission(BasePermission):
def has_permission(self, request, view):
"""
判断该用户有没有权限
"""
# 判断用户是不是VIP用户
# 如果是VIP用户就返回True
# 如果是普通用户就返回False
print('我要进行自定义的权限判断啦....')
print(request)
print(request.user.username)
print(request.user.type)
return True

再次发送同样的post请求,再次查看Pycharm控制台输出

<rest_framework.request.Request object at 0x000001D893AC4048>
zhang

居然得到了zhang和1。为什么呢?为什么request.user.username就能得到用户名呢?

我来大概解释一下,先打开这篇文章:

https://www.cnblogs.com/derek1184405959/p/8712206.html

我引用里面几句话

Request有个user方法,加 @property 表示调用user方法的时候不需要加括号“user()”,可以直接调用:request.user

在rest framework内部会将这两个字段赋值给request,以供后续操作使用

return (token_obj.user,token_obj)

上面的return的值,来源于app01\utils\auth.py里面的MyAuth类中的return token_obj.user, token

简单来说,通过认证之后,它会request进行再次封装,所以调用request.user时,得到了一个对象

这个对象就是执行models.Token.objects.filter(token=token).first()的结果

如果ORM没有查询出结果,它就一个匿名用户!

修改permission.py,如果是VIP返回True,否则返回False

"""
自定义的权限类
"""
from rest_framework.permissions import BasePermission class MyPermission(BasePermission):
def has_permission(self, request, view):
"""
判断该用户有没有权限
"""
# 判断用户是不是VIP用户
# 如果是VIP用户就返回True
# 如果是普通用户就返回False
print('我要进行自定义的权限判断啦....')
# print(request)
print(request.user.username)
print(request.user.type)
if request.user.type == : # 是VIP用户
return True
else:
return False

修改settings.py,关闭全局级别认证

REST_FRAMEWORK = {
# 表示app01-->utils下的auth.py里面的MyAuth类
# "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.auth.MyAuth", ]
}

使用VIP用户wang登录

(四) DRF认证, 权限, 节流

查看返回结果,它返回wang的token

(四) DRF认证, 权限, 节流

查看表app01_token,它现在有2个记录了

(四) DRF认证, 权限, 节流

复制zhang的token,发送一条评论

(四) DRF认证, 权限, 节流

查看返回结果,提示您没有执行此操作的权限

(四) DRF认证, 权限, 节流

英文看不懂,没关系,定义成中文就行了

修改permission.py,定义message

"""
自定义的权限类
"""
from rest_framework.permissions import BasePermission class MyPermission(BasePermission):
message = '您没有执行此操作的权限!'
def has_permission(self, request, view):
"""
判断该用户有没有权限
"""
# 判断用户是不是VIP用户
# 如果是VIP用户就返回True
# 如果是普通用户就返回False
print('我要进行自定义的权限判断啦....')
# print(request)
print(request.user.username)
print(request.user.type)
if request.user.type == : # 是VIP用户
return True
else:
return False

再次发送,返回结果如下:

(四) DRF认证, 权限, 节流

VIP用户

将token改成VIP用户测试

(四) DRF认证, 权限, 节流

查看返回结果

(四) DRF认证, 权限, 节流

修改permission.py,定义发送类型

"""
自定义的权限类
"""
from rest_framework.permissions import BasePermission class MyPermission(BasePermission):
message = '您没有执行此操作的权限!'
def has_permission(self, request, view):
"""
判断该用户有没有权限
"""
# 判断用户是不是VIP用户
# 如果是VIP用户就返回True
# 如果是普通用户就返回False
print('我要进行自定义的权限判断啦....') if request.method in ['POST', 'PUT', 'DELETE']:
print(request.user.username)
print(request.user.type)
if request.user.type == : # 是VIP用户
return True
else:
return False
else:
return True

发送正确的值

(四) DRF认证, 权限, 节流

查看返回结果

(四) DRF认证, 权限, 节流

查看表app01_comment记录

(四) DRF认证, 权限, 节流

全局级别设置

修改settings.py,增加一行

REST_FRAMEWORK = {
# 表示app01-->utils下的auth.py里面的MyAuth类
# "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.auth.MyAuth", ]
"DEFAULT_PERMISSION_CLASSES": ["app01.utils.permission.MyPermission", ]
}

修改views.py,注释局部的

四、DRF节流

DRF节流源码分析

请参考链接:

http://www.cnblogs.com/derek1184405959/p/8722638.html

自定义限制类

对IP做限制,60秒只能访问3次

在about_drf\app01\utils下面创建throttle.py

"""
自定义的访问限制类
"""
from rest_framework.throttling import BaseThrottle, SimpleRateThrottle
import time D = {} # {'127.0.0.1': [, ,...]} class MyThrottle(BaseThrottle): def allow_request(self, request, view):
"""
返回True就放行,返回False表示被限制了...
"""
# . 获取当前访问的IP
ip = request.META.get("REMOTE_ADDR")
print('这是自定义限制类中的allow_request')
print(ip)
# . 获取当前的时间
now = time.time()
# 判断当前ip是否有访问记录
if ip not in D:
D[ip] = [] # 初始化一个空的访问历史列表
# 高端骚操作
history = D[ip]
while history and now - history[-] > :
history.pop()
# 判断最近一分钟的访问次数是否超过了阈值(3次)
if len(history) >= :
return False
else:
# 把这一次的访问时间加到访问历史列表的第一位
D[ip].insert(, now)
return True

代码解释:

request.META.get("REMOTE_ADDR")  获取远程IP

D  存储的值,类似于

"192.168.1.2":["17:06:45","12:04:03","12:04:01"]

最后一个元素,就是最先开始的时间

for循环列表,不能对列表做更改操作!所以使用while循环

while history and now - history[-] > :
history.pop()

history是历史列表,history[-1] 表示列表最后一个元素

history and now - history[-1] > 10 表示当历史列表中有元素,并且当前时间戳减去最后一个元素的时间戳大于10的时候,执行history.pop(),表示删除最后一个元素

当历史列表为空时,或者小于差值小于10的时候,结束循环。

视图使用

修改views.py

from django.shortcuts import render, HttpResponse
from app01 import models
from app01 import app01_serializers # 导入自定义的序列化
from rest_framework.viewsets import ModelViewSet
from rest_framework.views import APIView
from rest_framework.response import Response
from app01.utils.auth import MyAuth # app01.utils.auth表示app01目录下的utils下的auth.py
from app01.utils.permission import MyPermission
from app01.utils.throttle import MyThrottle # Create your views here. # 生成Token的函数
def get_token_code(username):
"""
根据用户名和时间戳生成用户登陆成功的随机字符串
:param username: 字符串格式的用户名
:return: 字符串格式的Token
"""
import time
import hashlib
timestamp = str(time.time()) # 当前时间戳
m = hashlib.md5(bytes(username, encoding='utf8'))
m.update(bytes(timestamp, encoding='utf8')) # update必须接收一个bytes
return m.hexdigest() # 登陆视图
class LoginView(APIView):
"""
登陆检测视图
. 接收用户发过来(POST)的用户名和密码数据
. 校验用户名密码是否正确
- 成功就返回登陆成功(发Token)
- 失败就返回错误提示
""" def post(self, request): # POST请求
res = {"code": }
# 从post里面取数据
username = request.data.get("username")
password = request.data.get("password")
# 去数据库查询
user_obj = models.UserInfo.objects.filter(
username=username,
password=password,
).first()
if user_obj:
# 登陆成功
# 生成Token
token = get_token_code(username)
# 将token保存
# 用user=user_obj这个条件去Token表里查询
# 如果有记录就更新defaults里传的参数, 没有记录就用defaults里传的参数创建一条数据
models.Token.objects.update_or_create(defaults={"token": token}, user=user_obj)
# 将token返回给用户
res["token"] = token
else:
# 登录失败
res["code"] =
res["error"] = '用户名或密码错误'
return Response(res) class CommentViewSet(ModelViewSet):
queryset = models.Comment.objects.all()
serializer_class = app01_serializers.CommentSerializer
authentication_classes = [MyAuth, ] # 局部使用认证方法MyAuth
# permission_classes = [MyPermission, ] # 局部使用权限方法
throttle_classes = [MyThrottle, ] # 局部使用限制方法

使用postman发送GET请求

疯狂的点击SEND按钮,多发送几次

(四) DRF认证, 权限, 节流

提示请求达到了限制

(四) DRF认证, 权限, 节流

等待十几秒,就可以访问了

全局使用

修改settings.py

REST_FRAMEWORK = {
# 表示app01-->utils下的auth.py里面的MyAuth类
# "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.auth.MyAuth", ],
#"DEFAULT_PERMISSION_CLASSES": ["app01.utils.permission.MyPermission", ],
"DEFAULT_THROTTLE_CLASSES": ["app01.utils.throttle.MyThrottle", ]
}

修改views.py,注释掉代码

使用内置限制类

修改about_drf\app01\utils\throttle.py

"""
自定义的访问限制类
"""
from rest_framework.throttling import BaseThrottle, SimpleRateThrottle
# import time
#
# D = {} # {'127.0.0.1': [, ,...]}
#
#
# class MyThrottle(BaseThrottle):
#
# def allow_request(self, request, view):
#
# """
# 返回True就放行,返回False表示被限制了...
# """
# # . 获取当前访问的IP
# ip = request.META.get("REMOTE_ADDR")
# print('这是自定义限制类中的allow_request')
# print(ip)
# # . 获取当前的时间
# now = time.time()
# # 判断当前ip是否有访问记录
# if ip not in D:
# D[ip] = [] # 初始化一个空的访问历史列表
# # 高端骚操作
# history = D[ip]
# while history and now - history[-] > :
# history.pop()
# # 判断最近一分钟的访问次数是否超过了阈值(3次)
# if len(history) >= :
# return False
# else:
# # 把这一次的访问时间加到访问历史列表的第一位
# D[ip].insert(, now)
# return True class MyThrottle(SimpleRateThrottle): scope = "rate" # rate是名字,可以随便定义! def get_cache_key(self, request, view):
return self.get_ident(request)

注意:scope是关键字参数

get_cache_key 的名字不能变动

self.get_ident(request)  表示远程IP地址

全局配置

REST_FRAMEWORK = {
# 表示app01-->utils下的auth.py里面的MyAuth类
# "DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.auth.MyAuth", ]
"DEFAULT_PERMISSION_CLASSES": ["app01.utils.permission.MyPermission", ],
"DEFAULT_THROTTLE_CLASSES": ["app01.utils.throttle.MyThrottle", ],
"DEFAULT_THROTTLE_RATES": {
"rate": "3/m",
}
}

注意:rate对应的是throttle.py里面MyThrottle定义的scope属性的值

3/m 表示1分钟3次

再次测试,效果如下:

(四) DRF认证, 权限, 节流

它还会返回倒计时的时间!