#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# Copyright (C) 2017 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Fetch series from Eurostat, the statistical office of the European Union, using bulk download and SDMX formats.
http://ec.europa.eu/eurostat/data/database
EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing
EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""
import argparse
import io
import logging
import re
import shutil
import subprocess
import sys
import zipfile
from pathlib import Path

import requests
from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
from lxml import etree

from dbnomics_data_model.storages import StorageError
from dbnomics_data_model.storages import git as git_storage

# XML namespace used by Eurostat's "navtree" documents (table_of_contents.xml).
nsmap = {'nt': 'urn:eu.europa.ec.eurostat.navtree'}
log = logging.getLogger(__name__)
prepared_element_re = re.compile('<Prepared>.+</Prepared>')
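# Illustrative fragment of the SDMX header element that write_normalized_xml_file
# (defined at the end of this script) rewrites: the preparation timestamp changes
# at every download, so it is replaced by a constant value to avoid spurious Git
# diffs. The timestamp below is an invented example.
#
#   <Prepared>2017-06-24T09:36:41</Prepared>   becomes   <Prepared>1111-11-11T11:11:11</Prepared>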
def iter_datasets_to_download(xml_element, old_xml_element=None):
"""Yield datasets. If old_xml_element is provided, yield only updated datasets."""
old_last_update_by_dataset_code = {}
if old_xml_element is not None:
# Index lastUpdate attributes in old table_of_contents.xml.
for element in old_xml_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
dataset_code = element.findtext("nt:code", namespaces=nsmap)
old_last_update = element.findtext("nt:lastUpdate", namespaces=nsmap)
old_last_update_by_dataset_code[dataset_code] = old_last_update
for leaf_element in xml_element.iterfind('.//nt:leaf[@type="dataset"]', namespaces=nsmap):
if old_xml_element is None:
yield leaf_element
else:
dataset_code = leaf_element.findtext('nt:code', namespaces=nsmap)
old_last_update = old_last_update_by_dataset_code.get(dataset_code)
if old_last_update is None:
# This leaf_element is new in this version of table_of_contents.xml
yield leaf_element
else:
last_update = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
if last_update != old_last_update:
yield leaf_element
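# The XPath queries above assume table_of_contents.xml leaves shaped roughly like
# this (simplified sketch inferred from the queries in this script; element values
# are invented examples):
#
#   <nt:leaf type="dataset">
#     <nt:code>nama_10_gdp</nt:code>
#     <nt:lastUpdate>30.06.2017</nt:lastUpdate>
#     <nt:downloadLink format="sdmx">http://ec.europa.eu/...</nt:downloadLink>
#     <nt:metadata format="sdmx">http://ec.europa.eu/...</nt:metadata>
#   </nt:leaf>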
def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('target_dir', type=Path, help='path of target directory containing Eurostat series')
    parser.add_argument('--full', action='store_true',
                        help='download all datasets; default behavior is to download what changed since last commit')
    parser.add_argument('--resume', action='store_true', help='keep existing files in target directory')
    parser.add_argument('-v', '--verbose', action='store_true', default=False, help='display debug logging messages')
    args = parser.parse_args()

    logging.basicConfig(
        format="%(levelname)s:%(asctime)s:%(message)s",
        level=logging.DEBUG if args.verbose else logging.INFO,
    )

    # Reuse the name "parser" for the XML parser; the argparse parser is no longer needed.
    parser = etree.XMLParser(remove_blank_text=True)

    log.info("Mode: {}".format("full" if args.full else "incremental"))

    table_of_contents_file_name = 'table_of_contents.xml'

    # In incremental mode, load the table of contents committed at HEAD in order to
    # compare its lastUpdate values with those of the freshly downloaded one.
    old_xml_element = None
    if not args.full:
        try:
            repo = Repo(str(args.target_dir))
        except NotGitRepository:
            log.error("%s is not a Git repository. Use --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        try:
            tree = git_storage.get_commit_tree(repo, "HEAD", args.target_dir)
        except StorageError:
            log.error("Could not read the HEAD commit of %s. Use --full option to download all the files of the provider.",
                      args.target_dir)
            sys.exit(1)
        table_of_contents_bytes = git_storage.load_text_blob(repo, tree, table_of_contents_file_name, decode=False)
        old_xml_element = etree.fromstring(table_of_contents_bytes)

    # Download the current table of contents and store a pretty-printed copy in the target directory.
    xml_file_path = args.target_dir / table_of_contents_file_name
    xml_url = 'http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}'.format(
        table_of_contents_file_name)
    log.info('Fetching table of contents {}'.format(xml_url))
    response = requests.get(xml_url)
    xml_element = etree.fromstring(response.content, parser=parser)
    with open(str(xml_file_path), 'wb') as xml_file:
        etree.ElementTree(xml_element).write(xml_file, encoding='utf-8', pretty_print=True, xml_declaration=True)
    # Create first-level directories.
    data_dir = args.target_dir / 'data'
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / 'datastructure'
    datastructures_dir.mkdir(exist_ok=True)

    # Build the set of datasets and the set of dataset definitions to download.
    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(xml_element, old_xml_element):
        dataset_code = leaf_element.findtext('./nt:code', namespaces=nsmap)
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            datastructures.add(datastructure_url)

    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))

    # Delete directories of the datasets and dataset definitions to download.
    if not args.resume:
        for dataset_code, dataset_url in datasets:
            dataset_dir = data_dir / dataset_code
            if dataset_dir.is_dir():
                shutil.rmtree(str(dataset_dir))
        for datastructure_url in datastructures:
            datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
            datastructure_dir = datastructures_dir / datastructure_code
            if datastructure_dir.is_dir():
                shutil.rmtree(str(datastructure_dir))
    # Fetch datasets.
    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        if args.resume and dataset_dir.is_dir() and list(dataset_dir.iterdir()):
            log.info('Skipping existing dataset {}/{} {}'.format(index, len(datasets), dataset_url))
            continue
        dataset_dir.mkdir(exist_ok=True)
        log.info('Fetching dataset {}/{} {}'.format(index, len(datasets), dataset_url))
        response = requests.get(dataset_url)
        response.raise_for_status()
        data_zip_file = zipfile.ZipFile(io.BytesIO(response.content))
        for data_zip_info in data_zip_file.infolist():
            if data_zip_info.filename.endswith('.xml'):
                # Normalize XML files so that volatile header values do not produce spurious diffs.
                with data_zip_file.open(data_zip_info) as data_file:
                    xml_file_path = dataset_dir / data_zip_info.filename
                    write_normalized_xml_file(xml_file_path, data_file)
            else:
                data_zip_file.extract(data_zip_info, str(dataset_dir))
    # Fetch dataset definitions (SDMX data structure definitions).
    for index, datastructure_url in enumerate(sorted(datastructures), start=1):
        datastructure_code = datastructure_url.rsplit('/', 1)[-1].split('.', 1)[0]
        datastructure_dir = datastructures_dir / datastructure_code
        if args.resume and datastructure_dir.is_dir() and list(datastructure_dir.iterdir()):
            log.info('Skipping existing data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
            continue
        datastructure_dir.mkdir(exist_ok=True)
        log.info('Fetching data structure {}/{} {}'.format(index, len(datastructures), datastructure_url))
        response = requests.get(datastructure_url)
        response.raise_for_status()
        metadata_zip_file = zipfile.ZipFile(io.BytesIO(response.content))
        for metadata_zip_info in metadata_zip_file.infolist():
            if metadata_zip_info.filename.endswith('.xml'):
                # Normalize XML files so that volatile header values do not produce spurious diffs.
                with metadata_zip_file.open(metadata_zip_info) as metadata_file:
                    xml_file_path = datastructure_dir / metadata_zip_info.filename
                    write_normalized_xml_file(xml_file_path, metadata_file)
            else:
                metadata_zip_file.extract(metadata_zip_info, str(datastructure_dir))
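
# Resulting layout under target_dir (sketch based on the paths built above; the
# file names inside each dataset directory depend on the archives served by Eurostat):
#
#   target_dir/
#     table_of_contents.xml
#     data/<dataset_code>/...
#     datastructure/<datastructure_code>/...
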
def write_normalized_xml_file(xml_file_path, source_file):
    """Normalize data that changes at each download, like today's date,
    in order to avoid triggering a false commit in source data.

    Use regexes because lxml raises SerialisationError with too large files.
    """
    xml_str = source_file.read().decode('utf-8')
    with open(str(xml_file_path), mode="w") as xml_file:
        xml_file.write(prepared_element_re.sub("<Prepared>1111-11-11T11:11:11</Prepared>", xml_str, 1))
if __name__ == '__main__':
sys.exit(main())