Source code for django_basin3d.catalog

"""
`django_basin3d.catalog`
************************

.. currentmodule:: django_basin3d.catalog

:synopsis: The Django BASIN-3D Catalog
:module author: Val Hendrix <vhendrix@lbl.gov>
:module author: Danielle Svehla Christianson <dschristianson@lbl.gov>

.. contents:: Contents
    :local:
    :backlinks: top

"""
import importlib
import json
import logging
from typing import Iterator, List, Optional, Union

from django.conf import settings
from django.db import IntegrityError, OperationalError
from django.db.models import Q

from basin3d.core.catalog import CatalogBase, CatalogException
from basin3d.core.models import ObservedProperty, AttributeMapping
# from basin3d.core.plugin import PluginMount
from basin3d.core.schema.enum import BaseEnum, MappedAttributeEnum, MAPPING_DELIMITER, NO_MAPPING_TEXT, set_mapped_attribute_enum_type

logger = logging.getLogger(__name__)


[docs]class CatalogDjango(CatalogBase): def __init__(self, variable_filename: str = 'basin3d_observed_property_vocabulary.csv'): super().__init__(variable_filename)
[docs] def is_initialized(self) -> bool: """Has the catalog been initialized?""" try: from django_basin3d.models import DataSource datasources = DataSource.objects.count() if isinstance(datasources, int): return datasources > 0 logger.debug('Catalog not initialized') return False except ImportError: return False
def _convert_django_observed_property(self, django_opv) -> Optional[ObservedProperty]: """ Convert django observed property variable to basin3d :param django_opv: :return: """ if django_opv: return ObservedProperty( basin3d_vocab=django_opv.basin3d_vocab, full_name=django_opv.full_name, categories=django_opv.categories.split(","), units=django_opv.units ) return None def _convert_django_attribute_mapping(self, django_am) -> Optional[AttributeMapping]: """ Convert django attribute_mapping :param django_am: :return: """ if django_am: attr_type_list = django_am.attr_type.split(MAPPING_DELIMITER) if isinstance(django_am.basin3d_desc, list): basin3d_desc_list = django_am.basin3d_desc else: # It should always be a list try: basin3d_desc_list = json.loads(django_am.basin3d_desc) except Exception as e: raise e basin3d_desc = [] for attr_type, desc in zip(attr_type_list, basin3d_desc_list): if attr_type == MappedAttributeEnum.OBSERVED_PROPERTY.value: op = ObservedProperty( basin3d_vocab=desc.get('basin3d_vocab'), full_name=desc.get('full_name'), categories=desc.get('categories'), units=desc.get('units') ) basin3d_desc.append(op) elif attr_type in MappedAttributeEnum.values(): attr_enum_class = set_mapped_attribute_enum_type(attr_type) attr_type_enum = getattr(attr_enum_class, desc) basin3d_desc.append(attr_type_enum) else: basin3d_desc.append(desc) return AttributeMapping( attr_type=django_am.attr_type, basin3d_vocab=django_am.basin3d_vocab, basin3d_desc=basin3d_desc, datasource_vocab=django_am.datasource_vocab, datasource_desc=django_am.datasource_desc, datasource=django_am.datasource ) return None def _convert_basin3d_attr_mapping_basin3d_desc(self, basin3d_desc: list) -> list: json_ready_basin3d_desc = [] for desc in basin3d_desc: if isinstance(desc, ObservedProperty): json_ready_basin3d_desc.append(desc.to_dict()) elif isinstance(desc, BaseEnum): json_ready_basin3d_desc.append(desc.value) else: json_ready_basin3d_desc.append(desc) return json_ready_basin3d_desc def _get_observed_property(self, basin3d_vocab) -> Optional[ObservedProperty]: """ Access a single observed property variable :param basin3d_vocab: the observed property name :return: """ from django_basin3d import models as django_models try: opv = django_models.ObservedProperty.objects.get(basin3d_vocab=basin3d_vocab) return self._convert_django_observed_property(opv) except django_models.ObservedProperty.DoesNotExist: return None except Exception as e: if not e.__class__.__name__ == 'DoesNotExist': raise e return None def _get_attribute_mapping(self, datasource_id, attr_type, basin3d_vocab, datasource_vocab, **kwargs) -> Optional[AttributeMapping]: """ :param datasource_id: :param attr_type: :param basin3d_vocab: :param datasource_vocab: :param kwargs: :return: """ if not self.is_initialized(): raise CatalogException("Datasource catalog has not been initialized") from django_basin3d import models as django_models try: opv = django_models.AttributeMapping.objects.get( datasource__name=datasource_id, attr_type=attr_type, basin3d_vocab=basin3d_vocab, datasource_vocab=datasource_vocab) return self._convert_django_attribute_mapping(opv) except django_models.ObservedProperty.DoesNotExist: return None except Exception as e: if not e.__class__.__name__ == 'DoesNotExist': raise e return None
[docs] def find_observed_property(self, basin3d_vocab) -> Optional[ObservedProperty]: """ Return the :class:`basin3d.models.ObservedProperty` object for the BASIN-3D vocabulary specified. :param basin3d_vocab: BASIN-3D vocabulary :return: a :class:`basin3d.models.ObservedProperty` object """ if not self.is_initialized(): raise CatalogException("Datasource catalog has not been initialized") return self._get_observed_property(basin3d_vocab)
[docs] def find_observed_properties(self, basin3d_vocab: Optional[List[str]] = None) -> Iterator[Optional[ObservedProperty]]: """ Report the observed_properties available based on the BASIN-3D vocabularies specified. If no BASIN-3D vocabularies are specified, then return all observed properties available. :param basin3d_vocab: list of the BASIN-3D observed properties :return: generator that yields :class:`basin3d.models.ObservedProperty` objects """ if not self.is_initialized(): raise CatalogException("Datasource catalog has not been initialized") from django_basin3d import models as django_models if not basin3d_vocab: for opv in django_models.ObservedProperty.objects.all(): yield self._convert_django_observed_property(opv) else: for b3d_vocab in basin3d_vocab: opv = self._get_observed_property(b3d_vocab) if opv is not None: yield opv
[docs] def find_datasource_attribute_mapping(self, datasource_id: str, attr_type: str, datasource_vocab: str) -> Optional[AttributeMapping]: if not self.is_initialized(): raise CatalogException("Datasource catalog has not been initialized") # Consider checking args for a value from django_basin3d import models as django_models # Setup the search parameters query_params = { 'datasource__name': datasource_id, 'attr_type__contains': attr_type, 'datasource_vocab': datasource_vocab } msg = (f'No mapping was found for attr: "{attr_type}" and for datasource vocab: "{datasource_vocab}" ' f'in datasource: "{datasource_id}".') try: ds = django_models.DataSource.objects.get(name=datasource_id) except django_models.DataSource.DoesNotExist: msg = f'No Data Source "{datasource_id}" found.' ds = django_models.DataSource(name=None, location=None, id_prefix=None, plugin_module=None, plugin_class=None) except Exception as e: if e.__class__.__name__ not in ['DoesNotExist']: raise e # set up empty AttributeMapping in case where mapping is not found or another error occurs attr_mapping = AttributeMapping(attr_type=attr_type, basin3d_vocab=NO_MAPPING_TEXT, basin3d_desc=[], datasource_vocab=datasource_vocab, datasource_desc=msg, datasource=ds) try: attr_mapping = django_models.AttributeMapping.objects.get(**query_params) except django_models.AttributeMapping.DoesNotExist: return attr_mapping except django_models.AttributeMapping.MultipleObjectsReturned: msg = (f'Multiple mappings found for attr: "{attr_type}" and datasource vocab: "{datasource_vocab}" ' f'in datasource: "{datasource_id}". This should never happen.') attr_mapping.datasource_desc = msg return attr_mapping except Exception as e: if e.__class__.__name__ not in ['DoesNotExist', 'MultipleObjectsReturned']: raise e return self._convert_django_attribute_mapping(attr_mapping)
[docs] def find_attribute_mappings(self, datasource_id: str = None, attr_type: str = None, attr_vocab: Union[str, List] = None, from_basin3d: bool = False) -> Iterator[AttributeMapping]: if not self.is_initialized(): raise CatalogException("Datasource catalog has not been initialized") def construct_attr_vocab_query(attr_vocab_list, is_from_basin3d): query = Q(_connector=Q.OR) for a_vocab in attr_vocab_list: if not is_from_basin3d: query.add(('datasource_vocab__exact', a_vocab), conn_type=Q.OR) elif MAPPING_DELIMITER in a_vocab: query.add(('basin3d_vocab__regex', a_vocab), conn_type=Q.OR) else: query.add(('basin3d_vocab__exact', a_vocab), conn_type=Q.OR) query.add(('basin3d_vocab__regex', f'.*:{a_vocab}'), conn_type=Q.OR) query.add(('basin3d_vocab__regex', f'{a_vocab}:.*'), conn_type=Q.OR) query.add(('basin3d_vocab__regex', f'.*:{a_vocab}:.*'), conn_type=Q.OR) return query from django_basin3d import models as django_models query_params = [] if datasource_id is not None: try: django_models.DataSource.objects.get(name=datasource_id) except django_models.DataSource.DoesNotExist: logger.warning(f'No datasource for datasource_id {datasource_id} was found. Check plugin initialization') yield None except Exception as e: if e.__class__.__name__ not in ['DoesNotExist']: raise CatalogException(e) query_params.append(Q(datasource__name=datasource_id)) if attr_type is not None: if attr_type not in MappedAttributeEnum.values(): logger.warning(f'Attribute type {attr_type} is invalid') yield None query_params.append(Q(attr_type__contains=attr_type)) if attr_vocab: if isinstance(attr_vocab, str): attr_vocab = [attr_vocab] elif not isinstance(attr_vocab, List): raise CatalogException("attr_vocab must be a str or list") attr_vocab_query = construct_attr_vocab_query(attr_vocab, from_basin3d) query_params.append(attr_vocab_query) try: attr_mappings = django_models.AttributeMapping.objects.filter(*query_params) except django_models.AttributeMapping.DoesNotExist: vocab_source_type = 'datasource' if from_basin3d: vocab_source_type = 'BASIN-3D' logger.info(f'No mapping was found for attr: "{attr_type}" and for {vocab_source_type} vocab: "{attr_vocab}" ' f'in datasource: "{datasource_id}".') pass except Exception as e: if e.__class__.__name__ not in ['DoesNotExist']: raise e for attr_mapping in attr_mappings: yield self._convert_django_attribute_mapping(attr_mapping)
def _init_catalog(self, **kwargs): """ Initialize the catalog database :return: """ if not self.is_initialized(): from django_basin3d import models as django_models # Now create the Datasource objects in the data base from basin3d.core.plugin import PluginMount for name, plugin in PluginMount.plugins.items(): module_name = plugin.__module__ class_name = plugin.__name__ logger.info("Loading Plugin = {}.{}".format(module_name, class_name)) try: datasource = django_models.DataSource.objects.get(name=plugin.get_meta().id) except django_models.DataSource.DoesNotExist: logger.info("Registering NEW Data Source Plugin '{}.{}'".format(module_name, class_name)) datasource = django_models.DataSource() if hasattr(plugin.get_meta(), "connection_class"): datasource.credentials = plugin.get_meta().connection_class.get_credentials_format() # Update the datasource datasource.name = plugin.get_meta().id datasource.location = plugin.get_meta().location datasource.id_prefix = plugin.get_meta().id_prefix datasource.plugin_module = module_name datasource.plugin_class = class_name datasource.save() logger.info("Updated Data Source '{}'".format(plugin.get_meta().id)) def _insert(self, record): """ :param record: """ from django_basin3d import models as django_models if self.is_initialized(): if isinstance(record, ObservedProperty): try: p = django_models.ObservedProperty() p.basin3d_vocab = record.basin3d_vocab p.full_name = record.full_name p.categories = ",".join(record.categories) # type: ignore p.units = record.units p.save() logger.info(f'inserted {record.basin3d_vocab}') except IntegrityError as ie: # This object has already been loaded logger.debug(f'Integrity error for OP: {ie}') pass except Exception as e: logger.warning("Error Registering ObservedProperty '{}': {}".format(record.basin3d_vocab, str(e))) elif isinstance(record, AttributeMapping): try: ds_name = django_models.DataSource.objects.get(name=record.datasource.id) record_basin3d_desc = self._convert_basin3d_attr_mapping_basin3d_desc(record.basin3d_desc) p = django_models.AttributeMapping() p.datasource = ds_name p.attr_type = record.attr_type p.basin3d_vocab = record.basin3d_vocab p.basin3d_desc = record_basin3d_desc p.datasource_vocab = record.datasource_vocab p.datasource_desc = record.datasource_desc p.save() logger.info(f'inserted {record.datasource_vocab} mapping attribute') except IntegrityError: # This object has already been loaded logger.info(f'Warning: skipping AttributeMapping "{record.basin3d_vocab}". Already loaded.') pass except Exception as e: logger.info(f'Error Registering AttributeMapping "{record.basin3d_vocab}": {str(e)}') else: raise CatalogException('Could not insert record. Catalog not initialize')
[docs]def load_data_sources(sender, **kwargs): """ Load the Broker data sources from the registered plugins. :param sender: :param kwargs: :return: """ # Load all the plugins found in apps from basin3d.core.plugin import PluginMount for django_app in settings.INSTALLED_APPS: try: importlib.import_module(f'{django_app}.plugins') logger.info(f'Loaded {django_app} plugins') plugins = kwargs.get('plugins') if not plugins: plugins = PluginMount.plugins.values() plugin_count = len(plugins) logger.info(f'Attempting to load {plugin_count} plugins for {django_app}.') catalog = CatalogDjango() catalog.initialize([v(catalog) for v in plugins]) except ImportError as e: logger.warning(f'Warning: Potential error during attempt to import plugins for installed app: {e}. ' f'Please double check.') pass except OperationalError as oe: logger.error(f'Operational Error "{oe}" - Most likely happens on a reverse migration.')
[docs]def reload_data_sources(sender, **kwargs): """ :param sender: :param kwargs: :return: """ from django_basin3d import models as django_models try: django_models.AttributeMapping.objects.all().delete() logger.info('Attribute Mappings entries deleted.') django_models.ObservedProperty.objects.all().delete() logger.info('Observed Properties entries deleted.') django_models.DataSource.objects.all().delete() logger.info('Data Source entries deleted.') load_data_sources(sender, **kwargs) attribute_mapping_count = django_models.AttributeMapping.objects.count() observed_property_count = django_models.ObservedProperty.objects.count() datasource_count = django_models.DataSource.objects.count() logger.info(f'Data sources reloaded: data sources = {datasource_count}, ' f'observed properties = {observed_property_count}, ' f'attribute mappings = {attribute_mapping_count}') except CatalogException as e: logger.error(f'Error reloading data sources: {e}')