#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# By: Emmanuel Raviart <emmanuel.raviart@cepremap.org>
#
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Fetch series from Eurostat using bulk download and SDMX formats.
http://ec.europa.eu/eurostat/data/database
EUROSTAT bulk download:
- http://ec.europa.eu/eurostat/fr/data/bulkdownload
- http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing
EUROSTAT SDMX documentation:
- http://ec.europa.eu/eurostat/web/sdmx-infospace/welcome
- http://ec.europa.eu/eurostat/web/sdmx-web-services/rest-sdmx-2.1
"""
import argparse
import logging
import os
import shutil
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from xml.etree.ElementTree import Element

from lxml import etree

# Namespace map for the "nt" prefix used by the Eurostat table of contents.
nsmap = {"nt": "urn:eu.europa.ec.eurostat.navtree"}

log = logging.getLogger(__name__)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "target_dir",
        type=Path,
        help="path of target directory containing Eurostat series",
    )
    parser.add_argument(
        "--from-datetime",
        type=datetime_with_timezone,
        default=os.getenv("FROM_DATETIME"),
        help="download only the datasets that changed since the provided datetime",
    )
    parser.add_argument("--log", default="INFO", help="level of logging messages")
    parser.add_argument("--resume", action="store_true", help="keep existing files in target directory")
    parser.add_argument(
        "--datasets",
        default=os.getenv("DATASETS"),
        help="download only the given datasets (datasets codes, comma separated)",
    )
    args = parser.parse_args()

    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))

    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError("Invalid log level: {}".format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)

    log.info("Command-line arguments: %r", args)

    if args.from_datetime is None:
        log.info(
            "Argument --from-datetime was not provided: "
            "the requested datasets will be downloaded without taking their update date into account"
        )
    else:
        log.info(
            "Argument --from-datetime=%r: only datasets that changed since that datetime will be downloaded",
            args.from_datetime.isoformat(),
        )

    if args.datasets is not None:
        args.datasets = args.datasets.split(",")

    toc_file_name = "table_of_contents.xml"
    toc_file = args.target_dir / toc_file_name
    if args.resume and toc_file.is_file():
        log.debug("Skip downloading table of contents because of --resume and file already exists")
    else:
        toc_url = "http://ec.europa.eu/eurostat/estat-navtree-portlet-prod/BulkDownloadListing?file={}".format(
            toc_file_name
        )
        log.info("Downloading table of contents from %r", toc_url)
        download(toc_url, toc_file)

    toc_element = parse_table_of_contents(toc_file)

    # Sub-directories of the target directory for dataset data and for SDMX
    # data structure definitions (the directory names are assumed here).
    data_dir = args.target_dir / "data"
    data_dir.mkdir(exist_ok=True)
    datastructures_dir = args.target_dir / "datastructure"
    datastructures_dir.mkdir(exist_ok=True)

    # Build set of datasets and set of datasets definitions to download.
    datasets = set()
    datastructures = set()
    for leaf_element in iter_datasets_to_download(toc_element, args.from_datetime):
        dataset_code = leaf_element.findtext("./nt:code", namespaces=nsmap)
        if args.datasets and dataset_code not in args.datasets:
            log.debug(
                "Skipping dataset %r because it is not mentioned by --datasets option",
                dataset_code,
            )
            continue
        dataset_url = leaf_element.findtext('./nt:downloadLink[@format="sdmx"]', namespaces=nsmap)
        if dataset_url:
            datasets.add((dataset_code, dataset_url))
        datastructure_url = leaf_element.findtext('./nt:metadata[@format="sdmx"]', namespaces=nsmap)
        if datastructure_url:
            datastructure_code = datastructure_url.rsplit("/", 1)[-1].split(".", 1)[0]
            datastructures.add((datastructure_code, datastructure_url))

    log.info("Downloading {} datasets and {} datastructures.".format(len(datasets), len(datastructures)))

    # Fetch datasets.
    for index, (dataset_code, dataset_url) in enumerate(sorted(datasets), start=1):
        dataset_dir = data_dir / dataset_code
        dataset_xml_file_path = dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if args.resume and dataset_xml_file_path.is_file():
            log.debug("Skipping existing dataset {}/{} {}".format(index, len(datasets), dataset_url))
            continue
        if dataset_dir.is_dir():
            shutil.rmtree(str(dataset_dir))
        dataset_dir.mkdir(exist_ok=True)
        log.info("Fetching dataset {}/{} {}".format(index, len(datasets), dataset_url))
        zip_file_path = dataset_dir / "{}.zip".format(dataset_code)
        download(dataset_url, zip_file_path)
        unzip(zip_file_path, dataset_dir)
        zip_file_path.unlink()

    # Fetch datasets definitions.
    for index, (datastructure_code, datastructure_url) in enumerate(sorted(datastructures), start=1):
        datastructure_dir = datastructures_dir / datastructure_code
        datastructure_xml_file_path = datastructure_dir / "{}.sdmx.xml".format(datastructure_code)
        # Handle particular case for rail_if_esms.
        if datastructure_code == "rail_if_esms":
            datastructure_xml_file_path = datastructure_dir / "{}.sdmx".format(datastructure_code)
        if args.resume and datastructure_xml_file_path.is_file():
            log.debug("Skipping existing data structure {}/{} {}".format(index, len(datastructures), datastructure_url))
            continue
        if datastructure_dir.is_dir():
            shutil.rmtree(str(datastructure_dir))
        datastructure_dir.mkdir(exist_ok=True)
        log.info("Fetching data structure {}/{} {}".format(index, len(datastructures), datastructure_url))
        zip_file_path = datastructure_dir / "{}.zip".format(datastructure_code)
        download(datastructure_url, zip_file_path)
        unzip(zip_file_path, datastructure_dir)
        zip_file_path.unlink()


def datetime_with_timezone(s: str) -> datetime:
    d = datetime.fromisoformat(s)
    if d.tzinfo is None:
        raise ValueError(f"Datetime must be provided with a timezone. Received {s!r}")
    return d
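

# An illustrative example of the accepted --from-datetime format (the value
# below is only an example, not a value from the Eurostat data):
#
#   >>> datetime_with_timezone("2020-03-18T00:00:00+00:00")
#   datetime.datetime(2020, 3, 18, 0, 0, tzinfo=datetime.timezone.utc)
#
# A value without a timezone, such as "2020-03-18", raises ValueError.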


def download(url, output_file: Path):
    if output_file.is_file():
        output_file.unlink()
    return subprocess.run(["wget", "-c", "--no-verbose", url, "-O", str(output_file)])


def parse_table_of_contents(file: Path) -> Element:
    xml_parser = etree.XMLParser(remove_blank_text=True)
    toc_element = etree.parse(str(file), parser=xml_parser)
    return toc_element
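

# For reference, a sketch of the table-of-contents entries that
# iter_datasets_to_download() works on; the element names come from the
# queries below, while the dataset code and URLs are purely illustrative:
#
#   <nt:leaf type="dataset">
#     <nt:code>nama_10_gdp</nt:code>
#     <nt:lastUpdate>18.03.2020</nt:lastUpdate>
#     <nt:downloadLink format="sdmx">http://ec.europa.eu/...</nt:downloadLink>
#     <nt:metadata format="sdmx">http://ec.europa.eu/.../NAMA_10_GDP.sdmx.zip</nt:metadata>
#   </nt:leaf>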


def iter_datasets_to_download(toc_element, from_datetime: Optional[datetime] = None):
    """Yield dataset elements for the datasets to be downloaded.

    If from_datetime is provided, yield only the datasets that changed since that datetime."""

    def iter_dataset_elements(toc_element):
        for element in toc_element.iterfind(".//nt:leaf", namespaces=nsmap):
            if element.attrib["type"] in ("dataset", "table"):
                yield element

    for leaf_element in iter_dataset_elements(toc_element):
        if from_datetime is None:
            yield leaf_element
            continue
        dataset_code = leaf_element.findtext("nt:code", namespaces=nsmap)
        assert dataset_code
        last_update_str = leaf_element.findtext("nt:lastUpdate", namespaces=nsmap)
        if not last_update_str:
            log.debug("Dataset %r has no lastUpdate attribute: it will be downloaded", dataset_code)
            yield leaf_element
            continue
        assert from_datetime is not None
        # Example: <nt:lastUpdate>18.03.2020</nt:lastUpdate>
        last_update = datetime.strptime(last_update_str, "%d.%m.%Y").replace(tzinfo=timezone.utc)
        if last_update >= from_datetime:
            log.debug(
                "Dataset %r lastUpdate is more recent than %r: it will be downloaded",
                dataset_code,
                from_datetime.isoformat(),
            )
            yield leaf_element


def unzip(zip_file: Path, output_dir: Path):
    return subprocess.run(["unzip", "-q", "-d", str(output_dir), str(zip_file)])