Source code for icenet.data.interfaces.mars

import datetime
import logging
import os
import sys

from itertools import product

import ecmwfapi
import pandas as pd
import xarray as xr

from icenet.data.cli import download_args
from icenet.data.interfaces.downloader import ClimateDownloader
from icenet.data.interfaces.utils import batch_requested_dates
"""

"""


class HRESDownloader(ClimateDownloader):
    """Climate downloader to provide HRES forecast data from the ECMWF MARS API

    :param identifier: how to identify this dataset
    """

    PARAM_TABLE = 128

    # Background on the use of forecast and observational data
    # https://confluence.ecmwf.int/pages/viewpage.action?pageId=85402030
    # https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Dateandtimespecification
    HRES_PARAMS = {
        "siconca": (31, "siconc"),  # sea_ice_area_fraction
        "tos": (34, "sst"),  # sea surface temperature (actually sst?)
        "zg": (129, "z"),  # geopotential
        "ta": (130, "t"),  # air_temperature (t)
        "hus": (133, "q"),  # specific_humidity
        "psl": (134, "sp"),  # surface_pressure
        "uas": (165, "u10"),  # 10m_u_component_of_wind
        "vas": (166, "v10"),  # 10m_v_component_of_wind
        "tas": (167, "t2m"),  # 2m_temperature (t2m)

        # https://confluence.ecmwf.int/display/CKB/ERA5%3A+data+documentation#ERA5:datadocumentation-Meanrates/fluxesandaccumulations
        # https://apps.ecmwf.int/codes/grib/param-db/?id=175
        # https://confluence.ecmwf.int/pages/viewpage.action?pageId=197702790
        #
        # Mean rate/flux parameters in ERA5 (e.g. Table 4 for surface and
        # single levels) provide similar information to accumulations (e.g.
        # Table 3 for surface and single levels), except they are expressed as
        # temporal means, over the same processing periods, and so have units
        # of "per second".
        "rlds": (175, "strd"),
        "rsds": (169, "ssrd"),

        # plev 129.128 / 130.128 / 133.128
        # sfc  31.128 / 34.128 / 134.128 /
        #      165.128 / 166.128 / 167.128 / 169.128 / 177.128

        # ORAS5 variables in param-db (need to consider depth)
        # "thetao": (151129, "thetao"),
        # "so": (151130, "so"),
        #
        # Better matches than equivalent X / Y parameters in param-db
        # "uo": (151131, "uo"),
        # "vo": (151132, "vo"),
    }

    # https://confluence.ecmwf.int/display/UDOC/Keywords+in+MARS+and+Dissemination+requests
    MARS_TEMPLATE = """
retrieve,
  class=od,
  date={date},
  expver=1,
  levtype={levtype},
  {levlist}param={params},
  step={step},
  stream=oper,
  time=12:00:00,
  type=fc,
  area={area},
  grid=0.25/0.25,
  target="{target}",
  format=netcdf
    """

    def __init__(self, *args, identifier: str = "mars.hres", **kwargs):
        super().__init__(*args, identifier=identifier, **kwargs)

        self._server = ecmwfapi.ECMWFService("mars")

    def _single_download(self, var_names: object, pressures: object,
                         req_dates: object):
        """

        :param var_names:
        :param pressures:
        :param req_dates:
        :return:
        """

        for dt in req_dates:
            assert dt.year == req_dates[0].year

        downloads = []
        levtype = "plev" if pressures else "sfc"

        for req_batch in batch_requested_dates(req_dates, attribute="month"):
            req_batch = sorted(req_batch)
            request_month = req_batch[0].strftime("%Y%m")

            logging.info("Downloading month file {}".format(request_month))

            if req_batch[-1] - datetime.datetime.utcnow().date() == \
                    datetime.timedelta(days=-1):
                logging.warning("Not allowing partial requests at present, "
                                "removing {}".format(req_batch[-1]))
                req_batch = req_batch[:-1]

            request_target = os.path.join(
                self.base_path, self.hemisphere_str[0],
                "{}.{}.nc".format(levtype, request_month))
            os.makedirs(os.path.dirname(request_target), exist_ok=True)

            request = self.mars_template.format(
                area="/".join([str(s) for s in self.hemisphere_loc]),
                date="/".join([el.strftime("%Y%m%d") for el in req_batch]),
                levtype=levtype,
                levlist="levelist={},\n ".format(pressures)
                if pressures else "",
                params="/".join([
                    "{}.{}".format(self.params[v][0], self.param_table)
                    for v in var_names
                ]),
                target=request_target,
                # We are only allowed dates prior to -24 hours ago,
                # dynamically retrieve if date is today
                # TODO: too big - step="/".join([str(i) for i in range(24)]),
                step=0,
            )

            if not os.path.exists(request_target):
                logging.debug("MARS REQUEST: \n{}\n".format(request))

                try:
                    self._server.execute(request, request_target)
                except ecmwfapi.api.APIException:
                    logging.exception(
                        "Could not complete ECMWF request: {}".format(request))
                else:
                    downloads.append(request_target)
            else:
                logging.debug("Already have {}".format(request_target))
                downloads.append(request_target)

        logging.debug("Files downloaded: {}".format(downloads))

        ds = xr.open_mfdataset(downloads)
        ds = ds.resample(time='1D', keep_attrs=True).mean(keep_attrs=True)

        for var_name, pressure in product(
                var_names, pressures.split('/') if pressures else [None]):
            var = var_name if not pressure else \
                "{}{}".format(var_name, pressure)

            da = getattr(ds, self.params[var_name][1])

            if pressure:
                da = da.sel(level=int(pressure))

            self.save_temporal_files(var, da)

        ds.close()

        if self.delete:
            for downloaded_file in downloads:
                if os.path.exists(downloaded_file):
                    logging.info("Removing {}".format(downloaded_file))
                    os.unlink(downloaded_file)
    def download(self):
        """

        """
        logging.info("Building request(s), downloading and daily averaging "
                     "from {} API".format(self.identifier.upper()))

        sfc_vars = [
            var for idx, var in enumerate(self.var_names)
            if not self.levels[idx]
        ]
        level_vars = [
            var for idx, var in enumerate(self.var_names) if self.levels[idx]
        ]
        levels = "/".join([
            str(s)
            for s in sorted(set([p for ps in self.levels if ps for p in ps]))
        ])

        # req_dates = self.filter_dates_on_data()

        dates_per_request = \
            batch_requested_dates(self._dates, attribute=self.group_dates_by)

        for req_batch in dates_per_request:
            if len(sfc_vars) > 0:
                self._single_download(sfc_vars, None, req_batch)

            if len(level_vars) > 0:
                self._single_download(level_vars, levels, req_batch)

        logging.info("{} daily files downloaded".format(
            len(self._files_downloaded)))
    def additional_regrid_processing(self, datafile: str, cube_ease: object):
        """

        :param datafile:
        :param cube_ease:
        """
        (datafile_path, datafile_name) = os.path.split(datafile)
        var_name = datafile_path.split(os.sep)[self._var_name_idx]

        if var_name == 'tos':
            # Overwrite masked values with zeros
            logging.debug("MARS additional regrid: {}".format(var_name))
            cube_ease.data[cube_ease.data.mask] = 0.
            cube_ease.data[:, self._masks.get_land_mask()] = 0.
            cube_ease.data = cube_ease.data.data

        if var_name in ['rlds', 'rsds']:
            # FIXME: We're taking the mean across the hourly samples for the
            #  day in fc which needs to be comparative with the analysis
            #  product from ERA5. My interpretation is that this should be
            #  /24, but of course it doesn't work like that thanks to orbital
            #  rotation. We need to verify the exact mechanism for converting
            #  forecast values to reanalysis equivalents, but this rudimentary
            #  divisor should work in the meantime
            #
            #  FIXME FIXME FIXME
            cube_ease /= 12.

        if var_name.startswith("zg"):
            # https://apps.ecmwf.int/codes/grib/param-db/?id=129
            #
            # We want the geopotential height as per ERA5
            cube_ease /= 9.80665
    @property
    def mars_template(self):
        return getattr(self, "MARS_TEMPLATE")

    @property
    def params(self):
        return getattr(self, "HRES_PARAMS")

    @property
    def param_table(self):
        return getattr(self, "PARAM_TABLE")
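
# Illustrative only: a minimal sketch of driving HRESDownloader directly,
# mirroring the keyword arguments that main() below passes through to the
# ClimateDownloader constructor. The variable names, levels structure and
# hemisphere flags are assumptions inferred from that usage rather than a
# documented API, and _example_hres_usage is a hypothetical helper that is
# not part of this module.
def _example_hres_usage():
    dates = [
        pd.to_datetime(date).date()
        for date in pd.date_range("2022-01-01", "2022-01-31", freq="D")
    ]

    downloader = HRESDownloader(
        var_names=["tas", "zg"],
        # One levels entry per variable: None for surface fields, a list of
        # pressure levels for "plev" fields (assumed from download() above)
        levels=[None, [250, 500]],
        dates=dates,
        delete_tempfiles=True,
        north=True,
        south=False)

    # download() batches the dates, issues the MARS requests and produces
    # daily-averaged files; regrid() reprojects them, as in main() below
    downloader.download()
    downloader.regrid()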
class SEASDownloader(HRESDownloader):
    # TODO: step should be configurable for this downloader
    # TODO: unsure why, but multiple dates break the download with
    #  ERROR 89 (MARS_EXPECTED_FIELDS): Expected 4700, got 2350
    MARS_TEMPLATE = """
retrieve,
  class=od,
  date={date},
  expver=1,
  levtype={levtype},
  method=1,
  number=0/1/2/3/4/5/6/7/8/9/10/11/12/13/14/15/16/17/18/19/20/21/22/23/24,
  origin=ecmf,
  {levlist}param={params},
  step=0/to/2232/by/24,
  stream=mmsf,
  system=5,
  time=00:00:00,
  type=fc,
  target="{target}",
  format=netcdf,
  grid=0.25/0.25,
  area={area}"""

    def _single_download(self, var_names: object, pressures: object,
                         req_dates: object):
        """

        :param var_names:
        :param pressures:
        :param req_dates:
        :return:
        """

        for dt in req_dates:
            assert dt.year == req_dates[0].year

        downloads = []
        levtype = "plev" if pressures else "sfc"

        for req_date in req_dates:
            request_day = req_date.strftime("%Y%m%d")

            logging.info("Downloading daily file {}".format(request_day))

            request_target = os.path.join(
                self.base_path, self.hemisphere_str[0],
                "{}.{}.nc".format(levtype, request_day))
            os.makedirs(os.path.dirname(request_target), exist_ok=True)

            request = self.mars_template.format(
                area="/".join([str(s) for s in self.hemisphere_loc]),
                date=req_date.strftime("%Y-%m-%d"),
                levtype=levtype,
                levlist="levelist={},\n ".format(pressures)
                if pressures else "",
                params="/".join([
                    "{}.{}".format(self.params[v][0], self.param_table)
                    for v in var_names
                ]),
                target=request_target,
            )

            if not os.path.exists(request_target):
                logging.debug("MARS REQUEST: \n{}\n".format(request))

                try:
                    self._server.execute(request, request_target)
                except ecmwfapi.api.APIException:
                    logging.exception(
                        "Could not complete ECMWF request: {}".format(request))
                else:
                    downloads.append(request_target)
            else:
                logging.debug("Already have {}".format(request_target))
                downloads.append(request_target)

        logging.debug("Files downloaded: {}".format(downloads))

        for download_filename in downloads:
            logging.info("Processing {}".format(download_filename))
            ds = xr.open_dataset(download_filename)
            ds = ds.mean("number")

            for var_name, pressure in product(
                    var_names, pressures.split('/') if pressures else [None]):
                var = var_name if not pressure else \
                    "{}{}".format(var_name, pressure)

                da = getattr(ds, self.params[var_name][1])

                if pressure:
                    da = da.sel(level=int(pressure))

                self.save_temporal_files(var, da, date_format="%Y%m%d")

            ds.close()

        if self.delete:
            for downloaded_file in downloads:
                if os.path.exists(downloaded_file):
                    logging.info("Removing {}".format(downloaded_file))
                    os.unlink(downloaded_file)
    def save_temporal_files(self, var, da, date_format=None, freq=None):
        """

        :param var:
        :param da:
        :param date_format:
        :param freq:
        """
        var_folder = self.get_data_var_folder(var)
        req_date = pd.to_datetime(da.time.values[0])

        latlon_path, regridded_name = \
            self.get_req_filenames(var_folder,
                                   req_date,
                                   date_format=date_format)

        logging.info("Retrieving and saving {}".format(latlon_path))
        da.compute()
        da.to_netcdf(latlon_path)

        if not os.path.exists(regridded_name):
            self._files_downloaded.append(latlon_path)
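
# Illustrative only: a minimal, self-contained sketch of the ensemble-mean
# step used in SEASDownloader._single_download above. The dataset below is
# synthetic; only the "number" (ensemble member) dimension name is taken from
# the MARS request template, and _example_seas_ensemble_mean is a hypothetical
# helper rather than part of this module.
def _example_seas_ensemble_mean():
    import numpy as np

    # 25 ensemble members ("number"), 3 daily steps, 2x2 grid
    ds = xr.Dataset(
        {
            "t2m": (("number", "time", "lat", "lon"),
                    np.random.rand(25, 3, 2, 2))
        },
        coords={
            "number": np.arange(25),
            "time": pd.date_range("2022-01-01", periods=3, freq="D"),
            "lat": [80.0, 80.25],
            "lon": [0.0, 0.25],
        })

    # Averaging over "number" collapses the ensemble, leaving one field per
    # time step before save_temporal_files is called for each variable
    return ds.mean("number")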
def main(identifier, extra_kwargs=None):
    args = download_args()

    logging.info("ECMWF {} Data Downloading".format(identifier))
    cls = getattr(sys.modules[__name__], "{}Downloader".format(identifier))

    if extra_kwargs is None:
        extra_kwargs = dict()

    instance = cls(
        identifier="mars.{}".format(identifier.lower()),
        var_names=args.vars,
        dates=[
            pd.to_datetime(date).date()
            for date in pd.date_range(args.start_date, args.end_date, freq="D")
        ],
        delete_tempfiles=args.delete,
        levels=args.levels,
        north=args.hemisphere == "north",
        south=args.hemisphere == "south",
        **extra_kwargs)
    instance.download()
    instance.regrid()
def seas_main():
    main("SEAS", dict(group_dates_by="day",))
def hres_main():
    main("HRES")