Fix concurrency bug in calendar cron (#17832)

#17441
This commit is contained in:
Lucas Manuel Rodriguez 2024-03-25 15:15:13 -03:00 committed by Victor Lyuboslavsky
parent 9090d8541f
commit 51cd71f464
No known key found for this signature in database
3 changed files with 504 additions and 35 deletions

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"slices"
"sync"
"time"
"github.com/fleetdm/fleet/v4/ee/server/calendar"
@ -14,7 +15,6 @@ import (
"github.com/go-kit/log"
kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"golang.org/x/sync/errgroup"
)
func newCalendarSchedule(
@ -162,16 +162,18 @@ func cronCalendarEventsForTeam(
// We execute this first to remove any calendar events for a user that is now passing
// policies on one of its hosts, and possibly create a new calendar event if they have
// another failing host on the same team.
if err := removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger); err != nil {
level.Info(logger).Log("msg", "removing calendar events from passing hosts", "err", err)
}
start := time.Now()
removeCalendarEventsFromPassingHosts(ctx, ds, calendarConfig, passingHosts, logger)
level.Debug(logger).Log(
"msg", "passing_hosts", "took", time.Since(start),
)
// Process hosts that are failing calendar policies.
if err := processCalendarFailingHosts(
ctx, ds, calendarConfig, orgName, failingHosts, logger,
); err != nil {
level.Info(logger).Log("msg", "processing failing hosts", "err", err)
}
start = time.Now()
processCalendarFailingHosts(ctx, ds, calendarConfig, orgName, failingHosts, logger)
level.Debug(logger).Log(
"msg", "failing_hosts", "took", time.Since(start),
)
// At last we want to log the hosts that are failing and don't have an associated email.
logHostsWithoutAssociatedEmail(
@ -190,15 +192,18 @@ func processCalendarFailingHosts(
orgName string,
hosts []fleet.HostPolicyMembershipData,
logger kitlog.Logger,
) error {
) {
hosts = filterHostsWithSameEmail(hosts)
const consumers = 20
hostsCh := make(chan fleet.HostPolicyMembershipData)
g, ctx := errgroup.WithContext(ctx)
var wg sync.WaitGroup
for i := 0; i < consumers; i++ {
g.Go(func() error {
wg.Add(+1)
go func() {
defer wg.Done()
for host := range hostsCh {
logger := log.With(logger, "host_id", host.HostID)
@ -230,7 +235,8 @@ func processCalendarFailingHosts(
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
if err := userCalendar.Configure(host.Email); err != nil {
return fmt.Errorf("configure user calendar: %w", err)
level.Error(logger).Log("msg", "configure user calendar", "err", err)
continue // continue with next host
}
switch {
@ -249,11 +255,11 @@ func processCalendarFailingHosts(
continue // continue with next host
}
default:
return fmt.Errorf("get calendar event: %w", err)
level.Error(logger).Log("msg", "get calendar event from db", "err", err)
continue // continue with next host
}
}
return nil
})
}()
}
for _, host := range hosts {
@ -261,7 +267,7 @@ func processCalendarFailingHosts(
}
close(hostsCh)
return g.Wait()
wg.Wait()
}
func filterHostsWithSameEmail(hosts []fleet.HostPolicyMembershipData) []fleet.HostPolicyMembershipData {
@ -472,7 +478,7 @@ func removeCalendarEventsFromPassingHosts(
calendarConfig *fleet.GoogleCalendarIntegration,
hosts []fleet.HostPolicyMembershipData,
logger kitlog.Logger,
) error {
) {
hostIDsByEmail := make(map[string][]uint)
for _, host := range hosts {
hostIDsByEmail[host.Email] = append(hostIDsByEmail[host.Email], host.HostID)
@ -491,10 +497,13 @@ func removeCalendarEventsFromPassingHosts(
const consumers = 20
emailsCh := make(chan emailWithHosts)
g, ctx := errgroup.WithContext(ctx)
var wg sync.WaitGroup
for i := 0; i < consumers; i++ {
g.Go(func() error {
wg.Add(+1)
go func() {
defer wg.Done()
for email := range emailsCh {
hostCalendarEvent, calendarEvent, err := ds.GetHostCalendarEventByEmail(ctx, email.email)
@ -507,15 +516,16 @@ func removeCalendarEventsFromPassingHosts(
case fleet.IsNotFound(err):
continue
default:
return fmt.Errorf("get calendar event from DB: %w", err)
level.Error(logger).Log("msg", "get calendar event from DB", "err", err)
continue
}
userCalendar := createUserCalendarFromConfig(ctx, calendarConfig, logger)
if err := deleteCalendarEvent(ctx, ds, userCalendar, calendarEvent); err != nil {
return fmt.Errorf("delete user calendar event: %w", err)
level.Error(logger).Log("msg", "delete user calendar event", "err", err)
continue
}
}
return nil
})
}()
}
for _, emailWithHostIDs := range emails {
@ -523,7 +533,7 @@ func removeCalendarEventsFromPassingHosts(
}
close(emailsCh)
return g.Wait()
wg.Wait()
}
func logHostsWithoutAssociatedEmail(

View File

@ -2,12 +2,21 @@ package main
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/fleetdm/fleet/v4/ee/server/calendar"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/mock"
kitlog "github.com/go-kit/log"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
)
@ -188,5 +197,441 @@ func TestEventForDifferentHost(t *testing.T) {
err := cronCalendarEvents(ctx, ds, logger)
require.NoError(t, err)
}
func TestCalendarEventsMultipleHosts(t *testing.T) {
ds := new(mock.Store)
ctx := context.Background()
logger := kitlog.With(kitlog.NewLogfmtLogger(os.Stdout))
t.Cleanup(func() {
calendar.ClearMockEvents()
})
// TODO(lucas): Test!
webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "POST", r.Method)
requestBodyBytes, err := io.ReadAll(r.Body)
require.NoError(t, err)
t.Logf("webhook request: %s\n", requestBodyBytes)
}))
t.Cleanup(func() {
webhookServer.Close()
})
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{
Integrations: fleet.Integrations{
GoogleCalendar: []*fleet.GoogleCalendarIntegration{
{
Domain: "example.com",
ApiKey: map[string]string{
fleet.GoogleCalendarEmail: "calendar-mock@example.com",
},
},
},
},
}, nil
}
teamID1 := uint(1)
ds.ListTeamsFunc = func(ctx context.Context, filter fleet.TeamFilter, opt fleet.ListOptions) ([]*fleet.Team, error) {
return []*fleet.Team{
{
ID: teamID1,
Config: fleet.TeamConfig{
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
},
},
},
},
}, nil
}
policyID1 := uint(10)
policyID2 := uint(11)
ds.GetCalendarPoliciesFunc = func(ctx context.Context, teamID uint) ([]fleet.PolicyCalendarData, error) {
require.Equal(t, teamID1, teamID)
return []fleet.PolicyCalendarData{
{
ID: policyID1,
Name: "Policy 1",
},
{
ID: policyID2,
Name: "Policy 2",
},
}, nil
}
hostID1, userEmail1 := uint(100), "user1@example.com"
hostID2, userEmail2 := uint(101), "user2@example.com"
hostID3, userEmail3 := uint(102), "user3@other.com"
hostID4, userEmail4 := uint(103), "user4@other.com"
ds.GetTeamHostsPolicyMembershipsFunc = func(
ctx context.Context, domain string, teamID uint, policyIDs []uint,
) ([]fleet.HostPolicyMembershipData, error) {
require.Equal(t, teamID1, teamID)
require.Equal(t, []uint{policyID1, policyID2}, policyIDs)
return []fleet.HostPolicyMembershipData{
{
HostID: hostID1,
Email: userEmail1,
Passing: false,
},
{
HostID: hostID2,
Email: userEmail2,
Passing: true,
},
{
HostID: hostID3,
Email: userEmail3,
Passing: false,
},
{
HostID: hostID4,
Email: userEmail4,
Passing: true,
},
}, nil
}
ds.GetHostCalendarEventByEmailFunc = func(ctx context.Context, email string) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) {
return nil, nil, notFoundErr{}
}
ds.CreateOrUpdateCalendarEventFunc = func(ctx context.Context,
email string,
startTime, endTime time.Time,
data []byte,
hostID uint,
webhookStatus fleet.CalendarWebhookStatus,
) (*fleet.CalendarEvent, error) {
switch email {
case userEmail1:
require.Equal(t, hostID1, hostID)
case userEmail2:
require.Equal(t, hostID2, hostID)
case userEmail3:
require.Equal(t, hostID3, hostID)
case userEmail4:
require.Equal(t, hostID4, hostID)
}
require.Equal(t, fleet.CalendarWebhookStatusNone, webhookStatus)
require.NotEmpty(t, data)
require.NotZero(t, startTime)
require.NotZero(t, endTime)
// Currently, the returned calendar event is unused.
return nil, nil
}
err := cronCalendarEvents(ctx, ds, logger)
require.NoError(t, err)
}
type notFoundErr struct{}
func (n notFoundErr) IsNotFound() bool {
return true
}
func (n notFoundErr) Error() string {
return "not found"
}
func TestCalendarEvents1KHosts(t *testing.T) {
ds := new(mock.Store)
ctx := context.Background()
var logger kitlog.Logger
if os.Getenv("CALENDAR_TEST_LOGGING") != "" {
logger = kitlog.With(kitlog.NewLogfmtLogger(os.Stdout))
} else {
logger = kitlog.NewNopLogger()
}
t.Cleanup(func() {
calendar.ClearMockEvents()
})
// TODO(lucas): Use for the test.
webhookServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, "POST", r.Method)
requestBodyBytes, err := io.ReadAll(r.Body)
require.NoError(t, err)
t.Logf("webhook request: %s\n", requestBodyBytes)
}))
t.Cleanup(func() {
webhookServer.Close()
})
ds.AppConfigFunc = func(ctx context.Context) (*fleet.AppConfig, error) {
return &fleet.AppConfig{
Integrations: fleet.Integrations{
GoogleCalendar: []*fleet.GoogleCalendarIntegration{
{
Domain: "example.com",
ApiKey: map[string]string{
fleet.GoogleCalendarEmail: "calendar-mock@example.com",
},
},
},
},
}, nil
}
teamID1 := uint(1)
teamID2 := uint(2)
teamID3 := uint(3)
teamID4 := uint(4)
teamID5 := uint(5)
ds.ListTeamsFunc = func(ctx context.Context, filter fleet.TeamFilter, opt fleet.ListOptions) ([]*fleet.Team, error) {
return []*fleet.Team{
{
ID: teamID1,
Config: fleet.TeamConfig{
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
},
},
},
},
{
ID: teamID2,
Config: fleet.TeamConfig{
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
},
},
},
},
{
ID: teamID3,
Config: fleet.TeamConfig{
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
},
},
},
},
{
ID: teamID4,
Config: fleet.TeamConfig{
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
},
},
},
},
{
ID: teamID5,
Config: fleet.TeamConfig{
Integrations: fleet.TeamIntegrations{
GoogleCalendar: &fleet.TeamGoogleCalendarIntegration{
Enable: true,
WebhookURL: webhookServer.URL,
},
},
},
},
}, nil
}
policyID1 := uint(10)
policyID2 := uint(11)
policyID3 := uint(12)
policyID4 := uint(13)
policyID5 := uint(14)
policyID6 := uint(15)
policyID7 := uint(16)
policyID8 := uint(17)
policyID9 := uint(18)
policyID10 := uint(19)
ds.GetCalendarPoliciesFunc = func(ctx context.Context, teamID uint) ([]fleet.PolicyCalendarData, error) {
switch teamID {
case teamID1:
return []fleet.PolicyCalendarData{
{
ID: policyID1,
Name: "Policy 1",
},
{
ID: policyID2,
Name: "Policy 2",
},
}, nil
case teamID2:
return []fleet.PolicyCalendarData{
{
ID: policyID3,
Name: "Policy 3",
},
{
ID: policyID4,
Name: "Policy 4",
},
}, nil
case teamID3:
return []fleet.PolicyCalendarData{
{
ID: policyID5,
Name: "Policy 5",
},
{
ID: policyID6,
Name: "Policy 6",
},
}, nil
case teamID4:
return []fleet.PolicyCalendarData{
{
ID: policyID7,
Name: "Policy 7",
},
{
ID: policyID8,
Name: "Policy 8",
},
}, nil
case teamID5:
return []fleet.PolicyCalendarData{
{
ID: policyID9,
Name: "Policy 9",
},
{
ID: policyID10,
Name: "Policy 10",
},
}, nil
default:
return nil, notFoundErr{}
}
}
hosts := make([]fleet.HostPolicyMembershipData, 0, 1000)
for i := 0; i < 1000; i++ {
hosts = append(hosts, fleet.HostPolicyMembershipData{
Email: fmt.Sprintf("user%d@example.com", i),
Passing: i%2 == 0,
HostID: uint(i),
HostDisplayName: fmt.Sprintf("display_name%d", i),
HostHardwareSerial: fmt.Sprintf("serial%d", i),
})
}
ds.GetTeamHostsPolicyMembershipsFunc = func(
ctx context.Context, domain string, teamID uint, policyIDs []uint,
) ([]fleet.HostPolicyMembershipData, error) {
var start, end int
switch teamID {
case teamID1:
start, end = 0, 200
case teamID2:
start, end = 200, 400
case teamID3:
start, end = 400, 600
case teamID4:
start, end = 600, 800
case teamID5:
start, end = 800, 1000
}
return hosts[start:end], nil
}
ds.GetHostCalendarEventByEmailFunc = func(ctx context.Context, email string) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) {
return nil, nil, notFoundErr{}
}
eventsCreated := 0
var eventsCreatedMu sync.Mutex
eventPerHost := make(map[uint]*fleet.CalendarEvent)
ds.CreateOrUpdateCalendarEventFunc = func(ctx context.Context,
email string,
startTime, endTime time.Time,
data []byte,
hostID uint,
webhookStatus fleet.CalendarWebhookStatus,
) (*fleet.CalendarEvent, error) {
require.Equal(t, fmt.Sprintf("user%d@example.com", hostID), email)
eventsCreatedMu.Lock()
eventsCreated += 1
eventPerHost[hostID] = &fleet.CalendarEvent{
ID: hostID,
Email: email,
StartTime: startTime,
EndTime: endTime,
Data: data,
UpdateCreateTimestamps: fleet.UpdateCreateTimestamps{
CreateTimestamp: fleet.CreateTimestamp{
CreatedAt: time.Now(),
},
UpdateTimestamp: fleet.UpdateTimestamp{
UpdatedAt: time.Now(),
},
},
}
eventsCreatedMu.Unlock()
require.Equal(t, fleet.CalendarWebhookStatusNone, webhookStatus)
require.NotEmpty(t, data)
require.NotZero(t, startTime)
require.NotZero(t, endTime)
// Currently, the returned calendar event is unused.
return nil, nil
}
err := cronCalendarEvents(ctx, ds, logger)
require.NoError(t, err)
createdCalendarEvents := calendar.ListGoogleMockEvents()
require.Equal(t, eventsCreated, 500)
require.Len(t, createdCalendarEvents, 500)
hosts = make([]fleet.HostPolicyMembershipData, 0, 1000)
for i := 0; i < 1000; i++ {
hosts = append(hosts, fleet.HostPolicyMembershipData{
Email: fmt.Sprintf("user%d@example.com", i),
Passing: true,
HostID: uint(i),
HostDisplayName: fmt.Sprintf("display_name%d", i),
HostHardwareSerial: fmt.Sprintf("serial%d", i),
})
}
ds.GetHostCalendarEventByEmailFunc = func(ctx context.Context, email string) (*fleet.HostCalendarEvent, *fleet.CalendarEvent, error) {
hostID, err := strconv.Atoi(strings.TrimSuffix(strings.TrimPrefix(email, "user"), "@example.com"))
require.NoError(t, err)
if hostID%2 == 0 {
return nil, nil, notFoundErr{}
}
require.Contains(t, eventPerHost, uint(hostID))
return &fleet.HostCalendarEvent{
ID: uint(hostID),
HostID: uint(hostID),
CalendarEventID: uint(hostID),
WebhookStatus: fleet.CalendarWebhookStatusNone,
}, eventPerHost[uint(hostID)], nil
}
ds.DeleteCalendarEventFunc = func(ctx context.Context, calendarEventID uint) error {
return nil
}
err = cronCalendarEvents(ctx, ds, logger)
require.NoError(t, err)
createdCalendarEvents = calendar.ListGoogleMockEvents()
require.Len(t, createdCalendarEvents, 0)
}

View File

@ -3,23 +3,26 @@ package calendar
import (
"context"
"errors"
kitlog "github.com/go-kit/log"
"google.golang.org/api/calendar/v3"
"google.golang.org/api/googleapi"
"net/http"
"os"
"strconv"
"sync"
"time"
kitlog "github.com/go-kit/log"
"google.golang.org/api/calendar/v3"
"google.golang.org/api/googleapi"
)
type GoogleCalendarMockAPI struct {
logger kitlog.Logger
}
var mockEvents = make(map[string]*calendar.Event)
var mu sync.Mutex
var id uint64
var (
mockEvents = make(map[string]*calendar.Event)
mu sync.Mutex
id uint64
)
const latency = 500 * time.Millisecond
@ -44,6 +47,7 @@ func (lowLevelAPI *GoogleCalendarMockAPI) GetSetting(name string) (*calendar.Set
}
func (lowLevelAPI *GoogleCalendarMockAPI) CreateEvent(event *calendar.Event) (*calendar.Event, error) {
time.Sleep(latency)
mu.Lock()
defer mu.Unlock()
id += 1
@ -79,3 +83,13 @@ func (lowLevelAPI *GoogleCalendarMockAPI) DeleteEvent(id string) error {
delete(mockEvents, id)
return nil
}
func ListGoogleMockEvents() map[string]*calendar.Event {
return mockEvents
}
func ClearMockEvents() {
mu.Lock()
defer mu.Unlock()
mockEvents = make(map[string]*calendar.Event)
}