So the issue is I have multiple databases that I want to use the same Database Pool in SqlAlchemy. This resides on a Lambda and the pool is created upon initiation of the Lambda. I want the subsequent DB connections to use the existing pool.
What works just fine is the initial pool connection bpConnect
and any subsequent queries to that connection.
What DOESN'T work is the companyConnect
connection. I get the following error:
sqlalchemy.exc.StatementError: (builtins.AttributeError) 'XRaySession' object has no attribute 'cursor'
I have these for my connections:
# Pooling
import sqlalchemy.pool as pool
#################### Engines ###################################################
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}"
engine = create_engine(engine_endpoint, echo_pool=True)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.StaticPool(bpGetConnection)
def companyGetConnection(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}"
compEngine = create_engine(engine_endpoint, pool=bpPool)
session = XRaySessionMaker(bind=compEngine, autoflush=True, autocommit=False)
db = Session()
return db
#################### POOLING #############################################
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
conn = companyGetConnection(database)
return conn
#################################################################
They are called in this example:
from connections import companyConnect, bpConnect
from models import Company, Customers
def getCustomers(companyID):
db = bpConnect()
myQuery = db.query(Company).filter(Company.id == companyID).one()
compDB = companyConnect(myQuery.database)
customers = compDB.query(Customers).all()
return customers
I figured out how to do it with dynamic pools on a lambda:
class DBRegistry(object):
_db = {}
def get(self, url, **kwargs):
if url not in self._db:
engine = create_engine(url, **kwargs)
Session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
session = scoped_session(Session)
self._db[url] = session
return self._db[url]
compDB = DBRegistry()
def bpGetConnection():
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{os.environ['database']}?charset=utf8"
engine = create_engine(engine_endpoint)
session = XRaySessionMaker(bind=engine, autoflush=True, autocommit=False)
db = session()
return db
bpPool = pool.QueuePool(bpGetConnection, pool_size=500, timeout=11)
def bpConnect():
conn = bpPool.connect()
return conn
def companyConnect(database):
engine_endpoint = f"mysql+pymysql://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}@{os.environ['DB_HOST']}:{str(os.environ['DB_PORT'])}/{database}?charset=utf8"
conn = compDB.get(engine_endpoint, poolclass=QueuePool)
return conn
So basically it will use one pool for the constant connection needed on the main database and another pool which it will dynamically change the database it needs. When a connection to one of those company databases is needed, it will check if that pool already exists in the registry of pools. If the pool does not exist it will create one and register it.