#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Convert Eurostat provider, categories, datasets and time series to DBnomics JSON and TSV files."""
import argparse
import json
import logging
import os
import re
import sys
import time
from collections import OrderedDict
from pathlib import Path
from typing import Dict, Iterator, List, Set, Tuple

import humanize
from lxml import etree
from toolz import get_in, valmap

# Assumption: the "observations" module (providing value_to_float) comes from the dbnomics-data-model package.
from dbnomics_data_model import observations

DATASETS_ENV_VAR = "DATASETS"  # Assumption: environment variable listing dataset codes to convert.

provider_json = {
    "code": "Eurostat",
    "name": "Eurostat",
    "terms_of_use": "http://ec.europa.eu/eurostat/about/policies/copyright",
    "website": "http://ec.europa.eu/eurostat/home",
}

timings = None  # Per-dataset timing accumulators, reset by convert_sdmx_file.
log = logging.getLogger(__name__)
namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"}
DAILY_PERIOD_RE = re.compile(r"(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})")
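# Example: a compact daily TIME_PERIOD like "19991231" matches this pattern and is rewritten
# as "1999-12-31" by normalize_period() below.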
def convert_datasets(
datasets_to_convert: List[Tuple[str, Path]], dataset_json_stubs: Dict[str, dict], target_dir: Path
):
log.info("Converting %d datasets...", len(datasets_to_convert))
converted_datasets_codes = set()
for index, (dataset_code, source_dataset_dir) in enumerate(sorted(datasets_to_convert), start=1):
if dataset_code in converted_datasets_codes:
log.debug("Skipping dataset %r because it was already converted", dataset_code)
continue
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
log.info(
"Converting SDMX source file %d/%d %s (%s)",
index,
len(datasets_to_convert),
sdmx_file,
humanize.naturalsize(sdmx_file.stat().st_size, gnu=True),
)
dataset_dir = target_dir / dataset_code
dataset_dir.mkdir(exist_ok=True)
dataset_json_stub = dataset_json_stubs[dataset_code]
convert_sdmx_file(dataset_json_stub, sdmx_file, source_dataset_dir, dataset_dir)
converted_datasets_codes.add(dataset_code)
def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file):
    # Because iterparse is called with events=["end"], we receive <Obs> elements before their enclosing <Series> element.
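    # For instance, a fragment <Series ...><Obs .../><Obs .../></Series> yields end events in the order
    # Obs, Obs, Series; this is why observations are accumulated in dataset_context["current_series_observations"]
    # until the enclosing Series element is reached.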
if element.tag.endswith("Series"):
# Ignore some specific XML element attributes corresponding to series SDMX attributes,
# because series SDMX attributes do not exist in DBnomics.
series_element_attributes = OrderedDict(
[
(attribute_key, attribute_value)
for attribute_key, attribute_value in element.attrib.items()
if attribute_key not in {"TIME_FORMAT"} # Redundant with FREQ.
]
)
dimensions_codes_order = list(series_element_attributes.keys())
if dataset_json["dimensions_codes_order"] is None:
dataset_json["dimensions_codes_order"] = dimensions_codes_order
else:
# dimensions_codes_order must not change between series.
assert dataset_json["dimensions_codes_order"] == dimensions_codes_order, (
dataset_json["dimensions_codes_order"],
dimensions_codes_order,
)
# Fill series dimensions labels in dataset.json.
t0 = time.time()
for dimension_code, dimension_value_code in series_element_attributes.items():
if dimension_code not in dataset_json["dimensions_labels"]:
dimension_label = dsd_infos["concepts"].get(dimension_code)
if dimension_label and dimension_code not in dataset_json["dimensions_labels"]:
# Some dimensions labels are an empty string: e.g. bs_bs12_04.sdmx.xml
dataset_json["dimensions_labels"][dimension_code] = dimension_label
if (
dimension_code in dataset_json["dimensions_values_labels"]
and dimension_value_code in dataset_json["dimensions_values_labels"][dimension_code]
):
continue
codelist_code = dsd_infos["codelist_by_concept"][dimension_code]
dimension_value_label = get_in([codelist_code, dimension_value_code], dsd_infos["codelists"])
if dimension_value_label:
dataset_json["dimensions_values_labels"].setdefault(dimension_code, {})[
dimension_value_code
] = dimension_value_label
timings["series_labels"] += time.time() - t0
# Series code is not defined by provider: create it from dimensions values codes.
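        # For example, dimension values {"FREQ": "A", "UNIT": "PC", "GEO": "FR"} joined in
        # dimensions_codes_order give a series code like "A.PC.FR".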
series_code = ".".join(series_element_attributes[dimension_code] for dimension_code in dimensions_codes_order)
        frequency = series_element_attributes.get("FREQ")
        observations_header = [["PERIOD", "VALUE"] + dsd_infos["attributes"]]
        observations_body = (
            list(iter_normalized_observations(dataset_context["current_series_observations"], frequency))
            if frequency == "D"
            else dataset_context["current_series_observations"]
        )
        t0 = time.time()
        series_json = {
"code": series_code,
"dimensions": [
series_element_attributes[dimension_code] # Every dimension MUST be defined for each series.
for dimension_code in dimensions_codes_order
],
"observations": observations_header + observations_body,
json.dump(series_json, series_jsonl_file, ensure_ascii=False, sort_keys=True)
series_jsonl_file.write("\n")
timings["series_file"] += time.time() - t0
# Reset context for next series.
dataset_context["current_series_observations"] = []
elif element.tag.endswith("Obs"):
# Fill observations attributes labels in dataset.json.
t0 = time.time()
for attribute_code, attribute_value_code in element.attrib.items():
# Ignore period and value observations XML attributes, because they don't need labels.
if attribute_code in ["TIME_PERIOD", "OBS_VALUE"]:
continue
attribute_label = dsd_infos["concepts"].get(attribute_code)
if attribute_label and attribute_code not in dataset_json["attributes_labels"]:
dataset_json["attributes_labels"][attribute_code] = attribute_label
# Some attributes values codes are multi-valued and concatenated into the same string.
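            # For example, an OBS_STATUS value of "bp" is split character by character into ["b", "p"]
            # so that each flag gets its own label.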
attribute_value_codes = (
list(attribute_value_code) if attribute_code == "OBS_STATUS" else [attribute_value_code]
)
for attribute_value_code in attribute_value_codes:
if (
attribute_code in dataset_json["attributes_values_labels"]
and attribute_value_code in dataset_json["attributes_values_labels"][attribute_code]
):
continue
codelist_code = dsd_infos["codelist_by_concept"][attribute_code]
attribute_value_label = get_in([codelist_code, attribute_value_code], dsd_infos["codelists"])
if attribute_value_label:
dataset_json["attributes_values_labels"].setdefault(attribute_code, {})[
attribute_value_code
] = attribute_value_label
timings["observations_labels"] += time.time() - t0
obs_value = observations.value_to_float(element.attrib.get("OBS_VALUE"))
dataset_context["current_series_observations"].append(
[
element.attrib["TIME_PERIOD"], # Will be normalized later, if needed.
obs_value,
]
+ [element.attrib.get(attribute_name, "") for attribute_name in dsd_infos["attributes"]]
)
elif element.tag.endswith("Extracted"):
dataset_json["updated_at"] = element.text + "Z" # Assume the value is UTC time.
def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, source_dataset_dir: Path, dataset_dir: Path):
    global timings
    timings = {k: 0 for k in {"series_labels", "series_file", "observations_labels", "dsd_infos"}}
assert dataset_json_stub.get("name"), dataset_json_stub
assert dataset_dir.is_dir(), dataset_dir
dataset_code = dataset_json_stub["code"]
dsd_file_path = source_dataset_dir / "{}.dsd.xml".format(dataset_code)
dsd_element = etree.parse(str(dsd_file_path)).getroot()
# Initialize dataset.json data
dataset_json = {
"attributes_labels": {}, # Will be defined by each series.
"attributes_values_labels": {}, # Will be defined by each series.
"dimensions_codes_order": None, # Will be defined by first series.
"dimensions_labels": {}, # Will be defined by each series.
"dimensions_values_labels": {}, # Will be defined by each series.
dataset_json.update(dataset_json_stub)
    t0 = time.time()
    dsd_infos = {
        "attributes": [
            element.attrib["conceptRef"]
            for element in dsd_element.iterfind(".//{*}Attribute[@attachmentLevel='Observation']")
        ],
        "codelists": {
            element.attrib["id"]: {
                code_element.attrib["value"]: code_element.findtext(
                    "./{*}Description[@xml:lang='en']", namespaces=namespace_url_by_name
                )
                for code_element in element.iterfind("./{*}Code")
            }
            for element in dsd_element.iterfind(".//{*}CodeList")
        },
        "concepts": {
            element.attrib["id"]: element.findtext("./{*}Name[@xml:lang='en']", namespaces=namespace_url_by_name)
            for element in dsd_element.iterfind(".//{*}Concept")
        },
        "codelist_by_concept": {
            element.attrib["conceptRef"]: element.attrib["codelist"]
            for element in dsd_element.find(".//{*}Components")
            if "conceptRef" in element.attrib and "codelist" in element.attrib
        },
    }
    timings["dsd_infos"] += time.time() - t0
with (dataset_dir / "series.jsonl").open("w") as series_jsonl_file:
dataset_context = {
"current_series_observations": [],
}
# Side-effects: mutate dataset_context, write files.
context = etree.iterparse(str(sdmx_file), events=["end"])
for event, element in context:
convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file)
if event == "end":
# Inspired from fast_iter, cf https://www.ibm.com/developerworks/xml/library/x-hiperfparse/
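                # Clearing each processed element and deleting its already-processed preceding siblings
                # prevents the lxml tree from growing, so memory stays bounded even for very large SDMX files.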
element.clear()
while element.getprevious() is not None:
del element.getparent()[0]
continue
del context
write_json_file(dataset_dir / "dataset.json", without_falsy_values(dataset_json))
log.debug("timings: {} total: {:.3f}".format(valmap("{:.3f}".format, timings), sum(timings.values())))
def iter_child_directories(directory: Path) -> Iterator[Path]:
"""Iterate over child directories of a directory."""
for child in directory.iterdir():
if child.is_dir():
yield child
def iter_datasets_to_convert(
source_datasets_dir: Path, target_dir: Path, *, datasets, resume
) -> Iterator[Tuple[str, Path]]:
for source_dataset_dir in sorted(iter_child_directories(source_datasets_dir)):
dataset_code = source_dataset_dir.name
if datasets and dataset_code not in datasets:
log.debug(
"Skipping dataset %r because it is not mentioned by --datasets option",
dataset_code,
)
continue
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
if not sdmx_file.is_file():
log.error(
"Skipping dataset %s because SDMX file %s is missing",
dataset_code,
str(sdmx_file),
)
continue
dataset_dir = target_dir / dataset_code
if resume and dataset_dir.is_dir():
log.debug(
"Skipping dataset %r because it already exists (due to --resume option)",
dataset_code,
)
continue
yield dataset_code, source_dataset_dir
def iter_normalized_observations(observations, frequency):
for observation in observations:
period = observation[0]
if frequency == "D":
period = normalize_period(period, frequency)
yield [period] + observation[1:]
def normalize_period(s, frequency):
    if frequency == "D":
        m = DAILY_PERIOD_RE.match(s)
        if m:
            return "{}-{}-{}".format(m.group("year"), m.group("month"), m.group("day"))
    return s
def toc_to_category_tree(source_dir: Path, dataset_codes_to_convert: Set[str]):
"""Walk recursively table_of_contents.xml and return category_tree_json and dataset.json stubs."""
# Parse "table_of_contents", abbreviated "toc".
toc_element = etree.parse(str(source_dir / "table_of_contents.xml")).getroot()
dataset_json_stubs = {}
category_tree_json = toc_element_to_category_tree(toc_element, dataset_json_stubs, dataset_codes_to_convert)
return category_tree_json, dataset_json_stubs
def toc_element_to_category_tree(xml_element, dataset_json_stubs, dataset_codes_to_convert: Set[str]):
"""Walk recursively xml_element (table_of_contents.xml) and return category_tree_json.
    Side-effects: fill dataset_json_stubs.
    """
    xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2 :]
    if xml_element_tag == "tree":
        return list(
            filter(
                None,
                (
                    toc_element_to_category_tree(child_element, dataset_json_stubs, dataset_codes_to_convert)
                    for child_element in xml_element
                ),
            )
        )
    elif xml_element_tag == "branch":
        code = xml_element.findtext("{*}code")
        if code == "DS-1062396":
            # Remove the "International trade in goods - detailed data" branch from the category tree
            # because it does not contain any data here.
            # https://git.nomics.world/dbnomics-fetchers/management/issues/435
            return None
        children = list(
            filter(
                None,
                (
                    toc_element_to_category_tree(child_element, dataset_json_stubs, dataset_codes_to_convert)
                    for child_element in xml_element.iterfind("{*}children/*")
                ),
            )
        )
return (
without_falsy_values(
{
"code": code,
"name": xml_element.findtext("{*}title[@language='en']"),
"children": children,
}
)
if children
else None
)
elif xml_element_tag == "leaf" and xml_element.attrib["type"] in (
"dataset",
"table",
    ):
        dataset_code = xml_element.findtext("{*}code")
        if dataset_code not in dataset_codes_to_convert:
return None
dataset_name = xml_element.findtext("{*}title[@language='en']")
if dataset_code not in dataset_json_stubs:
dataset_json_stubs[dataset_code] = {
"code": dataset_code,
"name": dataset_name,
"description": xml_element.findtext("{*}shortDescription[@language='en']") or None,
"doc_href": xml_element.findtext("{*}metadata[@format='html']") or None,
}
return {
"code": dataset_code,
"name": dataset_name,
}
else:
log.warning(
"Unexpected node type: {!r}, type {!r} (code {!r})".format(
xml_element_tag,
xml_element.attrib["type"],
xml_element.findtext("{*}code"),
)
)
return None
def main():
    datasets_from_env = os.getenv(DATASETS_ENV_VAR)
    if datasets_from_env:
        datasets_from_env = datasets_from_env.split(",")
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
"source_dir",
type=Path,
help="path of source directory containing Eurostat series in source format",
)
parser.add_argument(
"target_dir",
type=Path,
help="path of target directory containing datasets & " "series in DBnomics JSON and TSV formats",
)
parser.add_argument(
"--datasets",
nargs="+",
metavar="DATASET_CODE",
default=datasets_from_env,
help="convert only the given datasets (datasets codes, space separated)",
)
parser.add_argument("--log", default="INFO", help="level of logging messages")
parser.add_argument("--resume", action="store_true", help="do not process already written datasets")
if not args.source_dir.is_dir():
parser.error("Could not find directory {!r}".format(str(args.source_dir)))
if not args.target_dir.is_dir():
parser.error("Could not find directory {!r}".format(str(args.target_dir)))
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError("Invalid log level: {}".format(args.log))
logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)
log.info("Command-line arguments: %r", args)
write_json_file(args.target_dir / "provider.json", provider_json)
source_datasets_dir = args.source_dir / datasets_dir_name
    datasets_to_convert = list(
        iter_datasets_to_convert(
            source_datasets_dir, target_dir=args.target_dir, datasets=args.datasets, resume=args.resume
        )
    )
    dataset_codes_to_convert = set(dataset_code for (dataset_code, _) in datasets_to_convert)
category_tree_json, dataset_json_stubs = toc_to_category_tree(
source_dir=args.source_dir, dataset_codes_to_convert=dataset_codes_to_convert
)
convert_datasets(
datasets_to_convert=datasets_to_convert, dataset_json_stubs=dataset_json_stubs, target_dir=args.target_dir
)
log.info("Writing category tree...")
write_json_file(args.target_dir / "category_tree.json", category_tree_json)
def write_json_file(path, data):
with path.open("w") as f:
json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)
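

# The functions above also rely on a small "without_falsy_values" helper (used when writing
# dataset.json and category tree nodes) and on a __main__ entry point, neither of which appears
# above. A minimal sketch, assuming the helper simply drops falsy entries:
def without_falsy_values(mapping: dict) -> dict:
    """Return a copy of mapping without the keys whose value is falsy (None, "", {}, [])."""
    return {key: value for key, value in mapping.items() if value}


if __name__ == "__main__":
    sys.exit(main())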