From 5a5fdecdde45ac454aecf4faff779a8405bea471 Mon Sep 17 00:00:00 2001 From: Omer Lachish Date: Tue, 15 Oct 2019 23:59:22 +0300 Subject: [PATCH] Replace Celery with RQ (except for execute_query tasks) (#4093) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add rq and an rq_worker service * add rq_scheduler and an rq_scheduler service * move beat schedule to periodic_jobs queue * move version checks to RQ * move query result cleanup to RQ * use timedelta and DRY up a bit * move custom tasks to RQ * do actual schema refreshes in rq * rename 'period_jobs' to 'periodic', as it obviously holds jobs * move send_email to rq * DRY up enqueues * ditch and use a partially applied decorator * move subscribe to rq * move check_alerts_for_query to rq * move record_event to rq * make tests play nicely with rq * 👋 beat * rename rq_scheduler to plain scheduler, now that there's no Celery scheduler entrypoint * add some color to rq-worker's output * add logging context to rq jobs (while keeping execute_query context via get_task_logger for now) * move schedule to its own module * cancel previously scheduled periodic jobs. not sure this is a good idea. * rename redash.scheduler to redash.schedule * allow custom dynamic jobs to be added declaratively * add basic monitoring to rq queues * add worker monitoring * pleasing the CodeClimate overlords * adjust cypress docker-compose.yml to include rq changes * DRY up Cypress docker-compose * add rq dependencies to cypress docker-compose service * an odd attempt at watching docker-compose logs when running with Cypress * Revert "an odd attempt at watching docker-compose logs when running with Cypress" This reverts commit 016bd1a93e3efa84a9f27d0f2acb972ce1957bcd. * show docker-compose logs at Cypress shutdown * Revert "DRY up Cypress docker-compose" This reverts commit 43abac7084c207ab9e39192ac79d520448c2c527. 
* minimal version for binding is 3.2 * remove unnecessary code reloads on cypress * add a command which errors if any of the workers running inside the current machine haven't been active in the last minute * SCHEMAS_REFRESH_QUEUE is no longer a required setting * split tasks/queries.py to execution.py and maintenance.py * fix tests after query execution split * pleasing the CodeClimate overlords * rename worker to celery_worker and rq_worker to worker * use /rq_status instead of /jobs * show started jobs' time ago according to UTC * replace all spaces in column names * fix query tests after execution split * exit with an int * general lint * add an entrypoint for rq_healthcheck * fix indentation * delete all existing periodic jobs before scheduling them * remove some unrequired requires * move schedule example to redash.schedule * add RQ integration to Sentry's setup * pleasing the CodeClimate overlords * remove replication settings from docker-compose - a proper way to scale using docker-compose would be the --scale CLI option, which will be described in the knowledge base * revert to calling a function in dynamic settings to allow periodic jobs to be scheduled after app has been loaded * don't need to depend on context when templating failure reports * set the timeout_ttl to double the interval to avoid job results from expiring and having periodic jobs not reschedule * whoops, bad merge * describe custom jobs and don't actually schedule them * fix merge --- .circleci/docker-compose.cypress.yml | 24 ++- bin/docker-entrypoint | 69 ++++++-- client/app/components/admin/CeleryStatus.jsx | 17 -- client/app/components/admin/Layout.jsx | 3 + client/app/components/admin/RQStatus.jsx | 78 +++++++++ client/app/pages/admin/Jobs.jsx | 124 ++++++++++++++ client/app/pages/admin/Tasks.jsx | 13 +- docker-compose.yml | 31 +++- redash/__init__.py | 1 + redash/cli/__init__.py | 3 +- redash/cli/rq.py | 41 +++++ redash/handlers/admin.py | 14 +- redash/monitor.py | 45 +++++ 
redash/schedule.py | 56 +++++++ redash/settings/__init__.py | 5 +- redash/settings/dynamic_settings.py | 22 ++- redash/tasks/__init__.py | 5 +- redash/tasks/alerts.py | 7 +- redash/tasks/failure_report.py | 13 +- redash/tasks/general.py | 21 +-- redash/tasks/queries/__init__.py | 2 + .../{queries.py => queries/execution.py} | 154 +----------------- redash/tasks/queries/maintenance.py | 142 ++++++++++++++++ redash/utils/sentry.py | 4 +- redash/worker.py | 74 ++++----- requirements.txt | 2 + tests/tasks/test_failure_report.py | 4 +- tests/tasks/test_queries.py | 2 +- tests/tasks/test_refresh_queries.py | 15 +- tests/tasks/test_refresh_schemas.py | 6 +- 30 files changed, 712 insertions(+), 285 deletions(-) create mode 100644 client/app/components/admin/RQStatus.jsx create mode 100644 client/app/pages/admin/Jobs.jsx create mode 100644 redash/cli/rq.py create mode 100644 redash/schedule.py create mode 100644 redash/tasks/queries/__init__.py rename redash/tasks/{queries.py => queries/execution.py} (61%) create mode 100644 redash/tasks/queries/maintenance.py diff --git a/.circleci/docker-compose.cypress.yml b/.circleci/docker-compose.cypress.yml index 15d8cef4..ffa4ca69 100644 --- a/.circleci/docker-compose.cypress.yml +++ b/.circleci/docker-compose.cypress.yml @@ -14,9 +14,16 @@ services: REDASH_REDIS_URL: "redis://redis:6379/0" REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres" REDASH_RATELIMIT_ENABLED: "false" - worker: + scheduler: build: ../ command: scheduler + depends_on: + - server + environment: + REDASH_REDIS_URL: "redis://redis:6379/0" + worker: + build: ../ + command: worker depends_on: - server environment: @@ -24,7 +31,18 @@ services: REDASH_LOG_LEVEL: "INFO" REDASH_REDIS_URL: "redis://redis:6379/0" REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres" - QUEUES: "queries,scheduled_queries,celery,schemas" + QUEUES: "default periodic schemas" + celery_worker: + build: ../ + command: celery_worker + depends_on: + - server + environment: + 
PYTHONUNBUFFERED: 0 + REDASH_LOG_LEVEL: "INFO" + REDASH_REDIS_URL: "redis://redis:6379/0" + REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres" + QUEUES: "queries,scheduled_queries" WORKERS_COUNT: 2 cypress: build: @@ -32,7 +50,9 @@ services: dockerfile: .circleci/Dockerfile.cypress depends_on: - server + - celery_worker - worker + - scheduler environment: CYPRESS_baseUrl: "http://server:5000" PERCY_TOKEN: ${PERCY_TOKEN} diff --git a/bin/docker-entrypoint b/bin/docker-entrypoint index e6a122c4..d88266fa 100755 --- a/bin/docker-entrypoint +++ b/bin/docker-entrypoint @@ -1,9 +1,9 @@ #!/bin/bash set -e -worker() { +celery_worker() { WORKERS_COUNT=${WORKERS_COUNT:-2} - QUEUES=${QUEUES:-queries,scheduled_queries,celery,schemas} + QUEUES=${QUEUES:-queries,scheduled_queries} WORKER_EXTRA_OPTIONS=${WORKER_EXTRA_OPTIONS:-} echo "Starting $WORKERS_COUNT workers for queues: $QUEUES..." @@ -11,23 +11,36 @@ worker() { } scheduler() { - WORKERS_COUNT=${WORKERS_COUNT:-1} - QUEUES=${QUEUES:-celery} - SCHEDULE_DB=${SCHEDULE_DB:-celerybeat-schedule} + echo "Starting RQ scheduler..." - echo "Starting scheduler and $WORKERS_COUNT workers for queues: $QUEUES..." + exec /app/manage.py rq scheduler +} - exec /usr/local/bin/celery worker --app=redash.worker --beat -s$SCHEDULE_DB -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair +dev_scheduler() { + echo "Starting dev RQ scheduler..." + + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq scheduler +} + +worker() { + echo "Starting RQ worker..." + + exec /app/manage.py rq worker $QUEUES } dev_worker() { + echo "Starting dev RQ worker..." 
+ + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES +} + +dev_celery_worker() { WORKERS_COUNT=${WORKERS_COUNT:-2} - QUEUES=${QUEUES:-queries,scheduled_queries,celery,schemas} - SCHEDULE_DB=${SCHEDULE_DB:-celerybeat-schedule} + QUEUES=${QUEUES:-queries,scheduled_queries} - echo "Starting dev scheduler and $WORKERS_COUNT workers for queues: $QUEUES..." + echo "Starting $WORKERS_COUNT workers for queues: $QUEUES..." - exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- /usr/local/bin/celery worker --app=redash.worker --beat -s$SCHEDULE_DB -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair + exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- /usr/local/bin/celery worker --app=redash.worker -c$WORKERS_COUNT -Q$QUEUES -linfo --max-tasks-per-child=10 -Ofair } server() { @@ -45,6 +58,10 @@ celery_healthcheck() { exec /usr/local/bin/celery inspect ping --app=redash.worker -d celery@$HOSTNAME } +rq_healthcheck() { + exec /app/manage.py rq healthcheck +} + help() { echo "Redash Docker." echo "" @@ -52,10 +69,14 @@ help() { echo "" echo "server -- start Redash server (with gunicorn)" - echo "worker -- start Celery worker" - echo "scheduler -- start Celery worker with a beat (scheduler) process" - echo "dev_worker -- start Celery worker with a beat (scheduler) process which picks up code changes and reloads" + echo "celery_worker -- start Celery worker" + echo "dev_celery_worker -- start Celery worker process which picks up code changes and reloads" + echo "worker -- start a single RQ worker" + echo "dev_worker -- start a single RQ worker with code reloading" + echo "scheduler -- start an rq-scheduler instance" + echo "dev_scheduler -- start an rq-scheduler instance with code reloading" echo "celery_healthcheck -- runs a Celery healthcheck. Useful for Docker's HEALTHCHECK mechanism." 
+ echo "rq_healthcheck -- runs a RQ healthcheck that verifies that all local workers are active. Useful for Docker's HEALTHCHECK mechanism." echo "" echo "shell -- open shell" echo "dev_server -- start Flask development server with debugger and auto reload" @@ -89,10 +110,30 @@ case "$1" in shift scheduler ;; + dev_scheduler) + shift + dev_scheduler + ;; + celery_worker) + shift + celery_worker + ;; + dev_celery_worker) + shift + dev_celery_worker + ;; dev_worker) shift dev_worker ;; + rq_healthcheck) + shift + rq_healthcheck + ;; + celery_healthcheck) + shift + celery_healthcheck + ;; dev_server) export FLASK_DEBUG=1 exec /app/manage.py runserver --debugger --reload -h 0.0.0.0 diff --git a/client/app/components/admin/CeleryStatus.jsx b/client/app/components/admin/CeleryStatus.jsx index e21171ad..959b10e6 100644 --- a/client/app/components/admin/CeleryStatus.jsx +++ b/client/app/components/admin/CeleryStatus.jsx @@ -58,10 +58,6 @@ const queryColumns = commonColumns.concat([ { title: 'Scheduled', dataIndex: 'scheduled' }, ]); -const otherTasksColumns = commonColumns.concat([ - { title: 'Task Name', dataIndex: 'task_name' }, -]); - const queuesColumns = map( ['Name', 'Active', 'Reserved', 'Waiting'], c => ({ title: c, dataIndex: c.toLowerCase() }), @@ -97,16 +93,3 @@ export function QueriesTable({ loading, items }) { } QueriesTable.propTypes = TablePropTypes; - -export function OtherTasksTable({ loading, items }) { - return ( - - ); -} - -OtherTasksTable.propTypes = TablePropTypes; diff --git a/client/app/components/admin/Layout.jsx b/client/app/components/admin/Layout.jsx index 505582da..36f2f7e7 100644 --- a/client/app/components/admin/Layout.jsx +++ b/client/app/components/admin/Layout.jsx @@ -18,6 +18,9 @@ export default function Layout({ activeTab, children }) { Celery Status}> {(activeTab === 'tasks') ? children : null} + RQ Status}> + {(activeTab === 'jobs') ? children : null} + Outdated Queries}> {(activeTab === 'outdated_queries') ? 
children : null} diff --git a/client/app/components/admin/RQStatus.jsx b/client/app/components/admin/RQStatus.jsx new file mode 100644 index 00000000..fff2bc07 --- /dev/null +++ b/client/app/components/admin/RQStatus.jsx @@ -0,0 +1,78 @@ +import { map } from 'lodash'; +import React from 'react'; +import PropTypes from 'prop-types'; + +import Badge from 'antd/lib/badge'; +import Table from 'antd/lib/table'; +import { Columns } from '@/components/items-list/components/ItemsTable'; + +// Tables + +const otherJobsColumns = [ + { title: 'Queue', dataIndex: 'queue' }, + { title: 'Job Name', dataIndex: 'name' }, + Columns.timeAgo({ title: 'Start Time', dataIndex: 'started_at' }), + Columns.timeAgo({ title: 'Enqueue Time', dataIndex: 'enqueued_at' }), +]; + +const workersColumns = [Columns.custom( + value => ( + {value} + + ), { title: 'State', dataIndex: 'state' }, +)].concat(map(['Hostname', 'PID', 'Name', 'Queues', 'Successful Job Count', + 'Failed Job Count', 'Birth Date', 'Total Working Time'], +c => ({ title: c, dataIndex: c.toLowerCase().replace(/\s/g, '_') }))); + +const queuesColumns = map( + ['Name', 'Started', 'Queued'], + c => ({ title: c, dataIndex: c.toLowerCase() }), +); + +const TablePropTypes = { + loading: PropTypes.bool.isRequired, + items: PropTypes.arrayOf(PropTypes.object).isRequired, +}; + +export function WorkersTable({ loading, items }) { + return ( +
+ ); +} + +WorkersTable.propTypes = TablePropTypes; + +export function QueuesTable({ loading, items }) { + return ( +
+ ); +} + +QueuesTable.propTypes = TablePropTypes; + +export function OtherJobsTable({ loading, items }) { + return ( +
+ ); +} + +OtherJobsTable.propTypes = TablePropTypes; diff --git a/client/app/pages/admin/Jobs.jsx b/client/app/pages/admin/Jobs.jsx new file mode 100644 index 00000000..a7e005eb --- /dev/null +++ b/client/app/pages/admin/Jobs.jsx @@ -0,0 +1,124 @@ +import { flatMap, values } from 'lodash'; +import React from 'react'; +import { react2angular } from 'react2angular'; + +import Alert from 'antd/lib/alert'; +import Tabs from 'antd/lib/tabs'; +import * as Grid from 'antd/lib/grid'; +import Layout from '@/components/admin/Layout'; +import { CounterCard } from '@/components/admin/CeleryStatus'; +import { WorkersTable, QueuesTable, OtherJobsTable } from '@/components/admin/RQStatus'; + +import { $http } from '@/services/ng'; +import recordEvent from '@/services/recordEvent'; +import { routesToAngularRoutes } from '@/lib/utils'; +import moment from 'moment'; + +class Jobs extends React.Component { + state = { + isLoading: true, + error: null, + + queueCounters: [], + overallCounters: { started: 0, queued: 0 }, + startedJobs: [], + workers: [], + }; + + componentDidMount() { + recordEvent('view', 'page', 'admin/rq_status'); + $http + .get('/api/admin/queries/rq_status') + .then(({ data }) => this.processQueues(data)) + .catch(error => this.handleError(error)); + } + + componentWillUnmount() { + // Ignore data after component unmounted + this.processQueues = () => {}; + this.handleError = () => {}; + } + + processQueues = ({ queues, workers }) => { + const queueCounters = values(queues).map(({ name, started, queued }) => ({ + name, + started: started.length, + queued: queued.length, + })); + + const overallCounters = queueCounters.reduce( + (c, q) => ({ + started: c.started + q.started, + queued: c.queued + q.queued, + }), + { started: 0, queued: 0 }, + ); + + const startedJobs = flatMap(values(queues), queue => queue.started.map(job => ({ + ...job, + enqueued_at: moment.utc(job.enqueued_at), + started_at: moment.utc(job.started_at), + }))); + + this.setState({ isLoading: 
false, queueCounters, startedJobs, overallCounters, workers }); + }; + + handleError = (error) => { + this.setState({ isLoading: false, error }); + }; + + render() { + const { isLoading, error, queueCounters, startedJobs, overallCounters, workers } = this.state; + + return ( + +
+ {error && ( + + )} + + {!error && ( + + + + + + + + + + + + + + + + + + + + + + + )} +
+
+ ); + } +} + +export default function init(ngModule) { + ngModule.component('pageJobs', react2angular(Jobs)); + + return routesToAngularRoutes([ + { + path: '/admin/queries/jobs', + title: 'RQ Status', + key: 'jobs', + }, + ], { + template: '', + }); +} + +init.init = true; diff --git a/client/app/pages/admin/Tasks.jsx b/client/app/pages/admin/Tasks.jsx index f325638b..566d5c94 100644 --- a/client/app/pages/admin/Tasks.jsx +++ b/client/app/pages/admin/Tasks.jsx @@ -7,7 +7,7 @@ import Alert from 'antd/lib/alert'; import Tabs from 'antd/lib/tabs'; import * as Grid from 'antd/lib/grid'; import Layout from '@/components/admin/Layout'; -import { CounterCard, QueuesTable, QueriesTable, OtherTasksTable } from '@/components/admin/CeleryStatus'; +import { CounterCard, QueuesTable, QueriesTable } from '@/components/admin/CeleryStatus'; import { $http } from '@/services/ng'; import recordEvent from '@/services/recordEvent'; @@ -29,7 +29,6 @@ class Tasks extends React.Component { queues: [], queries: [], - otherTasks: [], counters: { active: 0, reserved: 0, waiting: 0 }, }; @@ -50,7 +49,6 @@ class Tasks extends React.Component { processTasks = (tasks) => { const queues = {}; const queries = []; - const otherTasks = []; const counters = { active: 0, reserved: 0, waiting: 0 }; @@ -70,12 +68,10 @@ class Tasks extends React.Component { if (task.task_name === 'redash.tasks.execute_query') { queries.push(task); - } else { - otherTasks.push(task); } }); - this.setState({ isLoading: false, queues: values(queues), queries, otherTasks, counters }); + this.setState({ isLoading: false, queues: values(queues), queries, counters }); }; handleError = (error) => { @@ -83,7 +79,7 @@ class Tasks extends React.Component { }; render() { - const { isLoading, error, queues, queries, otherTasks, counters } = this.state; + const { isLoading, error, queues, queries, counters } = this.state; return ( @@ -113,9 +109,6 @@ class Tasks extends React.Component { - - - )} diff --git a/docker-compose.yml 
b/docker-compose.yml index 69d6e821..9ba8c4c4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,11 +19,38 @@ services: REDASH_REDIS_URL: "redis://redis:6379/0" REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres" REDASH_RATELIMIT_ENABLED: "false" + scheduler: + build: . + command: dev_scheduler + volumes: + - type: bind + source: . + target: /app + depends_on: + - server + environment: + REDASH_REDIS_URL: "redis://redis:6379/0" REDASH_MAIL_DEFAULT_SENDER: redash@example.com REDASH_MAIL_SERVER: email worker: build: . command: dev_worker + volumes: + - type: bind + source: . + target: /app + depends_on: + - server + tty: true + environment: + PYTHONUNBUFFERED: 0 + REDASH_LOG_LEVEL: "INFO" + REDASH_REDIS_URL: "redis://redis:6379/0" + REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres" + QUEUES: "default periodic schemas" + celery-worker: + build: . + command: dev_celery_worker volumes: - type: bind source: . @@ -36,7 +63,7 @@ services: REDASH_LOG_LEVEL: "INFO" REDASH_REDIS_URL: "redis://redis:6379/0" REDASH_DATABASE_URL: "postgresql://postgres@postgres/postgres" - QUEUES: "queries,scheduled_queries,celery,schemas" + QUEUES: "queries,scheduled_queries" WORKERS_COUNT: 2 REDASH_MAIL_DEFAULT_SENDER: redash@example.com REDASH_MAIL_SERVER: email @@ -56,4 +83,4 @@ services: image: djfarrelly/maildev ports: - "1080:80" - restart: unless-stopped \ No newline at end of file + restart: unless-stopped diff --git a/redash/__init__.py b/redash/__init__.py index e6d3ec86..68be0629 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import import logging import os import sys diff --git a/redash/cli/__init__.py b/redash/cli/__init__.py index 61b27637..229f80ea 100644 --- a/redash/cli/__init__.py +++ b/redash/cli/__init__.py @@ -6,7 +6,7 @@ from flask import current_app from flask.cli import FlaskGroup, run_command from redash import __version__, create_app, settings -from redash.cli import 
data_sources, database, groups, organization, queries, users +from redash.cli import data_sources, database, groups, organization, queries, users, rq from redash.monitor import get_status @@ -35,6 +35,7 @@ manager.add_command(groups.manager, "groups") manager.add_command(data_sources.manager, "ds") manager.add_command(organization.manager, "org") manager.add_command(queries.manager, "queries") +manager.add_command(rq.manager, "rq") manager.add_command(run_command, "runserver") diff --git a/redash/cli/rq.py b/redash/cli/rq.py new file mode 100644 index 00000000..76e5544f --- /dev/null +++ b/redash/cli/rq.py @@ -0,0 +1,41 @@ +from __future__ import absolute_import +import socket +import sys +import datetime + +from click import argument +from flask.cli import AppGroup +from rq import Connection, Worker + +from redash import redis_connection +from redash.schedule import rq_scheduler, schedule_periodic_jobs + +manager = AppGroup(help="RQ management commands.") + + +@manager.command() +def scheduler(): + schedule_periodic_jobs() + rq_scheduler.run() + + +@manager.command() +@argument('queues', nargs=-1) +def worker(queues='default'): + with Connection(redis_connection): + w = Worker(queues) + w.work() + + +@manager.command() +def healthcheck(): + hostname = socket.gethostname() + with Connection(redis_connection): + all_workers = Worker.all() + + local_workers = [w for w in all_workers if w.hostname == hostname] + heartbeats = [w.last_heartbeat for w in local_workers] + time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats] + active = [t.seconds < 60 for t in time_since_seen] + + sys.exit(int(not all(active))) diff --git a/redash/handlers/admin.py b/redash/handlers/admin.py index 56adbbd9..ba8dd30e 100644 --- a/redash/handlers/admin.py +++ b/redash/handlers/admin.py @@ -8,7 +8,7 @@ from redash.handlers.base import json_response, record_event from redash.permissions import require_super_admin from redash.serializers import QuerySerializer from 
redash.utils import json_loads -from redash.monitor import celery_tasks +from redash.monitor import celery_tasks, rq_status @routes.route('/api/admin/queries/outdated', methods=['GET']) @@ -52,3 +52,15 @@ def queries_tasks(): } return json_response(response) + + +@routes.route('/api/admin/queries/rq_status', methods=['GET']) +@require_super_admin +@login_required +def queries_rq_status(): + record_event(current_org, current_user._get_current_object(), { + 'action': 'list', + 'object_type': 'rq_status' + }) + + return json_response(rq_status()) diff --git a/redash/monitor.py b/redash/monitor.py index 877a1806..f0d8abfa 100644 --- a/redash/monitor.py +++ b/redash/monitor.py @@ -1,9 +1,13 @@ +from __future__ import absolute_import import itertools from sqlalchemy import union_all from redash import redis_connection, __version__, settings from redash.models import db, DataSource, Query, QueryResult, Dashboard, Widget from redash.utils import json_loads from redash.worker import celery +from rq import Queue, Worker +from rq.job import Job +from rq.registry import StartedJobRegistry def get_redis_status(): @@ -134,3 +138,44 @@ def celery_tasks(): tasks += get_waiting_in_queue(queue_name) return tasks + + +def fetch_jobs(queue, job_ids): + return [{ + 'id': job.id, + 'name': job.func_name, + 'queue': queue.name, + 'enqueued_at': job.enqueued_at, + 'started_at': job.started_at + } for job in Job.fetch_many(job_ids, connection=redis_connection) if job is not None] + + +def rq_queues(): + return { + q.name: { + 'name': q.name, + 'started': fetch_jobs(q, StartedJobRegistry(queue=q).get_job_ids()), + 'queued': fetch_jobs(q, q.job_ids) + } for q in Queue.all(connection=redis_connection)} + + +def rq_workers(): + return [{ + 'name': w.name, + 'hostname': w.hostname, + 'pid': w.pid, + 'queues': ", ".join([q.name for q in w.queues]), + 'state': w.state, + 'last_heartbeat': w.last_heartbeat, + 'birth_date': w.birth_date, + 'successful_job_count': w.successful_job_count, + 
'failed_job_count': w.failed_job_count, + 'total_working_time': w.total_working_time + } for w in Worker.all(connection=redis_connection)] + + +def rq_status(): + return { + 'queues': rq_queues(), + 'workers': rq_workers() + } diff --git a/redash/schedule.py b/redash/schedule.py new file mode 100644 index 00000000..43610a95 --- /dev/null +++ b/redash/schedule.py @@ -0,0 +1,56 @@ +from __future__ import absolute_import +from datetime import datetime, timedelta +from functools import partial +from random import randint + +from rq_scheduler import Scheduler + +from redash import settings, redis_connection +from redash.tasks import (sync_user_details, refresh_queries, + empty_schedules, refresh_schemas, + cleanup_query_results, + version_check, send_aggregated_errors) + +rq_scheduler = Scheduler(connection=redis_connection, + queue_name="periodic", + interval=5) + + +def schedule(**kwargs): + interval = kwargs['interval'] + if isinstance(interval, timedelta): + interval = interval.seconds + + kwargs['interval'] = interval + kwargs['result_ttl'] = kwargs.get('result_ttl', interval * 2) + + rq_scheduler.schedule(scheduled_time=datetime.utcnow(), **kwargs) + + +def schedule_periodic_jobs(): + for job in rq_scheduler.get_jobs(): + job.delete() + + jobs = [ + {"func": refresh_queries, "interval": 30}, + {"func": empty_schedules, "interval": timedelta(minutes=60)}, + {"func": refresh_schemas, "interval": timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE)}, + {"func": sync_user_details, "timeout": 60, "ttl": 45, "interval": timedelta(minutes=1)}, + {"func": send_aggregated_errors, "interval": timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL)} + ] + + if settings.QUERY_RESULTS_CLEANUP_ENABLED: + jobs.append({"func": cleanup_query_results, "interval": timedelta(minutes=5)}) + + if settings.VERSION_CHECK: + # We schedule the version check to run at a random time in order to spread the requests from all users evenly. 
+ rq_scheduler.cron('{minute} {hour} * * *'.format( + minute=randint(0, 59), + hour=randint(0, 23)), + func=version_check) + + # Add your own custom periodic jobs in your dynamic_settings module. + jobs.extend(settings.dynamic_settings.periodic_jobs() or []) + + for job in jobs: + schedule(**job) diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index 1a7154df..53c0a025 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -52,7 +52,6 @@ QUERY_RESULTS_CLEANUP_COUNT = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_C QUERY_RESULTS_CLEANUP_MAX_AGE = int(os.environ.get("REDASH_QUERY_RESULTS_CLEANUP_MAX_AGE", "7")) SCHEMAS_REFRESH_SCHEDULE = int(os.environ.get("REDASH_SCHEMAS_REFRESH_SCHEDULE", 30)) -SCHEMAS_REFRESH_QUEUE = os.environ.get("REDASH_SCHEMAS_REFRESH_QUEUE", "celery") AUTH_TYPE = os.environ.get("REDASH_AUTH_TYPE", "api_key") INVITATION_TOKEN_MAX_AGE = int(os.environ.get("REDASH_INVITATION_TOKEN_MAX_AGE", 60 * 60 * 24 * 7)) @@ -208,6 +207,10 @@ CELERYD_WORKER_TASK_LOG_FORMAT = os.environ.get( (LOG_PREFIX + '[%(asctime)s][PID:%(process)d][%(levelname)s][%(processName)s] ' 'task_name=%(task_name)s ' 'task_id=%(task_id)s %(message)s'))) +RQ_WORKER_JOB_LOG_FORMAT = os.environ.get("REDASH_RQ_WORKER_JOB_LOG_FORMAT", + (LOG_PREFIX + '[%(asctime)s][PID:%(process)d][%(levelname)s][%(name)s] ' + 'job.description=%(job_description)s ' + 'job.id=%(job_id)s %(message)s')) # Mail settings: MAIL_SERVER = os.environ.get('REDASH_MAIL_SERVER', 'localhost') diff --git a/redash/settings/dynamic_settings.py b/redash/settings/dynamic_settings.py index eec0e224..3577d067 100644 --- a/redash/settings/dynamic_settings.py +++ b/redash/settings/dynamic_settings.py @@ -10,15 +10,21 @@ def query_time_limit(is_scheduled, user_id, org_id): return scheduled_time_limit if is_scheduled else adhoc_time_limit +def periodic_jobs(): + """Schedule any custom periodic jobs here. 
For example: + + from datetime import timedelta + from somewhere import some_job, some_other_job + + return [ + {"func": some_job, "interval": timedelta(hours=1)}, + {"func": some_other_job, "interval": timedelta(days=1)} + ] + """ + pass + + # This provides the ability to override the way we store QueryResult's data column. # Reference implementation: redash.models.DBPersistence QueryResultPersistence = None -# Provide any custom tasks you'd like to run periodically -def custom_tasks(): - return { - # 'key': { - # 'task': 'redash.task.example', - # 'schedule': timedelta(minutes=5) - # } - } diff --git a/redash/tasks/__init__.py b/redash/tasks/__init__.py index e1bccd8e..613111f4 100644 --- a/redash/tasks/__init__.py +++ b/redash/tasks/__init__.py @@ -1,4 +1,5 @@ from .general import record_event, version_check, send_mail, sync_user_details -from .queries import QueryTask, refresh_queries, refresh_schemas, cleanup_query_results, execute_query, empty_schedules +from .queries import (QueryTask, enqueue_query, execute_query, refresh_queries, + refresh_schemas, cleanup_query_results, empty_schedules) from .alerts import check_alerts_for_query -from .failure_report import notify_of_failure +from .failure_report import send_aggregated_errors diff --git a/redash/tasks/alerts.py b/redash/tasks/alerts.py index 9092cd30..37dce531 100644 --- a/redash/tasks/alerts.py +++ b/redash/tasks/alerts.py @@ -1,11 +1,10 @@ -from celery.utils.log import get_task_logger from flask import current_app import datetime -from redash.worker import celery +from redash.worker import celery, job, get_job_logger from redash import models, utils -logger = get_task_logger(__name__) +logger = get_job_logger(__name__) def notify_subscriptions(alert, new_state): @@ -25,7 +24,7 @@ def should_notify(alert, new_state): return new_state != alert.state or (alert.state == models.Alert.TRIGGERED_STATE and passed_rearm_threshold) -@celery.task(name="redash.tasks.check_alerts_for_query", time_limit=300, 
soft_time_limit=240) +@job('default', timeout=300) def check_alerts_for_query(query_id): logger.debug("Checking query %d for alerts", query_id) diff --git a/redash/tasks/failure_report.py b/redash/tasks/failure_report.py index df5334b4..3845dc06 100644 --- a/redash/tasks/failure_report.py +++ b/redash/tasks/failure_report.py @@ -1,8 +1,8 @@ +import logging import datetime import re from collections import Counter from redash.tasks.general import send_mail -from redash.worker import celery from redash import redis_connection, settings, models from redash.utils import json_dumps, json_loads, base_url, render_template @@ -21,7 +21,6 @@ def comment_for(failure): ) -@celery.task(name="redash.tasks.send_aggregated_errors") def send_aggregated_errors(): for k in redis_connection.scan_iter(key("*")): user_id = re.search(r'\d+', k).group() @@ -69,3 +68,13 @@ def notify_of_failure(message, query): 'schedule_failures': query.schedule_failures, 'failed_at': datetime.datetime.utcnow().strftime("%B %d, %Y %I:%M%p UTC") })) + + +def track_failure(query, error): + logging.debug(error) + + query.schedule_failures += 1 + models.db.session.add(query) + models.db.session.commit() + + notify_of_failure(error, query) diff --git a/redash/tasks/general.py b/redash/tasks/general.py index f23bebb7..9fc26a12 100644 --- a/redash/tasks/general.py +++ b/redash/tasks/general.py @@ -1,16 +1,15 @@ import requests -from celery.utils.log import get_task_logger from flask_mail import Message from redash import mail, models, settings from redash.models import users from redash.version_check import run_version_check -from redash.worker import celery +from redash.worker import job, get_job_logger -logger = get_task_logger(__name__) +logger = get_job_logger(__name__) -@celery.task(name="redash.tasks.record_event") +@job('default') def record_event(raw_event): event = models.Event.record(raw_event) models.db.session.commit() @@ -29,12 +28,11 @@ def record_event(raw_event): logger.exception("Failed posting 
to %s", hook) -@celery.task(name="redash.tasks.version_check") def version_check(): run_version_check() -@celery.task(name="redash.tasks.subscribe") +@job('default') def subscribe(form): logger.info("Subscribing to: [security notifications=%s], [newsletter=%s]", form['security_notifications'], form['newsletter']) data = { @@ -47,7 +45,7 @@ def subscribe(form): requests.post('https://beacon.redash.io/subscribe', json=data) -@celery.task(name="redash.tasks.send_mail") +@job('default') def send_mail(to, subject, html, text): try: message = Message(recipients=to, @@ -60,14 +58,5 @@ def send_mail(to, subject, html, text): logger.exception('Failed sending message: %s', message.subject) -@celery.task( - name="redash.tasks.sync_user_details", - ignore_result=True, - soft_time_limit=60, - time_limit=120, - # let the task expire after 45 seconds since there will be - # another task 15 seconds later anyway - expires=45, -) def sync_user_details(): users.sync_last_active_at() diff --git a/redash/tasks/queries/__init__.py b/redash/tasks/queries/__init__.py new file mode 100644 index 00000000..12009195 --- /dev/null +++ b/redash/tasks/queries/__init__.py @@ -0,0 +1,2 @@ +from .maintenance import refresh_queries, refresh_schemas, cleanup_query_results, empty_schedules +from .execution import QueryTask, execute_query, enqueue_query diff --git a/redash/tasks/queries.py b/redash/tasks/queries/execution.py similarity index 61% rename from redash/tasks/queries.py rename to redash/tasks/queries/execution.py index 69fbe16e..90d58e00 100644 --- a/redash/tasks/queries.py +++ b/redash/tasks/queries/execution.py @@ -7,12 +7,11 @@ from celery.result import AsyncResult from celery.utils.log import get_task_logger from six import text_type -from redash import models, redis_connection, settings, statsd_client -from redash.models.parameterized_query import InvalidParameterError, QueryDetachedFromDataSourceError +from redash import models, redis_connection, settings from redash.query_runner 
import InterruptException from redash.tasks.alerts import check_alerts_for_query -from redash.tasks.failure_report import notify_of_failure -from redash.utils import gen_query_hash, json_dumps, utcnow, mustache_render +from redash.tasks.failure_report import track_failure +from redash.utils import gen_query_hash, json_dumps, utcnow from redash.worker import celery logger = get_task_logger(__name__) @@ -161,138 +160,6 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query return job -@celery.task(name="redash.tasks.empty_schedules") -def empty_schedules(): - logger.info("Deleting schedules of past scheduled queries...") - - queries = models.Query.past_scheduled_queries() - for query in queries: - query.schedule = None - models.db.session.commit() - - logger.info("Deleted %d schedules.", len(queries)) - - -@celery.task(name="redash.tasks.refresh_queries") -def refresh_queries(): - logger.info("Refreshing queries...") - - outdated_queries_count = 0 - query_ids = [] - - with statsd_client.timer('manager.outdated_queries_lookup'): - for query in models.Query.outdated_queries(): - if settings.FEATURE_DISABLE_REFRESH_QUERIES: - logging.info("Disabled refresh queries.") - elif query.org.is_disabled: - logging.debug("Skipping refresh of %s because org is disabled.", query.id) - elif query.data_source is None: - logging.debug("Skipping refresh of %s because the datasource is none.", query.id) - elif query.data_source.paused: - logging.debug("Skipping refresh of %s because datasource - %s is paused (%s).", - query.id, query.data_source.name, query.data_source.pause_reason) - else: - query_text = query.query_text - - parameters = {p['name']: p.get('value') for p in query.parameters} - if any(parameters): - try: - query_text = query.parameterized.apply(parameters).query - except InvalidParameterError as e: - error = u"Skipping refresh of {} because of invalid parameters: {}".format(query.id, e.message) - track_failure(query, error) - continue - 
except QueryDetachedFromDataSourceError as e: - error = ("Skipping refresh of {} because a related dropdown " - "query ({}) is unattached to any datasource.").format(query.id, e.query_id) - track_failure(query, error) - continue - - enqueue_query(query_text, query.data_source, query.user_id, - scheduled_query=query, - metadata={'Query ID': query.id, 'Username': 'Scheduled'}) - - query_ids.append(query.id) - outdated_queries_count += 1 - - statsd_client.gauge('manager.outdated_queries', outdated_queries_count) - - logger.info("Done refreshing queries. Found %d outdated queries: %s" % (outdated_queries_count, query_ids)) - - status = redis_connection.hgetall('redash:status') - now = time.time() - - redis_connection.hmset('redash:status', { - 'outdated_queries_count': outdated_queries_count, - 'last_refresh_at': now, - 'query_ids': json_dumps(query_ids) - }) - - statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now))) - - -@celery.task(name="redash.tasks.cleanup_query_results") -def cleanup_query_results(): - """ - Job to cleanup unused query results -- such that no query links to them anymore, and older than - settings.QUERY_RESULTS_MAX_AGE (a week by default, so it's less likely to be open in someone's browser and be used). - - Each time the job deletes only settings.QUERY_RESULTS_CLEANUP_COUNT (100 by default) query results so it won't choke - the database in case of many such results. 
- """ - - logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)", - settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE) - - unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE).limit(settings.QUERY_RESULTS_CLEANUP_COUNT) - deleted_count = models.QueryResult.query.filter( - models.QueryResult.id.in_(unused_query_results.subquery()) - ).delete(synchronize_session=False) - models.db.session.commit() - logger.info("Deleted %d unused query results.", deleted_count) - - -@celery.task(name="redash.tasks.refresh_schema", time_limit=90, soft_time_limit=60) -def refresh_schema(data_source_id): - ds = models.DataSource.get_by_id(data_source_id) - logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) - start_time = time.time() - try: - ds.get_schema(refresh=True) - logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) - statsd_client.incr('refresh_schema.success') - except SoftTimeLimitExceeded: - logger.info(u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) - statsd_client.incr('refresh_schema.timeout') - except Exception: - logger.warning(u"Failed refreshing schema for the data source: %s", ds.name, exc_info=1) - statsd_client.incr('refresh_schema.error') - logger.info(u"task=refresh_schema state=failed ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) - - -@celery.task(name="redash.tasks.refresh_schemas") -def refresh_schemas(): - """ - Refreshes the data sources schemas. 
- """ - blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id] - global_start_time = time.time() - - logger.info(u"task=refresh_schemas state=start") - - for ds in models.DataSource.query: - if ds.paused: - logger.info(u"task=refresh_schema state=skip ds_id=%s reason=paused(%s)", ds.id, ds.pause_reason) - elif ds.id in blacklist: - logger.info(u"task=refresh_schema state=skip ds_id=%s reason=blacklist", ds.id) - elif ds.org.is_disabled: - logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) - else: - refresh_schema.apply_async(args=(ds.id,), queue=settings.SCHEMAS_REFRESH_QUEUE) - - logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) - - def signal_handler(*args): raise InterruptException @@ -317,16 +184,6 @@ def _resolve_user(user_id, is_api_key, query_id): return None -def track_failure(query, error): - logging.debug(error) - - query.schedule_failures += 1 - models.db.session.add(query) - models.db.session.commit() - - notify_of_failure(error, query) - - # We could have created this as a celery.Task derived class, and act as the task itself. But this might result in weird # issues as the task class created once per process, so decided to have a plain object instead. 
class QueryExecutor(object): @@ -370,7 +227,8 @@ class QueryExecutor(object): run_time = time.time() - started_at - logger.info(u"task=execute_query query_hash=%s data_length=%s error=[%s]", self.query_hash, data and len(data), error) + logger.info(u"task=execute_query query_hash=%s data_length=%s error=[%s]", + self.query_hash, data and len(data), error) _unlock(self.query_hash, self.data_source.id) @@ -408,7 +266,7 @@ class QueryExecutor(object): self.metadata['Query Hash'] = self.query_hash self.metadata['Queue'] = self.task.request.delivery_info['routing_key'] self.metadata['Scheduled'] = self.scheduled_query is not None - + return query_runner.annotate_query(self.query, self.metadata) def _log_progress(self, state): diff --git a/redash/tasks/queries/maintenance.py b/redash/tasks/queries/maintenance.py new file mode 100644 index 00000000..ea786675 --- /dev/null +++ b/redash/tasks/queries/maintenance.py @@ -0,0 +1,142 @@ +import logging +import time + +from celery.exceptions import SoftTimeLimitExceeded +from redash import models, redis_connection, settings, statsd_client +from redash.models.parameterized_query import (InvalidParameterError, + QueryDetachedFromDataSourceError) +from redash.tasks.failure_report import track_failure +from redash.utils import json_dumps +from redash.worker import job, get_job_logger + +from .execution import enqueue_query + +logger = get_job_logger(__name__) + + +def empty_schedules(): + logger.info("Deleting schedules of past scheduled queries...") + + queries = models.Query.past_scheduled_queries() + for query in queries: + query.schedule = None + models.db.session.commit() + + logger.info("Deleted %d schedules.", len(queries)) + + +def refresh_queries(): + logger.info("Refreshing queries...") + + outdated_queries_count = 0 + query_ids = [] + + with statsd_client.timer('manager.outdated_queries_lookup'): + for query in models.Query.outdated_queries(): + if settings.FEATURE_DISABLE_REFRESH_QUERIES: + logging.info("Disabled refresh 
queries.") + elif query.org.is_disabled: + logging.debug("Skipping refresh of %s because org is disabled.", query.id) + elif query.data_source is None: + logging.debug("Skipping refresh of %s because the datasource is none.", query.id) + elif query.data_source.paused: + logging.debug("Skipping refresh of %s because datasource - %s is paused (%s).", + query.id, query.data_source.name, query.data_source.pause_reason) + else: + query_text = query.query_text + + parameters = {p['name']: p.get('value') for p in query.parameters} + if any(parameters): + try: + query_text = query.parameterized.apply(parameters).query + except InvalidParameterError as e: + error = u"Skipping refresh of {} because of invalid parameters: {}".format(query.id, e.message) + track_failure(query, error) + continue + except QueryDetachedFromDataSourceError as e: + error = ("Skipping refresh of {} because a related dropdown " + "query ({}) is unattached to any datasource.").format(query.id, e.query_id) + track_failure(query, error) + continue + + enqueue_query(query_text, query.data_source, query.user_id, + scheduled_query=query, + metadata={'Query ID': query.id, 'Username': 'Scheduled'}) + + query_ids.append(query.id) + outdated_queries_count += 1 + + statsd_client.gauge('manager.outdated_queries', outdated_queries_count) + + logger.info("Done refreshing queries. 
Found %d outdated queries: %s" % (outdated_queries_count, query_ids)) + + status = redis_connection.hgetall('redash:status') + now = time.time() + + redis_connection.hmset('redash:status', { + 'outdated_queries_count': outdated_queries_count, + 'last_refresh_at': now, + 'query_ids': json_dumps(query_ids) + }) + + statsd_client.gauge('manager.seconds_since_refresh', now - float(status.get('last_refresh_at', now))) + + +def cleanup_query_results(): + """ + Job to cleanup unused query results -- such that no query links to them anymore, and older than + settings.QUERY_RESULTS_MAX_AGE (a week by default, so it's less likely to be open in someone's browser and be used). + + Each time the job deletes only settings.QUERY_RESULTS_CLEANUP_COUNT (100 by default) query results so it won't choke + the database in case of many such results. + """ + + logging.info("Running query results clean up (removing maximum of %d unused results, that are %d days old or more)", + settings.QUERY_RESULTS_CLEANUP_COUNT, settings.QUERY_RESULTS_CLEANUP_MAX_AGE) + + unused_query_results = models.QueryResult.unused(settings.QUERY_RESULTS_CLEANUP_MAX_AGE) + deleted_count = models.QueryResult.query.filter( + models.QueryResult.id.in_(unused_query_results.limit(settings.QUERY_RESULTS_CLEANUP_COUNT).subquery()) + ).delete(synchronize_session=False) + models.db.session.commit() + logger.info("Deleted %d unused query results.", deleted_count) + + +@job('schemas') +def refresh_schema(data_source_id): + ds = models.DataSource.get_by_id(data_source_id) + logger.info(u"task=refresh_schema state=start ds_id=%s", ds.id) + start_time = time.time() + try: + ds.get_schema(refresh=True) + logger.info(u"task=refresh_schema state=finished ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) + statsd_client.incr('refresh_schema.success') + except SoftTimeLimitExceeded: + logger.info(u"task=refresh_schema state=timeout ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) + 
statsd_client.incr('refresh_schema.timeout') + except Exception: + logger.warning(u"Failed refreshing schema for the data source: %s", ds.name, exc_info=1) + statsd_client.incr('refresh_schema.error') + logger.info(u"task=refresh_schema state=failed ds_id=%s runtime=%.2f", ds.id, time.time() - start_time) + + +def refresh_schemas(): + """ + Refreshes the data sources schemas. + """ + blacklist = [int(ds_id) for ds_id in redis_connection.smembers('data_sources:schema:blacklist') if ds_id] + global_start_time = time.time() + + logger.info(u"task=refresh_schemas state=start") + + for ds in models.DataSource.query: + if ds.paused: + logger.info(u"task=refresh_schema state=skip ds_id=%s reason=paused(%s)", ds.id, ds.pause_reason) + elif ds.id in blacklist: + logger.info(u"task=refresh_schema state=skip ds_id=%s reason=blacklist", ds.id) + elif ds.org.is_disabled: + logger.info(u"task=refresh_schema state=skip ds_id=%s reason=org_disabled", ds.id) + else: + refresh_schema.delay(ds.id) + + logger.info(u"task=refresh_schemas state=finish total_runtime=%.2f", time.time() - global_start_time) diff --git a/redash/utils/sentry.py b/redash/utils/sentry.py index 1947e1a3..ea49be4a 100644 --- a/redash/utils/sentry.py +++ b/redash/utils/sentry.py @@ -3,6 +3,7 @@ from sentry_sdk.integrations.flask import FlaskIntegration from sentry_sdk.integrations.celery import CeleryIntegration from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration from sentry_sdk.integrations.redis import RedisIntegration +from sentry_sdk.integrations.rq import RqIntegration from redash import settings, __version__ @@ -25,5 +26,6 @@ def init(): release=__version__, before_send=before_send, send_default_pii=True, - integrations=[FlaskIntegration(), CeleryIntegration(), SqlalchemyIntegration(), RedisIntegration()] + integrations=[FlaskIntegration(), CeleryIntegration(), SqlalchemyIntegration(), + RedisIntegration(), RqIntegration()] ) diff --git a/redash/worker.py b/redash/worker.py index 
e960c34f..159fcff2 100644 --- a/redash/worker.py +++ b/redash/worker.py @@ -1,19 +1,47 @@ from __future__ import absolute_import from datetime import timedelta -from random import randint +from functools import partial from flask import current_app +import logging from celery import Celery -from celery.schedules import crontab from celery.signals import worker_process_init from celery.utils.log import get_logger -from redash import create_app, extensions, settings +from rq import get_current_job +from rq.decorators import job as rq_job + +from redash import create_app, extensions, settings, redis_connection from redash.metrics import celery as celery_metrics # noqa logger = get_logger(__name__) +job = partial(rq_job, connection=redis_connection) + + +class CurrentJobFilter(logging.Filter): + def filter(self, record): + current_job = get_current_job() + + record.job_id = current_job.id if current_job else '' + record.job_description = current_job.description if current_job else '' + + return True + + +def get_job_logger(name): + logger = logging.getLogger('rq.job.' + name) + + handler = logging.StreamHandler() + handler.formatter = logging.Formatter(settings.RQ_WORKER_JOB_LOG_FORMAT) + handler.addFilter(CurrentJobFilter()) + + logger.addHandler(handler) + logger.propagate = False + + return logger + celery = Celery('redash', broker=settings.CELERY_BROKER, @@ -21,48 +49,8 @@ celery = Celery('redash', redis_backend_use_ssl=settings.CELERY_SSL_CONFIG, include='redash.tasks') -# The internal periodic Celery tasks to automatically schedule. 
-celery_schedule = { - 'refresh_queries': { - 'task': 'redash.tasks.refresh_queries', - 'schedule': timedelta(seconds=30) - }, - 'empty_schedules': { - 'task': 'redash.tasks.empty_schedules', - 'schedule': timedelta(minutes=60) - }, - 'refresh_schemas': { - 'task': 'redash.tasks.refresh_schemas', - 'schedule': timedelta(minutes=settings.SCHEMAS_REFRESH_SCHEDULE) - }, - 'sync_user_details': { - 'task': 'redash.tasks.sync_user_details', - 'schedule': timedelta(minutes=1), - }, - 'send_aggregated_errors': { - 'task': 'redash.tasks.send_aggregated_errors', - 'schedule': timedelta(minutes=settings.SEND_FAILURE_EMAIL_INTERVAL), - } -} - -if settings.VERSION_CHECK: - celery_schedule['version_check'] = { - 'task': 'redash.tasks.version_check', - # We need to schedule the version check to run at a random hour/minute, to spread the requests from all users - # evenly. - 'schedule': crontab(minute=randint(0, 59), hour=randint(0, 23)) - } - -if settings.QUERY_RESULTS_CLEANUP_ENABLED: - celery_schedule['cleanup_query_results'] = { - 'task': 'redash.tasks.cleanup_query_results', - 'schedule': timedelta(minutes=5) - } - -celery_schedule.update(settings.dynamic_settings.custom_tasks()) celery.conf.update(result_backend=settings.CELERY_RESULT_BACKEND, - beat_schedule=celery_schedule, timezone='UTC', result_expires=settings.CELERY_RESULT_EXPIRES, worker_log_format=settings.CELERYD_WORKER_LOG_FORMAT, diff --git a/requirements.txt b/requirements.txt index ca10306f..ff734415 100644 --- a/requirements.txt +++ b/requirements.txt @@ -36,6 +36,8 @@ SQLAlchemy-Utils==0.33.11 sqlparse==0.2.4 statsd==2.1.2 gunicorn==19.7.1 +rq==1.1.0 +rq-scheduler==0.9 celery==4.3.0 kombu==4.6.3 jsonschema==2.4.0 diff --git a/tests/tasks/test_failure_report.py b/tests/tasks/test_failure_report.py index d9f44d11..0cefe213 100644 --- a/tests/tasks/test_failure_report.py +++ b/tests/tasks/test_failure_report.py @@ -22,7 +22,7 @@ class TestSendAggregatedErrorsTask(BaseTestCase): notify_of_failure(message, query) 
return key(query.user.id) - @mock.patch('redash.tasks.failure_report.render_template') + @mock.patch('redash.tasks.failure_report.render_template', return_value='') def send_email(self, user, render_template): send_failure_report(user.id) @@ -93,7 +93,7 @@ class TestSendAggregatedErrorsTask(BaseTestCase): self.notify(query=query) self.notify(query=query) - + failures = self.send_email(query.user) latest_failure = dateutil.parser.parse(failures[0]['failed_at']) self.assertNotEqual(2000, latest_failure.year) diff --git a/tests/tasks/test_queries.py b/tests/tasks/test_queries.py index 2afb962d..ea8efc71 100644 --- a/tests/tasks/test_queries.py +++ b/tests/tasks/test_queries.py @@ -8,7 +8,7 @@ from tests import BaseTestCase from redash import redis_connection, models from redash.utils import json_dumps from redash.query_runner.pg import PostgreSQL -from redash.tasks.queries import QueryExecutionError, enqueue_query, execute_query +from redash.tasks.queries.execution import QueryExecutionError, enqueue_query, execute_query FakeResult = namedtuple('FakeResult', 'id') diff --git a/tests/tasks/test_refresh_queries.py b/tests/tasks/test_refresh_queries.py index 7bf4919f..2300fff9 100644 --- a/tests/tasks/test_refresh_queries.py +++ b/tests/tasks/test_refresh_queries.py @@ -1,8 +1,9 @@ from mock import patch, call, ANY from tests import BaseTestCase -from redash.tasks import refresh_queries +from redash.tasks.queries.maintenance import refresh_queries from redash.models import Query +ENQUEUE_QUERY = 'redash.tasks.queries.maintenance.enqueue_query' class TestRefreshQuery(BaseTestCase): def test_enqueues_outdated_queries(self): @@ -15,7 +16,7 @@ class TestRefreshQuery(BaseTestCase): query_text="select 42;", data_source=self.factory.create_data_source()) oq = staticmethod(lambda: [query1, query2]) - with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \ + with patch(ENQUEUE_QUERY) as add_job_mock, \ patch.object(Query, 'outdated_queries', oq): refresh_queries() 
self.assertEqual(add_job_mock.call_count, 2) @@ -34,13 +35,13 @@ class TestRefreshQuery(BaseTestCase): oq = staticmethod(lambda: [query]) query.data_source.pause() with patch.object(Query, 'outdated_queries', oq): - with patch('redash.tasks.queries.enqueue_query') as add_job_mock: + with patch(ENQUEUE_QUERY) as add_job_mock: refresh_queries() add_job_mock.assert_not_called() query.data_source.resume() - with patch('redash.tasks.queries.enqueue_query') as add_job_mock: + with patch(ENQUEUE_QUERY) as add_job_mock: refresh_queries() add_job_mock.assert_called_with( query.query_text, query.data_source, query.user_id, @@ -59,7 +60,7 @@ class TestRefreshQuery(BaseTestCase): "value": "42", "title": "n"}]}) oq = staticmethod(lambda: [query]) - with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \ + with patch(ENQUEUE_QUERY) as add_job_mock, \ patch.object(Query, 'outdated_queries', oq): refresh_queries() add_job_mock.assert_called_with( @@ -79,7 +80,7 @@ class TestRefreshQuery(BaseTestCase): "value": 42, # <-- should be text! 
"title": "n"}]}) oq = staticmethod(lambda: [query]) - with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \ + with patch(ENQUEUE_QUERY) as add_job_mock, \ patch.object(Query, 'outdated_queries', oq): refresh_queries() add_job_mock.assert_not_called() @@ -100,7 +101,7 @@ class TestRefreshQuery(BaseTestCase): dropdown_query = self.factory.create_query(id=100, data_source=None) oq = staticmethod(lambda: [query]) - with patch('redash.tasks.queries.enqueue_query') as add_job_mock, \ + with patch(ENQUEUE_QUERY) as add_job_mock, \ patch.object(Query, 'outdated_queries', oq): refresh_queries() add_job_mock.assert_not_called() \ No newline at end of file diff --git a/tests/tasks/test_refresh_schemas.py b/tests/tasks/test_refresh_schemas.py index df29f5f2..e6426f66 100644 --- a/tests/tasks/test_refresh_schemas.py +++ b/tests/tasks/test_refresh_schemas.py @@ -7,19 +7,19 @@ from redash.tasks import refresh_schemas class TestRefreshSchemas(BaseTestCase): def test_calls_refresh_of_all_data_sources(self): self.factory.data_source # trigger creation - with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job: + with patch('redash.tasks.queries.maintenance.refresh_schema.delay') as refresh_job: refresh_schemas() refresh_job.assert_called() def test_skips_paused_data_sources(self): self.factory.data_source.pause() - with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job: + with patch('redash.tasks.queries.maintenance.refresh_schema.delay') as refresh_job: refresh_schemas() refresh_job.assert_not_called() self.factory.data_source.resume() - with patch('redash.tasks.queries.refresh_schema.apply_async') as refresh_job: + with patch('redash.tasks.queries.maintenance.refresh_schema.delay') as refresh_job: refresh_schemas() refresh_job.assert_called()