# Source code for Products.ZMySQLDA.DA

##############################################################################
#
# Copyright (c) 2001 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
""" The ZODB-based MySQL Database Connection object
"""
from _thread import allocate_lock
from urllib.parse import quote

from AccessControl.class_init import InitializeClass
from AccessControl.Permissions import change_database_methods
from AccessControl.Permissions import use_database_methods
from AccessControl.Permissions import view_management_screens
from AccessControl.SecurityInfo import ClassSecurityInfo
from AccessControl.SecurityInfo import ModuleSecurityInfo
from App.special_dtml import HTMLFile
from Persistence import Persistent
from Shared.DC.ZRDB.Connection import Connection as ConnectionBase

from .db import DB
from .db import DBPool
from .permissions import add_zmysql_database_connections
from .utils import TableBrowser
from .utils import table_icons


# Connection pool for connections to MySQL.
# Maps one MySQL client connection to one DA object instance: the key is
# the value returned by ``Connection._pool_key()`` and the value is the
# object returned by calling the ``DBPool`` instance inside
# ``Connection.connect``.
# The lock is held by ``Connection.connect`` while a new pool entry is
# created and stored in the dictionary.
database_connection_pool_lock = allocate_lock()
database_connection_pool = {}

# Combining the connection pool with the DB pool gets you one
# DA/connection per connection, with one DBPool holding one DB per thread:
#   pool_id -> DA -> DBPool -> thread id -> DB
#   dc_pool[pool_id] == DBPool_instance
#   DBPool_instance[thread id] == DB instance
# NOTE(review): ``dc_pool`` presumably refers to
# ``database_connection_pool`` above -- confirm.


class Connection(ConnectionBase):
    """ Zope database adapter for MySQL/MariaDB

    The object itself only stores the connection settings. The database
    connection objects are kept in the module-level
    ``database_connection_pool`` under the key returned by ``_pool_key``
    and are attached to the instance as the volatile attribute
    ``_v_database_connection`` by ``connect``.
    """
    meta_type = 'Z MySQL Database Connection'
    security = ClassSecurityInfo()
    zmi_icon = 'fas fa-database'

    # Class-level defaults. ``__init__`` and ``manage_edit`` overwrite the
    # first four on the instance.
    auto_create_db = True
    use_unicode = False
    charset = None
    timeout = None
    # Set by ``connect`` to the pooled connection's ``connected_timestamp``.
    _v_connected = ''
    _isAnSQLConnection = 1
    info = None

    security.declareProtected(view_management_screens,  # NOQA: D001
                              'manage_browse')
    manage_browse = HTMLFile('www/browse', globals())

    security.declareProtected(change_database_methods,  # NOQA: D001
                              'manage_properties')
    manage_properties = HTMLFile('www/connectionEdit', globals())
    # The same form object is reachable as both ``manage_properties`` and
    # ``manage_main``.
    manage_properties._setName('manage_main')
    manage_main = manage_properties

    # Drop the first inherited management tab and append a "Browse" tab.
    manage_options = (ConnectionBase.manage_options[1:] + (
        {'label': 'Browse', 'action': 'manage_browse'},))

    def __init__(self, id, title, connection_string, check, use_unicode=None,
                 charset=None, auto_create_db=None, timeout=None):
        """ Instance setup. Optionally opens the connection.

        :string: id -- The id of the ZMySQLDA Connection

        :string: title -- The title of the ZMySQLDA Connection

        :string: connection_string -- The connection string describes how to
                                      connect to the relational database.
                                      See the documentation for details.

        :bool: check -- Check if the database connection can be opened after
                        instantiation.

        :bool: use_unicode -- If set to ``True``, values from columns of type
                              ``CHAR``, ``VARCHAR`` and ``TEXT`` are returned
                              as unicode strings by the database backend.
                              Combined with the hardcoded ``utf8`` character
                              set of this package the setting allows you to
                              control the character set of database return
                              values better. Default: False.

        :string: charset -- The character set for the connection. MySQL/MariaDB
                            will encode query results to this character set.
                            Only utf8 will work. Default: utf8
                            (the value is stored exactly as passed, so the
                            attribute is ``None`` when omitted).

        :bool: auto_create_db -- If the database given in ``connection_string``
                                 does not exist, create it automatically.
                                 Default: False.

        :int: timeout -- The connect timeout for the connection in seconds.
                         Default: None
        """
        self.use_unicode = bool(use_unicode)
        self.charset = charset
        self.auto_create_db = bool(auto_create_db)
        # Any falsy timeout (None, 0, '') means "no timeout".
        self.timeout = int(timeout) if timeout else None
        # ``__init__`` must not return a value, so the base class result is
        # not propagated.
        super().__init__(id, title, connection_string, check)

    def __setstate__(self, state):
        """ Skip super's __setstate__ as it connects which we don't want
        due to pool_key depending on acquisition.
        """
        Persistent.__setstate__(self, state)

    security.declareProtected(use_database_methods, 'factory')  # NOQA: D001

    def factory(self):
        """ Base API. Returns factory method for DB connections.
        """
        return DB

    def _pool_key(self):
        """ Return key used for DA pool.

        The key is the physical path of this object.
        """
        return self.getPhysicalPath()

    def _getConnection(self):
        """ Helper method to retrieve an existing or create a new connection

        Returns the volatile ``_v_database_connection`` attribute; if it is
        not set, ``connect`` is called with the stored connection string
        first.
        """
        try:
            return self._v_database_connection
        except AttributeError:
            self.connect(self.connection_string)
            return self._v_database_connection

    security.declareProtected(use_database_methods, 'connect')  # NOQA: D001

    def connect(self, conn_string):
        """ Base API. Opens connection to mysql. Raises if problems.

        A pooled connection is reused when one exists for this object's
        pool key and was created with the same connection string.
        Otherwise a stale pooled connection, if any, is closed and a new
        one is created and stored in the pool.

        :string: conn_string -- The database connection string

        Returns this ``Connection`` object, not the database connection.
        """
        pool_key = self._pool_key()
        conn = database_connection_pool.get(pool_key)
        reusable = conn is not None and conn.connection == conn_string

        if not reusable:
            if conn is not None:
                # Pooled connection was made with a different connection
                # string: close it before replacing it.
                conn.closeConnection()

            conn_pool = DBPool(self.factory(),
                               create_db=self.auto_create_db,
                               use_unicode=self.use_unicode,
                               charset=self.charset,
                               timeout=self.timeout)
            # The context manager releases the lock even if creating the
            # connection raises.
            with database_connection_pool_lock:
                conn = conn_pool(conn_string)
                database_connection_pool[pool_key] = conn

        self._v_database_connection = conn
        # If date is used as such, it can be wrong because an
        # existing connection may be reused. But this is supposedly
        # only used as a marker to know if connection was successful.
        self._v_connected = conn.connected_timestamp

        return self  # ??? why doesn't this return the connection ???

    security.declareProtected(use_database_methods,  # NOQA: D001
                              'sql_quote__')

    def sql_quote__(self, sql_str, escapes={}):
        """ Base API. Used to massage SQL strings for use in queries.

        :string: sql_str -- The raw SQL string to transform.

        :dict: escapes -- Additional escape transformations.
                          Default: empty ``dict``. This implementation
                          neither reads nor modifies the argument.
        """
        connection = self._getConnection()

        if self.use_unicode and isinstance(sql_str, str):
            # Confusing naming: unicode_literal does not return an unencoded
            # ("unicode") string, but an encoded bytes string.
            encoded = connection.unicode_literal(sql_str)

            # If the SQL string was unencoded to begin with we want to make
            # sure to return the same thing, so decode it here.
            charset = self.charset or 'latin1'
            # Any charset name starting with ``utf8`` is decoded with
            # Python's ``utf-8`` codec.
            if charset.startswith('utf8'):
                charset = 'utf-8'
            return encoded.decode(charset)

        return connection.string_literal(sql_str)

    security.declareProtected(change_database_methods,  # NOQA: D001
                              'manage_edit')

    def manage_edit(self, title, connection_string, check=None,
                    use_unicode=None, charset=None, auto_create_db=None,
                    timeout=None, REQUEST=None):
        """ Edit the connection attributes through the Zope ZMI.

        :string: title -- The title of the ZMySQLDA Connection

        :string: connection_string -- The connection string describes how to
                                      connect to the relational database.
                                      See the documentation for details.

        :bool: check -- Check if the database connection can be opened after
                        instantiation. Default: False.

        :bool: use_unicode -- Use unicode internally. Default: False.

        :string: charset -- The character set for the connection. MySQL/MariaDB
                            will encode query results to this character set.
                            Only utf8 will work. Default: utf8

        :bool: auto_create_db -- If the database given in ``connection_string``
                                 does not exist, create it automatically.
                                 Default: False.

        :int: timeout -- The connect timeout for the connection in seconds.
                         Default: None

        :request: REQUEST -- A Zope REQUEST object

        Without a ``REQUEST`` the result of the base class ``manage_edit``
        is returned and any exception it raises is propagated. With a
        ``REQUEST`` the browser is redirected to ``manage_properties`` and
        the outcome (success or error text) is passed along as
        ``manage_tabs_message``.
        """
        self.use_unicode = bool(use_unicode)
        self.charset = charset
        self.auto_create_db = bool(auto_create_db)
        self.timeout = int(timeout) if timeout else None

        try:
            result = super().manage_edit(title, connection_string,
                                         check=check)
            msg = 'Changes applied.'
        except Exception as exc:
            msg = 'ERROR: %s' % str(exc)
            # Programmatic callers get the exception; ZMI users get the
            # message shown on the properties page instead.
            if REQUEST is None:
                raise

        if REQUEST is None:
            return result

        # Percent-encode the message: it may contain spaces or, for error
        # texts, characters such as ``&``, ``#`` or ``%`` that would
        # otherwise truncate or corrupt the query string.
        url = '%s/manage_properties?manage_tabs_message=%s'
        REQUEST.RESPONSE.redirect(url % (self.absolute_url(), quote(msg)))

    security.declareProtected(view_management_screens,  # NOQA: D001
                              'tpValues')

    def tpValues(self):
        """ Support the DTML ``tree`` tag

        Used in the Zope ZMI ``Browse`` tab

        Returns a list with one ``TableBrowser`` per table reported by the
        database connection.
        """
        t_list = []
        connection = self._getConnection()

        for t_info in connection.tables(rdb=0):
            try:
                t_browser = TableBrowser()
                t_browser.__name__ = t_info['table_name']
                t_browser._d = t_info
                t_browser._c = connection
                t_browser.icon = table_icons.get(t_info['table_type'], 'text')
                t_list.append(t_browser)
            except Exception:
                # Deliberately best-effort: a table whose info cannot be
                # turned into a browser object is left out of the listing.
                continue

        return t_list
# Process the security declarations made inside the class body.
InitializeClass(Connection)


# Module-level security declarations for the add form and the factory
# function of this module.
mod_security = ModuleSecurityInfo('Products.ZMySQLDA.DA')

mod_security.declareProtected(  # NOQA
    add_zmysql_database_connections,
    'manage_addZMySQLConnectionForm',
)
manage_addZMySQLConnectionForm = HTMLFile('www/connectionAdd', globals())

mod_security.declareProtected(  # NOQA
    add_zmysql_database_connections,
    'manage_addZMySQLConnection',
)
def manage_addZMySQLConnection(self, id, title, connection_string, check=None,
                               use_unicode=None, auto_create_db=None,
                               charset=None, timeout=None, REQUEST=None):
    """Factory function to add a connection object from the Zope ZMI.

    Creates a ``Connection`` from the given settings and stores it in the
    container ``self`` under ``id``.

    :string: id -- The id of the ZMySQLDA Connection

    :string: title -- The title of the ZMySQLDA Connection

    :string: connection_string -- The connection string describes how to
                                  connect to the relational database.
                                  See the documentation for details.

    :bool: check -- Check if the database connection can be opened after
                    instantiation. Default: False.

    :bool: use_unicode -- If set to ``True``, values from columns of type
                          ``CHAR``, ``VARCHAR`` and ``TEXT`` are returned
                          as unicode strings by the database backend.
                          Combined with the hardcoded ``utf8`` character
                          set of this package the setting allows you to
                          control the character set of database return values
                          better. Default: False.

    :string: charset -- The character set for the connection. MySQL/MariaDB
                        will encode query results to this character set.
                        Only utf8 will work. Default: utf8

    :bool: auto_create_db -- If the database given in ``connection_string``
                             does not exist, create it automatically.
                             Default: False.

    :int: timeout -- The connect timeout for the connection in seconds.
                     Default: None

    :object: REQUEST -- The currently active Zope request object.
                        Default: None.

    Returns the rendered ``manage_main`` of the container when a
    ``REQUEST`` is given, otherwise ``None``.
    """
    new_connection = Connection(id,
                                title,
                                connection_string,
                                check,
                                use_unicode=use_unicode,
                                charset=charset,
                                auto_create_db=auto_create_db,
                                timeout=timeout)
    self._setObject(id, new_connection)

    # Called from code rather than the ZMI: nothing to render.
    if REQUEST is None:
        return None

    return self.manage_main(self, REQUEST)
# Hand this module's namespace to the ``ModuleSecurityInfo`` object.
# NOTE(review): presumably this is what activates the ``declareProtected``
# declarations made on ``mod_security`` -- confirm against AccessControl.
mod_security.apply(globals())