#! /usr/bin/env python3


# eurostat-fetcher -- Fetch series from Eurostat database
# By: Christophe Benz <christophe.benz@cepremap.org>
#
# Copyright (C) 2017-2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Convert Eurostat provider, categories, datasets and time series to DBnomics JSON and TSV files."""


import argparse
import logging
import re
import shutil
import subprocess
import sys
import time
from collections import OrderedDict
from io import StringIO
from pathlib import Path

import humanize
from lxml import etree
from toolz import get_in, valmap
import ujson as json

from dbnomics_data_model import observations

provider_code = 'Eurostat'
provider_json = {
    "code": provider_code,
    "name": provider_code,
    "region": "EU",
    "terms_of_use": "http://ec.europa.eu/eurostat/about/policies/copyright",
    "website": "http://ec.europa.eu/eurostat/home",
}

args = None  # Will be defined by main().
datasets_dir_name = "data"
log = logging.getLogger(__name__)
namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"}
DAILY_PERIOD_RE = re.compile(r"(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})")

def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file):
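    """Handle one XML element yielded by iterparse over an SDMX data file.

    <Obs> elements are buffered into dataset_context["current_series_observations"]; the enclosing
    <Series> element then writes one line of series.jsonl and fills labels in dataset_json;
    <Extracted> sets dataset_json["updated_at"].
    """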
    # Because iterparse is called with events=["end"], <Obs> elements arrive before their enclosing <Series> element.

    if element.tag.endswith("Series"):

        # Ignore some specific XML element attributes corresponding to series SDMX attributes,
        # because series SDMX attributes do not exist in DBnomics.
        series_element_attributes = OrderedDict([
            (attribute_key, attribute_value)
            for attribute_key, attribute_value in element.attrib.items()
            if attribute_key not in {"TIME_FORMAT"}  # Redundant with FREQ.
        ])

        dimensions_codes_order = list(series_element_attributes.keys())
        if dataset_json["dimensions_codes_order"] is None:
            dataset_json["dimensions_codes_order"] = dimensions_codes_order
        else:
            # dimensions_codes_order must not change between series.
            assert dataset_json["dimensions_codes_order"] == dimensions_codes_order, \
                (dataset_json["dimensions_codes_order"], dimensions_codes_order)

        # Fill series dimensions labels in dataset.json.

        t0 = time.time()

        for dimension_code, dimension_value_code in series_element_attributes.items():
            if dimension_code not in dataset_json["dimensions_labels"]:
                dimension_label = dsd_infos["concepts"].get(dimension_code)
                if dimension_label and dimension_code not in dataset_json["dimensions_labels"]:
                    # Some dimensions labels are an empty string: e.g. bs_bs12_04.sdmx.xml
                    dataset_json["dimensions_labels"][dimension_code] = dimension_label
            if dimension_code in dataset_json["dimensions_values_labels"] and \
                    dimension_value_code in dataset_json["dimensions_values_labels"][dimension_code]:
                continue
            codelist_code = dsd_infos["codelist_by_concept"][dimension_code]
            dimension_value_label = get_in([codelist_code, dimension_value_code], dsd_infos["codelists"])
            if dimension_value_label:
                dataset_json["dimensions_values_labels"].setdefault(
                    dimension_code, {})[dimension_value_code] = dimension_value_label
        timings["series_labels"] += time.time() - t0
        # Series code is not defined by provider: create it from dimensions values codes.
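        # e.g. with dimension values FREQ="A", GEO="FR", UNIT="PC" (hypothetical), the series code is "A.FR.PC".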
        series_code = ".".join(
            series_element_attributes[dimension_code]
            for dimension_code in dimensions_codes_order
        )

        # Write series JSON to file.

        t0 = time.time()

        observations_header = [["PERIOD", "VALUE"] + dsd_infos["attributes"]]
        frequency = series_element_attributes["FREQ"]
        observations_body = list(iter_normalized_observations(dataset_context["current_series_observations"], frequency)) \
            if frequency == "D" \
            else dataset_context["current_series_observations"]
        series_json = {
            "code": series_code,
            "dimensions": [
                series_element_attributes[dimension_code]  # Every dimension MUST be defined for each series.
                for dimension_code in dimensions_codes_order
            ],
            "observations": observations_header + observations_body,
        }
        json.dump(series_json, series_jsonl_file, ensure_ascii=False, sort_keys=True)
        series_jsonl_file.write("\n")
        timings["series_file"] += time.time() - t0

        # Reset context for next series.

        dataset_context["current_series_observations"] = []

    elif element.tag.endswith("Obs"):

        # Fill observations attributes labels in dataset.json.

        t0 = time.time()

        for attribute_code, attribute_value_code in element.attrib.items():
            # Ignore period and value observations XML attributes, because they don't need labels.
            if attribute_code in ["TIME_PERIOD", "OBS_VALUE"]:
                continue
            attribute_label = dsd_infos["concepts"].get(attribute_code)
            if attribute_label and attribute_code not in dataset_json["attributes_labels"]:
                dataset_json["attributes_labels"][attribute_code] = attribute_label
            # Some attributes values codes are multi-valued and concatenated into the same string.
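            # e.g. an OBS_STATUS value of "be" (hypothetical) yields the codes "b" and "e".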
            attribute_value_codes = list(attribute_value_code) \
                if attribute_code == "OBS_STATUS" \
                else [attribute_value_code]
            for attribute_value_code in attribute_value_codes:
                if attribute_code in dataset_json["attributes_values_labels"] and \
                        attribute_value_code in dataset_json["attributes_values_labels"][attribute_code]:
                    continue
                codelist_code = dsd_infos["codelist_by_concept"][attribute_code]
                attribute_value_label = get_in([codelist_code, attribute_value_code], dsd_infos["codelists"])
                if attribute_value_label:
                    dataset_json["attributes_values_labels"].setdefault(
                        attribute_code, {})[attribute_value_code] = attribute_value_label

        timings["observations_labels"] += time.time() - t0

        obs_value = observations.value_to_float(element.attrib.get("OBS_VALUE"))
        dataset_context["current_series_observations"].append([
            element.attrib["TIME_PERIOD"],  # Will be normalized later, if needed.
            obs_value,
        ] + [
            element.attrib.get(attribute_name, "")
            for attribute_name in dsd_infos["attributes"]
        ])
    elif element.tag.endswith("Extracted"):
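        # e.g. an <Extracted> value of "2018-01-09T11:00:00" (illustrative) becomes "2018-01-09T11:00:00Z".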

        dataset_json["updated_at"] = element.text + "Z"  # Assume the value is UTC time.

def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, dataset_dir: Path):
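    """Convert one dataset from its SDMX and DSD source files to DBnomics format.

    Writes dataset.json and series.jsonl into dataset_dir.
    """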
    global timings
    timings = {
        k: 0
        for k in {"series_labels", "series_file", "observations_labels", "dsd_infos"}
    }

    assert dataset_json_stub.get("name"), dataset_json_stub
    assert dataset_dir.is_dir(), dataset_dir
    dataset_code = dataset_json_stub["code"]
    dsd_file_path = args.source_dir / datasets_dir_name / dataset_code / "{}.dsd.xml".format(dataset_code)
    dsd_element = etree.parse(str(dsd_file_path)).getroot()

    # Initialize dataset.json data

    dataset_json = {
        "attributes_labels": {},  # Will be defined by each series.
        "attributes_values_labels": {},  # Will be defined by each series.
        "dimensions_codes_order": None,  # Will be defined by first series.
        "dimensions_labels": {},  # Will be defined by each series.
        "dimensions_values_labels": {},  # Will be defined by each series.
    dataset_json.update(dataset_json_stub)
    dsd_infos = {
        "attributes": [
            element.attrib["conceptRef"]
            for element in dsd_element.iterfind(".//{*}Attribute[@attachmentLevel='Observation']")
        ],
        "codelists": {
            element.attrib["id"]: {
                code_element.attrib["value"]: code_element.findtext(
                    "./{*}Description[@xml:lang='en']", namespaces=namespace_url_by_name)
                for code_element in element.iterfind("./{*}Code")
            }
            for element in dsd_element.iterfind('.//{*}CodeList')
        },
        "concepts": {
            element.attrib["id"]: element.findtext("./{*}Name[@xml:lang='en']", namespaces=namespace_url_by_name)
            for element in dsd_element.iterfind('.//{*}Concept')
        },
        "codelist_by_concept": {
            element.attrib["conceptRef"]: element.attrib["codelist"]
            for element in dsd_element.find(".//{*}Components")
            if "conceptRef" in element.attrib and "codelist" in element.attrib
        },
    }

    timings["dsd_infos"] += time.time() - t0
    with (dataset_dir / "series.jsonl").open("w") as series_jsonl_file:
        dataset_context = {
            "current_series_observations": [],
        }
        # Side-effects: mutate dataset_context, write files.
        context = etree.iterparse(str(sdmx_file), events=["end"])
        for event, element in context:
            convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file)
            if event == "end":
                # Inspired from fast_iter, cf https://www.ibm.com/developerworks/xml/library/x-hiperfparse/
                element.clear()
                while element.getprevious() is not None:
                    del element.getparent()[0]
                continue
        del context
        write_json_file(dataset_dir / "dataset.json", without_falsy_values(dataset_json))
    log.debug("timings: {} total: {:.3f}".format(valmap("{:.3f}".format, timings), sum(timings.values())))


def iter_normalized_observations(observations, frequency):
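    """Yield observations with their period normalized according to frequency."""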
    for observation in observations:
        period = observation[0]
        if frequency == "D":
            period = normalize_period(period, frequency)
        yield [period] + observation[1:]


def normalize_period(s, frequency):
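    """Normalize a period string for the given frequency.

    A daily period such as "20170131" (illustrative value) becomes "2017-01-31";
    any other input is returned unchanged.
    """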
    if frequency == "D":
        m = DAILY_PERIOD_RE.match(s)
        if m:
            return '{}-{}-{}'.format(m.group('year'), m.group('month'), m.group('day'))
    return s


def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code):
    """Recursively walk xml_element (table_of_contents.xml) and return category_tree_json.

    Side effect: fill toc_dataset_json_stub_by_code.
    """
    xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2:]
    if xml_element_tag == "tree":
        return list(filter(None, (
            toc_to_category_tree(child_element, toc_dataset_json_stub_by_code)
            for child_element in xml_element
        )))
    elif xml_element_tag == "branch":
        children = list(filter(None, (
            toc_to_category_tree(child_element, toc_dataset_json_stub_by_code)
            for child_element in xml_element.iterfind("{*}children/*")
        )))
        return without_falsy_values({
            "code": xml_element.findtext("{*}code"),
            "name": xml_element.findtext("{*}title[@language='en']"),
            "children": children,
        }) if children else None
    elif xml_element_tag == "leaf" and xml_element.attrib["type"] == "dataset":
        dataset_code = xml_element.findtext("{*}code")
        dataset_name = xml_element.findtext("{*}title[@language='en']")
        if dataset_code not in toc_dataset_json_stub_by_code:
            toc_dataset_json_stub_by_code[dataset_code] = {
                "code": dataset_code,
                "name": dataset_name,
                "description": xml_element.findtext("{*}shortDescription[@language='en']") or None,
                "doc_href": xml_element.findtext("{*}metadata[@format='html']") or None,
            }
        return {
            "code": dataset_code,
            "name": dataset_name,
        }


def main():
    global args
    parser = argparse.ArgumentParser()
    parser.add_argument('source_dir', type=Path,
                        help='path of source directory containing Eurostat series in source format')
    parser.add_argument('target_dir', type=Path, help='path of target directory containing datasets & '
                        'series in DBnomics JSON and TSV formats')
    parser.add_argument('--datasets', nargs='+', metavar='DATASET_CODE', help='convert only the given datasets')
    parser.add_argument('--full', action='store_true',
                        help='convert all datasets; default behavior is to convert what changed since last commit')
    parser.add_argument('--log', default='INFO', help='level of logging messages')
    parser.add_argument('--resume', action='store_true', help='do not process already written datasets')
    parser.add_argument('--start-from', metavar='DATASET_CODE', help='start indexing from dataset code')
    args = parser.parse_args()

    if not args.source_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.source_dir)))
    if not args.target_dir.is_dir():
        parser.error("Could not find directory {!r}".format(str(args.target_dir)))
    numeric_level = getattr(logging, args.log.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid log level: {}'.format(args.log))
    logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)

    # Ask Git which dataset directories were modified by the latest commit of the source-data repository.
    if not args.full:
        try:
            output = subprocess.check_output(["git", "diff", "--name-status", "HEAD^", datasets_dir_name],
                                             cwd=str(args.source_dir), universal_newlines=True)
        except subprocess.CalledProcessError:
            args.full = True
        else:
            modified_datasets_codes = set()
            deleted_datasets_codes = set()
            for line in StringIO(output):
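                # Each line looks like "M\tdata/<dataset_code>/<file>" (illustrative): a status letter, then a path.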
                action, file_path = line.strip().split()
                try:
                    dataset_code = Path(file_path).parent.relative_to(datasets_dir_name).name
                except ValueError:
                    continue
                if action in {"A", "M"}:
                    modified_datasets_codes.add(dataset_code)
                else:
                    assert action == "D", action
                    deleted_datasets_codes.add(dataset_code)
            log.info("%d datasets were modified and %d were deleted by last download",
                     len(modified_datasets_codes), len(deleted_datasets_codes))
    log.info("Mode: %s", "full" if args.full else "incremental")

    # Parse "table_of_contents", abbreviated "toc".
    toc_element = etree.parse(str(args.source_dir / "table_of_contents.xml")).getroot()
    # Walk recursively table_of_contents.xml and return category_tree_json.
    # Side-effects: fill toc_dataset_json_stub_by_code.
    toc_dataset_json_stub_by_code = {}
    category_tree_json = toc_to_category_tree(toc_element, toc_dataset_json_stub_by_code)
    datasets_codes_to_convert = set()
    for dataset_code in sorted(toc_dataset_json_stub_by_code):
        if args.datasets and dataset_code not in args.datasets:
            log.debug("Skipping dataset %r because it is not mentioned by --datasets option", dataset_code)
            continue
        if not args.full and dataset_code not in modified_datasets_codes:
            log.debug("Skipping dataset %r because it was not modified by last download (due to incremental mode)",
                      dataset_code)
            continue
        if args.start_from is not None and dataset_code < args.start_from:
            log.debug("Skipping dataset %r because of --start-from option", dataset_code)
            continue
        source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
        if not source_dataset_dir.is_dir():
            log.error("Skipping dataset %s because source directory %s is missing",
                      dataset_code, str(source_dataset_dir))
            continue
        sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
        if not sdmx_file.is_file():
            log.error("Skipping dataset %s because SDMX file %s is missing", dataset_code, str(sdmx_file))
            continue
        dataset_dir = args.target_dir / dataset_code
        if args.resume and (dataset_dir / "dataset.json").is_file():
            log.debug("Skipping dataset %r because it already exists (due to --resume option)", dataset_code)
            continue
        datasets_codes_to_convert.add(dataset_code)

    log.info("Converting %d datasets...", len(datasets_codes_to_convert))

    # Remove directories of datasets to be converted before converting.
    if not args.resume:
        log.info("Removing directories of deleted datasets and datasets to be converted...")
        datasets_codes_to_delete = datasets_codes_to_convert
        if not args.full:
            datasets_codes_to_delete = datasets_codes_to_delete.union(deleted_datasets_codes)
        for dataset_code in datasets_codes_to_delete:
            dataset_dir = args.target_dir / dataset_code
            if dataset_dir.is_dir():
                shutil.rmtree(str(dataset_dir))

    # Convert SDMX files. Side-effect: write files for each dataset.
    converted_datasets_codes = set()
    for index, dataset_code in enumerate(sorted(datasets_codes_to_convert), start=1):
        if dataset_code in converted_datasets_codes:
            log.debug("Skipping dataset %r because it was already converted", dataset_code)
            continue

        source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
        sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)

        log.info("Converting SDMX source file %d/%d %s (%s)", index, len(datasets_codes_to_convert), sdmx_file,
                 humanize.naturalsize(sdmx_file.stat().st_size, gnu=True))
        dataset_dir = args.target_dir / dataset_code
        dataset_dir.mkdir(exist_ok=True)

        dataset_json_stub = toc_dataset_json_stub_by_code[dataset_code]
        convert_sdmx_file(dataset_json_stub, sdmx_file, dataset_dir)
        converted_datasets_codes.add(dataset_code)

    write_json_file(args.target_dir / "provider.json", provider_json)
    if category_tree_json:
        write_json_file(args.target_dir / "category_tree.json", category_tree_json)


def without_falsy_values(mapping):
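    """Return a copy of mapping without the items whose values are falsy (None, empty string, empty dict...)."""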
    return {
        k: v
        for k, v in mapping.items()
        if v
    }


def write_json_file(path, data):
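    """Write data to path as pretty-printed, key-sorted JSON, keeping non-ASCII characters as-is."""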
    with path.open("w") as f:
        json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)


if __name__ == '__main__':
    sys.exit(main())