Commit 963b3e9f authored by Christophe Benz

Format files using "black"

parent b1768f7a
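
This commit changes formatting only, not behaviour: black normalises string literals to double quotes, adds trailing commas, and re-wraps long calls and signatures, which is exactly what the hunks below show. A minimal sketch of that transformation follows; it is not part of the commit and assumes black >= 19.10b0, where format_str takes a keyword-only mode argument. The CLI equivalent would simply be running "black ." at the repository root.

    import black  # illustration only; the actual command used for this commit is not recorded

    src = "PROVIDER_JSON = dict(code='METI', region='JP')\n"
    # format_str applies the same rules as the CLI: quote normalisation, wrapping, trailing commas
    print(black.format_str(src, mode=black.FileMode()), end="")
    # -> PROVIDER_JSON = dict(code="METI", region="JP")

Since black's output is stable, re-running it over the already-formatted files should produce no further diff.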
@@ -35,35 +35,35 @@ log = logging.Logger(__name__)
PROVIDER_JSON = dict(
code='METI',
name='Ministry of Economy, Trade and Industry',
region='JP',
terms_of_use='http://www.meti.go.jp/english/other/terms_of_use.html',
website='http://www.meti.go.jp/english/'
code="METI",
name="Ministry of Economy, Trade and Industry",
region="JP",
terms_of_use="http://www.meti.go.jp/english/other/terms_of_use.html",
website="http://www.meti.go.jp/english/",
)
DATASETS_INFO = {
'iip': 'Indices of Industrial Production by Industry (2015 = 100.0)',
'sanzi': 'Indices of Tertiary Industry Activity by industry (2010 = 100.0)'
"iip": "Indices of Industrial Production by Industry (2015 = 100.0)",
"sanzi": "Indices of Tertiary Industry Activity by industry (2010 = 100.0)",
}
FREQ_LABEL_MAP = {
'A': 'annual',
'Q': 'quarterly',
'M': 'monthly',
"A": "annual",
"Q": "quarterly",
"M": "monthly",
}
PERIOD_CODE_TO_LABEL = {
'FY': 'annual, fiscal year',
'CY': 'annual, civilian year',
'Q': 'quarterly',
'M': 'monthly',
"FY": "annual, fiscal year",
"CY": "annual, civilian year",
"Q": "quarterly",
"M": "monthly",
}
CONCEPT_TO_CODE_MAP = {
"industrial production": 'IP',
"producer's shipments": 'PS',
"industrial production": "IP",
"producer's shipments": "PS",
"producer's inventory of finished goods": "PIF",
"producer's inventory ratio of finished goods": "PIR",
"producer's inventory ratio of finished goods (average)": "PIRA",
@@ -84,12 +84,12 @@ def compute_freq_and_code(period_value):
"""
assert len(period_value) == 6, period_value
suff = period_value[4:]
if suff in ('CY', 'FY'):
return ('A', suff)
elif suff[0] == 'Q':
return ('Q', 'Q')
elif suff[0] in ('0', '1'):
return ('M', 'M')
if suff in ("CY", "FY"):
return ("A", suff)
elif suff[0] == "Q":
return ("Q", "Q")
elif suff[0] in ("0", "1"):
return ("M", "M")
cu.die("What's the suffix? [{}]".format(suff))
return None
@@ -101,14 +101,14 @@ def norm_period_values(period_values, freq):
2007Q1, ... 2017Q4 -> 2007-Q1, ... 2017-Q4
"""
# sanitize period_values (trim 'p ' at the beginning of the period value)
period_values = [p.lstrip('p ') for p in period_values]
period_values = [p.lstrip("p ") for p in period_values]
if freq == 'A':
if freq == "A":
return [p[:4] for p in period_values]
elif freq in ('Q', 'M'):
return ['{}-{}'.format(p[:4], p[4:]) for p in period_values]
elif freq in ("Q", "M"):
return ["{}-{}".format(p[:4], p[4:]) for p in period_values]
cu.die('Unknown freq: [{}]'.format(freq))
cu.die("Unknown freq: [{}]".format(freq))
class CSVLineHelper:
@@ -121,7 +121,7 @@ class CSVLineHelper:
self.csv_name = csv_name
self.ds_dir = ds_dir
assert header_cols[0] == 'Item_Number'
assert header_cols[0] == "Item_Number"
period_values = []
current_period_values = []
@@ -131,16 +131,18 @@ class CSVLineHelper:
# Period list are separated by blank ('') values
for col_id, col_val in enumerate(header_cols[3:]):
# separator found
if col_val == '':
if col_val == "":
if current_period_values:
period_values.append((start, col_id, current_period_values))
current_period_values = []
start = col_id+1
start = col_id + 1
else:
current_period_values.append(col_val)
if current_period_values:
period_values.append((start, start + len(current_period_values), current_period_values))
period_values.append(
(start, start + len(current_period_values), current_period_values)
)
self.period_infos = []
code_set = set()
@@ -148,18 +150,18 @@ class CSVLineHelper:
freq, code = compute_freq_and_code(p[2][0])
assert not code in code_set, "Code [{}] already found".format(code)
p_info = {
'col_interval': (p[0]+3, p[1]+3),
'obs_status': ['p' if p.startswith('p ') else '' for p in p[2]],
'norm_period_values': norm_period_values(p[2], freq),
'freq': freq,
'code': code,
"col_interval": (p[0] + 3, p[1] + 3),
"obs_status": ["p" if p.startswith("p ") else "" for p in p[2]],
"norm_period_values": norm_period_values(p[2], freq),
"freq": freq,
"code": code,
}
self.period_infos.append(p_info)
code_set.add(code)
@staticmethod
def norm_obs_values(obs_values):
return ['NA' if elt in ('', '-') else elt for elt in obs_values]
return ["NA" if elt in ("", "-") else elt for elt in obs_values]
def generate_tsv_from_csv_row(self, cols):
""" Generates all TSV from a CSV line """
@@ -170,64 +172,68 @@ class CSVLineHelper:
row_name = cols[1].strip()
for period_info in self.period_infos:
code = period_info['code']
ts_code = '{}.{}.{}'.format(self.csv_code, row_code, code)
ts_name = '{} - {} ({})'.format(self.csv_name, row_name, PERIOD_CODE_TO_LABEL[code])
code = period_info["code"]
ts_code = "{}.{}.{}".format(self.csv_code, row_code, code)
ts_name = "{} - {} ({})".format(
self.csv_name, row_name, PERIOD_CODE_TO_LABEL[code]
)
period_values = period_info['norm_period_values']
ci = period_info['col_interval']
obs_values = CSVLineHelper.norm_obs_values(cols[ci[0]:ci[1]])
period_values = period_info["norm_period_values"]
ci = period_info["col_interval"]
obs_values = CSVLineHelper.norm_obs_values(cols[ci[0] : ci[1]])
assert len(period_values) == len(obs_values)
# Prepare data to be written
header = ['PERIOD', 'VALUE']
header = ["PERIOD", "VALUE"]
value_list = [period_values, obs_values]
if any([status != '' for status in period_info['obs_status']]):
header.append('OBS_STATUS')
value_list.append(period_info['obs_status'])
if any([status != "" for status in period_info["obs_status"]]):
header.append("OBS_STATUS")
value_list.append(period_info["obs_status"])
# Write TSV file
tsv_filepath = self.ds_dir / '{}.tsv'.format(ts_code)
with tsv_filepath.open('w', encoding='utf-8') as tsv_fd:
tsv_fd.write('\t'.join(header) + '\n')
tsv_filepath = self.ds_dir / "{}.tsv".format(ts_code)
with tsv_filepath.open("w", encoding="utf-8") as tsv_fd:
tsv_fd.write("\t".join(header) + "\n")
for t in zip(*value_list):
tsv_fd.write('\t'.join(t) + '\n')
tsv_fd.write("\t".join(t) + "\n")
# Append to ts_infos
ts_infos.append((ts_code, ts_name, {'frequency': period_info['freq']}))
ts_infos.append((ts_code, ts_name, {"frequency": period_info["freq"]}))
return ts_infos
def extract_concept(csv_name):
""" extract 'IP' (Industrial production) from
'Seasonally adjusted Index by Industry : Industrial Production (2010=100.0)' """
idx = csv_name.find(':')
idx = csv_name.find(":")
assert idx != -1, "No «:» found in [{}]".format(csv_name)
# Extracts string after ':'
chunk = csv_name[idx+1:]
chunk = csv_name[idx + 1 :]
# Extracts string before '(' if present
if '(' in chunk:
chunk = chunk[:chunk.find('(')]
if "(" in chunk:
chunk = chunk[: chunk.find("(")]
chunk = chunk.strip().lower()
return CONCEPT_TO_CODE_MAP[chunk]
def extract_timeseries_from_csv(csv_code, csv_filepath: Path, ds_dir: Path, with_concept=False):
def extract_timeseries_from_csv(
csv_code, csv_filepath: Path, ds_dir: Path, with_concept=False
):
"""
Extracts time series from csv_filepath
Generates tsv files
Returns time series info to write dataset.json
"""
series_info = []
with csv_filepath.open('rt', encoding='ascii') as csv_fd:
with csv_filepath.open("rt", encoding="ascii") as csv_fd:
csv_name = None
in_data = False
csv_lh = None
reader = csv.reader(csv_fd, delimiter=',', quotechar='"')
reader = csv.reader(csv_fd, delimiter=",", quotechar='"')
# Reads input CSV line by line
for cols in reader:
@@ -243,7 +249,7 @@ def extract_timeseries_from_csv(csv_code, csv_filepath: Path, ds_dir: Path, with
continue
# Header
if not in_data and cols[0] == 'Item_Number':
if not in_data and cols[0] == "Item_Number":
csv_lh = CSVLineHelper(csv_code, csv_name, ds_dir, cols)
in_data = True
@@ -256,80 +262,75 @@ def extract_timeseries_from_csv(csv_code, csv_filepath: Path, ds_dir: Path, with
concept_val = extract_concept(csv_name) if with_concept else None
for si in series_info:
dim_dict = si[2]
dim_dict['seasonal_adjustment'] = so_val
dim_dict["seasonal_adjustment"] = so_val
if with_concept:
dim_dict['concept'] = concept_val
dim_dict["concept"] = concept_val
_series_info.append((si[0], si[1], dim_dict))
return _series_info
def generate_dataset(ds_code, source_dir: Path, ds_name, target_dir: Path, with_concept=False):
def generate_dataset(
ds_code, source_dir: Path, ds_name, target_dir: Path, with_concept=False
):
"""
creates dataset_dir
generates time series tsv files
generates dataset.json
"""
log.info('Generating dataset [%s]', ds_code)
log.info("Generating dataset [%s]", ds_code)
ds_dir = target_dir / ds_code
if not ds_dir.exists():
ds_dir.mkdir()
log.info('Working on %s dataset', ds_code)
log.info("Working on %s dataset", ds_code)
series_info = []
for csv_filepath in sorted(source_dir.glob('*.csv')):
for csv_filepath in sorted(source_dir.glob("*.csv")):
csv_code = csv_filepath.stem
series_info.extend(extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept))
series_info.extend(
extract_timeseries_from_csv(csv_code, csv_filepath, ds_dir, with_concept)
)
datasetjson_filepath = ds_dir / 'dataset.json'
write_dataset_json(datasetjson_filepath, ds_code, ds_name, series_info, with_concept)
datasetjson_filepath = ds_dir / "dataset.json"
write_dataset_json(
datasetjson_filepath, ds_code, ds_name, series_info, with_concept
)
def write_dataset_json(json_filepath: Path, ds_code, ds_name, series_info, with_concept=False):
def write_dataset_json(
json_filepath: Path, ds_code, ds_name, series_info, with_concept=False
):
""" Writes dataset.json """
dataset_data = {
'code': ds_code,
'name': ds_name,
'dimensions_labels': {
'frequency': 'Frequency',
'seasonal_adjustment': 'Seasonal adjustment',
"code": ds_code,
"name": ds_name,
"dimensions_labels": {
"frequency": "Frequency",
"seasonal_adjustment": "Seasonal adjustment",
},
'dimensions_values_labels': {
'frequency': {
'A': 'Annual',
'Q': 'Quarterly',
'M': 'Monthly',
},
'seasonal_adjustment': {
'S': 'Seasonaly adjusted',
'O': 'Original',
}
"dimensions_values_labels": {
"frequency": {"A": "Annual", "Q": "Quarterly", "M": "Monthly",},
"seasonal_adjustment": {"S": "Seasonaly adjusted", "O": "Original",},
},
'dimensions_codes_order': [
'frequency', 'seasonal_adjustment',
],
'series': []
"dimensions_codes_order": ["frequency", "seasonal_adjustment",],
"series": [],
}
if with_concept:
dataset_data['dimensions_labels']['concept'] = 'Concept'
dataset_data['dimensions_values_labels']['concept'] = {
'IP': 'industrial production',
'PS': "producer's shipments",
'PIF': "producer's inventory of finished goods",
'PIR': "producer's inventory ratio of finished goods",
'PIRA': "producer's inventory ratio of finished goods (average)",
dataset_data["dimensions_labels"]["concept"] = "Concept"
dataset_data["dimensions_values_labels"]["concept"] = {
"IP": "industrial production",
"PS": "producer's shipments",
"PIF": "producer's inventory of finished goods",
"PIR": "producer's inventory ratio of finished goods",
"PIRA": "producer's inventory ratio of finished goods (average)",
}
dataset_data['dimensions_codes_order'].append('concept')
dataset_data["dimensions_codes_order"].append("concept")
for si in series_info:
series_dict = dict(
code=si[0],
name=si[1],
dimensions=si[2]
)
dataset_data['series'].append(series_dict)
series_dict = dict(code=si[0], name=si[1], dimensions=si[2])
dataset_data["series"].append(series_dict)
cu.write_json_file(json_filepath, dataset_data)
@@ -338,14 +339,15 @@ def clean_csv_files(source_dir: Path):
""" Fix CSV files found in given source_dir and store fixed versions in temp directory
return temp directory path """
import tempfile
temp_dir = Path(tempfile.mkdtemp(prefix='sanzi'))
for filepath in source_dir.glob('*.csv'):
with filepath.open('rb') as bin_fd:
temp_dir = Path(tempfile.mkdtemp(prefix="sanzi"))
for filepath in source_dir.glob("*.csv"):
with filepath.open("rb") as bin_fd:
bcontent = bin_fd.read()
fixed_bcontent = bcontent.replace(b'\x81\x6A', b')')
fixed_bcontent = bcontent.replace(b"\x81\x6A", b")")
fixed_csv_filepath = temp_dir / filepath.name
with fixed_csv_filepath.open('wb') as bin_fd:
with fixed_csv_filepath.open("wb") as bin_fd:
bin_fd.write(fixed_bcontent)
return temp_dir
@@ -353,23 +355,25 @@ def clean_csv_files(source_dir: Path):
def write_category_tree_json(json_filepath):
""" Creates category_tree data and saves it as a json file """
category_tree_data = [
{'code': 'iip', 'name': DATASETS_INFO['iip']},
{'code': 'sanzi', 'name': DATASETS_INFO['sanzi']},
{"code": "iip", "name": DATASETS_INFO["iip"]},
{"code": "sanzi", "name": DATASETS_INFO["sanzi"]},
]
cu.write_json_file(json_filepath, category_tree_data)
def main():
""" Converts downloaded CSV files into datasets and time series """
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('source_dir', type=Path, help='path of source directory')
parser.add_argument('target_dir', type=Path, help='path of target directory')
parser.add_argument('--log', default='WARNING', help='level of logging messages')
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("source_dir", type=Path, help="path of source directory")
parser.add_argument("target_dir", type=Path, help="path of target directory")
parser.add_argument("--log", default="WARNING", help="level of logging messages")
args = parser.parse_args()
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: {}'.format(args.log))
raise ValueError("Invalid log level: {}".format(args.log))
logging.basicConfig(
format="%(levelname)s:%(name)s:%(asctime)s:%(message)s",
level=numeric_level,
@@ -385,21 +389,27 @@ def main():
parser.error("Target dir {!r} not found".format(str(target_dir)))
# Standard metadata
cu.write_json_file(target_dir / 'provider.json', PROVIDER_JSON)
write_category_tree_json(target_dir / 'category_tree.json')
cu.write_json_file(target_dir / "provider.json", PROVIDER_JSON)
write_category_tree_json(target_dir / "category_tree.json")
# iip dataset
generate_dataset('iip', source_dir / 'b2015_g1e', DATASETS_INFO['iip'], target_dir, with_concept=True)
generate_dataset(
"iip",
source_dir / "b2015_g1e",
DATASETS_INFO["iip"],
target_dir,
with_concept=True,
)
# sanzi dataset
sanzi_source_dir = source_dir / 'b2010_ke'
sanzi_source_dir = source_dir / "b2010_ke"
# sanzi csv files include bad character, first clean them and store fixed_csv in a temporary directory
temp_dir = clean_csv_files(sanzi_source_dir)
generate_dataset('sanzi', temp_dir, DATASETS_INFO['sanzi'], target_dir)
generate_dataset("sanzi", temp_dir, DATASETS_INFO["sanzi"], target_dir)
shutil.rmtree(str(temp_dir))
if __name__ == '__main__':
if __name__ == "__main__":
main()
@@ -9,7 +9,7 @@ from pathlib import Path
def write_json_file(file_path: Path, data):
""" Writes data the json way to file_path """
with file_path.open('wt', encoding='utf-8') as json_fd:
with file_path.open("wt", encoding="utf-8") as json_fd:
json.dump(data, json_fd, ensure_ascii=False, indent=2, sort_keys=True)
@@ -18,17 +18,18 @@ def write_category_json_file(file_path, categories_info, dataset_info):
category_data = []
for cat in categories_info:
children = []
for code in cat['_ds_codes']:
for code in cat["_ds_codes"]:
if code in dataset_info:
children.append(dict(
code=dataset_info[code]['code'],
name=dataset_info[code]['name']
))
children.append(
dict(
code=dataset_info[code]["code"], name=dataset_info[code]["name"]
)
)
# Adds category only if it contains datasets
if children:
cat_info = cat.copy()
del cat_info['_ds_codes']
cat_info['children'] = children
del cat_info["_ds_codes"]
cat_info["children"] = children
category_data.append(cat_info)
write_json_file(file_path, category_data)
@@ -47,34 +48,34 @@ def write_dataset_json_file(file_path, dataset_info, datasets_info):
def compute_series_info(series_info):
""" Computes series info """
data = dict(
code=series_info[0],
name=series_info[1],
)
data = dict(code=series_info[0], name=series_info[1],)
if series_info[2]:
data['dimensions'] = series_info[2]
data["dimensions"] = series_info[2]
if len(series_info) == 4:
data['notes'] = series_info[3]
data["notes"] = series_info[3]
return data
ds_code = dataset_info['_code']
dim_info = dataset_info['_dimensions_info']
ds_code = dataset_info["_code"]
dim_info = dataset_info["_dimensions_info"]
dataset_data = dict(
code=datasets_info[ds_code]['code'],
name=datasets_info[ds_code]['name'],
code=datasets_info[ds_code]["code"],
name=datasets_info[ds_code]["name"],
dimensions_labels=dict((di[0], di[1]) for di in dim_info),
dimensions_values_labels=compute_dim_values_labels(dim_info),
series=sorted([compute_series_info(s) for s in dataset_info['_series_info']],
key=lambda s: s['code'])
series=sorted(
[compute_series_info(s) for s in dataset_info["_series_info"]],
key=lambda s: s["code"],
),
)
if '_updated_date' in dataset_info:
dataset_data['updated_at'] = dataset_info['_updated_date']
if 'notes' in datasets_info[ds_code]:
dataset_data['notes'] = datasets_info[ds_code]['notes']
if "_updated_date" in dataset_info:
dataset_data["updated_at"] = dataset_info["_updated_date"]
if "notes" in datasets_info[ds_code]:
dataset_data["notes"] = datasets_info[ds_code]["notes"]
write_json_file(file_path, dataset_data)
def die(msg):
import sys
sys.stderr.write('{}\n'.format(msg))
sys.stderr.write("{}\n".format(msg))
sys.exit(1)
@@ -39,15 +39,15 @@ def download_binary_file(url, file_path: Path):
log.info("Downloading %s from %s ... ", str(file_path), url)
req = requests.get(url, stream=True)
with file_path.open('wb') as fout:
with file_path.open("wb") as fout:
req.raw.decode_content = True
shutil.copyfileobj(req.raw, fout)
log.info('-> done.')
log.info("-> done.")
def download_and_extract(url, target_dir: Path, dir_name):
""" Downloads zip archive and extracts it in a folder """
zip_filepath = target_dir / '{}.zip'.format(dir_name)
zip_filepath = target_dir / "{}.zip".format(dir_name)
download_binary_file(url, zip_filepath)
@@ -56,7 +56,7 @@ def download_and_extract(url, target_dir: Path, dir_name):
csv_dir.mkdir(exist_ok=True)
with zipfile.ZipFile(zip_filepath) as zip_archive:
zip_archive.extractall(csv_dir)
log.info('Zip [%s] extracted.', str(zip_filepath))
log.info("Zip [%s] extracted.", str(zip_filepath))
# And removes zip archive
zip_filepath.unlink()
@@ -66,20 +66,24 @@ def main():
""" Downloads and extracts zip files in folders """
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('target_dir', type=Path, help='path of target directory')
parser.add_argument('--debug-http', action='store_true', help='display http.client debug messages')
parser.add_argument('--log', default='INFO', help='level of logging messages')
parser.add_argument("target_dir", type=Path, help="path of target directory")
parser.add_argument(
"--debug-http", action="store_true", help="display http.client debug messages"
)
parser.add_argument("--log", default="INFO", help="level of logging messages")
args = parser.parse_args()
numeric_level = getattr(logging, args.log.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError('Invalid log level: {}'.format(args.log))
raise ValueError("Invalid log level: {}".format(args.log))
logging.basicConfig(
format="%(levelname)s:%(name)s:%(asctime)s:%(message)s",
level=numeric_level,
stream=sys.stdout, # Use stderr if script outputs data to stdout.
)
logging.getLogger("urllib3").setLevel(logging.DEBUG if args.debug_http else logging.WARNING)
logging.getLogger("urllib3").setLevel(
logging.DEBUG if args.debug_http else logging.WARNING
)
if args.debug_http:
http.client.HTTPConnection.debuglevel = 1
@@ -87,13 +91,17 @@ def main():
if not target_dir.exists():
parser.error("Target dir {!r} not found".format(str(target_dir)))
log.info('Downloading meti data')
download_and_extract('http://www.meti.go.jp/english/statistics/tyo/iip/csv/b2015_g1e.zip',
target_dir, 'b2015_g1e')
download_and_extract('http://www.meti.go.jp/english/statistics/tyo/sanzi/csv/b2010_ke.zip',
target_dir, 'b2010_ke')
download_and_extract(
"http://www.meti.go.jp/english/statistics/tyo/iip/csv/b2015_g1e.zip",
target_dir,