Django commands自定制

时间:2023-12-21 13:34:08

什么是Django Commands

Django 对于命令的添加有一套规范,你可以为每个app 指定命令。通俗一点讲,比如在使用manage.py文件执行命令的时候,可以自定制自己的命令,来实现命令的扩充。

commands的创建

1、在app内创建一个management的python目录
2、在management目录里面创建commands的python文件夹
3、在commands文件夹下创建任意py文件

此时py文件名就是你的自定制命令,可以使用下面方式执行

python manage.py 命令名

Django commands自定制

撰写command文件要求 

首先对于文件名没什么要求,内部需要定义一个Command类并继承BaseCommand类或其子类。

文件结构

Django commands自定制

其中help是command功能作用简介,handle函数是主处理程序,add_arguments函数是用来接收可选参数的

简单测试

# -*- coding: utf-8 -*-
# __author__ = 'dandy'
from django.core.management.base import BaseCommand class Command(BaseCommand):
help = 'test' def handle(self, *args, **options):
print('test')

带参数的测试

# -*- coding: utf-8 -*-
# __author__ = 'dandy'
from django.core.management.base import BaseCommand class Command(BaseCommand): def add_arguments(self, parser):
parser.add_argument('aaa', nargs='+', type=int)
parser.add_argument('--delete',
action='store_true',
dest='delete',
default=False,
help='Delete poll instead of closing it') def handle(self, *args, **options):
print('test')
print(args, options)

Django commands自定制

options里面直接取参数就可以了。

 # -*- coding: utf-8 -*-
# __author__ = 'dandy'
from django.core.management.base import BaseCommand
from django.conf import settings
import requests
import os
import threading command_path = os.path.join(settings.BASE_DIR, 'api', 'management', 'commands')
file = os.path.join(command_path, 'api_urls') class Command(BaseCommand):
help = 'test cost time for each api when getting data .We have set a middleware and create a file for log.' \
'here just send request by thread pool' def handle(self, *args, **options):
url_list = []
try:
if not os.path.exists(file):
raise FileNotFoundError('ERROR!!!! no file named api_urls in %s' % file)
with open (file, 'rb') as obj:
urls = obj.readlines()
if not len(urls):
raise Exception('ERROR!!! api_urls is a empty file !!')
for url in urls:
url = url.strip()
if url:
t = threading.Thread(target=self.get_url, args=(url,))
# t.setDaemon(True) # set True and you don't need to wait until main thread finished
t.start()
print('send all request successfully !')
except Exception as e:
print(e) def get_url(self, url):
requests.get(url)

实战1

 http://www.baidu.com
http://www.qq.com

api_urls