#! /usr/bin/env python3


# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017-2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


"""Fetch series from Eurostat, the statistical office of the European Union, using bulk download and SDMX formats.

http://ec.europa.eu/eurostat/data/database

EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing

EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""

import argparse
import logging
import shutil
import subprocess
import sys
from pathlib import Path

from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
from lxml import etree

from dbnomics_data_model.storages import StorageError
from dbnomics_data_model.storages import git as git_storage

log = logging.getLogger(__name__)

nsmap = dict(
    nt='urn:eu.europa.ec.eurostat.navtree',
)


def iter_datasets_to_download(toc_element, old_toc_element=None):
    """Yield datasets. If old_toc_element is provided, yield only updated datasets."""
    def iterate_datasets_elements(toc_element):
        for element in toc_element.iterfind('.//nt:leaf', namespaces=nsmap):
            if element.attrib["type"] in ("dataset", "table"):
                # This is a dataset
                yield element

    old_last_update_by_dataset_code = {}
    if old_toc_element is not None:
        # Index lastUpdate attributes in old table_of_contents.xml.
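        # The mapping built below ({dataset_code: lastUpdate}) is used afterwards to
        # detect datasets whose lastUpdate attribute changed since the previous commit.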
        for element in iterate_datasets_elements(old_toc_element):
            dataset_code = element.findtext("nt:code", namespaces=nsmap)
            old_last_update = element.findtext("nt:lastUpdate", namespaces=nsmap)
            old_last_update_by_dataset_code[dataset_code] = old_last_update

    for leaf_element in iterate_datasets_elements(toc_element):
        if old_toc_element is None:
            yield leaf_element
        else:
            dataset_code = leaf_element.findtext('nt:code', namespaces=nsmap)
            old_last_update = old_last_update_by_dataset_code.get(dataset_code)
            if old_last_update is None:
                # This dataset is new in this version of table_of_contents.xml.
                yield leaf_element
            else:
                last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
                if last_update != old_last_update:
                    yield leaf_element


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('target_dir', type=Path,
                        help='path of target directory containing Eurostat series')
    parser.add_argument('--full', action='store_true',
                        help='download all datasets; default behavior is to download what changed since last commit')
    parser.add_argument('--log', default='INFO', help='level of logging messages')
    parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
    parser.add_argument('--datasets', nargs='+', metavar='DATASET_CODE',
                        help='download only the given datasets (dataset codes, space-separated)')
    args = parser.parse_args()

    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))

    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: {}'.format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)

    xml_parser = etree.XMLParser(remove_blank_text=True)

    log.info("Mode: %s", "full" if args.full else "incremental")

    # Read old table of contents.
    table_of_contents_file_name = 'table_of_contents.xml'
    old_toc_element = None
    if not (args.full or args.datasets):
        # Check that we can access the Git history (to compare table_of_contents.xml versions).
        try:
            repo = Repo(str(args.target_dir))
        except NotGitRepository:
            log.error("%s is not a Git repository. Use --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        try:
            tree = git_storage.get_commit_tree(repo, "HEAD", args.target_dir)
        except StorageError:
            log.error("Could not read the HEAD commit of %s. Use --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        table_of_contents_bytes = git_storage.load_text_blob(repo, tree, table_of_contents_file_name, decode=False)
        old_toc_element = etree.fromstring(table_of_contents_bytes)

    # Fetch new table of contents, abbreviated "toc".
    xml_file_path = args.target_dir / table_of_contents_file_name
    xml_url = 'http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}'.format(
        table_of_contents_file_name)
    log.info('Fetching table of contents {}'.format(xml_url))
    download(xml_url, xml_file_path)
    toc_element = etree.parse(str(xml_file_path), parser=xml_parser)

    # Create first-level directories.
    data_dir = args.target_dir / 'data'
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / 'datastructure'
    datastructures_dir.mkdir(exist_ok=True)

    # Build the sets of datasets and of data structure definitions to download.
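    # Each dataset leaf of the table of contents may reference an SDMX data file
    # (nt:downloadLink) and an SDMX data structure definition (nt:metadata).
    # Both are collected as (code, url) pairs; sets avoid downloading the same file twice.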
    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, old_toc_element):
        dataset_code = leaf_element.findtext('./nt:code', namespaces=nsmap)
        if args.datasets and dataset_code not in args.datasets:
            log.debug("Skipping dataset %r because it is not mentioned by --datasets option", dataset_code)
            continue
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))

    # Fetch datasets.
    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.info('Skipping existing dataset {}/{} {}'.format(index, len(datasets), dataset_url))
        else:
            if dataset_dir.is_dir():
                shutil.rmtree(str(dataset_dir))
            dataset_dir.mkdir(exist_ok=True)
            log.info('Fetching dataset {}/{} {}'.format(index, len(datasets), dataset_url))
            zip_file_path = dataset_dir / "{}.zip".format(dataset_code)
            download(dataset_url, zip_file_path)
            unzip(zip_file_path, dataset_dir)
            zip_file_path.unlink()

    # Fetch dataset definitions (data structures).
    for index, (datastructure_code, datastructure_url) in enumerate(sorted(datastructures), start=1):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(datastructure_code)
        # Handle particular case for rail_if_esms.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(datastructure_code)
        if args.resume and datastructure_xml_file_path.is_file():
            log.info('Skipping existing data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
        else:
            if datastructure_dir.is_dir():
                shutil.rmtree(str(datastructure_dir))
            datastructure_dir.mkdir(exist_ok=True)
            log.info('Fetching data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
            zip_file_path = datastructure_dir / "{}.zip".format(datastructure_code)
            download(datastructure_url, zip_file_path)
            unzip(zip_file_path, datastructure_dir)
            zip_file_path.unlink()

    return 0


def download(url, output_file: Path):
    if output_file.is_file():
        output_file.unlink()
    return subprocess.run(["wget", "-c", "--no-verbose", url, "-O", str(output_file)])


def unzip(zip_file: Path, output_dir: Path):
    return subprocess.run(["unzip", "-d", str(output_dir), str(zip_file)])


if __name__ == '__main__':
    sys.exit(main())