"""Tools for extending USGS measured Bathymetry (preliminary)."""
from pathlib import Path
import geopandas as gpd
import numpy as np
import pandas as pd
from geopandas.array import points_from_xy
from shapely.geometry import Point
from nldi_xstool.nldi_xstool import getxsatendpts
[docs]
class ExtADCPBathy:
"""Class to facilitate extending USGS measured bathymetry cross-sections."""
def __init__(self, file: str, dist: float, lonstr: str, latstr: str, estr: str, acrs: str) -> None:
"""Initializes an ExtADCPBathy instance.
Args:
file: str - The file path to read data from.
dist: float - The distance value.
lonstr: str - The longitude string.
latstr: str - The latitude string.
estr: str - The elevation string.
acrs: str - The coordinate reference system.
Raises:
FileNotFoundError: If the specified file does not exist.
"""
self.afile = Path(file)
if not self.afile.exists():
raise FileNotFoundError("File does not exist") # Ensure the file exists
self.adata = pd.read_csv(self.afile)
# Create a GeoDataFrame from the CSV data
self.agdf = gpd.GeoDataFrame(
self.adata, geometry=points_from_xy(self.adata[lonstr], self.adata[latstr])
).set_crs(acrs)
self.agdf["elevation"] = self.adata[estr]
# Convert CRS to Albers Conic for geometry calculations
self.agdf = self.agdf.to_crs("epsg:5071")
# Clean up dataframe by deleting original coordinate and elevation columns
self.agdf.drop(columns=[lonstr, latstr, estr], inplace=True)
self.ext = dist
self.crs = acrs
self.xs_complete = gpd.GeoDataFrame()
self._build_geom()
@staticmethod
def _distance(p1: Point, p2: Point) -> float:
"""Calculate the Euclidean distance between two points.
Args:
p1: First point.
p2: Second point.
Returns:
Distance between p1 and p2.
"""
return float(np.linalg.norm(np.array(p2) - np.array(p1)))
def _build_geom(self) -> None:
"""Builds and extends the geometry based on the input data."""
npts = len(self.agdf)
p1 = np.array(self.agdf.geometry.iloc[0].coords[0])
p2 = np.array(self.agdf.geometry.iloc[npts - 1].coords[0])
# Calculate the angle and distance for extending points
dx, dy = p2 - p1
theta = np.arctan2(dy, dx)
length = np.linalg.norm(p2 - p1)
# Calculate new points for extending the line
p1_new = p1 - (self.ext + 1) * np.array([np.cos(theta), np.sin(theta)])
p2_new = p1 + (length + self.ext + 1) * np.array([np.cos(theta), np.sin(theta)])
# Create dataframes for new points
df_pre = pd.DataFrame({"lon": [p1_new[0], p1[0]], "lat": [p1_new[1], p1[1]]})
df_post = pd.DataFrame({"lon": [p2[0], p2_new[0]], "lat": [p2[1], p2_new[1]]})
# Create GeoDataFrames for the new points and convert to original CRS
gdf_pre = gpd.GeoDataFrame(df_pre, geometry=gpd.points_from_xy(df_pre.lon, df_pre.lat)).set_crs("epsg:5071")
gdf_post = gpd.GeoDataFrame(df_post, geometry=gpd.points_from_xy(df_post.lon, df_post.lat)).set_crs("epsg:5071")
new_df_pre = self._get_xs(gdf_pre.to_crs("epsg:4326"), int(self.ext), 1).to_crs("epsg:5071")
new_df_post = self._get_xs(gdf_post.to_crs("epsg:4326"), int(self.ext), 1).to_crs("epsg:5071")
# Prepare final GeoDataFrame
for df in [new_df_pre, new_df_post]:
df.drop(columns=["distance"], inplace=True)
df["code"] = "0"
self.agdf["code"] = "1"
self.xs_complete = gpd.GeoDataFrame(
pd.concat([new_df_pre, self.agdf, new_df_post], ignore_index=True),
crs="epsg:5071",
)
# Calculate station values
p1_coords = np.array(self.xs_complete.geometry.iloc[0].coords[0])
self.xs_complete["station"] = self.xs_complete.geometry.apply(
lambda geom: self._distance(p1_coords, np.array(geom.coords[0]))
)
def _get_xs(self, gdf: gpd.GeoDataFrame, numpts: int, res: float) -> gpd.GeoDataFrame:
"""Wrapper for nldi-xstool.getxsatendpts to get cross-sections at endpoints.
Args:
gdf: GeoDataFrame with endpoint geometries.
numpts: Number of points.
res: Resolution.
Returns:
GeoDataFrame with cross-sections.
"""
return getxsatendpts(
[
(gdf.geometry[0].x, gdf.geometry[0].y),
(gdf.geometry[1].x, gdf.geometry[1].y),
],
numpts=numpts,
crs="epsg:4326",
res=res,
)
[docs]
def get_xs_complete(self) -> gpd.GeoDataFrame:
"""Return extended cross-section.
Returns:
GeoDataFrame with extended cross-section and spatial reference information.
"""
xs_complete_crs = self.xs_complete.to_crs(self.crs)
xs_complete_crs["spatial_ref"] = self.crs
return xs_complete_crs