Commit 956a069d authored by Pierre Dittgen's avatar Pierre Dittgen
Browse files

fetch providers

parent 0ea04e8f
Pipeline #195448 failed with stage
in 28 seconds
...@@ -17,11 +17,18 @@ ...@@ -17,11 +17,18 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from datetime import datetime from datetime import datetime
from typing import Iterator import os
from typing import Dict, Iterator, List, Tuple
import requests
from pydantic import BaseModel, HttpUrl from pydantic import BaseModel, HttpUrl
"""DBnomics python client.""" """DBnomics python client."""
default_api_base_url = os.environ.get('API_URL') or 'https://api.db.nomics.world/v22/'
# pseudo data-model # pseudo data-model
class ProviderCode(BaseModel): class ProviderCode(BaseModel):
...@@ -66,6 +73,7 @@ class Dataset(BaseModel): ...@@ -66,6 +73,7 @@ class Dataset(BaseModel):
name: str name: str
description: str = None description: str = None
dimensions: List[Dimension] dimensions: List[Dimension]
notes: List[str] = None
class SeriesCode(BaseModel): class SeriesCode(BaseModel):
__root__: str __root__: str
...@@ -77,20 +85,32 @@ class Series(BaseModel): ...@@ -77,20 +85,32 @@ class Series(BaseModel):
code: SeriesCode code: SeriesCode
name: str = None name: str = None
dimensions: SeriesDimensions dimensions: SeriesDimensions
observations: List[Tuple] observations: List
# client code # client code
def nonify_empty_string(d: Dict):
    """Return a new dict where every value equal to "" is replaced by None.

    Keys and all other values are carried over unchanged; ``d`` itself is
    not modified.
    """
    cleaned = {}
    for key, value in d.items():
        if value != "":
            cleaned[key] = value
        else:
            cleaned[key] = None
    return cleaned
class DBnomicsClient(): class DBnomicsClient():
"""DBnomics client class.""" """DBnomics client class."""
def __init__(self, api_base_url=None): def __init__(self, api_base_url=None):
global default_api_base_url global default_api_base_url
if api_base_url is None: self.api_base_url = default_api_base_url if api_base_url is None else api_base_url
api_base_url = default_api_base_url self.api_base_url = self.api_base_url.rstrip("/")
def fetch_providers(self, provider_code: ProviderCode = None) -> List[Provider]:
    """Fetch providers from the ``/providers`` endpoint.

    Args:
        provider_code: When given, only providers whose ``code`` equals it
            are returned. Accepts a plain string or a ``ProviderCode``
            model, in which case its ``__root__`` value is compared.

    Returns:
        A list of ``Provider`` objects; empty when nothing matches.

    Raises:
        Whatever ``_fetch`` raises (e.g. an HTTP error status).
    """
    # The response carries the raw ``code`` value, so unwrap a ProviderCode
    # model first; comparing the model itself to that value never matches.
    wanted_code = getattr(provider_code, "__root__", provider_code)
    response = self._fetch("/providers")
    provider_docs = response.get("providers", {}).get("docs", [])
    return [
        # Empty strings are turned into None before building the model.
        Provider(**nonify_empty_string(provider_dict))
        for provider_dict in provider_docs
        if wanted_code is None or wanted_code == provider_dict.get("code")
    ]
def fetch_datasets(self, provider_code: ProviderCode, dataset_code: DatasetCode = None) -> Iterator[Dataset]:
    """Placeholder: not implemented yet, so the call currently returns None."""
    return None
...@@ -99,4 +119,9 @@ class DBnomicsClient(): ...@@ -99,4 +119,9 @@ class DBnomicsClient():
pass pass
def fetch_last_updates(self) -> List[Dataset]:
    """Placeholder: not implemented yet, so the call currently returns None."""
    return None
\ No newline at end of file
def _fetch(self, path: str, args: Dict[str, str] = None, timeout: float = 60.0) -> Dict:
    """GET ``path`` under the client's base URL and return the decoded JSON body.

    Args:
        path: Appended verbatim to ``self.api_base_url``.
        args: Optional query-string parameters.
        timeout: Seconds handed to ``requests.get`` as its ``timeout``.
            Without one, requests waits indefinitely on an unresponsive
            server.

    Returns:
        The response body parsed by ``response.json()``.

    Raises:
        requests.HTTPError: From ``raise_for_status()`` on an error status.
        requests.Timeout: When the server does not answer within ``timeout``.
    """
    response = requests.get(f"{self.api_base_url}{path}", params=args, timeout=timeout)
    response.raise_for_status()
    return response.json()
\ No newline at end of file
# dbnomics-python-client -- Access DBnomics time series from Python
# By: Christophe Benz <christophe.benz@cepremap.org>
#
# Copyright (C) 2017-2019 Cepremap
# https://git.nomics.world/dbnomics/dbnomics-python-client
#
# dbnomics-python-client is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# dbnomics-python-client is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import pytest
import pandas as pd
from dbnomics import default_api_base_url, fetch_series, fetch_series_by_api_link
def test_fetch_series_by_code():
    """Fetching by provider, dataset and series code yields exactly that series."""
    frame = fetch_series('AMECO', 'ZUTN', 'EA19.1.0.0.0.ZUTN')

    assert list(frame["provider_code"].unique()) == ["AMECO"]
    assert list(frame["dataset_code"].unique()) == ["ZUTN"]
    assert list(frame["series_code"].unique()) == ["EA19.1.0.0.0.ZUTN"]
def test_fetch_series_by_code_mask():
    """A series-code mask with "+" alternatives yields four distinct series."""
    frame = fetch_series("IMF", "CPI", "M.FR+DE.PCPIEC_IX+PCPIA_IX")

    assert list(frame["provider_code"].unique()) == ["IMF"]
    assert list(frame["dataset_code"].unique()) == ["CPI"]

    distinct_series = frame["series_code"].unique()
    assert len(distinct_series) == 4
def test_fetch_series_by_code_mask_with_plus_in_dimension_code():
    """Quoted dimension codes that contain "+" are kept as single codes."""
    frame = fetch_series('SCB', 'AKIAM', '"J+K"+"G+H".AM0301C1')

    assert list(frame["provider_code"].unique()) == ["SCB"]
    assert list(frame["dataset_code"].unique()) == ["AKIAM"]

    series_codes = frame["series_code"].unique()
    assert set(series_codes) == {'J+K.AM0301C1', 'G+H.AM0301C1'}, series_codes
def test_fetch_series_by_id():
    """A single "provider/dataset/series" ID yields exactly that series."""
    frame = fetch_series('AMECO/ZUTN/EA19.1.0.0.0.ZUTN')

    assert list(frame["provider_code"].unique()) == ["AMECO"]
    assert list(frame["dataset_code"].unique()) == ["ZUTN"]
    assert list(frame["series_code"].unique()) == ["EA19.1.0.0.0.ZUTN"]
def test_fetch_series_by_ids_in_same_dataset():
    """Two series IDs from one dataset come back in the requested order."""
    series_ids = [
        'AMECO/ZUTN/EA19.1.0.0.0.ZUTN',
        'AMECO/ZUTN/DNK.1.0.0.0.ZUTN',
    ]
    frame = fetch_series(series_ids)

    assert list(frame["provider_code"].unique()) == ["AMECO"]
    assert list(frame["dataset_code"].unique()) == ["ZUTN"]
    assert list(frame["series_code"].unique()) == [
        "EA19.1.0.0.0.ZUTN",
        "DNK.1.0.0.0.ZUTN",
    ]
def test_fetch_series_by_ids_in_different_datasets():
    """Series IDs from two providers come back together, in the requested order."""
    series_ids = [
        'AMECO/ZUTN/EA19.1.0.0.0.ZUTN',
        'BIS/cbs/Q.S.5A.4B.F.B.A.A.LC1.A.1C',
    ]
    frame = fetch_series(series_ids)

    assert list(frame["provider_code"].unique()) == ["AMECO", "BIS"]
    assert list(frame["dataset_code"].unique()) == ["ZUTN", "cbs"]
    assert list(frame["series_code"].unique()) == [
        "EA19.1.0.0.0.ZUTN",
        "Q.S.5A.4B.F.B.A.A.LC1.A.1C",
    ]
def test_fetch_series_by_dimension():
    """Selecting by dimension values yields at least one WB/DB series."""
    wanted_dimensions = {
        "country": ["FR", "IT", "ES"],
        "indicator": ["IC.REG.COST.PC.FE.ZS.DRFN"],
    }
    frame = fetch_series("WB", "DB", dimensions=wanted_dimensions)

    assert list(frame["provider_code"].unique()) == ["WB"]
    assert list(frame["dataset_code"].unique()) == ["DB"]

    distinct_series = frame["series_code"].unique()
    assert len(distinct_series)
def test_fetch_series_of_dataset():
    """Fetching a dataset without a series code yields more than one series."""
    frame = fetch_series('AMECO', 'ZUTN')

    assert list(frame["provider_code"].unique()) == ["AMECO"]
    assert list(frame["dataset_code"].unique()) == ["ZUTN"]

    distinct_series = frame["series_code"].unique()
    assert len(distinct_series) > 1
def test_fetch_series_by_api_link():
    """Fetching through a full API URL yields only BIS/PP-SS series.

    Trailing slashes are stripped from the base URL before the path is
    appended, so a base configured as ``.../v22/`` does not produce
    ``.../v22//series/...``.
    """
    api_link = (
        default_api_base_url.rstrip("/")
        + "/series/BIS/PP-SS?dimensions=%7B%22FREQ%22%3A%5B%22Q%22%5D%2C%22REF_AREA%22%3A%5B%22AU%22%5D%7D&observations=1"
    )
    df = fetch_series_by_api_link(api_link)

    provider_codes = df["provider_code"].unique()
    assert len(provider_codes) == 1
    assert provider_codes[0] == "BIS"

    dataset_codes = df["dataset_code"].unique()
    assert len(dataset_codes) == 1
    assert dataset_codes[0] == "PP-SS"

    series_codes = df["series_code"].unique()
    assert len(series_codes)
def test_fetch_series_with_na_values():
    """"NA" stays a string in ``original_value`` and becomes missing in ``value``."""
    frame = fetch_series('AMECO', 'ZUTN', 'DEU.1.0.0.0.ZUTN')

    raw_values = frame.original_value
    assert "NA" in list(raw_values)
    assert not any(raw_values.isna())

    parsed_values = frame.value
    assert "NA" not in list(parsed_values)
    assert any(parsed_values.isna())
def test_fetch_series_with_max_nb_series():
    """With ``max_nb_series=20`` the result holds exactly 20 distinct series."""
    frame = fetch_series('AMECO', 'ZUTN', max_nb_series=20)
    distinct_series = frame.series_code.unique()
    assert len(distinct_series) == 20
def test_fetch_series_with_filter_on_one_series():
    """An interpolate filter adds monthly rows whose series code ends in "_filtered"."""
    interpolate_monthly = {
        "code": "interpolate",
        "parameters": {"frequency": "monthly", "method": "spline"},
    }
    frame = fetch_series('AMECO', 'ZUTN', 'DEU.1.0.0.0.ZUTN', filters=[interpolate_monthly])

    assert "filtered" in frame

    filtered_rows = frame[frame.filtered == True]
    assert all(code.endswith("_filtered") for code in filtered_rows["series_code"])
    assert all(filtered_rows["@frequency"] == "monthly")
def test_fetch_series_with_filter_on_one_series_with_filter_error(caplog):
    """An unknown filter code leaves rows unfiltered and logs one ERROR record."""
    unknown_filter = {"code": "foo", "parameters": {}}
    with caplog.at_level(logging.INFO):
        frame = fetch_series('AMECO', 'ZUTN', 'DEU.1.0.0.0.ZUTN', filters=[unknown_filter])

    assert all(frame.filtered == False)

    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == 'ERROR'
    assert "Filter not found" in record.message
def test_fetch_series_with_filter_on_one_series_with_filter_parameter_error(caplog):
    """Invalid filter parameters leave rows unfiltered and log one ERROR record."""
    bad_parameters_filter = {"code": "interpolate", "parameters": {"foo": "bar"}}
    with caplog.at_level(logging.INFO):
        frame = fetch_series('AMECO', 'ZUTN', 'DEU.1.0.0.0.ZUTN', filters=[bad_parameters_filter])

    assert all(frame.filtered == False)

    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == 'ERROR'
    assert "Error with filter parameters" in record.message
def test_fetch_series_with_filter_on_one_series_with_wrong_frequency(caplog):
    """Aggregating to annual leaves rows unfiltered and logs one ERROR record."""
    aggregate_annual = {"code": "aggregate", "parameters": {"frequency": "annual"}}
    with caplog.at_level(logging.INFO):
        frame = fetch_series('AMECO', 'ZUTN', 'DEU.1.0.0.0.ZUTN', filters=[aggregate_annual])

    assert all(frame.filtered == False)

    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == 'ERROR'
    assert "Annual is already the lowest frequency" in record.message
# --- Tests below use VCR.py (https://vcrpy.readthedocs.io/en/latest/usage.html) to load fixtures ---
@pytest.mark.vcr(decode_compressed_response=True)
def test_fetch_series_without_dimensions_labels():
    """Dimension columns and their "(label)" columns are both present.

    The ``@pytest.mark.vcr`` decorator makes the request result come from
    cassettes/test_fetch_series_without_dimensions_labels.yaml.
    """
    wanted_dimensions = {
        "country": ["FR", "IT", "ES"],
        "indicator": ["IC.REG.COST.PC.FE.ZS.DRFN"],
    }
    frame = fetch_series("WB", "DB", dimensions=wanted_dimensions)

    # Every expected column must be among the frame's columns.
    expected_columns = {'indicator', 'country', 'indicator (label)', 'country (label)'}
    actual_columns = set(frame.columns)
    assert expected_columns <= actual_columns, actual_columns

    # Spot-check a dimension value and its label on one row.
    row = frame.iloc[30]
    assert row['country'] == 'FR'
    assert row['country (label)'] == 'France'
@pytest.mark.vcr(decode_compressed_response=True)
def test_fetch_series_without_dimensions_values_labels():
    """No 'Data Type' column is added for this recorded series.

    The ``@pytest.mark.vcr`` decorator makes the request result come from
    cassettes/test_fetch_series_without_dimensions_values_labels.yaml.
    """
    frame = fetch_series('simu/lated/series1')

    # In the absence of dimensions_values_labels, no dimensions_labels
    # column should be added.
    column_names = list(frame.columns)
    assert 'Data Type' not in column_names
@pytest.mark.vcr(decode_compressed_response=True)
def test_fetch_series_without_dimensions_labels_nor_dimensions_values_labels():
    """No 'Data Type' column exists for this recorded series.

    The ``@pytest.mark.vcr`` decorator makes the request result come from
    cassettes/test_fetch_series_without_dimensions_labels_nor_dimensions_values_labels.yaml.
    """
    frame = fetch_series('simu/lated/series2')

    # The dimensions_labels column must not exist.
    column_names = list(frame.columns)
    assert 'Data Type' not in column_names
# dbnomics-python-client -- Access DBnomics time series from Python
# By: Christophe Benz <christophe.benz@cepremap.org>
#
# Copyright (C) 2017-2019 Cepremap
# https://git.nomics.world/dbnomics/dbnomics-python-client
#
# dbnomics-python-client is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# dbnomics-python-client is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from dbnomics import DBnomicsClient, ProviderCode
def test_get_providers():
    """Without a filter, several providers are returned and IMF is among them."""
    providers = DBnomicsClient().fetch_providers()
    assert len(providers) > 1

    codes = [provider.code for provider in providers]
    assert ProviderCode(__root__='IMF') in codes
def test_get_existing_provider():
    """Filtering on an existing provider code returns exactly that provider."""
    wanted_code = "IMF"
    providers = DBnomicsClient().fetch_providers(provider_code=wanted_code)

    assert len(providers) == 1
    only_provider = providers[0]
    assert only_provider.code == ProviderCode(__root__=wanted_code)
def test_get_not_existing_provider():
    """Filtering on an unknown provider code returns an empty list."""
    providers = DBnomicsClient().fetch_providers(provider_code="XXX")
    assert len(providers) == 0
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment