"""
.. currentmodule:: basin3d.synthesis
:synopsis: BASIN-3D Synthesis API
:module author: Val Hendrix <vhendrix@lbl.gov>, Danielle Svehla Christianson <dschristianson@lbl.gov>. Catherine Wong <catwong@lbl.gov>
Functions
----------------
* :func:`register` - Register the specified plugins or implicitly register loaded plugins
Classes
--------
* :class:`DataSynthesizer` - Synthesis API
synthesis.DataSynthesizer Functions
-----------------------------------
* :func:`DataSynthesizer.attribute_mappings`- Search for attribute_mappings which describe how the datasource vocabularies are mapped to BASIN-3D vocabularies, including observed properties, statistics, result_quality, etc.
* :func:`DataSynthesizer.measurement_timeseries_tvp_observations`- Search for Measurement Timeseries TVP Observations from specified Monitoring features and observed property variables
* :func:`DataSynthesizer.monitoring_features`- Search for all monitoring features, features by parent monitoring features, features by monitoring feature identifiers, or look for a single monitoring feature by id.
* :func:`DataSynthesizer.observed_properties`- Search for observed properties
----------------------------------
"""
from importlib import import_module
from typing import List, Union, cast
from basin3d.core.catalog import CatalogSqlAlchemy
from basin3d.core.models import DataSource
from basin3d.core.plugin import PluginMount
from basin3d.core.schema.query import QueryMeasurementTimeseriesTVP, QueryMonitoringFeature, SynthesisResponse
from basin3d.core.synthesis import DataSourceModelIterator, MeasurementTimeseriesTVPObservationAccess, \
MonitoringFeatureAccess, logger
[docs]
class SynthesisException(Exception):
"""Special Exception for Synthesis module"""
pass
[docs]
def register(plugins: List[str] = None):
"""
Register the specified plugins or implicitly register loaded plugins
>>> from basin3d import synthesis
>>> synthesizer = synthesis.register(['basin3d.plugins.usgs.USGSDataSourcePlugin'])
>>> synthesizer.datasources
[DataSource(id='USGS', name='USGS', id_prefix='USGS', location='https://waterservices.usgs.gov/nwis/', credentials={})]
:param plugins: [Optional] plugins to registered
:return: DataSynthesizer(plugin_dict, catalog)
"""
if not plugins:
# Implicit registration of loaded plugins
plugins = list(PluginMount.plugins.values())
if not plugins:
raise SynthesisException("There are no plugins to register")
plugin_dict = {}
catalog = CatalogSqlAlchemy()
for plugin in plugins:
if isinstance(plugin, str):
# If this is a string convert to module and class then load
class_name_list = plugin.split(".")
module_name = plugin.replace(".{}".format(class_name_list[-1]), "")
module = import_module(module_name)
plugin_class = getattr(module, class_name_list[-1])
else:
# This is already a class
plugin_class = plugin
# Instantiate the plugin with the new catalog
plugin = plugin_class(catalog)
plugin_dict[plugin_class.get_meta().id_prefix] = plugin
logger.info("Loading Plugin = {}".format(plugin_class.__name__))
# Instantiate a synthesizer.
return DataSynthesizer(plugin_dict, catalog)
[docs]
class DataSynthesizer:
"""
Synthesis API
"""
def __init__(self, plugins: dict, catalog: CatalogSqlAlchemy):
self._plugins = plugins
self._catalog = catalog
self._datasources = {}
for p in self._plugins.values():
datasource = p.get_datasource()
self._datasources[datasource.id] = datasource
self._catalog.initialize(list(self._plugins.values()))
self._monitoring_feature_access = MonitoringFeatureAccess(plugins, self._catalog)
self._measurement_timeseries_tvp_observation_access = \
MeasurementTimeseriesTVPObservationAccess(plugins, self._catalog)
@property
def datasources(self) -> List[DataSource]:
"""
The Datasources loaded in this synthesizer
:return:
"""
return list(self._datasources.values())
[docs]
def observed_properties(self):
"""
>>> from basin3d.plugins import usgs
>>> from basin3d import synthesis
>>> synthesizer = synthesis.register()
>>> response = synthesizer.observed_properties()
>>> for opv in response:
... print(f'{opv.basin3d_vocab} -- {opv.full_name} -- {opv.units}')
ACT -- Acetate (CH3COO) -- mM
Br -- Bromide (Br) -- mM
Cl -- Chloride (Cl) -- mM
DIN -- Dissolved Inorganic Nitrogen (Nitrate + Nitrite) -- mg/L
DTN -- Dissolved Total Nitrogen (DTN) -- mg/L
F -- Fluoride (F) -- mM
...
BASIN-3D observed properties. An observed property defines what is being measured.
Data source observed property vocabularies are mapped and thus synthesized to the BASIN-3D observed property vocabulary.
:return: an iterator of :class:`basin3d.core.models.ObservedProperty` objects
"""
return self._catalog.find_observed_properties()
[docs]
def attribute_mappings(self, datasource_id=None, attr_type=None, attr_vocab=None, from_basin3d=False):
"""
>>> from basin3d.plugins import usgs
>>> from basin3d import synthesis
>>> synthesizer = synthesis.register()
>>> response = synthesizer.attribute_mappings() # list all attribute mappings registered
>>> for attr_mapping in response:
... print(f'{attr_mapping.attr_type} | {attr_mapping.basin3d_vocab} -- {attr_mapping.datasource_vocab}')
OBSERVED_PROPERTY:SAMPLING_MEDIUM | PH:WATER -- 00400
OBSERVED_PROPERTY:SAMPLING_MEDIUM | RDC:WATER -- 00060
OBSERVED_PROPERTY:SAMPLING_MEDIUM | WLE:WATER -- 63161
OBSERVED_PROPERTY:SAMPLING_MEDIUM | WT:WATER -- 00010
OBSERVED_PROPERTY:SAMPLING_MEDIUM | DO:WATER -- 00300
...
>>> response = synthesizer.attribute_mappings(datasource_id='USGS', attr_type='STATISTIC')
>>> for attr_mapping in response:
... print(f'{attr_mapping.attr_type} | {attr_mapping.basin3d_vocab} -- {attr_mapping.datasource_vocab}')
STATISTIC | MAX -- 00001
STATISTIC | MIN -- 00002
STATISTIC | MEAN -- 00003
STATISTIC | TOTAL -- 00006
>>> response = synthesizer.attribute_mappings(datasource_id='USGS', attr_type='RESULT_QUALITY', attr_vocab=['VALIDATED', 'ESTIMATED'], from_basin3d=True)
>>> for attr_mapping in response:
... print(f'{attr_mapping.attr_type} | {attr_mapping.basin3d_vocab} -- {attr_mapping.datasource_vocab}, {attr_mapping.datasource_desc}')
RESULT_QUALITY | VALIDATED -- A, Approved for publication -- Processing and review completed.
RESULT_QUALITY | ESTIMATED -- E, Value was computed from estimated unit values.
RESULT_QUALITY | ESTIMATED -- e, Value has been edited or estimated by USGS personnel and is write protected
Return all the :class:`basin3d.core.models.AttributMapping` registered or those that match the specified fields.
:param datasource_id: str, The datasource identifier
:param attr_type: str, The attribute type (e.g., OBSERVED_PROPERTY, STATISTIC, etc)
:param attr_vocab: str, The attribute vocabulary, either the BASIN-3D vocabulary or the datasource vocabulary
:param from_basin3d: bool, True = the specified attr_vocab is a BASIN-3D vocabulary, False: the specified attr_vocab is from the datasource
:return: iterator of :class:`basin3d.core.models.AttributeMapping` objects
"""
return self._catalog.find_attribute_mappings(datasource_id=datasource_id, attr_type=attr_type, attr_vocab=attr_vocab, from_basin3d=from_basin3d)
[docs]
def monitoring_features(self, query: QueryMonitoringFeature = None, **kwargs) -> Union[DataSourceModelIterator, SynthesisResponse]:
"""
Search for all Monitoring Features, Monitoring Features by parent monitoring features, or Monitoring Feature by id(s).
To see feature types for a given plugin: **<plugin_module>.<plugin_class>.feature_types**
**Search for a single monitoring feature by id:**
>>> from basin3d.plugins import usgs, epa
>>> from basin3d import synthesis
>>> synthesizer = synthesis.register()
>>> response = synthesizer.monitoring_features(id='USGS-0101')
>>> mf = response.data
>>> print(f"{mf.id} - {mf.description}")
USGS-0101 - SUBREGION: St. John
**Search for all USGS monitoring features:**
>>> for mf in synthesizer.monitoring_features(datasource='USGS', feature_type='region'): # doctest: +ELLIPSIS
... print(f"{mf.id} - {mf.description}")
USGS-01 - REGION: New England
USGS-02 - REGION: Mid Atlantic
USGS-03 - REGION: South Atlantic-Gulf
...
**Search for USGS points by parent (subbasin) monitoring features:**
>>> for mf in synthesizer.monitoring_features(feature_type='point',parent_feature=['USGS-17040101']): # doctest: +ELLIPSIS
... print(f"{mf.id} {mf.coordinates and [(p.x, p.y) for p in mf.coordinates.absolute.horizontal_position]}")
USGS-13010000 [(-110.6647222, 44.1336111)]
USGS-13010065 [(-110.6675, 44.09888889)]
USGS-13010450 [(-110.5874305, 43.9038296)]
...
**Search for USGS points by monitoring features identifiers:**
>>> for mf in synthesizer.monitoring_features(feature_type='point', monitoring_feature=['USGS-13010000', 'USGS-13010450']): # doctest: +ELLIPSIS
... print(f"{mf.id} {mf.coordinates and [(p.x, p.y) for p in mf.coordinates.absolute.horizontal_position]}")
USGS-13010000 [(-110.6647222, 44.1336111)]
USGS-13010450 [(-110.5874305, 43.9038296)]
**Search for USGS points by monitoring feature bounding boxes:**
>>> for mf in synthesizer.monitoring_features(feature_type='point', monitoring_feature=[(-90.6, 34.4, -90.5, 34.6), (-106.7, 38.9, -106.5, 39.0)]): # doctest: +ELLIPSIS
... print(f"{mf.id} {mf.coordinates and [(p.x, p.y) for p in mf.coordinates.absolute.horizontal_position]}")
USGS-07047970 [(-90.58399367, 34.52400003)]
USGS-07287700 [(-90.5302222, 34.48425)]
USGS-09106800 [(-106.6009444, 38.92469444)]
Note: Monitoring feature identifiers and bounding boxes can be combined in the monitoring_feature argument.
**Unsupported feature types warning:**
The code below is an example of what you will see if a registered plugin does not support
the requested feature type.
>>> response_itr = synthesizer.monitoring_features(feature_type='horizontal_path')
>>> for mf in response_itr:
... print(mf)
...
**Output warning messages from the returned iterator**
This is an example of checking the synthesis response messages in the :class:`basin3d.core.synthesis.DataSourceModelIterator`.
>>> response_itr.synthesis_response.messages
[SynthesisMessage(msg='Feature type HORIZONTAL_PATH not supported by USGS.', level='WARN', where=['USGS', 'MonitoringFeature']), SynthesisMessage(msg='Feature type HORIZONTAL_PATH not supported by EPA Water Quality eXchange.', level='WARN', where=['EPA', 'MonitoringFeature'])]
:param query: (optional) A Monitoring Feature Query :class:`basin3d.core.schema.query.QueryMonitoringFeature` object
:param kwargs: (optional) Monitoring Feature Query parameters. See Query info below.
:return: a single :class:`~basin3d.core.schema.query.SynthesisResponse` for a query by id
or a :class:`~basin3d.core.synthesis.DataSourceModelIterator` for multple :class:`basin3d.core.models.MonitoringFeature` objects.
.. note::
**Monitoring Feature Query parameters**
:class:`basin3d.core.schema.query.QueryMonitoringFeature`
| All parameters are optional.
|
| * **datasource** A single data source id prefix.
| * **feature_type** The :class:`basin3d.core.schema.enum.FeatureTypeEnum` of the desired Monitoring Feature(s). Data Sources may not support all Feature Types.
|
| Only one of the following can be specified in a query:
|
| * **id** A single Monitoring Feature ID for the Monitoring Feature desired. Returns a single Synthesis Response object.
| * **monitoring_feature** List of Monitoring Feature IDs for the Monitoring Features desired and / or sets of bounding box coordinates (WGS84: western longitude, southern latitude, eastern longitude, northern latitude). Returns iterator of MonitoringFeature objects.
| * **parent_feature** List of Monitoring Feature IDs for the parent features of the desired Monitoring Features. Returns iterator of MonitoringFeature objects.
.. note::
**Monitoring Feature attributes**
:class:`basin3d.core.models.MonitoringFeature`
| Attributes values are dependent on data source features.
|
| * **id** Unique feature identifier, prefixed by data source id
| * **name** Feature name
| * **description** Description of the Monitoring Feature
| * **feature_type** :class:`basin3d.core.schema.enum.FeatureTypeEnum` REGION, SUBREGION, BASIN, SUBBASIN, WATERSHED, SUBWATERSHED, SITE, PLOT, HORIZONTAL PATH, VERTICAL PATH, POINT
| * **observed_properties** List of observed properties :class:`basin3d.core.models.ObservedProperty` collected at the feature.
| * **related_sampling_feature_complex** List of :class:`basin3d.core.models.RelatedSamplingFeature`. PARENT features are currently supported.
| * **shape** Shape of the feature: POINT, CURVE, SURFACE, SOLID
| * **coordinates** Location of feature in absolute and/or representative datum: :class:`basin3d.core.models.Coordinate`
| * **description_reference** Additional information about the feature
| * **related_party** List of people or organizations responsible for the feature
| * **utc_offset** Coordinate Universal Time offset in hours (offset in hours), e.g., +9
| * **datasource** The feature's data source :class:`basin3d.core.models.DataSource`
"""
if not query:
query = QueryMonitoringFeature(**kwargs)
# Search for single or list?
if query.id:
# mypy casts are only used as hints for the type checker,
# and they don’t perform a runtime type check.
return cast(SynthesisResponse, self._monitoring_feature_access.retrieve(query=query))
else:
# mypy casts are only used as hints for the type checker,
# and they don’t perform a runtime type check.
return cast(DataSourceModelIterator,
self._monitoring_feature_access.list(query=query))
[docs]
def measurement_timeseries_tvp_observations(self, query: QueryMeasurementTimeseriesTVP = None, **kwargs) -> DataSourceModelIterator:
"""
Search for Measurement Timeseries TVP Observations for the specified query arguments.
Aggregation Duration for DAY (default) and NONE are both supported.
**Search with aggregation duration DAY:**
>>> from basin3d.plugins import usgs
>>> from basin3d import synthesis
>>> synthesizer = synthesis.register()
>>> timeseries = synthesizer.measurement_timeseries_tvp_observations(monitoring_feature=['USGS-09110990'],observed_property=['RDC','WT'],start_date='2019-10-01',end_date='2019-10-30',aggregation_duration='DAY')
>>> for timeseries in timeseries:
... print(f"{timeseries.feature_of_interest.id} - {timeseries.observed_property.get_basin3d_vocab()}")
USGS-09110990 - RDC
**Search with aggregation duration NONE, using both a monitoring feature identifier and bounding box:**
>>> from basin3d.plugins import usgs
>>> from basin3d import synthesis
>>> synthesizer = synthesis.register()
>>> timeseries = synthesizer.measurement_timeseries_tvp_observations(monitoring_feature=["USGS-09110990", (-106.7, 38.9, -106.5, 39.0)],observed_property=['RDC','WT'],start_date='2024-04-01',end_date='2024-04-30',aggregation_duration='NONE')
>>> for timeseries in timeseries:
... print(f"{timeseries.feature_of_interest.id} - {timeseries.observed_property.get_basin3d_vocab()}")
USGS-09110990 - RDC
USGS-09106800 - RDC
:param query: (optional) :class:`basin3d.core.schema.query.QueryMeasurementTimeseriesTVP` object
:param kwargs: (required) Measurement Timeseries TVP Query parameters. See Query info below.
:return: a :class:`~basin3d.core.synthesis.DataSourceModelIterator` that yields :class:`basin3d.core.models.MeasurementTimeseriesTVPObservation` objects
.. note::
**Measurement Timeseries TVP Query parameters**
:class:`basin3d.core.schema.query.QueryMeasurementTimeseriesTVP`
| Required arguments:
|
| * **monitoring_feature** List of monitoring features id(s) and / or sets of bounding box coordinates (WGS84: western longitude, southern latitude, eastern longitude, northern latitude)
| * **observed_property** List of observed property(ies), i.e., BASIN-3D observed property vocabulary. See :func:`basin3d.synthesis.DataSynthesizer.observed_properties`
| * **start_date** Start date YYYY-MM-DD
|
| Optional arguments:
|
| * **end_date** End date YYYY-MM-DD
| * **aggregation_duration** A single aggregation duration :class:`basin3d.core.schema.enum.AggregationDurationEnum` (YEAR|MONTH|DAY|HOUR|MINUTE|SECOND|NONE)
| * **statistic** List of statistic(s) :class:`basin3d.core.schema.enum.StatisticEnum` (MEAN|MIN|MAX|INSTANTANEOUS)
| * **result_quality** List of result quality(ies) :class:`basin3d.core.schema.enum.ResultQualityEnum` (VALIDATED|UNVALIDATED|SUSPECTED|REJECTED|ESTIMATED)
| * **sampling_medium** List of sampling medium(s) :class:`basin3d.core.schema.enum.SamplingMediumEnum` (SOLID_PHASE|WATER|GAS|OTHER)
| * **datasource** A single data source id prefix
.. note::
**Measurement Timeseries TVP Observation attributes**
:class:`basin3d.core.models.MeasurementTimeseriesTVPObservation`
| Attributes values are dependent on data source features.
|
| :class:`basin3d.core.models.MappedAttribute` are returned for several attributes so that the data source values are also available.
|
| * **id** Observation identifier
| * **type** Type of observation: MEASUREMENT_TVP_TIMESERIES
| * **observed_property** The observation's observed property :class:`basin3d.core.models.MappedAttribute`
| * **datasource** The data source :class:`basin3d.core.models.DataSource`
| * **sampling_medium** Observed property sampling medium :class:`basin3d.core.models.MappedAttribute` (SOLID_PHASE, WATER, GAS, OTHER)
| * **phenomenon_time** Datetime of the observation, for a timeseries the start and end times can be provided
| * **utc_offset** Coordinate Universal Time offset in hours (offset in hours), e.g., +9
| * **feature_of_interest** Monitoring Feature object :class:`basin3d.core.models.MonitoringFeature`, feature on which the observation is being made
| * **feature_of_interest_type** Feature type of the feature of interest, :class:`basin3d.core.schema.enum.FeatureTypeEnum`
| * **result** Observed values of the observed property being assessed, and (opt) their result quality, :class:`basin3d.core.models.ResultListTVP`
| * **time_reference_position** Position of timestamp in aggregated_duration (START, MIDDLE, END)
| * **aggregation_duration** Time period represented by observation :class:`basin3d.core.models.MappedAttribute` (YEAR, MONTH, DAY, HOUR, MINUTE, SECOND)
| * **unit_of_measurement** Units in which the observation is reported
| * **statistic** Statistical property of the observation result :class:`basin3d.core.models.MappedAttribute` (MEAN, MIN, MAX, TOTAL)
| * **result_quality** List quality assessment found in the results :class:`basin3d.core.models.MappedAttribute` (VALIDATED, UNVALIDATED, SUSPECTED, REJECTED, ESTIMATED)
"""
if not query:
# Raises validation errors
query = QueryMeasurementTimeseriesTVP(**kwargs)
# mypy casts are only used as hints for the type checker,
# and they don’t perform a runtime type check.
return cast(DataSourceModelIterator,
self._measurement_timeseries_tvp_observation_access.list(
query))