Source code for bmi_wavewatch3.downloader

import gzip
import pathlib
import urllib
import urllib.request


[docs]class WaveWatch3Downloader: def __init__(self, url, force=False): self._url = url self._force = force self._filepath = None
[docs] @staticmethod def url_file_part(url): return pathlib.Path(urllib.parse.urlparse(url).path).name
[docs] @staticmethod def retreive(url, filename=None, reporthook=None, force=False): if filename is None: filename = WaveWatch3Downloader.url_file_part(url) if not pathlib.Path(filename).is_file() or force: filepath, _ = urllib.request.urlretrieve( url, reporthook=reporthook, data=None, filename=filename ) else: filepath = filename filepath = pathlib.Path(filepath) if filepath.suffix == ".gz": filepath = WaveWatch3Downloader.unzip(filepath) return pathlib.Path(filepath).absolute()
[docs] @staticmethod def unzip(filepath): filepath = pathlib.Path(filepath) with gzip.open(filepath, "rb") as zip_file, open(filepath.stem, "wb") as fp: fp.write(zip_file.read()) return pathlib.Path(filepath.stem)
@property def filepath(self): return self._filepath @property def url(self): return self._url