fleet/server/pubsub/inmem_query_results.go
Zach Wasserman 8723b83039
Fix data race in inmem query results test (#1369)
Missing locking caused a data race, detected when running the tests with the -race flag:

```
==================
WARNING: DATA RACE
Read at 0x00c0004b2cf0 by goroutine 67:
  runtime.mapaccess2_fast64()
      /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84
  github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202

Previous write at 0x00c0004b2cf0 by goroutine 104:
  runtime.mapassign_fast64()
      /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117

Goroutine 67 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
  testing.runTests.func1()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
  testing.runTests()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
  testing.(*M).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
  main.main()
      _testmain.go:303 +0x236

Goroutine 104 (running) created at:
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
==================
WARNING: DATA RACE
Read at 0x00c0000ff2d8 by goroutine 67:
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4
  github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202

Previous write at 0x00c0000ff2d8 by goroutine 104:
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117

Goroutine 67 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
  testing.runTests.func1()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
  testing.runTests()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
  testing.(*M).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
  main.main()
      _testmain.go:303 +0x236

Goroutine 104 (running) created at:
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
--- FAIL: TestIngestDistributedQueryRecordCompletionError (0.01s)
    service_osquery_test.go:1502: PASS:	QueryCompletedByHost(string,uint)
    testing.go:1092: race detected during execution of test
==================
WARNING: DATA RACE
Read at 0x00c0000f8570 by goroutine 70:
  runtime.mapaccess2_fast64()
      /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84
  github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202

Previous write at 0x00c0000f8570 by goroutine 71:
  runtime.mapassign_fast64()
      /usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117

Goroutine 70 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
  testing.runTests.func1()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
  testing.runTests()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
  testing.(*M).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
  main.main()
      _testmain.go:303 +0x236

Goroutine 71 (running) created at:
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
==================
WARNING: DATA RACE
Read at 0x00c000c480d8 by goroutine 70:
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4
  github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202

Previous write at 0x00c000c480d8 by goroutine 71:
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc
  github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
      /Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117

Goroutine 70 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
  testing.runTests.func1()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
  testing.runTests()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
  testing.(*M).Run()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
  main.main()
      _testmain.go:303 +0x236

Goroutine 71 (running) created at:
  github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
      /Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4
  testing.tRunner()
      /usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
--- FAIL: TestIngestDistributedQuery (0.01s)
    service_osquery_test.go:1532: PASS:	QueryCompletedByHost(string,uint)
    testing.go:1092: race detected during execution of test
FAIL
FAIL	github.com/fleetdm/fleet/v4/server/service	42.743s
```
2021-07-13 12:27:41 -07:00

64 lines
1.4 KiB
Go

package pubsub
import (
"context"
"strconv"
"sync"
"github.com/fleetdm/fleet/v4/server/fleet"
)
// inmemQueryResults is an in-memory query result store that keeps one channel
// per distributed query campaign ID.
type inmemQueryResults struct {
	// channelMutex serializes access to resultChannels.
	channelMutex sync.Mutex
	// resultChannels maps a campaign ID to the channel carrying its results.
	resultChannels map[uint]chan interface{}
}
// Compile-time check that *inmemQueryResults implements fleet.QueryResultStore.
var _ fleet.QueryResultStore = (*inmemQueryResults)(nil)
// NewInmemQueryResults returns an in-memory implementation of the
// QueryResultStore interface whose campaign-to-channel map starts out empty.
func NewInmemQueryResults() *inmemQueryResults {
	store := new(inmemQueryResults)
	store.resultChannels = make(map[uint]chan interface{})
	return store
}
// getChannel returns the result channel registered for the given campaign ID,
// creating and registering a new unbuffered channel when none exists yet. The
// map access is performed while holding channelMutex.
func (im *inmemQueryResults) getChannel(id uint) chan interface{} {
	im.channelMutex.Lock()
	defer im.channelMutex.Unlock()

	if existing, found := im.resultChannels[id]; found {
		return existing
	}

	created := make(chan interface{})
	im.resultChannels[id] = created
	return created
}
// WriteResult delivers result to the subscriber currently receiving on the
// channel of the result's campaign. The send never blocks: if no channel is
// registered for the campaign, or nobody is receiving on it at the moment, a
// noSubscriberError carrying the campaign ID is returned instead.
func (im *inmemQueryResults) WriteResult(result fleet.DistributedQueryResult) error {
	campaignID := result.DistributedQueryCampaignID

	// Hold the mutex for the lookup AND the send. The cleanup goroutine started
	// by ReadChannel closes the channel under the same mutex, so the channel
	// cannot be closed between fetching it and sending on it (sending on a
	// closed channel panics). The send is non-blocking, so the lock is only
	// held briefly.
	im.channelMutex.Lock()
	defer im.channelMutex.Unlock()

	// Look the channel up without creating it, so that writes for campaigns
	// without a subscriber do not leave entries behind in the map.
	channel, ok := im.resultChannels[campaignID]
	if !ok {
		return noSubscriberError{strconv.Itoa(int(campaignID))}
	}

	select {
	case channel <- result:
		return nil
	default:
		// Nobody is currently receiving on the channel.
		return noSubscriberError{strconv.Itoa(int(campaignID))}
	}
}

// ReadChannel returns the channel on which results written for campaign are
// delivered, registering it if it does not exist yet. A background goroutine
// waits for ctx to be done and then closes the channel and removes it from the
// store; that goroutine lives until ctx is done. The returned error is always
// nil.
func (im *inmemQueryResults) ReadChannel(ctx context.Context, campaign fleet.DistributedQueryCampaign) (<-chan interface{}, error) {
	channel := im.getChannel(campaign.ID)

	go func() {
		<-ctx.Done()

		// Close and unregister atomically with respect to WriteResult, which
		// sends while holding the same mutex.
		im.channelMutex.Lock()
		defer im.channelMutex.Unlock()

		// Tear down only if this exact channel is still the registered one.
		// When several readers shared the channel, or a newer channel has been
		// registered for the same campaign since, an earlier cleanup already
		// closed this channel; closing it again would panic and deleting the
		// entry would drop the newer subscriber's channel.
		if current, ok := im.resultChannels[campaign.ID]; ok && current == channel {
			delete(im.resultChannels, campaign.ID)
			close(channel)
		}
	}()

	return channel, nil
}
// HealthCheck performs no checks and always returns nil.
func (im *inmemQueryResults) HealthCheck() error {
	return nil
}