"""
.. currentmodule:: basin3d.plugins.usgs
:platform: Unix, Mac
:synopsis: USGS Daily Values and Instantaneous Values Plugin Definition and supporting views.
:module author: Val Hendrix <vhendrix@lbl.gov>
:module author: Danielle S Christianson <dschristianson@lbl.gov>
* :class:`USGSDataSourcePlugin` - This Data Source plugin maps the USGS Daily Values and Instantaneous Values Service to BASIN-3D Models
USGS to BASIN-3D Mapping
++++++++++++++++++++++++
The table below describes how BASIN-3D synthesis models are mapped to the USGS Daily Values and Instantaneous Values Service models.
=================== === ==================================================================================================
USGS NWIS BASIN-3D
=================== === ==================================================================================================
``nwis/dv`` >> :class:`basin3d.core.models.MeasurementTimeseriesTVPObservation` with aggregation_duration == DAY
------------------- --- --------------------------------------------------------------------------------------------------
``nwis/iv`` >> :class:`basin3d.core.models.MeasurementTimeseriesTVPObservation` with aggregation_duration == NONE
------------------- --- --------------------------------------------------------------------------------------------------
``nwis/sites`` >> :class:`basin3d.core.models.MonitoringFeature`
------------------- --- --------------------------------------------------------------------------------------------------
``nwis/huc`` >> :class:`basin3d.core.models.MonitoringFeature`
------------------- --- --------------------------------------------------------------------------------------------------
``new_huc_rdb.txt`` >> :class:`basin3d.core.models.MonitoringFeature`
* Region (2-digit HUC code) to Region
* Subregion (4-digit HUC code) to Subregion
* Accounting (6-digit HUC code) Unit to Basin
* Watershed (8-digit HUC code) to Subbasin
=================== === ==================================================================================================
Access Classes
++++++++++++++
The following are the access classes that map *USGS Water Data* to the *BASIN-3D Models*.
* :class:`USGSMeasurementTimeseriesTVPObservationAccess` - Access for accessing a group of data points grouped by time, space, model, sample etc.
* :class:`USGSMonitoringFeatureAccess` - Access for accessing monitoring features
---------------------
"""
import json
from basin3d.core import monitor
from typing import Any, List, Optional, Tuple
from basin3d.core.schema.enum import FeatureTypeEnum, AggregationDurationEnum
from basin3d.core.schema.query import QueryMeasurementTimeseriesTVP, QueryMonitoringFeature
from basin3d.core.access import get_url
from basin3d.core.models import AbsoluteCoordinate, AltitudeCoordinate, Coordinate, GeographicCoordinate, \
MeasurementTimeseriesTVPObservation, MonitoringFeature, RelatedSamplingFeature, \
TimeMetadataMixin, TimeValuePair, ResultListTVP
from basin3d.core.plugin import DataSourcePluginAccess, DataSourcePluginPoint, basin3d_plugin, separate_list_types
from basin3d.core.types import SpatialSamplingShapes
from basin3d.plugins import usgs_huc_codes
logger = monitor.get_logger(__name__)
[docs]
def convert_discharge(data, data_str, parameter, units):
    """
    Convert a USGS river discharge value from cubic feet to cubic meters.

    Only the discharge parameter codes ``00060`` and ``00061`` are converted;
    values for any other parameter are returned exactly as they were passed in.

    :param data: the numeric value to convert
    :param data_str: the value as the string reported by USGS; ``'-999999'`` is
        returned as an int and is not scaled
    :param parameter: the USGS parameter code the value belongs to
    :param units: the unit reported for the value
    :return: tuple of (value, units); units is ``"m^3/s"`` for the discharge parameters
    """
    # Anything that is not river discharge passes straight through
    if parameter not in ('00060', '00061'):
        return data, units

    # Hardcoded conversion from ft^3 to m^3 for river discharge
    if data_str == '-999999':
        converted = int(data_str)
    else:
        converted = data * 0.028316847
    return converted, "m^3/s"
def _convert_tuple_to_str(a_tuple: tuple, synthesis_msg=[]) -> Optional[str]:
"""Helper function to convert a tuple of float to a str
:param a_tuple: tuple
"""
output: Optional[str] = None
msg: Optional[str] = None
try:
tuple_str_list = [str(v) for v in a_tuple]
output = ','.join(tuple_str_list)
except TypeError:
msg = f'Could not convert {a_tuple} to str'
except Exception as e:
msg = f'Some other error {e} while trying to convert {a_tuple} to str'
if msg:
logger.warning(msg)
synthesis_msg.append(msg)
return output
[docs]
def generator_usgs_measurement_timeseries_tvp_observation(view,
                                                          query: QueryMeasurementTimeseriesTVP,
                                                          synthesis_messages):
    """
    Get the data for USGS Daily Values or Instantaneous Values

    One request is made per monitoring feature filter extracted from the query
    (the named features as a single ``sites`` or ``huc`` filter, plus one
    ``bBox`` filter per bounding box). Problems are logged and appended to
    ``synthesis_messages`` rather than raised.

    :param view: Access class
    :param query: Query information for this request
    :param synthesis_messages: list to hold any processing messages
    :returns: a generator object that yields data from the request in json form
    """
    import re  # local import: only used to strip markup from HTTP error bodies

    # The Instantaneous Values (iv) service is used when aggregation duration is NONE;
    # otherwise the Daily Values (dv) service is used.
    is_instantaneous = query.aggregation_duration[0] == AggregationDurationEnum.NONE

    search_params: List[Tuple[str, Any]] = [("startDT", query.start_date)]

    if query.end_date:
        search_params.append(("endDT", query.end_date))

    search_params.append(("parameterCd", ",".join([str(o) for o in query.observed_property])))

    if query.statistic:
        if is_instantaneous:
            # There are no statistic values for the IV call: ignore the statistics and tell the user
            msg = (f"USGS Instantaneous Values service does not support statistics and cannot be specified "
                   f"when aggregation_duration = {AggregationDurationEnum.NONE}. "
                   f"Specified statistic arguments will be ignored.")
            synthesis_messages.append(msg)
            logger.info(msg)
        else:
            search_params.append(("statCd", ",".join([str(o) for o in query.statistic])))
    else:
        search_params.append(("siteStatus", "all"))

    # look for station locations only: stream type (ST) locations
    search_params.append(("siteType", "ST"))

    # JSON format
    search_params.append(("format", "json"))

    # handle the monitoring features
    monitoring_feature_types = separate_list_types(query.monitoring_feature, {'named': str, 'bbox': tuple})
    monitoring_feature_named = monitoring_feature_types.get('named', [])
    monitoring_feature_filters = []  # overall list of monitoring feature query elements

    if monitoring_feature_named:
        # The first name decides how all names are sent: names longer than 2 characters
        # are searched as stations (sites), otherwise the names are searched as hucs.
        filter_name = "sites" if len(monitoring_feature_named[0]) > 2 else "huc"
        monitoring_feature_filters.append((filter_name, ",".join(monitoring_feature_named)))

    # translate the bounding box coordinates to the usgs API format. Note the usgs parameter name is bBox.
    # Note: USGS is NAD83 and basin3d is WGS84 -- eventually need to convert. For now, they are close enough.
    for bbox_coord in monitoring_feature_types.get('bbox', []):
        # pass the message list so a failed conversion is reported to the caller
        bbox_str = _convert_tuple_to_str(bbox_coord, synthesis_messages)
        if bbox_str is not None:
            monitoring_feature_filters.append(('bBox', bbox_str))

    if not monitoring_feature_filters:
        msg = 'no monitoring features extracted from the query'
        logger.error(msg)
        synthesis_messages.append(msg)

    endpoint = "iv" if is_instantaneous else "dv"
    url = f'{view.datasource.location}{endpoint}'
    markup_pattern = re.compile(r'<.*?>')

    for mf_filter in monitoring_feature_filters:
        # every request shares the base parameters and differs only in its trailing location filter
        response = get_url(url, params=search_params + [mf_filter])

        if response.status_code != 200:
            msg = "HTTP {}: {}".format(response.status_code, markup_pattern.sub(' ', response.text))
            synthesis_messages.append(msg)
            logger.error(msg)
            continue

        try:
            json_obj = response.json()
        except json.decoder.JSONDecodeError:
            msg = "JSON Not Returned: {}".format(response.content)
            synthesis_messages.append(msg)
            logger.error(msg)
            continue

        # There is a valid json response: yield each timeseries in it
        if json_obj:
            yield from json_obj['value']['timeSeries']
[docs]
def iter_rdb_to_json(rdb_text):
    """
    Generator that walks USGS rdb text and yields one dict per data row.

    Blank lines and lines starting with ``#`` are skipped. The first remaining
    line supplies the column names and the second (the rdb column format line)
    is consumed without being used. Every later line is split on tabs and
    zipped with the column names.

    :param rdb_text: USGS rdb data
    :return: generator of dicts mapping column name to the row's string value
    """
    column_names = None
    column_formats = None  # read so the line is consumed; not used otherwise

    for raw_line in rdb_text.split("\n"):
        record = raw_line.replace("\r", "").lstrip()

        # skip blank lines and comments
        if not record or record[0] == '#':
            continue

        if not column_names:
            column_names = record.split()
            continue

        if not column_formats:
            column_formats = record.split()
            continue

        if record[0] not in ['#', ' ', '<', ' ']:
            yield dict(zip(column_names, record.split("\t")))
def _load_point_obj(datasource, json_obj, observed_property_variables, synthesis_messages):
"""
Instantiate the object
#
# The following selected fields are included in this output:
#
# agency_cd -- Agency
# site_no -- Site identification number
# station_nm -- Site name
# site_tp_cd -- Site type
# dec_lat_va -- Decimal latitude
# dec_long_va -- Decimal longitude
# coord_acy_cd -- Latitude-longitude accuracy
# dec_coord_datum_cd -- Decimal Latitude-longitude datum
# alt_va -- Altitude of Gage/land surface
# alt_acy_va -- Altitude accuracy
# alt_datum_cd -- Altitude datum
# huc_cd -- Hydrologic unit code
:param json_obj:
:param observed_property_variables: list of locations and their available variables
:param synthesis_messages
:return:
"""
if 'site_no' in json_obj:
id = json_obj['site_no']
lat, lon = None, None
try:
lat, lon = float(json_obj['dec_lat_va']), float(json_obj['dec_long_va'])
except Exception as e:
synthesis_messages.append(f"Error getting latlon: {str(e)}")
logger.error(str(e))
if id not in observed_property_variables.keys():
mf_opv = "Could not find observed property variables for this monitoring feature"
msg = f"Could not find observed property variables for this monitoring feature: {id}"
synthesis_messages.append(msg)
logger.warning({msg})
else:
mf_opv = observed_property_variables[id]
monitoring_feature = MonitoringFeature(
datasource,
id="{}".format(id),
name=json_obj['station_nm'],
feature_type=FeatureTypeEnum.POINT,
shape=SpatialSamplingShapes.SHAPE_POINT,
related_sampling_feature_complex=[RelatedSamplingFeature(
datasource,
related_sampling_feature=json_obj['huc_cd'],
related_sampling_feature_type=FeatureTypeEnum.SUBBASIN, # previously site
role=RelatedSamplingFeature.ROLE_PARENT
)],
# geographical_group_id=huc_accounting_unit_id,
# geographical_group_type=FeatureTypeEnum.REGION,
observed_properties=mf_opv,
coordinates=Coordinate(
absolute=AbsoluteCoordinate(
horizontal_position=GeographicCoordinate(
**{"latitude": lat,
"longitude": lon,
"datum": json_obj['dec_coord_datum_cd'],
"units": GeographicCoordinate.UNITS_DEC_DEGREES}))
)
)
# huc_accounting_unit_id = json_obj['huc_cd'][0:6] #Subbasin
if json_obj['alt_va'] and json_obj['alt_acy_va'] and json_obj['alt_datum_cd']:
monitoring_feature.coordinates.absolute.vertical_extent = \
[AltitudeCoordinate(
**{"value": float(json_obj['alt_va']),
"resolution": float(json_obj['alt_acy_va']),
"datum": json_obj['alt_datum_cd']})]
return monitoring_feature
def _parse_sites_response(usgs_site_response, observed_properties_variables, unique_usgs_sites):
    """
    Fold the rows of a USGS site catalog response into the two accumulator dictionaries.

    Both dictionaries are updated in place, so the function can be called once
    per response while accumulating across several requests.

    :param usgs_site_response: datasource response whose ``text`` is the rdb listing of the locations
    :param observed_properties_variables: dictionary of site number to the list of its parameter codes
    :param unique_usgs_sites: dictionary of site number to the first rdb row seen for that site
    :return observed_properties_variables, unique_usgs_sites: a tuple of the two (updated) dictionaries
    """
    for row in iter_rdb_to_json(usgs_site_response.text):
        parameter_code, site_no, statistic_code = row['parm_cd'], row['site_no'], row['stat_cd']

        site_parameters = observed_properties_variables.setdefault(site_no, [])

        # FIX: stat -- need to change to B3D vocab
        if parameter_code not in site_parameters and statistic_code != 'NOT_SUPPORTED':
            site_parameters.append(parameter_code)

        # keep only the first row encountered for each site
        unique_usgs_sites.setdefault(site_no, row)

    logger.debug("Location DataTypes: {}".format(observed_properties_variables))
    return observed_properties_variables, unique_usgs_sites
[docs]
class USGSMonitoringFeatureAccess(DataSourcePluginAccess):
    """
    Access for mapping USGS HUC Units to :class:`~basin3d.core.models.MonitoringFeature` objects.

    ============== === =======================================================
    USGS HUC code      BASIN-3D
    ============== === =======================================================
    2-digit        >>  :class:`basin3d.core.schema.enum.FeatureTypeEnum` REGION
    -------------- --- -------------------------------------------------------
    4-digit        >>  :class:`basin3d.core.schema.enum.FeatureTypeEnum` SUBREGION
    -------------- --- -------------------------------------------------------
    6-digit        >>  :class:`basin3d.core.schema.enum.FeatureTypeEnum` BASIN
    -------------- --- -------------------------------------------------------
    8-digit        >>  :class:`basin3d.core.schema.enum.FeatureTypeEnum` SUBBASIN
    ============== === =======================================================
    """
    synthesis_model_class = MonitoringFeature

    def list(self, query: QueryMonitoringFeature):
        """
        List Monitoring Feature

        HUC-based features (region, subregion, basin, subbasin) are read from the
        bundled hydrological unit codes; POINT features are requested from the
        USGS site service.

        :param query: The query information object
        :return: a generator object that yields :class:`~basin3d.core.models.MonitoringFeature`
            objects
        """
        synthesis_messages: List[str] = []

        feature_type = query.feature_type
        if isinstance(feature_type, FeatureTypeEnum):
            feature_type = feature_type.value

        if feature_type is not None and feature_type not in USGSDataSourcePlugin.feature_types:
            msg = f"Feature type {feature_type} not supported by {self.datasource.name}."
            synthesis_messages.append(msg)
            logger.warning(msg)
            return StopIteration(synthesis_messages)

        # Convert parent_features: codes shorter than 4 characters are kept as regions,
        # 8 character codes are kept as subbasins.
        usgs_regions = []
        usgs_subbasins = []
        parent_features = []
        for value in query.parent_feature or []:
            parent_features.append(value)
            if len(value) < 4:
                usgs_regions.append(value)
            elif len(value) == 8:
                usgs_subbasins.append(value)

        if not feature_type or feature_type != FeatureTypeEnum.POINT:
            # no feature_type specified or a HUC-based feature_type
            yield from self._iter_huc_features(query, feature_type, parent_features, usgs_regions,
                                               synthesis_messages)
        else:
            # feature_type == POINT
            yield from self._iter_point_features(query, usgs_subbasins, synthesis_messages)

        return StopIteration(synthesis_messages)

    def _iter_huc_features(self, query, feature_type, parent_features, usgs_regions, synthesis_messages):
        """Yield the HUC-based monitoring features that match the feature type, parents and ids in the query."""

        def wanted(candidate_type):
            # with no feature type requested, every HUC level is returned
            return feature_type is None or feature_type == candidate_type

        huc_text = self.get_hydrological_unit_codes(synthesis_messages=synthesis_messages)
        for json_obj in iter_rdb_to_json(huc_text):
            huc = json_obj["huc"]

            # When parent features are given, only HUCs that start with one of them are considered
            if parent_features and not any(huc.startswith(p) for p in parent_features):
                continue

            monitoring_feature = None
            if wanted(FeatureTypeEnum.REGION) and len(huc) < 4:
                monitoring_feature = self._load_huc_obj(json_obj, feature_type=FeatureTypeEnum.REGION)

            elif wanted(FeatureTypeEnum.SUBREGION) and len(huc) == 4:
                monitoring_feature = self._load_huc_obj(json_obj, feature_type=FeatureTypeEnum.SUBREGION,
                                                        related_sampling_feature=huc[0:2],
                                                        related_sampling_feature_type=FeatureTypeEnum.REGION)

            elif wanted(FeatureTypeEnum.BASIN) and len(huc) == 6:
                monitoring_feature = self._load_huc_obj(json_obj, feature_type=FeatureTypeEnum.BASIN,
                                                        related_sampling_feature=huc[0:4],
                                                        related_sampling_feature_type=FeatureTypeEnum.SUBREGION)

            elif wanted(FeatureTypeEnum.SUBBASIN) and len(huc) == 8:
                # the 2, 4 and 6 digit prefixes of this subbasin
                hucs = {huc[0:i] for i in range(2, 8, 2)}

                # Filter by regions if it is set
                if not usgs_regions or not hucs.isdisjoint(usgs_regions):
                    # This is a Cataloging Unit (See https://water.usgs.gov/GIS/huc_name.html)
                    monitoring_feature = self._load_huc_obj(
                        json_obj=json_obj, feature_type=FeatureTypeEnum.SUBBASIN,
                        description="{} Watershed: Drainage basin code is defined by the USGS State "
                                    "Office where the site is located.".format(json_obj["basin"]),
                        related_sampling_feature=huc[0:6],
                        related_sampling_feature_type=FeatureTypeEnum.BASIN)

            else:
                logger.debug("Ignoring HUC {}".format(huc))

            # Yield the object unless the query names specific features and this is not one of them
            if monitoring_feature and (not query.monitoring_feature or huc in query.monitoring_feature):
                yield monitoring_feature

    def _iter_point_features(self, query, usgs_subbasins, synthesis_messages):
        """Yield the point monitoring features found by the USGS site service for the query's location filters."""
        base_url = '{}site/?{}&seriesCatalogOutput=true&outputDataTypeCd=iv,dv&siteStatus=all&format=rdb'

        # accumulated across the requests for multiple location filters
        unique_sites = {}  # type: ignore[var-annotated]
        observed_properties = {}  # type: ignore[var-annotated]
        loc_filters = []

        if query.monitoring_feature is not None:
            # Points by id: USGS calls these sites
            mf_types = separate_list_types(query.monitoring_feature, {'named': str, 'bbox': tuple})
            mf_named = mf_types.get('named', [])

            if mf_named:
                usgs_sites = ','.join(mf_named)
                loc_filters.append(f'sites={usgs_sites}')

            for bbox_tuple in mf_types.get('bbox', []):
                # a bounding box that cannot be converted is reported and skipped
                bbox_cc = _convert_tuple_to_str(bbox_tuple, synthesis_messages)
                if bbox_cc is not None:
                    loc_filters.append(f'bBox={bbox_cc}')
        else:
            # Point by subbasin: USGS calls subbasin as huc (instead of sites) to retrieve all subbasins
            usgs_subbasin = ",".join(usgs_subbasins)
            loc_filters.append(f'huc={usgs_subbasin}')

        for a_filter in loc_filters:
            url = base_url.format(self.datasource.location, a_filter)
            logger.debug(f"{self.__class__.__name__}.list url:{url}")
            usgs_site_response = get_url(url)

            if usgs_site_response and usgs_site_response.status_code == 200:
                observed_properties, unique_sites = _parse_sites_response(
                    usgs_site_response, observed_properties, unique_sites)
            else:
                msg = f'Problem with request to {url}'
                if usgs_site_response:
                    # report the response body; ``.json`` is a method and would only print its repr
                    msg = f'{msg}; {usgs_site_response.status_code}: {usgs_site_response.text}'
                logger.warning(msg)
                synthesis_messages.append(msg)

        for v in unique_sites.values():
            yield _load_point_obj(datasource=self, json_obj=v,
                                  observed_property_variables=observed_properties,
                                  synthesis_messages=synthesis_messages)

    @staticmethod
    def get_hydrological_unit_codes(synthesis_messages):
        """
        Get the hydrological unit codes for USGS

        Originally from "https://water.usgs.gov/GIS/new_huc_rdb.txt"
        New url https://water.usgs.gov/themes/hydrologic-units/
        To Do: update with current huc information.

        :param synthesis_messages: list to hold any processing messages (not used here)
        :return: the bundled ``usgs_huc_codes.CONTENT``
        """
        return usgs_huc_codes.CONTENT

    def get(self, query: QueryMonitoringFeature):
        """ Get a single Monitoring Feature object

        When the query has no feature type, it is inferred from the length of the id:
        2, 4, 6 and 8 characters map to REGION, SUBREGION, BASIN and SUBBASIN, anything
        else is a POINT. The query object is modified in place.

        :param query: The query info object with id specified
        :return: a :class:`basin3d.core.models.MonitoringFeature` object, or None if nothing is found
        """
        # query.id will always be a string at this point with validation upstream, thus ignoring the type checking
        monitoring_feature_len = len(query.id)  # type: ignore[arg-type]

        if not query.feature_type:
            huc_feature_types = {2: FeatureTypeEnum.REGION,
                                 4: FeatureTypeEnum.SUBREGION,
                                 6: FeatureTypeEnum.BASIN,
                                 8: FeatureTypeEnum.SUBBASIN}
            query.feature_type = huc_feature_types.get(monitoring_feature_len, FeatureTypeEnum.POINT)

        query.monitoring_feature = [query.id]  # type: ignore[list-item]

        # return the first object listed, if any
        for o in self.list(query=query):
            return o

        # An 8 character code can also be a point, Try that
        if monitoring_feature_len == 8:
            query.feature_type = FeatureTypeEnum.POINT
            for o in self.list(query=query):
                return o

        return None

    def _load_huc_obj(self, json_obj, feature_type, description=None,
                      related_sampling_feature=None, related_sampling_feature_type=None):
        """
        Transform USGS huc information to a :class:`~basin3d.core.models.MonitoringFeature` object

        :param json_obj: dict with the ``huc`` and ``basin`` fields of one hydrological unit
        :param feature_type: feature type to assign to the object
        :param description: optional description; defaults to "<feature_type>: <basin name>"
        :param related_sampling_feature: optional id of the parent feature
        :param related_sampling_feature_type: feature type of the parent feature
        :return: a :class:`~basin3d.core.models.MonitoringFeature` object, or None if ``json_obj`` is empty
        """
        # Check for an empty object before reading any of its fields
        if not json_obj:
            return None

        if not description:
            description = "{}: {}".format(feature_type, json_obj["basin"])

        related_sampling_feature_complex = []
        if related_sampling_feature:
            related_sampling_feature_complex = [RelatedSamplingFeature(
                self,
                related_sampling_feature=related_sampling_feature,
                related_sampling_feature_type=related_sampling_feature_type,
                role=RelatedSamplingFeature.ROLE_PARENT
            )]

        return MonitoringFeature(
            self, id=json_obj["huc"],
            name=json_obj["basin"],
            description=description,
            feature_type=feature_type,
            shape=SpatialSamplingShapes.SHAPE_SURFACE,
            coordinates=None,
            related_sampling_feature_complex=related_sampling_feature_complex,
            observed_properties=None)
[docs]
class USGSMeasurementTimeseriesTVPObservationAccess(DataSourcePluginAccess):
    """
    USGS Daily Values Service: https://waterservices.usgs.gov/docs/dv-service/

    USGS Instantaneous Values Service: https://waterservices.usgs.gov/docs/instantaneous-values/

    Access for mapping USGS water services daily or instantaneous value data to
    :class:`basin3d.core.models.MeasurementTimeseriesTVPObservation` objects.

    Daily Value and Instantaneous Value Qualification Code (dv_rmk_cd)

    ============= ========= ================================================================================
    BASIN-3D Code USGS Code Description
    ============= ========= ================================================================================
    ESTIMATED     e         Value has been edited or estimated by USGS personnel and is write protected
    NOT_SUPPORTED &         Value was computed from affected unit values
    ESTIMATED     E         Value was computed from estimated unit values.
    VALIDATED     A         Approved for publication -- Processing and review completed.
    UNVALIDATED   P         Provisional data subject to revision.
    NOT_SUPPORTED <         The value is known to be less than reported value and is write protected.
    NOT_SUPPORTED >         The value is known to be greater than reported value and is write protected.
    NOT_SUPPORTED 1         Value is write protected without any remark code to be printed
    NOT_SUPPORTED 2         Remark is write protected without any remark code to be printed
    NOT_SUPPORTED _         No remark (blank)
    ============= ========= ================================================================================
    """
    synthesis_model_class = MeasurementTimeseriesTVPObservation

    def list(self, query: QueryMeasurementTimeseriesTVP):
        """
        List of Measurement Timeseries TVP Observation objects for USGS Daily Values or Instantaneous Values

        The site catalog is requested first so that each observation can carry its
        monitoring feature; the data are then requested and one observation is
        yielded per returned timeseries that has at least one matching value.

        :param query: :class:`basin3d.core.schema.query.QueryMeasurementTimeseriesTVP`
        :return: a generator object that yields
            :class:`~basin3d.core.models.MeasurementTimeseriesTVPObservation` objects
        """
        synthesis_messages = []

        if not query.monitoring_feature:
            return None

        # split up mf query types
        mf_types = separate_list_types(query.monitoring_feature, {'named': str, 'bbox': tuple})
        mf_named = mf_types.get('named', [])

        loc_filters = []
        if mf_named:
            usgs_sites = ','.join(mf_named)
            # Decide on the first name, not on the joined string, so that several short (huc)
            # names are not mistaken for sites. Same rule as the data request uses.
            filter_name = 'huc' if len(mf_named[0]) < 3 else 'sites'
            loc_filters.append(f'{filter_name}={usgs_sites}')

        for bbox_tuple in mf_types.get('bbox', []):
            # a bounding box that cannot be converted is reported and skipped
            bbox_cc = _convert_tuple_to_str(bbox_tuple, synthesis_messages)
            if bbox_cc is not None:
                loc_filters.append(f'bBox={bbox_cc}')

        base_url = '{}site/?{}&seriesCatalogOutput=true&outputDataTypeCd=iv,dv&siteStatus=all&format=rdb'
        observed_properties = {}  # type: ignore[var-annotated]
        unique_sites = {}  # type: ignore[var-annotated]

        for a_filter in loc_filters:
            url = base_url.format(self.datasource.location, a_filter)
            logger.debug(f"{self.__class__.__name__}.list url:{url}")
            usgs_site_response = get_url(url)

            if usgs_site_response and usgs_site_response.status_code == 200:
                observed_properties, unique_sites = _parse_sites_response(
                    usgs_site_response, observed_properties, unique_sites)
            else:
                msg = f'Problem with request to {url}'
                if usgs_site_response:
                    # report the response body; ``.json`` is a method and would only print its repr
                    msg = f'{msg}; {usgs_site_response.status_code}: {usgs_site_response.text}'
                logger.warning(msg)
                synthesis_messages.append(msg)

        # site records by site number, used to build the feature of interest
        feature_obj_dict = {v["site_no"]: v for v in unique_sites.values() if v["site_no"]}

        # Iterate over data objects returned
        # NOTE(review): duplicates are tracked by the full series name (site, parameter and statistic)
        # rather than by site alone, so that a site with several parameters keeps all of its
        # timeseries -- confirm this matches the intended de-duplication.
        processed_series = set()
        for data_json in generator_usgs_measurement_timeseries_tvp_observation(self, query, synthesis_messages):
            unit_of_measurement = data_json["variable"]["unit"]['unitCode']
            timezone_offset = data_json["sourceInfo"]["timeZoneInfo"]["defaultTimeZone"]["zoneOffset"]

            # name has agency, sitecode, parameter id and stat code
            #   e.g. "USGS:385106106571000:00060:00003"
            series_name = data_json["name"]
            _, feature_id, parameter, statistic = series_name.split(":")

            if series_name in processed_series:
                continue
            processed_series.add(series_name)

            # ToDo: expand this to use the info in the data return when the site is unknown
            # ToDo: log message
            monitoring_feature = None
            if feature_id in feature_obj_dict:
                monitoring_feature = _load_point_obj(
                    datasource=self,
                    json_obj=feature_obj_dict[feature_id],
                    observed_property_variables=observed_properties,
                    synthesis_messages=synthesis_messages)

            result_TVPs = []
            result_TVP_quality = []
            result_quality = set()
            has_filtered_data_points = 0

            if len(data_json["values"]) > 1:
                # Cannot think of the case in which response should have more than one values
                # so adding a message to track it.
                synthesis_messages.append(
                    f'{feature_id} has more than one timeseries returned for {parameter}. Contact plugin developer if you see this message.')

            for values in data_json["values"]:
                # The timeseries qualities are built from the individual point qualities below.
                for value in values["value"]:
                    # ToDo: add some error messaging when a point has more than one qualifier.
                    # Only the first qualifier is used.
                    result_point_quality = value['qualifiers'][0]

                    if query.result_quality and result_point_quality not in query.result_quality:
                        has_filtered_data_points += 1
                        continue

                    try:
                        data_str = value['value']
                        # Hardcoded unit conversion for river discharge parameters
                        data, unit_of_measurement = convert_discharge(
                            float(data_str), data_str, parameter, unit_of_measurement)
                    except (KeyError, TypeError, ValueError) as e:
                        synthesis_messages.append(f"Unit Conversion Issue: {str(e)}")
                        logger.error(str(e))
                        continue

                    try:
                        time_value_pair = TimeValuePair(timestamp=value['dateTime'], value=data)
                    except Exception as e:
                        synthesis_messages.append(f"TimeValuePair ERROR: {str(e)}")
                        logger.error(e)
                        continue

                    # Every converted value is kept, including 0.0 which is a valid measurement
                    result_quality.add(result_point_quality)
                    result_TVPs.append(time_value_pair)
                    result_TVP_quality.append(result_point_quality)

            if has_filtered_data_points > 0:
                msg = f'{feature_id} - {parameter}: {str(has_filtered_data_points)} timestamps did not match data quality query.'
                synthesis_messages.append(msg)

            if len(result_TVPs) == 0:
                msg = f'{feature_id} had no valid data values for {parameter} that match the query.'
                synthesis_messages.append(msg)
                continue

            measurement_timeseries_tvp_observation = MeasurementTimeseriesTVPObservation(
                self,
                id=feature_id,  # FYI: this field is not unique and thus kinda useless
                unit_of_measurement=unit_of_measurement,
                feature_of_interest_type=FeatureTypeEnum.POINT,
                feature_of_interest=monitoring_feature,
                utc_offset=int(timezone_offset.split(":")[0]),
                result=ResultListTVP(plugin_access=self, value=result_TVPs, result_quality=result_TVP_quality),
                observed_property=parameter,
                result_quality=list(result_quality),
                aggregation_duration=query.aggregation_duration[0],
                time_reference_position=TimeMetadataMixin.TIME_REFERENCE_MIDDLE,
                statistic=statistic
            )

            yield measurement_timeseries_tvp_observation

        return StopIteration(synthesis_messages)
[docs]
@basin3d_plugin
class USGSDataSourcePlugin(DataSourcePluginPoint):
    """Plugin definition that ties the USGS access classes in this module to a BASIN-3D data source."""
    # Display title of the plugin
    title = 'USGS Data Source Plugin'
    # Access classes provided by this plugin: monitoring features and timeseries observations
    plugin_access_classes = (USGSMonitoringFeatureAccess, USGSMeasurementTimeseriesTVPObservationAccess)
    # Feature type names the plugin supports; USGSMonitoringFeatureAccess.list checks requests against this list
    feature_types = ['POINT', 'REGION', 'BASIN', 'SUBREGION', 'SUBBASIN']