Source code for icenet.data.interfaces.esgf

import logging
import os
import warnings

import numpy as np
import pandas as pd
import xarray as xr

from icenet.data.interfaces.downloader import ClimateDownloader
from icenet.data.cli import download_args
from icenet.data.utils import esgf_search
"""

"""


class CMIP6Downloader(ClimateDownloader):
    """Climate downloader to provide CMIP6 climate model data from ESGF APIs

    Useful CMIP6 guidance: https://pcmdi.llnl.gov/CMIP6/Guide/dataUsers.html

    :param identifier: how to identify this dataset
    :param source: source ID in ESGF node
    :param member: member ID in ESGF node
    :param nodes: list of ESGF nodes to query
    :param experiments: experiment IDs to download
    :param frequency: query parameter frequency
    :param table_map: map from variable name to CMIP6 table ID
    :param grid_map: map from variable name to grid label
    :param grid_override: single grid label overriding grid_map for all variables
    :param exclude_nodes: ESGF nodes to exclude from queries

    Example (source, member, grid_override) combinations:

        "MRI-ESM2-0", "r1i1p1f1", None
        "MRI-ESM2-0", "r2i1p1f1", None
        "MRI-ESM2-0", "r3i1p1f1", None
        "MRI-ESM2-0", "r4i1p1f1", None
        "MRI-ESM2-0", "r5i1p1f1", None
        "EC-Earth3", "r2i1p1f1", "gr"
        "EC-Earth3", "r7i1p1f1", "gr"
        "EC-Earth3", "r10i1p1f1", "gr"
        "EC-Earth3", "r12i1p1f1", "gr"
        "EC-Earth3", "r14i1p1f1", "gr"
    """

    TABLE_MAP = {
        'siconca': 'SIday',
        'tas': 'day',
        'ta': 'day',
        'tos': 'Oday',
        'hus': 'day',
        'psl': 'day',
        'rlds': 'day',
        'rsus': 'day',
        'rsds': 'day',
        'zg': 'day',
        'uas': 'day',
        'vas': 'day',
        'ua': 'day',
    }

    GRID_MAP = {
        'siconca': 'gn',
        'tas': 'gn',
        'ta': 'gn',
        'tos': 'gr',
        'hus': 'gn',
        'psl': 'gn',
        'rlds': 'gn',
        'rsus': 'gn',  # Surface Upwelling Shortwave Radiation
        'rsds': 'gn',  # Surface Downwelling Shortwave Radiation
        'zg': 'gn',
        'uas': 'gn',
        'vas': 'gn',
        'ua': 'gn',
    }

    # Prioritise European first, US last, avoiding unnecessary queries
    # against nodes further afield (all traffic has a cost, and the coverage
    # of local nodes is more than enough)
    ESGF_NODES = (
        "esgf.ceda.ac.uk",
        "esg1.umr-cnrm.fr",
        "vesg.ipsl.upmc.fr",
        "esgf3.dkrz.de",
        "esgf.bsc.es",
        "esgf-data.csc.fi",
        "noresg.nird.sigma2.no",
        "esgf-data.ucar.edu",
        "esgf-data2.diasjp.net",
    )

    def __init__(self,
                 *args,
                 source: str,
                 member: str,
                 nodes: object = ESGF_NODES,
                 experiments: object = ('historical', 'ssp245'),
                 frequency: str = "day",
                 table_map: object = None,
                 grid_map: object = None,
                 grid_override: object = None,
                 exclude_nodes: object = None,
                 **kwargs):
        super().__init__(*args,
                         identifier="cmip6.{}.{}".format(source, member),
                         **kwargs)

        self._source = source
        self._member = member
        self._frequency = frequency
        self._experiments = experiments

        self._nodes = nodes if not exclude_nodes else \
            [n for n in nodes if n not in exclude_nodes]

        self._table_map = table_map if table_map else CMIP6Downloader.TABLE_MAP
        self._grid_map = grid_map if grid_map else CMIP6Downloader.GRID_MAP
        self._grid_map_override = grid_override

    def _single_download(self,
                         var_prefix: str,
                         level: object,
                         req_dates: object):
        """Overridden CMIP implementation for downloading from DAP server

        Due to the size of the CMIP set and the fact that we don't want to
        make 1850-2100 yearly requests for all downloads, we have a bespoke
        and overridden download implementation for this.

        :param var_prefix: the variable name to query for
        :param level: the pressure level, if any, to select
        :param req_dates: the dates to download data for
        """

        query = {
            'source_id': self._source,
            'member_id': self._member,
            'frequency': self._frequency,
            'variable_id': var_prefix,
            'table_id': self._table_map[var_prefix],
            'grid_label': self._grid_map_override
                          if self._grid_map_override
                          else self._grid_map[var_prefix],
        }

        var = var_prefix if not level else "{}{}".format(var_prefix, level)

        logging.info("Querying ESGF")
        results = []

        for experiment_id in self._experiments:
            query['experiment_id'] = experiment_id

            for data_node in self._nodes:
                query['data_node'] = data_node

                # FIXME: inefficient, we can strip redundant results files
                #  based on WCRP data management standards for file naming,
                #  such as based on date. Refactor/rewrite this impl...
                node_results = esgf_search(**query)

                if len(node_results):
                    logging.debug("Query: {}".format(query))
                    logging.debug("Found {}: {}".format(experiment_id,
                                                        node_results))
                    results.extend(node_results)
                    break

        logging.info("Found {} {} results from ESGF search".format(
            len(results), var_prefix))

        try:
            # http://xarray.pydata.org/en/stable/user-guide/io.html?highlight=opendap#opendap
            # Avoid 500MB DAP request limit
            cmip6_da = xr.open_mfdataset(results,
                                         combine='by_coords',
                                         chunks={'time': '499MB'})[var_prefix]

            cmip6_da = cmip6_da.sel(time=slice(req_dates[0], req_dates[-1]))

            # TODO: possibly other attributes, especially with ocean vars
            if level:
                cmip6_da = cmip6_da.sel(plev=int(level) * 100)

            cmip6_da = cmip6_da.sel(lat=slice(self.hemisphere_loc[2],
                                              self.hemisphere_loc[0]))
            self.save_temporal_files(var, cmip6_da)
        except OSError as e:
            logging.exception("Error encountered: {}".format(e),
                              exc_info=False)
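    # A minimal sketch of the query _single_download assembles, issued
    # directly against a single node. The keyword fields mirror the dict
    # built above (esgf_search is invoked with **query); the particular
    # values are illustrative only, drawn from the class defaults and the
    # docstring examples.
    #
    #   from icenet.data.utils import esgf_search
    #
    #   urls = esgf_search(source_id="MRI-ESM2-0",
    #                      member_id="r1i1p1f1",
    #                      frequency="day",
    #                      variable_id="siconca",
    #                      table_id="SIday",
    #                      grid_label="gn",
    #                      experiment_id="historical",
    #                      data_node="esgf.ceda.ac.uk")
    #
    # The resulting URL list is what xr.open_mfdataset consumes above.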
    def additional_regrid_processing(self, datafile: str, cube_ease: object):
        """Apply CMIP6-specific fixes to a regridded cube

        :param datafile: the file that was regridded
        :param cube_ease: the regridded cube to amend in place
        """
        (datafile_path, datafile_name) = os.path.split(datafile)
        var_name = datafile_path.split(os.sep)[self._var_name_idx]

        # TODO: regrid fixes need better implementations
        if var_name == "siconca":
            cube_ease.data[cube_ease.data.mask] = 0.
            cube_ease.data[:, self._masks.get_land_mask()] = 0.

            if self._source == 'MRI-ESM2-0':
                cube_ease.data = cube_ease.data / 100.

            cube_ease.data = cube_ease.data.data
        elif var_name in ["tos", "hus1000"]:
            cube_ease.data[cube_ease.data.mask] = 0.
            cube_ease.data[:, self._masks.get_land_mask()] = 0.
            cube_ease.data = cube_ease.data.data

        if cube_ease.data.dtype != np.float32:
            logging.info("Regrid processing, data type not float: {}".format(
                cube_ease.data.dtype))
            cube_ease.data = cube_ease.data.astype(np.float32)
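    # For reference, the masking logic above behaves like this standalone
    # numpy sketch (shapes and values are assumptions for illustration):
    #
    #   import numpy as np
    #
    #   data = np.ma.masked_array(np.full((2, 3, 3), 0.5),
    #                             mask=np.zeros((2, 3, 3), dtype=bool))
    #   data.mask[0, 0, 0] = True
    #   data[data.mask] = 0.        # zero out masked (missing) cells
    #
    #   land = np.zeros((3, 3), dtype=bool)
    #   land[1, 1] = True
    #   data[:, land] = 0.          # zero out land cells at every timestep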
    def convert_cube(self, cube: object) -> object:
        """Converts Iris cube to be fit for CMIP regrid

        :param cube: the cube requiring alteration
        :return cube: the altered cube
        """

        cs = self.sic_ease_cube.coord_system().ellipsoid

        for coord in ['longitude', 'latitude']:
            cube.coord(coord).coord_system = cs
        return cube
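    # A minimal usage sketch: convert_cube aligns the CMIP cube's lat/lon
    # coordinate system with the SIC EASE reference cube so iris can regrid
    # between them. The filename and regrid scheme here are assumptions for
    # illustration; sic_ease_cube is the reference grid attribute used above.
    #
    #   import iris
    #
    #   cube = iris.load_cube("<downloaded CMIP6 file>.nc")
    #   cube = downloader.convert_cube(cube)
    #   regridded = cube.regrid(downloader.sic_ease_cube,
    #                           iris.analysis.Linear())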
def main():
    args = download_args(dates=True,
                         extra_args=[
                             (["source"], dict(type=str)),
                             (["member"], dict(type=str)),
                             (("-xs", "--exclude-server"),
                              dict(default=[], nargs="*")),
                             (("-o", "--override"),
                              dict(required=None, type=str)),
                         ],
                         workers=True)

    logging.info("CMIP6 Data Downloading")

    downloader = CMIP6Downloader(
        source=args.source,
        member=args.member,
        var_names=args.vars,
        dates=[
            pd.to_datetime(date).date()
            for date in pd.date_range(args.start_date, args.end_date,
                                      freq="D")
        ],
        delete_tempfiles=args.delete,
        grid_override=args.override,
        levels=args.levels,
        north=args.hemisphere == "north",
        south=args.hemisphere == "south",
        max_threads=args.workers,
        exclude_nodes=args.exclude_server,
    )
    logging.info("CMIP downloading: {} {}".format(args.source, args.member))
    downloader.download()

    logging.info("CMIP regridding: {} {}".format(args.source, args.member))
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=UserWarning)
        downloader.regrid()
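# An illustrative programmatic equivalent of main(), should the downloader be
# driven from Python rather than the CLI. The constructor arguments mirror
# those used above; the source, member, variable and date range are examples
# only, taken from the class docstring.
#
#   import pandas as pd
#
#   downloader = CMIP6Downloader(
#       source="MRI-ESM2-0",
#       member="r1i1p1f1",
#       var_names=["siconca"],
#       dates=[pd.to_datetime(d).date()
#              for d in pd.date_range("1979-1-1", "1979-12-31", freq="D")],
#       levels=[None],   # one (surface-only) entry per variable, assumed format
#       north=True,
#       south=False,
#   )
#   downloader.download()
#   downloader.regrid()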