WIP: #45 - WTO: Write datasets in JSON repo for "annually" category

Closed Bruno Duyé requested to merge dev into master
All threads resolved!
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# wto-fetcher -- Fetch series from the WTO (World Trade Organization)
@@ -37,13 +38,13 @@ import logging
import os
import re
import sys
from collections import defaultdict
from collections import defaultdict, OrderedDict
import xlrd
from docopt import docopt
from slugify import slugify
from dbnomics_converters.base import verified_value, to_float
from dbnomics_converters.base import assert_no_error, to_float
from dbnomics_converters.datasets import validate_dataset
from dbnomics_converters.providers import validate_provider
from dbnomics_converters.series import validate_series
@@ -54,13 +55,22 @@ log = logging.getLogger(__name__)
PROVIDER = dict(
name='WTO',
long_name='World Trade Organization',
code='WTO',
name='World Trade Organization',
region='World',
website='https://www.wto.org/',
terms_of_use='https://www.wto.org/english/tratop_e/trips_e/intel2_e.htm'
)
FLOWS_CODES = {
# As given in the fetcher description
"Exports": "X",
"Imports": "M",
"Re-exports": "RX", # to be specified
"Retained imports": "RI", # to be specified
"Domestic exports": "DE" # to be specified
}
# https://git.nomics.world/dbnomics/widukind-projects/blob/master/docs/fetcher_description/wto.rst
CATEGORIES = [
dict(
@@ -74,8 +84,17 @@ CATEGORIES = [
code='a1',
filename='services_annual_dataset.csv',
encoding='utf-8-sig',
# Do not use Indicator_Description, but Indicator_Code
dimensions=['Reporter_Description', 'Flow_Description', 'Partner_Description', 'Indicator_Code', 'Source_Description']
dimensions_names_and_codes_colnames=OrderedDict([
# Dimension labels, and the CSV columns where to find each dimension's value labels and codes;
# this order is the one used to build series names (and series directory names).
# dimension_label => (dimension_value_colname, dimension_code_colname).
# None for dimension_code_colname means dimension_value_code = slug(dimension_value_label)
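# Ex (illustrative row, not taken from the actual file): Flow_Description='Exports' => code 'X'
# (via FLOWS_CODES in the parsing code below), Indicator_Code='SF41' => 'SF41',
# Reporter_Description='Italy' => code read from the Reporter_Code column,
# Source_Description='Eurostat' => slugified code 'eurostat'.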
['Flow', ('Flow_Description', None)],
# Do not use Indicator_Description, but Indicator_Code
['Indicator', ('Indicator_Code', 'Indicator_Code')],
['Reporter', ('Reporter_Description', 'Reporter_Code')],
['Partner', ('Partner_Description', 'Partner_Code')],
['Source', ('Source_Description', None)],
])
),
dict(
type='dataset',
@@ -83,7 +102,12 @@ CATEGORIES = [
code='a2',
filename='merchandise_values_annual_dataset.csv',
encoding='latin-1',
dimensions=['Reporter_description', 'Flow_Description', 'Partner_description', 'Indicator_description']
dimensions_names_and_codes_colnames=OrderedDict([
['Reporter', ('Reporter_description', 'Reporter_code')],
['Flow', ('Flow_Description', None)],
['Partner', ('Partner_description', 'Partner_code')],
['Indicator', ('Indicator_description', 'Indicator_code')]
])
),
dict(
type='dataset',
@@ -91,35 +115,40 @@ CATEGORIES = [
code='a3',
filename='merchandise_indices_annual_dataset.csv',
encoding='latin-1',
dimensions=['Reporter_description', 'Flow_Description', 'Partner_description', 'Unit']
dimensions_names_and_codes_colnames=OrderedDict([
['Reporter', ('Reporter_description', 'Reporter_code')],
['Flow', ('Flow_Description', None)],
['Partner', ('Partner_description', 'Partner_code')],
['Unit', ('Unit', None)]
])
),
]
# ),
# dict(
# type='category',
# name='Quarterly',
# code='quater',
# content=[
# dict(
# type='dataset',
# name='Quarterly merchandise trade value',
# code='q1',
# filename='quarterly_trade_e.xls',
# ),
# dict(
# type='dataset',
# name='Quarterly merchandise trade volume',
# code='q2',
# filename='quarterly_merch_trade_volume_e.xls',
# ),
# dict(
# type='dataset',
# name='Quarterly trade in commercial services value',
# code='q3',
# filename='qrtly_comm_serv_web_e.xls',
# ),
# ]
),
# dict(
# type='category',
# name='Quarterly',
# code='quater',
# content=[
# dict(
# type='dataset',
# name='Quarterly merchandise trade value',
# code='q1',
# filename='quarterly_trade_e.xls',
# ),
# dict(
# type='dataset',
# name='Quarterly merchandise trade volume',
# code='q2',
# filename='quarterly_merch_trade_volume_e.xls',
# ),
# dict(
# type='dataset',
# name='Quarterly trade in commercial services value',
# code='q3',
# filename='qrtly_comm_serv_web_e.xls',
# ),
# ]
# ),
dict(
type='category',
name='Monthly',
@@ -184,7 +213,7 @@ def main():
# Create provider.json
provider_json_data = PROVIDER
provider_json_data['categories'] = [category['name'] for category in CATEGORIES]
provider_json_data = verified_value(validate_provider(provider_json_data, format='json'))
provider_json_data = assert_no_error(validate_provider(provider_json_data, format='json'))
write_json_file(os.path.join(target_dir, 'provider.json'), provider_json_data)
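# The resulting provider.json should look roughly like (values from PROVIDER above; keys are sorted
# by write_json_file, category names are the CATEGORIES names, e.g. "Monthly"):
# {"categories": [...], "code": "WTO", "name": "World Trade Organization", "region": "World",
#  "terms_of_use": "https://www.wto.org/english/tratop_e/trips_e/intel2_e.htm",
#  "website": "https://www.wto.org/"}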
for category in CATEGORIES:
@@ -213,7 +242,7 @@ def create_directories_subtree(category_or_dataset, parent_category_path):
],
'category_code': category['code']
}
# category_json_data = verified_value(validate_category(category_json_data, format='json', used_categories_code=used_categories_code))
# category_json_data = assert_no_error(validate_category(category_json_data, format='json', used_categories_code=used_categories_code))
write_json_file(os.path.join(category_path, element_dirname, 'category.json'), category_json_data)
element_type = category_or_dataset['type']
@@ -233,7 +262,7 @@ def create_directories_subtree(category_or_dataset, parent_category_path):
elif element['type'] == 'dataset':
create_dataset_and_series(element, element_path)
else:
raise "Unexpted type: {}".format(element['type'])
raise "Unexpected type: {}".format(element['type'])
def create_dataset_and_series(dataset, parent_category_path):
@@ -252,7 +281,7 @@ def create_dataset_and_series(dataset, parent_category_path):
elif filename.endswith('.xls'):
create_dataset_and_series_from_xls(dataset, dataset_path)
else:
raise ValueError("Unexepted file format: {} for dataset \"{}\"".format(filename, dataset['name']))
raise ValueError("Unexpected file format: {} for dataset {!r}".format(filename, dataset['name']))
def create_dataset_and_series_from_csv(dataset, dataset_path):
@@ -270,72 +299,95 @@ def create_dataset_and_series_from_csv(dataset, dataset_path):
return slug
csv_filename = os.path.join(source_dir, dataset['filename'])
dimension_labels = dataset['dimensions']
dimension_codes = list(map(cached_slugify, dimension_labels))
# init data struct that will contain all dataset in memory (good idea ?)
# ex: {
# 'afghanistan-exports-world-unit-value-index-prev-year-100':
# Collect all dimension keys (combinations of dimension value labels) in dimensions_keys_set
# (using a set ensures that each combination will be stored once)
# ex: dimensions_keys_set = { ('Estonia', 'Egypt', 'Transport', 'Exports'), ('Estonia', 'Egypt', 'Travel', 'Exports'), ... }
dimensions_keys_set = set()
# Init data struct that will contain all dataset observations in memory.
# ex: observations = {
# ('Imports', 'SF41', 'Italy', 'Germany', 'Eurostat'): { <= this is a dimension_key
# 'unit': 'Volume index - Previous year = 100',
# 'observations': [(2001,111.123), (2002, 97.506)]
# 'observations': [(2001,111.123), (2002, 97.506), ...]
# },
# ...
# }
observations = dict()
# Ex: found_dimensions_values_codes = {
# 'indicator': {'SK22', 'SL', 'SDB1', 'SJ323', 'SC31', ...}
# 'source': {'imf', 'eurostat', 'oecd'}
# 'partner': {'DK', 'CL', 'E28', 'GR', 'CA', ...}
# 'reporter': {'RO', 'AT', 'BG', 'IL', 'LT', ...}
# 'flow': {'M', 'X'}
# }
found_dimensions_values_codes = defaultdict(set)
# dimensions_values_labels will store the label of each dimension_value code, grouped by dimension_code to avoid "collisions"
# Ex: dimensions_values_labels = {
# 'reporter': {'AT': 'Austria', 'BG': 'Bulgaria', 'BM': 'Bermuda', 'CZ': 'Czech Republic', ...}
# 'source': {'eurostat': 'Eurostat', 'imf': 'IMF', 'oecd': 'OECD'}
# 'indicator': {'SB': 'SB', 'SC': 'SC', 'SC1': 'SC1', ...}
# 'flow': {'M': 'Imports', 'X': 'Exports'}
# }
dimensions_values_labels = defaultdict(dict)
# Let's create the "reverse" dict of dimensions_values_labels, to quickly find dimension value codes from a dimensions_key.
# Ex: dimensions_values_codes = {
#     'reporter': {'Austria': 'AT', 'Bulgaria': 'BG', 'Bermuda': 'BM', 'Czech Republic': 'CZ', ...}
#     'source': {'Eurostat': 'eurostat', 'IMF': 'imf', 'OECD': 'oecd'}
#     'flow': {'Imports': 'M', 'Exports': 'X'}
#     ...
# }
series_data = dict()
# Using a set ensures that each combination of dimensions will be unique.
# ex: {('Angola', 'Imports', 'World', 'Volume index - Previous year = 100'),
# ('France', 'Imports', 'World', 'Volume index - Previous year = 100'),
# ... }
dimension_value_labels_set = set()
dimension_value_labels_by_code = defaultdict(set)
dimensions_values_codes = defaultdict(dict)
with open(csv_filename, encoding=dataset['encoding']) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
dimension_value_labels = tuple(
row[dimension_label]
for dimension_label in dimension_labels
)
dimension_value_labels_set.add(dimension_value_labels)
for dimension_label in dimension_labels:
dimension_value_label = row[dimension_label]
# dimensions "key" is the combination of dimension_codes of row. ex: ('AT', 'BG', 'S', 'M')
dimensions_key = []
for dimension_label, colnames in dataset['dimensions_names_and_codes_colnames'].items():
dimension_value_label = row[colnames[0]]
dimension_code = cached_slugify(dimension_label)
dimension_value_labels_by_code[dimension_code].add(dimension_value_label)
# Determining dimension_value_code
if dimension_code == 'flow':
# We don't use WTO codes for flow, but the codes given in the fetcher description (link at the top of the file)
dimension_value_code = FLOWS_CODES[dimension_value_label]
else:
if colnames[1]:
dimension_value_code = row[colnames[1]]
else:
# No column for dimension_value_code => code is generated from label
dimension_value_code = cached_slugify(dimension_value_label)
found_dimensions_values_codes[dimension_code].add(dimension_value_code)
# Store this code to the set of dimensions_values labels, and its reverse storage: dimensions_values_codes
dimensions_values_labels[dimension_code][dimension_value_code] = dimension_value_label
dimensions_values_codes[dimension_code][dimension_value_label] = dimension_value_code
# Add the label to the key (composed of labels, for more human-readable values)
dimensions_key.append(dimension_value_label)
# Add values to series_data
if not dimension_value_labels in series_data.keys():
series_data[dimension_value_labels] = {'unit': row['Unit'], 'observations': []}
series_data[dimension_value_labels]['observations'].append([row['Year'], row['Value']])
# prepare data to be written in dataset.json
dataset_json_data = {
'name': dataset['name'],
'dataset_code': dataset['code'],
'dimension_keys': dimension_codes,
'concepts': dict(zip(
dimension_codes,
list(map(
lambda label: label.replace("_", " "),
dimension_labels,
))
)),
'codelists': {
dimension_code: {
cached_slugify(dimension_value_label): dimension_value_label
for dimension_value_label in dimension_value_labels
}
for dimension_code, dimension_value_labels in dimension_value_labels_by_code.items()
}
}
dimensions_key = tuple(dimensions_key)
dimensions_keys_set.add(dimensions_key)
if dimensions_key not in observations:
observations[dimensions_key] = {'unit': row['Unit'].strip(), 'observations': []}
assert row['Unit'].strip() == observations[dimensions_key]['unit'], \
"Unhandled situation: the file {} contains more than one 'Unit' value for the same series!".format(dataset['filename'])
# Store observation
observations[dimensions_key]['observations'].append([row['Year'], row['Value']])
# Create series directories, each including series.json and observations.tsv files.
dimensions_labels = dataset['dimensions_names_and_codes_colnames'].keys()
dimensions_codes = list(map(cached_slugify, dimensions_labels))
series_directories_names = set()
for dimension_value_labels in dimension_value_labels_set:
dimensions_values_codes = list(map(cached_slugify, dimension_value_labels))
for dimensions_key in dimensions_keys_set:
dimensions_key_labels_by_dimension_code = OrderedDict(zip(dimensions_codes, dimensions_key))
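# Ex (illustrative, based on the dimensions_key example above):
# OrderedDict([('flow', 'Imports'), ('indicator', 'SF41'), ('reporter', 'Italy'),
#              ('partner', 'Germany'), ('source', 'Eurostat')])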
dimensions_key_values_codes = [
dimensions_values_codes[dimension_code][label]
for dimension_code, label in dimensions_key_labels_by_dimension_code.items()
]
series = {
'key': '-'.join(dimensions_values_codes),
'dimensions': dict(zip(dimension_codes, dimensions_values_codes)),
'code': '-'.join(dimensions_key_values_codes),
'dimensions': dict(zip(dimensions_codes, dimensions_key_values_codes)),
}
# Create series directory
series_directory_name = series["key"]
series_directory_name = '-'.join(map(make_dirname_compliant, dimensions_key))
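# Ex (illustrative, assuming WTO codes 'IT' and 'DE' for Italy and Germany): the dimensions_key
# ('Imports', 'SF41', 'Italy', 'Germany', 'Eurostat') gives series code 'M-SF41-IT-DE-eurostat'
# and directory name 'Imports-SF41-Italy-Germany-Eurostat'; spaces in labels become '_' (see make_dirname_compliant).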
series_directories_names.add(series_directory_name)
series_dir_path = os.path.join(dataset_path, series_directory_name)
os.mkdir(series_dir_path)
@@ -343,16 +395,34 @@ def create_dataset_and_series_from_csv(dataset, dataset_path):
# Create observations.tsv
write_series_tsv_file(
series_dir_path,
series_data[dimension_value_labels]['observations'],
series_data[dimension_value_labels]['unit'],
observations[dimensions_key]['observations'],
observations[dimensions_key]['unit'],
)
# Create series.json
series = verified_value(validate_series(series, format='json'))
series = assert_no_error(validate_series(series, format='json'))
write_json_file(os.path.join(series_dir_path, 'series.json'), series)
# prepare data to be written in dataset.json
dataset_json_data = {
'name': dataset['name'],
'code': dataset['code'],
'dimensions_values_labels': {
dimension_code: {
dimension_value_code: dimensions_values_labels[dimension_code][dimension_value_code]
for dimension_value_code in dimensions_values_labels_codes
}
for dimension_code, dimensions_values_labels_codes in found_dimensions_values_codes.items()
},
'dimensions_labels': {
cached_slugify(dimension_label): dimension_label
for dimension_label in dimensions_labels
},
'dimensions_codes_order': dimensions_codes,
}
dataset_json_data["series"] = list(sorted(series_directories_names))
dataset_json_data = verified_value(validate_dataset(dataset_json_data, format='json'))
dataset_json_data = assert_no_error(validate_dataset(dataset_json_data, format='json',
skip_series_duplicates_check=True))
write_json_file(os.path.join(dataset_path, 'dataset.json'), dataset_json_data)
@@ -380,9 +450,11 @@ def create_dataset_and_series_from_xls(dataset, dataset_path):
# Ensure all elements match the regexp and there are no "holes"
for col_num, value in enumerate(periods_row):
row_coords = (row_num, first_non_empty_cell_col_number + col_num) # coords in file
assert value, "Unexepted empty cell at position {} ({}) in {} sheet".format(row_coords, xlrd.cellname(*row_coords), sheet.name)
assert value, "Unexpected empty cell at position {} ({}) in {} sheet".format(
row_coords, xlrd.cellname(*row_coords), sheet.name)
assert re.match(period_regexp, value), \
"Unexepted period \"{}\" at position {} ({}) in {} sheet".format(value, row_coords, xlrd.cellname(*row_coords), sheet.name)
"Unexpected period {!r} at position {} ({}) in {} sheet".format(
value, row_coords, xlrd.cellname(*row_coords), sheet.name)
return first_non_empty_cell_col_number, periods_row
def write_series(series, observations, unit, dataset_path, series_directory_name):
@@ -392,7 +464,7 @@ def create_dataset_and_series_from_xls(dataset, dataset_path):
series_dir_path = os.path.join(dataset_path, series_directory_name)
os.mkdir(series_dir_path)
# Create series.json
series = verified_value(validate_series(series, format='json'))
series = assert_no_error(validate_series(series, format='json'))
write_json_file(os.path.join(series_dir_path, 'series.json'), series)
# Write series observations
write_series_tsv_file(series_dir_path, observations, unit)
@@ -416,26 +488,27 @@ def create_dataset_and_series_from_xls(dataset, dataset_path):
continue
# Stop parsing if dataset defines a "stop_text" and this text is found in row
if dataset.get('stop_text') and dataset.get('stop_text') in row:
print("Info: reached stop text \"{}\" at line {}".format(dataset.get('stop_text'), row_num))
print("Info: reached stop text {!r} at line {}".format(dataset.get('stop_text'), row_num))
break
# Parse data rows
region_code = row[xls_constants['regions_codes_col_num']].strip()
region_label = row[xls_constants['regions_labels_col_num']].strip()
# Pass through rows without data
if set(row[first_value_col_num:]) == {''}:
print("Info: row {} - ignoring {}".format(row_num, "\"{}\"".format(region_label) if region_label else "line {}".format(row_num)))
print("Info: row {} - ignoring {}".format(row_num, "{!r}".format(region_label)
if region_label else "line {}".format(row_num)))
continue
assert region_label, "No region label found at row {}".format(row_num)
# Try to convert data to float and check that the row contains at least one valid value (a float or "...")
converted_values = []
for i, value in enumerate(row[first_value_col_num:]):
if value in ['...', '', '']:
converted_values.append("NA") # TODO: Correct ?
if value in ['...', u'', '']:
converted_values.append(value)
else:
v_, error = to_float(value)
if error:
row_coords = (row_num, first_value_col_num + i)
assert not error, "Unexpected value \"{}\" at position {} ({}) in sheet \"{}\"".format(
assert not error, "Unexpected value {!r} at position {} ({}) in sheet {!r}".format(
value, row_coords, xlrd.cellname(*row_coords), sheet.name)
else:
converted_values.append(str(value))
@@ -447,13 +520,14 @@ def create_dataset_and_series_from_xls(dataset, dataset_path):
generated_region_code = region_code or slugify(region_label)
regions_codes.add((region_label, generated_region_code))
series = dict(
key=series_directory_name,
code=series_directory_name,
dimensions=dict(
Flow=flow_code,
Region=generated_region_code
)
)
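# Ex (illustrative): a 'World' row on an exports sheet could give
# dimensions=dict(Flow='X', Region='world'), 'world' being slugified from the label when the file gives no region code.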
assert not series_directory_name in series_directories_names, "Double series name: {}".format(series_directory_name)
assert not series_directory_name in series_directories_names, "Double series name: {}".format(
series_directory_name)
series_directories_names.add(series_directory_name)
write_series(series, observations, xls_constants['unit'], dataset_path, series_directory_name)
@@ -461,9 +535,9 @@ def create_dataset_and_series_from_xls(dataset, dataset_path):
dimension_codes = [slugify(label) for label in xls_constants['dimensions_labels']]
dataset_json_data = {
'name': dataset['name'],
'dataset_code': dataset['code'],
'dimension_keys': dimension_codes,
'codelists': {
'code': dataset['code'],
'dimensions_codes_order': dimension_codes,
'dimensions_values_labels': {
'flow': {
label_and_code[1]: label_and_code[0]
for label_and_code in xls_constants['flow_codes_and_names_by_sheet_names'].values()
@@ -473,14 +547,14 @@ def create_dataset_and_series_from_xls(dataset, dataset_path):
for region in regions_codes
}
},
'concepts': dict(zip(
'dimensions_labels': dict(zip(
dimension_codes,
xls_constants['dimensions_labels'],
)),
}
# Finally, write dataset.json
dataset_json_data["series"] = list(sorted(series_directories_names))
dataset_json_data = verified_value(validate_dataset(dataset_json_data, format='json'))
dataset_json_data = assert_no_error(validate_dataset(dataset_json_data, format='json'))
write_json_file(os.path.join(dataset_path, 'dataset.json'), dataset_json_data)
@@ -502,5 +576,9 @@ def write_json_file(file_path, data):
json.dump(data, file_, ensure_ascii=False, indent=2, sort_keys=True)
def make_dirname_compliant(label):
return label.replace(" ", "_").replace("'", '-')
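# Ex (illustrative): make_dirname_compliant("Côte d'Ivoire") == "Côte_d-Ivoire"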
if __name__ == '__main__':
sys.exit(main())