Source code for nldi_xstool.ancillary

"""Ancilarry."""

from typing import Any, List, Tuple

import async_retriever as ar
import geopandas as gpd
import requests
from pygeohydro import NWIS
from shapely.geometry import LineString, Point, Polygon

from .ExtADCPBathy import ExtADCPBathy


[docs] def get_ext_bathy_xs(file: str, dist: float, lonstr: str, latstr: str, estr: str, acrs: str) -> gpd.GeoDataFrame: """Extend a measured bathymetric cross-section to above water topography. Args: file (str): Bathymetric survey file. dist (float): Distance to extend survey on both ends. lonstr (str): String heading on longitude in file. latstr (str): String heading on latitude in file. estr (str): String heading on elevation in file. acrs (str): CRS code of measured lon/lat values, for example: epsg:4326. Returns: gpd.GeoDataFrame: Geopandas dataframe of complete cross-section. """ exs = ExtADCPBathy(file=file, dist=dist, lonstr=lonstr, latstr=latstr, estr=estr, acrs=acrs) return exs.get_xs_complete()
# The following function converts NGVD29 to NAVD88 if gage is in NGVD29 using # NOAA NGS Vertcon service api # https://www.ngs.noaa.gov/web_services/ncat/lat-long-height-service.shtml def get_gage_datum(gagenum: str, verbose: bool = False) -> float: """Return the datum of a USGS gage in meters. Function uses the NOAA NGS Vertcon service to convert NGVD29 to NAVD88 if necessary. Args: gagenum (str): USGS Gage number. verbose (bool): Verbose output. Defaults to False. Returns: float: USGS Gage datum in meters. """ si = NWIS().get_info({"site": gagenum}, expanded=True) if si["alt_datum_cd"].values[0] != "NGVD29": return float(si["alt_va"].values[0] * 0.3048) url = "https://www.ngs.noaa.gov/api/ncat/llh" lat_str = "dec_lat_va" lon_str = "dec_long_va" alt_str = "alt_va" indatum_str = "coord_datum_cd" outdatum_str = "NAD83(2011)" in_vert_dataum_str = "NGVD29" out_vert_dataum_str = "NAVD88" if f"{si[indatum_str].values[0]}" == "NAD83": indatum = "NAD83(2011)" else: indatum = f"{si[indatum_str].values[0]}" ohgt = float(si[alt_str].values[0]) * 0.3048 tmplonstr = si[lon_str].values[0] if len(str(tmplonstr)) == 6: tmplonstr = f"0{str(tmplonstr)}" payload = { "lat": f"{si[lat_str].values[0]}", "lon": f"{tmplonstr}", "orthoHt": repr(ohgt), "inDatum": indatum, "outDatum": outdatum_str, "inVertDatum": in_vert_dataum_str, "outVertDatum": out_vert_dataum_str, } resp = ar.retrieve_json([url], [{"params": payload}])[0] if isinstance(resp, list): resp = resp[0] if verbose: print(f"{si[indatum_str].values[0]}") print(f"payload: {payload}") print(resp) return float(resp["destOrthoht"]) # type: ignore[index] # Resolution types and their respective IDs for the Rest Service res_types = { "res_1m": 18, "res_3m": 19, "res_5m": 20, "res_10m": 21, "res_30m": 22, "res_60m": 23, } dim_order = {"latlon": 0, "lonlat": 1} # Create a bounding box from any geo type # 'width' is in meters. It is the width of the buffer to place around the input geometry def make_bbox(shape_type: str, coords: List[Tuple[float, float]], width: int) -> Any: """Return a bounding box of the coordinates buffered by width. Args: shape_type (str): Shape type, one of: point, line, polygon. coords (List[Tuple[float, float]]): List of coordinate tuples. width (int): Width in meters. Returns: Any: Bounding box of geometry (minx, miny, maxx, maxy). """ if shape_type == "point": shape = Point(coords) elif shape_type == "line": shape = LineString(coords) elif shape_type == "polygon": shape = Polygon(coords) converted_width = convert_width(width) buff_shape = shape.buffer(converted_width) return buff_shape.bounds # Convert the width of the buffer from meters to 'degree' # This is NOT a precise conversion, just a quick overestimation # Maybe fix this later def convert_width(width: int) -> int: """Buffer bbox geometry with specified width. This is an estimation and is not precise. Args: width (int): Width in meters to buffer geometry's bounding box. Returns: int: Buffer in degrees. """ factor = 1 / 70000 dist = factor * width return int(dist) # Get request from arcgis Rest Services def get_dem(bbox: Tuple[float, float, float, float], res_type: str) -> bool: """Esri map service query of 3DEPElevationIndex using bounding box. Returns True if res_type (resolution of 3DEP elevation) intersects bounding box. Args: bbox (Tuple[float, float, float, float]): (minx, miny, maxx, maxy). res_type (str): Key in res_types, e.g. ``"res_1m"``, ``"res_3m"``, etc. Returns: bool: True if bbox intersects 3DEP elevation with res_type. """ minx = str(bbox[0]) miny = str(bbox[1]) maxx = str(bbox[2]) maxy = str(bbox[3]) res_id = res_types[res_type] return_val = False domain = "https://index.nationalmap.gov/" path = f"arcgis/rest/services/3DEPElevationIndex/MapServer/{res_id}/query" url = domain + path payload = { # "where": "", # "text": "", # "objectIds": "", # "time": "", "geometry": '{xmin:"' + minx + '",ymin:"' + miny + '",xmax:"' + maxx + '",ymax:"' + maxy + '",spatialReference:{wkid:4326}}', "geometryType": "esriGeometryEnvelope", "inSR": "EPSG:4326", "spatialRel": "esriSpatialRelIntersects", # "relationParam": "", # "outFields": "", "returnGeometry": "true", # "returnTrueCurves": "false", "maxAllowableOffset": "100", "geometryPrecision": "3", "outSR": "EPSG:4326", # "having": "", # "returnIdsOnly": "false", # "returnCountOnly": "false", # "orderByFields": "", # "groupByFieldsForStatistics": "", # "outStatistics": "", # "returnZ": "false", # "returnM": "false", # "gdbVersion": "", # "historicMoment": "", # "returnDistinctValues": "false", # "resultOffset": "", # "resultRecordCount": "", # "queryByDistance": "", # "returnExtentOnly": "false", # "datumTransformation": "", # "parameterValues": "", # "rangeValues": "", # "quantizationParameters": "", "featureEncoding": "esriDefault", "f": "geojson", } try: r = requests.get(url, params=payload, timeout=30) r.raise_for_status() resp = r.json() except requests.exceptions.Timeout: print("The request timed out") return False except requests.exceptions.RequestException as e: print(f"An error occurred: {e}") return False # If the Rest Services has a 200 response if "features" in resp: # If the features are not empty, then the DEM exist if resp["features"] != []: return_val = True # If features are empty, then no DEM if resp["features"] == []: return_val = False # If 'error', then there was an unsuccessful request if "error" in resp: return_val = False return return_val def query_dems(shape_type: str, coords: List[Tuple[float, float]], width: int = 100) -> Any: """Query 3DEP Elevation Index for available spatial resolution. Args: shape_type (str): One of: point, line, polygon. coords (List[Tuple[float, float]]): List of coordinate tuples, e.g. [(x,y), (x,y)]. width (int): Width to buffer bounding box of shape. Defaults to 100. Returns: Any: Dictionary of query keys (e.g. res_1m) and bool intersection values. """ bbox = make_bbox(shape_type, coords, width) # Make the bbox # Loop thru all resolutions and submit a query for each one return query_dems_bbox(bbox)
[docs] def query_dems_bbox(bbox: Tuple[float, float, float, float]) -> Any: """Query 3DEP Elevation Index for available spatial resolution. Uses esriSpatialRelIntersects - Query Geometry Intersects Target Geometry. Args: bbox (Tuple[float, float, float, float]): (minx, miny, maxx, maxy). Returns: dict: Boolean values associated with resolution keys. """ return {res_type: get_dem(bbox, res_type) for res_type in res_types}