fleet/server/worker/worker.go
Juan Fernandez 009a87d33e
Feature 10196: Add filepath to end-points and third party integrations (#11285)
Adds the software installed path property to the proper end-points and third party integrations (webhook, Zendesk and Jira).
2023-05-17 16:53:15 -04:00

package worker

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
	"github.com/fleetdm/fleet/v4/server/fleet"
	kitlog "github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

const (
	maxRetries = 5

	// nvdCVEURL is the base link to a CVE on the NVD website; only the CVE code
	// needs to be appended to make it a valid link.
	nvdCVEURL = "https://nvd.nist.gov/vuln/detail/"
)

const (
	// Types of integrations: jobs like Jira and Zendesk support different
	// integrations, and this identifies the integration type of a message.
	intgTypeVuln          = "vuln"
	intgTypeFailingPolicy = "failingPolicy"
)

// Job defines an interface for jobs that can be run by the Worker.
type Job interface {
	// Name is the unique name of the job.
	Name() string

	// Run performs the actual work.
	Run(ctx context.Context, argsJSON json.RawMessage) error
}

// failingPolicyArgs are the args common to all integrations that can process
// failing policies.
type failingPolicyArgs struct {
	PolicyID       uint                  `json:"policy_id"`
	PolicyName     string                `json:"policy_name"`
	PolicyCritical bool                  `json:"policy_critical"`
	Hosts          []fleet.PolicySetHost `json:"hosts"`
	TeamID         *uint                 `json:"team_id,omitempty"`
}

// vulnArgs are the args common to all integrations that can process
// vulnerabilities.
type vulnArgs struct {
	CVE                 string     `json:"cve,omitempty"`
	AffectedSoftwareIDs []uint     `json:"affected_software,omitempty"`
	EPSSProbability     *float64   `json:"epss_probability,omitempty"`   // Premium feature only
	CVSSScore           *float64   `json:"cvss_score,omitempty"`         // Premium feature only
	CISAKnownExploit    *bool      `json:"cisa_known_exploit,omitempty"` // Premium feature only
	CVEPublished        *time.Time `json:"cve_published,omitempty"`      // Premium feature only
}

// Worker runs jobs. NOT SAFE FOR CONCURRENT USE.
type Worker struct {
	ds  fleet.Datastore
	log kitlog.Logger

	// For tests only, allows ignoring unknown jobs instead of failing them.
	TestIgnoreUnknownJobs bool

	registry map[string]Job
}

func NewWorker(ds fleet.Datastore, log kitlog.Logger) *Worker {
	return &Worker{
		ds:       ds,
		log:      log,
		registry: make(map[string]Job),
	}
}

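// Register adds the given jobs to the worker's registry, keyed by their Name.
// It panics if a job with the same name is already registered.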
func (w *Worker) Register(jobs ...Job) {
	for _, j := range jobs {
		name := j.Name()
		if _, ok := w.registry[name]; ok {
			panic(fmt.Sprintf("job %s already registered", name))
		}
		w.registry[name] = j
	}
}

// QueueJob inserts a job to be processed by the worker for the job processor
// identified by the name (e.g. "jira"). The args value is marshaled as JSON
// and provided to the job processor when the job is executed.
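//
// For example, a caller might enqueue a vulnerability job roughly like the
// following (the job name and the shape of the args shown here are
// illustrative only, not a statement of what each processor expects):
//
//	_, err := QueueJob(ctx, ds, "jira", vulnArgs{CVE: "CVE-2020-0001"})
//	if err != nil {
//		// handle enqueue error
//	}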
func QueueJob(ctx context.Context, ds fleet.Datastore, name string, args interface{}) (*fleet.Job, error) {
	argsJSON, err := json.Marshal(args)
	if err != nil {
		return nil, ctxerr.Wrap(ctx, err, "marshal args")
	}

	job := &fleet.Job{
		Name:  name,
		Args:  (*json.RawMessage)(&argsJSON),
		State: fleet.JobStateQueued,
	}

	return ds.NewJob(ctx, job)
}

// this defines the delays to add between retries (i.e. how the "not_before"
// timestamp of a job will be set for the next run). Keep in mind that at a
// minimum, the job will not be retried before the next cron run of the worker,
// but we want to ensure a minimum delay before retries to give a chance to
// e.g. transient network issues to resolve themselves.
var delayPerRetry = []time.Duration{
	1: 0, // i.e. for the first retry, do it ASAP (on the next cron run)
	2: 5 * time.Minute,
	3: 10 * time.Minute,
	4: 1 * time.Hour,
	5: 2 * time.Hour,
}

// ProcessJobs processes all queued jobs.
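// Jobs that fail are retried up to maxRetries times; before a retry, the job's
// NotBefore timestamp is pushed out by the corresponding delayPerRetry entry so
// transient errors get a chance to resolve.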
func (w *Worker) ProcessJobs(ctx context.Context) error {
	const maxNumJobs = 100

	// process jobs until there are none left or the context is cancelled
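	// Track the job IDs handled in this run: if GetQueuedJobs returns a job we
	// have already seen, the jobs still queued have already failed once here,
	// so bail out and let the next cron run retry them.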
	seen := make(map[uint]struct{})
	for {
		jobs, err := w.ds.GetQueuedJobs(ctx, maxNumJobs)
		if err != nil {
			return ctxerr.Wrap(ctx, err, "get queued jobs")
		}

		if len(jobs) == 0 {
			break
		}

		for _, job := range jobs {
			select {
			case <-ctx.Done():
				return ctxerr.Wrap(ctx, ctx.Err(), "context done")
			default:
			}

			log := kitlog.With(w.log, "job_id", job.ID)

			if _, ok := seen[job.ID]; ok {
				level.Debug(log).Log("msg", "some jobs failed, retrying on next cron execution")
				return nil
			}
			seen[job.ID] = struct{}{}

			level.Debug(log).Log("msg", "processing job")

			if err := w.processJob(ctx, job); err != nil {
				level.Error(log).Log("msg", "process job", "err", err)

				job.Error = err.Error()
				if job.Retries < maxRetries {
					level.Debug(log).Log("msg", "will retry job")
					job.Retries += 1
					if job.Retries < len(delayPerRetry) {
						job.NotBefore = time.Now().Add(delayPerRetry[job.Retries])
					}
				} else {
					job.State = fleet.JobStateFailure
				}
			} else {
				job.State = fleet.JobStateSuccess
				job.Error = ""
			}

			// When we update the job, the updated_at timestamp gets updated and the job gets "pushed" to the back
			// of the queue. GetQueuedJobs fetches jobs by updated_at, so it will not return the same job until the
			// queue has been processed once.
			if _, err := w.ds.UpdateJob(ctx, job.ID, job); err != nil {
				level.Error(log).Log("update job", "err", err)
			}
		}
	}

	return nil
}

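// processJob runs the job processor registered under the job's name with the
// job's raw JSON arguments. An unknown job name is an error unless
// TestIgnoreUnknownJobs is set.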
func (w *Worker) processJob(ctx context.Context, job *fleet.Job) error {
	j, ok := w.registry[job.Name]
	if !ok {
		if w.TestIgnoreUnknownJobs {
			return nil
		}
		return ctxerr.Errorf(ctx, "unknown job: %s", job.Name)
	}

	var args json.RawMessage
	if job.Args != nil {
		args = *job.Args
	}

	return j.Run(ctx, args)
}

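// failingPoliciesTplArgs holds the values exposed to the message templates of
// failing-policy integrations.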
type failingPoliciesTplArgs struct {
	FleetURL       string
	PolicyID       uint
	PolicyName     string
	PolicyCritical bool
	TeamID         *uint
	Hosts          []fleet.PolicySetHost
}

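// newFailingPoliciesTplArgs builds the template args from the Fleet URL and the
// job's failingPolicyArgs.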
func newFailingPoliciesTplArgs(fleetURL string, args *failingPolicyArgs) *failingPoliciesTplArgs {
	return &failingPoliciesTplArgs{
		FleetURL:       fleetURL,
		PolicyName:     args.PolicyName,
		PolicyID:       args.PolicyID,
		PolicyCritical: args.PolicyCritical,
		TeamID:         args.TeamID,
		Hosts:          args.Hosts,
	}
}