some tests for pipeline

This commit is contained in:
Alexey Lavrenuke 2016-03-11 19:28:47 +03:00
parent 1a563ade82
commit b34aee71d7
5 changed files with 130 additions and 13 deletions

View File

@@ -10,7 +10,7 @@ class TimeChopper(object):
     """
     TimeChopper splits incoming dataframes by index. Chunks are cached and
     chunks for the same key from different DFs are joined. Then chunks are
-    passed further.
+    passed further as (<timestamp>, <dataframe>) tuples.
     """

     def __init__(self, source, cache_size):
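
The docstring states the contract (split by index, cache, join chunks that share a key, yield (<timestamp>, <dataframe>) tuples) without showing the mechanics. A minimal sketch of that idea, assuming only what the docstring says (illustrative, not the plugin's actual implementation):

import pandas as pd

def chop(dfs, cache_size):
    # Illustrative sketch only: group each incoming dataframe by its
    # index (the timestamp), cache the pieces, join pieces that share a
    # timestamp, and flush the oldest entries once the cache grows past
    # cache_size.
    cache = {}
    for df in dfs:
        for ts, chunk in df.groupby(level=0):
            cache[ts] = pd.concat([cache[ts], chunk]) if ts in cache else chunk
        while len(cache) > cache_size:
            oldest = min(cache)
            yield oldest, cache.pop(oldest)
    for ts in sorted(cache):  # flush whatever is left at end of input
        yield ts, cache.pop(ts)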

View File

@@ -0,0 +1,29 @@
import pytest
import pandas as pd
import numpy as np

from yandextank.plugins.Aggregator.aggregator import phout_columns

np.random.seed(42)

MAX_TS = 1000


def random_split(df):
    """Split df into consecutive chunks of random length (500-1199 rows)."""
    i = 0
    while True:
        step = np.random.randint(500, 1200)
        if i + step < len(df):
            # positional, end-exclusive slice: rows i .. i + step - 1
            yield df.iloc[i:i + step]
            i += step
        else:
            yield df.iloc[i:]
            break


@pytest.fixture
def data():
    # 10000 random phout-like rows indexed by 'time', with values drawn
    # from [0, MAX_TS), so the index covers MAX_TS distinct timestamps.
    df = pd.DataFrame(
        np.random.randint(0, MAX_TS, (10000, len(phout_columns))),
        columns=phout_columns).set_index('time').sort_index()
    df['tag'] = np.random.choice(list(range(3)), len(df))
    return df
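
A usage sketch for random_split on a toy frame (hypothetical, not part of the commit): chunk lengths are random, but concatenating the chunks reproduces the input.

import pandas as pd
from conftest import random_split

df = pd.DataFrame({'a': range(5000)})
chunks = list(random_split(df))  # roughly 5-10 chunks for 5000 rows
assert sum(len(c) for c in chunks) == len(df)  # nothing lost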

View File

@@ -1,22 +1,35 @@
 import pytest
 import pandas as pd
 import numpy as np
 from yandextank.plugins.Aggregator.aggregator import phout_columns
 from yandextank.plugins.Aggregator.chopper import TimeChopper
-@pytest.fixture
-def data():
-    df = pd.DataFrame(
-        np.random.randint(0, 100, (10000, len(phout_columns))),
-        columns=phout_columns).set_index('time').sort_index()
-    return df
+from conftest import MAX_TS, random_split


 class TestChopper(object):
-    def test_chopper_works_for_one_chunk(self, data):
+    def test_one_chunk(self, data):
         chopper = TimeChopper([data], 5)
         result = list(chopper)
-        assert len(result) == 100
+        assert len(result) == MAX_TS
         concatenated = pd.concat(r[1] for r in result)
         assert len(data) == len(concatenated), "We did not lose anything"

+    def test_multiple_chunks(self, data):
+        chunks = random_split(data)
+        chopper = TimeChopper(chunks, 5)
+        result = list(chopper)
+        assert len(result) == MAX_TS
+        concatenated = pd.concat(r[1] for r in result)
+        assert len(data) == len(concatenated), "We did not lose anything"
+
+    def test_partially_reversed_data(self, data):
+        chunks = list(random_split(data))
+        chunks[5], chunks[6] = chunks[6], chunks[5]
+        chopper = TimeChopper(chunks, 5)
+        result = list(chopper)
+        assert len(result) == MAX_TS, \
+            "DataFrame is split into the proper number of chunks"
+        concatenated = pd.concat(r[1] for r in result)
+        assert len(data) == len(concatenated), "We did not lose anything"
+        assert np.allclose(concatenated.values,
+                           data.values), "We did not corrupt the data"

View File

@@ -0,0 +1,63 @@
import json

import numpy as np
from pkg_resources import resource_string
from Queue import Queue

from yandextank.plugins.Aggregator.chopper import TimeChopper
from yandextank.plugins.Aggregator.aggregator import Aggregator
from yandextank.plugins.Aggregator.plugin import DataPoller
from yandextank.core.util import Drain
from conftest import MAX_TS, random_split

AGGR_CONFIG = json.loads(resource_string("yandextank.plugins.Aggregator",
                                         'config/phout.json'))


class TestPipeline(object):
    def test_partially_reversed_data(self, data):
        results_queue = Queue()
        chunks = list(random_split(data))
        # Swap two chunks to emulate slightly out-of-order input.
        chunks[5], chunks[6] = chunks[6], chunks[5]
        pipeline = Aggregator(
            TimeChopper(
                DataPoller(source=chunks, poll_period=0.1),
                cache_size=3),
            AGGR_CONFIG,
            False)
        drain = Drain(pipeline, results_queue)
        drain.run()
        assert results_queue.qsize() == MAX_TS

    def test_slow_producer(self, data):
        results_queue = Queue()
        chunks = list(random_split(data))
        chunks[5], chunks[6] = chunks[6], chunks[5]

        def producer():
            # Emulate a slow producer: yield None now and then to signal
            # that no new data has arrived yet.
            for chunk in chunks:
                if np.random.random() > 0.5:
                    yield None
                yield chunk

        pipeline = Aggregator(
            TimeChopper(
                DataPoller(source=producer(), poll_period=0.1),
                cache_size=3),
            AGGR_CONFIG,
            False)
        drain = Drain(pipeline, results_queue)
        drain.run()
        assert results_queue.qsize() == MAX_TS
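
These tests assert only on the queue size, so the sole assumption made about yandextank.core.util.Drain here is its contract: run() exhausts the generator pipeline and puts every item into the destination queue. A hypothetical stand-in illustrating that assumption:

class DrainSketch(object):
    # Hypothetical stand-in for Drain, not the real implementation
    # (the real Drain may run in its own thread): pull every item from
    # `source` and put it into `destination`.
    def __init__(self, source, destination):
        self.source = source
        self.destination = destination

    def run(self):
        for item in self.source:
            self.destination.put(item)

# e.g. DrainSketch(pipeline, results_queue).run() would leave one
# aggregated record per distinct timestamp in the queue.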

View File

@@ -0,0 +1,12 @@
import pandas as pd
import numpy as np

from conftest import random_split


def test_random_split(data):
    dataframes = list(random_split(data))
    assert len(dataframes) > 1
    concatenated = pd.concat(dataframes)
    assert len(concatenated) == len(data), "We did not lose anything"
    assert np.allclose(concatenated.values,
                       data.values), "We did not corrupt the data"