M1-Flask-Day2

时间:2022-01-27 05:52:49

内容概要:

  1.flask

    - 蓝图

    - 中间件

    - 闪现

  2.扩展

    - session

    - wtfrom

  3.上下文管理

    - local-threading

  4.websocket

    - 轮训

    - 长轮训

    - websocket

一.谈谈你对面向对象的理解

  1.三大特性,继承,封装,多态(初级水平)

    封装分两种:

        (1).函数的封装

          class db1():

            def func():

              pass

            def func2():

              pass

          (2).数据的封装(一个类的对象)

          

  2.__str__...方法

    __str__,

    __new__,

    __init__,

    __getattr__,

    __setattr__,

    __delattr__,

    __getitem__,

    __setitem__,

    __delitem__,

    __enter__,

    __exit__,

    __del__,

    __call__,

    __dict__,

    这些方法有特有的执行场景,类加()执行__init__方法,对象加(),__call__方法....

  ****一个对象和一个对象能不能相加、减、乘、除

    可以 加用__add__方法 

class Foo(object):
def __add__(self, other):
pass obj1 = Foo()
obj2 = Foo() res = obj1 + obj2  

  3.metaclass

    1.创建类的两种方法   

#创建类的方式一
class Foo(object):
pass #创建类的方式二
#第一个参数是类名,第二个元祖是继承谁,第三个字典是构造字段
Bar = type("MyFoo",(object,),{})
g = Bar()
print(g)

    2.验证类是由type创建

class Mytype(type):
def __init__(self,*args,**kwargs):
print("from my type")
super(Mytype,self).__init__(*args,**kwargs) class Foo(metaclass=Mytype):
pass g = Foo() """result
from my type
"""

二、flask-蓝图

   1.目录结构:

    M1-Flask-Day2

  2.__init__.py  

from flask import Flask

app = Flask(__name__)

from .views import account
from .views import user
app.register_blueprint(account.ac) #注册之后可以进行路由分发
app.register_blueprint(user.us) """
这里特殊的装饰器,是全局app,每个function都需要经过
@app.before_request
def check_login():
print('.....')
"""

  3.user.py

from flask import Blueprint

us = Blueprint('us',__name__,url_prefix='/xx') #生成蓝图对象,url_prefix是前缀,加上之后请求index就变为/xx/index

#这里是局部的装饰器,用于特定视图需要权限等
# @us.before_request
# def check_login():
# print('.....') @us.route('/index')
def index():
return 'index'

  4.manage.py

from pro_flask import app

if __name__ == '__main__':
app.run()

三、flask-闪现

   1.用于1次请求之后删除,基于session,取的时候用pop

from flask import Blueprint,redirect,request,flash,get_flashed_messages

ac = Blueprint('ac',__name__)

@ac.route('/login')
def login():
flash("登录成功1",category="x1") #category多了一层分组
flash("登录成功2",category="x2")
return redirect("/logout") @ac.route('/logout')
def logout():
res = get_flashed_messages(category_filter="x1") #默认不加category_filter取所有
print(res)
return 'logout'

  2.get_flask_message源码

def get_flashed_messages(with_categories=False, category_filter=[]):

    flashes = _request_ctx_stack.top.flashes
if flashes is None:
#存入local里的stack里对象的flashes
_request_ctx_stack.top.flashes = flashes = session.pop('_flashes') \
if '_flashes' in session else []
if category_filter:
flashes = list(filter(lambda f: f[0] in category_filter, flashes))
if not with_categories:
return [x[1] for x in flashes]
return flashes

三、flask-middleware

from flask import Flask
app = Flask(__name__) @app.route('/user',methods=['GET','POST'],endpoint='xxx')
def user():
return "login" class MiddleWare(object): #主要理解对象加括号执行__call__方法
def __init__(self,old_wsgi_app): self.old_wsgi_app = old_wsgi_app def __call__(self, *args, **kwargs):
#这是个时候还没有request
print("我做一些数据库连接check,或者清理缓存操作")
return self.old_wsgi_app(*args,**kwargs) if __name__ == '__main__':
app.wsgi_app = MiddleWare(app.wsgi_app)
app.run("0.0.0.0",9999)

四、flask-session

from flask import Flask,session
app = Flask(__name__) #如下几行操作就成功将session写入redis中了
from flask.ext.session import Session
from redis import Redis
app.config["SESSION_TYPE"] = 'redis'
app.config["SESSION_REDIS"] = Redis(host='192.16.1.1',port="6379",)
Session(app) @app.route('/user',methods=['GET','POST'],endpoint='xxx')
def user():
return "login" if __name__ == '__main__':
app.run("0.0.0.0",9999)

  源码剖析:

#session_interface = RedisSessionInterface() #程序刚开始加载时候执行
def _get_interface(self, app):
config = app.config.copy()
config.setdefault('SESSION_TYPE', 'null')
config.setdefault('SESSION_PERMANENT', True)
config.setdefault('SESSION_USE_SIGNER', False)
config.setdefault('SESSION_KEY_PREFIX', 'session:')
config.setdefault('SESSION_REDIS', None)
config.setdefault('SESSION_MEMCACHED', None)
config.setdefault('SESSION_FILE_DIR',
os.path.join(os.getcwd(), 'flask_session'))
config.setdefault('SESSION_FILE_THRESHOLD', 500)
config.setdefault('SESSION_FILE_MODE', 384)
config.setdefault('SESSION_MONGODB', None)
config.setdefault('SESSION_MONGODB_DB', 'flask_session')
config.setdefault('SESSION_MONGODB_COLLECT', 'sessions')
config.setdefault('SESSION_SQLALCHEMY', None)
config.setdefault('SESSION_SQLALCHEMY_TABLE', 'sessions') if config['SESSION_TYPE'] == 'redis':
session_interface = RedisSessionInterface(
config['SESSION_REDIS'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'memcached':
session_interface = MemcachedSessionInterface(
config['SESSION_MEMCACHED'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'filesystem':
session_interface = FileSystemSessionInterface(
config['SESSION_FILE_DIR'], config['SESSION_FILE_THRESHOLD'],
config['SESSION_FILE_MODE'], config['SESSION_KEY_PREFIX'],
config['SESSION_USE_SIGNER'], config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'mongodb':
session_interface = MongoDBSessionInterface(
config['SESSION_MONGODB'], config['SESSION_MONGODB_DB'],
config['SESSION_MONGODB_COLLECT'],
config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
config['SESSION_PERMANENT'])
elif config['SESSION_TYPE'] == 'sqlalchemy':
session_interface = SqlAlchemySessionInterface(
app, config['SESSION_SQLALCHEMY'],
config['SESSION_SQLALCHEMY_TABLE'],
config['SESSION_KEY_PREFIX'], config['SESSION_USE_SIGNER'],
config['SESSION_PERMANENT'])
else:
session_interface = NullSessionInterface() return session_interface

   程序执行session["xx"]=123时候先执行session_interface的open_session方法:

    def open_session(self, app, request):
sid = request.cookies.get(app.session_cookie_name)
if not sid:
sid = self._generate_sid() #根据uuid生成一个随机字符串
#第一次登陆进,返回{"sid":"asdasdada","xx":123}
return self.session_class(sid=sid, permanent=self.permanent)
if self.use_signer:
signer = self._get_signer(app)
if signer is None:
return None
try:
sid_as_bytes = signer.unsign(sid)
sid = sid_as_bytes.decode()
except BadSignature:
sid = self._generate_sid()
return self.session_class(sid=sid, permanent=self.permanent) if not PY2 and not isinstance(sid, text_type):
sid = sid.decode('utf-8', 'strict')
val = self.redis.get(self.key_prefix + sid)
if val is not None:
try:
data = self.serializer.loads(val)
return self.session_class(data, sid=sid)
except:
return self.session_class(sid=sid, permanent=self.permanent)
return self.session_class(sid=sid, permanent=self.permanent)

   程序返回之前会执行save_session动作:

    def save_session(self, app, session, response):
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
if not session:
if session.modified:
self.redis.delete(self.key_prefix + session.sid)
response.delete_cookie(app.session_cookie_name,
domain=domain, path=path)
return
httponly = self.get_cookie_httponly(app)
secure = self.get_cookie_secure(app)
expires = self.get_expiration_time(app, session)
val = self.serializer.dumps(dict(session))
self.redis.setex(name=self.key_prefix + session.sid, value=val,
time=total_seconds(app.permanent_session_lifetime)) #往redis里写数据ex为过期时间
if self.use_signer:
session_id = self._get_signer(app).sign(want_bytes(session.sid))
else:
session_id = session.sid
#最后将随机字符串写到cookie里
response.set_cookie(app.session_cookie_name, session_id,
expires=expires, httponly=httponly,
domain=domain, path=path, secure=secure)

五、上下文管理

  1.threading.local

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading #本地线程,保证即使是多个线程,自己的值也是互相隔离。
local_values = threading.local() def func(num):
#创建数据库连接
local_values.name = num
import time
time.sleep(1)
print(local_values.name, threading.current_thread().name) for i in range(20):
th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
th.start()

 2.自定义local类实现本地线程,也可以用setitem

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from threading import get_ident #通过自定义模拟本地线程原理
class Local(object):
def __init__(self):
object.__setattr__(self, 'storage', {})
#self.storage = {} #这样设置就会产生递归,触发settattr def __setattr__(self, key, value):
ident = get_ident()
if ident in self.storage:
self.storage[ident][key] = value
else:
self.storage[ident] = {key: value} def __getattr__(self, item):
ident = get_ident()
return self.storage[ident][item] obj = Local()
def func(num):
#创建数据库连接
obj.value = num
import time
time.sleep(1)
print(obj.value, threading.current_thread().name) for i in range(20):
th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
th.start()

3.带协程判断加入

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
from threading import get_ident try: #协程获取唯一id方法
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident #通过自定义模拟本地线程原理
class Local(object):
def __init__(self):
object.__setattr__(self, 'storage', {})
#self.storage = {} #这样设置就会产生递归,触发settattr def __setattr__(self, key, value):
ident = get_ident()
if ident in self.storage:
self.storage[ident][key] = value
else:
self.storage[ident] = {key: value} def __getattr__(self, item):
ident = get_ident()
return self.storage[ident][item] obj = Local()
def func(num):
#创建数据库连接
obj.value = num
import time
time.sleep(1)
print(obj.value, threading.current_thread().name) for i in range(20):
th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
th.start()

  

相关文章