Warm tip: This article is reproduced from serverfault.com, please click

python-具有多个数据库连接的lambda上的SqlAlchemy和pyMysql连接池

(python - SqlAlchemy & pyMysql connection pooling on a lambda with multiple DB connections)

发布于 2020-12-01 18:01:01

所以问题是我有多个数据库,我想在SqlAlchemy中使用相同的数据库池。它驻留在Lambda上,并在启动Lambda时创建池。我希望后续的数据库连接使用现有池。

初始池连接bpConnect以及对该连接的任何后续查询都可以正常工作。

什么不DOES工作的companyConnect连接。我收到以下错误:

sqlalchemy.exc.StatementError: (builtins.AttributeError) 'XRaySession' object has no attribute 'cursor'

我有这些用于我的联系:

# Pooling
import sqlalchemy.pool as pool

#################### Engines ###################################################
def bpGetConnection():
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}"
    engine = create_engine(engine_endpoint, echo_pool=True)
    session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
    db = session()
    return db

bpPool = pool.StaticPool(bpGetConnection)

def companyGetConnection(database):
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}"
    compEngine = create_engine(engine_endpoint, pool=bpPool)
    session = XRaySessionMaker(bind=compEngine, autoflush=True, autocommit=False)
    db = Session()
    return db

#################### POOLING #############################################

def bpConnect():
    conn = bpPool.connect()
    return conn

def companyConnect(database):
    conn = companyGetConnection(database)
    return conn

#################################################################

在此示例中将它们称为:

from connections import companyConnect, bpConnect
from models import Company, Customers

def getCustomers(companyID):
    db = bpConnect()
    myQuery = db.query(Company).filter(Company.id == companyID).one()

    compDB = companyConnect(myQuery.database)
    customers = compDB.query(Customers).all()
    return customers
Questioner
griff4594
Viewed
11
griff4594 2020-12-05 03:44:07

我想出了如何在lambda上使用动态池来做到这一点:

class DBRegistry(object):
    _db = {}

    def get(self, url, **kwargs):
        if url not in self._db:
            engine = create_engine(url, **kwargs)
            Session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
            session = scoped_session(Session)
            self._db[url] = session
        return self._db[url]

compDB = DBRegistry()

def bpGetConnection():
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}?charset=utf8"
    engine = create_engine(engine_endpoint)
    session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
    db = session()
    return db

bpPool = pool.QueuePool(bpGetConnection, pool_size=500, timeout=11)

def bpConnect():
    conn = bpPool.connect()
    return conn

def companyConnect(database):
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}?charset=utf8"
    conn = compDB.get(engine_endpoint, poolclass=QueuePool)
    return conn

因此,基本上,它将使用一个池用于主数据库上所需的恒定连接,而将另一个池用于动态更改所需的数据库。当需要连接到这些公司数据库之一时,它将检查池注册表中是否已经存在该池。如果该池不存在,它将创建一个并注册。