...
 
Commits (2)
......@@ -125,11 +125,9 @@ async def process_resources(
resources_to_process = get_resources_to_process(resources, args)
# Emit SKIPPED event for each non-processed resource
def res_to_ids(resources):
"""Transform a given resource list into a resource id list."""
return [r.id for r in resources]
skipped_res_ids = set(res_to_ids(resources)) - set(res_to_ids(resources_to_process))
skipped_res_ids = {resource.id for resource in resources} - {
resource.id for resource in resources_to_process
}
for res_id in skipped_res_ids:
resource_event_by_id[res_id] = ResourceEvent(
id=res_id,
......
......@@ -21,10 +21,8 @@
"""Tests about event loader."""
import argparse
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, List
from typing import List
import pytest
......@@ -41,25 +39,26 @@ class FakeArgs(argparse.Namespace):
def __getattr__(self, attr):
"""Mimic attribute access."""
return self.attrs[attr] if attr in self.attrs else None
EMPTY_DICT: Dict[str, Any] = {}
return self.attrs.get(attr)
@pytest.mark.asyncio
async def process_resources_core(
resources: List[Resource], assert_func=None, additional_args=EMPTY_DICT
base_dir: Path, resources: List[Resource], additional_args=None, on_event=None
):
"""Factorize code for test cases."""
def process_resource(resource: Resource):
pass
target_dir = Path(tempfile.mkdtemp(prefix="test", dir="/tmp"))
target_dir = base_dir / "process"
target_dir.mkdir()
try:
events = status.load_events(target_dir)
assert events is None
additional_args = {} if additional_args is None else additional_args
args = FakeArgs(target_dir=target_dir, **additional_args)
with status.open_status_writer(args) as append_event:
......@@ -70,35 +69,32 @@ async def process_resources_core(
on_event=append_event,
events=events,
)
if assert_func:
assert_func(resource_event_by_id=resource_event_by_id)
finally:
shutil.rmtree(str(target_dir))
except Exception as exc:
if on_event:
on_event(exc)
else:
raise exc
return resource_event_by_id
@pytest.mark.asyncio
async def test_skip_no_resources():
async def test_skip_no_resources(tmp_path):
"""Process all resources."""
resources = [Resource(id=letter) for letter in list("ABCDEF")]
def assert_func(resource_event_by_id=None):
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SUCCESS
for evt in resource_event_by_id.values()
)
await process_resources_core(resources, assert_func)
resource_event_by_id = await process_resources_core(tmp_path, resources)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SUCCESS
for evt in resource_event_by_id.values()
)
@pytest.mark.asyncio
async def test_process_one_resource():
async def test_process_one_resource(tmp_path):
"""Process ony one resource."""
resources = [Resource(id=letter) for letter in list("ABCDEF")]
......@@ -106,24 +102,24 @@ async def test_process_one_resource():
additional_args = {"only": selected_resource_id}
def assert_func(resource_event_by_id=None):
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id != selected_resource_id
)
assert (
resource_event_by_id[selected_resource_id].status
== status.ResourceStatus.SUCCESS
)
await process_resources_core(resources, assert_func, additional_args)
resource_event_by_id = await process_resources_core(
tmp_path, resources, additional_args
)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id != selected_resource_id
)
assert (
resource_event_by_id[selected_resource_id].status
== status.ResourceStatus.SUCCESS
)
@pytest.mark.asyncio
async def test_exclude_some_resources():
async def test_exclude_some_resources(tmp_path):
"""Process all resources except some ones."""
resources = [Resource(id=letter) for letter in list("ABCDEF")]
......@@ -131,18 +127,45 @@ async def test_exclude_some_resources():
additional_args = {"exclude": excluded_resource_ids}
def assert_func(resource_event_by_id=None):
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id in excluded_resource_ids
)
assert (
evt.status == status.ResourceStatus.SUCCESS
for resource_id, evt in resource_event_by_id.items()
if resource_id not in excluded_resource_ids
)
await process_resources_core(resources, assert_func, additional_args)
resource_event_by_id = await process_resources_core(
tmp_path, resources, additional_args
)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id in excluded_resource_ids
)
assert all(
evt.status == status.ResourceStatus.SUCCESS
for resource_id, evt in resource_event_by_id.items()
if resource_id not in excluded_resource_ids
)
@pytest.mark.asyncio
async def test_limit_processed_resources(tmp_path):
"""Process a limited number of resources."""
resources = [Resource(id=letter) for letter in list("ABCDEF")]
limit_count = 4
limited_resource_ids = [resource.id for resource in resources[:limit_count]]
additional_args = {"limit": limit_count}
resource_event_by_id = await process_resources_core(
tmp_path, resources, additional_args
)
assert resource_event_by_id is not None
assert len(resource_event_by_id) == len(resources)
assert all(
evt.status == status.ResourceStatus.SUCCESS
for resource_id, evt in resource_event_by_id.items()
if resource_id in limited_resource_ids
)
assert all(
evt.status == status.ResourceStatus.SKIPPED
for resource_id, evt in resource_event_by_id.items()
if resource_id not in limited_resource_ids
)