#! /usr/bin/env python3


# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Fetch series from Eurostat, the statistical office of the European Union,
using bulk download and SDMX formats.

http://ec.europa.eu/eurostat/data/database

EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing

EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""

import argparse
import logging
import shutil
import subprocess
import sys
from pathlib import Path

from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
from lxml import etree

from dbnomics_data_model.storages import StorageError
from dbnomics_data_model.storages import git as git_storage

log = logging.getLogger(__name__)

nsmap = dict(
    nt='urn:eu.europa.ec.eurostat.navtree',
)


def iter_datasets_to_download(toc_element, old_toc_element=None):
    """Yield dataset leaf elements. If old_toc_element is provided, yield only new or updated datasets."""
    old_last_update_by_dataset_code = {}
    if old_toc_element is not None:
        # Index the lastUpdate element of each dataset in the old table_of_contents.xml.
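        # Illustrative shape of a dataset entry in table_of_contents.xml, inferred
        # from the queries in this module (element values are made up):
        #   <nt:leaf type="dataset">
        #     <nt:code>some_dataset_code</nt:code>
        #     <nt:lastUpdate>...</nt:lastUpdate>
        #     <nt:downloadLink format="sdmx">...</nt:downloadLink>
        #     <nt:metadata format="sdmx">...</nt:metadata>
        #   </nt:leaf>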
        for element in old_toc_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
            dataset_code = element.findtext("nt:code", namespaces=nsmap)
            old_last_update = element.findtext("nt:lastUpdate", namespaces=nsmap)
            old_last_update_by_dataset_code[dataset_code] = old_last_update

    for leaf_element in toc_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
        if old_toc_element is None:
            yield leaf_element
        else:
            dataset_code = leaf_element.findtext('nt:code', namespaces=nsmap)
            old_last_update = old_last_update_by_dataset_code.get(dataset_code)
            if old_last_update is None:
                # This dataset is new: it is absent from the old table_of_contents.xml.
                yield leaf_element
            else:
                last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
                if last_update != old_last_update:
                    yield leaf_element


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('target_dir', type=Path, help='path of target directory containing Eurostat series')
    parser.add_argument('--full', action='store_true',
                        help='download all datasets; default behavior is to download what changed since last commit')
    parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
    parser.add_argument('-v', '--verbose', action='store_true', default=False, help='display debug logging messages')
    args = parser.parse_args()

    logging.basicConfig(
        format="%(levelname)s:%(asctime)s:%(message)s",
        level=logging.DEBUG if args.verbose else logging.INFO,
    )

    assert args.target_dir.is_dir()

    xml_parser = etree.XMLParser(remove_blank_text=True)

    log.info("Mode: %s", "full" if args.full else "incremental")

    # Read old table of contents.
    table_of_contents_file_name = 'table_of_contents.xml'
    old_toc_element = None
    if not args.full:
        try:
            repo = Repo(str(args.target_dir))
        except NotGitRepository:
            log.error("%s is not a Git repository. Use --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        try:
            tree = git_storage.get_commit_tree(repo, "HEAD", args.target_dir)
        except StorageError:
            log.error("Could not read the HEAD commit of the Git repository %s. "
                      "Use --full option to download all the files of the provider.", args.target_dir)
            sys.exit(1)
        table_of_contents_bytes = git_storage.load_text_blob(repo, tree, table_of_contents_file_name, decode=False)
        old_toc_element = etree.fromstring(table_of_contents_bytes)

    # Fetch new table of contents, abbreviated "toc".
    xml_file_path = args.target_dir / table_of_contents_file_name
    xml_url = 'http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}'.format(
        table_of_contents_file_name)
    log.info('Fetching table of contents {}'.format(xml_url))
    download(xml_url, xml_file_path)
    toc_element = etree.parse(str(xml_file_path), parser=xml_parser)

    # Create first-level directories.
    data_dir = args.target_dir / 'data'
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / 'datastructure'
    datastructures_dir.mkdir(exist_ok=True)

    # Build the set of datasets and the set of dataset definitions to download.
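    # Using sets rather than lists deduplicates entries: several datasets may
    # reference the same datastructure definition file, which should be
    # downloaded only once.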
    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, old_toc_element):
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            dataset_code = leaf_element.findtext('./nt:code', namespaces=nsmap)
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            # Derive the datastructure code from the file name in the URL.
            datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))

    # Fetch datasets.
    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.info('Skipping existing dataset {}/{} {}'.format(index, len(datasets), dataset_url))
        else:
            if dataset_dir.is_dir():
                shutil.rmtree(str(dataset_dir))
            dataset_dir.mkdir(exist_ok=True)
            log.info('Fetching dataset {}/{} {}'.format(index, len(datasets), dataset_url))
            zip_file_path = dataset_dir / "{}.zip".format(dataset_code)
            download(dataset_url, zip_file_path)
            unzip(zip_file_path, dataset_dir)
            zip_file_path.unlink()
            # normalize_xml_file(dataset_xml_file_path)

    # Fetch dataset definitions.
    for index, (datastructure_code, datastructure_url) in enumerate(sorted(datastructures), start=1):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(datastructure_code)
        # Handle particular case for rail_if_esms.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(datastructure_code)
        if args.resume and datastructure_xml_file_path.is_file():
            log.info('Skipping existing data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
        else:
            if datastructure_dir.is_dir():
                shutil.rmtree(str(datastructure_dir))
            datastructure_dir.mkdir(exist_ok=True)
            log.info('Fetching data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
            zip_file_path = datastructure_dir / "{}.zip".format(datastructure_code)
            download(datastructure_url, zip_file_path)
            unzip(zip_file_path, datastructure_dir)
            zip_file_path.unlink()
            # normalize_xml_file(datastructure_xml_file_path)

    return 0


def download(url, output_file: Path):
    # Remove any stale file first, then let wget write (and resume, with -c) the download.
    if output_file.is_file():
        output_file.unlink()
    return subprocess.run(["wget", "-c", "--no-verbose", url, "-O", str(output_file)])


def normalize_xml_file(xml_file: Path):
    """Normalize data changing at each download, like today's date, in order to avoid
    triggering a false commit in source data.

    Don't use lxml, which raises SerialisationError with too large files (e.g. ef_m_farmang).
    """
    return replace_inplace(xml_file, "s#<Prepared>.+</Prepared>#<Prepared>1111-11-11T11:11:11</Prepared>#")


def replace_inplace(f: Path, pattern):
    # From https://stackoverflow.com/questions/39086/search-and-replace-a-line-in-a-file-in-python
    return subprocess.run(["sed", "-i", "--", pattern, str(f)])


def unzip(zip_file: Path, output_dir: Path):
    return subprocess.run(["unzip", "-d", str(output_dir), str(zip_file)])


if __name__ == '__main__':
    sys.exit(main())
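
# Usage sketch (the script file name below is an assumption, not part of the source):
#   python3 eurostat_fetcher.py --full ./eurostat-data  # download all datasets
#   python3 eurostat_fetcher.py ./eurostat-data         # incremental mode: the target
#   directory must be a Git repository whose HEAD commit contains table_of_contents.xml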