Commit f916131d authored by Pierre Dittgen's avatar Pierre Dittgen
Browse files

Generate iip dataset

parent b1026489
__pycache__
\ No newline at end of file
#!/usr/bin/env python3
# meti-fetcher -- Fetch series from Meti Japan macro economic database
# By: Pierre Dittgen <pierre.dittgen@cepremap.org>
#
# Copyright (C) 2018 Cepremap
# https://git.nomics.world/dbnomics-fetchers/meti-fetcher
#
# meti-fetcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# meti-fetcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Converter for Meti Japan provider
http://www.meti.go.jp
"""
import argparse
import logging
import os
import convert_util as cu
# Module logger: use getLogger so the logger is registered in the logging
# manager's hierarchy and can be configured by the application.
# (logging.Logger(...) instantiated directly is detached from the manager.)
LOG = logging.getLogger('meti fetcher')

# DBnomics data model version written into datapackage.json
DATA_MODEL_VERSION = '0.7.8'

DATAPACKAGE_JSON = {
    "dbnomics": {
        "data_model_version": DATA_MODEL_VERSION
    }
}

# Static provider metadata, dumped as provider.json
PROVIDER_JSON = dict(
    code='METI',
    name='Ministry of Economy, Trade and Industry',
    region='JP',
    terms_of_use='http://www.meti.go.jp/english/other/terms_of_use.html',
    website='http://www.meti.go.jp/english/'
)

# Frequency code -> lowercase label
FREQ_LABEL_MAP = {
    'A': 'annual',
    'Q': 'quarterly',
    'M': 'monthly',
}

# Period suffix code -> human readable label (used in series names)
PERIOD_CODE_TO_LABEL = {
    'FY': 'annual, fiscal year',
    'CY': 'annual, civilian year',
    'Q': 'quarterly',
    'M': 'monthly',
}

# Lowercased concept chunk extracted from CSV title -> dimension value code
CONCEPT_TO_CODE_MAP = {
    "industrial production": 'IP',
    "producer's shipments": 'PS',
    "producer's inventory of finished goods": "PIF",
    "producer's inventory ratio of finished goods": "PIR",
    "producer's inventory ratio of finished goods (average)": "PIRA",
}
def csv_filename_to_id(filename):
    """ Return 'b2010_gom2e' from 'b2010_gom2e.csv' (filename without extension)

    Note: the previous docstring wrongly claimed 'gom2e' was returned.
    """
    # os.path.splitext is robust to any extension length,
    # unlike a hard-coded [:-4] slice.
    return os.path.splitext(filename)[0]
def generate_tsv_from_csv_data_line(csv_code, csv_name, cols, ds_dir):
    """ Parse data, generate time series and return series info list.

    Not implemented yet: always returns an empty list.
    """
    # TODO: implement TSV generation for a raw data line
    return []
def compute_freq_and_code(period_value):
    """ Map a raw 6-char period string to a (frequency, period code) pair:
    2017CY -> ('A', 'CY')
    2017FY -> ('A', 'FY')
    2017Q1 -> ('Q', 'Q')
    201701 -> ('M', 'M')
    """
    assert len(period_value) == 6, period_value
    suffix = period_value[4:]
    # Annual periods keep their CY/FY distinction in the code
    if suffix in ('CY', 'FY'):
        return ('A', suffix)
    first_char = suffix[0]
    if first_char == 'Q':
        return ('Q', 'Q')
    # Month numbers 01..12 start with '0' or '1'
    if first_char in ('0', '1'):
        return ('M', 'M')
    cu.die("What's the suffix? [{}]".format(suffix))
    return None
def norm_period_values(period_values, freq):
    """ Normalize raw period strings according to their frequency:
    2007CY, ... 2017CY -> 2007, ... 2017
    2007FY, ... 2017FY -> 2007, ... 2017
    2007Q1, ... 2017Q4 -> 2007-Q1, ... 2017-Q4
    """
    # Guard clause: bail out on unexpected frequency codes
    if freq not in ('A', 'Q', 'M'):
        cu.die('Unknown freq: [{}]'.format(freq))
    if freq == 'A':
        # Keep only the 4-digit year
        return [p_val[:4] for p_val in period_values]
    # Quarterly/monthly: insert a dash between year and sub-period
    return ['-'.join((p_val[:4], p_val[4:])) for p_val in period_values]
class CSVLineHelper:
    """ Handy class that helps parsing CSV time series

    The CSV header row packs several period ranges (e.g. annual, quarterly,
    monthly) side by side, separated by blank columns.  The constructor
    locates those sub-ranges; generate_tsv_from_csv_row then slices each data
    row accordingly and writes one TSV file per (row, period range) pair.
    """
    def __init__(self, csv_code, csv_name, ds_dir, header_cols):
        """ Initializes instance computing period_infos

        :param csv_code: CSV id (filename without extension)
        :param csv_name: human readable CSV title (first CSV line)
        :param ds_dir: target dataset directory where TSV files are written
        :param header_cols: header line columns; period values start at index 3
        """
        self.csv_code = csv_code
        self.csv_name = csv_name
        self.ds_dir = ds_dir
        assert header_cols[0] == 'Item_Number'
        # Each entry: (start, end, values) interval over header_cols[3:]
        period_values = []
        current_period_values = []
        start = 0
        # Cuts period list into sublist
        # Period list are separated by blank ('') values
        for col_id, col_val in enumerate(header_cols[3:]):
            # separator found
            if col_val == '':
                if current_period_values:
                    period_values.append((start, col_id, current_period_values))
                    current_period_values = []
                start = col_id+1
            else:
                current_period_values.append(col_val)
        # Flush the last sublist (no trailing separator)
        if current_period_values:
            period_values.append((start, start + len(current_period_values), current_period_values))
        self.period_infos = []
        code_set = set()
        for p in period_values:
            # Frequency/code deduced from the first period value of the range
            freq, code = compute_freq_and_code(p[2][0])
            # Each period code must appear at most once per CSV
            assert not code in code_set, "Code [{}] already found".format(code)
            p_info = {
                # +3 re-offsets the interval to absolute column indices
                # (periods start after the 3 leading header columns)
                'col_interval': (p[0]+3, p[1]+3),
                'norm_period_values': norm_period_values(p[2], freq),
                'freq': freq,
                'code': code,
            }
            self.period_infos.append(p_info)
            code_set.add(code)

    def generate_tsv_from_csv_row(self, cols):
        """ Generates all TSV from a CSV line

        Writes one TSV file per period range found in the header and
        returns a list of (ts_code, ts_name, dimensions_dict) tuples.
        """
        ts_infos = []
        row_code = cols[0]
        row_name = cols[1].strip()
        for period_info in self.period_infos:
            code = period_info['code']
            # Series code: <csv id>.<row item number>.<period code>
            ts_code = '{}.{}.{}'.format(self.csv_code, row_code, code)
            ts_name = '{} - {} ({})'.format(self.csv_name, row_name, PERIOD_CODE_TO_LABEL[code])
            period_values = period_info['norm_period_values']
            ci = period_info['col_interval']
            # Observation values live in the same column interval as periods
            obs_values = cols[ci[0]:ci[1]]
            assert len(period_values) == len(obs_values)
            # Write TSV file
            tsv_filepath = os.path.join(self.ds_dir, '{}.tsv'.format(ts_code))
            with open(tsv_filepath, mode='w', encoding='utf-8') as tsv_fd:
                tsv_fd.write('PERIOD\tVALUE\n')
                for p_val, obs_val in zip(period_values, obs_values):
                    tsv_fd.write('{}\t{}\n'.format(p_val, obs_val))
            # Append to ts_infos
            ts_infos.append((ts_code, ts_name, {'frequency': period_info['freq']}))
        return ts_infos
def extract_concept(csv_name):
    """ Return the concept code, e.g. 'IP' (Industrial production) from
    'Seasonally adjusted Index by Industry : Industrial Production (2010=100.0)' """
    idx = csv_name.find(':')
    assert idx != -1, "No «:» found in [{}]".format(csv_name)
    # Keep the text after ':' ...
    remainder = csv_name[idx+1:]
    # ... up to the first '(' when present
    paren_pos = remainder.find('(')
    if paren_pos != -1:
        remainder = remainder[:paren_pos]
    # Normalized (lowercase, stripped) chunk is the map key
    return CONCEPT_TO_CODE_MAP[remainder.strip().lower()]
def extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept=False):
    """
    Extracts time series from csv_filepath
    Generates tsv files
    Returns time series info to write dataset.json

    :param csv_code: CSV id (filename without extension), e.g. 'b2010_gom2e'
    :param csv_filepath: path of the source CSV file
    :param ds_dir: dataset directory where TSV files are generated
    :param with_concept: if True, add a 'concept' dimension from the CSV title
    :return: list of (series_code, series_name, dimensions_dict) tuples
    """
    series_info = []
    # NOTE(review): 'ascii' will raise on non-ASCII input files
    # (see the "manage non ASCII characters" TODO in main)
    with open(csv_filepath, mode='r', encoding='ascii') as csv_fd:
        csv_name = None   # CSV title, taken from the very first line
        in_data = False   # becomes True once the 'Item_Number' header is met
        csv_lh = None     # CSVLineHelper built from the header line
        # Reads input CSV line by line
        for line in csv_fd.readlines():
            # print('Line #{}'.format(idx))
            cols = line.strip().split(',')
            # Most frequently use case
            if in_data:
                series_info.extend(csv_lh.generate_tsv_from_csv_row(cols))
                continue
            # First line: get csv name
            if csv_name is None:
                csv_name = cols[0]
                continue
            # Header
            if not in_data and cols[0] == 'Item_Number':
                csv_lh = CSVLineHelper(csv_code, csv_name, ds_dir, cols)
                in_data = True
                continue
    # Adds dimensions values for all ts in the same CSV
    _series_info = []
    # 8th char of csv_code encodes seasonal adjustment -- presumably
    # 's' -> 'S' (seasonally adjusted) and 'o' -> 'O' (original);
    # TODO confirm against actual METI filenames
    so_val = csv_code[7].upper()
    #print('csv_code = [{}], seasonal_adjustment = [{}]'.format(csv_code, so_val))
    concept_val = extract_concept(csv_name) if with_concept else None
    for si in series_info:
        dim_dict = si[2]
        dim_dict['seasonal_adjustment'] = so_val
        if with_concept:
            dim_dict['concept'] = concept_val
        _series_info.append((si[0], si[1], dim_dict))
    return _series_info
def generate_dataset(ds_code, source_dir, ds_name, target_dir, with_concept=False):
    """ Builds one dataset:
    creates the dataset directory,
    generates time series TSV files from every source CSV,
    writes dataset.json
    """
    ds_dir = os.path.join(target_dir, ds_code)
    if not os.path.exists(ds_dir):
        os.mkdir(ds_dir)
    LOG.info('Working on %s dataset', ds_code)
    # Only CSV files are considered, in deterministic (sorted) order
    csv_filenames = [fname for fname in sorted(os.listdir(source_dir))
                     if cu.ends_with(fname, '.csv')]
    series_info = []
    for filename in csv_filenames:
        series_info.extend(extract_timeseries_from_csv(
            csv_filename_to_id(filename),
            os.path.join(source_dir, filename),
            ds_dir,
            with_concept))
    write_dataset_json(os.path.join(ds_dir, 'dataset.json'),
                       ds_code, ds_name, series_info, with_concept)
def write_dataset_json(json_filepath, ds_code, ds_name, series_info, with_concept=False):
    """ Writes dataset.json gathering dataset metadata and series list

    :param series_info: list of (code, name, dimensions_dict) tuples
    :param with_concept: when True, also declares the 'concept' dimension
    """
    dimensions_labels = {
        'frequency': 'Frequency',
        'seasonal_adjustment': 'Seasonal adjustment',
    }
    dimensions_values_labels = {
        'frequency': {
            'A': 'Annual',
            'Q': 'Quarterly',
            'M': 'Monthly',
        },
        'seasonal_adjustment': {
            'S': 'Seasonaly adjusted',
            'O': 'Original',
        },
    }
    dimensions_code_order = ['frequency', 'seasonal_adjustment']
    if with_concept:
        dimensions_labels['concept'] = 'Concept'
        dimensions_values_labels['concept'] = {
            'IP': 'industrial production',
            'PS': "producer's shipments",
            'PIF': "producer's inventory of finished goods",
            'PIR': "producer's inventory ratio of finished goods",
            'PIRA': "producer's inventory ratio of finished goods (average)",
        }
        dimensions_code_order.append('concept')
    dataset_data = {
        'code': ds_code,
        'name': ds_name,
        'dimensions_labels': dimensions_labels,
        'dimensions_values_labels': dimensions_values_labels,
        'dimensions_code_order': dimensions_code_order,
        'series': [dict(code=code, name=name, dimensions=dims)
                   for code, name, dims in series_info],
    }
    cu.write_json_file(json_filepath, dataset_data)
def main():
    """ Converts downloaded CSV files into datasets and time series

    Expects two positional arguments: source_dir (downloaded METI CSV
    files) and target_dir (output directory for JSON/TSV data).
    """
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        'source_dir',
        # Fixed copy-paste from another fetcher: this one handles METI data,
        # not DeStatis
        help='path of source directory containing downloaded METI data',
    )
    parser.add_argument(
        'target_dir',
        help='path of target directory containing JSON data',
    )
    args = parser.parse_args()
    source_dir = args.source_dir
    assert os.path.exists(source_dir)
    assert os.access(source_dir, os.R_OK)
    target_dir = args.target_dir
    assert os.path.exists(target_dir)
    assert os.access(target_dir, os.W_OK)
    # Standard metadata
    cu.write_json_file(os.path.join(target_dir, 'provider.json'), PROVIDER_JSON)
    cu.write_json_file(os.path.join(target_dir, 'datapackage.json'), DATAPACKAGE_JSON)
    # Works on file
    generate_dataset('iip', os.path.join(source_dir, 'b2010_g1e'),
                     'Indices of Industrial Production by Industry (2010 = 100.0)', target_dir, with_concept=True)
    # TODO: manage non ASCII characters
    # generate_dataset('sanzi', os.path.join(source_dir, 'b2010_ke'),
    #                  'Indices of Tertiary Industry Activity by industry (2010 = 100.0)', target_dir)


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
"""
Handy functions to ease convert work in dbnomics
"""
import json
def ends_with(text, subtext):
    """ Return True if subtext is found at the end of text.

    Delegates to str.endswith, which also handles the empty-suffix case
    correctly (an empty subtext always matches; the previous manual slice
    comparison wrongly returned False for non-empty text).
    """
    return text.endswith(subtext)
def write_json_file(file_path, data):
    """ Serializes data as pretty-printed, sorted-key JSON into file_path """
    payload = json.dumps(data, ensure_ascii=False, indent=2, sort_keys=True)
    with open(file_path, 'w', encoding='utf-8') as json_fd:
        json_fd.write(payload)
def write_category_json_file(file_path, categories_info, dataset_info):
    """ Write category_tree.json from declarative data """
    category_data = []
    for cat in categories_info:
        # Resolve the category's dataset codes into (code, name) children,
        # silently skipping codes absent from dataset_info
        children = [
            dict(code=dataset_info[ds_code]['code'],
                 name=dataset_info[ds_code]['name'])
            for ds_code in cat['_ds_codes']
            if ds_code in dataset_info
        ]
        # Adds category only if it contains datasets
        if not children:
            continue
        # Copy without the private '_ds_codes' key
        cat_info = {key: val for key, val in cat.items() if key != '_ds_codes'}
        cat_info['children'] = children
        category_data.append(cat_info)
    write_json_file(file_path, category_data)
def write_dataset_json_file(file_path, dataset_info, datasets_info):
    """ Write dataset.json from declarative data

    :param dataset_info: dict using private keys:
        '_code': dataset code, used as a key into datasets_info
        '_dimensions_info': list of (dim_code, dim_label, values) triples,
            values being a list of (value_code, value_label) pairs
        '_series_info': list of (code, name, dimensions[, notes]) tuples
        '_updated_date': optional update date
    :param datasets_info: dict mapping dataset code to public metadata
        ('code', 'name' and optional 'notes')
    """
    def compute_dim_values_labels(dim_info):
        """ Computes dimensions_values_labels dict: dim code -> {value code: label} """
        data = dict()
        for code, _, values in dim_info:
            # print('Values = {}'.format(values))
            data[code] = dict([(v[0], v[1]) for v in values])
        return data

    def compute_series_info(series_info):
        """ Computes series dict from a (code, name, dimensions[, notes]) tuple """
        data = dict(
            code=series_info[0],
            name=series_info[1],
        )
        # dimensions and notes entries are optional
        if series_info[2]:
            data['dimensions'] = series_info[2]
        if len(series_info) == 4:
            data['notes'] = series_info[3]
        return data

    ds_code = dataset_info['_code']
    dim_info = dataset_info['_dimensions_info']
    dataset_data = dict(
        code=datasets_info[ds_code]['code'],
        name=datasets_info[ds_code]['name'],
        dimensions_labels=dict((di[0], di[1]) for di in dim_info),
        dimensions_values_labels=compute_dim_values_labels(dim_info),
        # Sorted by series code for deterministic output
        series=sorted([compute_series_info(s) for s in dataset_info['_series_info']],
                      key=lambda s: s['code'])
    )
    if '_updated_date' in dataset_info:
        dataset_data['updated_at'] = dataset_info['_updated_date']
    if 'notes' in datasets_info[ds_code]:
        dataset_data['notes'] = datasets_info[ds_code]['notes']
    write_json_file(file_path, dataset_data)
def die(msg):
    """ Prints msg on stderr then exits the process with status 1 """
    import sys
    print(msg, file=sys.stderr)
    sys.exit(1)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment