加入收藏 | 设为首页 | 会员中心 | 我要投稿 银川站长网 (https://www.0951zz.com/)- 云通信、基础存储、云上网络、机器学习、视觉智能!
当前位置: 首页 > 综合聚焦 > 编程要点 > 语言 > 正文

python用contextvars管理上下文变量怎样操作

发布时间:2023-10-03 11:34:15 所属栏目:语言 来源:
导读:关于“python用contextvars管理上下文变量怎样做?”的知识有一些人不是很理解,对此小编给大家总结了相关内容,具有一定的参考借鉴价值,而且易于学习与理解,希望能对大家有所帮助,有这个方面学习需要的

关于“python用contextvars管理上下文变量怎样做?”的知识有一些人不是很理解,对此小编给大家总结了相关内容,具有一定的参考借鉴价值,而且易于学习与理解,希望能对大家有所帮助,有这个方面学习需要的朋友就继续往下看吧。

Python 在 3.7 的时候引入了一个模块:contextvars,从名字上很容易看出它指的是上下文变量(Context Variables),所以在介绍 contextvars 之前我们需要先了解一下什么是上下文(Context)。

Context 是一个包含了相关信息内容的对象,举个例子:"比如一部 13 集的动漫,你直接点进第八集,看到女主角在男主角面前流泪了"。相信此时你是不知道为什么女主角会流泪的,因为你没有看前面几集的内容,缺失了相关的上下文信息。

所以 Context 并不是什么神奇的东西,它的作用就是携带一些指定的信息。

web 框架中的 request

我们以 fastapi 和 sanic 为例,看看当一个请求过来的时候,它们是如何解析的。

# fastapi

from fastapi import FastAPI, Request

import uvicorn

app = FastAPI()

@app.get("/index")

async def index(request: Request):

name = request.query_params.get("name")

return {"name": name}

uvicorn.run("__main__:app", host="127.0.0.1", port=5555)

# -------------------------------------------------------

# sanic

from sanic import Sanic

from sanic.request import Request

from sanic import response

app = Sanic("sanic")

@app.get("/index")

async def index(request: Request):

name = request.args.get("name")

return response.json({"name": name})

app.run(host="127.0.0.1", port=6666)

发请求测试一下,看看结果是否正确。

可以看到请求都是成功的,并且对于 fastapi 和 sanic 而言,其 request 和 视图函数是绑定在一起的。也就是在请求到来的时候,会被封装成一个 Request 对象、然后传递到视图函数中。

但对于 flask 而言则不是这样子的,我们看一下 flask 是如何接收请求参数的。

from flask import Flask, request

app = Flask("flask")

@app.route("/index")

def index():

name = request.args.get("name")

return {"name": name}

app.run(host="127.0.0.1", port=7777)

我们看到对于 flask 而言则是通过 import request 的方式,如果不需要的话就不用 import,当然我这里并不是在比较哪种方式好,主要是为了引出我们今天的主题。首先对于 flask 而言,如果我再定义一个视图函数的话,那么获取请求参数依旧是相同的方式,但是这样问题就来了,不同的视图函数内部使用同一个 request,难道不会发生冲突吗?

显然根据我们使用 flask 的经验来说,答案是不会的,至于原因就是 ThreadLocal。

ThreadLocal

ThreadLocal,从名字上看可以得出它肯定是和线程相关的。没错,它专门用来创建局部变量,并且创建的局部变量是和线程绑定的。

import threading

# 创建一个 local 对象

local = threading.local()

def get():

name = threading.current_thread().name

# 获取绑定在 local 上的 value

value = local.value

print(f"线程: {name}, value: {value}")

def set_():

name = threading.current_thread().name

# 为不同的线程设置不同的值

if name == "one":

local.value = "ONE"

elif name == "two":

local.value = "TWO"

# 执行 get 函数

get()

t1 = threading.Thread(target=set_, name="one")

t2 = threading.Thread(target=set_, name="two")

t1.start()

t2.start()

"""

线程 one, value: ONE

线程 two, value: TWO

"""

可以看到两个线程之间是互不影响的,因为每个线程都有自己唯一的 id,在绑定值的时候会绑定在当前的线程中,获取也会从当前的线程中获取。可以把 ThreadLocal 想象成一个字典:

{

"one": {"value": "ONE"},

"two": {"value": "TWO"}

}

更准确的说 key 应该是线程的 id,为了直观我们就用线程的 name 代替了,但总之在获取的时候只会获取绑定在该线程上的变量的值。

而 flask 内部也是这么设计的,只不过它没有直接用 threading.local,而是自己实现了一个 Local 类,除了支持线程之外还支持 greenlet 的协程,那么它是怎么实现的呢?首先我们知道 flask 内部存在 "请求 context" 和 "应用 context",它们都是通过栈来维护的(两个不同的栈)。

# flask/globals.py

_request_ctx_stack = LocalStack()

_app_ctx_stack = LocalStack()

current_app = LocalProxy(_find_app)

request = LocalProxy(partial(_lookup_req_object, "request"))

session = LocalProxy(partial(_lookup_req_object, "session"))

每个请求都会绑定在当前的 Context 中,等到请求结束之后再销毁,这个过程由框架完成,开发者只需要直接使用 request 即可。所以请求的具体细节流程可以点进源码中查看,这里我们重点关注一个对象:werkzeug.local.Local,也就是上面说的 Local 类,它是变量的设置和获取的关键。直接看部分源码:

# werkzeug/local.py

class Local(object):

__slots__ = ("__storage__", "__ident_func__")

def __init__(self):

# 内部有两个成员:__storage__ 是一个字典,值就存在这里面

# __ident_func__ 只需要知道它是用来获取线程 id 的即可

object.__setattr__(self, "__storage__", {})

object.__setattr__(self, "__ident_func__", get_ident)

def __call__(self, proxy):

"""Create a proxy for a name."""

return LocalProxy(self, proxy)

def __release_local__(self):

self.__storage__.pop(self.__ident_func__(), None)

def __getattr__(self, name):

try:

# 根据线程 id 得到 value(一个字典)

# 然后再根据 name 获取对应的值

# 所以只会获取绑定在当前线程上的值

return self.__storage__[self.__ident_func__()][name]

except KeyError:

raise AttributeError(name)

def __setattr__(self, name, value):

ident = self.__ident_func__()

storage = self.__storage__

try:

# 将线程 id 作为 key,然后将值设置在对应的字典中

# 所以只会将值设置在当前的线程中

storage[ident][name] = value

except KeyError:

storage[ident] = {name: value}

def __delattr__(self, name):

# 删除逻辑也很简单

try:

del self.__storage__[self.__ident_func__()][name]

except KeyError:

raise AttributeError(name)

所以我们看到 flask 内部的逻辑其实很简单,通过 ThreadLocal 实现了线程之间的隔离。每个请求都会绑定在各自的 Context 中,获取值的时候也会从各自的 Context 中获取,因为它就是用来保存相关信息的(重要的是同时也实现了隔离)。

相应此刻你已经理解了上下文,但是问题来了,不管是 threading.local 也好、还是类似于 flask 自己实现的 Local 也罢,它们都是针对线程的。如果是使用 async def 定义的协程该怎么办呢?如何实现每个协程的上下文隔离呢?所以终于引出了我们的主角:contextvars。

contextvars

该模块提供了一组接口,可用于在协程中管理、设置、访问局部 Context 的状态。

import asyncio

import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get():

# 获取值

return c.get() + "~~~"

async def set_(val):

# 设置值

c.set(val)

print(await get())

async def main():

coro1 = set_("协程1")

coro2 = set_("协程2")

await asyncio.gather(coro1, coro2)

asyncio.run(main())

"""

协程1~~~

协程2~~~

"""

ContextVar 提供了两个方法,分别是 get 和 set,用于获取值和设置值。我们看到效果和 ThreadingLocal 类似,数据在协程之间是隔离的,不会受到彼此的影响。

但我们再仔细观察一下,我们是在 set_ 函数中设置的值,然后在 get 函数中获取值。可 await get() 相当于是开启了一个新的协程,那么意味着设置值和获取值不是在同一个协程当中。但即便如此,我们依旧可以获取到希望的结果。因为 Python 的协程是无栈协程,通过 await 可以实现级联调用。

我们不妨再套一层:

import asyncio

import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get1():

return await get2()

async def get2():

return c.get() + "~~~"

async def set_(val):

# 设置值

c.set(val)

print(await get1())

print(await get2())

async def main():

coro1 = set_("协程1")

coro2 = set_("协程2")

await asyncio.gather(coro1, coro2)

asyncio.run(main())

"""

协程1~~~

协程1~~~

协程2~~~

协程2~~~

"""

我们看到不管是 await get1() 还是 await get2(),得到的都是 set_ 中设置的结果,说明它是可以嵌套的。

并且在这个过程当中,可以重新设置值。

import asyncio

import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get1():

c.set("重新设置")

return await get2()

async def get2():

return c.get() + "~~~"

async def set_(val):

# 设置值

c.set(val)

print("------------")

print(await get2())

print(await get1())

print(await get2())

print("------------")

async def main():

coro1 = set_("协程1")

coro2 = set_("协程2")

await asyncio.gather(coro1, coro2)

asyncio.run(main())

"""

------------

协程1~~~

重新设置~~~

重新设置~~~

------------

------------

协程2~~~

重新设置~~~

重新设置~~~

------------

"""

先 await get2() 得到的就是 set_ 函数中设置的值,这是符合预期的。但是我们在 get1 中将值重新设置了,那么之后不管是 await get1() 还是直接 await get2(),得到的都是新设置的值。

这也说明了,一个协程内部 await 另一个协程,另一个协程内部 await 另另一个协程,不管套娃(await)多少次,它们获取的值都是一样的。并且在任意一个协程内部都可以重新设置值,然后获取会得到最后一次设置的值。再举个栗子:

import asyncio

import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get1():

return await get2()

async def get2():

val = c.get() + "~~~"

c.set("重新设置啦")

return val

async def set_(val):

# 设置值

c.set(val)

print(await get1())

print(c.get())

async def main():

coro = set_("古明地觉")

await coro

asyncio.run(main())

"""

古明地觉~~~

重新设置啦

"""

await get1() 的时候会执行 await get2(),然后在里面拿到 c.set 设置的值,打印 "古明地觉~~~"。但是在 get2 里面,又将值重新设置了,所以第二个 print 打印的就是新设置的值。\

如果在 get 之前没有先 set,那么会抛出一个 LookupError,所以 ContextVar 支持默认值:

import asyncio

import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试",

default="哼哼")

async def set_(val):

print(c.get())

c.set(val)

print(c.get())

async def main():

coro = set_("古明地觉")

await coro

asyncio.run(main())

"""

哼哼

古明地觉

"""

除了在 ContextVar 中指定默认值之外,也可以在 get 中指定:

import asyncio

import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试",

default="哼哼")

async def set_(val):

print(c.get("古明地恋"))

c.set(val)

print(c.get())

async def main():

coro = set_("古明地觉")

await coro

asyncio.run(main())

"""

古明地恋

古明地觉

"""

所以结论如下,如果在 c.set 之前使用 c.get:

当 ContextVar 和 get 中都没有指定默认值,会抛出 LookupError;

只要有一方设置了,那么会得到默认值;

如果都设置了,那么以 get 为准;

如果 c.get 之前执行了 c.set,那么无论 ContextVar 和 get 有没有指定默认值,获取到的都是 c.set 设置的值。

所以总的来说还是比较好理解的,并且 ContextVar 除了可以作用在协程上面,它也可以用在线程上面。没错,它可以替代 threading.local,我们来试一下:

import threading

import contextvars

c = contextvars.ContextVar("context_var")

def get():

name = threading.current_thread().name

value = c.get()

print(f"线程 {name}, value: {value}")

def set_():

name = threading.current_thread().name

if name == "one":

c.set("ONE")

elif name == "two":

c.set("TWO")

get()

t1 = threading.Thread(target=set_, name="one")

t2 = threading.Thread(target=set_, name="two")

t1.start()

t2.start()

"""

线程 one, value: ONE

线程 two, value: TWO

"""

和 threading.local 的表现是一样的,但是更建议使用 ContextVars。不过前者可以绑定任意多个值,而后者只能绑定一个值(可以通过传递字典的方式解决这一点)。

c.Token

当我们调用 c.set 的时候,其实会返回一个 Token 对象:

import contextvars

c = contextvars.ContextVar("context_var")

token = c.set("val")

print(token)

"""

<Token var=<ContextVar name='context_var' at 0x00..> at 0x00...>

"""

Token 对象有一个 var 属性,它是只读的,会返回指向此 token 的 ContextVar 对象。

import contextvars

c = contextvars.ContextVar("context_var")

token = c.set("val")

print(token.var is c) # True

print(token.var.get()) # val

print(

token.var.set("val2").var.set("val3").var is c

) # True

print(c.get()) # val3

登录后复制

Token 对象还有一个 old_value 属性,它会返回上一次 set 设置的值,如果是第一次 set,那么会返回一个 <Token.MISSING>。

import contextvars

c = contextvars.ContextVar("context_var")

token = c.set("val")

# 该 token 是第一次 c.set 所返回的

# 在此之前没有 set,所以 old_value 是 <Token.MISSING>

print(token.old_value) # <Token.MISSING>

token = c.set("val2")

print(c.get()) # val2

# 返回上一次 set 的值

print(token.old_value) # val

那么这个 Token 对象有什么作用呢?从目前来看貌似没太大用处啊,其实它最大的用处就是和 reset 搭配使用,可以对状态进行重置。

import contextvars

####

c = contextvars.ContextVar("context_var")

token = c.set("val")

# 显然是可以获取的

print(c.get()) # val

# 将其重置为 token 之前的状态

# 但这个 token 是第一次 set 返回的

# 那么之前就相当于没有 set 了

c.reset(token)

try:

c.get() # 此时就会报错

except LookupError:

print("报错啦") # 报错啦

# 但是我们可以指定默认值

print(c.get("默认值")) # 默认值

contextvars.Context

它负责保存 ContextVars 对象和设置的值之间的映射,但是我们不会直接通过 contextvars.Context 来创建,而是通过 contentvars.copy_context 函数来创建。

import contextvars

c1 = contextvars.ContextVar("context_var1")

c1.set("val1")

c2 = contextvars.ContextVar("context_var2")

c2.set("val2")

# 此时得到的是所有 ContextVar 对象和设置的值之间的映射

# 它实现了 collections.abc.Mapping 接口

# 因此我们可以像操作字典一样操作它

context = contextvars.copy_context()

# key 就是对应的 ContextVar 对象,value 就是设置的值

print(context[c1]) # val1

print(context[c2]) # val2

for ctx, value in context.items():

print(ctx.get(), ctx.name, value)

"""

val1 context_var1 val1

val2 context_var2 val2

"""

print(len(context)) # 2

除此之外,context 还有一个 run 方法:

import contextvars

c1 = contextvars.ContextVar("context_var1")

c1.set("val1")

c2 = contextvars.ContextVar("context_var2")

c2.set("val2")

context = contextvars.copy_context()

def change(val1, val2):

c1.set(val1)

c2.set(val2)

print(c1.get(), context[c1])

print(c2.get(), context[c2])

# 在 change 函数内部,重新设置值

# 然后里面打印的也是新设置的值

context.run(change, "VAL1", "VAL2")

"""

VAL1 VAL1

VAL2 VAL2

"""

print(c1.get(), context[c1])

print(c2.get(), context[c2])

"""

val1 VAL1

val2 VAL2

"""

我们看到 run 方法接收一个 callable,如果在里面修改了 ContextVar 实例设置的值,那么对于 ContextVar 而言只会在函数内部生效,一旦出了函数,那么还是原来的值。但是对于 Context 而言,它是会受到影响的,即便出了函数,也是新设置的值,因为它直接把内部的字典给修改了。

以上就是 contextvars 模块的用法,在多个协程之间传递数据是非常方便的,并且也是并发安全的。如果你用过 Go 的话,你应该会发现和 Go 在 1.7 版本引入的 context 模块比较相似,当然 Go 的 context 模块功能要更强大一些,除了可以传递数据之外,对多个 goroutine 的级联管理也提供了非常清蒸的解决方案。

总之对于 contextvars 而言,它传递的数据应该是多个协程之间需要共享的数据,像 cookie, session, token 之类的,比如上游接收了一个 token,然后不断地向下透传。但是不要把本应该作为函数参数的数据,也通过 contextvars 来传递,这样就有点本末倒置了。

(编辑:银川站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章