"""
`django_basin3d.synthesis.viewsets`
***********************************
.. currentmodule:: django_basin3d.synthesis.viewsets
:synopsis: BASIN-3D Synthesis Model Viewsets (View Controllers) that support the REST API
:module author: Val Hendrix <vhendrix@lbl.gov>
:module author: Danielle Svehla Christianson <dschristianson@lbl.gov>
"""
import logging
import pydantic
import typing
from basin3d.core.schema.enum import FeatureTypeEnum
from basin3d.core.schema.query import QueryMeasurementTimeseriesTVP, QueryMonitoringFeature
from rest_framework import status, versioning
from rest_framework.decorators import action
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ViewSet
from basin3d.core.models import MeasurementTimeseriesTVPObservation, MonitoringFeature
from basin3d.core.synthesis import DataSourceModelAccess, MeasurementTimeseriesTVPObservationAccess, \
MonitoringFeatureAccess
from django_basin3d.models import DataSource
from django_basin3d.synthesis.serializers import MeasurementTimeseriesTVPObservationSerializer, \
MonitoringFeatureSerializer
logger = logging.getLogger(__name__)
def _get_request_feature_type(request):
"""
Return the feature type if exists in the request
:param request: request
otherwise return the text version
:return: the feature_type in the format specified, None if none exists
"""
for feature_type in FeatureTypeEnum.values():
urlpath = request.path_info
url_feature_type = ''.join(feature_type.lower().split('_'))
if f'{url_feature_type}s' in urlpath.split('/'):
return feature_type
return None
def _convert_str_params_to_list(params: dict, query_class) -> dict:
"""
:param params:
:param query_class:
:return:
"""
query_arg_types = typing.get_type_hints(query_class)
for arg, arg_type in query_arg_types.items():
# arg_type_str = str(arg_type)
if 'List' not in str(arg_type) or arg not in params.keys():
continue
params[arg] = params[arg].split(',')
return params
[docs]class DataSourcePluginViewSet(ViewSet, DataSourceModelAccess):
"""
Base ViewsSet for all DataSource plugins. This class extends the
`Django Rest Framework <https://www.django-rest-framework.org/>`_
class :class:`rest_framework.viewsets.ViewSet`. These are based on `Django generic views
<https://docs.djangoproject.com/en/2.2/topics/class-based-views/generic-display/>`_.
"""
versioning_class = versioning.NamespaceVersioning
def __init__(self):
# Override super class
from django_basin3d.catalog import CatalogDjango
self._catalog = CatalogDjango()
@property
def plugins(self):
plugins = {}
for d in DataSource.objects.all():
plugins[d.id_prefix] = d.get_plugin()
return plugins
@property
def catalog(self):
return self._catalog
[docs] def list(self, **kwargs) -> Response:
"""
Return the synthesized plugin results
:param request: The incoming request object
:type request: :class:`rest_framework.request.Request`
:param format: The format to present the data (default is json)
:return: The HTTP Response
:rtype: :class:`rest_framework.request.Response`
"""
items = []
request = kwargs['request']
query = kwargs['query']
itr = super(DataSourcePluginViewSet, self).list(query=query)
for i in itr:
items.append(i)
serializer = self.__class__.serializer_class(items, many=True, context={'request': request})
synthesis_response = itr.synthesis_response.dict(exclude_unset=True)
synthesis_response['data'] = serializer.data
return Response(synthesis_response)
[docs] def retrieve(self, **kwargs) -> Response:
"""
Retrieve a single object
:param pk: The primary key
:return: The HTTP Response
:rtype: :class:`rest_framework.request.Response`
"""
request = kwargs['request']
query = kwargs['query']
pk = kwargs['pk']
try:
item_synthesis_response = super(DataSourcePluginViewSet, self).retrieve(query)
if not item_synthesis_response or not item_synthesis_response.data:
return Response({"success": False, "detail": f"There is no detail for {pk}"},
status=status.HTTP_404_NOT_FOUND)
else:
try:
serializer = self.__class__.serializer_class(item_synthesis_response.data, context={'request': request})
synthesis_response = item_synthesis_response.dict(exclude_unset=True)
synthesis_response['data'] = serializer.data
return Response(synthesis_response)
except Exception as e:
logger.error("Plugin error: {}".format(e))
except Exception as e:
return Response({'success': False, 'detail': str(e)},
status=status.HTTP_404_NOT_FOUND, )
[docs]class MonitoringFeatureViewSet(DataSourcePluginViewSet, MonitoringFeatureAccess):
"""
MonitoringFeature: A feature upon which monitoring is made. OGC Timeseries Profile OM_MonitoringFeature.
**Synthesis Response**
This endpoint returns the following synthesis response object.
```json
{ "query": {}, "data": [] }
```
**Data Attributes**
Attribute for each data element from the synthesis response is as follows:
* *id:* string, Unique feature identifier
* *name:* string, Feature name
* *description:* string, Description of the feature
* *feature_type:* sting, FeatureType: REGION, SUBREGION, BASIN, SUBBASIN, WATERSHED, SUBWATERSHED, SITE, PLOT, HORIZONTAL PATH, VERTICAL PATH, POINT
* *observed_properties:* list of observed variables made at the feature. Observed property variables are configured via the plugins.
* *related_sampling_feature_complex:* list of related_sampling features. PARENT features are currently supported.
* *shape:* string, Shape of the feature: POINT, CURVE, SURFACE, SOLID
* *coordinates:* location of feature in absolute and/or representative datum
* *description_reference:* string, additional information about the Feature
* *related_party:* (optional) list of people or organizations responsible for the Feature
* *utc_offset:* float, Coordinate Universal Time offset in hours (offset in hours), e.g., +9
* *url:* url, URL with details for the feature
**Filter** by the following attributes (/?attribute=parameter&attribute=parameter&...)
* *datasource (optional):* a single data source id prefix (e.g ?datasource=`datasource.id_prefix`)
* *parent_feature (optional):* a monitoring feature name
**Restrict fields** with query parameter `fields`. (e.g. `?fields=id,name`)
"""
serializer_class = MonitoringFeatureSerializer
synthesis_model = MonitoringFeature
[docs] @typing.no_type_check
def list(self, request: Request, format: str = None) -> Response:
if not request:
raise Response({"success": False, "detail": "Request is missing"}, status=status.HTTP_400_BAD_REQUEST)
feature_type = _get_request_feature_type(request)
params = request.query_params.dict()
params = _convert_str_params_to_list(params, QueryMonitoringFeature)
return super().list(request=request, format=format, query=QueryMonitoringFeature(feature_type=feature_type,
**params))
[docs] @typing.no_type_check
def retrieve(self, request: Request, pk: str) -> Response:
if not request:
return Response({'success': False, 'detail': "Request is missing"},
status=status.HTTP_400_BAD_REQUEST, )
feature_type = _get_request_feature_type(request)
# retrieve method order: MonitoringFeatureViewSet, DataSourceViewSet, MonitoringFeatureAccess
# call super force this order based on inherited class ordering
return super().retrieve(request=request, query=QueryMonitoringFeature(id=pk, feature_type=feature_type), pk=pk)
[docs] @action(detail=True, url_name='regions-detail')
def regions(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='subregions-detail')
def subregions(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='basins-detail')
def basins(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='subbasins-detail')
def subbasins(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='watersheds-detail')
def watersheds(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='subwatersheds-detail')
def subwatersheds(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='sites-detail')
def sites(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='plots-detail')
def plots(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='horizontalpaths-detail')
def horizontalpaths(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='verticalpaths-detail')
def verticalpaths(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs] @action(detail=True, url_name='points-detail')
def points(self, request, pk=None):
return self.retrieve(request=request, pk=pk)
[docs]class MeasurementTimeseriesTVPObservationViewSet(DataSourcePluginViewSet, MeasurementTimeseriesTVPObservationAccess):
"""
MeasurementTimeseriesTVPObservation: Series of measurement (numerical) observations in
TVP (time value pair) format grouped by time (i.e., a timeseries).
**Synthesis Response**
This endpoint returns the following synthesis response object.
```json
{ "query": {}, "data": [] }
```
**Data Attributes**
Attribute for each data element from the synthesis response is as follows:
* *id:* string, Observation identifier (optional)
* *type:* enum, MEASUREMENT_TVP_TIMESERIES
* *observed_property:* str, BASIN-3D vocabulary for the observation's observed property
* *datasource:* URL, url of the datasource
* *sampling_medium:* enum, sampling medium of the observed property (SOLID_PHASE, WATER, GAS, OTHER)
* *phenomenon_time:* datetime, datetime of the observation, for a timeseries the start and end times can be provided
* *utc_offset:* float, Coordinate Universal Time offset in hours (offset in hours), e.g., +9
* *feature_of_interest:* MonitoringFeature obj, feature on which the observation is being made
* *feature_of_interest_type:* enum (FeatureTypes), feature type of the feature of interest
* *result:* dict of corresponding lists of TimeValuePairs, the observed values of the observed property being assessed, and (opt) their result_quality,
* *time_reference_position:* enum, position of timestamp in aggregated_duration (START, MIDDLE, END)
* *aggregation_duration:* enum, time period represented by observation (YEAR, MONTH, DAY, HOUR, MINUTE, SECOND)
* *unit_of_measurement:* string, units in which the observation is reported
* *statistic:* enum, statistical property of the observation result (MEAN, MIN, MAX, TOTAL)
* *result_quality:* list of enum, quality assessment of the result enum (VALIDATED, UNVALIDATED, SUSPECTED, REJECTED, ESTIMATED)
**Filter** by the following attributes (?attribute=parameter,parameter&attribute=parameter&...):
* *monitoring_feature (required):* comma separated list of monitoring_features ids
* *observed_property (required):* comma separated list of observed property basin3d vocabularies
* *start_date (required):* date YYYY-MM-DD
* *end_date (optional):* date YYYY-MM-DD
* *aggregation_duration (default: DAY):* enum (YEAR|MONTH|DAY|HOUR|MINUTE|SECOND|NONE)
* *statistic (optional):* comma separated list of statistic enum(s) (MEAN|MIN|MAX|INSTANTANEOUS)
* *result_quality (optional):* comma separated list of result quality enum(s) enum (VALIDATED|UNVALIDATED|SUSPECTED|REJECTED|ESTIMATED)
* *sampling_medium (optional):* comma separated list of sampling medium enum(s) (SOLID_PHASE|WATER|GAS|OTHER)
* *datasource (optional):* a single data source id prefix (e.g ?datasource=`datasource.id_prefix`)
**Restrict fields** with query parameter `fields`. (e.g. `?fields=id,name`)
"""
serializer_class = MeasurementTimeseriesTVPObservationSerializer
synthesis_model = MeasurementTimeseriesTVPObservation
[docs] @action(detail=False, url_path='measurement_tvp_timeseries', url_name='measurementtvptimeseries-list', methods=['GET'])
def list(self, request: Request, format: str = None) -> Response:
if not request:
return Response({'success': False, 'detail': "Request is missing"},
status=status.HTTP_400_BAD_REQUEST, )
params = request.query_params.dict()
params = _convert_str_params_to_list(params, QueryMeasurementTimeseriesTVP)
try:
return super().list(request=request, format=format, query=QueryMeasurementTimeseriesTVP(**params))
except pydantic.ValidationError as exec_info:
return Response({'success': False, 'detail': "Missing or invalid search criteria",
"errors": exec_info.errors()},
status=status.HTTP_400_BAD_REQUEST, )