Source code for pySVA.sva_core

"""Core module for pySVA containing the main constructorSVA class.

This module defines the constructorSVA class which handles data loading,
grid operations, and provides access to various hydrodynamic properties
for salinity variance analysis.
"""

__all__ = ['constructorSVA']

import xarray as xr
import xugrid as xu
import dfm_tools as dfmt
import dask.array as da
import numpy as np
from functools import cached_property
from pySVA.sva_helpers import build_inverse_distance_weights, integrate_trapz, calculate_distance_pythagoras, differentiate_over_3d_coord

[docs] class constructorSVA: """ Core class for Salinity Variance Analysis (SVA) on unstructured grids. This class loads and processes hydrodynamic data from D-Flow FM simulations and provides access to grid topology, geometric properties, velocity fields, tracers, and derived quantities required for variance budget analysis. The class supports multiple input formats (netCDF files, :class:`xarray.Dataset`, :class:`xugrid.UgridDataset`, or :class:`xarray.DataArray`) and automatically extracts grid structure and coordinate systems following UGRID conventions. Parameters ---------- input_file : str or xr.Dataset or xu.UgridDataset or xr.DataArray Input data source. Can be a file path or pre-loaded dataset. data_description : dict Mapping between logical variable names and dataset variables. Required keys: - ``velx``, ``vely``, ``velz`` - ``viscosity`` - ``tracer`` - ``depth`` Optional keys: - ``horizontal_diffusivity`` - ``interfaces`` - ``bed_level`` - ``volume`` - ``flow_area`` Attributes ---------- ds : xr.Dataset or xu.UgridDataset Underlying hydrodynamic dataset. grid : object UGRID mesh object. face_coords, edge_coords, node_coords : xr.DataArray Cartesian coordinates of mesh elements. dimn_* : str Standardized dimension names for grid topology. Notes ----- - All geometric and topological quantities follow UGRID conventions. - Coordinates are expressed in projected Cartesian space (typically meters). - Many properties are computed lazily using :func:`functools.cached_property`. """ def __init__(self, input_file, data_description, **kwargs): """Initialize the SVA analysis object. Args: input_file (str or xr.Dataset or xu.UgridDataset or xr.DataArray): Path to netCDF file, pre-loaded xarray Dataset, UgridDataset, or DataArray containing hydrodynamic data. data_description (dict): Mapping of variable names to dataset keys. Required keys: 'velx', 'vely', 'velz', 'viscosity', 'tracer', 'depth'. Optional keys: 'horizontal_diffusivity', 'interfaces', 'bed_level', 'volume', 'flow_area'. **kwargs: Additional arguments passed to dfmt.open_partitioned_dataset(). Raises: IOError: If file cannot be read or input type is not supported. """ if isinstance(input_file, str): try: self._read(file_name=input_file) except (OSError, IOError, RuntimeError): raise IOError("Unable to read file.") elif isinstance(input_file, xr.Dataset): self.ds = input_file elif isinstance(input_file, xu.core.wrap.UgridDataset): self.ds = input_file elif isinstance(input_file, xr.DataArray): self.ds = input_file else: raise IOError("Please provide xr.Dataset, xu.core.wrap.UgridDataset, xr.DataArray, or a file path to a netCDF.") # Dimension-related attributes self.grid = self.ds.grid self.gridname = self.ds.grid.name self.dimn_maxef = f'{self.gridname}_nMax_edge_faces' self.dimn_maxfn = self.grid.to_dataset().mesh2d.attrs['max_face_nodes_dimension'] self.dimn_maxen = f'{self.gridname}_nMax_edge_nodes' self.dimn_cart = f'{self.gridname}_nCartesian_coords' self.dimn_nodes = self.ds.grid.node_dimension self.dimn_edges = self.ds.grid.edge_dimension self.dimn_faces = self.ds.grid.face_dimension self.fill_value = self.ds.grid.fill_value self.dimn_layer = self.ds.grid.to_dataset()[self.gridname].layer_dimension self.dimn_interface = self.ds.grid.to_dataset()[self.gridname].interface_dimension self.face_coords, self.edge_coords, self.node_coords = self.get_all_coordinates() # Mandatory attributes self.velx = self.ds[f'{data_description["velx"]}'] self.vely = self.ds[f'{data_description["vely"]}'] self.velz = self.ds[f'{data_description["velz"]}'] self.viscosity = self.ds[f'{data_description["viscosity"]}'] self.tracer = self.ds[f'{data_description["tracer"]}'] self.depth = self.ds[f'{data_description["depth"]}'] if 'horizontal_diffusivity' in data_description.keys(): self.diu = self.ds[f'{data_description["horizontal_diffusivity"]}'] else: # > Todo: add possible calculation of these self.diu = None if 'interfaces' in data_description.keys(): self.interfaces = self.ds[f'{data_description["interfaces"]}'] else: # > Todo: add possible calculation of these self.interfaces = None if 'bed_level' in data_description.keys(): self.bed_level = self.ds[f'{data_description["bed_level"]}'] else: try: self.bed_level = self.bed_level except: self.bed_level = None # Optional attributes if 'volume' in data_description.keys(): self.volume = self.ds[f'{data_description["volume"]}'] else: try: self.volume = self.volume except: self.volume = None if 'flow_area' in data_description.keys(): self.flow_area = self.ds[f'{data_description["flow_area"]}'] else: self.flow_area = None #self.flow_area # try: # self.flow_area = self.flow_area # except: # self.flow_area = None # Hidden/calculated values self.edge_nodes = self.edge_nodes self.face_edges = self.face_edges self.kzz = self.kzz self.tracer_variance = self.tracer_variance def _read(self, file_name, **kwargs): """ Load a hydrodynamic dataset from file. This method reads a partitioned netCDF dataset using :mod:`dfm_tools` and stores it as the internal dataset. Parameters ---------- file_name : str Path to the netCDF file. **kwargs : dict, optional Additional keyword arguments passed to :func:`dfm_tools.open_partitioned_dataset`. Notes ----- - The dataset is expected to follow D-Flow FM conventions. - Partitioned datasets are automatically combined. """ # self.ds = xr.open_dataset(file_name, use_cftime=True, **kwargs) self.ds = dfmt.open_partitioned_dataset(file_name, **kwargs) # @cached_property # def flow_area(self): # # > Get dimension names # dimn_faces = self.dimn_faces # dimn_maxef = self.dimn_maxef # dimn_interface = self.dimn_interface # dimn_edges = self.dimn_edges # # > Get general strings # fill_value = self.fill_value # gridname = self.gridname # # > Get properties # bed_level = self.bed_level # interfaces = self.interfaces # dimn_layer = self.dimn_layer # edge_faces = self.edge_faces # # > Get the edge-face connectivity and replace fill values with -1 # edge_faces = self.edge_faces # edge_faces = edge_faces.reset_coords(drop=True) # edge_faces_validbool = (edge_faces!=fill_value).data # edge_faces = edge_faces.where(edge_faces_validbool, -1) # # > Make sure the face dimension is not chunked, otherwise we will # # > get "PerformanceWarning: Slicing with an out-of-order index is generating x times more chunks." # chunks = {dimn_faces:-1} # bed_level = bed_level.chunk(chunks) # interfaces = interfaces.chunk(chunks) # # > Select the varname on faces in the edge-face connectivity matrix # edge_faces_stacked = edge_faces.stack(__tmp_dim__=(dimn_edges, dimn_maxef)) # # > // Get bed levels at nodes and bed levels at edges // # # > Get the bed_levels on the faces, and fill it in in the edge_node_connectivity matrix # bl_edge_faces_stacked = bed_level.isel({dimn_faces: edge_faces_stacked}) # bl_edge_faces = bl_edge_faces_stacked.unstack("__tmp_dim__") # bl_edge_faces = xr.where(edge_faces_validbool, bl_edge_faces, np.nan) # # // Get face-based water depth (Algorithm 4, Equation 6.14) // # # > Get the water levels on the faces, and fill it in in the edge-face connectivity matrix # wl_edge_faces_stacked = interfaces.isel({dimn_faces:edge_faces_stacked}) # wl_edge_faces = wl_edge_faces_stacked.unstack("__tmp_dim__") # wl_edge_faces = xr.where(edge_faces_validbool, wl_edge_faces, np.nan) # # > Calculate the water depth based on Algorithm 4 of the Technical refernece manual (huj in Eq. 6.14) # # wd_edges = xr.where(wl_edge_faces[:,0,:,:] > wl_edge_faces[:,1,:,:], wl_edge_faces[:,0,:,:] - bl_edge_faces.min(dim=dimn_maxef), wl_edge_faces[:,1,:,:] - bl_edge_faces.min(dim=dimn_maxef)).diff(dim=dimn_interface) # bl_min = bl_edge_faces.min(dim=dimn_maxef) # wl0 = wl_edge_faces.isel({dimn_maxef: 0}) # wl1 = wl_edge_faces.isel({dimn_maxef: 1}) # wd_edges = xr.where(wl0 > wl1, wl0 - bl_min, wl1 - bl_min).diff(dim=dimn_interface) # # // Calculate the cross-sectional bed variation // # # > Calculate the difference between the bed levels (6.16) # delta_bl = bl_edge_faces.max(dim=dimn_maxef) - bl_edge_faces.min(dim=dimn_maxef) # # > Define delta value # delta = 10e-3 # # > Get the edge_length # edge_length = self.edge_length # # > Get minimum of huj/delta_blj and 1 # min_huj_bl = xr.where(wd_edges/edge_length < 1, wd_edges/edge_length, 1) # # > Turn nan's back into min_huj_bl # min_huj_bl = xr.where((wd_edges/edge_length)!=np.nan, min_huj_bl, np.nan) # # Get minumum of delta_blj/huj # min_bl_huj = xr.where(edge_length/wd_edges < 1, edge_length/wd_edges, 1) # # > Turn nan's back into min_bl_huj # min_bl_huj = xr.where((edge_length/wd_edges)!=np.nan, min_bl_huj, np.nan) # # Chunk back # bed_level = bed_level.chunk("auto") # interfaces = interfaces.chunk("auto") # # > Calculate actual flow area based on Algorithm 5 (Eqs 6.16 and 6.17) # flow_area = xr.where(delta_bl < delta*edge_length, edge_length*wd_edges, edge_length*wd_edges*min_huj_bl*(1-0.5*min_bl_huj)).rename({dimn_interface:dimn_layer}) # flow_area = flow_area.rename(f'{gridname}_au') # return flow_area
[docs] @cached_property def face_area(self): """ Compute face-associated cross-sectional area on edges. The face area is defined as the product of interpolated cell thickness and edge length. Returns ------- xr.DataArray Face area with dimensions matching edges (typically ``n_edges`` and vertical layers if present). Notes ----- - Cell thickness is interpolated from faces to edges. - Edge length is computed geometrically from node coordinates. - The result is stored as ``{gridname}_face_area``. """ dimn_edges = self.dimn_edges # > Get general strings fill_value = self.fill_value gridname = self.gridname # > Get edge length edge_length = self.edge_length edge_length = edge_length # > Get the cell thickness cell_thickness = self.cell_thickness # > Interpolate the cell thickness on the edges cell_thickness_edges = self.uda_to_edges(cell_thickness) chunks = {dimn_edges:-1} cell_thickness_edges = cell_thickness_edges.chunk(chunks) face_area = (cell_thickness_edges * edge_length).rename(f'{gridname}_face_area') face_area = face_area.chunk('auto') return face_area
[docs] @cached_property def water_depth(self): """ Compute water depth at each grid face. Water depth is defined as the difference between the maximum interface elevation (water surface) and the bed level. Returns ------- xr.DataArray Water depth with dimensions ``(n_faces, ...)``. Notes ----- - Positive downward convention is used. - Computed as: .. math:: H = \\eta - z_b where :math:`\\eta` is the water surface elevation and :math:`z_b` is the bed level. """ # > Get general strings gridname = self.gridname # > Get dimension names dimn_interface = self.dimn_interface # > Get relevant properties interfaces = self.interfaces bed_level = self.bed_level # > Calculate water level + water depth water_level = interfaces.max(dim=dimn_interface) water_depth = (water_level - bed_level).rename(f'{gridname}_waterdepth') return water_depth
[docs] @cached_property def bed_level(self): """ Compute bed level at each grid face. The bed level is derived from the minimum interface elevation, optionally averaged over time. Returns ------- xr.DataArray Bed elevation with dimensions ``(n_faces, ...)``. Raises ------ ValueError If interface data is not available. Notes ----- - Computed as: .. math:: z_b = \\min(\\text{interfaces}) - If time-dependent, the result is averaged over time. """ # Get dimensions dimn_interface = self.dimn_interface try: # > Get properties interfaces = self.interfaces # > Calculate bed level as minimum of interface height bed_level = interfaces.min(dim=dimn_interface).mean(dim='time') except: raise ValueError('For the bed level to be calculated, you need to provide the depth of the interface between the layers, specified as "interfaces". These can be varying in time.') return bed_level
[docs] @cached_property def edge_length(self): """ Compute geometric length of each edge. The edge length is calculated as the Euclidean distance between the two nodes defining each edge in Cartesian space. Returns ------- xr.DataArray Edge length with dimension ``(n_edges,)``. Notes ----- - Computed using the Pythagorean distance: .. math:: L = \\sqrt{(x_2 - x_1)^2 + (y_2 - y_1)^2} """ # > Get properties edge_node_coords = self.edge_node_coords edge_length = calculate_distance_pythagoras(edge_node_coords[:,0,0], edge_node_coords[:,0,1], edge_node_coords[:,1,0], edge_node_coords[:,1,1]).rename(f'{self.gridname}_edge_length') return edge_length
[docs] @cached_property def face_edge_weights(self): """ Compute interpolation weights from faces to edges. For each face, weights are computed for all connected edges using distance-based weighting. Returns ------- xr.DataArray Weights with dimensions ``(n_faces, nMax_face_edges)``. Notes ----- - Based on distances between face centroids and edge midpoints. - Weights are normalized such that they sum to 1 per face. - Typically used for interpolation from face-centered variables to edge-centered quantities. """ # > Get dimension names dimn_edges = self.dimn_edges fill_value = self.fill_value # > Get the edge-face connectivity face_edges = self.face_edges face_edges = face_edges.reset_coords(drop=True) face_coords = self.face_coords edge_coords = self.edge_coords # > Fill face-edge-connectivity matrix with face coordinates face_edge_coords = xr.where(face_edges!=fill_value, edge_coords.isel({dimn_edges: face_edges}), np.nan) # > Build the weights face_edge_weights = build_inverse_distance_weights(face_coords, face_edge_coords) return face_edge_weights
[docs] @cached_property def edge_face_weights(self): """ Compute interpolation weights from edges to faces. For each edge, weights are computed for all connected faces using distance-based weighting. Returns ------- xr.DataArray Weights with dimensions ``(n_edges, nMax_edge_faces)``. Notes ----- - Based on distances between edge midpoints and face centroids. - Weights are normalized per edge. - Returned as a Dask-backed array for scalability. """ # > Get dimension names dimn_faces = self.dimn_faces fill_value = self.fill_value gridname = self.gridname # > Get the edge-face connectivity edge_faces = self.edge_faces edge_faces = edge_faces.reset_coords(drop=True) face_coords = self.face_coords edge_coords = self.edge_coords # > Fill edge-face-connectivity matrix with face coordinates edge_face_coords = xr.where(edge_faces!=fill_value, face_coords.isel({dimn_faces:edge_faces}), np.nan) # > Build the weights edge_face_weights = build_inverse_distance_weights(edge_coords, edge_face_coords) # > Make into dask array edge_face_weights = edge_face_weights.reset_coords(drop=True) edge_face_weights = xr.DataArray(da.from_array(edge_face_weights), dims=[*edge_face_weights.dims], coords=edge_face_weights.coords, name='grid_edge_face_weights') return edge_face_weights
[docs] def get_all_coordinates(self): """ Retrieve Cartesian coordinates of all mesh elements. Returns ------- tuple of xr.DataArray Tuple containing: - face_coords : ``(n_faces, nCartesian_coords)`` - edge_coords : ``(n_edges, nCartesian_coords)`` - node_coords : ``(n_nodes, nCartesian_coords)`` Notes ----- - Coordinates are derived from UGRID metadata. - Returned arrays include CF-compliant attributes. - Cartesian dimension is defined as ``{gridname}_nCartesian_coords``. """ uds = self.ds # > Get coordinate names coord_face_x, coord_face_y = uds.grid.to_dataset().mesh2d.attrs['node_coordinates'].replace('node', 'face').split() coord_edge_x, coord_edge_y = uds.grid.to_dataset().mesh2d.attrs['node_coordinates'].replace('node', 'edge').split() coord_node_x, coord_node_y = uds.grid.to_dataset().mesh2d.attrs['node_coordinates'].split() # > Get dimension names dimn_faces = self.dimn_faces dimn_nodes = self.dimn_nodes dimn_edges = self.dimn_edges dimn_cart = self.dimn_cart # > Get face coordinates face_array = np.c_[uds.mesh2d_face_x, uds.mesh2d_face_y] # is NOT equal to uds.grid.face_coordinates face_coords = xr.DataArray(data=face_array, dims=[dimn_faces, dimn_cart], coords={f'{coord_face_x}':([dimn_faces], uds[f'{coord_face_x}']), f'{coord_face_y}':([dimn_faces], uds[f'{coord_face_y}'])}, attrs={'units':'m', 'standard_name': 'projection_x_coordinate, projection_y_coordinate', 'long_name':'Characteristic coordinates of mesh face', 'bounds': 'mesh2d_face_x_bnd, mesh_face_y_bnd'}) # > Get edge coordaintes edge_array = uds.grid.edge_coordinates # np.c_[uds.mesh2d_edge_x, uds.mesh2d_edge_y] edge_coords = xr.DataArray(data=edge_array, dims=[dimn_edges, dimn_cart], coords={f'{coord_edge_x}':([dimn_edges], uds[f'{coord_edge_x}']), f'{coord_edge_y}':([dimn_edges], uds[f'{coord_edge_y}'])}, attrs={'units':'m', 'standard_name': 'projection_x_coordinate, projection_y_coordinate', 'long_name':'Characteristic coordinates of mesh face', 'bounds': 'mesh2d_face_x_bnd, mesh_face_y_bnd'}) # > Get node coordinates node_array = uds.grid.node_coordinates # np.c_[uds.mesh2d_node_x, uds.mesh2d_node_y] node_coords = xr.DataArray(data=node_array, dims=[dimn_nodes, dimn_cart], coords={f'{coord_node_x}':([dimn_nodes], uds[f'{coord_node_x}']), f'{coord_node_y}':([dimn_nodes], uds[f'{coord_node_y}'])}, attrs={'units':'m', 'standard_name': 'projection_x_coordinate, projection_y_coordinate', 'long_name':'Characteristic coordinates of mesh node', 'bounds': 'mesh2d_node_x_bnd, mesh_node_y_bnd'}) return face_coords, edge_coords, node_coords
[docs] @cached_property def edge_node_coords(self): """ Retrieve the coordinates of nodes associated with each edge. Maps the node coordinates to the corresponding edge-node connectivity, returning a DataArray with NaNs where no node is connected (fill value). Returns ------- xr.DataArray Node coordinates per edge with dimensions ``(n_edges, nMax_edge_nodes, nCartesian_coords)``. Notes ----- - Derived from edge–node connectivity. - Missing nodes (fill values) are replaced with NaN. - Used for geometric calculations such as edge length and normals. """ # > Get dimension names fill_value = self.fill_value dimn_nodes = self.dimn_nodes # > Get/buid edge-node connectivity edge_nodes = self.edge_nodes # > Build the node_coords # _, _, node_coords = self.get_all_coordinates() # just get the node_coords node_coords = self.node_coords # > Get the coordinates of all nodes belonging to an edge edge_node_coords = xr.where(edge_nodes != fill_value, node_coords.isel({dimn_nodes: edge_nodes}), np.nan) return edge_node_coords
[docs] @cached_property def kzz(self, dicoww=5e-5, prandtl_schmidt=0.7): """ Compute the vertical turbulent eddy diffusivity. The vertical diffusivity :math:`K_{zz}` is calculated as the sum of molecular diffusivity, background diffusivity, and a tracer-dependent laminar contribution. .. math:: K_{zz} = \\frac{\\nu}{Pr} + D_{0} + k_l where :math:`\\nu` is the kinematic viscosity, :math:`Pr` is the Prandtl/Schmidt number, :math:`D_{0}` is a constant background diffusivity, and :math:`k_l` is a tracer-dependent laminar component. Parameters ---------- dicoww : float, optional Background vertical diffusivity (:math:`m^2 s^{-1}`). Default is ``5e-5``. prandtl_schmidt : float, optional Prandtl/Schmidt number used to convert viscosity to diffusivity. Default is ``0.7``. Returns ------- xarray.DataArray Vertical turbulent eddy diffusivity defined on grid edges. Notes ----- The computed diffusivity is added to the dataset as ``{gridname}_dicwws`` and contains CF-style metadata describing the grid location and units. """ # > Get properties tracer = self.tracer.name gridname = self.gridname dimn_edges = self.dimn_edges viscosity = self.viscosity dicwwu = viscosity / prandtl_schmidt if tracer == f'{gridname}_sa1': k_l = (1/700) * 10e-6 elif tracer == f'{gridname}_tem1': k_l = (1/6.7) * 10e-6 # > Calculate kzz kzz = dicwwu + dicoww + k_l # > Assign attributes and rename kzz = kzz.assign_attrs({'mesh': f'{gridname}', 'location': 'edge', 'cell_methods': f'{dimn_edges}: mean', 'standard_name': 'eddy_diffusivity', 'long_name': 'turbulent vertical eddy diffusivity', 'units': 'm2 s-1', 'grid_mapping': 'projected_coordinate_system'}).rename(f'{gridname}_dicwwu') # > Add calculated diffusivity to dataset self.ds[f'{gridname}_dicwws'] = (kzz.dims, kzz.data) return kzz
[docs] @cached_property def edge_faces(self): """ Return the edge-to-face connectivity matrix. This property exposes the UGRID edge-face connectivity stored in the dataset as an :class:`xarray.DataArray`. Returns ------- xarray.DataArray Connectivity array with dimensions ``(edges, max_edge_faces)`` mapping each edge to its adjacent faces. Notes ----- The array contains integer indices referencing faces in the grid. Missing neighbors are indicated using the dataset's fill value. """ # > Get dimensions and fill value fill_value = self.fill_value dimn_edges = self.dimn_edges dimn_maxef = self.dimn_maxef edge_faces = xr.DataArray(self.ds.ugrid.grid.edge_face_connectivity, dims=(dimn_edges, dimn_maxef)) return edge_faces
[docs] @cached_property def edge_nodes(self): """ Return the edge-to-node connectivity matrix. Constructs an :class:`xarray.DataArray` describing which nodes define each edge of the unstructured grid. Returns ------- xarray.DataArray Edge-node connectivity array with dimensions ``(edges, max_edge_nodes)``. Notes ----- The returned array contains CF-UGRID compliant metadata including: * ``cf_role = "edge_node_connectivity"`` * ``start_index = 0`` Missing entries are filled using the dataset's configured fill value. """ # > Get fill value, grid name and dimensions fill_value = self.fill_value dimn_edges = self.dimn_edges # > Get coordinate names coord_edge_x, coord_edge_y = self.ds.grid.to_dataset().mesh2d.attrs['node_coordinates'].replace('node', 'edge').split() # > Determine dimension name dimn_maxen = self.dimn_maxen # > Get connectivity edge_nodes = self.ds.grid.edge_node_connectivity # > Make into xr.DataArray with correct sizes, dimensions, and coordinates edge_nodes = xr.DataArray(data=edge_nodes, dims=[dimn_edges, dimn_maxen], coords={f'{coord_edge_x}':([dimn_edges], self.ds[f'{coord_edge_x}']), f'{coord_edge_y}':([dimn_edges], self.ds[f'{coord_edge_y}'])}, attrs={'cf_role': 'edge_node_connectivity', 'start_index':0, '_FillValue':fill_value}, name=self.ds.grid.to_dataset().mesh2d.attrs['edge_node_connectivity']) return edge_nodes
[docs] @cached_property def face_edges(self): """ Return the face-to-edge connectivity matrix. Constructs an :class:`xarray.DataArray` describing which edges bound each face of the unstructured grid. Returns ------- xarray.DataArray Connectivity array with dimensions ``(faces, max_face_edges)``. Notes ----- The returned object follows CF-UGRID conventions with ``cf_role = "face_edge_connectivity"``. """ # > Get fill value, grid name, and dimensions fill_value = self.fill_value gridname = self.gridname dimn_faces = self.dimn_faces dimn_maxfn = self.dimn_maxfn # > Get voordinate names coord_face_x, coord_face_y = self.ds.grid.to_dataset().mesh2d.attrs['node_coordinates'].replace('node', 'face').split() # > Get connectivity face_edges = self.ds.grid.face_edge_connectivity # > Make into xr.DataArray with correct sizes, dimensions, and coordinates face_edges = xr.DataArray(face_edges, dims=[dimn_faces, dimn_maxfn], coords={f'{coord_face_x}': ([dimn_faces], self.ds[f'{coord_face_x}']), f'{coord_face_y}': ([dimn_faces], self.ds[f'{coord_face_y}'])}, attrs={'cf_role': 'face_edge_connectivity', 'start_index': 0, '_FillValue': fill_value}, name=f'{gridname}_face_edges') return face_edges
[docs] @cached_property def unvs(self): """ Compute unit normal vectors for all grid edges. The normal vectors are defined perpendicular to each edge and normalized to unit length. If the dataset uses geographic coordinates (WGS84), coordinates are first projected to the Dutch RD stereographic coordinate system before vector calculation. Returns ------- xarray.DataArray Unit normal vectors with dimensions ``(edges, cartesian_dim)``. Notes ----- The vectors are oriented outward relative to adjacent cells. The resulting array is stored in the dataset under the variable ``{gridname}_unvs``. """ from pyproj import Transformer # First check if the provided dataset is a xu.core.wrap.UgridDataset uds = self.ds # > Get dimensions, gridname, and coordinates gridname = uds.grid.name dimn_cart = self.dimn_cart varname_unvs = f'{gridname}_unvs' # > Get edge coordinate names edge_node_coords = self.edge_node_coords # > Check if the coordinate reference system is WGS84 (latitude, longitude) x1 = edge_node_coords[:, 0, 0] x2 = edge_node_coords[:, 1, 0] y1 = edge_node_coords[:, 0, 1] y2 = edge_node_coords[:, 1, 1] if uds.ugrid.crs[f'{gridname}'].name == 'WGS 84': # > Infer latitudes and longitudes from the edge-node-coordinates lat1 = y1 lat2 = y2 lon1 = x1 lon2 = x2 latlon2rd = Transformer.from_crs("epsg:4326", "+proj=sterea +lat_0=52.15616055555555 +lon_0=5.38763888888889 +k=0.9999079 +x_0=155000 +y_0=463000 +ellps=bessel +towgs84=565.237,50.0087,465.658,-0.406857,0.350733,-1.87035,4.0812 +units=m +no_defs") # Transform coordinates x1_n, y1_n = latlon2rd.transform(lat1, lon1) x2_n, y2_n = latlon2rd.transform(lat2, lon2) # Give correct attiributes x1 = xr.DataArray(x1_n, dims=x1.dims, coords=x1.coords) x2 = xr.DataArray(x2_n, dims=x2.dims, coords=x2.coords) y1 = xr.DataArray(y1_n, dims=y1.dims, coords=y1.coords) y2 = xr.DataArray(y2_n, dims=y2.dims, coords=y2.coords) x = x2 - x1 y = y2 - y1 nf = nf = xr.concat([-y, x], dimn_cart).T #np.dstack([-y, x]) vm_nf = nf.linalg.norm(dims=dimn_cart) # > Calculate the norm and divide by the norm unvs = nf / vm_nf unvs = unvs.rename(varname_unvs) uds[f'{varname_unvs}'] = (unvs.dims, unvs.data) return unvs
[docs] def uda_to_edges(self, uda): """ Interpolate a face-centered variable to grid edges. The interpolation uses inverse-distance weights derived from the adjacent faces connected to each edge. Parameters ---------- uda : xu.UgridDataArray Face-centered variable defined on the grid. Returns ------- xu.UgridDataArray Edge-centered variable obtained through weighted interpolation. Raises ------ ValueError If the provided variable does not contain the face dimension. Notes ----- Boundary edges with only one valid neighboring face inherit the value of that face. """ # > Define the to-be-interpolated tracer DataArray and grid varname = uda.name grid = self.ds.grid dimn_faces = self.dimn_faces if dimn_faces in uda.dims: # > Get dimension and grid names dimn_edges = self.dimn_edges fill_value = self.fill_value dimn_maxef = self.dimn_maxef # > Get the edge-face connectivity and replace fill values with -1 edge_faces = self.edge_faces edge_faces = edge_faces.reset_coords(drop=True) edge_faces_validbool = (edge_faces!=fill_value).data edge_faces = edge_faces.where(edge_faces_validbool, -1) # > Make sure the face dimension is not chunked, otherwise we will # > get "PerformanceWarning: Slicing with an out-of-order index is generating x times more chunks." chunks = {dimn_faces:-1} uda = uda.chunk(chunks) # > Select the varname on faces in the edge-face connectivity matrix edge_faces_stacked = edge_faces.stack(__tmp_dim__=(dimn_edges, dimn_maxef)) edge_var_stacked = uda.isel({dimn_faces: edge_faces_stacked}) edge_var = edge_var_stacked.unstack("__tmp_dim__") # > Convert data-array back to an xu.UgridDataArray edge_var = xu.UgridDataArray(edge_var, grid=grid) # > Set fill values to nan-values edge_var = edge_var.where(edge_faces_validbool, np.nan) # > Calculate the variable on the edges, based on the face_weights face_weights = self.edge_face_weights face_weights = face_weights.reset_coords(drop=True) # > Rechunk back with settin:g = "auto" #todo: provide option for custom chunksizes uda = uda.chunk(chunks="auto") edge_var = edge_var.chunk(chunks="auto") # If both of the edge vars are np.nan, then the sum should also be np.nan # Make a boolean for this to test the next step on nan_bool = ~edge_var.isnull().all(dim=dimn_maxef) # If one of the edges is nan, it's a boundary edge and this means that the corresponding # edge should be equal to the value in the non-nan cell > adjust the face_weights accordingly. # > First expand the face weights to correspond to the full edge_var array expanded_face_weights = face_weights.broadcast_like(edge_var) expanded_face_weights = expanded_face_weights.chunk("auto").chunk(edge_var.chunks) valid_mask = edge_var.notnull().reset_coords(drop=True) adjusted_face_weights = expanded_face_weights.where(valid_mask.data) # > Adjust the weights over the layer dimension so that the weights always add up to 1. adjusted_face_weights = (adjusted_face_weights / adjusted_face_weights.sum(dim=dimn_maxef, skipna=True)) # Multipy edge variables with the face_weights to get the variable size on the edges edge_var = (edge_var * adjusted_face_weights).sum(dim=dimn_maxef) # Apply the boolean edge_var = edge_var.where(nan_bool) # edge_var = edge_var.mean(dim=dimn_maxef) # Give name and attributes attr_list = {'location': 'edge', 'cell_methods': f'{dimn_faces}: inverse distance weighted mean'} edge_var = edge_var.assign_attrs(attr_list).rename(varname) return edge_var else: raise ValueError(f'Variable {varname} does not contain dimension faces, so cannot be transformed from faces to edges.')
[docs] def uda_to_faces(self, uda): """ Interpolate an edge-centered variable to grid faces. The interpolation averages values from the edges that bound each face using predefined edge weights. Parameters ---------- uda : xu.UgridDataArray Edge-centered variable. Returns ------- xu.UgridDataArray Face-centered variable. Raises ------ ValueError If the provided variable does not contain the edge dimension. """ # > Define the to-be-interpolated tracer DataArray and grid varname = uda.name grid = self.ds.grid dimn_nodes = self.dimn_nodes dimn_edges = self.dimn_edges if dimn_edges in uda.dims: dimn_faces = self.dimn_faces fill_value = self.fill_value dimn_maxfn = self.dimn_maxfn # > Get the edge-face connectivity and replace fill values with -1 face_edges = self.face_edges face_edges = face_edges.reset_coords(drop=True) face_edges_validbool = (face_edges!=fill_value).data face_edges = face_edges.where(face_edges_validbool, -1) # > Make sure the face dimension is not chunked, otherwise we will # > get "PerformanceWarning: Slicing with an out-of-order index is generating x times more chunks." chunks = {dimn_edges:-1} uda = uda.chunk(chunks) # > Select the varname on faces in the edge-face connectivity matrix face_edges_stacked = face_edges.stack(__tmp_dim__=(dimn_faces, dimn_maxfn)) face_var_stacked = uda.isel({dimn_edges: face_edges_stacked}) face_var = face_var_stacked.unstack("__tmp_dim__") # > Convert data-array back to an xu.UgridDataArray face_var = xu.UgridDataArray(face_var, grid=grid) # > Set fill values to nan-values face_var = face_var.where(face_edges_validbool, np.nan) # > Rechunk back with setting = "auto" #todo: provide option for custom chunksizes uda = uda.chunk(chunks="auto") face_var = face_var.chunk(chunks="auto") # If all of the edge vars are np.nan, then the sum should also be np.nan # Make a boolean for this to test the next step on nan_bool = ~face_var.isnull().all(dim=dimn_maxfn) # > Calculate the variable on the edges, based on the face_weights edge_weights = self.face_edge_weights # Multipy edge variables with the face_weights to get the variable size on the edges # face_var = face_var.mean(dim=dimn_maxfn) face_var = (face_var * edge_weights).sum(dim=dimn_maxfn) # # Apply the boolean face_var = face_var.where(nan_bool) # Update attributes from node/edge to face face_attrs = {'location': 'face', 'cell_methods': f'{dimn_faces}: mean'} face_var = face_var.assign_attrs(face_attrs) return face_var else: raise ValueError(f'Variable {varname} does not contain dimension edges, so cannot be transformed from edges to faces.')
[docs] def compute_gradient_on_face(self, uda, scalar=False, add_to_dataset=False, mode_depth=None): """ Compute the horizontal gradient of a variable on grid faces. The gradient is evaluated using Gauss' divergence theorem by summing flux contributions across edges surrounding each face. Parameters ---------- uda : xu.UgridDataArray Variable for which the gradient is computed. scalar : bool, optional If ``True``, treat the variable as scalar and use face areas instead of flow areas. Default is ``False``. add_to_dataset : bool, optional If ``True``, store the resulting gradient in the dataset. mode_depth : {"sum", "mean", None}, optional Controls how vertical layers are treated when computing the gradient. Returns ------- xarray.DataArray Gradient vector with dimensions ``(faces, cartesian_dim)``. """ # > Obtain uds and grid from constructorSVA object uds = self.ds grid = uds.grid # > Get dimension and grid names dimn_maxfn = self.dimn_maxfn dimn_faces = self.dimn_faces dimn_edges = self.dimn_edges fill_value = self.fill_value gridname = self.gridname dimn_cart = self.dimn_cart # > Get unit normal vectors unvs = self.unvs unvs = unvs.reset_coords(drop=True) # > Get the volume and flow area variables: check if they're in the # > constructor first, then check the dataset, else throw error if scalar: flow_area = self.face_area else: flow_area = self.flow_area volume = self.volume if mode_depth == 'sum': # > Derive the summed cell volume volume = volume.sum(dim=self.dimn_layer, keep_attrs=True) # > Get the summed cell area flow_area = self.flow_area.sum(dim=self.dimn_layer, keep_attrs=True) elif mode_depth == 'mean': # > Get the average cell thickness cell_thickness = self.cell_thickness.mean(dim=self.dimn_layer, keep_attrs=True) # > Get the average cell volume from the average cell thickness volume = cell_thickness * self.cell_area # > Calculate the average flow area flow_area = flow_area.mean(dim=self.dimn_layer, keep_attrs=True) else: pass # > Get the face-edge connectivity and replace fill values with -1 face_edges = self.face_edges # > Get boolean to mask other arrays too face_edges_validbool = (face_edges!=fill_value) # > Mask edges that don't have a value face_edges = face_edges.where(face_edges_validbool.data, -1) # > Stack face_edges for later use face_edges_stacked = face_edges.stack(__tmp_dim__=(dimn_faces, dimn_maxfn)) # > Take the lentgth of the cartesian axis nc = unvs.sizes[dimn_cart] # Make (faces, maxfn) -> (faces, maxfn, cart) # face_edges_validbool3d = face_edges_validbool.expand_dims({dimn_cart: nc}).data # > Get the unit normal vectors (nf) also in the face-edges matrix fe_nfs = xr.where(face_edges_validbool, unvs.isel({dimn_edges: face_edges}), np.nan) fe_nfs = fe_nfs.chunk(chunks="auto") # // 2. See if the normal vectors are pointing out of the cell. If not, flip them. # > Calculate distance vectors dv = self.distance_vectors dv = dv.reset_coords(drop=True) # > Calculate the dot product between the calculated normal vectors and the distance vector for each face i_nfs = xr.dot(fe_nfs, dv, dims=[dimn_cart]) # > if the product < 1, multiply by -1 to get an outwards facing normal vector, and update the variable fe_nfs = xr.where(i_nfs > 0, fe_nfs, fe_nfs * -1) # > Determine if we're looking at a velocity value u1 or u0 # > Because these are vector quantities in the direction of the normal vector, # > to get to the final vector, we have to multiply u1/u0 by the normal vector # > first. Also, we need to check their sign for every edge. if not dimn_edges in uda.dims: uda = self.uda_to_edges(uda) else: pass # > Make sure the edge dimension is not chunked, otherwise we will # > get "PerformanceWarning: Slicing with an out-of-order index is generating x times more chunks." chunks = {dimn_edges:-1} uda = uda.chunk(chunks) # > Fill the face-edges matrix with the varname # > Do this via stack and unstack since 2D indexing does not # > properly work in dask yet: https://github.com/dask/dask/pull/10237 edge_var_stacked = uda.isel({dimn_edges: face_edges_stacked}) edge_var = edge_var_stacked.unstack("__tmp_dim__") # > Convert data-array back to an xu.UgridDataArray edge_var = xu.UgridDataArray(edge_var, grid=grid) # > Replace locations of the validbools with NaN's edge_var = xr.where(face_edges_validbool, edge_var, np.nan) # > Fill face_edge matrix with flow area data flow_area = flow_area.chunk(chunks) edge_au_stacked = flow_area.isel({dimn_edges:face_edges_stacked}) edge_au = edge_au_stacked.unstack("__tmp_dim__") # > Rechunk the data back to the original edge_var = edge_var.chunk(chunks="auto") uda = uda.chunk(chunks="auto") edge_au = edge_au.chunk(chunks="auto") # > Multiply the variable with the edge area (flow area), multiply by the # > "flipped boolean" and the unit normal vector, and sum (dimension: faces) face_vars = (edge_var * edge_au * fe_nfs).sum(dim=dimn_maxfn, keep_attrs=True) # > Multiply the total result with (1/cell volume) (dimension: faces, cartesian_coordinates) gradient = (1/volume) * face_vars # > Check if it needs to be added to the dataset self.ds if add_to_dataset: # > Determine varname from the provided array varname = uda.name uds[f'{gridname}_{varname}_gradient'] = (gradient.dims, gradient.data) return gradient
[docs] @cached_property def distance_vectors(self): """ Compute vectors from face centroids to edge midpoints. These vectors are used in gradient and geometric calculations involving the unstructured mesh. Returns ------- xarray.DataArray Distance vectors with dimensions ``(faces, max_face_edges, cartesian_dim)``. """ # > Get dimensions, fill_value, and varname fill_value = self.fill_value dimn_edges = self.dimn_edges dimn_faces = self.dimn_faces dimn_cart = self.dimn_cart # Get coordinate names coord_face_x, coord_face_y = self.ds.grid.to_dataset().mesh2d.attrs['node_coordinates'].replace('node', 'face').split() # > Get face-edge-connectivity face_edges = self.face_edges face_edges = face_edges.reset_coords(drop=True) # > Get the cell face_coords! centroid_array = self.ds.grid.face_coordinates centroid_coords = xr.DataArray(data=centroid_array, dims=[dimn_faces, dimn_cart], coords={f'{coord_face_x}':([dimn_faces], self.ds[f'{coord_face_x}']), f'{coord_face_y}':([dimn_faces], self.ds[f'{coord_face_y}'])}, attrs={'units':'m', 'standard_name': 'projection_x_coordinate, projection_y_coordinate', 'long_name':'Characteristic coordinates of mesh centroids', 'bounds': 'mesh2d_face_x_bnd, mesh_face_y_bnd'}) # >> 1. Get the distance vector # > Get the edge coordinates edge_coords = self.edge_coords # > Put edge coordinates into face-edge connectivity matrix face_edge_coords = xr.where(face_edges!=fill_value, edge_coords.isel({dimn_edges: face_edges}), np.nan) # > Subtract the face coordinates from each of the edge coordinates in the connectivity matrix distance_vectors = face_edge_coords - centroid_coords return distance_vectors
[docs] @cached_property def tracer_variance(self): """ Compute tracer variance. The tracer variance is defined as the squared deviation of the tracer from its depth-averaged value. Returns ------- xr.DataArray Tracer variance with the same dimensions as the input tracer field (typically including vertical layers and horizontal grid dimensions). Notes ----- - The variance is computed as: .. math:: c'^2 = (c - \\overline{c})^2 where :math:`c` is the tracer concentration and :math:`\\overline{c}` is the depth-averaged tracer. - The depth-average is taken over the vertical coordinate (layer or interface dimension). """ # > Get properties tracer = self.tracer # > Calculate mean mean_tracer = tracer.mean(dim=self.dimn_layer) # > Calculate perturbation tracer_perturbation = tracer - mean_tracer tracer_variance = (tracer_perturbation**2).rename(f"{self.tracer.name}_variance") return tracer_variance
[docs] @cached_property def depth_integrated_tracer_variance(self): """ Compute the depth-integrated tracer variance. The integration is performed using the trapezoidal rule. Returns ------- xarray.DataArray Depth-integrated tracer variance. """ # > Get properties tracer_variance = self.tracer_variance depth = self.depth.drop_vars([n for n,v in self.depth.coords.items() if f"{self.dimn_layer}" in v.dims]) depth_integrated_tracer_variance = integrate_trapz(tracer_variance, depth, dim=self.dimn_layer).rename(f"depth_integrated_{self.tracer.name}_variance") return depth_integrated_tracer_variance
[docs] @cached_property def cell_thickness(self): """ Compute vertical thickness of grid cells. The cell thickness is defined as the vertical distance between consecutive interfaces in the water column. Returns ------- xr.DataArray Cell thickness with dimensions including the vertical layer coordinate (typically ``n_layers`` or equivalent) and the horizontal grid dimension (e.g., faces or edges). Notes ----- - The thickness is computed as: .. math:: \\Delta z_k = z_{k+1} - z_k where :math:`z_k` and :math:`z_{k+1}` are consecutive interface elevations. - The result represents layer thickness in physical space (typically meters). - The sign convention follows the vertical coordinate system of the input dataset (usually positive upward for interfaces, resulting in positive layer thickness). """ cell_thickness = self.ds.mesh2d_flowelem_zw.diff(dim=self.dimn_interface).rename({f'{self.dimn_interface}':self.dimn_layer}).rename(f'{self.gridname}_cell_thickness') cell_thickness = cell_thickness.reset_coords(drop=True) return cell_thickness
[docs] @cached_property def cell_area(self): """ Compute horizontal area of each grid cell. The horizontal cell area is derived from edge geometry using edge lengths and outward unit normal vectors, consistent with finite-volume formulations on unstructured grids. Returns ------- xr.DataArray Horizontal cell area defined on faces with dimension ``(n_faces,)`` or including additional dimensions if time-dependent. Notes ----- - The area is computed by integrating contributions from all edges surrounding a face. - In discrete form, this corresponds to a polygonal area reconstruction using edge vectors and normals: .. math:: A = \\frac{1}{2} \\sum_e \\mathbf{r}_e \\cdot \\mathbf{n}_e where :math:`\\mathbf{r}_e` is the edge vector and :math:`\\mathbf{n}_e` is the outward unit normal. - The formulation is consistent with finite-volume flux integration and ensures geometric conservation. - Assumes a planar (Cartesian) coordinate system. """ # > Get the relevant dimensions dimn_edges = self.dimn_edges dimn_cart = self.dimn_cart dimn_maxfn = self.dimn_maxfn # > Get the distance vectors first dv = self.distance_vectors dv = dv.reset_coords(drop=True) # > Get face_edges face_edges = self.face_edges face_edges = face_edges #.reset_coords(drop=True) # > Get boolean to mask other arrays too face_edges_validbool = (face_edges!=self.fill_value).data # > Mask edges that don't have a value face_edges = face_edges.where(face_edges_validbool, -1) # > Stack face_edges for later use face_edges_stacked = face_edges.stack(__tmp_dim__=(self.dimn_faces, self.dimn_maxfn)) # > Get unit normal vectors unvs = self.unvs unvs = unvs #.reset_coords(drop=True) # > Get the unit normal vectors (nf) also in the face-edges matrix fe_nfs = xr.where(face_edges_validbool, unvs.isel({self.dimn_edges: face_edges}), np.nan) fe_nfs = fe_nfs.chunk(chunks="auto") # > Calculate the dot product between the calculated normal vectors and the distance vector for each face i_nfs = xr.dot(fe_nfs, dv, dims=[self.dimn_cart]) # > if the product < 1, multiply by -1 to get an outwards facing normal vector, and update the variable fe_nfs = xr.where(i_nfs > 0, fe_nfs, fe_nfs * -1) # > Get the flow area flow_area = self.flow_area # > Fill face_edge matrix with flow area data edge_length = self.edge_length chunks = {dimn_edges:-1} edge_length = edge_length.chunk(chunks) edge_len_stacked = edge_length.isel({self.dimn_edges:face_edges_stacked}) edge_len = edge_len_stacked.unstack("__tmp_dim__") # > Calculate the area cell_area = ((1/2)*(xr.dot(dv, fe_nfs, dim=[dimn_cart])*edge_len)).sum(dim=dimn_maxfn).rename(f'{self.gridname}_cell_area') # Incorrect as of 3-4-2025 # cell_area = xr.DataArray(self.ds.grid.area, dims=(self.dimn_faces), name=f'{self.gridname}_cell_area') return cell_area
[docs] @cached_property def volume(self): """ Compute the volume of each grid cell. Returns ------- xarray.DataArray Cell volume calculated as cell_area times the cell_thickness. """ # > Get strings gridname = self.gridname # > Get properties cell_area = self.cell_area cell_thickness = self.cell_thickness volume = (cell_area * cell_thickness).rename(f'{self.gridname}_vol1') return volume
[docs] def straining(self, integration='depth', depth_averaged=False): """ Compute the straining term from the tracer variance budget. The straining term is defined as .. math:: \\text{straining} = -2 \, c' \, \mathbf{u}' \cdot \\nabla_h \\bar{c} where - :math:`c' = c - \\bar{c}` is the tracer perturbation relative to the depth-averaged tracer; - :math:`\mathbf{u}' = \mathbf{u} - \\bar{\mathbf{u}}` is the velocity perturbation relative to the depth-averaged velocity; - :math:`\\bar{c}` indicates a **depth-averaged** quantity; - :math:`\\nabla_h` denotes the horizontal gradient operator. Parameters ---------- integration : {"depth", "volume", "none"}, optional If "depth", integrate the straining term over the vertical layers. If "volume", integrate over the total volume (not yet implemented). If "none", no integration is performed. depth_averaged : bool, optional If ``True``, return the term already averaged over depth layers. Otherwise, return the full 3D field. Returns ------- xarray.DataArray Straining term from the tracer variance budget, with units consistent with :math:`c^2\,s^{-1}` (tracer squared per second). """ # > Get the tracer variance tracer_variance = self.tracer_variance # > Get the velocity perturbation ux_p = self.velx - self.velx.mean(dim=self.dimn_layer) uy_p = self.vely - self.vely.mean(dim=self.dimn_layer) # > Get the mean tracer over depth # tracer_mean = self.tracer.mean(dim=self.dimn_layer, keep_attrs=True).rename(f'{self.gridname}_{self.tracer.name}a') # a stands for averaged # > Get the gradient of the tracer mean using the Leibniz rule determining that # > the mean of the gradient == the gradient of the mean tracer_gradient = self.compute_gradient_on_face(self.tracer, scalar=True) tracer_mean_gradient = tracer_gradient.mean(dim=self.dimn_layer) # tracer_mean_gradient = self.compute_gradient_on_face(tracer_mean, depth_averaged=True) # > Calculate -2*sv2*ux_p and uy_p sv2_uxp = -2 * tracer_variance * ux_p sv2_uyp = -2 * tracer_variance * uy_p # > Calculate the dot product of the vector (-2*sv2*ux_p, -2*sv2*uy_p) with the tracer_mean_gradient dot_prod = tracer_mean_gradient.isel({f'{self.dimn_cart}':0})*sv2_uxp + tracer_mean_gradient.isel({f'{self.dimn_cart}':1})*sv2_uyp # > Drop coordinates that have coordinate to be integrated over before integrating so no difficulties arise dot_prod = dot_prod.drop_vars([n for n,v in dot_prod.coords.items() if f"{self.dimn_layer}" in v.dims]) depth = self.depth.drop_vars([n for n,v in self.depth.coords.items() if f"{self.dimn_layer}" in v.dims]) if integration.lower() == 'depth': # > Integrate to get the straining term (trapezoidal rule) straining = integrate_trapz(dot_prod, depth, dim=self.dimn_layer) elif integration.lower() == 'volume': # > Multiply with volume per cell and sum over all cells straining = (dot_prod * self.volume).sum(dim=[f'{self.dimn_layer}', f'{self.dimn_faces}']) elif integration.lower() == 'none': straining = dot_prod else: raise ValueError(f"Could not derive what to do with integration={integration} keyword.") if depth_averaged: straining = straining * (1/self.water_depth) return straining.rename(f"{integration}_integrated_{self.tracer.name}_straining")
[docs] def tendency(self, integration='depth', depth_averaged=False): """ Compute the temporal tendency term of the tracer variance budget. The tendency term is calculated as the time derivative of the tracer variance: .. math:: \\frac{\\partial (c')^2}{\\partial t} where :math:`c' = c - \\bar{c}` is the tracer perturbation relative to the depth-averaged tracer. Optionally, this term can be integrated over depth or volume. A depth-averaged output can also be requested. Parameters ---------- integration : {"depth", "volume", "none"}, optional Integration method for the tracer variance prior to differentiation. - "depth": integrate over the vertical coordinate - "volume": integrate over cell volumes - "none": no integration applied Default is "depth". depth_averaged : bool, optional If True, divide the integrated tendency by the local water depth to return a depth-averaged quantity. Returns ------- xarray.DataArray Tendency term in the tracer variance budget. Units are :math:`c^2\\, s^{-1}`, where :math:`c` is the tracer concentration. """ # > Get tracer variance tracer_variance = self.tracer_variance # > Drop coordinates that have coordinate to be integrated over before integrating so no difficulties arise tracer_variance = tracer_variance.drop_vars([n for n,v in tracer_variance.coords.items() if f"{self.dimn_layer}" in v.dims]) depth = self.depth.drop_vars([n for n,v in self.depth.coords.items() if f"{self.dimn_layer}" in v.dims]) if integration.lower() == 'depth': # > Integrate tracer variance over depth (trapezoidal rule) tv_int = integrate_trapz(tracer_variance, depth, self.dimn_layer) elif integration.lower() == 'volume': # > Multiply with volume per cell and sum over all cells tv_int = (tracer_variance * self.volume).sum(dim=[f'{self.dimn_layer}', f'{self.dimn_faces}']) elif integration.lower() == 'none': tv_int = tracer_variance else: raise ValueError(f"Could not derive what to do with integration={integration} keyword.") tv_int = tv_int.unify_chunks() # > Take the difference in time (d/dt) # > For this, chunk size in time must be larger than 1; check this first if any(val == 1 for val in tv_int.chunksizes['time']): tv_int = tv_int.chunk({'time': 4}) else: pass tendency = tv_int.differentiate("time", datetime_unit="s").rename(f"{integration}_integrated_{self.tracer.name}_tendency") if depth_averaged: tendency = tendency * (1/self.water_depth) return tendency
[docs] def advection(self, integration='depth', depth_averaged=False): """ Compute the advection term in the tracer variance budget, computed as: .. math:: \\text{advection} = \\nabla_h \\cdot (\\mathbf{u} (c')^2) where: - :math:`c' = c - \\bar{c}` is the tracer perturbation relative to the depth-averaged tracer; - :math:`\\nabla_h` indicates the horizontal divergence; - :math:`\\mathbf{u}=(u,v)` represents the full velocity vector. Integration over depth or volume is optional. Parameters ---------- integration : {"depth", "volume", "none"}, optional Integration method applied to the computed advection field. Default is "depth". depth_averaged : bool, optional If True, return depth-averaged advection. Returns ------- xarray.DataArray Advection term in the tracer variance budget. Units are :math:`c^2\\, s^{-1}`. """ # > First calculate (S')^2 * u and (S')^2 * v u_sv2 = self.velx * self.tracer_variance v_sv2 = self.vely * self.tracer_variance # > Drop coordinates that have coordinate to be integrated over before integrating so no difficulties arise u_sv2 = u_sv2.drop_vars([n for n,v in u_sv2.coords.items() if f"{self.dimn_layer}" in v.dims]) v_sv2 = v_sv2.drop_vars([n for n,v in v_sv2.coords.items() if f"{self.dimn_layer}" in v.dims]) depth = self.depth.drop_vars([n for n,v in self.depth.coords.items() if f"{self.dimn_layer}" in v.dims]) # > Calculate gradient grad_usv2 = self.compute_gradient_on_face(u_sv2) grad_vsv2 = self.compute_gradient_on_face(v_sv2) # > Cartesian dimension 0 is x-direction, 1 is y-direction # > We need du/dx + dv/dy for the horizontal divergence of the velocity * salinity variance vector adv = grad_usv2.isel({f'{self.dimn_cart}':0}) + grad_vsv2.isel({f'{self.dimn_cart}':1}) if integration.lower() == 'depth': # > Integrate advection over depth (trapezoidal rule) # > Applying the Leibniz rule, we know that the divergence of an integral with fixed limits # > is equal to the integral of the divergence, so we take the integral of the previously calculated advection term advection = integrate_trapz(adv, depth, dim=self.dimn_layer) elif integration.lower() == 'volume': # > Multiply with volume per cell and sum over all cells advection = (adv * self.volume).sum(dim=[f'{self.dimn_layer}', f'{self.dimn_faces}']) elif integration.lower() == 'none': advection = adv else: raise ValueError(f"Could not derive what to do with integration={integration} keyword.") if depth_averaged: advection = advection * (1/self.water_depth) return advection.rename(f"{integration}_integrated_{self.tracer.name}_advection")
[docs] def horizontal_dissipation(self, integration='depth', depth_averaged=False): """ Compute the horizontal dissipation term in the tracer variance budget. Horizontal dissipation is calculated using the horizontal diffusivity and squared horizontal gradients of the tracer: .. math:: \\text{horizontal dissipation} = 2 D_h \\left[\\left(\\frac{\\partial c}{\\partial x}\\right)^2 + \\left(\\frac{\\partial c}{\\partial y}\\right)^2\\right] where: - :math:`c` is the tracer; - :math:`D_h` is the horizontal diffusivity. Parameters ---------- integration : {"depth", "volume", "none"}, optional Integration method applied to the dissipation field. Default is "depth". depth_averaged : bool, optional If True, return depth-averaged horizontal dissipation. Returns ------- xarray.DataArray or None Horizontal dissipation term in the tracer variance budget. Units are :math:`c^2\\, s^{-1}`. Returns None if calculation cannot be performed (e.g., horizontal diffusivity undefined). """ # > Get the dimensions dimn_cart = self.dimn_cart # > Get the horizontal diffusion diu = self.diu # > Check if the horizontal diffusivity is defined on the edges, and if it is, # > interpolate it on the faces if self.dimn_edges in diu.dims: diu = self.uda_to_faces(diu) try: tracer_grad = self.compute_gradient_on_face(self.tracer, scalar=True) dsdx = tracer_grad.isel({dimn_cart:0}).transpose(*list(diu.dims)).reset_coords(drop=True) dsdy = tracer_grad.isel({dimn_cart:1}).transpose(*list(diu.dims)).reset_coords(drop=True) hor_dis = 2*diu*(dsdx**2 + dsdy**2) # > Drop coordinates that have coordinate to be integrated over before integrating so no difficulties arise hor_dis = hor_dis.drop_vars([n for n,v in hor_dis.coords.items() if f"{self.dimn_layer}" in v.dims]) depth = self.depth.drop_vars([n for n,v in self.depth.coords.items() if f"{self.dimn_layer}" in v.dims]) # > Integrate the total term and multiply with -1 if integration.lower() == 'depth': # > Integrate dissipation over depth (trapezoidal rule) dissipation = integrate_trapz(hor_dis, depth, self.dimn_layer) elif integration.lower() == 'volume': # > Multiply with volume per cell and sum over all cells dissipation = ((hor_dis * self.volume).sum(dim=[f'{self.dimn_layer}', f'{self.dimn_faces}'])) elif integration.lower() == 'none': dissipation = hor_dis else: raise ValueError(f"Could not derive what to do with integration={integration} keyword.") if depth_averaged: dissipation = dissipation * (1/self.water_depth) return dissipation.rename(f"{integration}_integrated_{self.tracer.name}_horizontal_dissipation") except: return None
[docs] def vertical_dissipation(self, integration='depth', depth_averaged=False): """ Compute the vertical dissipation term in the tracer variance budget. Vertical dissipation is calculated using the vertical eddy diffusivity and squared vertical gradients of the tracer: .. math:: \\text{vertical dissipation} = 2 K_{zz} \\left(\\frac{\\partial c}{\\partial z}\\right)^2 where: - :math:`c` is the tracer,; - :math:`K_{zz}` is the vertical diffusivity. Parameters ---------- integration : {"depth", "volume", "none"}, optional Integration method applied to the dissipation field. Default is "depth". depth_averaged : bool, optional If True, return depth-averaged vertical dissipation. Returns ------- xarray.DataArray Vertical dissipation term in the tracer variance budget. Units are :math:`c^2\\, s^{-1}`. """ # > Get the vertical turbulent diffusivity kzz = self.kzz # > See if the depth dimension is in the kzz variable, if not then it's a staggered grid. # > If staggered grid, then interpolate to dimension if (self.dimn_interface != None) and (self.dimn_interface in kzz.dims): kzz = dfmt.uda_interfaces_to_centers(kzz) # > Check if the vertical eddy diffusivity is defined on the edges, and if it is, # > interpolate it on the efaces if self.dimn_edges in kzz.dims: kzz = self.uda_to_faces(kzz) # > Differentiate the tracer over depth dsdz = differentiate_over_3d_coord(self.tracer, f"{self.depth.name}", axis=-1) dsdz_sq = 2 * kzz * (dsdz**2) # > Drop coordinates that have coordinate to be integrated over before integrating so no difficulties arise dsdz_sq = dsdz_sq.drop_vars([n for n,v in dsdz_sq.coords.items() if f"{self.dimn_layer}" in v.dims]) depth = self.depth.drop_vars([n for n,v in self.depth.coords.items() if f"{self.dimn_layer}" in v.dims]) # > Integrate the total term and multiply with -1 if integration.lower() == 'depth': # > Integrate dissipation over depth (trapezoidal rule) vertical_dissipation = integrate_trapz(dsdz_sq, depth, self.dimn_layer) elif integration.lower() == 'volume': # > Multiply with volume per cell and sum over all cells vertical_dissipation = ((dsdz_sq * self.volume).sum(dim=[f'{self.dimn_layer}', f'{self.dimn_faces}'])) elif integration.lower() == 'none': vertical_dissipation = dsdz_sq else: raise ValueError(f"Could not derive what to do with integration={integration} keyword.") if depth_averaged: vertical_dissipation = vertical_dissipation * (1/self.water_depth) return vertical_dissipation.rename(f"{integration}_integrated_{self.tracer.name}_dissipation")