rest-framework框架——认证、权限、频率组件

时间:2023-03-09 02:21:43
rest-framework框架——认证、权限、频率组件

一、rest-framework登录验证

  网站登录之后就有个人中心可以对个人信息进行修改,但是在每次向服务器发请求时,由于http是无状态的,导致每次都是新的请求。

  服务端则需要对每次请求都进行认证,确认用户是否登录及登录用户是谁。

  因此需要将认证逻辑抽离出来,之前的做法主要是加装饰器或中间件。在前后端不分离的项目中通常是使用cookie和session,前后端分离的项目通常是使用token。

1、models.py添加User和Token模型

class User(models.Model):
name = models.CharField(max_length=32)
pwd = models.CharField(max_length=32) class Token(models.Model):
user = models.OneToOneField("User", on_delete=models.CASCADE)
token = models.CharField(max_length=128) def __str__(self):
return self.token

  添加后执行数据库迁移,添加app01_user和app01_token表。

2、给login配置url

urlpatterns = [
...
re_path(r'^login/$', views.LoginView.as_view(), name="login"),
]

3、配置视图

def get_random_str(user):
""" 生成随机字符串 """
import hashlib, time
ctime = str(time.time())
md5 = hashlib.md5(bytes(user, encoding='utf-8'))
md5.update(bytes(ctime, encoding="utf-8"))
return md5.hexdigest() class LoginView(APIView):
def post(self, request):
# 验证逻辑:获取用户名密码与数据库比对
name = request.data.get("name")
pwd = request.data.get("pwd")
user = User.objects.filter(name=name, pwd=pwd).first()
res = {"state_code": 1000, "msg": None} # 成功或失败需要返回的字典标识这次的状态
if user:
# 通过校验 拿到随机字符串交给这个人
random_str = get_random_str(user.name) # 获取随机字符串
token = Token.objects.update_or_create(user=user, defaults={"token": random_str})
res["token"] = str(token) # json不能序列化对象,因此转为字符串
else:
# 校验失败
res["state_code"]=1001 # 错误状态码
res["msg"] = "用户名或密码错误" return Response(json.dumps(res))

  注意:

(1)登录验证逻辑

class LoginView(APIView):
def post(self, request):
# 验证逻辑:获取用户名密码与数据库比对
name = request.data.get("name")
pwd = request.data.get("pwd")
user = User.objects.filter(name=name, pwd=pwd).first()
if user:
# 通过校验 拿到随机字符串交给这个人
pass
else:
# 校验失败
pass return Response("login....")

(2)生成随机字符串

def get_random_str(user):
""" 生成随机字符串 """
import hashlib, time
ctime = str(time.time())
md5 = hashlib.md5(bytes(user, encoding='utf-8'))
md5.update(bytes(ctime, encoding="utf-8"))
return md5.hexdigest()

  注意:ctime=str(time.time()) ,世界上一直在变化的就是时间变量,因此ctime这个变量每个都是不同的。

         hashlib.md5()构建md5对象。实例化md5时传递参数叫做加盐:

md5 = hashlib.md5(bytes(user, encoding='utf-8'))

  md5.update()写入要加密的字节:(这里的md5是实例化出来的对象)

md5.update(bytes(ctime, encoding="utf-8"))

  md5_obj.hexdigest():获取密文

return md5.hexdigest()

  加盐之后,即使要加密的数据完全一样,但是用户名肯定不一样,因此产生的密文一定唯一。

(3)update_or_create(self, defaults=None, **kwargs)方法

Token.objects.update_or_create(user=user,defaults={"token":random_str})

  用给定的**kwargs值查找对象(这里是user=user),如果defaults不为None则用defaults的值{"token":random_str}更新对象;如果为None则创建一个新对象。

class QuerySet:
def update_or_create(self, defaults=None, **kwargs):
"""
Look up an object with the given kwargs, updating one with defaults
if it exists, otherwise create a new one.
Return a tuple (object, created), where created is a boolean
specifying whether an object was created.
"""
defaults = defaults or {}

  如果是create操作返回值是添加的数据,如果是update操作返回值是更新的条数。

二、局部视图认证

from rest_framework import exceptions

class TokenAuth(object):   # 这个类名可以任意取
def authenticate(self, request): # 这个方法名不可变动
token = request.GET.get("token")
token_obj = Token.objects.filter(token=token).first()
if not token_obj:
raise exceptions.AuthenticationFailed("验证失败!")
# 如果有值 return两个值中间加逗号,就构成了一个元组
return token_obj.user.name, token_obj.token # 元组:(关联用户对象的名字,当前登录对象token) def authenticate_header(self, request): # 不加会报错,要求要传入两个参数
pass class BookView(APIView):
authentication_classes = [TokenAuth, ] def get(self, request):
book_list = Book.objects.all()
bs = BookModelSerializers(book_list, many=True, context={"request": request}) # 序列化结果
# return HttpResponse(bs.data)
return Response(bs.data) def post(self, request):
# POST请求的数据
bs = BookModelSerializers(data=request.data)
if bs.is_valid(): # 验证数据是否合格
print(bs.validated_data)
bs.save() # create方法
return Response(bs.data) # 当前添加的数据
else:
return Response(bs.errors)

1、分析源码

  每次请求都要执行dispatch。

(1)dispatch分发前的认证权限验证

  在用户访问时执行APIView的dispatch方法,在dispatch进行分发操作前,需要先执行self.initial(request, *args, **kwargs),执行认证、权限、频率操作。

def dispatch(self, request, *args, **kwargs):
self.args = args
self.kwargs = kwargs
request = self.initialize_request(request, *args, **kwargs) # 构建新request
self.request = request
self.headers = self.default_response_headers # deprecate? try:
self.initial(request, *args, **kwargs) # 认证权限频率 # Get the appropriate handler method
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed response = handler(request, *args, **kwargs) except Exception as exc:
response = self.handle_exception(exc) self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response

(2)initial()方法中的认证权限频率组件

class APIView(View):
def initial(self, request, *args, **kwargs):
"""代码省略"""
# Ensure that the incoming request is permitted
# 初始化认证组件
self.perform_authentication(request)
# 权限组件
self.check_permissions(request)
# 频率组件
self.check_throttles(request)

(3)查看perform_authentication()方法

def perform_authentication(self, request):
request.user

  由于在dispatch函数中,self.initial(request, *args, **kwargs)晚于request = self.initialize_request(request, *args, **kwargs)。因此这里的request是新构建的request。request.user即需要去Request类中寻找user静态方法。

  这个新request通过initialize_request方法返回Request类对象:

def initialize_request(self, request, *args, **kwargs):
parser_context = self.get_parser_context(request) return Request(
request,
parsers=self.get_parsers(),
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)

(4)Request类中有方法user

  紧跟with后面的语句会被求值,返回对象的__enter__()方法被调用,这个方法的返回值将被赋值给as关键字后面的变量,当with后面的代码块全部被执行完之后,将调用前面返回对象的__exit__()方法。

  with语句最关键的地方在于被求值对象必须有__enter__()和__exit__()这两个方法。

  由此可知 self._authenticate()一定会执行。

@property
def user(self):
"""
Returns the user associated with the current request, as authenticated
by the authentication classes provided to the request.
  当request已经通过认证类提供的认证,返回当前请求关联的用户
"""
if not hasattr(self, '_user'):
with wrap_attributeerrors():
self._authenticate()
return self._user

  注意:@property装饰器就是负责把一个方法变成属性调用。未通过验证的需要用self._authenticate()方法来进行校验。

(5)_authenticate方法分析(认证核心)

  循环 self.authenticators,拿到我们配置的每个认证类的实例化对象。

  配置的认证类中一定要实现authenticate()方法,否则一定会报错。

  需要注意的是:这里的 self 就是 request。authenticate()方法的返回值是一个元组——user_auth_tuple

class Request:
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance in turn.
"""
# 循环 self.authenticators,拿到我们配置的每个认证类的实例化对象。
for authenticator in self.authenticators:
try:
# 配置的认证类中一定要实现authenticate()方法,否则一定会报错
# 这里的 self 就是 request。authenticate()方法的返回值是一个元组——user_auth_tuple
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
self._not_authenticated()
raise if user_auth_tuple is not None:
self._authenticator = authenticator
# 元组的值赋给:request.user和request.auth
self.user, self.auth = user_auth_tuple
return self._not_authenticated()

  继续追溯 self.authenticators 的来源是Request实例化时传入的:

class Request:
"""
Wrapper allowing to enhance a standard `HttpRequest` instance. Kwargs:
- request(HttpRequest). The original request instance.
- parsers_classes(list/tuple). The parsers to use for parsing the
request content.
- authentication_classes(list/tuple). The authentications used to try
authenticating the request's user.
""" def __init__(self, request, parsers=None, authenticators=None,
negotiator=None, parser_context=None):
assert isinstance(request, HttpRequest), (
'The `request` argument must be an instance of '
'`django.http.HttpRequest`, not `{}.{}`.'
.format(request.__class__.__module__, request.__class__.__name__)
) self._request = request
self.parsers = parsers or ()
self.authenticators = authenticators or ()
...

(6)追溯self.authenticators

  找到Request实例化的方法:initialize_request

class APIView(View):
def initialize_request(self, request, *args, **kwargs):
return Request(
authenticators=self.get_authenticators(),
)

  再由此找到get_authenticators方法:

class APIView(View):
def get_authenticators(self):
"""
Instantiates and returns the list of authenticators that this view can use.
"""
return [auth() for auth in self.authentication_classes]

  继续追溯 self.authentication_classes:

# rest_framework/views.py
class APIView(View): # The following policies may be set at either globally, or per-view.
renderer_classes = api_settings.DEFAULT_RENDERER_CLASSES
parser_classes = api_settings.DEFAULT_PARSER_CLASSES
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

  self.authentication_classes 去配置文件中拿所有的认证类

  authentication_classes就是我们在视图函数中定义的列表:

class BookView(APIView):
authentication_classes = [TokenAuth, ]
def get(self, request):... def post(self, request):...

  [auth() for auth in self.authentication_classes]这个语句的含义:循环每一个认证类并进行实例化,放在数组中。以[TokenAuth, ]为例返回值是[TokenAuth(), ]。

  因此回传回去authenticators=[TokenAuth(), ]。

(7)_authenticate方法处理

class Request(object):
def __init__(self, request, parsers=None, authenticators=None,
negotiator=None, parser_context=None):
self._request = request
self.parsers = parsers or ()
self.authenticators = authenticators or () # 如果是None返回空元组,如果有值返回authenticators
"""
print(3 and 0) # 0
print(0 and 2) # 0
print(0 or 1) # 1
print(4 or 1) # 4
""" def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""
for authenticator in self.authenticators: # [TokenAuth(), ], authenticator:TokenAuth()
try:
user_auth_tuple = authenticator.authenticate(self) # TokenAuth必须有authenticate方法
# authenticator.authenticate(self):是一个实例对象调用自己的实例方法,本不需要传self,这里一定是传的一个形参。
# 这个方法是在Request类中,追溯调用关系可知,这里的self是一个新request对象
except exceptions.APIException: # 抛出错误
self._not_authenticated() # 没有验证成功
raise if user_auth_tuple is not None: # 如果user_auth_tuple有值
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple # 将元组的值赋给self.user和self.auth
return self._not_authenticated() # 没有验证成功

  authenticator.authenticate(self):是一个实例对象调用自己的实例方法,本不需要传self,这里一定是传的一个形参。这个方法是在Request类中,追溯调用关系可知,这里的self是一个新request对象。因此在构建自定义的TokenAuth时一定要在def authenticate(self, request):  添加request参数。

2、测试验证

  rest-framework框架——认证、权限、频率组件

  访问时添加数据库查到的token信息,验证通过:

  rest-framework框架——认证、权限、频率组件

  打印之前在_authenticate将元组的值赋给self.user和self.auth的值:

class BookView(APIView):
authentication_classes = [TokenAuth, ] def get(self, request):
print(request.user) # egon
print(request.auth) # 02aee930be6011068e24f68935d52b02

  可以看到正好对应authoerticate函数的返回值:return token_obj.user.name, token_obj.token.

  因此在视图中类中配置authentication_classes,即可实现局部认证。

3、BaseAuthentication模块引入规范简化认证类编写

from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication):
def authenticate(self, request):
token = request.GET.get("token")
token_obj = Token.objects.filter(token=token).first()
if not token_obj:
raise exceptions.AuthenticationFailed("验证失败!")
# 如果有值 return两个值中间加逗号,就构成了一个元组
return token_obj.user.name, token_obj.token # 元组:(关联用户对象的名字,当前登录对象token) # def authenticate_header(self, request): # 不加会报错,且要求要传入两个参数
# pass

  BaseAuthentication包含authenticate和authenticate_header函数。默认用来被覆盖。

三、全局视图认证

1、源码分析

  如果没有在局部定义authentication_classes=[TokenAuth, ]。回溯查找默认的authentication_classes。

(1)APIView有变量authentication_classes

class APIView(View):
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

  api_settings.DEFAULT_AUTHENTICATION_CLASSES是类的实例对象.属性的模式。当调用不存在的属性时,Python会试图调用__getattr__(self,attr)来获取属性,并且返回attr。

(2)查看api_settings

  该语句在rest_framework/settings.py中。发现api_settings是一个实例对象。实例化时执行相应的init方法。

api_settings = APISettings(None, DEFAULTS, IMPORT_STRINGS)

  DEFAULTS这个值是一个字典,每一个键后面都是跟着一个元组,保存了关于rest_framework所有默认配置。也定义在rest_framework/settings.py文件中。

  rest-framework框架——认证、权限、频率组件

(3)查看APISettings类__init__方法

class APISettings(object):
def __init__(self, user_settings=None, defaults=None, import_strings=None):
if user_settings:
self._user_settings = self.__check_user_settings(user_settings)
self.defaults = defaults or DEFAULTS
self.import_strings = import_strings or IMPORT_STRINGS
self._cached_attrs = set()

  user_settings默认为None,如果有值拿到user_settings.

  self.defaults = defaults or DEFAULTS  拿到DEFAULTS字典值。

(4)APISettings类的def __getattr__(self, attr)方法

class APISettings(object):
def __getattr__(self, attr):
if attr not in self.defaults:
raise AttributeError("Invalid API setting: '%s'" % attr) try:
# Check if present in user settings
val = self.user_settings[attr]
except KeyError: # 当self.user_settings的值是一个空字典,取值报错KeyError
# Fall back to defaults
val = self.defaults[attr] # 异常处理去取默认的DEFAULT值 # Coerce import strings into classes
if attr in self.import_strings:
val = perform_import(val, attr) # Cache the result
self._cached_attrs.add(attr)
setattr(self, attr, val)
return val

  __getattr__是python里的一个内建函数,可以很方便地动态返回一个属性;当调用不存在的属性时,Python会试图调用__getattr__(self,item)来获取属性,并且返回item;

class Person(object):
def __init__(self, name):
self.name = name def __getattr__(self, item):
print("item", item) def dream(self):
print("dreaming。。。。。") alex = Person("alex")
alex.yuan # 打印:item yuan

  val = self.user_settings[attr]:user_settings执行的返回值后面加上[attr]

(5)APISettings类的user_settings方法执行

class APISettings(object):
@property
def user_settings(self): # 静态方法
if not hasattr(self, '_user_settings'):
self._user_settings = getattr(settings, 'REST_FRAMEWORK', {})
return self._user_settings

  这里的settings指的是restDemo项目中的restDemo/settings.py。

  因此getattr(settings, 'REST_FRAMEWORK', {})  代表的意思是:去settings.py中去拿REST_FRAMEWORK变量,如果拿不到则取一个空字典。因此self._user_settings一定是一个字典。

  因此self.user_settings[attr]是在字典中取键attr的值.当字典为空时,取不到值会抛出Keyerror错误,进行异常处理去取默认的DEFAULT字典中的值:

DEFAULTS = {
"""省略代码"""
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
),
"""省略代码"""

2、在settings.py中配置全局视图

REST_FRAMEWORK = {
# 仿照DEFAULT配置认证类路径
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"]
}

(1)REST_FRAMEWORK是一个字典

  键必须是DEFAULT_AUTHENTICATION_CLASSES

(2)值是认证类路径

  由于要指定认证类路径因此要把之前写的TokenAuth从views.py迁移到一个新文件(文件名自定义)中,这里是:/app01/utils.py。在这里定义认证类TokenAuth,认证类必须包含authenticate方法。

  注意:不能继承object类,必须继承基础认证类——BaseAuthentication。

from .models import *
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication class TokenAuth(BaseAuthentication):
def authenticate(self, request):
token = request.GET.get("token")
token_obj = Token.objects.filter(token=token).first()
if not token_obj:
raise exceptions.AuthenticationFailed("验证失败!")
# 如果有值 return两个值中间加逗号,就构成了一个元组
return token_obj.user.name, token_obj.token # 元组:(关联用户对象的名字,当前登录对象token)

3、在全局认证情况下,配置部分访问不进行认证

class LoginView(APIView):
authentication_classes = [] # 配置为空,优先级高于全局认证,不用进行认证
def post(self, request):....

四、权限组件

  权限是对某件事情决策的范围和程度,权限在项目开发中经常会用到。视频网站中,很多视频需要vip用户才能观看,甚至还有一些视频需要svip、ssvip才能观看。

1、权限组件源码分析

  DRF的权限与版本、认证、频率组件都非常相似,均是在initial方法里初始化的。一般来说权限基于登录,因此在DRF源码中先执行版本和认证才执行权限。

(1)访问请求交给APIView类下的dispatch方法处理

class APIView(View):
def dispatch(self, request, *args, **kwargs):
self.args = args
self.kwargs = kwargs
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate? try:
self.initial(request, *args, **kwargs)
"""代码省略"""

  在dispatch中使用的initial()方法,包含了权限组件:

def initial(self, request, *args, **kwargs):
  """代码省略"""
# Ensure that the incoming request is permitted
# 认证组件
self.perform_authentication(request)
# 权限组件
self.check_permissions(request)
# 频率组件
self.check_throttles(request)

(2)权限函数——check_permissions

  这里循环拿到的permission是我们配置的每一个权限类的实例对象(MyPermission等);

  permission_denied用于抛出异常,因此权限类必须有has_permission方法,否则将抛出异常。

  注意:这里的self一直是我们视图类的实例化对象

  失败抛出异常时,通过反射在实例化对象中找message属性,因此可以在权限类中定义message来定义异常信息。

class APIView(View):
def check_permissions(self, request):
"""
Check if the request should be permitted.
Raises an appropriate exception if the request is not permitted.
"""
for permission in self.get_permissions(): # 循环的是[SVIPPermission(), ] permission是SVIPPermission()——权限实例
if not permission.has_permission(request, self): # 由此可见权限类必须有has_permission方法
        # 通过权限认证什么都不用做,不通过执行以下代码
self.permission_denied(
request, message=getattr(permission, 'message', None)
)

  self.get_permissions()是权限类实例对象组成的列表。

  permission.has_permission()是权限类中的权限方法。

  self.permission_denied()是在权限没有权限方法时,抛出异常。

  1)查看get_permissions方法

class APIView(View):
def get_permissions(self):
"""
Instantiates and returns the list of permissions that this view requires.
"""
return [permission() for permission in self.permission_classes]

  [permission() for permission in self.permission_classes]与认证组件完全类似:循环每一个权限类并进行实例化,组成一个列表返回。

  2)查看确认permission_classes的值

class APIView(View):
# The following policies may be set at either globally, or per-view.
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
permission_classes = api_settings.DEFAULT_PERMISSION_CLASSES

  如果在自己的视图类中定义了permission_classes(优先取当前视图的局部配置) ,说明配置了局部视图权限,就取自己定义的。

  如果没有定义:

  由于权限组件依然是用了api_settings这个APISettings类实例对象,实例化时执行__init__函数,因此也执行了user_settings静态方法。

class APISettings(object):
def __init__(self, user_settings=None, defaults=None, import_strings=None):
if user_settings:
self._user_settings = self.__check_user_settings(user_settings)
self.defaults = defaults or DEFAULTS def user_settings(self):
if not hasattr(self, '_user_settings'):
self._user_settings = getattr(settings, 'REST_FRAMEWORK', {}) # 去settings.py中去拿REST_FRAMEWORK变量,如果拿不到则取一个空字典
return self._user_settings

  这里user_settings函数通过反射取settings下是否有配置‘REST_FRAMEWORK’,如果有配置即说明配置了全局视图权限。self._user_settings取配置的值。

  getattr(settings, 'REST_FRAMEWORK', {})里的settings是从django引入过来的:

from django.conf import settings

  如果没有配置则self._user_settings是一个空字典,需要进一步分析api_settings.DEFAULT_PERMISSION_CLASSES:

  APISettings类中包含__getattr__方法,当调用不存在的属性时,Python会试图调用__getattr__(self,item)来获取属性,并且返回item:

class APISettings(object):
def __getattr__(self, attr):
if attr not in self.defaults:
raise AttributeError("Invalid API setting: '%s'" % attr) try:
# Check if present in user settings
val = self.user_settings[attr] # 全局权限视图未设置时self.user_settings返回值是一个空字典,设置时取到全局配置
except KeyError: # 空字典取值报错抛出异常
# Fall back to defaults
val = self.defaults[attr] # 获取DEFAULT中默认值 # Coerce import strings into classes
if attr in self.import_strings:
val = perform_import(val, attr) # Cache the result
self._cached_attrs.add(attr)
setattr(self, attr, val)
return val

  由于全局权限视图未未设置时self.user_settings返回值是一个空字典,因此取值会失败,通过异常处理获取DEFAULT默认配置中的值。

  3)总结

  self.get_permissions()的返回值是权限实例列表,以下面的示例为例是[SVIPPermission(), ]。因此for循环拿到的permission是一个个权限实例:SVIPPermission()

  因此如果permission.has_permission返回值是true,直接完成权限验证;如果返回值是False,则返回没有权限的提示。

  因此,权限类一定要有has_permission方法,否则会抛出异常。

2、局部视图权限

(1)前置准备

  修改models.py中user表结构,设置用户类型和默认值:

class User(models.Model):
name = models.CharField(max_length=32)
pwd = models.CharField(max_length=32)
type_choice = ((1, "普通用户"), (2, "VIP"), (3, "SVIP"))
user_type = models.IntegerField(choices=type_choice, default=1)

  修改后完成数据库迁移。

(2)配置权限类

  自定义工具包文件夹utils,创建permission.py权限文件:

from rest_framework.permissions import BasePermission

class SVIPPermission(BasePermission):
message = "您没有权限"
# 超级用户可用
def has_permission(self, request, view):
username = request.user # 获取前面认证过的信息:egon
user_type = User.objects.filter(name=username).first().user_type
if user_type == 3:
# 通过权限认证
return True
else:
# 未通过权限认证
return False

  视图配置如下所示:

from rest_framework import viewsets
from utils.permission import SVIPPermission # 权限 class AuthorViewSet(viewsets.ModelViewSet):
# authentication_classes = [TokenAuth, ]
permission_classes = [SVIPPermission, ]
queryset = Author.objects.all() # 配置queryset:告知这个类这次处理的数据
serializer_class = AuthorModelSerializers # 告知处理用到的序列化组件

  显示效果:

  rest-framework框架——认证、权限、频率组件

(3)用message自定义配置错误提示

  查看源码可以看到,check_permissions中失败抛出异常时,通过反射在实例化对象中找message属性,因此可以在权限类中定义message来定义异常信息。

class SVIPPermission(object):
# 超级用户可用
message = "只有超级用户才能访问"
def has_permission(self, request, view):
username = request.user # 获取前面认证过的信息:egon
user_type = User.objects.filter(name=username).first().user_type
if user_type == 3:
# 通过权限认证
return True
else:
# 未通过权限认证
return False

  显示效果:

  rest-framework框架——认证、权限、频率组件

3、全局视图权限

  在settings.py中配置全局权限

REST_FRAMEWORK = {
# 认证类路径
"DEFAULT_AUTHENTICATION_CLASSES": ["app01.utils.TokenAuth"],
# 权限类路径
"DEFAULT_PERMISSION_CLASSES": ["utils.permission.SVIPPermission"]
}

五、throttle(频率访问)组件

  开放平台的API接口调用需要限制其频率,以节约服务器资源和避免恶意的频繁调用。

1、源码分析

  频率组件的源码和前面的版本、认证、权限是一个流程。

class APIView(View):
def dispatch(self, request, *args, **kwargs):
try:
self.initial(request, *args, **kwargs) def initial(self, request, *args, **kwargs):
self.check_throttles(request) def check_throttles(self, request):
"""
Check if request should be throttled.
Raises an appropriate exception if the request is throttled.
"""
for throttle in self.get_throttles():
if not throttle.allow_request(request, self):
self.throttled(request, throttle.wait()) def get_throttles(self):
return [throttle() for throttle in self.throttle_classes] # 频率对象列表

  get_throttles:去配置文件中拿到所有频率控制类,并实例化放入列表中。

  因此 throttle 是配置中每个频率控制类的实例化对象。

  self.throttled 则是失败抛出异常。

  throttle.wait方法主要用来向客户端返回还需要多少时间可以继续访问。

2、频率组件原理和模板

(1)频率组件原理

  DRF中频率控制基本原理是基于访问次数和时间,可以通过自己定义的方法来实现。

  请求走到频率组件的时候,DRF内部会有一个字典记录访问者的IP,以这个IP为KEY,value为一个列表,存放访问者每次访问的时间。

{ip: [time1, time2, time3],}

  把每次访问最新时间放入列表的最前面,记录这样一个数据结构后,既可以用如下示例(设置10秒内只能访问次)实现限流:

  -- 1、判断访问者的IP是否在这个请求IP的字典里

  -- 2、保证这个列表里都是最近10秒内的访问的时间

      判断当前请求时间和列表里最早的(也就是最后的)请求时间的查

      如果差大于10秒,说明请求以及不是最近10秒内的,删除掉,

      继续判断倒数第二个,直到差值小于10秒

  -- 3、判断列表的长度(即访问次数),是否大于我们设置的5次,

      如果大于就限流,否则放行,并把时间放入列表的最前面。

(2)频率组件模板

  创建文件DRFDemo/utils/throttle.py:

import time
from rest_framework.throttling import BaseThrottle VISIT_RECORD = {} class MyThrottle(BaseThrottle): def allow_request(self, request, view):
# 实现限流的逻辑
# 以ip地址限流
# 访问列表 {ip: [time1,time2,time3]}
# 1、获取请求的ip地址
# 2、判断ip地址是否在访问列表
# 2.1 不在,给访问列表添加key,value
# 2.2 在,需要获取该ip的访问记录,把当前时间加入列表
# 3、确保列表里最新访问即最老访问的时间差 是1分钟
# 4、得到列表的长度,判断是否是允许的次数 pass def wait(self):
# 返回还剩多久可继续访问
pass 

3、局部视图throttle

(1)限制类定义

  直接将频率限制类定义在DRFDemo/utils/throttle.py中:

import time
from rest_framework.throttling import BaseThrottle VISIT_RECORD = {} class MyThrottle(BaseThrottle): def __init__(self):
self.history = None def allow_request(self, request, view):
# 实现限流的逻辑
# 以ip地址限流:要求访问站点的频率一分钟不超过3次
# 访问列表 {ip: [time1,time2,time3]}
# 1、获取请求的ip地址
remote_ip = request.META.get("REMOTE_ADDR")
ctime = time.time()
# 2、判断ip地址是否在访问列表
if remote_ip not in VISIT_RECORD:
# 2.1 不在,给访问列表添加key,value
VISIT_RECORD[remote_ip] = [ctime, ]
return True # 2.2 在,需要获取该ip的访问记录,把当前时间加入列表
history = VISIT_RECORD.get(remote_ip) # 取到列表 # 3、确保列表里最新访问即最老访问的时间差 是1分钟
while history and history[0] - history[-1] > 60:
history.pop() # 删最后一个
self.history = history
# 4、得到列表的长度,判断是否是允许的次数
if len(history) < 3:
# 未达到频率限制
history.insert(0, ctime) # 加到第一个
return True
else:
return False def wait(self):
# 返回还剩多久可继续访问
ctime = 60 - (self.history[0] - self.history[-1]) # 最新的时间减去最老的时间(可能是已经删除的时间)
return ctime

(2)视图中配置局部频率限制

  在视图类中中配置局部频率限制:

import uuid      # UUID对象和生成函数
from .models import User
from utils.auth import MyAuth # 认证
from utils.permission import MyPermission # 权限
from utils.throttle import MyThrottle # 频率
from rest_framework.views import APIView
from rest_framework.response import Response class DemoView(APIView):
def get(self, request):
return Response("认证demo~") class LoginView(APIView):
def post(self, request):
username = request.data.get("username")
pwd = request.data.get("pwd")
# 登录成功,生成token会将token给你返回
token = uuid.uuid4() # 通过随机数来生成UUID
User.objects.create(username=username, pwd=pwd, token=token)
return Response("创建用户成功") class TestView(APIView):
authentication_classes = [MyAuth, ] # 局部认证
# permission_classes = [MyPermission, ] # 局部权限
throttle_classes = [MyThrottle, ] def get(self, request):
# 访问:GET /auth/test?token=98bc3edce82143f8ad638f9cad336807
print(request.user) # User object (1)
print(request.auth) # 98bc3edce82143f8ad638f9cad336807
return Response("认证测试")

  注意:request.META 是一个Python字典,包含了所有本次HTTP请求的Header信息,比如用户IP地址和用户Agent(通常是浏览器的名称和版本号)。 注意,Header信息的完整列表取决于用户所发送的Header信息和服务器端设置的Header信息。

(3)频率组件测试

  连续访问:http://127.0.0.1:8000/auth/test?token=98bc3edce82143f8ad638f9cad336807,前三次均显示“认证测试”。第四次显示如下所示:

  rest-framework框架——认证、权限、频率组件

4、全局视图throttle

  在restDemo/settings.py中配置:

REST_FRAMEWORK = {
# "DEFAULT_VERSIONING_CLASS": "utils.version.MyVersion",
# 默认使用的版本控制类
"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.QueryParameterVersioning",
# 默认使用的版本
"DEFAULT_VERSION": "v1",
# 允许的版本
"ALLOWED_VERSIONS": "v1, v2",
# 版本使用的参数名称
"VERSION_PARAM": "ver",
# 默认认证类
"DEFAULT_AUTHENTICATION_CLASSES": ["utils.auth.MyAuth",],
# 默认权限类
"DEFAULT_PERMISSION_CLASSES": ["utils.permission.MyPermission"],
# 默认频率类
"DEFAULT_THROTTLE_CLASSES": ["utils.throttle.MyThrottle"]
}

5、框架内置throttle限流类

  将频率配置类修改为:

class MyThrottle(SimpleRateThrottle):
scope = "WD" def get_cache_key(self, request, view):
# 如果以ip地址做限流返回ip地址
key = self.get_ident(request)
return key

  settings.py设置:

REST_FRAMEWORK = {
# 默认使用的版本控制类
"DEFAULT_VERSIONING_CLASS": "rest_framework.versioning.QueryParameterVersioning",
# 默认使用的版本
"DEFAULT_VERSION": "v1",
# 允许的版本
"ALLOWED_VERSIONS": "v1, v2",
# 版本使用的参数名称
"VERSION_PARAM": "ver",
"DEFAULT_THROTTLE_RATES": {
"WD": "3/m"
}
}