Flask 学习 七 用户认证

时间:2023-03-09 01:34:06
Flask 学习 七 用户认证

使用werkzeug 实现密码散列

from werkzeug.security import generate_password_hash,check_password_hash

class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(64), unique=True,index=True)
role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))
password_hash=db.Column(db.String(128))
@property
def password(self):
raise AttributeError('密码不是一个可读属性') #只写属性
@password.setter
def password(self,password):
self.password_hash = generate_password_hash(password) def verify_password(self,password):
return check_password_hash(self.password_hash,password) def __repr__(self):
return '<User %r>' % self.username

密码散列化测试tests/test_url_model.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import unittest
from app.models import User
class UserModelTestCase(unittest.TestCase):
def test_password_setter(self):
u = User(password='cat')
self.assertTrue(u.password_hash is not None)
def test_no_password_getter(self):
u = User(password='cat')
with self.assertRaises(AttributeError):
u.password
def test_password_verification(self):
u = User(password='cat')
self.assertTrue(u.verify_password('cat'))
self.assertFalse(u.verify_password('dog'))
def test_password_salts_are_random(self):
u =User(password='cat')
u2=User(password='cat')
self.assertTrue(u.password_hash != u2.password_hash)

创建用户认证蓝本

app/auth/__init__.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Blueprint
auth = Blueprint('auth',__name__)
from . import views

app/auth/views.py 蓝本路由和视图函数

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import render_template
from . import auth @auth.route('/login')
def login():
return render_template('auth/login.html')

app/__init__.py 注册蓝本

from .auth import auth as auth_blueprint
app.register_blueprint(auth_blueprint,url_prefix='/auth')

安装flask-login插件

pip install flask-login

修改User模型,支持用户登陆 app/models.py

from flask_login import UserMixin

class User(UserMixin,db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
email=db.Column(db.String(64),unique=True,index=True)
username = db.Column(db.String(64), unique=True,index=True)
role_id=db.Column(db.Integer,db.ForeignKey('roles.id'))
password_hash=db.Column(db.String(128))

app/__init__.py 初始化Flask_Login

from flask_login import LoginManager
#初始化Flask-Login
login_manager=LoginManager()
login_manager.session_protection='strong' # 记录客户端ip,用户代理信息,发现异动,登出用户,可以设置不同等级None,'basic','strong'
login_manager.login_view='auth.login' # 设置登录页面端点,蓝本的名字也要加到前面 def create_app(config_name):
#....
login_manager.init_app(app) #初始化登陆
#....

app/models.py 加载用户的回掉函数

from . import login_manager

# 加载用户的回调函数
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# 加载用户回掉函数,接收以Unicode字符串表示的用户标识符,如果能找到这个用户必须返回用户对象,否则返回None

保护路由

为了只让认证的用户访问,未认证的用户会拦截请求,发往登陆页面

#保护路由
@app.route('/secret')
@login_required
def secret():
return '只有认证后的用户才能登陆'

添加登陆表单

app/auth/forms.py

from flask_wtf import FlaskForm
from wtforms import StringField,PasswordField,BooleanField,SubmitField
from wtforms.validators import DataRequired,Length,Email class LoginForm(FlaskForm):
email=StringField('邮箱',validators=[DataRequired(),Length(1,64),Email()])
password=PasswordField('密码',validators=[DataRequired()])
remember_me=BooleanField('保持登陆')
submit = SubmitField('登陆')

auth/login.html  用wtf.quick_form()渲染表单

{% extends 'base.html' %}
{% import'bootstrap/wtf.html' as wtf %}
{% block title %}Flask-登陆{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>登陆</h1>
</div>
<div class="col-md-4">
{{ wtf.quick_form(form) }}
</div>
{% endblock %}

base.html  # current_user.is_authenticated 如果是匿名用户登陆is_authenticated返回False,这个方法可以判断当前用户是否登陆

<ul class="nav navbar-nav navbar-right">
{% if current_user.is_authenticated %}
<li><a href="{{ url_for('auth.logout') }}">退出登陆</a></li>
{% else %}
<li><a href="{{ url_for('auth.login') }}">立即登陆</a></li>
{% endif %}
</ul>

登入用户

app/auth/views.py #登陆路由

from flask import render_template,redirect,request,url_for,flash
from flask_login import login_user,login_required,logout_user,current_user
from . import auth
from ..models import User
from .forms import LoginForm @auth.route('/login',methods=['get','post'])
def login():
form = LoginForm()
if form.validate_on_submit(): # 验证表单数据
user = User.query.filter_by(email=form.email.data).first()
if user is not None and user.verify_password(form.password.data): # verify_password会验证表单数据
login_user(user,form.remember_me.data) # 如果为True则为用户生成长期有效的cookies
return redirect(request.args.get('next') or url_for('main.index'))# next保存原地址(从request.args字典中读取)
flash('用户名或密码无效')
return render_template('auth/login.html',form=form)

更新登陆模板auth/login.html

{% extends 'base.html' %}
{% import'bootstrap/wtf.html' as wtf %}
{% block title %}Flask-登陆{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>登陆</h1>
</div>
<div class="col-md-4">
{{ wtf.quick_form(form) }}
</div>
{% endblock %}

登出用户

auth/views.py

from flask_login import login_user,login_required,logout_user,current_user

@auth.route('/logout')
@login_required
def logout():
logout_user()
flash('你已经退出')
return redirect(url_for('main.index'))

测试登陆 index.html

<h1>Hello,
{% if current_user.is_authenticated %}
{{ current_user.username }}
{% else %}
访客
{% endif %}!
</h1>

注册新用户

添加用户注册表单

from flask_wtf import FlaskForm
from wtforms import StringField,PasswordField,BooleanField,SubmitField
from wtforms.validators import DataRequired,Length,Email,Regexp,EqualTo
from ..models import User class RegistrationForm(FlaskForm):
email = StringField('邮箱', validators=[DataRequired(), Length(1, 64), Email()])
username= StringField('用户名', validators=[DataRequired(), Length(1, 64), Regexp('^[A-Za-z][A-Za-z0-9_.]*$',0,'用户名必须是字母,数字,点号,下划线')])
password = PasswordField('密码', validators=[DataRequired(),EqualTo('password2',message='密码不一致')])
password2 = PasswordField('再次输入密码', validators=[DataRequired()])
submit = SubmitField('注册')
def validate_email(self,filed):
if User.query.filter_by(email=filed.data).first():
raise ValidationError('邮箱已被注册')
def validate_username(self,filed):
if User.query.filter_by(username=filed.data).first():
raise ValidationError('用户名已被使用')

auth/register.html

{% extends 'base.html' %}
{% import'bootstrap/wtf.html' as wtf %}
{% block title %}Flask-注册{% endblock %}
{% block page_content %}
<div class="page-header">
<h1>注册</h1>
</div>
<div class="col-md-4">
{{ wtf.quick_form(form) }}
</div>
{% endblock %}

auth/login.html 添加链接到注册页面

 <p>新用户?<a href="{{ url_for('auth.register') }}">点击这里注册</a></p>

app/auth/views.py

@auth.route('/register',methods=['get','post'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
user=User(email = form.email.data,
username=form.username.data,
password=form.password.data)
db.session.add(user)
flash('你现在已经登陆了')
return redirect(url_for('auth.login'))
return render_template('auth/register.html',form=form)

确认账户

使用isdangerous生成确认令牌

app/models.py 确认用户账户

from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from flask import current_app
from . import db
class User(UserMixin,db.Model):
# ...
confirmed = db.Column(db.Boolean,default=False)
def generate_confirmation_token(self,expiration=3600): #生成令牌,有效期1小时
s = Serializer(current_app.config['SECRET_KEY'],expiration)
return s.dumps({'confirm':self.id})
def confirm(self,token): # 检验令牌 如果通过把confirmed字段设为True
s=Serializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token)
except:
return False
if data.get('confirm') !=self.id: # 检查令牌id是否存储是已登录用户进行匹配
return False
self.confirmed=True
db.session.add(self)
return True

发送确认邮件

auth/views.py

from ..email import send_mail

@auth.route('/register',methods=['get','post'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
user=User(email = form.email.data,
username=form.username.data,
password=form.password.data)
db.session.add(user)
db.session.commit()
token = user.generate_confirmation_token()
send_mail(user.email,'确认你的账户','auth/email/confirm',user=user,token=token)
flash('我们已经给你发送了一封确认邮件')
return redirect(url_for('main.index'))
return render_template('auth/register.html',form=form)

auth/email/cinfirm.txt

亲爱的,{{ user.username }}
欢迎来到 Flasky
为了确认您的账户,请点击以下链接: {{ url_for('auth.confirm',token=token,_external=True) }} #返回绝对路径 Flasky 团队

auth/views.py

from flask_login import current_user
@auth.route('/confirm/<token>')
@login_required # 保护这个路由,用户需要先登录
def confirm(token):
if current_user.confirmed:
return redirect(url_for('main.index'))
if current_user.confirm(token):
flash('你已经确认了你的账户,谢谢')
else:
flash('确认链接失效或过期')
return redirect(url_for('main.index'))

app/auth/views.py 使用before_app_request来全局使用处理程序中未确认的用户

@auth.before_app_request # 如果返回响应或重定向,会直接发送至客户端,不会调用请求视图函数
def before_request():
if current_user.is_authenticated\
and not current_user.confirmed \
and request.endpoint[:5] != 'auth.'\ #访问路由获取权限
and request.endpoint != 'static':
return redirect(url_for('auth.unconfirmed')) @auth.route('/unconfirmed')
def unconfirmed():
if current_user.is_anonymous or current_user.confirmed:
return redirect(url_for('main.index'))
return render_template('auth/unconfirmed.html')

auth/views.py 重新发送确认邮件

@auth.route('/confirm')
@login_required
def resend_confirmation():
token=current_user.generate_confirmation_token()
send_mail(current_user.email,'确认你的账户','auth/email/confirm',user=current_user,token=token)
flash('一封新的确认邮件已经发送到您的邮箱')
return redirect(url_for('main.index'))

管理账户:

修改密码  重设密码 修改电子邮件地址

https://github.com/Erick-LONG/flask_blog/commit/3748e70d9f986b066328185bcf417d8b8205ed8c

https://github.com/Erick-LONG/flask_blog/commit/ce4a67d31f0d7d5f5c6719eebb193b2949c77b0c

https://github.com/Erick-LONG/flask_blog/commit/f89ef3dac3819129165be4d9b2a67a2bb092cf34