mirror of
https://github.com/empayre/fleet.git
synced 2024-11-07 01:15:22 +00:00
aecc2fed75
This only applies to Premium users: we want to show the vulnerabilities' published date anywhere vulnerabilities are shown, including API endpoints and third-party integrations.
193 lines
5.2 KiB
Go
193 lines
5.2 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
|
kitlog "github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
)
|
|
|
|
const (
	// maxRetries is the retry count a failing job must reach before it is
	// marked as failed instead of being retried again.
	maxRetries = 5

	// nvdCVEURL is the NVD link prefix for a CVE: appending the CVE code to it
	// yields a valid link to that CVE on the NVD website.
	nvdCVEURL = "https://nvd.nist.gov/vuln/detail/"
)
|
|
|
|
// Integration types. A job such as Jira or Zendesk can serve more than one
// kind of integration; these values identify which kind a message is for.
const (
	intgTypeVuln          = "vuln"
	intgTypeFailingPolicy = "failingPolicy"
)
|
|
|
|
// Job is the contract a unit of work must satisfy so that a Worker can run it.
type Job interface {
	// Name returns the job's unique name; the Worker uses it as the registry
	// key when looking up which Job handles a queued entry.
	Name() string

	// Run executes the job. argsJSON holds the job's arguments as raw JSON,
	// and a non-nil error reports that the run failed.
	Run(ctx context.Context, argsJSON json.RawMessage) error
}
|
|
|
|
// failingPolicyArgs are the args common to all integrations that can process
|
|
// failing policies.
|
|
type failingPolicyArgs struct {
|
|
PolicyID uint `json:"policy_id"`
|
|
PolicyName string `json:"policy_name"`
|
|
PolicyCritical bool `json:"policy_critical"`
|
|
Hosts []fleet.PolicySetHost `json:"hosts"`
|
|
TeamID *uint `json:"team_id,omitempty"`
|
|
}
|
|
|
|
// vulnArgs holds the arguments shared by every integration that can process
// vulnerabilities. Every field is optional in the JSON encoding (omitempty).
type vulnArgs struct {
	CVE string `json:"cve,omitempty"`

	// The remaining fields are Premium features only.

	// Premium feature only.
	EPSSProbability *float64 `json:"epss_probability,omitempty"`
	// Premium feature only.
	CVSSScore *float64 `json:"cvss_score,omitempty"`
	// Premium feature only.
	CISAKnownExploit *bool `json:"cisa_known_exploit,omitempty"`
	// Premium feature only.
	CVEPublished *time.Time `json:"cve_published,omitempty"`
}
|
|
|
|
// Worker runs jobs. NOT SAFE FOR CONCURRENT USE.
|
|
type Worker struct {
|
|
ds fleet.Datastore
|
|
log kitlog.Logger
|
|
|
|
registry map[string]Job
|
|
}
|
|
|
|
func NewWorker(ds fleet.Datastore, log kitlog.Logger) *Worker {
|
|
return &Worker{
|
|
ds: ds,
|
|
log: log,
|
|
registry: make(map[string]Job),
|
|
}
|
|
}
|
|
|
|
func (w *Worker) Register(jobs ...Job) {
|
|
for _, j := range jobs {
|
|
name := j.Name()
|
|
if _, ok := w.registry[name]; ok {
|
|
panic(fmt.Sprintf("job %s already registered", name))
|
|
}
|
|
w.registry[name] = j
|
|
}
|
|
}
|
|
|
|
// QueueJob inserts a job to be processed by the worker for the job processor
|
|
// identified by the name (e.g. "jira"). The args value is marshaled as JSON
|
|
// and provided to the job processor when the job is executed.
|
|
func QueueJob(ctx context.Context, ds fleet.Datastore, name string, args interface{}) (*fleet.Job, error) {
|
|
argsJSON, err := json.Marshal(args)
|
|
if err != nil {
|
|
return nil, ctxerr.Wrap(ctx, err, "marshal args")
|
|
}
|
|
job := &fleet.Job{
|
|
Name: name,
|
|
Args: (*json.RawMessage)(&argsJSON),
|
|
State: fleet.JobStateQueued,
|
|
}
|
|
|
|
return ds.NewJob(ctx, job)
|
|
}
|
|
|
|
// ProcessJobs processes all queued jobs.
|
|
func (w *Worker) ProcessJobs(ctx context.Context) error {
|
|
const maxNumJobs = 100
|
|
|
|
// process jobs until there are none left or the context is cancelled
|
|
seen := make(map[uint]struct{})
|
|
for {
|
|
jobs, err := w.ds.GetQueuedJobs(ctx, maxNumJobs)
|
|
if err != nil {
|
|
return ctxerr.Wrap(ctx, err, "get queued jobs")
|
|
}
|
|
|
|
if len(jobs) == 0 {
|
|
break
|
|
}
|
|
|
|
for _, job := range jobs {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctxerr.Wrap(ctx, ctx.Err(), "context done")
|
|
default:
|
|
}
|
|
|
|
log := kitlog.With(w.log, "job_id", job.ID)
|
|
|
|
if _, ok := seen[job.ID]; ok {
|
|
level.Debug(log).Log("msg", "some jobs failed, retrying on next cron execution")
|
|
return nil
|
|
}
|
|
seen[job.ID] = struct{}{}
|
|
|
|
level.Debug(log).Log("msg", "processing job")
|
|
|
|
if err := w.processJob(ctx, job); err != nil {
|
|
level.Error(log).Log("msg", "process job", "err", err)
|
|
job.Error = err.Error()
|
|
if job.Retries < maxRetries {
|
|
level.Debug(log).Log("msg", "will retry job")
|
|
job.Retries += 1
|
|
} else {
|
|
job.State = fleet.JobStateFailure
|
|
}
|
|
} else {
|
|
job.State = fleet.JobStateSuccess
|
|
job.Error = ""
|
|
}
|
|
|
|
// When we update the job, the updated_at timestamp gets updated and the job gets "pushed" to the back
|
|
// of queue. GetQueuedJobs fetches jobs by updated_at, so it will not return the same job until the queue
|
|
// has been processed once.
|
|
if _, err := w.ds.UpdateJob(ctx, job.ID, job); err != nil {
|
|
level.Error(log).Log("update job", "err", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Worker) processJob(ctx context.Context, job *fleet.Job) error {
|
|
j, ok := w.registry[job.Name]
|
|
if !ok {
|
|
return ctxerr.Errorf(ctx, "unknown job: %s", job.Name)
|
|
}
|
|
|
|
var args json.RawMessage
|
|
if job.Args != nil {
|
|
args = *job.Args
|
|
}
|
|
|
|
return j.Run(ctx, args)
|
|
}
|
|
|
|
type failingPoliciesTplArgs struct {
|
|
FleetURL string
|
|
PolicyID uint
|
|
PolicyName string
|
|
PolicyCritical bool
|
|
TeamID *uint
|
|
Hosts []fleet.PolicySetHost
|
|
}
|
|
|
|
func newFailingPoliciesTplArgs(fleetURL string, args *failingPolicyArgs) *failingPoliciesTplArgs {
|
|
return &failingPoliciesTplArgs{
|
|
FleetURL: fleetURL,
|
|
PolicyName: args.PolicyName,
|
|
PolicyID: args.PolicyID,
|
|
PolicyCritical: args.PolicyCritical,
|
|
TeamID: args.TeamID,
|
|
Hosts: args.Hosts,
|
|
}
|
|
}
|