Source code for db_plugins.db.sql.connection

from urllib.parse import quote

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base

from .query import SQLQuery
from ..generic import DatabaseConnection, DatabaseCreator
from .models import Base

# Configuration keys that must all be present before a database URL can be
# assembled from the individual connection settings.
MAP_KEYS = {
    "ENGINE",
    "HOST",
    "PORT",
    "USER",
    "PASSWORD",
    "DB_NAME",
}


def satisfy_keys(config_keys):
    """Return the required keys that are absent from ``config_keys``.

    Parameters
    ----------
    config_keys : iterable of str
        Key names found in a configuration mapping.

    Returns
    -------
    set
        Members of ``MAP_KEYS`` not found in ``config_keys``; the set is
        empty when every required key is present.
    """
    provided = set(config_keys)
    return MAP_KEYS - provided


def settings_map(config):
    """Build the SQLAlchemy database URL from individual connection settings.

    Parameters
    ----------
    config : dict
        Must contain the keys ``ENGINE``, ``USER``, ``PASSWORD``, ``HOST``,
        ``PORT`` and ``DB_NAME``.

    Returns
    -------
    str
        URL of the form ``ENGINE://USER:PASSWORD@HOST:PORT/DB_NAME``, with
        the user name and password percent-encoded.

    Raises
    ------
    KeyError
        If any of the keys listed above is missing from ``config``.
    """
    # Credentials are percent-encoded so that reserved characters such as
    # "@", ":" or "/" cannot be mistaken for URL delimiters. ``safe=""``
    # makes sure "/" is encoded as well.
    user = quote(str(config["USER"]), safe="")
    password = quote(str(config["PASSWORD"]), safe="")
    return (
        f"{config['ENGINE']}://{user}:{password}"
        f"@{config['HOST']}:{config['PORT']}/{config['DB_NAME']}"
    )


class SQLDatabaseCreator(DatabaseCreator):
    """Creator that produces SQL-backed database connections."""

    def create_database(self) -> DatabaseConnection:
        """Return a new ``SQLConnection`` built with its default arguments."""
        connection = SQLConnection()
        return connection


class SQLConnection(DatabaseConnection):
    """SQLAlchemy-backed database connection.

    Keeps track of the engine, the declarative base, the session factory and
    the session currently in use.
    """

    def __init__(
        self, config=None, engine=None, Base=None, Session=None, session=None
    ):
        """
        Store the (optional) pre-built connection components.

        Parameters
        ----------
        config : dict
            Database configuration, see :meth:`connect`.
        engine : sqlalchemy engine
            Existing engine to reuse; when set, :meth:`connect` does not
            create a new one.
        Base : declarative base
            Base class used by sqlalchemy to create tables.
        Session : sessionmaker
            Existing session factory to reuse; when set, :meth:`connect`
            does not create a new one.
        session : session
            Existing session object.
        """
        self.config = config
        self.engine = engine
        self.Base = Base
        self.Session = Session
        self.session = session
        # Tracks which kind of session is active so end_session() knows
        # whether to call remove() or close().
        self.use_scoped = False

    def connect(
        self,
        config,
        base=None,
        session_options=None,
        create_session=True,
        use_scoped=False,
        scope_func=None,
    ):
        """
        Establishes connection to a database and initializes a session.

        Parameters
        ----------
        config : dict
            Database configuration. For example:

            .. code-block:: python

                "SQL": {
                    "ENGINE": "postgresql",
                    "HOST": "host",
                    "USER": "username",
                    "PASSWORD": "pwd",
                    "PORT": 5432, # postgresql typically runs on port 5432. Notice that we use an int here.
                    "DB_NAME": "database",
                }

            The dict is stored by reference. When it contains all of the
            keys above, a ``"SQLALCHEMY_DATABASE_URL"`` entry is written
            into it; otherwise that entry must already be present (unless
            an engine was already set on this object).
        base : sqlalchemy.ext.declarative.declarative_base()
            Base class used by sqlalchemy to create tables. Defaults to the
            ``Base`` imported from the package models.
        session_options : dict
            Options passed to sessionmaker. The dict is copied, so the
            caller's object is never modified. ``query_cls`` is always set
            to ``SQLQuery``.
        create_session : Boolean
            Whether to instantiate a session or not. The default value is
            True since this is the previous behavior. Proper usage should be
            to pass False and create / close the session on demand inside
            the application. You should call this method first and then call
            SQLConnection.create_session(use_scoped)

            .. code-block:: python

                # scoped session example
                conn = SQLConnection()
                conn.connect(config, create_session=False)
                conn.create_session(use_scoped=True)
                # use session
                conn.query()
                conn.end_session()

        use_scoped : Boolean
            Whether to use scoped session or not. Use a scoped session if
            you are using it in a web service like an API.
            Read more about scoped sessions at:
            `<https://docs.sqlalchemy.org/en/13/orm/contextual.html?highlight=scoped>`_
        scope_func : function
            A function which serves as the scope for the session. The
            session will live only in the scope of that function.

        Raises
        ------
        KeyError
            If no engine is set yet and ``config`` neither contains every
            required key nor a ``"SQLALCHEMY_DATABASE_URL"`` entry.
        """
        self.config = config
        missing_keys = satisfy_keys(set(config))
        if not missing_keys:
            self.config["SQLALCHEMY_DATABASE_URL"] = settings_map(self.config)
        # An engine supplied earlier (e.g. through __init__) takes precedence.
        self.engine = self.engine or create_engine(
            self.config["SQLALCHEMY_DATABASE_URL"]
        )
        self.Base = base or Base
        # Work on a copy so the caller's dict is not mutated below.
        session_options = dict(session_options or {})
        session_options["query_cls"] = SQLQuery
        if self.Session is None:
            self.Session = sessionmaker(bind=self.engine, **session_options)
        if create_session:
            self.create_session(use_scoped, scope_func)

    def create_session(self, use_scoped, scope_func=None):
        """
        Creates a SQLAlchemy Session object to interact with the database.

        Parameters
        ----------
        use_scoped : Boolean
            Whether to use scoped session or not. Use a scoped session if
            you are using it in a web service like an API.
            Read more about scoped sessions at:
            `<https://docs.sqlalchemy.org/en/13/orm/contextual.html?highlight=scoped>`_
        scope_func : function
            A function which serves as the scope for the session. The
            session will live only in the scope of that function.
        """
        if not use_scoped:
            self._create_unscoped_session()
        else:
            self._create_scoped_session(scope_func)

    def _create_scoped_session(self, scope_func):
        """Create a scoped session and attach a ``query`` property to Base."""
        self.session = scoped_session(self.Session, scopefunc=scope_func)
        self.Base.query = self.session.query_property(query_cls=SQLQuery)
        self.use_scoped = True

    def _create_unscoped_session(self):
        """Create a plain session from the session factory."""
        self.session = self.Session()
        # Reset the flag: if a scoped session was used before, end_session()
        # would otherwise call remove() on a plain Session, which has no
        # such method.
        self.use_scoped = False

    def end_session(self):
        """
        Release the current session and clear ``self.session``.

        Scoped sessions are removed, plain sessions are closed. Does nothing
        when no session is active.
        """
        if self.session is None:
            return
        if self.use_scoped:
            self.session.remove()
        else:
            self.session.close()
        self.session = None

    def create_db(self):
        """Create every table registered on ``self.Base`` using the engine."""
        self.Base.metadata.create_all(bind=self.engine)

    def drop_db(self):
        """Drop every table registered on ``self.Base`` using the engine."""
        self.Base.metadata.drop_all(bind=self.engine)

    def query(self, *args):
        """
        Creates a BaseQuery object that allows you to query the database
        using the SQLAlchemy API, or using the BaseQuery methods like
        ``get_or_create``

        Parameters
        ----------
        args : tuple
            Args you can pass to SQLAlchemy Query class, for example a model.

        Examples
        --------
        .. code-block:: python

            # Using SQLAlchemy API
            db_conn.query(Probability).all()
            # Using db-plugins
            db_conn.query(Probability).find(filter_by=filters)
            db_conn.query().get_or_create(model=Object, filter_by=filters)
        """
        return self.session.query(*args)