Warm tip: This article is reproduced from serverfault.com, please click

SqlAlchemy & pyMysql connection pooling on a lambda with multiple DB connections

发布于 2020-12-01 18:01:01

So the issue is I have multiple databases that I want to use the same Database Pool in SqlAlchemy. This resides on a Lambda and the pool is created upon initiation of the Lambda. I want the subsequent DB connections to use the existing pool.

What works just fine is the initial pool connection bpConnect and any subsequent queries to that connection.

What DOESN'T work is the companyConnect connection. I get the following error:

sqlalchemy.exc.StatementError: (builtins.AttributeError) 'XRaySession' object has no attribute 'cursor'

I have these for my connections:

# Pooling
import sqlalchemy.pool as pool

#################### Engines ###################################################
def bpGetConnection():
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}"
    engine = create_engine(engine_endpoint, echo_pool=True)
    session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
    db = session()
    return db

bpPool = pool.StaticPool(bpGetConnection)

def companyGetConnection(database):
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}"
    compEngine = create_engine(engine_endpoint, pool=bpPool)
    session = XRaySessionMaker(bind=compEngine, autoflush=True, autocommit=False)
    db = Session()
    return db

#################### POOLING #############################################

def bpConnect():
    conn = bpPool.connect()
    return conn

def companyConnect(database):
    conn = companyGetConnection(database)
    return conn

#################################################################

They are called in this example:

from connections import companyConnect, bpConnect
from models import Company, Customers

def getCustomers(companyID):
    db = bpConnect()
    myQuery = db.query(Company).filter(Company.id == companyID).one()

    compDB = companyConnect(myQuery.database)
    customers = compDB.query(Customers).all()
    return customers
Questioner
griff4594
Viewed
0
griff4594 2020-12-05 03:44:07

I figured out how to do it with dynamic pools on a lambda:

class DBRegistry(object):
    _db = {}

    def get(self, url, **kwargs):
        if url not in self._db:
            engine = create_engine(url, **kwargs)
            Session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
            session = scoped_session(Session)
            self._db[url] = session
        return self._db[url]

compDB = DBRegistry()

def bpGetConnection():
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}?charset=utf8"
    engine = create_engine(engine_endpoint)
    session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
    db = session()
    return db

bpPool = pool.QueuePool(bpGetConnection, pool_size=500, timeout=11)

def bpConnect():
    conn = bpPool.connect()
    return conn

def companyConnect(database):
    engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}?charset=utf8"
    conn = compDB.get(engine_endpoint, poolclass=QueuePool)
    return conn

So basically it will use one pool for the constant connection needed on the main database and another pool which it will dynamically change the database it needs. When a connection to one of those company databases is needed, it will check if that pool already exists in the registry of pools. If the pool does not exist it will create one and register it.