...
 
Commits (2)
......@@ -22,7 +22,6 @@
import argparse
from pathlib import Path
from typing import List
import pytest
......@@ -31,44 +30,39 @@ from dbnomics_fetcher_toolbox.arguments import add_arguments_for_download
from dbnomics_fetcher_toolbox.resources import Resource, process_resources
@pytest.mark.asyncio
async def process_resources_core(
target_dir: Path, resources: List[Resource], args_str: str = None
):
"""Factorize code for test cases."""
def process_resource(resource: Resource):
pass
if args_str is None:
args_str = ""
def default_process_resource(resource: Resource):
"""Act as a `process_resource` function that does nothing."""
pass
events = status.load_events(target_dir)
assert events is None
def prepare_args(target_dir: Path, args_str: str = None):
"""Prepare command-line args for test cases."""
parser = argparse.ArgumentParser()
add_arguments_for_download(parser)
args = parser.parse_args(
args=list(filter(None, [str(target_dir), *args_str.split(" ")]))
additional_arg_list = [] if args_str is None else args_str.split(" ")
return parser.parse_args(
args=list(filter(None, [str(target_dir), *additional_arg_list]))
)
with status.open_status_writer(args) as append_event:
return await process_resources(
resources=resources,
args=args,
process_resource=process_resource,
on_event=append_event,
events=events,
)
@pytest.mark.asyncio
async def test_process_all_resources(tmp_path):
"""Process all resources."""
events = status.load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
args = prepare_args(tmp_path)
with status.open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
process_resource=default_process_resource,
on_event=append_event,
events=events,
)
resource_event_by_id = await process_resources_core(tmp_path, resources)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
......@@ -80,13 +74,22 @@ async def test_process_all_resources(tmp_path):
@pytest.mark.asyncio
async def test_process_one_resource(tmp_path):
"""Process ony one resource."""
events = status.load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
selected_resource_id = "C"
args = prepare_args(tmp_path, f"--only {selected_resource_id}")
with status.open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
process_resource=default_process_resource,
on_event=append_event,
events=events,
)
resource_event_by_id = await process_resources_core(
tmp_path, resources, args_str=f"--only {selected_resource_id}"
)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
......@@ -103,50 +106,66 @@ async def test_process_one_resource(tmp_path):
@pytest.mark.asyncio
async def test_exclude_some_resources(tmp_path):
"""Process all resources except some ones."""
events = status.load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
excluded_resource_ids = ["A", "E"]
excluded_resources_str = "A E"
excluded_resource_id_list = excluded_resources_str.split(" ")
args = prepare_args(tmp_path, f"--exclude {excluded_resources_str}")
with status.open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
process_resource=default_process_resource,
on_event=append_event,
events=events,
)
resource_event_by_id = await process_resources_core(
tmp_path,
resources,
args_str="--exclude {}".format(" ".join(excluded_resource_ids)),
)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id in excluded_resource_ids
if resource_id in excluded_resource_id_list
)
assert all(
evt.status == status.ResourceStatus.SUCCESS
for resource_id, evt in resource_event_by_id.items()
if resource_id not in excluded_resource_ids
if resource_id not in excluded_resource_id_list
)
@pytest.mark.asyncio
async def test_limit_processed_resources(tmp_path):
"""Process a limited number of resources."""
events = status.load_events(tmp_path)
assert events is None
resources = [Resource(id=letter) for letter in list("ABCDEF")]
limit_count = 4
limited_resource_ids = [resource.id for resource in resources[:limit_count]]
processed_resource_id_list = [resource.id for resource in resources[:limit_count]]
args = prepare_args(tmp_path, f"--limit {limit_count}")
with status.open_status_writer(args) as append_event:
resource_event_by_id = await process_resources(
resources=resources,
args=args,
process_resource=default_process_resource,
on_event=append_event,
events=events,
)
resource_event_by_id = await process_resources_core(
tmp_path, resources, args_str=f"--limit {limit_count}"
)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SUCCESS
for resource_id, evt in resource_event_by_id.items()
if resource_id in limited_resource_ids
if resource_id in processed_resource_id_list
)
assert all(
evt.status == status.ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id not in limited_resource_ids
if resource_id not in processed_resource_id_list
)