#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Fetch series from Eurostat, the statistical office of the European Union, using bulk download and SDMX formats.
http://ec.europa.eu/eurostat/data/database
EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing
EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""
import argparse
import io
import logging
import re
import shutil
import sys
import zipfile
from pathlib import Path

import requests
from dulwich.repo import Repo
from lxml import etree

import dbnomics_git_storage as git_storage

log = logging.getLogger(__name__)
nsmap = dict(
nt='urn:eu.europa.ec.eurostat.navtree',
)
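
# Matches the <Prepared> timestamp of SDMX message headers; it changes at every download
# and is rewritten to a constant value (see write_normalized_xml_file) to avoid spurious diffs.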
prepared_element_re = re.compile('<Prepared>.+</Prepared>')


def iter_datasets_to_download(xml_element, old_xml_element=None):
"""Yield datasets. If old_xml_element is provided, yield only updated datasets."""
old_last_update_by_dataset_code = {}
if old_xml_element is not None:
# Index lastUpdate attributes in old table_of_contents.xml.
for element in old_xml_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
dataset_code = element.findtext("nt:code", namespaces=nsmap)
old_last_update = element.findtext("nt:lastUpdate", namespaces=nsmap)
old_last_update_by_dataset_code[dataset_code] = old_last_update
for leaf_element in xml_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
if old_xml_element is None:
yield leaf_element
else:
dataset_code = leaf_element.findtext('nt:code', namespaces=nsmap)
old_last_update = old_last_update_by_dataset_code.get(dataset_code)
if old_last_update is None:
# This leaf_element is new in this version of table_of_contents.xml
yield leaf_element
else:
last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
if last_update != old_last_update:
yield leaf_element


def main():
parser = argparse.ArgumentParser()
parser.add_argument('target_dir', type=Path, help='path of target directory containing Eurostat series')
parser.add_argument('--full', action='store_true',
help='download all datasets; default behavior is to download what changed since last commit')
parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
parser.add_argument('-v', '--verbose', action='store_true', default=False, help='display debug logging messages')
args = parser.parse_args()
logging.basicConfig(
format="%(levelname)s:%(asctime)s:%(message)s",
level=logging.DEBUG if args.verbose else logging.INFO,
)
assert args.target_dir.is_dir()
log.info("Mode: {}".format("full" if args.full else "incremental"))
old_xml_element = None
table_of_contents_file_name = 'table_of_contents.xml'
if not args.full:
repo = Repo(str(args.target_dir))
tree = git_storage.get_latest_commit_tree(repo)
if tree is None:
            log.error("Incremental mode can't be used when the source data repository has no commit.")
            return 1
table_of_contents_bytes = git_storage.load_text_blob(repo, tree, table_of_contents_file_name, decode=False)
old_xml_element = etree.fromstring(table_of_contents_bytes)
# Fetch list of datasets.
xml_file_path = args.target_dir / table_of_contents_file_name
parser = etree.XMLParser(remove_blank_text=True)
if args.resume and args.full and xml_file_path.is_file():
log.info("Skipping existing file {}".format(table_of_contents_file_name))
xml_element = etree.parse(str(xml_file_path), parser=parser)
else:
xml_url = 'http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}'.format(
table_of_contents_file_name)
        log.info('Fetching table of contents {}'.format(xml_url))
        response = requests.get(xml_url)
        response.raise_for_status()
xml_element = etree.fromstring(response.content, parser=parser)
with open(str(xml_file_path), 'wb') as xml_file:
etree.ElementTree(xml_element).write(xml_file, encoding='utf-8', pretty_print=True, xml_declaration=True)
# Create first-level directories.
data_dir = args.target_dir / 'data'
data_dir.mkdir(exist_ok=True)
datastructures_dir = args.target_dir / 'datastructure'
datastructures_dir.mkdir(exist_ok=True)
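    # Resulting layout: <target_dir>/table_of_contents.xml, <target_dir>/data/<dataset_code>/
    # and <target_dir>/datastructure/<datastructure_code>/.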
    # Build the set of datasets and the set of dataset definitions to download.
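    # Sets deduplicate entries, in particular data structure URLs that may be shared by several datasets.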
datasets = set()
datastructures = set()
for leaf_element in iter_datasets_to_download(xml_element, old_xml_element):
dataset_code = leaf_element.findtext('./nt:code', namespaces=nsmap)
dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
if dataset_url:
datasets.add((dataset_code, dataset_url))
datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
if datastructure_url:
datastructures.add(datastructure_url)
log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))
    # Delete directories of datasets and dataset definitions to download.
if not args.resume:
for dataset_code, dataset_url in datasets:
dataset_dir = data_dir / dataset_code
if dataset_dir.is_dir():
shutil.rmtree(str(dataset_dir))
for datastructure_url in datastructures:
datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
datastructure_dir = datastructures_dir / datastructure_code
if datastructure_dir.is_dir():
shutil.rmtree(str(datastructure_dir))
# Fetch datasets.
for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
dataset_dir = data_dir / dataset_code
if args.resume and dataset_dir.is_dir() and list(dataset_dir.iterdir()):
log.info('Skipping existing dataset {}/{} {}'.format(index, len(datasets), dataset_url))
else:
dataset_dir.mkdir(exist_ok=True)
log.info('Fetching dataset {}/{} {}'.format(index, len(datasets), dataset_url))
response = requests.get(dataset_url)
response.raise_for_status()
data_zip_file = zipfile.ZipFile(io.BytesIO(response.content))
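            # SDMX bulk archives can contain XML and non-XML members: XML files are
            # normalized before being written, any other member is extracted verbatim.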
for data_zip_info in data_zip_file.infolist():
if data_zip_info.filename.endswith('.xml'):
with data_zip_file.open(data_zip_info) as data_file:
xml_file_path = dataset_dir / data_zip_info.filename
write_normalized_xml_file(xml_file_path, data_file)
else:
data_zip_file.extract(data_zip_info, str(dataset_dir))
    # Fetch dataset definitions.
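    # The directory name of each data structure is its code: the last URL path segment,
    # stripped of everything after the first dot (e.g. a hypothetical
    # '.../dsd_nama_10_gdp.sdmx.zip' gives 'dsd_nama_10_gdp').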
for index, datastructure_url in enumerate(sorted(datastructures), start=1):
datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
datastructure_dir = datastructures_dir / datastructure_code
        if args.resume and datastructure_dir.is_dir() and list(datastructure_dir.iterdir()):
log.info('Skipping existing data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
else:
datastructure_dir.mkdir(exist_ok=True)
log.info('Fetching data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
response = requests.get(datastructure_url)
response.raise_for_status()
metadata_zip_file = zipfile.ZipFile(io.BytesIO(response.content))
for metadata_zip_info in metadata_zip_file.infolist():
if metadata_zip_info.filename.endswith('.xml'):
with metadata_zip_file.open(metadata_zip_info) as metadata_file:
xml_file_path = datastructure_dir / metadata_zip_info.filename
write_normalized_xml_file(xml_file_path, metadata_file)
else:
metadata_zip_file.extract(metadata_zip_info, str(datastructure_dir))
return 0


def write_normalized_xml_file(xml_file_path, source_file):
    """Normalize data that changes at each download, such as today's date,
    in order to avoid triggering a spurious commit in the source data repository.

    A regex is used because lxml raises SerialisationError on very large files.
    """
global prepared_element_re
xml_str = source_file.read().decode('utf-8')
    with open(str(xml_file_path), mode="w", encoding="utf-8") as xml_file:
xml_file.write(prepared_element_re.sub("<Prepared>1111-11-11T11:11:11</Prepared>", xml_str, 1))


if __name__ == '__main__':
sys.exit(main())