Replace Celery with RQ (except for execute_query tasks) (#4093)

* add rq and an rq_worker service

* add rq_scheduler and an rq_scheduler service

* move beat schedule to periodic_jobs queue

* move version checks to RQ

* move query result cleanup to RQ

* use timedelta and DRY up a bit

* move custom tasks to RQ

* do actual schema refreshes in rq

* rename 'period_jobs' to 'periodic', as it obviously holds jobs

* move send_email to rq

* DRY up enqueues

* ditch  and use a partially applied  decorator

* move subscribe to rq

* move check_alerts_for_query to rq

* move record_event to rq

* make tests play nicely with rq

* 👋 beat

* rename rq_scheduler to plain scheduler, now that there's no Celery scheduler entrypoint

* add some color to rq-worker's output

* add logging context to rq jobs (while keeping execute_query context via get_task_logger for now)

* move schedule to its own module

* cancel previously scheduled periodic jobs. not sure this is a good idea.

* rename redash.scheduler to redash.schedule

* allow custom dynamic jobs to be added declaratively

* add basic monitoring to rq queues

* add worker monitoring

* pleasing the CodeClimate overlords

* adjust cypress docker-compose.yml to include rq changes

* DRY up Cypress docker-compose

* add rq dependencies to cypress docker-compose service

* an odd attempt at watching docker-compose logs when running with Cypress

* Revert "an odd attempt at watching docker-compose logs when running with Cypress"

This reverts commit 016bd1a93e3efa84a9f27d0f2acb972ce1957bcd.

* show docker-compose logs at Cypress shutdown

* Revert "DRY up Cypress docker-compose"

This reverts commit 43abac7084c207ab9e39192ac79d520448c2c527.

* minimal version for binding is 3.2

* remove unneccesary code reloads on cypress

* add a healthcheck command which errors if any of the workers running on the current machine haven't been active in the last minute

* SCHEMAS_REFRESH_QUEUE is no longer a required setting

* split tasks/queries.py to execution.py and maintenance.py

* fix tests after query execution split

* pleasing the CodeClimate overlords

* rename worker to celery_worker and rq_worker to worker

* use /rq_status instead of /jobs

* show started jobs' time ago according to UTC

* replace all spaces in column names

* fix query tests after execution split

* exit with an int

* general lint

* add an entrypoint for rq_healthcheck

* fix indentation

* delete all existing periodic jobs before scheduling them

* remove some unrequired requires

* move schedule example to redash.schedule

* add RQ integration to Sentry's setup

* pleasing the CodeClimate overlords

* remove replication settings from docker-compose; the proper way to scale with docker-compose is the --scale CLI option, which will be described in the knowledge base

* revert to calling a function in dynamic settings so that periodic jobs can be scheduled after the app has been loaded

* don't need to depend on context when templating failure reports

* set result_ttl to double the interval so job results don't expire before the periodic job is rescheduled

* whoops, bad merge

* describe custom jobs and don't actually schedule them

* fix merge
Omer Lachish 2019-10-15 23:59:22 +03:00 committed by GitHub
parent f6e1470a7c
commit 5a5fdecdde
30 changed files with 712 additions and 285 deletions


@ -14,9 +14,16 @@ services:
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
REDASH_RATELIMIT_ENABLED: "false"
worker:
scheduler:
build: ../
command: scheduler
depends_on:
- server
environment:
REDASH_REDIS_URL: "redis://redis:6379/0"
worker:
build: ../
command: worker
depends_on:
- server
environment:
@ -24,7 +31,18 @@ services:
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "queries,scheduled_queries,celery,schemas"
QUEUES: "default periodic schemas"
celery_worker:
build: ../
command: celery_worker
depends_on:
- server
environment:
PYTHONUNBUFFERED: 0
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "queries,scheduled_queries"
WORKERS_COUNT: 2
cypress:
build:
@ -32,7 +50,9 @@ services:
dockerfile: .circleci/Dockerfile.cypress
depends_on:
- server
- celery_worker
- worker
- scheduler
environment:
CYPRESS_baseUrl: "http://server:5000"
PERCY_TOKEN: ${PERCY_TOKEN}


@ -1,9 +1,9 @@
#!/bin/bash
set -e
worker() {
celery_worker() {
WORKERS_COUNT=${WORKERS_COUNT:-2}
QUEUES=${QUEUES:-queries,scheduled_queries,celery,schemas}
QUEUES=${QUEUES:-queries,scheduled_queries}
WORKER_EXTRA_OPTIONS=${WORKER_EXTRA_OPTIONS:-}
echo "Starting $WORKERS_COUNT workers for queues: $QUEUES..."
@ -11,23 +11,36 @@ worker() {
}
scheduler() {
WORKERS_COUNT=${WORKERS_COUNT:-1}
QUEUES=${QUEUES:-celery}
SCHEDULE_DB=${SCHEDULE_DB:-celerybeat-schedule}
echo "Starting RQ scheduler..."
echo "Starting scheduler and $WORKERS_COUNT workers for queues: $QUEUES..."
exec /app/manage.py rq scheduler
}
exec /usr/local/bin/celery worker --app=redash.worker --beat -s$SCHEDULE_DB -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair
dev_scheduler() {
echo "Starting dev RQ scheduler..."
exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq scheduler
}
worker() {
echo "Starting RQ worker..."
exec /app/manage.py rq worker $QUEUES
}
dev_worker() {
echo "Starting dev RQ worker..."
exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES
}
dev_celery_worker() {
WORKERS_COUNT=${WORKERS_COUNT:-2}
QUEUES=${QUEUES:-queries,scheduled_queries,celery,schemas}
SCHEDULE_DB=${SCHEDULE_DB:-celerybeat-schedule}
QUEUES=${QUEUES:-queries,scheduled_queries}
echo "Starting dev scheduler and $WORKERS_COUNT workers for queues: $QUEUES..."
echo "Starting $WORKERS_COUNT workers for queues: $QUEUES..."
exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- /usr/local/bin/celery worker --app=redash.worker --beat -s$SCHEDULE_DB -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair
exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- /usr/local/bin/celery worker --app=redash.worker -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair
}
server() {
@ -45,6 +58,10 @@ celery_healthcheck() {
exec /usr/local/bin/celery inspect ping --app=redash.worker -d celery@$HOSTNAME
}
rq_healthcheck() {
exec /app/manage.py rq healthcheck
}
help() {
echo "Redash Docker."
echo ""
@ -52,10 +69,14 @@ help() {
echo ""
echo "server -- start Redash server (with gunicorn)"
echo "worker -- start Celery worker"
echo "scheduler -- start Celery worker with a beat (scheduler) process"
echo "dev_worker -- start Celery worker with a beat (scheduler) process which picks up code changes and reloads"
echo "celery_worker -- start Celery worker"
echo "dev_celery_worker -- start Celery worker process which picks up code changes and reloads"
echo "worker -- start a single RQ worker"
echo "dev_worker -- start a single RQ worker with code reloading"
echo "scheduler -- start an rq-scheduler instance"
echo "dev_scheduler -- start an rq-scheduler instance with code reloading"
echo "celery_healthcheck -- runs a Celery healthcheck. Useful for Docker's HEALTHCHECK mechanism."
echo "rq_healthcheck -- runs a RQ healthcheck that verifies that all local workers are active. Useful for Docker's HEALTHCHECK mechanism."
echo ""
echo "shell -- open shell"
echo "dev_server -- start Flask development server with debugger and auto reload"
@ -89,10 +110,30 @@ case "$1" in
shift
scheduler
;;
dev_scheduler)
shift
dev_scheduler
;;
celery_worker)
shift
celery_worker
;;
dev_celery_worker)
shift
dev_celery_worker
;;
dev_worker)
shift
dev_worker
;;
rq_healthcheck)
shift
rq_healthcheck
;;
celery_healthcheck)
shift
celery_healthcheck
;;
dev_server)
export FLASK_DEBUG=1
exec /app/manage.py runserver --debugger --reload -h 0.0.0.0


@ -58,10 +58,6 @@ const queryColumns = commonColumns.concat([
{ title: 'Scheduled', dataIndex: 'scheduled' },
]);
const otherTasksColumns = commonColumns.concat([
{ title: 'Task Name', dataIndex: 'task_name' },
]);
const queuesColumns = map(
['Name', 'Active', 'Reserved', 'Waiting'],
c => ({ title: c, dataIndex: c.toLowerCase() }),
@ -97,16 +93,3 @@ export function QueriesTable({ loading, items }) {
}
QueriesTable.propTypes = TablePropTypes;
export function OtherTasksTable({ loading, items }) {
return (
<Table
loading={loading}
columns={otherTasksColumns}
rowKey="task_id"
dataSource={items}
/>
);
}
OtherTasksTable.propTypes = TablePropTypes;


@ -18,6 +18,9 @@ export default function Layout({ activeTab, children }) {
<Tabs.TabPane key="tasks" tab={<a href="admin/queries/tasks">Celery Status</a>}>
{(activeTab === 'tasks') ? children : null}
</Tabs.TabPane>
<Tabs.TabPane key="jobs" tab={<a href="admin/queries/jobs">RQ Status</a>}>
{(activeTab === 'jobs') ? children : null}
</Tabs.TabPane>
<Tabs.TabPane key="outdated_queries" tab={<a href="admin/queries/outdated">Outdated Queries</a>}>
{(activeTab === 'outdated_queries') ? children : null}
</Tabs.TabPane>


@ -0,0 +1,78 @@
import { map } from 'lodash';
import React from 'react';
import PropTypes from 'prop-types';
import Badge from 'antd/lib/badge';
import Table from 'antd/lib/table';
import { Columns } from '@/components/items-list/components/ItemsTable';
// Tables
const otherJobsColumns = [
{ title: 'Queue', dataIndex: 'queue' },
{ title: 'Job Name', dataIndex: 'name' },
Columns.timeAgo({ title: 'Start Time', dataIndex: 'started_at' }),
Columns.timeAgo({ title: 'Enqueue Time', dataIndex: 'enqueued_at' }),
];
const workersColumns = [Columns.custom(
value => (
<span><Badge status={{ busy: 'processing',
idle: 'default',
started: 'success',
suspended: 'warning' }[value]}
/> {value}
</span>
), { title: 'State', dataIndex: 'state' },
)].concat(map(['Hostname', 'PID', 'Name', 'Queues', 'Successful Job Count',
'Failed Job Count', 'Birth Date', 'Total Working Time'],
c => ({ title: c, dataIndex: c.toLowerCase().replace(/\s/g, '_') })));
const queuesColumns = map(
['Name', 'Started', 'Queued'],
c => ({ title: c, dataIndex: c.toLowerCase() }),
);
const TablePropTypes = {
loading: PropTypes.bool.isRequired,
items: PropTypes.arrayOf(PropTypes.object).isRequired,
};
export function WorkersTable({ loading, items }) {
return (
<Table
loading={loading}
columns={workersColumns}
rowKey="name"
dataSource={items}
/>
);
}
WorkersTable.propTypes = TablePropTypes;
export function QueuesTable({ loading, items }) {
return (
<Table
loading={loading}
columns={queuesColumns}
rowKey="name"
dataSource={items}
/>
);
}
QueuesTable.propTypes = TablePropTypes;
export function OtherJobsTable({ loading, items }) {
return (
<Table
loading={loading}
columns={otherJobsColumns}
rowKey="id"
dataSource={items}
/>
);
}
OtherJobsTable.propTypes = TablePropTypes;


@ -0,0 +1,124 @@
import { flatMap, values } from 'lodash';
import React from 'react';
import { react2angular } from 'react2angular';
import Alert from 'antd/lib/alert';
import Tabs from 'antd/lib/tabs';
import * as Grid from 'antd/lib/grid';
import Layout from '@/components/admin/Layout';
import { CounterCard } from '@/components/admin/CeleryStatus';
import { WorkersTable, QueuesTable, OtherJobsTable } from '@/components/admin/RQStatus';
import { $http } from '@/services/ng';
import recordEvent from '@/services/recordEvent';
import { routesToAngularRoutes } from '@/lib/utils';
import moment from 'moment';
class Jobs extends React.Component {
state = {
isLoading: true,
error: null,
queueCounters: [],
overallCounters: { started: 0, queued: 0 },
startedJobs: [],
workers: [],
};
componentDidMount() {
recordEvent('view', 'page', 'admin/rq_status');
$http
.get('/api/admin/queries/rq_status')
.then(({ data }) => this.processQueues(data))
.catch(error => this.handleError(error));
}
componentWillUnmount() {
// Ignore data after component unmounted
this.processQueues = () => {};
this.handleError = () => {};
}
processQueues = ({ queues, workers }) => {
const queueCounters = values(queues).map(({ name, started, queued }) => ({
name,
started: started.length,
queued: queued.length,
}));
const overallCounters = queueCounters.reduce(
(c, q) => ({
started: c.started + q.started,
queued: c.queued + q.queued,
}),
{ started: 0, queued: 0 },
);
const startedJobs = flatMap(values(queues), queue => queue.started.map(job => ({
...job,
enqueued_at: moment.utc(job.enqueued_at),
started_at: moment.utc(job.started_at),
})));
this.setState({ isLoading: false, queueCounters, startedJobs, overallCounters, workers });
};
handleError = (error) => {
this.setState({ isLoading: false, error });
};
render() {
const { isLoading, error, queueCounters, startedJobs, overallCounters, workers } = this.state;
return (
<Layout activeTab="jobs">
<div className="p-15">
{error && (
<Alert type="error" message="Failed loading status. Please refresh." />
)}
{!error && (
<React.Fragment>
<Grid.Row gutter={15} className="m-b-15">
<Grid.Col span={8}>
<CounterCard title="Started Jobs" value={overallCounters.started} loading={isLoading} />
</Grid.Col>
<Grid.Col span={8}>
<CounterCard title="Queued Jobs" value={overallCounters.queued} loading={isLoading} />
</Grid.Col>
</Grid.Row>
<Tabs defaultActiveKey="queues" animated={false}>
<Tabs.TabPane key="queues" tab="Queues">
<QueuesTable loading={isLoading} items={queueCounters} />
</Tabs.TabPane>
<Tabs.TabPane key="workers" tab="Workers">
<WorkersTable loading={isLoading} items={workers} />
</Tabs.TabPane>
<Tabs.TabPane key="other" tab="Other Jobs">
<OtherJobsTable loading={isLoading} items={startedJobs} />
</Tabs.TabPane>
</Tabs>
</React.Fragment>
)}
</div>
</Layout>
);
}
}
export default function init(ngModule) {
ngModule.component('pageJobs', react2angular(Jobs));
return routesToAngularRoutes([
{
path: '/admin/queries/jobs',
title: 'RQ Status',
key: 'jobs',
},
], {
template: '<page-jobs></page-jobs>',
});
}
init.init = true;


@ -7,7 +7,7 @@ import Alert from 'antd/lib/alert';
import Tabs from 'antd/lib/tabs';
import * as Grid from 'antd/lib/grid';
import Layout from '@/components/admin/Layout';
import { CounterCard, QueuesTable, QueriesTable, OtherTasksTable } from '@/components/admin/CeleryStatus';
import { CounterCard, QueuesTable, QueriesTable } from '@/components/admin/CeleryStatus';
import { $http } from '@/services/ng';
import recordEvent from '@/services/recordEvent';
@ -29,7 +29,6 @@ class Tasks extends React.Component {
queues: [],
queries: [],
otherTasks: [],
counters: { active: 0, reserved: 0, waiting: 0 },
};
@ -50,7 +49,6 @@ class Tasks extends React.Component {
processTasks = (tasks) => {
const queues = {};
const queries = [];
const otherTasks = [];
const counters = { active: 0, reserved: 0, waiting: 0 };
@ -70,12 +68,10 @@ class Tasks extends React.Component {
if (task.task_name === 'redash.tasks.execute_query') {
queries.push(task);
} else {
otherTasks.push(task);
}
});
this.setState({ isLoading: false, queues: values(queues), queries, otherTasks, counters });
this.setState({ isLoading: false, queues: values(queues), queries, counters });
};
handleError = (error) => {
@ -83,7 +79,7 @@ class Tasks extends React.Component {
};
render() {
const { isLoading, error, queues, queries, otherTasks, counters } = this.state;
const { isLoading, error, queues, queries, counters } = this.state;
return (
<Layout activeTab="tasks">
@ -113,9 +109,6 @@ class Tasks extends React.Component {
<Tabs.TabPane key="queries" tab="Queries">
<QueriesTable loading={isLoading} items={queries} />
</Tabs.TabPane>
<Tabs.TabPane key="other" tab="Other Tasks">
<OtherTasksTable loading={isLoading} items={otherTasks} />
</Tabs.TabPane>
</Tabs>
</React.Fragment>
)}


@ -19,11 +19,38 @@ services:
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
REDASH_RATELIMIT_ENABLED: "false"
scheduler:
build: .
command: dev_scheduler
volumes:
- type: bind
source: .
target: /app
depends_on:
- server
environment:
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_MAIL_DEFAULT_SENDER: redash@example.com
REDASH_MAIL_SERVER: email
worker:
build: .
command: dev_worker
volumes:
- type: bind
source: .
target: /app
depends_on:
- server
tty: true
environment:
PYTHONUNBUFFERED: 0
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "default periodic schemas"
celery-worker:
build: .
command: dev_celery_worker
volumes:
- type: bind
source: .
@ -36,7 +63,7 @@ services:
REDASH_LOG_LEVEL: "INFO"
REDASH_REDIS_URL: "redis://redis:6379/0"
REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres"
QUEUES: "queries,scheduled_queries,celery,schemas"
QUEUES: "queries,scheduled_queries"
WORKERS_COUNT: 2
REDASH_MAIL_DEFAULT_SENDER: redash@example.com
REDASH_MAIL_SERVER: email
@ -56,4 +83,4 @@ services:
image: djfarrelly/maildev
ports:
- "1080:80"
restart: unless-stopped
restart: unless-stopped


@ -1,3 +1,4 @@
from __future__ import absolute_import
import logging
import os
import sys


@ -6,7 +6,7 @@ from flask import current_app
from flask.cli import FlaskGroup, run_command
from redash import __version__, create_app, settings
from redash.cli import data_sources, database, groups, organization, queries, users
from redash.cli import data_sources, database, groups, organization, queries, users, rq
from redash.monitor import get_status
@ -35,6 +35,7 @@ manager.add_command(groups.manager, "groups")
manager.add_command(data_sources.manager, "ds")
manager.add_command(organization.manager, "org")
manager.add_command(queries.manager, "queries")
manager.add_command(rq.manager, "rq")
manager.add_command(run_command, "runserver")

redash/cli/rq.py (new file, 41 lines)

@ -0,0 +1,41 @@
from __future__ import absolute_import
import socket
import sys
import datetime
from click import argument
from flask.cli import AppGroup
from rq import Connection, Worker
from redash import redis_connection
from redash.schedule import rq_scheduler, schedule_periodic_jobs
manager = AppGroup(help="RQ management commands.")
@manager.command()
def scheduler():
schedule_periodic_jobs()
rq_scheduler.run()
@manager.command()
@argument('queues', nargs=-1)
def worker(queues='default'):
with Connection(redis_connection):
w = Worker(queues)
w.work()
@manager.command()
def healthcheck():
hostname = socket.gethostname()
with Connection(redis_connection):
all_workers = Worker.all()
local_workers = [w for w in all_workers if w.hostname == hostname]
heartbeats = [w.last_heartbeat for w in local_workers]
time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats]
active = [t.seconds < 60 for t in time_since_seen]
sys.exit(int(not all(active)))


@ -8,7 +8,7 @@ from redash.handlers.base import json_response, record_event
from redash.permissions import require_super_admin
from redash.serializers import QuerySerializer
from redash.utils import json_loads
from redash.monitor import celery_tasks
from redash.monitor import celery_tasks, rq_status
@routes.route('/api/admin/queries/outdated', methods=['GET'])
@ -52,3 +52,15 @@ def queries_tasks():
}
return json_response(response)
@routes.route('/api/admin/queries/rq_status', methods=['GET'])
@require_super_admin
@login_required
def queries_rq_status():
record_event(current_org, current_user._get_current_object(), {
'action': 'list',
'object_type': 'rq_status'
})
return json_response(rq_status())


@ -1,9 +1,13 @@
from __future__ import absolute_import
import itertools
from sqlalchemy import union_all
from redash import redis_connection, __version__, settings
from redash.models import db, DataSource, Query, QueryResult, Dashboard, Widget
from redash.utils import json_loads
from redash.worker import celery
from rq import Queue, Worker
from rq.job import Job
from rq.registry import StartedJobRegistry
def get_redis_status():
@ -134,3 +138,44 @@ def celery_tasks():
tasks += get_waiting_in_queue(queue_name)
return tasks
def fetch_jobs(queue, job_ids):
return [{
'id': job.id,
'name': job.func_name,
'queue': queue.name,
'enqueued_at': job.enqueued_at,
'started_at': job.started_at
} for job in Job.fetch_many(job_ids, connection=redis_connection) if job is not None]
def rq_queues():
return {
q.name: {
'name': q.name,
'started': fetch_jobs(q, StartedJobRegistry(queue=q).get_job_ids()),
'queued': fetch_jobs(q, q.job_ids)
} for q in Queue.all(connection=redis_connection)}
def rq_workers():
return [{
'name': w.name,
'hostname': w.hostname,
'pid': w.pid,
'queues': ", ".join([q.name for q in w.queues]),
'state': w.state,
'last_heartbeat': w.last_heartbeat,
'birth_date': w.birth_date,
'successful_job_count': w.successful_job_count,
'failed_job_count': w.failed_job_count,
'total_working_time': w.total_working_time
} for w in Worker.all(connection=redis_connection)]
def rq_status():
return {
'queues': rq_queues(),
'workers': rq_workers()
}
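For reference (not part of the diff), a minimal sketch of inspecting these new monitoring helpers from a Flask shell; it assumes an application context such as ./manage.py shell, and the keys mirror rq_queues() and rq_workers() above.

    # Illustrative only: inspect RQ queues and workers via the helpers above.
    from redash.monitor import rq_status

    status = rq_status()
    for name, queue in status['queues'].items():
        print(name, 'started:', len(queue['started']), 'queued:', len(queue['queued']))
    for worker in status['workers']:
        print(worker['name'], worker['state'], worker['queues'])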

redash/schedule.py (new file, 56 lines)

@ -0,0 +1,56 @@
from __future__ import absolute_import
from datetime import datetime, timedelta
from functools import partial
from random import randint
from rq_scheduler import Scheduler
from redash import settings, redis_connection
from redash.tasks import (sync_user_details, refresh_queries,
empty_schedules, refresh_schemas,
cleanup_query_results,
version_check, send_aggregated_errors)
rq_scheduler = Scheduler(connection=redis_connection,
queue_name="periodic",
interval=5)
def schedule(**kwargs):
interval = kwargs['interval']
if isinstance(interval, timedelta):
interval = interval.seconds
kwargs['interval'] = interval
kwargs['result_ttl'] = kwargs.get('result_ttl', interval * 2)
rq_scheduler.schedule(scheduled_time=datetime.utcnow(), **kwargs)
def schedule_periodic_jobs():
for job in rq_scheduler.get_jobs():
job.delete()
jobs = [
{"func": refresh_queries, "interval": 30},
{"func": empty_schedules, "interval": timedelta(minutes=60)},
{"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)},
{"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)},
{"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)}
]
if settings.QUERY_RESULTS_CLEANUP_ENABLED:
jobs.append({"func": cleanup_query_results, "interval": timedelta(minutes=5)})
if settings.VERSION_CHECK:
# We schedule the version check to run at a random time in order to spread the requests from all users evenly.
rq_scheduler.cron('{minute} {hour} * * *'.format(
minute=randint(0, 59),
hour=randint(0, 23)),
func=version_check)
# Add your own custom periodic jobs in your dynamic_settings module.
jobs.extend(settings.dynamic_settings.periodic_jobs() or [])
for job in jobs:
schedule(**job)
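As a usage note (not part of the diff): a hedged sketch of how a deployment could declare its own periodic job through dynamic_settings.periodic_jobs(), which schedule_periodic_jobs() above picks up; usage_report is a hypothetical function, not part of the codebase.

    # Hypothetical addition to a custom dynamic_settings module.
    from datetime import timedelta

    def usage_report():
        # Placeholder job body -- any importable callable works here.
        print("collecting usage stats")

    def periodic_jobs():
        # Entries use the same keys as the built-in jobs list above
        # (func, interval, and optionally timeout/ttl).
        return [{"func": usage_report, "interval": timedelta(hours=6)}]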


@ -52,7 +52,6 @@ QUERY_RESULTS_CLEANUP_COUNT = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_C
QUERY_RESULTS_CLEANUP_MAX_AGE = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_MAX_AGE", "7"))
SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 30))
SCHEMAS_REFRESH_QUEUE = os.environ.get("REDASH_SCHEMAS_REFRESH_QUEUE", "celery")
AUTH_TYPE = os.environ.get("REDASH_AUTH_TYPE", "api_key")
INVITATION_TOKEN_MAX_AGE = int(os.environ.get("REDASH_INVITATION_TOKEN_MAX_AGE", 60 * 60 * 24 * 7))
@ -208,6 +207,10 @@ CELERYD_WORKER_TASK_LOG_FORMAT = os.environ.get(
(LOG_PREFIX + '[%(asctime)s][PID:%(process)d][%(levelname)s][%(processName)s] '
'task_name=%(task_name)s '
'task_id=%(task_id)s %(message)s')))
RQ_WORKER_JOB_LOG_FORMAT = os.environ.get("REDASH_RQ_WORKER_JOB_LOG_FORMAT",
(LOG_PREFIX + '[%(asctime)s][PID:%(process)d][%(levelname)s][%(name)s] '
'job.description=%(job_description)s '
'job.id=%(job_id)s %(message)s'))
# Mail settings:
MAIL_SERVER = os.environ.get('REDASH_MAIL_SERVER', 'localhost')


@ -10,15 +10,21 @@ def query_time_limit(is_scheduled, user_id, org_id):
return scheduled_time_limit if is_scheduled else adhoc_time_limit
def periodic_jobs():
"""Schedule any custom periodic jobs here. For example:
from datetime import timedelta
from somewhere import some_job, some_other_job
return [
{"func": some_job, "interval": timedelta(hours=1)},
{"func": some_other_job, "interval": timedelta(days=1)}
]
"""
pass
# This provides the ability to override the way we store QueryResult's data column.
# Reference implementation: redash.models.DBPersistence
QueryResultPersistence = None
# Provide any custom tasks you'd like to run periodically
def custom_tasks():
return {
# 'key': {
# 'task': 'redash.task.example',
# 'schedule': timedelta(minutes=5)
# }
}


@ -1,4 +1,5 @@
from .general import record_event, version_check, send_mail, sync_user_details
from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_query_results, execute_query, empty_schedules
from .queries import (QueryTask, enqueue_query, execute_query, refresh_queries,
refresh_schemas, cleanup_query_results, empty_schedules)
from .alerts import check_alerts_for_query
from .failure_report import notify_of_failure
from .failure_report import send_aggregated_errors


@ -1,11 +1,10 @@
from celery.utils.log import get_task_logger
from flask import current_app
import datetime
from redash.worker import celery
from redash.worker import celery, job, get_job_logger
from redash import models, utils
logger = get_task_logger(__name__)
logger = get_job_logger(__name__)
def notify_subscriptions(alert, new_state):
@ -25,7 +24,7 @@ def should_notify(alert, new_state):
return new_state != alert.state or (alert.state == models.Alert.TRIGGERED_STATE and passed_rearm_threshold)
@celery.task(name="redash.tasks.check_alerts_for_query", time_limit=300, soft_time_limit=240)
@job('default', timeout=300)
def check_alerts_for_query(query_id):
logger.debug("Checking query %d for alerts", query_id)


@ -1,8 +1,8 @@
import logging
import datetime
import re
from collections import Counter
from redash.tasks.general import send_mail
from redash.worker import celery
from redash import redis_connection, settings, models
from redash.utils import json_dumps, json_loads, base_url, render_template
@ -21,7 +21,6 @@ def comment_for(failure):
)
@celery.task(name="redash.tasks.send_aggregated_errors")
def send_aggregated_errors():
for k in redis_connection.scan_iter(key("*")):
user_id = re.search(r'\d+', k).group()
@ -69,3 +68,13 @@ def notify_of_failure(message, query):
'schedule_failures': query.schedule_failures,
'failed_at': datetime.datetime.utcnow().strftime("%B %d, %Y %I:%M%p UTC")
}))
def track_failure(query, error):
logging.debug(error)
query.schedule_failures += 1
models.db.session.add(query)
models.db.session.commit()
notify_of_failure(error, query)


@ -1,16 +1,15 @@
import requests
from celery.utils.log import get_task_logger
from flask_mail import Message
from redash import mail, models, settings
from redash.models import users
from redash.version_check import run_version_check
from redash.worker import celery
from redash.worker import job, get_job_logger
logger = get_task_logger(__name__)
logger = get_job_logger(__name__)
@celery.task(name="redash.tasks.record_event")
@job('default')
def record_event(raw_event):
event = models.Event.record(raw_event)
models.db.session.commit()
@ -29,12 +28,11 @@ def record_event(raw_event):
logger.exception("Failed posting to %s", hook)
@celery.task(name="redash.tasks.version_check")
def version_check():
run_version_check()
@celery.task(name="redash.tasks.subscribe")
@job('default')
def subscribe(form):
logger.info("Subscribing to: [security notifications=%s], [newsletter=%s]", form['security_notifications'], form['newsletter'])
data = {
@ -47,7 +45,7 @@ def subscribe(form):
requests.post('https://beacon.redash.io/subscribe', json=data)
@celery.task(name="redash.tasks.send_mail")
@job('default')
def send_mail(to, subject, html, text):
try:
message = Message(recipients=to,
@ -60,14 +58,5 @@ def send_mail(to, subject, html, text):
logger.exception('Failed sending message: %s', message.subject)
@celery.task(
name="redash.tasks.sync_user_details",
ignore_result=True,
soft_time_limit=60,
time_limit=120,
# let the task expire after 45 seconds since there will be
# another task 15 seconds later anyway
expires=45,
)
def sync_user_details():
users.sync_last_active_at()


@ -0,0 +1,2 @@
from .maintenance import refresh_queries, refresh_schemas, cleanup_query_results, empty_schedules
from .execution import QueryTask, execute_query, enqueue_query


@ -7,12 +7,11 @@ from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from six import text_type
from redash import models, redis_connection, settings, statsd_client
from redash.models.parameterized_query import InvalidParameterError, QueryDetachedFromDataSourceError
from redash import models, redis_connection, settings
from redash.query_runner import InterruptException
from redash.tasks.alerts import check_alerts_for_query
from redash.tasks.failure_report import notify_of_failure
from redash.utils import gen_query_hash, json_dumps, utcnow, mustache_render
from redash.tasks.failure_report import track_failure
from redash.utils import gen_query_hash, json_dumps, utcnow
from redash.worker import celery
logger = get_task_logger(__name__)
@ -161,138 +160,6 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
return job
@celery.task(name="redash.tasks.empty_schedules")
def empty_schedules():
logger.info("Deleting schedules of past scheduled queries...")
queries = models.Query.past_scheduled_queries()
for query in queries:
query.schedule = None
models.db.session.commit()
logger.info("Deleted %d schedules.", len(queries))
@celery.task(name="redash.tasks.refresh_queries")
def refresh_queries():
logger.info("Refreshing queries...")
outdated_queries_count = 0
query_ids = []
with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries():
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
logging.info("Disabled refresh queries.")
elif query.org.is_disabled:
logging.debug("Skipping refresh of %s because org is disabled.", query.id)
elif query.data_source is None:
logging.debug("Skipping refresh of %s because the datasource is none.", query.id)
elif query.data_source.paused:
logging.debug("Skipping refresh of %s because datasource - %s is paused (%s).",
query.id, query.data_source.name, query.data_source.pause_reason)
else:
query_text = query.query_text
parameters = {p['name']: p.get('value') for p in query.parameters}
if any(parameters):
try:
query_text = query.parameterized.apply(parameters).query
except InvalidParameterError as e:
error = u"Skipping refresh of {} because of invalid parameters: {}".format(query.id, e.message)
track_failure(query, error)
continue
except QueryDetachedFromDataSourceError as e:
error = ("Skipping refresh of {} because a related dropdown "
"query ({}) is unattached to any datasource.").format(query.id, e.query_id)
track_failure(query, error)
continue
enqueue_query(query_text, query.data_source, query.user_id,
scheduled_query=query,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
query_ids.append(query.id)
outdated_queries_count += 1
statsd_client.gauge('manager.outdated_queries', outdated_queries_count)
logger.info("Done refreshing queries. Found %d outdated queries: %s" % (outdated_queries_count, query_ids))
status = redis_connection.hgetall('redash:status')
now = time.time()
redis_connection.hmset('redash:status', {
'outdated_queries_count': outdated_queries_count,
'last_refresh_at': now,
'query_ids': json_dumps(query_ids)
})
statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now)))
@celery.task(name="redash.tasks.cleanup_query_results")
def cleanup_query_results():
"""
Job to cleanup unused query results -- such that no query links to them anymore, and older than
settings.QUERY_RESULTS_MAX_AGE (a week by default, so it's less likely to be open in someone's browser and be used).
Each time the job deletes only settings.QUERY_RESULTS_CLEANUP_COUNT (100 by default) query results so it won't choke
the database in case of many such results.
"""
logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)",
settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE)
unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE).limit(settings.QUERY_RESULTS_CLEANUP_COUNT)
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.subquery())
).delete(synchronize_session=False)
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)
@celery.task(name="redash.tasks.refresh_schema", time_limit=90, soft_time_limit=60)
def refresh_schema(data_source_id):
ds = models.DataSource.get_by_id(data_source_id)
logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id)
start_time = time.time()
try:
ds.get_schema(refresh=True)
logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
statsd_client.incr('refresh_schema.success')
except SoftTimeLimitExceeded:
logger.info(u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
statsd_client.incr('refresh_schema.timeout')
except Exception:
logger.warning(u"Failed refreshing schema for the data source: %s", ds.name, exc_info=1)
statsd_client.incr('refresh_schema.error')
logger.info(u"task=refresh_schema state=failed ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
@celery.task(name="redash.tasks.refresh_schemas")
def refresh_schemas():
"""
Refreshes the data sources schemas.
"""
blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id]
global_start_time = time.time()
logger.info(u"task=refresh_schemas state=start")
for ds in models.DataSource.query:
if ds.paused:
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=paused(%s)", ds.id, ds.pause_reason)
elif ds.id in blacklist:
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=blacklist", ds.id)
elif ds.org.is_disabled:
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id)
else:
refresh_schema.apply_async(args=(ds.id,), queue=settings.SCHEMAS_REFRESH_QUEUE)
logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time)
def signal_handler(*args):
raise InterruptException
@ -317,16 +184,6 @@ def _resolve_user(user_id, is_api_key, query_id):
return None
def track_failure(query, error):
logging.debug(error)
query.schedule_failures += 1
models.db.session.add(query)
models.db.session.commit()
notify_of_failure(error, query)
# We could have created this as a celery.Task derived class, and act as the task itself. But this might result in weird
# issues as the task class created once per process, so decided to have a plain object instead.
class QueryExecutor(object):
@ -370,7 +227,8 @@ class QueryExecutor(object):
run_time = time.time() - started_at
logger.info(u"task=execute_query query_hash=%s data_length=%s error=[%s]", self.query_hash, data and len(data), error)
logger.info(u"task=execute_query query_hash=%s data_length=%s error=[%s]",
self.query_hash, data and len(data), error)
_unlock(self.query_hash, self.data_source.id)
@ -408,7 +266,7 @@ class QueryExecutor(object):
self.metadata['Query Hash'] = self.query_hash
self.metadata['Queue'] = self.task.request.delivery_info['routing_key']
self.metadata['Scheduled'] = self.scheduled_query is not None
return query_runner.annotate_query(self.query, self.metadata)
def _log_progress(self, state):


@ -0,0 +1,142 @@
import logging
import time
from celery.exceptions import SoftTimeLimitExceeded
from redash import models, redis_connection, settings, statsd_client
from redash.models.parameterized_query import (InvalidParameterError,
QueryDetachedFromDataSourceError)
from redash.tasks.failure_report import track_failure
from redash.utils import json_dumps
from redash.worker import job, get_job_logger
from .execution import enqueue_query
logger = get_job_logger(__name__)
def empty_schedules():
logger.info("Deleting schedules of past scheduled queries...")
queries = models.Query.past_scheduled_queries()
for query in queries:
query.schedule = None
models.db.session.commit()
logger.info("Deleted %d schedules.", len(queries))
def refresh_queries():
logger.info("Refreshing queries...")
outdated_queries_count = 0
query_ids = []
with statsd_client.timer('manager.outdated_queries_lookup'):
for query in models.Query.outdated_queries():
if settings.FEATURE_DISABLE_REFRESH_QUERIES:
logging.info("Disabled refresh queries.")
elif query.org.is_disabled:
logging.debug("Skipping refresh of %s because org is disabled.", query.id)
elif query.data_source is None:
logging.debug("Skipping refresh of %s because the datasource is none.", query.id)
elif query.data_source.paused:
logging.debug("Skipping refresh of %s because datasource - %s is paused (%s).",
query.id, query.data_source.name, query.data_source.pause_reason)
else:
query_text = query.query_text
parameters = {p['name']: p.get('value') for p in query.parameters}
if any(parameters):
try:
query_text = query.parameterized.apply(parameters).query
except InvalidParameterError as e:
error = u"Skipping refresh of {} because of invalid parameters: {}".format(query.id, e.message)
track_failure(query, error)
continue
except QueryDetachedFromDataSourceError as e:
error = ("Skipping refresh of {} because a related dropdown "
"query ({}) is unattached to any datasource.").format(query.id, e.query_id)
track_failure(query, error)
continue
enqueue_query(query_text, query.data_source, query.user_id,
scheduled_query=query,
metadata={'Query ID': query.id, 'Username': 'Scheduled'})
query_ids.append(query.id)
outdated_queries_count += 1
statsd_client.gauge('manager.outdated_queries', outdated_queries_count)
logger.info("Done refreshing queries. Found %d outdated queries: %s" % (outdated_queries_count, query_ids))
status = redis_connection.hgetall('redash:status')
now = time.time()
redis_connection.hmset('redash:status', {
'outdated_queries_count': outdated_queries_count,
'last_refresh_at': now,
'query_ids': json_dumps(query_ids)
})
statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now)))
def cleanup_query_results():
"""
Job to cleanup unused query results -- such that no query links to them anymore, and older than
settings.QUERY_RESULTS_MAX_AGE (a week by default, so it's less likely to be open in someone's browser and be used).
Each time the job deletes only settings.QUERY_RESULTS_CLEANUP_COUNT (100 by default) query results so it won't choke
the database in case of many such results.
"""
logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)",
settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE)
unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE)
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.limit(settings.QUERY_RESULTS_CLEANUP_COUNT).subquery())
).delete(synchronize_session=False)
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)
@job('schemas')
def refresh_schema(data_source_id):
ds = models.DataSource.get_by_id(data_source_id)
logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id)
start_time = time.time()
try:
ds.get_schema(refresh=True)
logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
statsd_client.incr('refresh_schema.success')
except SoftTimeLimitExceeded:
logger.info(u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
statsd_client.incr('refresh_schema.timeout')
except Exception:
logger.warning(u"Failed refreshing schema for the data source: %s", ds.name, exc_info=1)
statsd_client.incr('refresh_schema.error')
logger.info(u"task=refresh_schema state=failed ds_id=%s runtime=%.2f", ds.id, time.time() - start_time)
def refresh_schemas():
"""
Refreshes the data sources schemas.
"""
blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id]
global_start_time = time.time()
logger.info(u"task=refresh_schemas state=start")
for ds in models.DataSource.query:
if ds.paused:
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=paused(%s)", ds.id, ds.pause_reason)
elif ds.id in blacklist:
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=blacklist", ds.id)
elif ds.org.is_disabled:
logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id)
else:
refresh_schema.delay(ds.id)
logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time)


@ -3,6 +3,7 @@ from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.redis import RedisIntegration
from sentry_sdk.integrations.rq import RqIntegration
from redash import settings, __version__
@ -25,5 +26,6 @@ def init():
release=__version__,
before_send=before_send,
send_default_pii=True,
integrations=[FlaskIntegration(), CeleryIntegration(), SqlalchemyIntegration(), RedisIntegration()]
integrations=[FlaskIntegration(), CeleryIntegration(), SqlalchemyIntegration(),
RedisIntegration(), RqIntegration()]
)


@ -1,19 +1,47 @@
from __future__ import absolute_import
from datetime import timedelta
from random import randint
from functools import partial
from flask import current_app
import logging
from celery import Celery
from celery.schedules import crontab
from celery.signals import worker_process_init
from celery.utils.log import get_logger
from redash import create_app, extensions, settings
from rq import get_current_job
from rq.decorators import job as rq_job
from redash import create_app, extensions, settings, redis_connection
from redash.metrics import celery as celery_metrics # noqa
logger = get_logger(__name__)
job = partial(rq_job, connection=redis_connection)
class CurrentJobFilter(logging.Filter):
def filter(self, record):
current_job = get_current_job()
record.job_id = current_job.id if current_job else ''
record.job_description = current_job.description if current_job else ''
return True
def get_job_logger(name):
logger = logging.getLogger('rq.job.' + name)
handler = logging.StreamHandler()
handler.formatter = logging.Formatter(settings.RQ_WORKER_JOB_LOG_FORMAT)
handler.addFilter(CurrentJobFilter())
logger.addHandler(handler)
logger.propagate = False
return logger
celery = Celery('redash',
broker=settings.CELERY_BROKER,
@ -21,48 +49,8 @@ celery = Celery('redash',
redis_backend_use_ssl=settings.CELERY_SSL_CONFIG,
include='redash.tasks')
# The internal periodic Celery tasks to automatically schedule.
celery_schedule = {
'refresh_queries': {
'task': 'redash.tasks.refresh_queries',
'schedule': timedelta(seconds=30)
},
'empty_schedules': {
'task': 'redash.tasks.empty_schedules',
'schedule': timedelta(minutes=60)
},
'refresh_schemas': {
'task': 'redash.tasks.refresh_schemas',
'schedule': timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)
},
'sync_user_details': {
'task': 'redash.tasks.sync_user_details',
'schedule': timedelta(minutes=1),
},
'send_aggregated_errors': {
'task': 'redash.tasks.send_aggregated_errors',
'schedule': timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL),
}
}
if settings.VERSION_CHECK:
celery_schedule['version_check'] = {
'task': 'redash.tasks.version_check',
# We need to schedule the version check to run at a random hour/minute, to spread the requests from all users
# evenly.
'schedule': crontab(minute=randint(0, 59), hour=randint(0, 23))
}
if settings.QUERY_RESULTS_CLEANUP_ENABLED:
celery_schedule['cleanup_query_results'] = {
'task': 'redash.tasks.cleanup_query_results',
'schedule': timedelta(minutes=5)
}
celery_schedule.update(settings.dynamic_settings.custom_tasks())
celery.conf.update(result_backend=settings.CELERY_RESULT_BACKEND,
beat_schedule=celery_schedule,
timezone='UTC',
result_expires=settings.CELERY_RESULT_EXPIRES,
worker_log_format=settings.CELERYD_WORKER_LOG_FORMAT,
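For illustration (not part of the diff), a hedged sketch of a task defined with the partially applied RQ job decorator and the per-job logger introduced above; hello is a hypothetical task.

    from redash.worker import job, get_job_logger

    logger = get_job_logger(__name__)

    @job('default', timeout=60)
    def hello(name):
        # Runs on an RQ worker listening on the "default" queue; log lines
        # pick up job_id/job_description via CurrentJobFilter.
        logger.info("greeting %s", name)
        return "hello %s" % name

    # Enqueued like any rq.decorators job, e.g. hello.delay("redash")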


@ -36,6 +36,8 @@ SQLAlchemy-Utils==0.33.11
sqlparse==0.2.4
statsd==2.1.2
gunicorn==19.7.1
rq==1.1.0
rq-scheduler==0.9
celery==4.3.0
kombu==4.6.3
jsonschema==2.4.0


@ -22,7 +22,7 @@ class TestSendAggregatedErrorsTask(BaseTestCase):
notify_of_failure(message, query)
return key(query.user.id)
@mock.patch('redash.tasks.failure_report.render_template')
@mock.patch('redash.tasks.failure_report.render_template', return_value='')
def send_email(self, user, render_template):
send_failure_report(user.id)
@ -93,7 +93,7 @@ class TestSendAggregatedErrorsTask(BaseTestCase):
self.notify(query=query)
self.notify(query=query)
failures = self.send_email(query.user)
latest_failure = dateutil.parser.parse(failures[0]['failed_at'])
self.assertNotEqual(2000, latest_failure.year)


@ -8,7 +8,7 @@ from tests import BaseTestCase
from redash import redis_connection, models
from redash.utils import json_dumps
from redash.query_runner.pg import PostgreSQL
from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query
from redash.tasks.queries.execution import QueryExecutionError, enqueue_query, execute_query
FakeResult = namedtuple('FakeResult', 'id')


@ -1,8 +1,9 @@
from mock import patch, call, ANY
from tests import BaseTestCase
from redash.tasks import refresh_queries
from redash.tasks.queries.maintenance import refresh_queries
from redash.models import Query
ENQUEUE_QUERY = 'redash.tasks.queries.maintenance.enqueue_query'
class TestRefreshQuery(BaseTestCase):
def test_enqueues_outdated_queries(self):
@ -15,7 +16,7 @@ class TestRefreshQuery(BaseTestCase):
query_text="select 42;",
data_source=self.factory.create_data_source())
oq = staticmethod(lambda: [query1, query2])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \
with patch(ENQUEUE_QUERY) as add_job_mock, \
patch.object(Query, 'outdated_queries', oq):
refresh_queries()
self.assertEqual(add_job_mock.call_count, 2)
@ -34,13 +35,13 @@ class TestRefreshQuery(BaseTestCase):
oq = staticmethod(lambda: [query])
query.data_source.pause()
with patch.object(Query, 'outdated_queries', oq):
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
with patch(ENQUEUE_QUERY) as add_job_mock:
refresh_queries()
add_job_mock.assert_not_called()
query.data_source.resume()
with patch('redash.tasks.queries.enqueue_query') as add_job_mock:
with patch(ENQUEUE_QUERY) as add_job_mock:
refresh_queries()
add_job_mock.assert_called_with(
query.query_text, query.data_source, query.user_id,
@ -59,7 +60,7 @@ class TestRefreshQuery(BaseTestCase):
"value": "42",
"title": "n"}]})
oq = staticmethod(lambda: [query])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \
with patch(ENQUEUE_QUERY) as add_job_mock, \
patch.object(Query, 'outdated_queries', oq):
refresh_queries()
add_job_mock.assert_called_with(
@ -79,7 +80,7 @@ class TestRefreshQuery(BaseTestCase):
"value": 42, # <-- should be text!
"title": "n"}]})
oq = staticmethod(lambda: [query])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \
with patch(ENQUEUE_QUERY) as add_job_mock, \
patch.object(Query, 'outdated_queries', oq):
refresh_queries()
add_job_mock.assert_not_called()
@ -100,7 +101,7 @@ class TestRefreshQuery(BaseTestCase):
dropdown_query = self.factory.create_query(id=100, data_source=None)
oq = staticmethod(lambda: [query])
with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \
with patch(ENQUEUE_QUERY) as add_job_mock, \
patch.object(Query, 'outdated_queries', oq):
refresh_queries()
add_job_mock.assert_not_called()


@ -7,19 +7,19 @@ from redash.tasks import refresh_schemas
class TestRefreshSchemas(BaseTestCase):
def test_calls_refresh_of_all_data_sources(self):
self.factory.data_source # trigger creation
with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
with patch('redash.tasks.queries.maintenance.refresh_schema.delay') as refresh_job:
refresh_schemas()
refresh_job.assert_called()
def test_skips_paused_data_sources(self):
self.factory.data_source.pause()
with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
with patch('redash.tasks.queries.maintenance.refresh_schema.delay') as refresh_job:
refresh_schemas()
refresh_job.assert_not_called()
self.factory.data_source.resume()
with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job:
with patch('redash.tasks.queries.maintenance.refresh_schema.delay') as refresh_job:
refresh_schemas()
refresh_job.assert_called()