#! /usr/bin/env python3


# eurostat-fetcher -- Fetch series from Eurostat database
# By: Christophe Benz <christophe.benz@cepremap.org>
#
# Copyright (C) 2017-2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Convert Eurostat provider, categories, datasets and time series to DBnomics JSON and TSV files."""


import argparse
import logging
import os
import re
import sys
import time
from collections import OrderedDict
from pathlib import Path
from typing import Dict, Iterator, List, Set, Tuple

import humanize
import ujson as json
from dbnomics_data_model import observations
from lxml import etree
from toolz import get_in, valmap

provider_code = "Eurostat"
Christophe Benz's avatar
Christophe Benz committed
provider_json = {
    "code": provider_code,
    "name": provider_code,
Christophe Benz's avatar
Christophe Benz committed
    "region": "EU",
Christophe Benz's avatar
Christophe Benz committed
    "terms_of_use": "http://ec.europa.eu/eurostat/about/policies/copyright",
    "website": "http://ec.europa.eu/eurostat/home",
}

datasets_dir_name = "data"
log = logging.getLogger(__name__)
namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"}
DAILY_PERIOD_RE = re.compile(r"(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})")
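# DAILY_PERIOD_RE matches Eurostat's compact daily periods, e.g. "20170115"; normalize_period rewrites them as "2017-01-15".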
DATASETS_ENV_VAR = "DATASETS"
FULL_ENV_VAR = "FULL"

# Per-phase timings in seconds; (re)initialized for each dataset by convert_sdmx_file.
timings = None


def convert_datasets(
    datasets_to_convert: List[Tuple[str, Path]], dataset_json_stubs: Dict[str, dict], target_dir: Path
):
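    """Convert every dataset in datasets_to_convert to DBnomics JSON, writing one directory per dataset under target_dir."""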
    log.info("Converting %d datasets...", len(datasets_to_convert))

    converted_datasets_codes = set()
    for index, (dataset_code, source_dataset_dir) in enumerate(sorted(datasets_to_convert), start=1):
        if dataset_code in converted_datasets_codes:
            log.debug("Skipping dataset %r because it was already converted", dataset_code)
            continue

        sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)

        log.info(
            "Converting SDMX source file %d/%d %s (%s)",
            index,
            len(datasets_to_convert),
            sdmx_file,
            humanize.naturalsize(sdmx_file.stat().st_size, gnu=True),
        )

        dataset_dir = target_dir / dataset_code
        dataset_dir.mkdir(exist_ok=True)

        dataset_json_stub = dataset_json_stubs[dataset_code]
        convert_sdmx_file(dataset_json_stub, sdmx_file, source_dataset_dir, dataset_dir)

        converted_datasets_codes.add(dataset_code)


def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file):
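    """Handle one element from iterparse: buffer <Obs> rows, flush a full series on </Series>, read <Extracted> for updated_at.

    Side-effects: mutates dataset_json, dataset_context and timings, and appends one JSON line per series to series_jsonl_file.
    """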
    # Because iterparse is called with events=["end"], <Obs> elements arrive before their enclosing <Series> element.

    if element.tag.endswith("Series"):

        # Ignore XML attributes corresponding to series-level SDMX attributes,
        # because series-level SDMX attributes do not exist in DBnomics.
        series_element_attributes = OrderedDict(
            [
                (attribute_key, attribute_value)
                for attribute_key, attribute_value in element.attrib.items()
                if attribute_key not in {"TIME_FORMAT"}  # Redundant with FREQ.
            ]
        )

        dimensions_codes_order = list(series_element_attributes.keys())
        if dataset_json["dimensions_codes_order"] is None:
            dataset_json["dimensions_codes_order"] = dimensions_codes_order
        else:
            # dimensions_codes_order must not change between series.
            assert dataset_json["dimensions_codes_order"] == dimensions_codes_order, (
                dataset_json["dimensions_codes_order"],
                dimensions_codes_order,
            )

        # Fill series dimensions labels in dataset.json.

        t0 = time.time()

        for dimension_code, dimension_value_code in series_element_attributes.items():
            if dimension_code not in dataset_json["dimensions_labels"]:
                dimension_label = dsd_infos["concepts"].get(dimension_code)
                if dimension_label:
                    # Some dimensions labels are an empty string: e.g. bs_bs12_04.sdmx.xml
                    dataset_json["dimensions_labels"][dimension_code] = dimension_label
            if (
                dimension_code in dataset_json["dimensions_values_labels"]
                and dimension_value_code in dataset_json["dimensions_values_labels"][dimension_code]
            ):
                continue
            codelist_code = dsd_infos["codelist_by_concept"][dimension_code]
            dimension_value_label = get_in([codelist_code, dimension_value_code], dsd_infos["codelists"])
            if dimension_value_label:
                dataset_json["dimensions_values_labels"].setdefault(dimension_code, {})[
                    dimension_value_code
                ] = dimension_value_label
        timings["series_labels"] += time.time() - t0
        # Series code is not defined by provider: create it from dimensions values codes.
Christophe Benz's avatar
Christophe Benz committed
        series_code = ".".join(series_element_attributes[dimension_code] for dimension_code in dimensions_codes_order)

        # Write series JSON to file.

        t0 = time.time()

        observations_header = [["PERIOD", "VALUE"] + dsd_infos["attributes"]]
        frequency = series_element_attributes["FREQ"]
        observations_body = (
            list(iter_normalized_observations(dataset_context["current_series_observations"], frequency))
            if frequency == "D"
            else dataset_context["current_series_observations"]
        )
        series_json = {
            "code": series_code,
            "dimensions": [
                series_element_attributes[dimension_code]  # Every dimension MUST be defined for each series.
                for dimension_code in dimensions_codes_order
            ],
            "observations": observations_header + observations_body,
        }
        json.dump(series_json, series_jsonl_file, ensure_ascii=False, sort_keys=True)
        series_jsonl_file.write("\n")
        timings["series_file"] += time.time() - t0

        # Reset context for next series.

        dataset_context["current_series_observations"] = []

    elif element.tag.endswith("Obs"):

        # Fill observations attributes labels in dataset.json.

        t0 = time.time()

        for attribute_code, attribute_value_code in element.attrib.items():
            # Ignore period and value observations XML attributes, because they don't need labels.
            if attribute_code in ["TIME_PERIOD", "OBS_VALUE"]:
                continue
            attribute_label = dsd_infos["concepts"].get(attribute_code)
            if attribute_label and attribute_code not in dataset_json["attributes_labels"]:
                dataset_json["attributes_labels"][attribute_code] = attribute_label
            # Some attribute value codes are multi-valued: several single-character codes
            # concatenated into one string (OBS_STATUS).
            attribute_value_codes = (
                list(attribute_value_code) if attribute_code == "OBS_STATUS" else [attribute_value_code]
            )
            for attribute_value_code in attribute_value_codes:
                if (
                    attribute_code in dataset_json["attributes_values_labels"]
                    and attribute_value_code in dataset_json["attributes_values_labels"][attribute_code]
                ):
                    continue
                codelist_code = dsd_infos["codelist_by_concept"][attribute_code]
                attribute_value_label = get_in([codelist_code, attribute_value_code], dsd_infos["codelists"])
                if attribute_value_label:
                    dataset_json["attributes_values_labels"].setdefault(attribute_code, {})[
                        attribute_value_code
                    ] = attribute_value_label

        timings["observations_labels"] += time.time() - t0

        obs_value = observations.value_to_float(element.attrib.get("OBS_VALUE"))
        dataset_context["current_series_observations"].append(
            [
                element.attrib["TIME_PERIOD"],  # Will be normalized later, if needed.
                obs_value,
            ]
            + [element.attrib.get(attribute_name, "") for attribute_name in dsd_infos["attributes"]]
        )
    elif element.tag.endswith("Extracted"):

        dataset_json["updated_at"] = element.text + "Z"  # Assume the value is UTC time.

def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, source_dataset_dir: Path, dataset_dir: Path):
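    """Convert one dataset: parse its DSD, stream-parse its SDMX file, and write dataset.json and series.jsonl into dataset_dir."""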
    global timings
    timings = {k: 0 for k in {"series_labels", "series_file", "observations_labels", "dsd_infos"}}

    assert dataset_json_stub.get("name"), dataset_json_stub
    assert dataset_dir.is_dir(), dataset_dir
    dataset_code = dataset_json_stub["code"]
    dsd_file_path = source_dataset_dir / "{}.dsd.xml".format(dataset_code)
    dsd_element = etree.parse(str(dsd_file_path)).getroot()

    # Initialize dataset.json data

    dataset_json = {
        "attributes_labels": {},  # Will be defined by each series.
        "attributes_values_labels": {},  # Will be defined by each series.
        "dimensions_codes_order": None,  # Will be defined by first series.
        "dimensions_labels": {},  # Will be defined by each series.
        "dimensions_values_labels": {},  # Will be defined by each series.
    }
    dataset_json.update(dataset_json_stub)

    t0 = time.time()
    dsd_infos = {
        "attributes": [
            element.attrib["conceptRef"]
            for element in dsd_element.iterfind(".//{*}Attribute[@attachmentLevel='Observation']")
        ],
        "codelists": {
            element.attrib["id"]: {
                code_element.attrib["value"]: code_element.findtext(
                    "./{*}Description[@xml:lang='en']", namespaces=namespace_url_by_name
                )
                for code_element in element.iterfind("./{*}Code")
            }
            for element in dsd_element.iterfind(".//{*}CodeList")
        },
        "concepts": {
            element.attrib["id"]: element.findtext("./{*}Name[@xml:lang='en']", namespaces=namespace_url_by_name)
            for element in dsd_element.iterfind(".//{*}Concept")
        },
        "codelist_by_concept": {
            element.attrib["conceptRef"]: element.attrib["codelist"]
            for element in dsd_element.find(".//{*}Components")
            if "conceptRef" in element.attrib and "codelist" in element.attrib
        },
    }

    timings["dsd_infos"] += time.time() - t0
    with (dataset_dir / "series.jsonl").open("w") as series_jsonl_file:
        dataset_context = {
            "current_series_observations": [],
        }
        # Side-effects: mutate dataset_context, write files.
        context = etree.iterparse(str(sdmx_file), events=["end"])
        for event, element in context:
            convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file)
            if event == "end":
                # Inspired from fast_iter, cf https://www.ibm.com/developerworks/xml/library/x-hiperfparse/
                element.clear()
                while element.getprevious() is not None:
                    del element.getparent()[0]
                continue
        del context
        write_json_file(dataset_dir / "dataset.json", without_falsy_values(dataset_json))
    log.debug("timings: {} total: {:.3f}".format(valmap("{:.3f}".format, timings), sum(timings.values())))


def iter_child_directories(directory: Path) -> Iterator[Path]:
    """Iterate over child directories of a directory."""
    for child in directory.iterdir():
        if child.is_dir():
            yield child


def iter_datasets_to_convert(
    source_datasets_dir: Path, target_dir: Path, *, datasets, resume
) -> Iterator[Tuple[str, Path]]:
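    """Yield (dataset_code, source_dataset_dir) pairs to convert, honoring the --datasets and --resume options."""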
    for source_dataset_dir in sorted(iter_child_directories(source_datasets_dir)):
        dataset_code = source_dataset_dir.name

        if datasets and dataset_code not in datasets:
            log.debug(
                "Skipping dataset %r because it is not mentioned by --datasets option",
                dataset_code,
            )
            continue

        sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)

        if not sdmx_file.is_file():
            log.error(
                "Skipping dataset %s because SDMX file %s is missing",
                dataset_code,
                str(sdmx_file),
            )
            continue

        dataset_dir = target_dir / dataset_code

        if resume and dataset_dir.is_dir():
            log.debug(
                "Skipping dataset %r because it already exists (due to --resume option)",
                dataset_code,
            )
            continue

        yield dataset_code, source_dataset_dir


def iter_normalized_observations(observations, frequency):
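    """Yield observations with their period normalized (currently only daily periods need normalization)."""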
    for observation in observations:
        period = observation[0]
        if frequency == "D":
            period = normalize_period(period, frequency)
        yield [period] + observation[1:]


def normalize_period(s, frequency):
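    """Normalize a period string to ISO-8601, e.g. the daily period "20170115" becomes "2017-01-15"."""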
    if frequency == "D":
        m = DAILY_PERIOD_RE.match(s)
        if m:
Christophe Benz's avatar
Christophe Benz committed
            return "{}-{}-{}".format(m.group("year"), m.group("month"), m.group("day"))
def toc_to_category_tree(source_dir: Path, dataset_codes_to_convert: Set[str]):
    """Walk recursively table_of_contents.xml and return category_tree_json and dataset.json stubs."""
    # Parse "table_of_contents", abbreviated "toc".
    toc_element = etree.parse(str(source_dir / "table_of_contents.xml")).getroot()

    dataset_json_stubs = {}
    category_tree_json = toc_element_to_category_tree(toc_element, dataset_json_stubs, dataset_codes_to_convert)

    return category_tree_json, dataset_json_stubs


def toc_element_to_category_tree(xml_element, dataset_json_stubs, dataset_codes_to_convert: Set[str]):
    """Walk xml_element (from table_of_contents.xml) recursively and return category_tree_json.

    Side-effects: fill dataset_json_stubs.
    """
    # Strip the namespace prefix, e.g. "{urn:eu.europa.ec.eurostat.navtree}leaf" -> "leaf".
    xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2 :]
    if xml_element_tag == "tree":
        return list(
            filter(
                None,
                (
                    toc_element_to_category_tree(child_element, dataset_json_stubs, dataset_codes_to_convert)
                    for child_element in xml_element
                ),
            )
        )
    elif xml_element_tag == "branch":
        code = xml_element.findtext("{*}code")
        if code == "DS-1062396":
            # Remove "International trade in goods - detailed data" branch from category tree
            # as it doesn't contain any data here.
            # https://git.nomics.world/dbnomics-fetchers/management/issues/435
            return None
        children = list(
            filter(
                None,
                (
                    toc_element_to_category_tree(child_element, dataset_json_stubs, dataset_codes_to_convert)
                    for child_element in xml_element.iterfind("{*}children/*")
                ),
            )
        )
        return (
            without_falsy_values(
                {
                    "code": code,
                    "name": xml_element.findtext("{*}title[@language='en']"),
                    "children": children,
                }
            )
            if children
            else None
        )
    elif xml_element_tag == "leaf" and xml_element.attrib["type"] in (
        "dataset",
        "table",
    ):
        dataset_code = xml_element.findtext("{*}code")
        if dataset_code not in dataset_codes_to_convert:
            return None

        dataset_name = xml_element.findtext("{*}title[@language='en']")
        if dataset_code not in dataset_json_stubs:
            dataset_json_stubs[dataset_code] = {
                "code": dataset_code,
                "name": dataset_name,
                "description": xml_element.findtext("{*}shortDescription[@language='en']") or None,
                "doc_href": xml_element.findtext("{*}metadata[@format='html']") or None,
            }
        return {
            "code": dataset_code,
            "name": dataset_name,
        }
    else:
        log.warning(
            "Unexpected node type: {!r}, type {!r} (code {!r})".format(
                xml_element_tag,
                xml_element.attrib.get("type"),
                xml_element.findtext("{*}code"),
            )
        )


def main():
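    """Parse command-line arguments and convert the source directory to DBnomics format in the target directory."""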

    datasets_from_env = os.getenv(DATASETS_ENV_VAR)
    if datasets_from_env:
        datasets_from_env = datasets_from_env.split(",")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "source_dir",
        type=Path,
        help="path of source directory containing Eurostat series in source format",
    )
    parser.add_argument(
        "target_dir",
        type=Path,
        help="path of target directory containing datasets & " "series in DBnomics JSON and TSV formats",
    )
    parser.add_argument(
        "--datasets",
        nargs="+",
        metavar="DATASET_CODE",
        default=datasets_from_env,
        help="convert only the given datasets (datasets codes, space separated)",
    )
    parser.add_argument("--log", default="INFO", help="level of logging messages")
    parser.add_argument("--resume", action="store_true", help="do not process already written datasets")
    args = parser.parse_args()

    if not args.source_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.source_dir)))
    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))
    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError("Invalid log level: {}".format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)
    log.info("Command-line arguments: %r", args)
    write_json_file(args.target_dir / "provider.json", provider_json)
    source_datasets_dir = args.source_dir / datasets_dir_name
    datasets_to_convert = list(
        iter_datasets_to_convert(
            source_datasets_dir, target_dir=args.target_dir, datasets=args.datasets, resume=args.resume
        )
    )
    dataset_codes_to_convert = set(dataset_code for (dataset_code, _) in datasets_to_convert)
    category_tree_json, dataset_json_stubs = toc_to_category_tree(
        source_dir=args.source_dir, dataset_codes_to_convert=dataset_codes_to_convert
    )
    convert_datasets(
        datasets_to_convert=datasets_to_convert, dataset_json_stubs=dataset_json_stubs, target_dir=args.target_dir
    )
    log.info("Writing category tree...")
    write_json_file(args.target_dir / "category_tree.json", category_tree_json)


def without_falsy_values(mapping):
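    """Return a copy of mapping without entries whose value is falsy (None, "", {}, [])."""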
    return {k: v for k, v in mapping.items() if v}


def write_json_file(path, data):
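    """Write data to path as indented, sorted-keys JSON, keeping non-ASCII characters as-is."""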
    with path.open("w") as f:
        json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)


if __name__ == "__main__":
    sys.exit(main())