Thread-local Sessions

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
 
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
some_session = Session()
some_other_session = Session()
some_session is some_other_session #True

使用了 scoped_session  默认情况下,创建的 session 都是 Thread-Local Scope,创建的 session 对象具体有两点变化:

  1. 使用 Session()创建的 session 对象都是一样的,这可以保证代码在不同的多次调用 session() 依然获得到相同的 session 对象
  2. 使用 Session()创建的 session 对象是  Thread-local, session 在线程与线程之间没有任何联系。

scoped_session 实现

class scoped_session(Generic[_S]):
    def __init__(
        self,
        session_factory: sessionmaker[_S],
        scopefunc: Optional[Callable[[], Any]] = None,
    ):
    	self.session_factory = session_factory
        if scopefunc:
            self.registry = ScopedRegistry(session_factory, scopefunc)
        else:
            self.registry = ThreadLocalRegistry(session_factory)
    	
	@property
    def _proxied(self) -> _S:
        t = self.registry()
        return t
        
    def get(self, ...):
    	return self._proxied.get(...)

每次 get、update 等操作时,会去 registry 中获取一个 session。如果配置了 scopefunc,则使用 ScopedRegistry,否则使用 ThreadLocalRegistry。

ThreadLocalRegistry 是使用 threading.local() 进行存储,这样可以每个线程的 session 都不一致。

class ThreadLocalRegistry(ScopedRegistry[_T]):
    """A :class:`.ScopedRegistry` that uses a ``threading.local()``
    variable for storage.
    """
 
    def __init__(self, createfunc: Callable[[], _T]):
        self.createfunc = createfunc
        self.registry = threading.local()
 
    def __call__(self) -> _T:
        try:
            return self.registry.value  # type: ignore[no-any-return]
        except AttributeError:
            val = self.registry.value = self.createfunc()
            return val

ScopedRegistry 类似,会根据 scopefunc 来判断是否是同一个 scope,若是则返回同一个 session。

class ScopedRegistry(Generic[_T]):
    __slots__ = "createfunc", "scopefunc", "registry"
 
    createfunc: _CreateFuncType[_T]
    scopefunc: _ScopeFuncType
    registry: Any
 
    def __init__(
        self, createfunc: Callable[[], _T], scopefunc: Callable[[], Any]
    ):
        self.createfunc = createfunc
        self.scopefunc = scopefunc
        self.registry = {}
 
    def __call__(self) -> _T:
        key = self.scopefunc()
        try:
            return self.registry[key]  # type: ignore[no-any-return]
        except KeyError:
            return self.registry.setdefault(key, self.createfunc()) 

SQLAlchemy 的 scoped_session_sqlalchemy session生命周期 scope-CSDN博客