Source code for icenet.data.sic.osisaf

import copy
import csv
import fnmatch
import ftplib
import logging
import os

import datetime as dt
from ftplib import FTP

import dask
from distributed import Client, LocalCluster
import numpy as np
import pandas as pd
import xarray as xr

from icenet.data.cli import download_args
from icenet.data.producers import Downloader
from icenet.data.sic.mask import Masks
from icenet.utils import Hemisphere, run_command
from icenet.data.sic.utils import SIC_HEMI_STR
"""

"""

invalid_sic_days = {
    Hemisphere.NORTH: [
        *[
            d.date()
            for d in pd.date_range(dt.date(1979, 5, 21), dt.date(1979, 6, 4))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1979, 6, 10), dt.date(1979, 6, 26))
        ],
        dt.date(1979, 7, 1),
        *[
            d.date()
            for d in pd.date_range(dt.date(1979, 7, 24), dt.date(1979, 7, 28))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 1, 4), dt.date(1980, 1, 10))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 2, 27), dt.date(1980, 3, 4))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 3, 16), dt.date(1980, 3, 22))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 4, 9), dt.date(1980, 4, 15))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1981, 2, 27), dt.date(1981, 3, 5))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1984, 8, 12), dt.date(1984, 8, 24))
        ],
        dt.date(1984, 9, 14),
        *[
            d.date()
            for d in pd.date_range(dt.date(1985, 9, 22), dt.date(1985, 9, 28))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1986, 3, 29), dt.date(1986, 7, 1))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 1, 3), dt.date(1987, 1, 19))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 1, 29), dt.date(1987, 2, 2))
        ],
        dt.date(1987, 2, 23),
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 2, 26), dt.date(1987, 3, 2))
        ],
        dt.date(1987, 3, 13),
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 3, 22), dt.date(1987, 3, 26))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 4, 3), dt.date(1987, 4, 17))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 12, 1), dt.date(1988, 1, 12))
        ],
        dt.date(1989, 1, 3),
        *[
            d.date()
            for d in pd.date_range(dt.date(1990, 12, 21), dt.date(1990, 12, 26))
        ],
        dt.date(1979, 5, 28),
        dt.date(1979, 5, 30),
        dt.date(1979, 6, 1),
        dt.date(1979, 6, 3),
        dt.date(1979, 6, 11),
        dt.date(1979, 6, 13),
        dt.date(1979, 6, 15),
        dt.date(1979, 6, 17),
        dt.date(1979, 6, 19),
        dt.date(1979, 6, 21),
        dt.date(1979, 6, 23),
        dt.date(1979, 6, 25),
        dt.date(1979, 7, 1),
        dt.date(1979, 7, 25),
        dt.date(1979, 7, 27),
        dt.date(1984, 9, 14),
        dt.date(1987, 1, 16),
        dt.date(1987, 1, 18),
        dt.date(1987, 1, 30),
        dt.date(1987, 2, 1),
        dt.date(1987, 2, 23),
        dt.date(1987, 2, 27),
        dt.date(1987, 3, 1),
        dt.date(1987, 3, 13),
        dt.date(1987, 3, 23),
        dt.date(1987, 3, 25),
        dt.date(1987, 4, 4),
        dt.date(1987, 4, 6),
        dt.date(1987, 4, 10),
        dt.date(1987, 4, 12),
        dt.date(1987, 4, 14),
        dt.date(1987, 4, 16),
        dt.date(1987, 4, 4),
        dt.date(1990, 1, 26),
        dt.date(2022, 11, 9),
    ],
    Hemisphere.SOUTH: [
        dt.date(1979, 2, 5),
        dt.date(1979, 2, 25),
        dt.date(1979, 3, 23),
        *[
            d.date()
            for d in pd.date_range(dt.date(1979, 3, 26), dt.date(1979, 3, 30))
        ],
        dt.date(1979, 4, 12),
        dt.date(1979, 5, 16),
        *[
            d.date()
            for d in pd.date_range(dt.date(1979, 5, 21), dt.date(1979, 5, 27))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1979, 7, 10), dt.date(1979, 7, 18))
        ],
        dt.date(1979, 8, 10),
        dt.date(1979, 9, 3),
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 1, 4), dt.date(1980, 1, 10))
        ],
        dt.date(1980, 2, 16),
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 2, 27), dt.date(1980, 3, 4))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 3, 14), dt.date(1980, 3, 22))
        ],
        dt.date(1980, 3, 31),
        *[
            d.date()
            for d in pd.date_range(dt.date(1980, 4, 9), dt.date(1980, 4, 15))
        ],
        dt.date(1980, 4, 22),
        *[
            d.date()
            for d in pd.date_range(dt.date(1981, 2, 27), dt.date(1981, 3, 5))
        ],
        dt.date(1981, 6, 10),
        *[
            d.date()
            for d in pd.date_range(dt.date(1981, 8, 3), dt.date(1982, 8, 9))
        ],
        dt.date(1982, 8, 6),
        *[
            d.date()
            for d in pd.date_range(dt.date(1983, 7, 7), dt.date(1983, 7, 11))
        ],
        dt.date(1983, 7, 22),
        dt.date(1984, 6, 12),
        *[
            d.date()
            for d in pd.date_range(dt.date(1984, 8, 12), dt.date(1984, 8, 24))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1984, 9, 13), dt.date(1984, 9, 17))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1984, 10, 3), dt.date(1984, 10, 9))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1984, 11, 18), dt.date(1984, 11, 22))
        ],
        dt.date(1985, 7, 23),
        *[
            d.date()
            for d in pd.date_range(dt.date(1985, 9, 22), dt.date(1985, 9, 28))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1986, 3, 29), dt.date(1986, 11, 2))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 1, 3), dt.date(1987, 1, 15))
        ],
        *[
            d.date()
            for d in pd.date_range(dt.date(1987, 12, 1), dt.date(1988, 1, 12))
        ],
        dt.date(1990, 8, 14),
        dt.date(1990, 8, 15),
        dt.date(1990, 8, 24),
        *[
            d.date()
            for d in pd.date_range(dt.date(1990, 12, 22), dt.date(1990, 12, 26))
        ],
        dt.date(1979, 2, 5),
        dt.date(1979, 2, 25),
        dt.date(1979, 3, 23),
        dt.date(1979, 3, 27),
        dt.date(1979, 3, 29),
        dt.date(1979, 4, 12),
        dt.date(1979, 5, 16),
        dt.date(1979, 7, 11),
        dt.date(1979, 7, 13),
        dt.date(1979, 7, 15),
        dt.date(1979, 7, 17),
        dt.date(1979, 8, 10),
        dt.date(1979, 9, 3),
        dt.date(1980, 2, 16),
        dt.date(1980, 3, 15),
        dt.date(1980, 3, 31),
        dt.date(1980, 4, 22),
        dt.date(1981, 6, 10),
        dt.date(1982, 8, 6),
        dt.date(1983, 7, 8),
        dt.date(1983, 7, 10),
        dt.date(1983, 7, 22),
        dt.date(1984, 6, 12),
        dt.date(1984, 9, 14),
        dt.date(1984, 9, 16),
        dt.date(1984, 10, 4),
        dt.date(1984, 10, 6),
        dt.date(1984, 10, 8),
        dt.date(1984, 11, 19),
        dt.date(1984, 11, 21),
        dt.date(1985, 7, 23),
        *[
            d.date()
            for d in pd.date_range(dt.date(1986, 7, 2), dt.date(1986, 11, 1))
        ],
        dt.date(1990, 8, 14),
        dt.date(1990, 8, 15),
        dt.date(1990, 8, 24),
        dt.date(2022, 11, 9),
    ]
}
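# Usage sketch (comment only, not part of the original source): the downloader
# keys this table by hemisphere and skips any requested date found in it, e.g.
#
#   dt.date(2022, 11, 9) in invalid_sic_days[Hemisphere.NORTH]   # True
#
# SICDownloader.__init__ extends the per-hemisphere list with any
# additional_invalid_dates supplied by the caller.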

var_remove_list = [
    'time_bnds', 'raw_ice_conc_values', 'total_standard_error',
    'smearing_standard_error', 'algorithm_standard_error', 'status_flag',
    'Lambert_Azimuthal_Grid'
]


# This is adapted from the data/loaders implementations
# This is adapted from the data/loaders implementations
class DaskWrapper:
    """

    :param dask_port:
    :param dask_timeouts:
    :param dask_tmp_dir:
    :param workers:
    """

    def __init__(self,
                 dask_port: int = 8888,
                 dask_timeouts: int = 60,
                 dask_tmp_dir: object = "/tmp",
                 workers: int = 8):
        self._dashboard_port = dask_port
        self._timeout = dask_timeouts
        self._tmp_dir = dask_tmp_dir
        self._workers = workers
    def dask_process(self, *args, method: callable, **kwargs):
        """

        :param method:
        """
        dashboard = "localhost:{}".format(self._dashboard_port)

        with dask.config.set({
                "temporary_directory": self._tmp_dir,
                "distributed.comm.timeouts.connect": self._timeout,
                "distributed.comm.timeouts.tcp": self._timeout,
        }):
            cluster = LocalCluster(
                dashboard_address=dashboard,
                n_workers=self._workers,
                threads_per_worker=1,
                scheduler_port=0,
            )
            logging.info("Dashboard at {}".format(dashboard))

            with Client(cluster) as client:
                logging.info("Using dask client {}".format(client))
                ret = method(*args, **kwargs)
        return ret
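# Usage sketch (comment only, an illustration rather than part of the module):
# DaskWrapper defers cluster setup until dask_process() is called with the
# target callable, which is how main() below drives SICDownloader.download:
#
#   dw = DaskWrapper(workers=4)
#   dw.dask_process(method=sic.download)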
class SICDownloader(Downloader):
    """Downloads OSI-SAF SIC data from 1979-present using OpenDAP.

    The dataset comprises OSI-450 (1979-2015) and OSI-430-b (2016-onwards).
    Monthly averages are computed on the server side. This script can take
    about an hour to run.

    The query URLs were obtained from the following sites:
      - OSI-450 (1979-2016): https://thredds.met.no/thredds/dodsC/osisaf/
        met.no/reprocessed/ice/conc_v2p0_nh_agg.html
      - OSI-430-b (2016-present): https://thredds.met.no/thredds/dodsC/osisaf/
        met.no/reprocessed/ice/conc_crb_nh_agg.html
      - OSI-430-a (2022-present): https://osi-saf.eumetsat.int/products/osi-430-as

    :param additional_invalid_dates:
    :param chunk_size:
    :param dates:
    :param delete_tempfiles:
    :param download:
    :param dtype:
    """

    def __init__(self,
                 *args,
                 additional_invalid_dates: object = (),
                 chunk_size: int = 10,
                 dates: object = (),
                 delete_tempfiles: bool = True,
                 download: bool = True,
                 dtype: object = np.float32,
                 parallel_opens: bool = True,
                 **kwargs):
        super().__init__(*args, identifier="osisaf", **kwargs)

        self._chunk_size = chunk_size
        self._dates = dates
        self._delete = delete_tempfiles
        self._download = download
        self._dtype = dtype
        self._parallel_opens = parallel_opens
        self._invalid_dates = invalid_sic_days[self.hemisphere] + \
            list(additional_invalid_dates)
        self._masks = Masks(north=self.north, south=self.south)

        self._ftp_osi450 = "/reprocessed/ice/conc/v2p0/{:04d}/{:02d}/"
        self._ftp_osi430b = "/reprocessed/ice/conc-cont-reproc/v2p0/{:04d}/{:02d}/"
        self._ftp_osi430a = "/reprocessed/ice/conc-cont-reproc/v3p0/{:04d}/{:02d}/"

        self._mask_dict = {
            month: self._masks.get_active_cell_mask(month)
            for month in np.arange(1, 12 + 1)
        }

        # Load dates that previously had a file size of zero, to recheck
        # whether they have been fixed since the last download.
        zero_dates_path = os.path.join(self.get_data_var_folder("siconca"),
                                       "zero_size_days.csv")
        self._zero_dates_path = zero_dates_path
        self._zero_dates = []
        if os.path.exists(zero_dates_path):
            with open(zero_dates_path, "r") as fh:
                self._zero_dates = [
                    pd.to_datetime("-".join(date)).date()
                    for date in csv.reader(fh)
                ]
    def download(self):
        """

        """
        hs = SIC_HEMI_STR[self.hemisphere_str[0]]
        data_files = []
        ftp = None
        var = "siconca"

        logging.info("Not downloading SIC files, (re)processing NC files in "
                     "existence already" if not self._download else
                     "Downloading SIC datafiles to .temp intermediates...")

        cache = {}
        osi430b_start = dt.date(2016, 1, 1)
        osi430a_start = dt.date(2021, 1, 1)

        dt_arr = list(reversed(sorted(copy.copy(self._dates))))

        # Filtering dates based on existing data
        filter_years = sorted(set([d.year for d in dt_arr]))
        extant_paths = [
            os.path.join(self.get_data_var_folder(var),
                         "{}.nc".format(filter_ds))
            for filter_ds in filter_years
        ]
        extant_paths = [df for df in extant_paths if os.path.exists(df)]

        if len(extant_paths) > 0:
            extant_ds = xr.open_mfdataset(extant_paths)
            exclude_dates = [
                pd.to_datetime(date).date() for date in extant_ds.time.values
            ]
            # Do not exclude dates that previously had a file size of 0
            exclude_dates = set(exclude_dates).difference(self._zero_dates)
            logging.info("Excluding {} dates already existing from {} dates "
                         "requested.".format(len(exclude_dates), len(dt_arr)))

            dt_arr = sorted(list(set(dt_arr).difference(exclude_dates)))
            dt_arr.reverse()

            # We won't hold onto an active dataset during network I/O
            extant_ds.close()
        # End filtering

        while len(dt_arr):
            el = dt_arr.pop()

            if el in self._invalid_dates:
                logging.warning("Date {} is in invalid list".format(el))
                continue

            date_str = el.strftime("%Y_%m_%d")
            temp_path = os.path.join(
                self.get_data_var_folder(var, append=[str(el.year)]),
                "{}.temp".format(date_str))
            nc_path = os.path.join(
                self.get_data_var_folder(var, append=[str(el.year)]),
                "{}.nc".format(date_str))

            if not self._download:
                if os.path.exists(nc_path):
                    reproc_path = os.path.join(
                        self.get_data_var_folder(var, append=[str(el.year)]),
                        "{}.reproc.nc".format(date_str))

                    logging.debug("{} exists, becoming {}".format(
                        nc_path, reproc_path))
                    os.rename(nc_path, reproc_path)
                    data_files.append(reproc_path)
                else:
                    if os.path.exists(temp_path):
                        logging.info("Using existing {}".format(temp_path))
                        data_files.append(temp_path)
                    else:
                        logging.debug("{} does not exist".format(nc_path))
                continue
            else:
                if not os.path.isdir(os.path.dirname(temp_path)):
                    os.makedirs(os.path.dirname(temp_path), exist_ok=True)

                if os.path.exists(temp_path) or os.path.exists(nc_path):
                    logging.debug("{} file exists, skipping".format(date_str))

                    if not os.path.exists(nc_path):
                        data_files.append(temp_path)
                    continue

                if not ftp:
                    logging.info("FTP opening")
                    ftp = FTP('osisaf.met.no')
                    ftp.login()

                chdir_path = self._ftp_osi450 \
                    if el < osi430b_start else self._ftp_osi430b \
                    if el < osi430a_start else self._ftp_osi430a
                chdir_path = chdir_path.format(el.year, el.month)

                try:
                    ftp.cwd(chdir_path)

                    if chdir_path not in cache:
                        cache[chdir_path] = ftp.nlst()

                    cache_match = "ice_conc_{}_ease*_{:04d}{:02d}{:02d}*.nc". \
                        format(hs, el.year, el.month, el.day)
                    ftp_files = [
                        el for el in cache[chdir_path]
                        if fnmatch.fnmatch(el, cache_match)
                    ]

                    if len(ftp_files) > 1:
                        raise ValueError(
                            "More than a single file found: {}".format(
                                ftp_files))
                    elif not len(ftp_files):
                        logging.warning(
                            "File is not available: {}".format(cache_match))
                        continue
                except ftplib.error_perm:
                    logging.warning("FTP error, possibly missing month chdir "
                                    "for {}".format(date_str))
                    continue

                # Check if the remote file size is too small; if so, render
                # the date invalid and continue.
                file_size = ftp.size(ftp_files[0])  # Remote file size in bytes
                if file_size < 100:
                    logging.warning(
                        f"Date {el} is in invalid list, as file size too small")
                    self._zero_dates.append(el)
                    self._invalid_dates.append(el)
                    continue
                else:
                    # Remove the missing-date file if it was previously
                    # created for a zero-size download
                    if el in self._zero_dates:
                        self._zero_dates.remove(el)
                        fpath = os.path.join(
                            self.get_data_var_folder(
                                "siconca",
                                append=[str(pd.to_datetime(el).year)]),
                            "missing.{}.nc".format(date_str))
                        if os.path.exists(fpath):
                            os.unlink(fpath)

                with open(temp_path, "wb") as fh:
                    ftp.retrbinary("RETR {}".format(ftp_files[0]), fh.write)

                logging.debug("Downloaded {}".format(temp_path))
                data_files.append(temp_path)

        self._zero_dates = set(self._zero_dates)
        self.zero_dates()

        if ftp:
            ftp.quit()

        logging.debug("Files being processed: {}".format(data_files))

        if len(data_files):
            ds = xr.open_mfdataset(data_files,
                                   combine="nested",
                                   concat_dim="time",
                                   data_vars=["ice_conc"],
                                   drop_variables=var_remove_list,
                                   engine="netcdf4",
                                   chunks=dict(time=self._chunk_size,),
                                   parallel=self._parallel_opens)

            logging.debug("Processing out extraneous data")

            ds = ds.drop_vars(var_remove_list, errors="ignore")
            da = ds.resample(time="1D").mean().ice_conc

            da = da.where(da < 9.9e+36, 0.)  # Missing values
            da /= 100.  # Convert from SIC % to fraction

            for coord in ['lat', 'lon']:
                if coord not in da.coords:
                    logging.warning("Adding {} vals to coords, as missing in "
                                    "the combined dataset".format(coord))
                    da.coords[coord] = self._get_missing_coordinates(
                        var, hs, coord)

            # In experimenting, I don't think this is actually required
            for month, mask in self._mask_dict.items():
                da.loc[dict(
                    time=(da['time.month'] == month))].values[:, ~mask] = 0.

            for date in da.time.values:
                day_da = da.sel(time=slice(date, date))

                if np.sum(np.isnan(day_da.data)) > 0:
                    logging.warning("NaNs detected, adding to invalid "
                                    "list: {}".format(date))
                    self._invalid_dates.append(pd.to_datetime(date))

            var_folder = self.get_data_var_folder(var)
            group_by = "time.year"

            for year, year_da in da.groupby(group_by):
                req_date = pd.to_datetime(year_da.time.values[0])

                year_path = os.path.join(
                    var_folder, "{}.nc".format(getattr(req_date, "year")))
                old_year_path = os.path.join(
                    var_folder, "old.{}.nc".format(getattr(req_date, "year")))

                if os.path.exists(year_path):
                    logging.info(
                        "Existing file needs concatenating: {} -> {}".format(
                            year_path, old_year_path))
                    os.rename(year_path, old_year_path)
                    old_da = xr.open_dataarray(old_year_path)
                    year_da = year_da.drop_sel(time=old_da.time,
                                               errors="ignore")
                    year_da = xr.concat([old_da, year_da],
                                        dim="time").sortby("time")
                    old_da.close()
                    os.unlink(old_year_path)

                logging.info("Saving {}".format(year_path))
                year_da.compute()
                year_da.to_netcdf(year_path)

        self.missing_dates()

        if self._delete:
            for fpath in data_files:
                os.unlink(fpath)
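    # Output layout sketch (comment only; exact root paths depend on the
    # Downloader configuration): download() aggregates the daily fields into
    # one NetCDF per year under the siconca variable folder (e.g. "2020.nc"),
    # keeps ".temp" intermediates only when delete_tempfiles is False, and
    # records zero-size/invalid dates for zero_dates() and missing_dates().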
    def zero_dates(self):
        """Write out any dates that have zero file size on the ftp server to csv"""
        if not self._zero_dates and os.path.exists(self._zero_dates_path):
            os.unlink(self._zero_dates_path)
        elif self._zero_dates:
            logging.info(f"Processing {len(self._zero_dates)} zero dates")
            with open(self._zero_dates_path, "w") as fh:
                for date in self._zero_dates:
                    # FIXME: slightly unusual format for Ymd dates
                    fh.write(date.strftime("%Y,%m,%d\n"))
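    # File format sketch for zero_size_days.csv (comment only): one date per
    # line as "%Y,%m,%d", e.g. "1984,09,14". __init__ re-reads each row with
    # csv.reader, joins the fields with "-" ("1984-09-14") and parses the
    # result with pd.to_datetime.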
    def missing_dates(self):
        """

        :return:
        """
        filenames = set([
            os.path.join(self.get_data_var_folder("siconca"),
                         "{}.nc".format(el.strftime("%Y")))
            for el in self._dates
        ])
        filenames = [f for f in filenames if os.path.exists(f)]

        logging.info("Opening for interpolation: {}".format(filenames))
        ds = xr.open_mfdataset(filenames,
                               combine="nested",
                               concat_dim="time",
                               chunks=dict(time=self._chunk_size,),
                               parallel=self._parallel_opens)
        return self._missing_dates(ds.ice_conc)
    def _missing_dates(self, da: object) -> object:
        """

        :param da:
        :return:
        """
        if pd.Timestamp(1979, 1, 2) in da.time.values \
                and dt.date(1979, 1, 1) in self._dates \
                and pd.Timestamp(1979, 1, 1) not in da.time.values:
            da_1979_01_01 = da.sel(
                time=[pd.Timestamp(1979, 1, 2)]).copy().assign_coords(
                    {'time': [pd.Timestamp(1979, 1, 1)]})
            da = xr.concat([da, da_1979_01_01], dim='time')
            da = da.sortby('time')

        dates_obs = [pd.to_datetime(date).date() for date in da.time.values]
        dates_all = [
            pd.to_datetime(date).date()
            for date in pd.date_range(min(self._dates), max(self._dates))
        ]

        # Weirdly, we were getting future warnings for timestamps, but unsure
        # where from
        invalid_dates = [pd.to_datetime(d).date() for d in self._invalid_dates]
        missing_dates = [
            date for date in dates_all
            if date not in dates_obs or date in invalid_dates
        ]

        logging.info("Processing {} missing dates".format(len(missing_dates)))

        if missing_dates:
            missing_dates_path = os.path.join(
                self.get_data_var_folder("siconca"), "missing_days.csv")

            with open(missing_dates_path, "a") as fh:
                for date in missing_dates:
                    # FIXME: slightly unusual format for Ymd dates
                    fh.write(date.strftime("%Y,%m,%d\n"))

            logging.debug("Interpolating {} missing dates".format(
                len(missing_dates)))

            for date in missing_dates:
                if pd.Timestamp(date) not in da.time.values:
                    logging.info("Interpolating {}".format(date))
                    da = xr.concat([da, da.interp(time=pd.to_datetime(date))],
                                   dim='time')

            logging.debug("Finished interpolation")

            da = da.sortby('time')
            da.data = np.array(da.data, dtype=self._dtype)

            for date in missing_dates:
                date_str = pd.to_datetime(date).strftime("%Y_%m_%d")
                fpath = os.path.join(
                    self.get_data_var_folder(
                        "siconca", append=[str(pd.to_datetime(date).year)]),
                    "missing.{}.nc".format(date_str))

                if not os.path.exists(fpath):
                    day_da = da.sel(time=slice(date, date))
                    mask = self._mask_dict[pd.to_datetime(date).month]

                    day_da.data[0][~mask] = 0.

                    logging.info("Writing missing date file {}".format(fpath))
                    day_da.to_netcdf(fpath)

        return da

    def _get_missing_coordinates(self, var, hs, coord):
        """

        :param var:
        :param hs:
        :param coord:
        """
        missing_coord_file = os.path.join(self.get_data_var_folder(var),
                                          "missing_coord_data.nc")
        if not os.path.exists(missing_coord_file):
            ftp_source_path = self._ftp_osi450.format(2000, 1)

            retrieve_cmd_template_osi450 = \
                "wget -m -nH -nd -O {} " \
                "ftp://osisaf.met.no{}/{}"
            filename_osi450 = \
                "ice_conc_{}_ease2-250_cdr-v2p0_200001011200.nc".format(hs)

            run_command(
                retrieve_cmd_template_osi450.format(missing_coord_file,
                                                    ftp_source_path,
                                                    filename_osi450))
        else:
            logging.info(
                "Coordinate path {} already exists".format(missing_coord_file))

        ds = xr.open_dataset(missing_coord_file,
                             drop_variables=var_remove_list,
                             engine="netcdf4").load()
        try:
            coord_data = getattr(ds, coord)
        except AttributeError as e:
            logging.exception(
                "{} does not exist in coord reference file {}".format(
                    coord, missing_coord_file))
            raise RuntimeError(e)
        return coord_data
def main():
    args = download_args(var_specs=False,
                         workers=True,
                         extra_args=[
                             (("-u", "--use-dask"),
                              dict(action="store_true", default=False)),
                             (("-c", "--sic-chunking-size"),
                              dict(type=int, default=10)),
                             (("-dt", "--dask-timeouts"),
                              dict(type=int, default=120)),
                             (("-dp", "--dask-port"),
                              dict(type=int, default=8888))
                         ])

    logging.info("OSI-SAF SIC Data Downloading")
    sic = SICDownloader(
        chunk_size=args.sic_chunking_size,
        dates=[
            pd.to_datetime(date).date()
            for date in pd.date_range(args.start_date, args.end_date, freq="D")
        ],
        delete_tempfiles=args.delete,
        north=args.hemisphere == "north",
        south=args.hemisphere == "south",
        parallel_opens=args.parallel_opens,
    )
    if args.use_dask:
        logging.warning("Attempting to use dask client for SIC processing")
        dw = DaskWrapper(workers=args.workers)
        dw.dask_process(method=sic.download)
    else:
        sic.download()
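# Programmatic usage sketch (comment only; the date range and hemisphere are
# illustrative assumptions): constructing the downloader directly mirrors what
# main() does with the parsed CLI arguments:
#
#   sic = SICDownloader(
#       dates=[pd.to_datetime(d).date()
#              for d in pd.date_range("2020-01-01", "2020-01-31", freq="D")],
#       north=True,
#       south=False,
#   )
#   sic.download()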