mirror of
https://github.com/valitydev/redash.git
synced 2024-11-07 01:25:16 +00:00
271 lines
11 KiB
Python
271 lines
11 KiB
Python
# encoding: utf8
|
|
|
|
from tests import BaseTestCase
|
|
import datetime
|
|
from redash.models import Query, Group, Event, db
|
|
from redash.utils import utcnow
|
|
|
|
|
|
class QueryTest(BaseTestCase):
|
|
def test_changing_query_text_changes_hash(self):
|
|
q = self.factory.create_query()
|
|
old_hash = q.query_hash
|
|
|
|
q.query_text = "SELECT 2;"
|
|
db.session.flush()
|
|
self.assertNotEquals(old_hash, q.query_hash)
|
|
|
|
def test_search_finds_in_name(self):
|
|
q1 = self.factory.create_query(name=u"Testing seåřċħ")
|
|
q2 = self.factory.create_query(name=u"Testing seåřċħing")
|
|
q3 = self.factory.create_query(name=u"Testing seå řċħ")
|
|
queries = list(Query.search(u"seåřċħ", [self.factory.default_group.id]))
|
|
|
|
self.assertIn(q1, queries)
|
|
self.assertIn(q2, queries)
|
|
self.assertNotIn(q3, queries)
|
|
|
|
def test_search_finds_in_description(self):
|
|
q1 = self.factory.create_query(description=u"Testing seåřċħ")
|
|
q2 = self.factory.create_query(description=u"Testing seåřċħing")
|
|
q3 = self.factory.create_query(description=u"Testing seå řċħ")
|
|
|
|
queries = Query.search(u"seåřċħ", [self.factory.default_group.id])
|
|
|
|
self.assertIn(q1, queries)
|
|
self.assertIn(q2, queries)
|
|
self.assertNotIn(q3, queries)
|
|
|
|
def test_search_by_id_returns_query(self):
|
|
q1 = self.factory.create_query(description="Testing search")
|
|
q2 = self.factory.create_query(description="Testing searching")
|
|
q3 = self.factory.create_query(description="Testing sea rch")
|
|
db.session.flush()
|
|
queries = Query.search(str(q3.id), [self.factory.default_group.id])
|
|
|
|
self.assertIn(q3, queries)
|
|
self.assertNotIn(q1, queries)
|
|
self.assertNotIn(q2, queries)
|
|
|
|
def test_search_respects_groups(self):
|
|
other_group = Group(org=self.factory.org, name="Other Group")
|
|
db.session.add(other_group)
|
|
ds = self.factory.create_data_source(group=other_group)
|
|
|
|
q1 = self.factory.create_query(description="Testing search", data_source=ds)
|
|
q2 = self.factory.create_query(description="Testing searching")
|
|
q3 = self.factory.create_query(description="Testing sea rch")
|
|
|
|
queries = list(Query.search("Testing", [self.factory.default_group.id]))
|
|
|
|
self.assertNotIn(q1, queries)
|
|
self.assertIn(q2, queries)
|
|
self.assertIn(q3, queries)
|
|
|
|
queries = list(Query.search("Testing", [other_group.id, self.factory.default_group.id]))
|
|
self.assertIn(q1, queries)
|
|
self.assertIn(q2, queries)
|
|
self.assertIn(q3, queries)
|
|
|
|
queries = list(Query.search("Testing", [other_group.id]))
|
|
self.assertIn(q1, queries)
|
|
self.assertNotIn(q2, queries)
|
|
self.assertNotIn(q3, queries)
|
|
|
|
def test_returns_each_query_only_once(self):
|
|
other_group = self.factory.create_group()
|
|
second_group = self.factory.create_group()
|
|
ds = self.factory.create_data_source(group=other_group)
|
|
ds.add_group(second_group, False)
|
|
|
|
q1 = self.factory.create_query(description="Testing search", data_source=ds)
|
|
db.session.flush()
|
|
queries = list(Query.search("Testing", [self.factory.default_group.id, other_group.id, second_group.id]))
|
|
|
|
self.assertEqual(1, len(queries))
|
|
|
|
def test_save_updates_updated_at_field(self):
|
|
# This should be a test of ModelTimestampsMixin, but it's easier to test in context of existing model... :-\
|
|
one_day_ago = utcnow().date() - datetime.timedelta(days=1)
|
|
q = self.factory.create_query(created_at=one_day_ago, updated_at=one_day_ago)
|
|
db.session.flush()
|
|
q.name = 'x'
|
|
db.session.flush()
|
|
self.assertNotEqual(q.updated_at, one_day_ago)
|
|
|
|
def test_search_is_case_insensitive(self):
|
|
q = self.factory.create_query(name="Testing search")
|
|
|
|
self.assertIn(q, Query.search('testing', [self.factory.default_group.id]))
|
|
|
|
|
|
class QueryRecentTest(BaseTestCase):
|
|
def test_global_recent(self):
|
|
q1 = self.factory.create_query()
|
|
q2 = self.factory.create_query()
|
|
db.session.flush()
|
|
e = Event(org=self.factory.org, user=self.factory.user, action="edit",
|
|
object_type="query", object_id=q1.id)
|
|
db.session.add(e)
|
|
recent = Query.recent([self.factory.default_group.id])
|
|
self.assertIn(q1, recent)
|
|
self.assertNotIn(q2, recent)
|
|
|
|
def test_recent_excludes_drafts(self):
|
|
q1 = self.factory.create_query()
|
|
q2 = self.factory.create_query(is_draft=True)
|
|
|
|
db.session.add_all([
|
|
Event(org=self.factory.org, user=self.factory.user,
|
|
action="edit", object_type="query",
|
|
object_id=q1.id),
|
|
Event(org=self.factory.org, user=self.factory.user,
|
|
action="edit", object_type="query",
|
|
object_id=q2.id)
|
|
])
|
|
recent = Query.recent([self.factory.default_group.id])
|
|
|
|
self.assertIn(q1, recent)
|
|
self.assertNotIn(q2, recent)
|
|
|
|
def test_recent_for_user(self):
|
|
q1 = self.factory.create_query()
|
|
q2 = self.factory.create_query()
|
|
db.session.flush()
|
|
e = Event(org=self.factory.org, user=self.factory.user, action="edit",
|
|
object_type="query", object_id=q1.id)
|
|
db.session.add(e)
|
|
recent = Query.recent([self.factory.default_group.id], user_id=self.factory.user.id)
|
|
|
|
self.assertIn(q1, recent)
|
|
self.assertNotIn(q2, recent)
|
|
|
|
recent = Query.recent([self.factory.default_group.id], user_id=self.factory.user.id + 1)
|
|
self.assertNotIn(q1, recent)
|
|
self.assertNotIn(q2, recent)
|
|
|
|
def test_respects_groups(self):
|
|
q1 = self.factory.create_query()
|
|
ds = self.factory.create_data_source(group=self.factory.create_group())
|
|
q2 = self.factory.create_query(data_source=ds)
|
|
db.session.flush()
|
|
Event(org=self.factory.org, user=self.factory.user, action="edit",
|
|
object_type="query", object_id=q1.id)
|
|
Event(org=self.factory.org, user=self.factory.user, action="edit",
|
|
object_type="query", object_id=q2.id)
|
|
|
|
recent = Query.recent([self.factory.default_group.id])
|
|
|
|
self.assertIn(q1, recent)
|
|
self.assertNotIn(q2, recent)
|
|
|
|
|
|
class TestQueryByUser(BaseTestCase):
|
|
def test_returns_only_users_queries(self):
|
|
q = self.factory.create_query(user=self.factory.user)
|
|
q2 = self.factory.create_query(user=self.factory.create_user())
|
|
|
|
queries = Query.by_user(self.factory.user)
|
|
|
|
# not using self.assertIn/NotIn because otherwise this fails :O
|
|
self.assertTrue(q in list(queries))
|
|
self.assertFalse(q2 in list(queries))
|
|
|
|
def test_returns_drafts_by_the_user(self):
|
|
q = self.factory.create_query(is_draft=True)
|
|
q2 = self.factory.create_query(is_draft=True, user=self.factory.create_user())
|
|
|
|
queries = Query.by_user(self.factory.user)
|
|
|
|
# not using self.assertIn/NotIn because otherwise this fails :O
|
|
self.assertTrue(q in queries)
|
|
self.assertFalse(q2 in queries)
|
|
|
|
def test_returns_only_queries_from_groups_the_user_is_member_in(self):
|
|
q = self.factory.create_query()
|
|
q2 = self.factory.create_query(data_source=self.factory.create_data_source(group=self.factory.create_group()))
|
|
|
|
queries = Query.by_user(self.factory.user)
|
|
|
|
# not using self.assertIn/NotIn because otherwise this fails :O
|
|
self.assertTrue(q in queries)
|
|
self.assertFalse(q2 in queries)
|
|
|
|
|
|
class TestQueryFork(BaseTestCase):
|
|
def assert_visualizations(self, origin_q, origin_v, forked_q, forked_v):
|
|
self.assertEqual(origin_v.options, forked_v.options)
|
|
self.assertEqual(origin_v.type, forked_v.type)
|
|
self.assertNotEqual(origin_v.id, forked_v.id)
|
|
self.assertNotEqual(origin_v.query_rel, forked_v.query_rel)
|
|
self.assertEqual(forked_q.id, forked_v.query_rel.id)
|
|
|
|
def test_fork_with_visualizations(self):
|
|
# prepare original query and visualizations
|
|
data_source = self.factory.create_data_source(
|
|
group=self.factory.create_group())
|
|
query = self.factory.create_query(data_source=data_source,
|
|
description="this is description")
|
|
visualization_chart = self.factory.create_visualization(
|
|
query_rel=query, description="chart vis", type="CHART",
|
|
options="""{"yAxis": [{"type": "linear"}, {"type": "linear", "opposite": true}], "series": {"stacking": null}, "globalSeriesType": "line", "sortX": true, "seriesOptions": {"count": {"zIndex": 0, "index": 0, "type": "line", "yAxis": 0}}, "xAxis": {"labels": {"enabled": true}, "type": "datetime"}, "columnMapping": {"count": "y", "created_at": "x"}, "bottomMargin": 50, "legend": {"enabled": true}}""")
|
|
visualization_box = self.factory.create_visualization(
|
|
query_rel=query, description="box vis", type="BOXPLOT",
|
|
options="{}")
|
|
fork_user = self.factory.create_user()
|
|
forked_query = query.fork(fork_user)
|
|
db.session.flush()
|
|
|
|
forked_visualization_chart = None
|
|
forked_visualization_box = None
|
|
forked_table = None
|
|
count_table = 0
|
|
for v in forked_query.visualizations:
|
|
if v.description == "chart vis":
|
|
forked_visualization_chart = v
|
|
if v.description == "box vis":
|
|
forked_visualization_box = v
|
|
if v.type == "TABLE":
|
|
count_table += 1
|
|
forked_table = v
|
|
self.assert_visualizations(query, visualization_chart, forked_query,
|
|
forked_visualization_chart)
|
|
self.assert_visualizations(query, visualization_box, forked_query,
|
|
forked_visualization_box)
|
|
|
|
self.assertEqual(forked_query.org, query.org)
|
|
self.assertEqual(forked_query.data_source, query.data_source)
|
|
self.assertEqual(forked_query.latest_query_data,
|
|
query.latest_query_data)
|
|
self.assertEqual(forked_query.description, query.description)
|
|
self.assertEqual(forked_query.query_text, query.query_text)
|
|
self.assertEqual(forked_query.query_hash, query.query_hash)
|
|
self.assertEqual(forked_query.user, fork_user)
|
|
self.assertEqual(forked_query.description, query.description)
|
|
self.assertTrue(forked_query.name.startswith('Copy'))
|
|
# num of TABLE must be 1. default table only
|
|
self.assertEqual(count_table, 1)
|
|
self.assertEqual(forked_table.name, "Table")
|
|
self.assertEqual(forked_table.description, "")
|
|
self.assertEqual(forked_table.options, "{}")
|
|
|
|
def test_fork_from_query_that_has_no_visualization(self):
|
|
# prepare original query and visualizations
|
|
data_source = self.factory.create_data_source(
|
|
group=self.factory.create_group())
|
|
query = self.factory.create_query(data_source=data_source,
|
|
description="this is description")
|
|
fork_user = self.factory.create_user()
|
|
|
|
forked_query = query.fork(fork_user)
|
|
|
|
count_table = 0
|
|
count_vis = 0
|
|
for v in forked_query.visualizations:
|
|
count_vis += 1
|
|
if v.type == "TABLE":
|
|
count_table += 1
|
|
|
|
self.assertEqual(count_table, 1)
|
|
self.assertEqual(count_vis, 1)
|