#! /usr/bin/env python3
# eurostat-fetcher -- Fetch series from Eurostat database
# https://git.nomics.world/dbnomics-fetchers/eurostat-fetcher
#
# eurostat-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# eurostat-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Convert Eurostat provider, categories, datasets and time series to DBnomics JSON and TSV files."""
import argparse
import json
import logging
import os
import re
import sys
import time
from collections import OrderedDict
from pathlib import Path

import humanize
from dbnomics_data_model import observations  # assumed home of the value_to_float() helper used below
from lxml import etree
from toolz import get_in, valmap

"terms_of_use": "http://ec.europa.eu/eurostat/about/policies/copyright",
"website": "http://ec.europa.eu/eurostat/home",
}
args = None  # Will be defined by main().
timings = None  # Will be (re)set by convert_sdmx_file().
log = logging.getLogger(__name__)
namespace_url_by_name = {"xml": "http://www.w3.org/XML/1998/namespace"}
DAILY_PERIOD_RE = re.compile(r"(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})")
def convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file):
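    """Handle a single XML element yielded by iterparse while streaming an SDMX data file.

    <Obs> elements are buffered into dataset_context["current_series_observations"]; when the
    enclosing <Series> element closes, one JSON line is appended to series_jsonl_file and the
    dimensions/attributes labels are accumulated into dataset_json. A simplified, illustrative
    series looks like:

        <Series FREQ="A" unit="CP_MEUR" geo="EE" TIME_FORMAT="P1Y">
            <Obs TIME_PERIOD="2020" OBS_VALUE="1.5" OBS_STATUS="p"/>
        </Series>
    """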
    # Because iterparse is called with events=["end"], <Obs> elements arrive before their enclosing <Series> element.
if element.tag.endswith("Series"):
# Ignore some specific XML element attributes corresponding to series SDMX attributes,
# because series SDMX attributes do not exist in DBnomics.
series_element_attributes = OrderedDict(
[
(attribute_key, attribute_value)
for attribute_key, attribute_value in element.attrib.items()
if attribute_key not in {"TIME_FORMAT"} # Redundant with FREQ.
]
)
dimensions_codes_order = list(series_element_attributes.keys())
if dataset_json["dimensions_codes_order"] is None:
dataset_json["dimensions_codes_order"] = dimensions_codes_order
else:
# dimensions_codes_order must not change between series.
assert dataset_json["dimensions_codes_order"] == dimensions_codes_order, (
dataset_json["dimensions_codes_order"],
dimensions_codes_order,
)
# Fill series dimensions labels in dataset.json.
t0 = time.time()
for dimension_code, dimension_value_code in series_element_attributes.items():
if dimension_code not in dataset_json["dimensions_labels"]:
dimension_label = dsd_infos["concepts"].get(dimension_code)
if dimension_label and dimension_code not in dataset_json["dimensions_labels"]:
# Some dimensions labels are an empty string: e.g. bs_bs12_04.sdmx.xml
dataset_json["dimensions_labels"][dimension_code] = dimension_label
if (
dimension_code in dataset_json["dimensions_values_labels"]
and dimension_value_code in dataset_json["dimensions_values_labels"][dimension_code]
):
continue
codelist_code = dsd_infos["codelist_by_concept"][dimension_code]
dimension_value_label = get_in([codelist_code, dimension_value_code], dsd_infos["codelists"])
if dimension_value_label:
dataset_json["dimensions_values_labels"].setdefault(dimension_code, {})[
dimension_value_code
] = dimension_value_label
timings["series_labels"] += time.time() - t0
# Series code is not defined by provider: create it from dimensions values codes.
series_code = ".".join(series_element_attributes[dimension_code] for dimension_code in dimensions_codes_order)
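        # e.g. dimension values ("A", "CP_MEUR", "EE") yield the series code "A.CP_MEUR.EE" (illustrative values).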
        # FREQ drives period normalization: daily periods are converted to ISO 8601 dates.
        frequency = series_element_attributes.get("FREQ")
        observations_header = [["PERIOD", "VALUE"] + dsd_infos["attributes"]]
        observations_body = (
            list(iter_normalized_observations(dataset_context["current_series_observations"], frequency))
            if frequency == "D"
            else dataset_context["current_series_observations"]
        )

        t0 = time.time()
        series_json = {
            "code": series_code,
            "dimensions": [
                series_element_attributes[dimension_code]  # Every dimension MUST be defined for each series.
                for dimension_code in dimensions_codes_order
            ],
            "observations": observations_header + observations_body,
        }
        json.dump(series_json, series_jsonl_file, ensure_ascii=False, sort_keys=True)
        series_jsonl_file.write("\n")
        timings["series_file"] += time.time() - t0
# Reset context for next series.
dataset_context["current_series_observations"] = []
elif element.tag.endswith("Obs"):
# Fill observations attributes labels in dataset.json.
t0 = time.time()
for attribute_code, attribute_value_code in element.attrib.items():
# Ignore period and value observations XML attributes, because they don't need labels.
if attribute_code in ["TIME_PERIOD", "OBS_VALUE"]:
continue
attribute_label = dsd_infos["concepts"].get(attribute_code)
if attribute_label and attribute_code not in dataset_json["attributes_labels"]:
dataset_json["attributes_labels"][attribute_code] = attribute_label
# Some attributes values codes are multi-valued and concatenated into the same string.
attribute_value_codes = (
list(attribute_value_code) if attribute_code == "OBS_STATUS" else [attribute_value_code]
)
for attribute_value_code in attribute_value_codes:
if (
attribute_code in dataset_json["attributes_values_labels"]
and attribute_value_code in dataset_json["attributes_values_labels"][attribute_code]
):
continue
codelist_code = dsd_infos["codelist_by_concept"][attribute_code]
attribute_value_label = get_in([codelist_code, attribute_value_code], dsd_infos["codelists"])
if attribute_value_label:
dataset_json["attributes_values_labels"].setdefault(attribute_code, {})[
attribute_value_code
] = attribute_value_label
timings["observations_labels"] += time.time() - t0
obs_value = observations.value_to_float(element.attrib.get("OBS_VALUE"))
dataset_context["current_series_observations"].append(
[
element.attrib["TIME_PERIOD"], # Will be normalized later, if needed.
obs_value,
]
+ [element.attrib.get(attribute_name, "") for attribute_name in dsd_infos["attributes"]]
)
elif element.tag.endswith("Extracted"):
dataset_json["updated_at"] = element.text + "Z" # Assume the value is UTC time.
def convert_sdmx_file(dataset_json_stub, sdmx_file: Path, dataset_dir: Path):
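    """Convert a single dataset: parse its DSD and SDMX files and write series.jsonl and dataset.json into dataset_dir."""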
    global timings
    timings = {k: 0 for k in {"series_labels", "series_file", "observations_labels", "dsd_infos"}}
assert dataset_json_stub.get("name"), dataset_json_stub
assert dataset_dir.is_dir(), dataset_dir
dataset_code = dataset_json_stub["code"]
dsd_file_path = args.source_dir / datasets_dir_name / dataset_code / "{}.dsd.xml".format(dataset_code)
dsd_element = etree.parse(str(dsd_file_path)).getroot()
# Initialize dataset.json data
dataset_json = {
"attributes_labels": {}, # Will be defined by each series.
"attributes_values_labels": {}, # Will be defined by each series.
"dimensions_codes_order": None, # Will be defined by first series.
"dimensions_labels": {}, # Will be defined by each series.
"dimensions_values_labels": {}, # Will be defined by each series.
    }
    dataset_json.update(dataset_json_stub)

    t0 = time.time()
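    # Collect from the DSD everything needed to label series: observation-level attributes,
    # code lists (code -> English label), concept names, and the code list attached to each concept.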
dsd_infos = {
"attributes": [
element.attrib["conceptRef"]
for element in dsd_element.iterfind(".//{*}Attribute[@attachmentLevel='Observation']")
],
"codelists": {
element.attrib["id"]: {
code_element.attrib["value"]: code_element.findtext(
"./{*}Description[@xml:lang='en']", namespaces=namespace_url_by_name
)
for code_element in element.iterfind("./{*}Code")
}
            for element in dsd_element.iterfind(".//{*}CodeList")
        },
"concepts": {
element.attrib["id"]: element.findtext("./{*}Name[@xml:lang='en']", namespaces=namespace_url_by_name)
            for element in dsd_element.iterfind(".//{*}Concept")
        },
"codelist_by_concept": {
element.attrib["conceptRef"]: element.attrib["codelist"]
for element in dsd_element.find(".//{*}Components")
if "conceptRef" in element.attrib and "codelist" in element.attrib
},
}
timings["dsd_infos"] += time.time() - t0
with (dataset_dir / "series.jsonl").open("w") as series_jsonl_file:
dataset_context = {
"current_series_observations": [],
}
# Side-effects: mutate dataset_context, write files.
context = etree.iterparse(str(sdmx_file), events=["end"])
for event, element in context:
convert_sdmx_element(element, dataset_json, dataset_context, dsd_infos, series_jsonl_file)
if event == "end":
                # Inspired by fast_iter, cf. https://www.ibm.com/developerworks/xml/library/x-hiperfparse/
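                # Clearing each processed element and its already-built siblings keeps memory bounded on large SDMX files.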
element.clear()
while element.getprevious() is not None:
del element.getparent()[0]
continue
del context
write_json_file(dataset_dir / "dataset.json", without_falsy_values(dataset_json))
log.debug("timings: {} total: {:.3f}".format(valmap("{:.3f}".format, timings), sum(timings.values())))
def iter_normalized_observations(observations, frequency):
for observation in observations:
period = observation[0]
if frequency == "D":
period = normalize_period(period, frequency)
yield [period] + observation[1:]
def normalize_period(s, frequency):
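    # Daily periods arrive as "YYYYMMDD"; convert them to ISO 8601, e.g. "20200131" -> "2020-01-31".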
if frequency == "D":
m = DAILY_PERIOD_RE.match(s)
if m:
return "{}-{}-{}".format(m.group("year"), m.group("month"), m.group("day"))
def toc_to_category_tree(xml_element, toc_dataset_json_stub_by_code):
"""Walk recursively xml_element (table_of_contents.xml) and return category_tree_json.
Side-effects: fill toc_dataset_json_stub_by_code.
"""
xml_element_tag = xml_element.tag[len("urn:eu.europa.ec.eurostat.navtree") + 2 :]
if xml_element_tag == "tree":
return list(
filter(
None,
(toc_to_category_tree(child_element, toc_dataset_json_stub_by_code) for child_element in xml_element),
)
)
elif xml_element_tag == "branch":

code = xml_element.findtext("{*}code")
if code == "DS-1062396":
# Remove "International trade in goods - detailed data" branch from category tree
            # as it doesn't contain any data here.
            # https://git.nomics.world/dbnomics-fetchers/management/issues/435
return None
children = list(
filter(
None,
(
toc_to_category_tree(child_element, toc_dataset_json_stub_by_code)
for child_element in xml_element.iterfind("{*}children/*")
),
)
)
return (
without_falsy_values(
{
"code": code,
"name": xml_element.findtext("{*}title[@language='en']"),
"children": children,
}
)
if children
else None
)
elif xml_element_tag == "leaf" and xml_element.attrib["type"] in (
"dataset",
"table",
):
dataset_code = xml_element.findtext("{*}code")
dataset_name = xml_element.findtext("{*}title[@language='en']")
if dataset_code not in toc_dataset_json_stub_by_code:
toc_dataset_json_stub_by_code[dataset_code] = {
"code": dataset_code,
"name": dataset_name,
"description": xml_element.findtext("{*}shortDescription[@language='en']") or None,
"doc_href": xml_element.findtext("{*}metadata[@format='html']") or None,
}
return {
"code": dataset_code,
"name": dataset_name,
}
else:
log.warning(
"Unexpected node type: {!r}, type {!r} (code {!r})".format(
xml_element_tag,
xml_element.attrib["type"],
xml_element.findtext("{*}code"),
)
)
return None


def main():
    global args
    parser = argparse.ArgumentParser(description=__doc__)

    datasets_from_env = os.getenv(DATASETS_ENV_VAR)
    if datasets_from_env:
        datasets_from_env = datasets_from_env.split(",")
parser.add_argument(
"source_dir",
type=Path,
help="path of source directory containing Eurostat series in source format",
)
parser.add_argument(
"target_dir",
type=Path,
help="path of target directory containing datasets & " "series in DBnomics JSON and TSV formats",
)
parser.add_argument(
"--datasets",
nargs="+",
metavar="DATASET_CODE",
default=datasets_from_env,
help="convert only the given datasets (datasets codes, space separated)",
)
parser.add_argument("--log", default="INFO", help="level of logging messages")
parser.add_argument("--resume", action="store_true", help="do not process already written datasets")
parser.add_argument("--start-from", metavar="DATASET_CODE", help="start indexing from dataset code")
    args = parser.parse_args()

    if not args.source_dir.is_dir():
parser.error("Could not find directory {!r}".format(str(args.source_dir)))
if not args.target_dir.is_dir():
parser.error("Could not find directory {!r}".format(str(args.target_dir)))
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError("Invalid log level: {}".format(args.log))
logging.basicConfig(format="%(levelname)s:%(message)s", level=numeric_level)
log.info("Command-line arguments: %r", args)
write_json_file(args.target_dir / "provider.json", provider_json)
# Parse "table_of_contents", abbreviated "toc".
toc_element = etree.parse(str(args.source_dir / "table_of_contents.xml")).getroot()
# Walk recursively table_of_contents.xml and return category_tree_json.
# Side-effects: fill toc_dataset_json_stub_by_code.
toc_dataset_json_stub_by_code = {}
category_tree_json = toc_to_category_tree(toc_element, toc_dataset_json_stub_by_code)
if category_tree_json:
write_json_file(args.target_dir / "category_tree.json", category_tree_json)
# Build list of datasets codes to convert
datasets_codes_to_convert = set()
for dataset_code in sorted(toc_dataset_json_stub_by_code):
if args.datasets and dataset_code not in args.datasets:
log.debug(
"Skipping dataset %r because it is not mentioned by --datasets option",
dataset_code,
            )
            continue
if args.start_from is not None and dataset_code < args.start_from:
log.debug("Skipping dataset %r because of --start-from option", dataset_code)
continue
source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
if not source_dataset_dir.is_dir():
log.error(
"Skipping dataset %s because source directory %s is missing",
dataset_code,
str(source_dataset_dir),
            )
            continue
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
if not sdmx_file.is_file():
log.error(
"Skipping dataset %s because SDMX file %s is missing",
dataset_code,
str(sdmx_file),
            )
            continue
dataset_dir = args.target_dir / dataset_code
if args.resume and dataset_dir.is_dir():
log.debug(
"Skipping dataset %r because it already exists (due to --resume option)",
dataset_code,
            )
            continue
datasets_codes_to_convert.add(dataset_code)
log.info("Converting %d datasets...", len(datasets_codes_to_convert))
# Convert SDMX files. Side-effect: write files for each dataset.
converted_datasets_codes = set()
for index, dataset_code in enumerate(sorted(datasets_codes_to_convert), start=1):
if dataset_code in converted_datasets_codes:
log.debug("Skipping dataset %r because it was already converted", dataset_code)
continue
source_dataset_dir = args.source_dir / datasets_dir_name / dataset_code
sdmx_file = source_dataset_dir / "{}.sdmx.xml".format(dataset_code)
log.info(
"Converting SDMX source file %d/%d %s (%s)",
index,
len(datasets_codes_to_convert),
sdmx_file,
humanize.naturalsize(sdmx_file.stat().st_size, gnu=True),
)
dataset_dir = args.target_dir / dataset_code
dataset_dir.mkdir(exist_ok=True)
dataset_json_stub = toc_dataset_json_stub_by_code[dataset_code]
convert_sdmx_file(dataset_json_stub, sdmx_file, dataset_dir)
converted_datasets_codes.add(dataset_code)
def write_json_file(path, data):
with path.open("w") as f:
json.dump(data, f, ensure_ascii=False, indent=2, sort_keys=True)
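

def without_falsy_values(mapping):
    """Return a copy of mapping keeping only truthy values (a minimal definition consistent with how it is called above)."""
    return {key: value for key, value in mapping.items() if value}


if __name__ == "__main__":
    sys.exit(main())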