"""
.. currentmodule:: basin3d.plugins.epa
:platform: Unix, Mac
:synopsis: EPA Water Quality eXchange Plugin Definition
:module author: Danielle Christianson <dschristianson@lbl.gov>
* :class:`EPADataSourcePlugin` - This Data Source plugin maps the EPA Water Quality Exchange Data Source to the BASIN-3D Models
"""
import csv
import json
import os
import requests
import urllib.parse
from copy import deepcopy
from typing import List, Optional, Union
from datetime import datetime as dt, date
from basin3d.core import monitor
from basin3d.core.schema.enum import FeatureTypeEnum, MappedAttributeEnum
from basin3d.core.schema.query import QueryMeasurementTimeseriesTVP, QueryMonitoringFeature
from basin3d.core.access import get_url, AccessIssueException
from basin3d.core.models import AbsoluteCoordinate, RepresentativeCoordinate, Coordinate, GeographicCoordinate, \
DepthCoordinate, VerticalCoordinate, MeasurementTimeseriesTVPObservation, MonitoringFeature, \
TimeMetadataMixin, TimeValuePair, ResultListTVP, AttributeMapping
from basin3d.core.plugin import DataSourcePluginAccess, DataSourcePluginPoint, basin3d_plugin, separate_list_types
from basin3d.core.translate import get_datasource_mapped_attribute
from basin3d.core.types import SpatialSamplingShapes
# Module-level logger, obtained from the BASIN-3D monitor helper and named after this module.
logger = monitor.get_logger(__name__)
# https://www.epa.gov/waterdata/storage-and-retrieval-and-water-quality-exchange-domain-services-and-downloads
# Download link: https://cdx.epa.gov/wqx/download/DomainValues/TimeZone.csv
# Accessed 2023-10-03
#
# Maps an EPA WQX time zone code to its UTC offset (a string appended to a timestamp and
# parsed with strptime's %z) and a descriptive name.
# The trailing comment on each entry appears to be the offset in hours and the last-change
# timestamp from the EPA domain file.
# NOTE: Newfoundland is a half-hour zone (NST = UTC-03:30, NDT = UTC-02:30); the offsets must
# carry the 30 minutes or every NST/NDT timestamp is shifted by half an hour.
TIMEZONE_MAP = {
    "ADT": {"utc_offset": "-03:00", "name": "Atlantic Daylight Time"},  # -3,8/11/2006 10:57:50 AM
    "AHST": {"utc_offset": "-10:00", "name": "Alaska-Hawaii Standard Time (*retired: >1983 use AKST)"},  # -10,8/11/2006 10:57:50 AM
    "AKDT": {"utc_offset": "-08:00", "name": "Alaska Daylight Time"},  # -8,8/11/2006 10:57:50 AM
    "AKST": {"utc_offset": "-09:00", "name": "Alaska Standard Time"},  # -9,8/11/2006 10:57:50 AM
    "AST": {"utc_offset": "-04:00", "name": "Atlantic Standard Time"},  # -4,8/11/2006 10:57:50 AM
    "BST": {"utc_offset": "-11:00", "name": "Bering Standard Time (*retired: >1983 use HAST)"},  # -11,8/11/2006 10:57:50 AM
    "CDT": {"utc_offset": "-05:00", "name": "Central Daylight Time"},  # -5,8/11/2006 10:57:50 AM
    "CEST": {"utc_offset": "+02:00", "name": "Sweden Daylight Time or Central European Standard Time"},  # 2,7/12/2016 9:39:33 AM
    "CET": {"utc_offset": "+01:00", "name": "Stockholm Sweden Time or Central European Time"},  # 1,7/12/2016 9:39:33 AM
    "CST": {"utc_offset": "-06:00", "name": "Central Standard Time"},  # -6,8/11/2006 10:57:50 AM
    "EDT": {"utc_offset": "-04:00", "name": "Eastern Daylight Time"},  # -4,8/11/2006 10:57:50 AM
    "EST": {"utc_offset": "-05:00", "name": "Eastern Standard Time"},  # -5,8/11/2006 10:57:50 AM
    "GMT": {"utc_offset": "+00:00", "name": "Greenwich Mean Time"},  # 0,6/19/2009 11:19:08 AM
    "GST": {"utc_offset": "+10:00", "name": "Guam Standard Time Zone (also Chamorro Standard Time)"},  # 10,8/11/2006 10:57:50 AM
    "HADT": {"utc_offset": "-09:00", "name": "Hawaii-Aleutian Daylight Time"},  # -9,8/11/2006 10:57:50 AM
    "HAST": {"utc_offset": "-10:00", "name": "Hawaii-Aleutian Standard Time"},  # -10,8/11/2006 10:57:50 AM
    "KST": {"utc_offset": "+09:00", "name": "Korea Standard Time"},  # 9,7/12/2016 9:39:33 AM
    "MDT": {"utc_offset": "-06:00", "name": "Mountain Daylight Time"},  # -6,8/11/2006 10:57:50 AM
    "MST": {"utc_offset": "-07:00", "name": "Mountain Standard Time"},  # -7,8/11/2006 10:57:50 AM
    "NDT": {"utc_offset": "-02:30", "name": "Newfoundland Daylight Time"},  # -2.5,8/11/2006 10:57:50 AM
    "NST": {"utc_offset": "-03:30", "name": "Newfoundland Standard Time"},  # -3.5,8/11/2006 10:57:50 AM
    "PDT": {"utc_offset": "-07:00", "name": "Pacific Daylight Time"},  # -7,8/11/2006 10:57:50 AM
    "PST": {"utc_offset": "-08:00", "name": "Pacific Standard Time"},  # -8,8/11/2006 10:57:50 AM
    "SST": {"utc_offset": "-11:00", "name": "American Samoa Standard Time"},  # -11,8/11/2006 10:57:50 AM
    "UTC": {"utc_offset": "+00:00", "name": "Coordinated Universal Time"},  # 0,1/15/2010 1:10:00 PM
    "YST": {"utc_offset": "-09:00", "name": "Yukon Standard Time: U.S.-Yukatat (*retired: >1983 use AKST)"},  # -9,8/11/2006 10:57:50 AM
}
# WQP API version used for requests and for choosing response column names.
# Can be overridden with the EPA_WQP_API_VERSION environment variable; defaults to '2.2'.
EPA_WQP_API_VERSION = os.environ.get('EPA_WQP_API_VERSION', '2.2')

# mapping of EPA WQP API v2.2 to v3.0
# https://www.epa.gov/waterdata/water-quality-portal-quick-reference-guide
# accessed 2024-09-23
#
# Each key is an internal short name. Each value maps an API version string to the name used
# by that version: a response csv column name, or, for 'phys_chem_results', the value sent as
# the 'dataProfile' request parameter.
FIELD_NAMES = {
    'loc_id': {'2.2': 'MonitoringLocationIdentifier', '3.0': 'Location_Identifier'},
    'lat': {'2.2': 'LatitudeMeasure', '3.0': 'Location_Latitude'},
    'long': {'2.2': 'LongitudeMeasure', '3.0': 'Location_Longitude'},
    'org_name': {'2.2': 'OrganizationFormalName', '3.0': 'Org_FormalName'},
    'loc_name': {'2.2': 'MonitoringLocationName', '3.0': 'Location_Name'},
    'huc_8_code': {'2.2': 'HUCEightDigitCode', '3.0': 'Location_HUCEightDigitCode'},
    'provider': {'2.2': 'ProviderName', '3.0': 'ProviderName'},
    'phys_chem_results': {'2.2': 'resultPhysChem', '3.0': 'fullPhysChem'},
    'observed_property': {'2.2': 'CharacteristicName', '3.0': 'Result_Characteristic'},
    'sampling_media': {'2.2': 'ActivityMediaName', '3.0': 'Activity_Media'},
    'sample_fraction': {'2.2': 'ResultSampleFractionText', '3.0': 'Result_SampleFraction'},
    'depth_height': {'2.2': 'ResultDepthHeightMeasure/MeasureValue', '3.0': 'ResultDepthHeight_Measure'},
    'aggregation_duration': {'2.2': 'ResultTimeBasisText', '3.0': 'Result_TimeBasis'},
    'sampling_temp': {'2.2': 'ResultTemperatureBasisText', '3.0': 'Result_MeasureTemperatureBasis'},
    'statistic': {'2.2': 'StatisticalBaseCode', '3.0': 'Result_StatisticalBase'},
    'start_date': {'2.2': 'ActivityStartDate', '3.0': 'Activity_StartDate'},
    'start_time': {'2.2': 'ActivityStartTime/Time', '3.0': 'Activity_StartTime'},
    'start_time_zone': {'2.2': 'ActivityStartTime/TimeZoneCode', '3.0': 'Activity_StartTimeZone'},
    'end_date': {'2.2': 'ActivityEndDate', '3.0': 'Activity_EndDate'},
    'end_time': {'2.2': 'ActivityEndTime/Time', '3.0': 'Activity_EndTime'},
    'end_time_zone': {'2.2': 'ActivityEndTime/TimeZoneCode', '3.0': 'Activity_EndTimeZone'},
    'result': {'2.2': 'ResultMeasureValue', '3.0': 'Result_Measure'},
    'result_unit': {'2.2': 'ResultMeasure/MeasureUnitCode', '3.0': 'Result_MeasureUnit'},
    'result_status': {'2.2': 'ResultStatusIdentifier', '3.0': 'Result_MeasureStatusIdentifier'},
    'result_type': {'2.2': 'ResultValueTypeName', '3.0': 'Result_MeasureType'},
    'activity_type': {'2.2': 'ActivityTypeCode', '3.0': 'Activity_TypeCode'},
    'depth_height_unit': {'2.2': 'ResultDepthHeightMeasure/MeasureUnitCode', '3.0': 'ResultDepthHeight_MeasureUnit'},
    'depth_height_ref': {'2.2': 'ResultDepthAltitudeReferencePointText', '3.0': 'ResultDepthHeight_AltitudeReferencePoint'}
}

# Timeout handed to get_url for the GeoServer WFS location request (presumably seconds -- confirm
# against get_url). A value read from the environment is a str while the default is the int 5;
# conversion to float is done where the value is used (_get_location_info_ogc).
EPA_GEOSERVER_WFS_TIMEOUT_LIMIT = os.environ.get('EPA_GEOSERVER_WFS_TIMEOUT_LIMIT', 5)
def _post_wqp_search(search_type: str, query_data: dict,
                     api_version: str = EPA_WQP_API_VERSION) -> Optional[requests.Response]:
    """
    POST a search request to the Water Quality Portal and return the streamed csv response.

    The payload always starts from ``{'providers': ['STORET']}``; entries in *query_data*
    are merged on top of it (and may override it).

    :param search_type: WQP endpoint segment placed in the url, e.g. 'Result' or 'Station'
    :param query_data: query in WQP vocabulary for the request
    :param api_version: EPA WQP API version: '2.2' and '3.0' currently supported; any other value
        produces a url containing 'INVALID_API_VERSION', i.e. an invalid url.
    :return: the ``requests`` response, or None if building / sending the request raised
    """
    # url path segment for each supported API version
    version_path = {'2.2': 'data', '3.0': 'wqx3'}.get(api_version, 'INVALID_API_VERSION')
    endpoint = f'https://www.waterqualitydata.us/{version_path}/{search_type}/search?mimeType=csv'

    payload = {'providers': ['STORET'], **query_data}
    logger.info(f'url: {endpoint} and payload {payload}')

    try:
        # serialization is inside the try so an unserializable payload is reported like a failed request
        response = requests.post(endpoint, json.dumps(payload),
                                 headers={'content-type': 'application/json'},
                                 verify=True, stream=True)
        logger.info(f'response: {response.status_code}')
    except Exception as e:
        logger.error(f'Unsuccessful attempt to query WQP; {e}')
        return None

    return response
def _get_location_info(loc_type: str, loc_list: list, synthesis_messages: Optional[list] = None) -> list:
    """
    Get location information. First try the GeoServer WFS endpoint and, if it is not available
    within the timeout limit (or fails in any other way), fail over to a WQP Station request.

    :param loc_type: siteid or huc
    :param loc_list: list of location identifiers
    :param synthesis_messages: list that warning / error messages are appended to. If omitted, a
        fresh list is used for this call only (a shared ``[]`` default would accumulate messages
        across unrelated calls).
    :return: list of location info dicts in the OGC WFS feature structure
    """
    if synthesis_messages is None:
        synthesis_messages = []

    loc_info_list: list = []
    fail_over_msg: Optional[str] = None

    # try the ogc approach
    try:
        if loc_type == 'siteid':
            id_list_str = _make_mf_query_str(loc_list)
            loc_info_list = _get_location_info_ogc(id_list_str, synthesis_messages)
        else:
            # one WFS request per huc
            for huc in loc_list:
                loc_info_list.extend(_get_location_info_ogc(f'huc%3A{huc}', synthesis_messages))
    except TimeoutError as e:
        fail_over_msg = f'WFS Geoserver timed out, fail over to WQP Station request\nError: {e}'
    except (AccessIssueException, Exception) as e:
        fail_over_msg = f'WFS Geoserver did not respond appropriately, fail over to WQP Station request\nError: {e}'

    # if the WFS attempt failed, stream the csv file instead
    if fail_over_msg is not None:
        logger.warning(fail_over_msg)
        synthesis_messages.append(fail_over_msg)
        # The Station request covers the complete loc_list, so replace (rather than extend) any
        # partial WFS results gathered before the failure; extending would duplicate locations.
        loc_info_list = list(_get_location_info_csv(loc_type, loc_list, synthesis_messages))

    return loc_info_list
def _get_location_info_csv(loc_type: str, loc_list: list, synthesis_messages: list) -> list:
    """
    Fetch location information with a WQP 'Station' request and reshape each csv row into
    the same dictionary layout as an OGC WFS feature, so that one parser handles both sources.

    Rows without an identifier, or without both latitude and longitude, are skipped with a warning.

    :param loc_type: siteid or huc
    :param loc_list: list of location identifiers
    :param synthesis_messages: list of messages to report warning / error messages
    :return: list of feature-like dicts with 'geometry' and 'properties' keys
    """
    api_version = EPA_WQP_API_VERSION
    features: List[dict] = []

    def _report(log_method, text: str) -> None:
        # log first, then record the message for the synthesis response
        log_method(text)
        synthesis_messages.append(text)

    def _column(row: dict, short_name: str):
        # value of the version-specific csv column for an internal short name
        return row.get(FIELD_NAMES[short_name][api_version])

    wqp_response = _post_wqp_search('Station', {loc_type: loc_list}, api_version)
    response_usable = bool(wqp_response) and not (wqp_response.status_code and wqp_response.status_code != 200)
    if not response_usable:
        _report(logger.error, 'No or invalid response to WQP Station request')
        return features

    for row_number, row in enumerate(_get_csv_dict_reader(wqp_response)):
        site_id = _column(row, 'loc_id')
        if not site_id:
            _report(logger.warning, f'Row {row_number} of the response csv does not have an identifier, skipping')
            continue

        latitude = _column(row, 'lat')
        longitude = _column(row, 'long')
        if not (latitude and longitude):
            _report(logger.warning,
                    f'Row {row_number} with identifier {site_id} does not have both lat / long coordinates. skipping')
            continue

        org_name = _column(row, 'org_name')
        loc_name = _column(row, 'loc_name')
        huc8 = _column(row, 'huc_8_code')
        provider = _column(row, 'provider')

        # coordinates are ordered [longitude, latitude]
        features.append({
            'geometry': {'type': 'Point', 'coordinates': [longitude, latitude]},
            'properties': {'name': site_id, 'locName': loc_name,
                           'huc8': huc8, 'orgName': org_name,
                           'provider': provider}})

    return features
def _get_location_info_ogc(loc_str: str, synthesis_messages: list) -> list:
    """
    Get location information from the EPA Data Source for specified locations from WFS in OGC format

    :param loc_str: query string for the GeoServer WFS request
    :param synthesis_messages: list of messages to report warning / error messages
    :return: the 'features' list of the WFS response; empty if the response body could not be parsed
    :raises AccessIssueException: if there is no response or its status code is not 200
    :raises Exception: if the timeout setting cannot be converted for an unexpected reason,
        or whatever get_url raises (e.g. on an unusable timeout value)
    """
    url = ('https://www.waterqualitydata.us/ogcservices/wfs/?request=GetFeature&service=wfs&version=2.0.0&typeNames=wqp_sites'
           f'&SEARCHPARAMS={loc_str}%3Bproviders%3ASTORET&outputFormat=application%2Fjson')

    # if the timeout limit is set via an environmental variable it will be a str. Convert it to float.
    # if the conversion fails, pass the str through; get_url is then expected to throw an exception, which
    # makes _get_location_info fail over to the WQP Station request.
    try:
        geoserver_timeout_limit: Union[str, float, int] = float(EPA_GEOSERVER_WFS_TIMEOUT_LIMIT)
    except ValueError as e:
        msg = f'EPA_GEOSERVER_WFS_TIMEOUT_LIMIT: {e}'
        logger.warning(msg)
        synthesis_messages.append(msg)
        geoserver_timeout_limit = EPA_GEOSERVER_WFS_TIMEOUT_LIMIT
    except Exception as e:
        msg = f'EPA_GEOSERVER_WFS_TIMEOUT_LIMIT: Unknown exception while trying to convert value to float: {e}'
        logger.error(msg)
        synthesis_messages.append(msg)
        raise Exception(msg) from e

    # ToDo: Enhancements -- stream and chunk for expected large returns, chunk site_ids into multiple calls if large list
    result = get_url(url, timeout=geoserver_timeout_limit)

    # Read the status code without relying on the truthiness of the response object:
    # NOTE(review): if get_url returns a requests.Response, an error response (4xx/5xx) is falsy,
    # so testing `result` first would hide the real status code behind 'NO RESPONSE'.
    status_code = getattr(result, 'status_code', None)

    if not (result and status_code == 200):
        error_code = status_code or 'NO RESPONSE'
        msg = f'EPA WQX {url} returned error code {error_code}. Trying fail over.'
        # Raise exception that will be caught in the _get_location_info try/except code block.
        # The exception will be caught and the msg will be reported in the failover attempt.
        raise AccessIssueException(msg)

    try:
        return json.loads(result.content).get('features', [])
    except Exception as e:
        # unparsable body: report and return no locations (no fail over in this case)
        msg = f'EPA WQX {url} result could not be parsed {e}'
        logger.warning(msg)
        synthesis_messages.append(msg)
        return []
def _make_monitoring_feature_object(plugin_access: DataSourcePluginAccess, loc_info_obj: dict) -> Optional[MonitoringFeature]:
    """
    Build a point MonitoringFeature from one EPA location info dictionary.

    :param plugin_access: plugin access instance the feature is created with; its datasource id
        is included in the description
    :param loc_info_obj: location info in OGC WFS feature structure ('geometry' and 'properties')
    :return: the monitoring feature, or None if the geometry is missing / not a point or the
        properties are missing
    """
    geometry = loc_info_obj.get('geometry')
    geometry_type = geometry.get('type') if geometry else None

    # only point geometries are supported
    if not geometry_type:
        return None
    if geometry_type.upper() != FeatureTypeEnum.POINT.value:
        return None

    properties = loc_info_obj.get('properties')
    if not properties:
        return None

    site_id = properties.get('name')
    site_name = properties.get('locName')
    description = (f"Location is part of USGS huc {properties.get('huc8')}; "
                   f"organization {properties.get('orgName')}; "
                   f"provider {plugin_access.datasource.id} {properties.get('provider')}")

    feature = MonitoringFeature(
        plugin_access,
        id=site_id,
        name=site_name,
        description=description,
        feature_type=FeatureTypeEnum.POINT,
        shape=SpatialSamplingShapes.SHAPE_POINT)

    # coordinates are ordered [longitude, latitude]
    lon_lat: list = geometry.get('coordinates')  # type: ignore
    if lon_lat:
        horizontal = GeographicCoordinate(
            latitude=float(lon_lat[1]),
            longitude=float(lon_lat[0]),
            units=GeographicCoordinate.UNITS_DEC_DEGREES)
        feature.coordinates = Coordinate(absolute=AbsoluteCoordinate(horizontal_position=horizontal))

    return feature
def _make_mf_query_str(query_mf: list) -> str:
"""
Create a monitoring feature query string for the EPA locations REST API
:param query_mf:
:return:
"""
id_list = []
for idx, mf_id in enumerate(query_mf):
url_encoded_id = urllib.parse.quote(mf_id)
if idx == 0:
id_list.append(f'siteid%3A{url_encoded_id}')
continue
id_list.append(f'%7C{url_encoded_id}')
return ''.join(id_list)
def _reformat_date_for_epa_query(b3d_query_date: date) -> str:
"""
Change BASIN-3D date object to EPA format: MM-DD-YYYY
:param b3d_query_date:
:return:
"""
date_str = str(b3d_query_date) # YYYY-MM-DD
date_str_pieces = date_str.split('-')
return f'{date_str_pieces[1]}-{date_str_pieces[2]}-{date_str_pieces[0]}'
[docs]
class EPAMonitoringFeatureAccess(DataSourcePluginAccess):
    """
    Access for mapping EPA Water Quality Exchange sampling locations to
    :class:`~basin3d.core.models.MonitoringFeature` objects.

    Using WFS OGC Mapping service: '~/ogcservices/wfs/?request=GetFeature&service=wfs&version=2.0.0&typeNames=wqp_sites&SEARCHPARAMS=providers%3ASTORET&outputFormat=application%2Fjson'

    Backup: WQP Station endpoint: 'https://www.waterqualitydata.us/{url_api_version}/Station/search?mimeType=csv'

    ============== === ====================================================
    EPA WQX            BASIN-3D
    ============== === ====================================================
    siteid          >> :class:`basin3d.core.models.MonitoringFeature`
    ============== === ====================================================
    """
    synthesis_model_class = MonitoringFeature

    def list(self, query: QueryMonitoringFeature):
        """
        Return measurement locations based on the query parameters

        Either a parent_feature list or monitoring_feature list should be specified (not both).
        USGS hucs can be specified for parent_feature parameters (e.g., EPA-14020001)

        This is a generator: it yields MonitoringFeature objects and finishes by returning a
        ``StopIteration`` instance wrapping the list of synthesis messages.

        :param query: MonitoringFeature query
        :return: generator of :class:`~basin3d.core.models.MonitoringFeature`
        """
        synthesis_messages: List[str] = []

        # feature_type may be an enum member or a plain value
        feature_type = query.feature_type.value if isinstance(query.feature_type, FeatureTypeEnum) else query.feature_type

        if feature_type and feature_type not in EPADataSourcePlugin.feature_types:
            msg = f'Feature type {feature_type} not supported by {self.datasource.name}.'
            synthesis_messages.append(msg)
            logger.warning(msg)

        elif not query.parent_feature and not query.monitoring_feature:
            msg = f'{self.datasource.name} requires either a parent feature or monitoring feature be specified in the query.'
            synthesis_messages.append(msg)
            logger.warning(msg)

        elif query.parent_feature and query.monitoring_feature:
            msg = f'{self.datasource.name} does not support querying monitoring features by both parent_feature (huc) and monitoring_feature (list of ids).'
            synthesis_messages.append(msg)
            logger.warning(msg)

        else:
            loc_list = []
            loc_type = None  # stays None when there is nothing valid to look up

            if query.parent_feature:
                for parent_feature in query.parent_feature:
                    if len(parent_feature) not in [2, 4, 6, 8, 10, 12] or not parent_feature.isdigit():
                        msg = f'{self.datasource.name}: {parent_feature} does not appear to be a valid USGS huc: 2, 4, 6, 8, 10, or 12-digit code.'
                        synthesis_messages.append(msg)
                        logger.warning(msg)
                        continue
                    # codes shorter than 8 digits get a trailing wildcard
                    wildcard = '' if len(parent_feature) >= 8 else '*'
                    loc_list.append(f'{parent_feature}{wildcard}')
                if loc_list:
                    loc_type = 'huc'

            elif query.monitoring_feature:
                mf_separate_list = separate_list_types(query.monitoring_feature, {'named': str, 'bbox': tuple})
                named_mf_list = mf_separate_list.get('named', [])
                # ToDo: future, enable bbox query
                if named_mf_list:
                    loc_list = deepcopy(named_mf_list)
                    loc_type = 'siteid'
                else:
                    # Nothing to look up by name. Do NOT continue to the location request: an empty
                    # siteid list would produce a request with no site filter at all.
                    msg = f'Data source {self.datasource.id} requires specification of monitoring feature identifier.'
                    logger.info(msg)
                    synthesis_messages.append(msg)

            if loc_type:
                loc_info_list = _get_location_info(loc_type, loc_list, synthesis_messages)  # type: ignore
                for loc_info in loc_info_list:
                    mf_obj = _make_monitoring_feature_object(self, loc_info)
                    if mf_obj:
                        yield mf_obj

        # the messages travel with the generator's return value
        return StopIteration(synthesis_messages)  # type: ignore

    def get(self, query: QueryMonitoringFeature):
        """
        Get a EPA measurement location based on the id

        :param query: MonitoringFeature query, id must be specified
        :return: the :class:`~basin3d.core.models.MonitoringFeature`, or None if the lookup
            returned no location or more than one location
        """
        # query.id will always be a string at this point with validation upstream, thus ignoring the type checking
        # An explicit fresh message list is passed so this call never shares message state with other calls.
        loc_info = _get_location_info('siteid', [query.id], [])

        if not loc_info:
            return None

        if len(loc_info) > 1:
            msg = f'{self.datasource.id}: monitoring feature query by id {query.id} yielded too many site_id results from WQP. Cannot report single site.'
            logger.warning(msg)
            return None

        return _make_monitoring_feature_object(self, loc_info[0])
def _get_csv_dict_reader(epa_response: requests.Response) -> csv.DictReader:
    """
    Generate a DictReader from the EPA data response. Isolate the functionality so that it can easily be mocked for testing.

    :param epa_response: streamed csv response from the WQP request
    :return: csv.DictReader over the response lines
    """
    # iter_lines(decode_unicode=True) only decodes when the response has an encoding; with
    # encoding None it yields bytes, which csv.DictReader rejects. Provide a fallback in that case.
    if getattr(epa_response, 'encoding', '') is None:
        epa_response.encoding = 'utf-8'
    return csv.DictReader(epa_response.iter_lines(decode_unicode=True))
def _convert_distance_unit_to_meters(unit: str) -> Optional[float]:
"""
Distance unit converter for depth / height values.
:param unit: the measurement unit variable
:return:
"""
multiplier = None
if unit == 'ft':
multiplier = 0.3048
elif unit == 'in':
multiplier = 0.0254
elif unit == 'cm':
multiplier = 0.01
elif unit == 'm':
multiplier = 1
return multiplier
def _parse_epa_results_phys_chem(wqp_response: requests.Response, query: QueryMeasurementTimeseriesTVP, op_map: dict,
                                 results: dict, synthesis_messages: list, api_version: str = EPA_WQP_API_VERSION) -> set:
    """
    Helper to parse the resultPhysChem data.
    See https://www.waterqualitydata.us/portal_userguide/#table-7-sample-results-physicalchemical-result-retrieval-metadata

    Combo to find repeated measures in time (api_version = '2.2' | '3.0')
        MonitoringLocationIdentifier | Location_Identifier -- endpoint query
        CharacteristicName | Result_Characteristic -- endpoint query
        ActivityMediaName | Activity_Media -- endpoint query
        ResultSampleFractionText | Result_SampleFraction
        ResultDepthHeightMeasure/MeasureValue | ResultDepthHeight_Measure
        ResultTimeBasisText | Result_TimeBasis -- filter response | aggregation_duration, assume no value is instantaneous
        ResultTemperatureBasisText | Result_MeasureTemperatureBasis
        StatisticalBaseCode | Result_StatisticalBase -- filter response | statistic

    Time (api_version = '2.2' | '3.0')
        NOTE: combo these start time values to report in the TVP time. Set time position to start.
        ActivityStartDate | Activity_StartDate-- endpoint query
        ActivityStartTime/Time | Activity_StartTime
        ActivityStartTime/TimeZoneCode | Activity_StartTimeZone

        # Using start date only, but including here for completeness (api_version = '2.2' | '3.0')
        ActivityEndDate | Activity_EndDate -- endpoint query
        ActivityEndTime/Time | Activity_EndTime
        ActivityEndTime/TimeZoneCode | Activity_EndTimeZone

    Measurement results (api_version = '2.2' | '3.0')
        ResultMeasureValue | Result_Measure = value
        ResultMeasure/MeasureUnitCode | Result_MeasureUnit = unit
        ResultStatusIdentifier | Result_MeasureStatusIdentifier -- filter response | result_quality
        ResultValueTypeName | Result_MeasureType -- filter response for Estimated ONLY | result_quality

    Additional info (api_version = '2.2' | '3.0')
        ActivityTypeCode | Activity_TypeCode, e.g. Field Msr/Obs, Sample-Routine
        ResultDepthHeightMeasure/MeasureUnitCode | ResultDepthHeight_MeasureUnit
        ResultDepthAltitudeReferencePointText | ResultDepthHeight_AltitudeReferencePoint

    :param wqp_response: streamed csv response of the WQP Result request
    :param query: the measurement timeseries query; used to filter rows on fields not sent to WQP
    :param op_map: observed property map of datasource variable vocab to basin vocab
    :param results: dict that is filled IN PLACE. Each key identifies one synthesized time series;
        each value holds 'metadata' (dict), 'result_tvp' (list of TimeValuePair) and
        'result_quality' (list, one entry per time value pair, may contain None).
    :param synthesis_messages: list that warning messages are appended to
    :param api_version: EPA WQP API version used to pick the csv column names from FIELD_NAMES
    :return: set of monitoring feature ids that had at least one row passing the query filter
        with a value and a mapped observed property (ids are added before the timestamp is parsed)
    """
    # ToDo: get estimated value from attribute mappings directly.
    EPA_ESTIMATED_VOCAB = 'Estimated'

    def translate_empty_value(value):
        """
        Turn empty string values into None

        :param value: any value
        :return: None for the empty string, otherwise the value unchanged
        """
        if value == '':
            return None
        return value

    def make_timestamp(row_dict: dict) -> Optional[str]:
        """
        Combine the start date, and if present the start time and mapped time zone offset,
        into an ISO formatted timestamp string.

        :param row_dict: one csv row
        :return: ISO timestamp string, or None if there is no start date or it cannot be parsed
        """
        start_date = row_dict.get(FIELD_NAMES['start_date'][api_version])
        if not start_date:
            return None

        start_timestamp_str = f'{start_date}'
        timestamp_str_format = '%Y-%m-%d'

        start_time = row_dict.get(FIELD_NAMES['start_time'][api_version])
        if start_time:
            timestamp_str_format += ' %H:%M:%S'
            start_timestamp_str += f' {start_time}'

            # the time zone is only considered when a time is present; unmapped codes leave the timestamp naive
            start_time_zone = row_dict.get(FIELD_NAMES['start_time_zone'][api_version])
            if start_time_zone:
                mapped_time_zone = TIMEZONE_MAP.get(start_time_zone)
                if mapped_time_zone:
                    start_time_zone_int = mapped_time_zone.get("utc_offset")
                    if start_time_zone_int:
                        start_timestamp_str += start_time_zone_int
                        timestamp_str_format += '%z'

        try:
            return dt.strptime(start_timestamp_str, timestamp_str_format).isoformat(sep='T')
        except Exception as e:
            # ToDo: enhancement, include more info in the msg
            msg = f'Could not parse and convert start timestamp: {start_timestamp_str}; {e}'
            synthesis_messages.append(msg)
            logger.warning(msg)

        return None

    def str_to_numeric(value) -> Optional[Union[float, int]]:
        """
        Convert a csv string to a number: float if it contains a '.', otherwise int.

        :param value: csv string (or None)
        :return: the number, or None if the value is empty / None or cannot be converted
        """
        if not translate_empty_value(value):
            return None

        is_float = '.' in value

        try:
            if is_float:
                return float(value)
            return int(value)
        except Exception:
            # ToDo: enhancement, include more info in the msg
            msg = f'Could not parse expected numerical measurement value {value}'
            synthesis_messages.append(msg)
            logger.warning(msg)

        return None

    def is_in_query(row_dict: dict, query: QueryMeasurementTimeseriesTVP) -> bool:
        """
        Filter a row on query fields that were not part of the WQP request.

        (api version = '2.2' | '3.0')
        ResultTimeBasisText | Result_TimeBasis -- filter response | aggregation_duration, assume empty value is instantaneous
        StatisticalBaseCode | Result_StatisticalBase -- filter response | statistic
        ResultStatusIdentifier | Result_MeasureStatusIdentifier -- filter response | result_quality
        ResultValueTypeName | Result_MeasureType -- filter response for Estimated ONLY | result_quality

        :param row_dict: one csv row
        :param query: the measurement timeseries query
        :return: True if the row matches the query filters
        """
        if query.aggregation_duration and query.aggregation_duration[0] != 'NONE':
            ds_aggregation_duration = row_dict.get(FIELD_NAMES['aggregation_duration'][api_version])
            if not ds_aggregation_duration or ds_aggregation_duration not in query.aggregation_duration:
                return False
        elif query.aggregation_duration and query.aggregation_duration[0] == 'NONE':
            # Guarded with a truthiness test: this branch is also reached when aggregation_duration
            # is None or empty, and subscripting it then would raise.
            if row_dict.get(FIELD_NAMES['aggregation_duration'][api_version]):
                return False

        if query.statistic:
            ds_statistic = row_dict.get(FIELD_NAMES['statistic'][api_version])
            if not ds_statistic or ds_statistic not in query.statistic:
                return False

        # EPA estimated vocab is in vocabulary list separate from the rest of the result quality terms.
        # Revisit if BASIN-3D adds a ResultValueType
        if query.result_quality and EPA_ESTIMATED_VOCAB in query.result_quality:
            ds_result_value_type = row_dict.get(FIELD_NAMES['result_type'][api_version])
            if ds_result_value_type == EPA_ESTIMATED_VOCAB:
                return True

        if query.result_quality:
            # reaching here with an Estimated row means Estimated was not requested
            ds_result_value_type = row_dict.get(FIELD_NAMES['result_type'][api_version])
            if ds_result_value_type == EPA_ESTIMATED_VOCAB:
                return False
            ds_result_quality = row_dict.get(FIELD_NAMES['result_status'][api_version])
            if not ds_result_quality or ds_result_quality not in query.result_quality:
                return False

        return True

    # --------------- End of internal helper functions

    mf_set = set()

    wqp_csv = _get_csv_dict_reader(wqp_response)

    for row_dict in wqp_csv:

        # filter out any results that don't match the query fields that were not specified in the call to WQP
        if not is_in_query(row_dict, query):
            continue

        # make the dictionary key
        mf_id = translate_empty_value(row_dict.get(FIELD_NAMES['loc_id'][api_version]))
        epa_observed_property = translate_empty_value(row_dict.get(FIELD_NAMES['observed_property'][api_version]))
        measurement = str_to_numeric(row_dict.get(FIELD_NAMES['result'][api_version]))

        # check first set of required info (a measurement of 0 is valid, hence the explicit None test)
        if any(x is None for x in (mf_id, epa_observed_property, measurement)):
            continue

        # get the basin3d vocab for synthesis and skip if no match
        basin3d_vocab = op_map.get(epa_observed_property)
        if not basin3d_vocab:
            continue

        mf_set.add(mf_id)

        # time
        start_timestamp = make_timestamp(row_dict)

        # if the time fields cannot be parsed, move on.
        if not start_timestamp:
            continue

        tvp = TimeValuePair(timestamp=start_timestamp, value=measurement)

        epa_sampling_medium = row_dict.get(FIELD_NAMES['sampling_media'][api_version])
        epa_sample_fraction = row_dict.get(FIELD_NAMES['sample_fraction'][api_version])
        # Note there are also Activity DepthHeight fields
        epa_depth_height = row_dict.get(FIELD_NAMES['depth_height'][api_version])
        epa_depth_height_unit = row_dict.get(FIELD_NAMES['depth_height_unit'][api_version])
        epa_depth_height_ref = row_dict.get(FIELD_NAMES['depth_height_ref'][api_version])
        epa_aggregation_duration = row_dict.get(FIELD_NAMES['aggregation_duration'][api_version])
        epa_sampling_temp = row_dict.get(FIELD_NAMES['sampling_temp'][api_version])
        epa_statistic = row_dict.get(FIELD_NAMES['statistic'][api_version])
        epa_unit = translate_empty_value(row_dict.get(FIELD_NAMES['result_unit'][api_version]))

        # result_quality lives in 2 fields so first look in one field that has priority over the others.
        # If not, translate the other field.
        epa_result_quality = translate_empty_value(row_dict.get(FIELD_NAMES['result_type'][api_version]))
        if epa_result_quality is None or epa_result_quality != EPA_ESTIMATED_VOCAB:
            epa_result_quality = translate_empty_value(row_dict.get(FIELD_NAMES['result_status'][api_version]))

        # The data return are a mix of siteids, variables, dates, etc because the data are not timeseries.
        # so make a unique key out of the fields that would define a single time series. Add timestamps / data to that
        # unique timeseries as the return is streamed.
        # NOTE(review): sample fraction and depth reference are part of the id below but not of this key,
        # so the id reflects the first row seen for a key -- confirm this is intended.
        result_key = (f'{mf_id}-{basin3d_vocab}-{epa_sampling_medium}-{epa_unit}-{epa_depth_height}'
                      f'-{epa_depth_height_unit}-{epa_aggregation_duration}-{epa_statistic}-{epa_sampling_temp}')

        combo_dict = results.setdefault(result_key, {})

        # metadata is created once, from the first row of each time series
        if 'metadata' not in combo_dict:
            id_info = f'{mf_id}-{epa_observed_property}'
            if translate_empty_value(epa_sample_fraction):
                id_info = f'{id_info}-sample_fraction:{epa_sample_fraction}'
            if translate_empty_value(epa_sampling_temp):
                id_info = f'{id_info}-activity_temp:{epa_sampling_temp}'
            if translate_empty_value(epa_depth_height_ref):
                id_info = f'{id_info}-depth_ref:{epa_depth_height_ref}'

            combo_dict['metadata'] = {
                'id': id_info,
                'mf_id': mf_id,
                'sampling_medium': translate_empty_value(epa_sampling_medium),
                'aggregation_duration': translate_empty_value(epa_aggregation_duration),
                'result_quality': set(),
                'statistic': translate_empty_value(epa_statistic),
                'unit_of_measurement': epa_unit,
                'height_depth': str_to_numeric(epa_depth_height),
                'height_depth_unit': translate_empty_value(epa_depth_height_unit),
                'observed_property': epa_observed_property
            }

        # the metadata set collects every distinct quality seen in the series;
        # the per-value list below stays aligned with result_tvp
        if epa_result_quality:
            combo_dict['metadata']['result_quality'].add(epa_result_quality)

        combo_dict.setdefault('result_tvp', []).append(tvp)
        combo_dict.setdefault('result_quality', []).append(epa_result_quality)

    return mf_set
[docs]
class EPAMeasurementTimeseriesTVPObservationAccess(DataSourcePluginAccess):
    """
    Water Quality Portal Service: https://www.waterqualitydata.us/

    Access for mapping EPA water quality data to
    :class:`~basin3d.core.models.MeasurementTimeseriesTVPObservation` objects.
    """
    synthesis_model_class = MeasurementTimeseriesTVPObservation

    def list(self, query: QueryMeasurementTimeseriesTVP):
        """
        Generate EPA Measurement Timeseries TVP object. The data are not really time series. But we are treating them as such.

        This is a generator: it yields MeasurementTimeseriesTVPObservation objects and finishes by
        returning a ``StopIteration`` instance wrapping the list of synthesis messages.

        :param query: measurement timeseries query; named monitoring features are required
        :return: generator of :class:`~basin3d.core.models.MeasurementTimeseriesTVPObservation`
        """
        api_version = EPA_WQP_API_VERSION

        synthesis_messages: List[str] = []

        mf_separated_lists = separate_list_types(query.monitoring_feature, {'named': str, 'bbox': tuple})
        named_monitoring_features = mf_separated_lists.get('named', [])
        # ToDo: future, expand to support bbox

        if not named_monitoring_features:
            # Stop here: continuing would post a Result search with an empty siteid list,
            # i.e. a request that is not restricted to any site.
            msg = f'Data source {self.datasource.id} requires specification of monitoring feature identifier.'
            logger.info(msg)
            synthesis_messages.append(msg)
            return StopIteration(synthesis_messages)

        params = {'dataProfile': FIELD_NAMES['phys_chem_results'][api_version],
                  'siteid': named_monitoring_features,
                  'characteristicName': query.observed_property,
                  'startDateLo': _reformat_date_for_epa_query(query.start_date)}

        if query.sampling_medium:
            params.update({"sampleMedia": query.sampling_medium})

        if query.end_date:
            params.update({"startDateHi": _reformat_date_for_epa_query(query.end_date)})

        # get data from resultPhysChem; possible extension biological results (sample based data)
        epa_data = _post_wqp_search('Result', params, api_version)

        mf_list = []
        if epa_data and epa_data.status_code == 200:
            # Because the data are all mixed together and we support multiple mappings,
            # group the variables by the BASIN-3D vocab. Create a look up store for the variables in the query.
            op_map = self._get_observed_property_map(query.observed_property)
            results = {}  # type: ignore[var-annotated]
            mf_set = _parse_epa_results_phys_chem(epa_data, query, op_map, results, synthesis_messages, api_version)
            mf_list = list(mf_set)

        # a non-empty mf_list implies the request succeeded and `results` was filled above
        if mf_list:
            loc_info = _get_location_info('siteid', mf_list, synthesis_messages)

            # index the location info by site id
            loc_info_store = {}
            for loc_info_obj in loc_info:
                loc_properties = loc_info_obj.get('properties')
                if not loc_properties:
                    continue
                loc_id = loc_properties.get('name')
                if loc_id:
                    loc_info_store[loc_id] = loc_info_obj

            # loop thru the results
            for result in results.values():
                metadata = result.get('metadata')
                mf_id = metadata.get('mf_id')
                monitoring_feature_info = loc_info_store.get(mf_id)

                if not monitoring_feature_info:
                    msg = f'{self.datasource.id}: Location information for {mf_id} could not be found.'
                    logger.debug(msg)
                    continue

                mf_obj = _make_monitoring_feature_object(self, monitoring_feature_info)

                if not mf_obj:
                    continue

                # add the depth / height as a representative vertical position, converted to meters,
                # when the value is non-zero, its unit is known and the feature has coordinates
                if metadata.get('height_depth'):
                    depth_height_unit = metadata.get('height_depth_unit')
                    unit_multiplier = _convert_distance_unit_to_meters(depth_height_unit)
                    if depth_height_unit and unit_multiplier:
                        depth_height = metadata.get('height_depth')
                        depth_height *= unit_multiplier
                        if mf_obj.coordinates:
                            mf_obj.coordinates.representative = RepresentativeCoordinate(
                                vertical_position=DepthCoordinate(
                                    value=depth_height,
                                    distance_units=VerticalCoordinate.DISTANCE_UNITS_METERS))

                metadata_result_quality = None
                if metadata.get('result_quality'):
                    metadata_result_quality = list(metadata.get('result_quality'))

                # deal with result values; only pass per-value quality if at least one value has one
                measurement_result_quality = result.get('result_quality')
                if any(measurement_result_quality):
                    tvp_result = ResultListTVP(self, value=result.get('result_tvp'), result_quality=measurement_result_quality)
                else:
                    tvp_result = ResultListTVP(self, value=result.get('result_tvp'))

                meas_tvp_obj = MeasurementTimeseriesTVPObservation(
                    plugin_access=self,
                    id=metadata.get('id'),
                    observed_property=metadata.get('observed_property'),
                    feature_of_interest=mf_obj,
                    feature_of_interest_type=mf_obj.shape,
                    time_reference_position=TimeMetadataMixin.TIME_REFERENCE_START,
                    result_quality=metadata_result_quality,
                    result=tvp_result,
                    statistic=metadata.get('statistic'),
                    aggregation_duration=metadata.get('aggregation_duration'),
                    sampling_medium=metadata.get('sampling_medium'),
                    unit_of_measurement=metadata.get('unit_of_measurement')
                )

                yield meas_tvp_obj

        else:
            msg = f'{self.datasource.id}: No resultPhysChem results matched the query: {str(params)}'
            synthesis_messages.append(msg)

        # the messages travel with the generator's return value
        return StopIteration(synthesis_messages)

    def _get_observed_property_map(self, observed_properties) -> dict:
        """
        Get a map of data source vocabulary to basin 3d vocabulary to group observations.

        :param observed_properties: list, observed properties
        :return: dict keyed by each observed property, valued by the basin3d_vocab of its attribute mapping
        """
        op_map = {}
        for observed_property in observed_properties:
            attr_mapping: AttributeMapping = get_datasource_mapped_attribute(
                self, MappedAttributeEnum.OBSERVED_PROPERTY.value, observed_property)
            op_map[observed_property] = attr_mapping.basin3d_vocab

        return op_map
[docs]
# Plugin definition for the EPA Water Quality eXchange data source; registered through the
# basin3d_plugin decorator.
@basin3d_plugin
class EPADataSourcePlugin(DataSourcePluginPoint):
    # Human-readable title of the plugin
    title = 'EPA Water Quality eXchange Data Source Plugin'
    # Access classes this plugin provides: monitoring features and measurement timeseries
    plugin_access_classes = (EPAMonitoringFeatureAccess, EPAMeasurementTimeseriesTVPObservationAccess)

    # Feature types supported; EPAMonitoringFeatureAccess.list rejects queries for any other type.
    # IF this list grows, EPAMonitoringFeature logic may need to be updated.
    feature_types = ['POINT']