#! /usr/bin/env python3

# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017-2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Fetch series from Eurostat using bulk download and SDMX formats.

http://ec.europa.eu/eurostat/data/database

EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing

EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""

import argparse
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from xml.etree.ElementTree import Element

from lxml import etree

log = logging.getLogger(__name__)

nsmap = dict(
    nt="urn:eu.europa.ec.eurostat.navtree",
)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "target_dir",
        type=Path,
        help="path of target directory containing Eurostat series",
    )
    parser.add_argument(
        "--from-datetime",
        type=datetime_with_timezone,
        default=os.getenv("FROM_DATETIME"),
        help="download only the datasets that changed since the provided datetime",
    )
    parser.add_argument("--log", default="INFO", help="level of logging messages")
    parser.add_argument("--resume", action="store_true", help="keep existing files in target directory")
    parser.add_argument(
        "--datasets",
        nargs="+",
        metavar="DATASET_CODE",
        help="download only the given datasets (dataset codes, space separated)",
    )
    args = parser.parse_args()

    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))

    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError("Invalid log level: {}".format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)

    log.info("Command-line arguments: %r", args)

    if args.from_datetime is None:
        log.info("Argument --from-datetime was not provided: all datasets will be downloaded")
    else:
        log.info(
            "Argument --from-datetime=%r: only datasets that changed since that datetime will be downloaded",
            args.from_datetime,
        )

    # Download the table of contents, which lists all datasets and their download links.
    toc_file_name = "table_of_contents.xml"
    toc_file = args.target_dir / toc_file_name
    if args.resume and toc_file.is_file():
        log.debug("Skipping download of the table of contents because --resume was passed and the file already exists")
    else:
        toc_url = "http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}".format(
            toc_file_name
        )
        log.info("Downloading table of contents from %r", toc_url)
        download(toc_url, toc_file)
    toc_element = parse_table_of_contents(toc_file)

    # Create first-level directories.
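    # The fetch loops below then populate them, one sub-directory per code:
    #   <target_dir>/data/<dataset_code>/<dataset_code>.sdmx.xml
    #   <target_dir>/datastructure/<datastructure_code>/<datastructure_code>.sdmx.xml
    # When --resume is passed, the presence of these files marks an item as already fetched.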
    data_dir = args.target_dir / "data"
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / "datastructure"
    datastructures_dir.mkdir(exist_ok=True)

    # Build the set of datasets and the set of dataset definitions to download.
    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, args.from_datetime):
        dataset_code = leaf_element.findtext("./nt:code", namespaces=nsmap)
        if args.datasets and dataset_code not in args.datasets:
            log.debug(
                "Skipping dataset %r because it is not mentioned by --datasets option",
                dataset_code,
            )
            continue
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            datastructure_code = datastructure_url.rsplit("/", 1)[-1].split(".", 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))

    # Fetch datasets.
    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.debug("Skipping existing dataset {}/{} {}".format(index, len(datasets), dataset_url))
        else:
            if dataset_dir.is_dir():
                shutil.rmtree(str(dataset_dir))
            dataset_dir.mkdir(exist_ok=True)
            log.info("Fetching dataset {}/{} {}".format(index, len(datasets), dataset_url))
            zip_file_path = dataset_dir / "{}.zip".format(dataset_code)
            download(dataset_url, zip_file_path)
            unzip(zip_file_path, dataset_dir)
            zip_file_path.unlink()

    # Fetch dataset definitions (data structures).
    for index, (datastructure_code, datastructure_url) in enumerate(sorted(datastructures), start=1):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(datastructure_code)
        # Handle particular case for rail_if_esms.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(datastructure_code)
        if args.resume and datastructure_xml_file_path.is_file():
            log.debug(
                "Skipping existing data structure {}/{} {}".format(index, len(datastructures), datastructure_url)
            )
        else:
            if datastructure_dir.is_dir():
                shutil.rmtree(str(datastructure_dir))
            datastructure_dir.mkdir(exist_ok=True)
            log.info("Fetching data structure {}/{} {}".format(index, len(datastructures), datastructure_url))
            zip_file_path = datastructure_dir / "{}.zip".format(datastructure_code)
            download(datastructure_url, zip_file_path)
            unzip(zip_file_path, datastructure_dir)
            zip_file_path.unlink()

    return 0


def datetime_with_timezone(s: str) -> datetime:
    """Parse a datetime string (as accepted by datetime.fromisoformat) that must carry a timezone,
    e.g. '2020-03-18T00:00:00+00:00'."""
    d = datetime.fromisoformat(s)
    if d.tzinfo is None:
        raise ValueError(f"Datetime must be provided with a timezone. Received {s!r}")
    return d


def download(url, output_file: Path):
    """Download url to output_file using wget, removing any pre-existing output file first."""
    if output_file.is_file():
        output_file.unlink()
    return subprocess.run(["wget", "-c", "--no-verbose", url, "-O", str(output_file)])


def parse_table_of_contents(file: Path) -> Element:
    """Parse the table of contents XML file and return the parsed document."""
    xml_parser = etree.XMLParser(remove_blank_text=True)
    toc_element = etree.parse(str(file), parser=xml_parser)
    return toc_element


def iter_datasets_to_download(toc_element, from_datetime: Optional[datetime] = None):
    """Yield dataset elements for the datasets to be downloaded.
    If from_datetime is provided, yield only the datasets that changed since that datetime."""

    def iter_dataset_elements(toc_element):
        for element in toc_element.iterfind(".//nt:leaf", namespaces=nsmap):
            if element.attrib["type"] in ("dataset", "table"):
                yield element

    for leaf_element in iter_dataset_elements(toc_element):
        if from_datetime is None:
            yield leaf_element
            continue

        dataset_code = leaf_element.findtext("nt:code", namespaces=nsmap)
        assert dataset_code

        last_update_str = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
        if not last_update_str:
            log.debug("Dataset %r has no lastUpdate element: it will be downloaded", dataset_code)
            yield leaf_element
            continue

        assert from_datetime is not None
        # Example: <nt:lastUpdate>18.03.2020</nt:lastUpdate>
        last_update = datetime.strptime(last_update_str, "%d.%m.%Y").replace(tzinfo=timezone.utc)
        if last_update >= from_datetime:
            log.debug(
                "Dataset %r lastUpdate is more recent than %r: it will be downloaded", dataset_code, from_datetime
            )
            yield leaf_element


def unzip(zip_file: Path, output_dir: Path):
    """Extract zip_file into output_dir using the unzip command."""
    return subprocess.run(["unzip", "-q", "-d", str(output_dir), str(zip_file)])


if __name__ == "__main__":
    sys.exit(main())