redash/tests/test_cli.py
Arik Fraimovich 2dff8b9a00
Black support for the Python codebase (#4297)
* Apply black formatting

* Add auto formatting when committing to master

* Update CONTRIBUTING.md re. Black & Prettier
2019-12-11 13:54:29 +02:00

578 lines
20 KiB
Python

import mock
import textwrap
from click.testing import CliRunner
from tests import BaseTestCase
from redash.utils.configuration import ConfigurationContainer
from redash.query_runner import query_runners
from redash.cli import manager
from redash.models import DataSource, Group, Organization, User, db
class DataSourceCommandTests(BaseTestCase):
def test_interactive_new(self):
runner = CliRunner()
pg_i = list(query_runners.keys()).index("pg") + 1
result = runner.invoke(
manager,
["ds", "new"],
input="test\n%s\n\n\nexample.com\n\n\ntestdb\n" % (pg_i,),
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertEqual(DataSource.query.count(), 1)
ds = DataSource.query.first()
self.assertEqual(ds.name, "test")
self.assertEqual(ds.type, "pg")
self.assertEqual(ds.options["dbname"], "testdb")
def test_options_new(self):
runner = CliRunner()
result = runner.invoke(
manager,
[
"ds",
"new",
"test",
"--options",
'{"host": "example.com", "dbname": "testdb"}',
"--type",
"pg",
],
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertEqual(DataSource.query.count(), 1)
ds = DataSource.query.first()
self.assertEqual(ds.name, "test")
self.assertEqual(ds.type, "pg")
self.assertEqual(ds.options["host"], "example.com")
self.assertEqual(ds.options["dbname"], "testdb")
def test_bad_type_new(self):
runner = CliRunner()
result = runner.invoke(manager, ["ds", "new", "test", "--type", "wrong"])
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("not supported", result.output)
self.assertEqual(DataSource.query.count(), 0)
def test_bad_options_new(self):
runner = CliRunner()
result = runner.invoke(
manager,
[
"ds",
"new",
"test",
"--options",
'{"host": 12345, "dbname": "testdb"}',
"--type",
"pg",
],
)
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("invalid configuration", result.output)
self.assertEqual(DataSource.query.count(), 0)
def test_list(self):
self.factory.create_data_source(
name="test1",
type="pg",
options=ConfigurationContainer(
{"host": "example.com", "dbname": "testdb1"}
),
)
self.factory.create_data_source(
name="test2",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
self.factory.create_data_source(
name="Atest",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
runner = CliRunner()
result = runner.invoke(manager, ["ds", "list"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
expected_output = """
Id: 3
Name: Atest
Type: sqlite
Options: {"dbpath": "/tmp/test.db"}
--------------------
Id: 1
Name: test1
Type: pg
Options: {"dbname": "testdb1", "host": "example.com"}
--------------------
Id: 2
Name: test2
Type: sqlite
Options: {"dbpath": "/tmp/test.db"}
"""
self.assertMultiLineEqual(
result.output, textwrap.dedent(expected_output).lstrip()
)
def test_connection_test(self):
self.factory.create_data_source(
name="test1",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
runner = CliRunner()
result = runner.invoke(manager, ["ds", "test", "test1"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertIn("Success", result.output)
def test_connection_bad_test(self):
self.factory.create_data_source(
name="test1",
type="sqlite",
options=ConfigurationContainer({"dbpath": __file__}),
)
runner = CliRunner()
result = runner.invoke(manager, ["ds", "test", "test1"])
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("Failure", result.output)
def test_connection_delete(self):
self.factory.create_data_source(
name="test1",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
runner = CliRunner()
result = runner.invoke(manager, ["ds", "delete", "test1"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertIn("Deleting", result.output)
self.assertEqual(DataSource.query.count(), 0)
def test_connection_bad_delete(self):
self.factory.create_data_source(
name="test1",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
runner = CliRunner()
result = runner.invoke(manager, ["ds", "delete", "wrong"])
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("Couldn't find", result.output)
self.assertEqual(DataSource.query.count(), 1)
def test_options_edit(self):
self.factory.create_data_source(
name="test1",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
runner = CliRunner()
result = runner.invoke(
manager,
[
"ds",
"edit",
"test1",
"--options",
'{"host": "example.com", "dbname": "testdb"}',
"--name",
"test2",
"--type",
"pg",
],
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertEqual(DataSource.query.count(), 1)
ds = DataSource.query.first()
self.assertEqual(ds.name, "test2")
self.assertEqual(ds.type, "pg")
self.assertEqual(ds.options["host"], "example.com")
self.assertEqual(ds.options["dbname"], "testdb")
def test_bad_type_edit(self):
self.factory.create_data_source(
name="test1",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
runner = CliRunner()
result = runner.invoke(manager, ["ds", "edit", "test", "--type", "wrong"])
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("not supported", result.output)
ds = DataSource.query.first()
self.assertEqual(ds.type, "sqlite")
def test_bad_options_edit(self):
ds = self.factory.create_data_source(
name="test1",
type="sqlite",
options=ConfigurationContainer({"dbpath": "/tmp/test.db"}),
)
runner = CliRunner()
result = runner.invoke(
manager,
[
"ds",
"new",
"test",
"--options",
'{"host": 12345, "dbname": "testdb"}',
"--type",
"pg",
],
)
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("invalid configuration", result.output)
ds = DataSource.query.first()
self.assertEqual(ds.type, "sqlite")
self.assertEqual(ds.options._config, {"dbpath": "/tmp/test.db"})
class GroupCommandTests(BaseTestCase):
def test_create(self):
gcount = Group.query.count()
perms = ["create_query", "edit_query", "view_query"]
runner = CliRunner()
result = runner.invoke(
manager, ["groups", "create", "test", "--permissions", ",".join(perms)]
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertEqual(Group.query.count(), gcount + 1)
g = Group.query.order_by(Group.id.desc()).first()
db.session.add(self.factory.org)
self.assertEqual(g.org_id, self.factory.org.id)
self.assertEqual(g.permissions, perms)
def test_change_permissions(self):
g = self.factory.create_group(permissions=["list_dashboards"])
db.session.flush()
g_id = g.id
perms = ["create_query", "edit_query", "view_query"]
runner = CliRunner()
result = runner.invoke(
manager,
[
"groups",
"change_permissions",
str(g_id),
"--permissions",
",".join(perms),
],
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
g = Group.query.filter(Group.id == g_id).first()
self.assertEqual(g.permissions, perms)
def test_list(self):
self.factory.create_group(name="test", permissions=["list_dashboards"])
self.factory.create_group(name="agroup", permissions=["list_dashboards"])
self.factory.create_group(name="bgroup", permissions=["list_dashboards"])
self.factory.create_user(
name="Fred Foobar",
email="foobar@example.com",
org=self.factory.org,
group_ids=[self.factory.default_group.id],
)
runner = CliRunner()
result = runner.invoke(manager, ["groups", "list"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
output = """
Id: 1
Name: admin
Type: builtin
Organization: default
Permissions: [admin,super_admin]
Users:
--------------------
Id: 4
Name: agroup
Type: regular
Organization: default
Permissions: [list_dashboards]
Users:
--------------------
Id: 5
Name: bgroup
Type: regular
Organization: default
Permissions: [list_dashboards]
Users:
--------------------
Id: 2
Name: default
Type: builtin
Organization: default
Permissions: [create_dashboard,create_query,edit_dashboard,edit_query,view_query,view_source,execute_query,list_users,schedule_query,list_dashboards,list_alerts,list_data_sources]
Users: Fred Foobar
--------------------
Id: 3
Name: test
Type: regular
Organization: default
Permissions: [list_dashboards]
Users:
"""
self.assertMultiLineEqual(result.output, textwrap.dedent(output).lstrip())
class OrganizationCommandTests(BaseTestCase):
def test_set_google_apps_domains(self):
domains = ["example.org", "example.com"]
runner = CliRunner()
result = runner.invoke(
manager, ["org", "set_google_apps_domains", ",".join(domains)]
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
db.session.add(self.factory.org)
self.assertEqual(self.factory.org.google_apps_domains, domains)
def test_show_google_apps_domains(self):
self.factory.org.settings[Organization.SETTING_GOOGLE_APPS_DOMAINS] = [
"example.org",
"example.com",
]
db.session.add(self.factory.org)
db.session.commit()
runner = CliRunner()
result = runner.invoke(manager, ["org", "show_google_apps_domains"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
output = """
Current list of Google Apps domains: example.org, example.com
"""
self.assertMultiLineEqual(result.output, textwrap.dedent(output).lstrip())
def test_list(self):
self.factory.create_org(name="test", slug="test_org")
self.factory.create_org(name="Borg", slug="B_org")
self.factory.create_org(name="Aorg", slug="A_org")
runner = CliRunner()
result = runner.invoke(manager, ["org", "list"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
output = """
Id: 4
Name: Aorg
Slug: A_org
--------------------
Id: 3
Name: Borg
Slug: B_org
--------------------
Id: 1
Name: Default
Slug: default
--------------------
Id: 2
Name: test
Slug: test_org
"""
self.assertMultiLineEqual(result.output, textwrap.dedent(output).lstrip())
class UserCommandTests(BaseTestCase):
def test_create_basic(self):
runner = CliRunner()
result = runner.invoke(
manager,
["users", "create", "foobar@example.com", "Fred Foobar"],
input="password1\npassword1\n",
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
u = User.query.filter(User.email == "foobar@example.com").first()
self.assertEqual(u.name, "Fred Foobar")
self.assertTrue(u.verify_password("password1"))
self.assertEqual(u.group_ids, [u.org.default_group.id])
def test_create_admin(self):
runner = CliRunner()
result = runner.invoke(
manager,
[
"users",
"create",
"foobar@example.com",
"Fred Foobar",
"--password",
"password1",
"--admin",
],
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
u = User.query.filter(User.email == "foobar@example.com").first()
self.assertEqual(u.name, "Fred Foobar")
self.assertTrue(u.verify_password("password1"))
self.assertEqual(u.group_ids, [u.org.default_group.id, u.org.admin_group.id])
def test_create_googleauth(self):
runner = CliRunner()
result = runner.invoke(
manager,
["users", "create", "foobar@example.com", "Fred Foobar", "--google"],
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
u = User.query.filter(User.email == "foobar@example.com").first()
self.assertEqual(u.name, "Fred Foobar")
self.assertIsNone(u.password_hash)
self.assertEqual(u.group_ids, [u.org.default_group.id])
def test_create_bad(self):
self.factory.create_user(email="foobar@example.com")
runner = CliRunner()
result = runner.invoke(
manager,
["users", "create", "foobar@example.com", "Fred Foobar"],
input="password1\npassword1\n",
)
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("Failed", result.output)
def test_delete(self):
self.factory.create_user(email="foobar@example.com")
ucount = User.query.count()
runner = CliRunner()
result = runner.invoke(manager, ["users", "delete", "foobar@example.com"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertEqual(
User.query.filter(User.email == "foobar@example.com").count(), 0
)
self.assertEqual(User.query.count(), ucount - 1)
def test_delete_bad(self):
ucount = User.query.count()
runner = CliRunner()
result = runner.invoke(manager, ["users", "delete", "foobar@example.com"])
self.assertIn("Deleted 0 users", result.output)
self.assertEqual(User.query.count(), ucount)
def test_password(self):
self.factory.create_user(email="foobar@example.com")
runner = CliRunner()
result = runner.invoke(
manager, ["users", "password", "foobar@example.com", "xyzzy"]
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
u = User.query.filter(User.email == "foobar@example.com").first()
self.assertTrue(u.verify_password("xyzzy"))
def test_password_bad(self):
runner = CliRunner()
result = runner.invoke(
manager, ["users", "password", "foobar@example.com", "xyzzy"]
)
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("not found", result.output)
def test_password_bad_org(self):
runner = CliRunner()
result = runner.invoke(
manager,
["users", "password", "foobar@example.com", "xyzzy", "--org", "default"],
)
self.assertTrue(result.exception)
self.assertEqual(result.exit_code, 1)
self.assertIn("not found", result.output)
def test_invite(self):
admin = self.factory.create_user(email="redash-admin@example.com")
runner = CliRunner()
with mock.patch("redash.cli.users.invite_user") as iu:
result = runner.invoke(
manager,
[
"users",
"invite",
"foobar@example.com",
"Fred Foobar",
"redash-admin@example.com",
],
)
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
self.assertTrue(iu.called)
c = iu.call_args[0]
db.session.add_all(c)
self.assertEqual(c[0].id, self.factory.org.id)
self.assertEqual(c[1].id, admin.id)
self.assertEqual(c[2].email, "foobar@example.com")
def test_list(self):
self.factory.create_user(
name="Fred Foobar", email="foobar@example.com", org=self.factory.org
)
self.factory.create_user(
name="William Foobar", email="william@example.com", org=self.factory.org
)
self.factory.create_user(
name="Andrew Foobar", email="andrew@example.com", org=self.factory.org
)
runner = CliRunner()
result = runner.invoke(manager, ["users", "list"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
output = """
Id: 3
Name: Andrew Foobar
Email: andrew@example.com
Organization: Default
Active: True
Groups: default
--------------------
Id: 1
Name: Fred Foobar
Email: foobar@example.com
Organization: Default
Active: True
Groups: default
--------------------
Id: 2
Name: William Foobar
Email: william@example.com
Organization: Default
Active: True
Groups: default
"""
self.assertMultiLineEqual(result.output, textwrap.dedent(output).lstrip())
def test_grant_admin(self):
u = self.factory.create_user(
name="Fred Foobar",
email="foobar@example.com",
org=self.factory.org,
group_ids=[self.factory.default_group.id],
)
runner = CliRunner()
result = runner.invoke(manager, ["users", "grant_admin", "foobar@example.com"])
self.assertFalse(result.exception)
self.assertEqual(result.exit_code, 0)
db.session.add(u)
self.assertEqual(u.group_ids, [u.org.default_group.id, u.org.admin_group.id])