mirror of
https://github.com/empayre/fleet.git
synced 2024-11-06 08:55:24 +00:00
Add AWS S3 as file carving backend (#126)
This adds the option to set up an S3 bucket as the storage backend for file carving (partially solving #111). It works by using the multipart upload capabilities of S3 to maintain compatibility with the "upload in blocks" protocol that osquery uses. It does this basically replacing the carve_blocks table while still maintaining the metadata in the original place (it would probably be possible to rely completely on S3 by using object tagging at the cost of listing performance). To make this pluggable, I created a new field in the service struct dedicated to the CarveStore which, if no configuration for S3 is set up will be just a reference to the standard datastore, otherwise it will point to the S3 one (effectively this separation will allow in the future to add more backends).
This commit is contained in:
parent
b77c8883d6
commit
c89cd370d5
@ -18,6 +18,7 @@ import (
|
||||
"github.com/e-dard/netbug"
|
||||
"github.com/fleetdm/fleet/server/config"
|
||||
"github.com/fleetdm/fleet/server/datastore/mysql"
|
||||
"github.com/fleetdm/fleet/server/datastore/s3"
|
||||
"github.com/fleetdm/fleet/server/health"
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
"github.com/fleetdm/fleet/server/launcher"
|
||||
@ -118,6 +119,7 @@ the way that the Fleet server works.
|
||||
}
|
||||
|
||||
var ds kolide.Datastore
|
||||
var carveStore kolide.CarveStore
|
||||
var err error
|
||||
mailService := mail.NewService()
|
||||
|
||||
@ -125,6 +127,14 @@ the way that the Fleet server works.
|
||||
if err != nil {
|
||||
initFatal(err, "initializing datastore")
|
||||
}
|
||||
if config.S3.Bucket != "" {
|
||||
carveStore, err = s3.New(config.S3, ds)
|
||||
if err != nil {
|
||||
initFatal(err, "initializing S3 carvestore")
|
||||
}
|
||||
} else {
|
||||
carveStore = ds
|
||||
}
|
||||
|
||||
migrationStatus, err := ds.MigrationStatus()
|
||||
if err != nil {
|
||||
@ -181,7 +191,7 @@ the way that the Fleet server works.
|
||||
liveQueryStore := live_query.NewRedisLiveQuery(redisPool)
|
||||
ssoSessionStore := sso.NewSessionStore(redisPool)
|
||||
|
||||
svc, err := service.NewService(ds, resultStore, logger, config, mailService, clock.C, ssoSessionStore, liveQueryStore)
|
||||
svc, err := service.NewService(ds, resultStore, logger, config, mailService, clock.C, ssoSessionStore, liveQueryStore, carveStore)
|
||||
if err != nil {
|
||||
initFatal(err, "initializing service")
|
||||
}
|
||||
|
@ -936,7 +936,7 @@ the stream listed:
|
||||
|
||||
#### PubSub
|
||||
|
||||
### `pubsub_project`
|
||||
##### `pubsub_project`
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
|
||||
@ -955,7 +955,7 @@ for authentication with the service.
|
||||
project: my-gcp-project
|
||||
```
|
||||
|
||||
### `pubsub_result_topic`
|
||||
##### `pubsub_result_topic`
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
|
||||
@ -970,7 +970,7 @@ The identifier of the pubsub topic that client results will be published to.
|
||||
result_topic: osquery_result
|
||||
```
|
||||
|
||||
### `pubsub_status_topic`
|
||||
##### `pubsub_status_topic`
|
||||
|
||||
This flag only has effect if `osquery_status_log_plugin` is set to `pubsub`.
|
||||
|
||||
@ -984,3 +984,78 @@ The identifier of the pubsub topic that osquery status logs will be published to
|
||||
pubsub:
|
||||
status_topic: osquery_status
|
||||
```
|
||||
|
||||
|
||||
#### S3 file carving backend
|
||||
|
||||
##### `s3_bucket`
|
||||
|
||||
Name of the S3 bucket to use to store file carves.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_S3_BUCKET`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
s3:
|
||||
bucket: some-carve-bucket
|
||||
```
|
||||
|
||||
##### `s3_prefix`
|
||||
|
||||
Prefix to prepend to carve objects.
|
||||
|
||||
All carve objects will also be prefixed by date and hour (UTC), making the resulting keys look like: `<prefix><year>/<month>/<day>/<hour>/<carve-name>`.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_S3_PREFIX`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
s3:
|
||||
prefix: carves-go-here/
|
||||
```
|
||||
|
||||
##### `s3_access_key_id`
|
||||
|
||||
AWS access key ID to use for S3 authentication.
|
||||
|
||||
If `s3_access_key_id` and `s3_secret_access_key` are omitted, Fleet will try to use
|
||||
[the default credential provider chain](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials).
|
||||
|
||||
The IAM identity used in this context must be allowed to perform the following actions on the bucket: `s3:PutObject`, `s3:GetObject`, `s3:ListMultipartUploadParts`, `s3:ListBucket`, `s3:GetBucketLocation`.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_S3_ACCESS_KEY_ID`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
s3:
|
||||
access_key_id: AKIAIOSFODNN7EXAMPLE
|
||||
```
|
||||
|
||||
##### `s3_secret_access_key`
|
||||
|
||||
AWS secret access key to use for S3 authentication.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_S3_SECRET_ACCESS_KEY`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
s3:
|
||||
secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
|
||||
```
|
||||
|
||||
##### `s3_sts_assume_role_arn`
|
||||
|
||||
AWS STS role ARN to use for S3 authentication.
|
||||
|
||||
- Default value: none
|
||||
- Environment variable: `KOLIDE_S3_STS_ASSUME_ROLE_ARN`
|
||||
- Config file format:
|
||||
|
||||
```
|
||||
s3:
|
||||
sts_assume_role_arn: arn:aws:iam::1234567890:role/some-s3-role
|
||||
```
|
||||
|
@ -2,6 +2,8 @@
|
||||
|
||||
Fleet supports osquery's file carving functionality as of Fleet 3.3.0. This allows the Fleet server to request files (and sets of files) from osquery agents, returning the full contents to Fleet.
|
||||
|
||||
File carving data can be either stored in Fleet's database or to an external S3 bucket. For information on how to configure the latter, consult the [configuration docs](https://github.com/fleetdm/fleet/blob/master/docs/infrastructure/configuring-the-fleet-binary.md#s3-file-carving-backend).
|
||||
|
||||
## Configuration
|
||||
|
||||
Given a working flagfile for connecting osquery agents to Fleet, add the following flags to enable carving:
|
||||
@ -21,6 +23,8 @@ The `carver_block_size` flag should be configured in osquery. 2MB (`2000000`) is
|
||||
|
||||
The configured value must be less than the value of `max_allowed_packet` in the MySQL connection, allowing for some overhead. The default for MySQL 5.7 is 4MB and for MySQL 8 it is 64MB.
|
||||
|
||||
In case S3 is used as the storage backend, this value must be instead set to be at least 5MB due to the [constraints of S3's multipart uploads](https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html).
|
||||
|
||||
Using a smaller value for `carver_block_size` will lead to more HTTP requests during the carving process, resulting in longer carve times and higher load on the Fleet server. If the value is too high, HTTP requests may run long enough to cause server timeouts.
|
||||
|
||||
### Compression
|
||||
@ -64,7 +68,9 @@ fleetctl get carve 3 --stdout | tar -x
|
||||
|
||||
### Expiration
|
||||
|
||||
Carve contents remain available for 24 hours after the first data is provided from the osquery client. After this time, the carve contents are cleaned from the database and the carve is marked as "expired".
|
||||
Carve contents remain available for 24 hours after the first data is provided from the osquery client. After this time, the carve contents are cleaned from the database and the carve is marked as "expired".
|
||||
|
||||
The same is not true if S3 is used as the storage backend. In that scenario, it is suggested to setup a [bucket lifecycle configuration](https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html) to avoid retaining data in excess. Fleet, in an "eventual consistent" manner (i.e. by periodically performing comparisons), will keep the metadata relative to the files carves in sync with what it is actually available in the bucket.
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
|
@ -114,6 +114,15 @@ type KinesisConfig struct {
|
||||
ResultStream string `yaml:"result_stream"`
|
||||
}
|
||||
|
||||
// S3Config defines config to enable file carving storage to an S3 bucket
|
||||
type S3Config struct {
|
||||
Bucket string
|
||||
Prefix string
|
||||
AccessKeyID string `yaml:"access_key_id"`
|
||||
SecretAccessKey string `yaml:"secret_access_key"`
|
||||
StsAssumeRoleArn string `yaml:"sts_assume_role_arn"`
|
||||
}
|
||||
|
||||
// PubSubConfig defines configs the for Google PubSub logging plugin
|
||||
type PubSubConfig struct {
|
||||
Project string
|
||||
@ -144,6 +153,7 @@ type KolideConfig struct {
|
||||
Logging LoggingConfig
|
||||
Firehose FirehoseConfig
|
||||
Kinesis KinesisConfig
|
||||
S3 S3Config
|
||||
PubSub PubSubConfig
|
||||
Filesystem FilesystemConfig
|
||||
}
|
||||
@ -270,6 +280,13 @@ func (man Manager) addConfigs() {
|
||||
man.addConfigString("kinesis.result_stream", "",
|
||||
"Kinesis stream name for result logs")
|
||||
|
||||
// S3 for file carving
|
||||
man.addConfigString("s3.bucket", "", "Bucket where to store file carves")
|
||||
man.addConfigString("s3.prefix", "", "Prefix under which carves are stored")
|
||||
man.addConfigString("s3.access_key_id", "", "Access Key ID for AWS authentication")
|
||||
man.addConfigString("s3.secret_access_key", "", "Secret Access Key for AWS authentication")
|
||||
man.addConfigString("s3.sts_assume_role_arn", "", "ARN of role to assume for AWS")
|
||||
|
||||
// PubSub
|
||||
man.addConfigString("pubsub.project", "", "Google Cloud Project to use")
|
||||
man.addConfigString("pubsub.status_topic", "", "PubSub topic for status logs")
|
||||
@ -365,6 +382,13 @@ func (man Manager) LoadConfig() KolideConfig {
|
||||
ResultStream: man.getConfigString("kinesis.result_stream"),
|
||||
StsAssumeRoleArn: man.getConfigString("kinesis.sts_assume_role_arn"),
|
||||
},
|
||||
S3: S3Config{
|
||||
Bucket: man.getConfigString("s3.bucket"),
|
||||
Prefix: man.getConfigString("s3.prefix"),
|
||||
AccessKeyID: man.getConfigString("s3.access_key_id"),
|
||||
SecretAccessKey: man.getConfigString("s3.secret_access_key"),
|
||||
StsAssumeRoleArn: man.getConfigString("s3.sts_assume_role_arn"),
|
||||
},
|
||||
PubSub: PubSubConfig{
|
||||
Project: man.getConfigString("pubsub.project"),
|
||||
StatusTopic: man.getConfigString("pubsub.status_topic"),
|
||||
|
@ -11,6 +11,8 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var mockCreatedAt time.Time = time.Now().UTC().Truncate(time.Second)
|
||||
|
||||
func testCarveMetadata(t *testing.T, ds kolide.Datastore) {
|
||||
h := test.NewHost(t, ds, "foo.local", "192.168.1.10", "1", "1", time.Now())
|
||||
|
||||
@ -23,6 +25,7 @@ func testCarveMetadata(t *testing.T, ds kolide.Datastore) {
|
||||
CarveId: "carve_id",
|
||||
RequestId: "request_id",
|
||||
SessionId: "session_id",
|
||||
CreatedAt: mockCreatedAt,
|
||||
}
|
||||
|
||||
expectedCarve, err := ds.NewCarve(expectedCarve)
|
||||
@ -32,17 +35,15 @@ func testCarveMetadata(t *testing.T, ds kolide.Datastore) {
|
||||
|
||||
carve, err := ds.CarveBySessionId(expectedCarve.SessionId)
|
||||
require.NoError(t, err)
|
||||
expectedCarve.CreatedAt = carve.CreatedAt // Ignore created_at field
|
||||
assert.Equal(t, expectedCarve, carve)
|
||||
|
||||
carve, err = ds.Carve(expectedCarve.ID)
|
||||
expectedCarve.CreatedAt = carve.CreatedAt // Ignore created_at field
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedCarve, carve)
|
||||
|
||||
// Check for increment of max block
|
||||
|
||||
err = ds.NewBlock(carve.ID, 0, nil)
|
||||
err = ds.NewBlock(carve, 0, nil)
|
||||
require.NoError(t, err)
|
||||
expectedCarve.MaxBlock = 0
|
||||
|
||||
@ -56,7 +57,7 @@ func testCarveMetadata(t *testing.T, ds kolide.Datastore) {
|
||||
|
||||
// Check for increment of max block
|
||||
|
||||
err = ds.NewBlock(carve.ID, 1, nil)
|
||||
err = ds.NewBlock(carve, 1, nil)
|
||||
require.NoError(t, err)
|
||||
expectedCarve.MaxBlock = 1
|
||||
|
||||
@ -84,6 +85,7 @@ func testCarveBlocks(t *testing.T, ds kolide.Datastore) {
|
||||
CarveId: "carve_id",
|
||||
RequestId: "request_id",
|
||||
SessionId: "session_id",
|
||||
CreatedAt: mockCreatedAt,
|
||||
}
|
||||
|
||||
carve, err := ds.NewCarve(carve)
|
||||
@ -97,13 +99,13 @@ func testCarveBlocks(t *testing.T, ds kolide.Datastore) {
|
||||
require.NoError(t, err, "generate block")
|
||||
expectedBlocks[i] = block
|
||||
|
||||
err = ds.NewBlock(carve.ID, i, block)
|
||||
err = ds.NewBlock(carve, i, block)
|
||||
require.NoError(t, err, "write block %v", block)
|
||||
}
|
||||
|
||||
// Verify retrieved blocks match inserted blocks
|
||||
for i := int64(0); i < blockCount; i++ {
|
||||
data, err := ds.GetBlock(carve.ID, i)
|
||||
data, err := ds.GetBlock(carve, i)
|
||||
require.NoError(t, err, "get block %d %v", i, expectedBlocks[i])
|
||||
assert.Equal(t, expectedBlocks[i], data)
|
||||
}
|
||||
@ -124,6 +126,7 @@ func testCarveCleanupCarves(t *testing.T, ds kolide.Datastore) {
|
||||
CarveId: "carve_id",
|
||||
RequestId: "request_id",
|
||||
SessionId: "session_id",
|
||||
CreatedAt: mockCreatedAt,
|
||||
}
|
||||
|
||||
carve, err := ds.NewCarve(carve)
|
||||
@ -137,7 +140,7 @@ func testCarveCleanupCarves(t *testing.T, ds kolide.Datastore) {
|
||||
require.NoError(t, err, "generate block")
|
||||
expectedBlocks[i] = block
|
||||
|
||||
err = ds.NewBlock(carve.ID, i, block)
|
||||
err = ds.NewBlock(carve, i, block)
|
||||
require.NoError(t, err, "write block %v", block)
|
||||
}
|
||||
|
||||
@ -145,7 +148,7 @@ func testCarveCleanupCarves(t *testing.T, ds kolide.Datastore) {
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, expired)
|
||||
|
||||
_, err = ds.GetBlock(carve.ID, 0)
|
||||
_, err = ds.GetBlock(carve, 0)
|
||||
require.NoError(t, err)
|
||||
|
||||
expired, err = ds.CleanupCarves(time.Now().Add(24 * time.Hour))
|
||||
@ -153,7 +156,7 @@ func testCarveCleanupCarves(t *testing.T, ds kolide.Datastore) {
|
||||
assert.Equal(t, 1, expired)
|
||||
|
||||
// Should no longer be able to get data
|
||||
_, err = ds.GetBlock(carve.ID, 0)
|
||||
_, err = ds.GetBlock(carve, 0)
|
||||
require.Error(t, err, "data should be expired")
|
||||
|
||||
carve, err = ds.Carve(carve.ID)
|
||||
@ -161,7 +164,6 @@ func testCarveCleanupCarves(t *testing.T, ds kolide.Datastore) {
|
||||
assert.True(t, carve.Expired)
|
||||
}
|
||||
|
||||
|
||||
func testCarveListCarves(t *testing.T, ds kolide.Datastore) {
|
||||
h := test.NewHost(t, ds, "foo.local", "192.168.1.10", "1", "1", time.Now())
|
||||
|
||||
@ -174,13 +176,15 @@ func testCarveListCarves(t *testing.T, ds kolide.Datastore) {
|
||||
CarveId: "carve_id",
|
||||
RequestId: "request_id",
|
||||
SessionId: "session_id",
|
||||
CreatedAt: mockCreatedAt,
|
||||
MaxBlock: -1,
|
||||
}
|
||||
|
||||
expectedCarve, err := ds.NewCarve(expectedCarve)
|
||||
require.NoError(t, err)
|
||||
assert.NotEqual(t, 0, expectedCarve.ID)
|
||||
// Add a block to this carve
|
||||
err = ds.NewBlock(expectedCarve.ID, 0, nil)
|
||||
err = ds.NewBlock(expectedCarve, 0, nil)
|
||||
require.NoError(t, err)
|
||||
expectedCarve.MaxBlock = 0
|
||||
|
||||
@ -193,6 +197,7 @@ func testCarveListCarves(t *testing.T, ds kolide.Datastore) {
|
||||
CarveId: "carve_id2",
|
||||
RequestId: "request_id2",
|
||||
SessionId: "session_id2",
|
||||
CreatedAt: mockCreatedAt,
|
||||
}
|
||||
|
||||
expectedCarve2, err = ds.NewCarve(expectedCarve2)
|
||||
@ -202,9 +207,6 @@ func testCarveListCarves(t *testing.T, ds kolide.Datastore) {
|
||||
|
||||
carves, err := ds.ListCarves(kolide.CarveListOptions{Expired: true})
|
||||
require.NoError(t, err)
|
||||
// Ignore created_at timestamps
|
||||
expectedCarve.CreatedAt = carves[0].CreatedAt
|
||||
expectedCarve2.CreatedAt = carves[1].CreatedAt
|
||||
assert.Equal(t, []*kolide.CarveMetadata{expectedCarve, expectedCarve2}, carves)
|
||||
|
||||
// Expire the carves
|
||||
@ -219,3 +221,34 @@ func testCarveListCarves(t *testing.T, ds kolide.Datastore) {
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, carves, 2)
|
||||
}
|
||||
|
||||
func testCarveUpdateCarve(t *testing.T, ds kolide.Datastore) {
|
||||
h := test.NewHost(t, ds, "foo.local", "192.168.1.10", "1", "1", time.Now())
|
||||
|
||||
actualCount := int64(10)
|
||||
carve := &kolide.CarveMetadata{
|
||||
HostId: h.ID,
|
||||
Name: "foobar",
|
||||
BlockCount: actualCount,
|
||||
BlockSize: 20,
|
||||
CarveSize: actualCount * 20,
|
||||
CarveId: "carve_id",
|
||||
RequestId: "request_id",
|
||||
SessionId: "session_id",
|
||||
CreatedAt: mockCreatedAt,
|
||||
}
|
||||
|
||||
carve, err := ds.NewCarve(carve)
|
||||
require.NoError(t, err)
|
||||
|
||||
carve.Expired = true
|
||||
carve.MaxBlock = 10
|
||||
carve.BlockCount = 15 // it should not get updated
|
||||
err = ds.UpdateCarve(carve)
|
||||
require.NoError(t, err)
|
||||
|
||||
carve.BlockCount = actualCount
|
||||
dbCarve, err := ds.Carve(carve.ID)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, carve, dbCarve)
|
||||
}
|
||||
|
@ -91,4 +91,5 @@ var testFunctions = [...]func(*testing.T, kolide.Datastore){
|
||||
testCarveBlocks,
|
||||
testCarveListCarves,
|
||||
testCarveCleanupCarves,
|
||||
testCarveUpdateCarve,
|
||||
}
|
||||
|
@ -5,14 +5,15 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (d *Datastore) NewCarve(metadata *kolide.CarveMetadata) (*kolide.CarveMetadata, error) {
|
||||
stmt := `INSERT INTO carve_metadata (
|
||||
host_id,
|
||||
created_at,
|
||||
name,
|
||||
block_count,
|
||||
block_size,
|
||||
@ -28,12 +29,14 @@ func (d *Datastore) NewCarve(metadata *kolide.CarveMetadata) (*kolide.CarveMetad
|
||||
?,
|
||||
?,
|
||||
?,
|
||||
?,
|
||||
?
|
||||
)`
|
||||
|
||||
result, err := d.db.Exec(
|
||||
stmt,
|
||||
metadata.HostId,
|
||||
metadata.CreatedAt.Format(mySQLTimestampFormat),
|
||||
metadata.Name,
|
||||
metadata.BlockCount,
|
||||
metadata.BlockSize,
|
||||
@ -52,6 +55,27 @@ func (d *Datastore) NewCarve(metadata *kolide.CarveMetadata) (*kolide.CarveMetad
|
||||
return metadata, nil
|
||||
}
|
||||
|
||||
// UpdateCarve updates the carve metadata in database
|
||||
// Only max_block and expired are updatable
|
||||
func (d *Datastore) UpdateCarve(metadata *kolide.CarveMetadata) error {
|
||||
stmt := `
|
||||
UPDATE carve_metadata SET
|
||||
max_block = ?,
|
||||
expired = ?
|
||||
WHERE id = ?
|
||||
`
|
||||
_, err := d.db.Exec(
|
||||
stmt,
|
||||
metadata.MaxBlock,
|
||||
metadata.Expired,
|
||||
metadata.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "update carve metadata")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Datastore) CleanupCarves(now time.Time) (int, error) {
|
||||
var countExpired int
|
||||
err := d.withRetryTxx(func(tx *sqlx.Tx) error {
|
||||
@ -127,7 +151,7 @@ const carveSelectFields = `
|
||||
request_id,
|
||||
session_id,
|
||||
expired,
|
||||
(SELECT COALESCE(MAX(block_id), -1) FROM carve_blocks WHERE metadata_id = id) AS max_block
|
||||
max_block
|
||||
`
|
||||
|
||||
func (d *Datastore) Carve(carveId int64) (*kolide.CarveMetadata, error) {
|
||||
@ -196,7 +220,7 @@ func (d *Datastore) ListCarves(opt kolide.CarveListOptions) ([]*kolide.CarveMeta
|
||||
return carves, nil
|
||||
}
|
||||
|
||||
func (d *Datastore) NewBlock(metadataId int64, blockId int64, data []byte) error {
|
||||
func (d *Datastore) NewBlock(metadata *kolide.CarveMetadata, blockId int64, data []byte) error {
|
||||
stmt := `
|
||||
INSERT INTO carve_blocks (
|
||||
metadata_id,
|
||||
@ -207,21 +231,29 @@ func (d *Datastore) NewBlock(metadataId int64, blockId int64, data []byte) error
|
||||
?,
|
||||
?
|
||||
)`
|
||||
if _, err := d.db.Exec(stmt, metadataId, blockId, data); err != nil {
|
||||
if _, err := d.db.Exec(stmt, metadata.ID, blockId, data); err != nil {
|
||||
return errors.Wrap(err, "insert carve block")
|
||||
}
|
||||
|
||||
if metadata.MaxBlock < blockId {
|
||||
// Update max_block
|
||||
metadata.MaxBlock = blockId
|
||||
if err := d.UpdateCarve(metadata); err != nil {
|
||||
return errors.Wrap(err, "insert carve block")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Datastore) GetBlock(metadataId int64, blockId int64) ([]byte, error) {
|
||||
func (d *Datastore) GetBlock(metadata *kolide.CarveMetadata, blockId int64) ([]byte, error) {
|
||||
stmt := `
|
||||
SELECT data
|
||||
FROM carve_blocks
|
||||
WHERE metadata_id = ? AND block_id = ?
|
||||
`
|
||||
var data []byte
|
||||
if err := d.db.Get(&data, stmt, metadataId, blockId); err != nil {
|
||||
if err := d.db.Get(&data, stmt, metadata.ID, blockId); err != nil {
|
||||
return nil, errors.Wrap(err, "select data")
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,37 @@
|
||||
package tables
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func init() {
|
||||
MigrationClient.AddMigration(Up_20201215091637, Down_20201215091637)
|
||||
}
|
||||
|
||||
func Up_20201215091637(tx *sql.Tx) error {
|
||||
query := `
|
||||
ALTER TABLE carve_metadata
|
||||
ADD max_block INT DEFAULT -1,
|
||||
MODIFY session_id VARCHAR(255) NOT NULL;
|
||||
`
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
return errors.Wrap(err, "alter carve session_id size")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func Down_20201215091637(tx *sql.Tx) error {
|
||||
query := `
|
||||
ALTER TABLE carve_metadata
|
||||
DROP max_block,
|
||||
MODIFY session_id VARCHAR(64) NOT NULL;
|
||||
`
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
return errors.Wrap(err, "revert carve session_id size")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -23,7 +23,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultSelectLimit = 1000000
|
||||
defaultSelectLimit = 1000000
|
||||
mySQLTimestampFormat = "2006-01-02 15:04:05" // %Y/%m/%d %H:%M:%S
|
||||
)
|
||||
|
||||
var (
|
||||
|
237
server/datastore/s3/carves.go
Normal file
237
server/datastore/s3/carves.go
Normal file
@ -0,0 +1,237 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxS3Keys = 1000
|
||||
cleanupSize = 1000
|
||||
// This is Golang's way of formatting timestrings, it's confusing, I know.
|
||||
// If you are used to more conventional timestrings, this is equivalent
|
||||
// to %Y/%m/%d/%H (year/month/day/hour)
|
||||
timePrefixFormat = "2006/01/02/15"
|
||||
)
|
||||
|
||||
// generateS3Key builds S3 key from carve metadata
|
||||
// all keys are prefixed by date so that they can easily be listed chronologically
|
||||
func (d *Datastore) generateS3Key(metadata *kolide.CarveMetadata) string {
|
||||
simpleDateHour := metadata.CreatedAt.Format(timePrefixFormat)
|
||||
return fmt.Sprintf("%s%s/%s", d.prefix, simpleDateHour, metadata.Name)
|
||||
}
|
||||
|
||||
// NewCarve initializes a new file carving session
|
||||
func (d *Datastore) NewCarve(metadata *kolide.CarveMetadata) (*kolide.CarveMetadata, error) {
|
||||
objectKey := d.generateS3Key(metadata)
|
||||
res, err := d.s3client.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
|
||||
Bucket: &d.bucket,
|
||||
Key: &objectKey,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "s3 multipart carve create")
|
||||
}
|
||||
metadata.SessionId = *res.UploadId
|
||||
return d.metadatadb.NewCarve(metadata)
|
||||
}
|
||||
|
||||
// UpdateCarve updates carve definition in database
|
||||
// Only max_block and expired are updatable
|
||||
func (d *Datastore) UpdateCarve(metadata *kolide.CarveMetadata) error {
|
||||
return d.metadatadb.UpdateCarve(metadata)
|
||||
}
|
||||
|
||||
// listS3Carves lists all keys up to a given one or if the passed max number
|
||||
// of keys has been reached; keys are returned in a set-like map
|
||||
func (d *Datastore) listS3Carves(lastPrefix string, maxKeys int) (map[string]bool, error) {
|
||||
var err error
|
||||
var continuationToken string
|
||||
result := make(map[string]bool)
|
||||
if maxKeys <= 0 {
|
||||
maxKeys = defaultMaxS3Keys
|
||||
}
|
||||
if !strings.HasPrefix(lastPrefix, d.prefix) {
|
||||
lastPrefix = d.prefix + lastPrefix
|
||||
}
|
||||
for {
|
||||
carveFilesPage, err := d.s3client.ListObjectsV2(&s3.ListObjectsV2Input{
|
||||
Bucket: &d.bucket,
|
||||
Prefix: &d.prefix,
|
||||
ContinuationToken: &continuationToken,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, carveObject := range carveFilesPage.Contents {
|
||||
result[*carveObject.Key] = true
|
||||
if strings.HasPrefix(*carveObject.Key, lastPrefix) || len(result) >= maxKeys {
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
if !*carveFilesPage.IsTruncated {
|
||||
break
|
||||
}
|
||||
continuationToken = *carveFilesPage.ContinuationToken
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// CleanupCarves is a noop on the S3 side since users should rely on the bucket
|
||||
// lifecycle configurations provided by AWS. This will compare a portion of the
|
||||
// metadata present in the database and mark as expired the carves no longer
|
||||
// available in S3 (ignores the `now` argument)
|
||||
func (d *Datastore) CleanupCarves(now time.Time) (int, error) {
|
||||
var err error
|
||||
// Get the 1000 oldest carves
|
||||
nonExpiredCarves, err := d.ListCarves(kolide.CarveListOptions{
|
||||
ListOptions: kolide.ListOptions{PerPage: cleanupSize},
|
||||
Expired: false,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "s3 carve cleanup")
|
||||
}
|
||||
// List carves in S3 up to a hour+1 prefix
|
||||
lastCarveNextHour := nonExpiredCarves[len(nonExpiredCarves)-1].CreatedAt.Add(time.Hour)
|
||||
lastCarvePrefix := d.prefix + lastCarveNextHour.Format(timePrefixFormat)
|
||||
carveKeys, err := d.listS3Carves(lastCarvePrefix, 2*cleanupSize)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "s3 carve cleanup")
|
||||
}
|
||||
// Compare carve metadata in DB with S3 listing and update expiration flag
|
||||
cleanCount := 0
|
||||
for _, carve := range nonExpiredCarves {
|
||||
if _, ok := carveKeys[d.generateS3Key(carve)]; !ok {
|
||||
carve.Expired = true
|
||||
err = d.UpdateCarve(carve)
|
||||
cleanCount++
|
||||
}
|
||||
}
|
||||
return cleanCount, err
|
||||
}
|
||||
|
||||
// Carve returns carve metadata by ID
|
||||
func (d *Datastore) Carve(carveID int64) (*kolide.CarveMetadata, error) {
|
||||
return d.metadatadb.Carve(carveID)
|
||||
}
|
||||
|
||||
// CarveBySessionId returns carve metadata by session ID
|
||||
func (d *Datastore) CarveBySessionId(sessionID string) (*kolide.CarveMetadata, error) {
|
||||
return d.metadatadb.CarveBySessionId(sessionID)
|
||||
}
|
||||
|
||||
// CarveByName returns carve metadata by name
|
||||
func (d *Datastore) CarveByName(name string) (*kolide.CarveMetadata, error) {
|
||||
return d.metadatadb.CarveByName(name)
|
||||
}
|
||||
|
||||
// ListCarves returns a list of the currently available carves
|
||||
func (d *Datastore) ListCarves(opt kolide.CarveListOptions) ([]*kolide.CarveMetadata, error) {
|
||||
return d.metadatadb.ListCarves(opt)
|
||||
}
|
||||
|
||||
// listCompletedParts returns a list of the parts in a multipart updaload given a key and uploadID
|
||||
// results are wrapped into the s3.CompletedPart struct
|
||||
func (d *Datastore) listCompletedParts(objectKey, uploadID string) ([]*s3.CompletedPart, error) {
|
||||
var res []*s3.CompletedPart
|
||||
var partMarker int64
|
||||
for {
|
||||
parts, err := d.s3client.ListParts(&s3.ListPartsInput{
|
||||
Bucket: &d.bucket,
|
||||
Key: &objectKey,
|
||||
UploadId: &uploadID,
|
||||
PartNumberMarker: &partMarker,
|
||||
})
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
for _, p := range parts.Parts {
|
||||
res = append(res, &s3.CompletedPart{
|
||||
ETag: p.ETag,
|
||||
PartNumber: p.PartNumber,
|
||||
})
|
||||
}
|
||||
if !*parts.IsTruncated {
|
||||
break
|
||||
}
|
||||
partMarker = *parts.NextPartNumberMarker
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// NewBlock uploads a new block for a specific carve
|
||||
func (d *Datastore) NewBlock(metadata *kolide.CarveMetadata, blockID int64, data []byte) error {
|
||||
objectKey := d.generateS3Key(metadata)
|
||||
partNumber := blockID + 1 // PartNumber is 1-indexed
|
||||
_, err := d.s3client.UploadPart(&s3.UploadPartInput{
|
||||
Body: bytes.NewReader(data),
|
||||
Bucket: &d.bucket,
|
||||
Key: &objectKey,
|
||||
PartNumber: &partNumber,
|
||||
UploadId: &metadata.SessionId,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "s3 multipart carve upload")
|
||||
}
|
||||
if metadata.MaxBlock < blockID {
|
||||
metadata.MaxBlock = blockID
|
||||
if err = d.UpdateCarve(metadata); err != nil {
|
||||
return errors.Wrap(err, "s3 multipart carve upload")
|
||||
}
|
||||
}
|
||||
if blockID >= metadata.BlockCount-1 {
|
||||
// The last block was reached, multipart upload can be completed
|
||||
parts, err := d.listCompletedParts(objectKey, metadata.SessionId)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "s3 multipart carve upload")
|
||||
}
|
||||
_, err = d.s3client.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
|
||||
Bucket: &d.bucket,
|
||||
Key: &objectKey,
|
||||
UploadId: &metadata.SessionId,
|
||||
MultipartUpload: &s3.CompletedMultipartUpload{Parts: parts},
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "s3 multipart carve upload")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetBlock returns a block of data for a carve
|
||||
func (d *Datastore) GetBlock(metadata *kolide.CarveMetadata, blockID int64) ([]byte, error) {
|
||||
objectKey := d.generateS3Key(metadata)
|
||||
// blockID is 0-indexed and sequential so can be perfectly used for evaluating ranges
|
||||
// range extremes are inclusive as for RFC-2616 (section 14.35)
|
||||
// no need to cap the rangeEnd to the carve size as S3 will do that by itself
|
||||
rangeStart := blockID * metadata.BlockSize
|
||||
rangeString := fmt.Sprintf("bytes=%d-%d", rangeStart, rangeStart+metadata.BlockSize-1)
|
||||
res, err := d.s3client.GetObject(&s3.GetObjectInput{
|
||||
Bucket: &d.bucket,
|
||||
Key: &objectKey,
|
||||
Range: &rangeString,
|
||||
})
|
||||
if err != nil {
|
||||
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == s3.ErrCodeNoSuchKey {
|
||||
// The carve does not exists in S3, mark expired
|
||||
metadata.Expired = true
|
||||
if updateErr := d.UpdateCarve(metadata); err != nil {
|
||||
err = errors.Wrap(err, updateErr.Error())
|
||||
}
|
||||
}
|
||||
return nil, errors.Wrap(err, "s3 carve get block")
|
||||
}
|
||||
defer res.Body.Close()
|
||||
carveData, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "s3 carve get block")
|
||||
}
|
||||
return carveData, nil
|
||||
}
|
68
server/datastore/s3/s3.go
Normal file
68
server/datastore/s3/s3.go
Normal file
@ -0,0 +1,68 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||
"github.com/fleetdm/fleet/server/config"
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const awsRegionHint = "us-east-1"
|
||||
|
||||
// Datastore is a type implementing the CarveStore interface
|
||||
// relying on AWS S3 storage
|
||||
type Datastore struct {
|
||||
metadatadb kolide.CarveStore
|
||||
s3client *s3.S3
|
||||
bucket string
|
||||
prefix string
|
||||
}
|
||||
|
||||
// New initializes an S3 Datastore
|
||||
func New(config config.S3Config, metadatadb kolide.CarveStore) (*Datastore, error) {
|
||||
conf := &aws.Config{}
|
||||
|
||||
// Use default auth provire if no static credentials were provided
|
||||
if config.AccessKeyID != "" && config.SecretAccessKey != "" {
|
||||
conf.Credentials = credentials.NewStaticCredentials(
|
||||
config.AccessKeyID,
|
||||
config.SecretAccessKey,
|
||||
"",
|
||||
)
|
||||
}
|
||||
|
||||
sess, err := session.NewSession(conf)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create S3 client")
|
||||
}
|
||||
|
||||
// Assume role if configured
|
||||
if config.StsAssumeRoleArn != "" {
|
||||
stscreds.NewCredentials(sess, config.StsAssumeRoleArn)
|
||||
creds := stscreds.NewCredentials(sess, config.StsAssumeRoleArn)
|
||||
conf.Credentials = creds
|
||||
sess, err = session.NewSession(conf)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create S3 client")
|
||||
}
|
||||
}
|
||||
|
||||
region, err := s3manager.GetBucketRegion(context.TODO(), sess, config.Bucket, awsRegionHint)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create S3 client")
|
||||
}
|
||||
|
||||
return &Datastore{
|
||||
metadatadb: metadatadb,
|
||||
s3client: s3.New(sess, &aws.Config{Region: ®ion}),
|
||||
bucket: config.Bucket,
|
||||
prefix: config.Prefix,
|
||||
}, nil
|
||||
}
|
@ -7,14 +7,16 @@ import (
|
||||
|
||||
type CarveStore interface {
|
||||
NewCarve(metadata *CarveMetadata) (*CarveMetadata, error)
|
||||
UpdateCarve(metadata *CarveMetadata) error
|
||||
Carve(carveId int64) (*CarveMetadata, error)
|
||||
CarveBySessionId(sessionId string) (*CarveMetadata, error)
|
||||
CarveByName(name string) (*CarveMetadata, error)
|
||||
ListCarves(opt CarveListOptions) ([]*CarveMetadata, error)
|
||||
NewBlock(carveId, blockId int64, data []byte) error
|
||||
GetBlock(carveId, blockId int64) ([]byte, error)
|
||||
NewBlock(metadata *CarveMetadata, blockId int64, data []byte) error
|
||||
GetBlock(metadata *CarveMetadata, blockId int64) ([]byte, error)
|
||||
// CleanupCarves will mark carves older than 24 hours expired, and delete the
|
||||
// associated data blocks.
|
||||
// associated data blocks. This behaves differently for carves stored in S3
|
||||
// (check the implementation godoc comment for more details)
|
||||
CleanupCarves(now time.Time) (expired int, err error)
|
||||
}
|
||||
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
package mock
|
||||
|
||||
import(
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
@ -12,6 +12,8 @@ var _ kolide.CarveStore = (*CarveStore)(nil)
|
||||
|
||||
type NewCarveFunc func(c *kolide.CarveMetadata) (*kolide.CarveMetadata, error)
|
||||
|
||||
type UpdateCarveFunc func(c *kolide.CarveMetadata) error
|
||||
|
||||
type CarveFunc func(carveId int64) (*kolide.CarveMetadata, error)
|
||||
|
||||
type ListCarvesFunc func(opt kolide.CarveListOptions) ([]*kolide.CarveMetadata, error)
|
||||
@ -20,9 +22,9 @@ type CarveBySessionIdFunc func(sessionId string) (*kolide.CarveMetadata, error)
|
||||
|
||||
type CarveByNameFunc func(name string) (*kolide.CarveMetadata, error)
|
||||
|
||||
type NewBlockFunc func(metadataId int64, blockId int64, data []byte) error
|
||||
type NewBlockFunc func(metadata *kolide.CarveMetadata, blockId int64, data []byte) error
|
||||
|
||||
type GetBlockFunc func(metadataId int64, blockId int64) ([]byte, error)
|
||||
type GetBlockFunc func(metadata *kolide.CarveMetadata, blockId int64) ([]byte, error)
|
||||
|
||||
type CleanupCarvesFunc func(now time.Time) (expired int, err error)
|
||||
|
||||
@ -30,6 +32,9 @@ type CarveStore struct {
|
||||
NewCarveFunc NewCarveFunc
|
||||
NewCarveFuncInvoked bool
|
||||
|
||||
UpdateCarveFunc UpdateCarveFunc
|
||||
UpdateCarveFuncInvoked bool
|
||||
|
||||
CarveFunc CarveFunc
|
||||
CarveFuncInvoked bool
|
||||
|
||||
@ -57,7 +62,12 @@ func (s *CarveStore) NewCarve(c *kolide.CarveMetadata) (*kolide.CarveMetadata, e
|
||||
return s.NewCarveFunc(c)
|
||||
}
|
||||
|
||||
func (s *CarveStore) Carve(carveId int64) (*kolide.CarveMetadata, error){
|
||||
func (s *CarveStore) UpdateCarve(c *kolide.CarveMetadata) error {
|
||||
s.UpdateCarveFuncInvoked = true
|
||||
return s.UpdateCarveFunc(c)
|
||||
}
|
||||
|
||||
func (s *CarveStore) Carve(carveId int64) (*kolide.CarveMetadata, error) {
|
||||
s.CarveFuncInvoked = true
|
||||
return s.CarveFunc(carveId)
|
||||
}
|
||||
@ -77,15 +87,14 @@ func (s *CarveStore) CarveByName(name string) (*kolide.CarveMetadata, error) {
|
||||
return s.CarveByNameFunc(name)
|
||||
}
|
||||
|
||||
|
||||
func (s *CarveStore) NewBlock(metadataId int64, blockId int64, data []byte) error {
|
||||
func (s *CarveStore) NewBlock(metadata *kolide.CarveMetadata, blockId int64, data []byte) error {
|
||||
s.NewBlockFuncInvoked = true
|
||||
return s.NewBlockFunc(metadataId, blockId, data)
|
||||
return s.NewBlockFunc(metadata, blockId, data)
|
||||
}
|
||||
|
||||
func (s *CarveStore) GetBlock(metadataId int64, blockId int64) ([]byte, error) {
|
||||
func (s *CarveStore) GetBlock(metadata *kolide.CarveMetadata, blockId int64) ([]byte, error) {
|
||||
s.GetBlockFuncInvoked = true
|
||||
return s.GetBlockFunc(metadataId, blockId)
|
||||
return s.GetBlockFunc(metadata, blockId)
|
||||
}
|
||||
|
||||
func (s *CarveStore) CleanupCarves(now time.Time) (expired int, err error) {
|
||||
|
@ -76,7 +76,6 @@ func (c *Client) GetCarve(carveId int64) (*kolide.CarveMetadata, error) {
|
||||
return &responseBody.Carve, nil
|
||||
}
|
||||
|
||||
|
||||
func (c *Client) getCarveBlock(carveId, blockId int64) ([]byte, error) {
|
||||
path := fmt.Sprintf(
|
||||
"/api/v1/kolide/carves/%d/block/%d",
|
||||
@ -110,23 +109,23 @@ func (c *Client) getCarveBlock(carveId, blockId int64) ([]byte, error) {
|
||||
}
|
||||
|
||||
type carveReader struct {
|
||||
carve kolide.CarveMetadata
|
||||
carve kolide.CarveMetadata
|
||||
bytesRead int64
|
||||
curBlock int64
|
||||
buffer []byte
|
||||
client *Client
|
||||
curBlock int64
|
||||
buffer []byte
|
||||
client *Client
|
||||
}
|
||||
|
||||
func newCarveReader(carve kolide.CarveMetadata, client *Client) *carveReader {
|
||||
return &carveReader{
|
||||
carve: carve,
|
||||
client: client,
|
||||
carve: carve,
|
||||
client: client,
|
||||
bytesRead: 0,
|
||||
curBlock: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *carveReader) Read(p []byte) (n int, err error) {
|
||||
func (r *carveReader) Read(p []byte) (n int, err error) {
|
||||
if len(p) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
@ -160,7 +159,7 @@ func (r *carveReader) Read(p []byte) (n int, err error) {
|
||||
return copyLen, nil
|
||||
}
|
||||
|
||||
// ListCarves lists the file carving sessio
|
||||
// DownloadCarve creates a Reader downloading a carve (by ID)
|
||||
func (c *Client) DownloadCarve(id int64) (io.Reader, error) {
|
||||
path := fmt.Sprintf("/api/v1/kolide/carves/%d", id)
|
||||
response, err := c.AuthenticatedDo("GET", path, "", nil)
|
||||
|
@ -9,11 +9,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/WatchBeam/clock"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
"github.com/fleetdm/fleet/server/config"
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
"github.com/fleetdm/fleet/server/logging"
|
||||
"github.com/fleetdm/fleet/server/sso"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
"github.com/kolide/kit/version"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@ -21,7 +21,7 @@ import (
|
||||
// NewService creates a new service from the config struct
|
||||
func NewService(ds kolide.Datastore, resultStore kolide.QueryResultStore,
|
||||
logger kitlog.Logger, config config.KolideConfig, mailService kolide.MailService,
|
||||
c clock.Clock, sso sso.SessionStore, lq kolide.LiveQueryStore) (kolide.Service, error) {
|
||||
c clock.Clock, sso sso.SessionStore, lq kolide.LiveQueryStore, carveStore kolide.CarveStore) (kolide.Service, error) {
|
||||
var svc kolide.Service
|
||||
|
||||
osqueryLogger, err := logging.New(config, logger)
|
||||
@ -31,6 +31,7 @@ func NewService(ds kolide.Datastore, resultStore kolide.QueryResultStore,
|
||||
|
||||
svc = service{
|
||||
ds: ds,
|
||||
carveStore: carveStore,
|
||||
resultStore: resultStore,
|
||||
liveQueryStore: lq,
|
||||
logger: logger,
|
||||
@ -49,6 +50,7 @@ func NewService(ds kolide.Datastore, resultStore kolide.QueryResultStore,
|
||||
|
||||
type service struct {
|
||||
ds kolide.Datastore
|
||||
carveStore kolide.CarveStore
|
||||
resultStore kolide.QueryResultStore
|
||||
liveQueryStore kolide.LiveQueryStore
|
||||
logger kitlog.Logger
|
||||
|
@ -5,14 +5,14 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
hostctx "github.com/fleetdm/fleet/server/contexts/host"
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
maxCarveSize = 8 * 1024 * 1024 * 1024 // 8MB
|
||||
maxCarveSize = 8 * 1024 * 1024 * 1024 // 8GB
|
||||
maxBlockSize = 256 * 1024 * 1024 // 256MB
|
||||
)
|
||||
|
||||
@ -45,8 +45,9 @@ func (svc service) CarveBegin(ctx context.Context, payload kolide.CarveBeginPayl
|
||||
return nil, osqueryError{message: "internal error: generate session ID for carve: " + err.Error()}
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
carve := &kolide.CarveMetadata{
|
||||
Name: fmt.Sprintf("%s-%s-%s", host.HostName, time.Now().Format(time.RFC3339), payload.RequestId),
|
||||
Name: fmt.Sprintf("%s-%s-%s", host.HostName, now.Format(time.RFC3339), payload.RequestId),
|
||||
HostId: host.ID,
|
||||
BlockCount: payload.BlockCount,
|
||||
BlockSize: payload.BlockSize,
|
||||
@ -54,9 +55,10 @@ func (svc service) CarveBegin(ctx context.Context, payload kolide.CarveBeginPayl
|
||||
CarveId: payload.CarveId,
|
||||
RequestId: payload.RequestId,
|
||||
SessionId: sessionId.String(),
|
||||
CreatedAt: now,
|
||||
}
|
||||
|
||||
carve, err = svc.ds.NewCarve(carve)
|
||||
carve, err = svc.carveStore.NewCarve(carve)
|
||||
if err != nil {
|
||||
return nil, osqueryError{message: "internal error: new carve: " + err.Error()}
|
||||
}
|
||||
@ -67,7 +69,7 @@ func (svc service) CarveBegin(ctx context.Context, payload kolide.CarveBeginPayl
|
||||
func (svc service) CarveBlock(ctx context.Context, payload kolide.CarveBlockPayload) error {
|
||||
// Note host did not authenticate via node key. We need to authenticate them
|
||||
// by the session ID and request ID
|
||||
carve, err := svc.ds.CarveBySessionId(payload.SessionId)
|
||||
carve, err := svc.carveStore.CarveBySessionId(payload.SessionId)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "find carve by session_id")
|
||||
}
|
||||
@ -90,7 +92,7 @@ func (svc service) CarveBlock(ctx context.Context, payload kolide.CarveBlockPayl
|
||||
return fmt.Errorf("exceeded declared block size %d: %d", carve.BlockSize, len(payload.Data))
|
||||
}
|
||||
|
||||
if err := svc.ds.NewBlock(carve.ID, payload.BlockId, payload.Data); err != nil {
|
||||
if err := svc.carveStore.NewBlock(carve, payload.BlockId, payload.Data); err != nil {
|
||||
return errors.Wrap(err, "save block data")
|
||||
}
|
||||
|
||||
@ -98,15 +100,15 @@ func (svc service) CarveBlock(ctx context.Context, payload kolide.CarveBlockPayl
|
||||
}
|
||||
|
||||
func (svc service) GetCarve(ctx context.Context, id int64) (*kolide.CarveMetadata, error) {
|
||||
return svc.ds.Carve(id)
|
||||
return svc.carveStore.Carve(id)
|
||||
}
|
||||
|
||||
func (svc service) ListCarves(ctx context.Context, opt kolide.CarveListOptions) ([]*kolide.CarveMetadata, error) {
|
||||
return svc.ds.ListCarves(opt)
|
||||
return svc.carveStore.ListCarves(opt)
|
||||
}
|
||||
|
||||
func (svc service) GetBlock(ctx context.Context, carveId, blockId int64) ([]byte, error) {
|
||||
metadata, err := svc.ds.Carve(carveId)
|
||||
metadata, err := svc.carveStore.Carve(carveId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get carve by name")
|
||||
}
|
||||
@ -119,12 +121,10 @@ func (svc service) GetBlock(ctx context.Context, carveId, blockId int64) ([]byte
|
||||
return nil, fmt.Errorf("block %d not yet available", blockId)
|
||||
}
|
||||
|
||||
data, err := svc.ds.GetBlock(metadata.ID, blockId)
|
||||
data, err := svc.carveStore.GetBlock(metadata, blockId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "get block %d", blockId)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
hostctx "github.com/fleetdm/fleet/server/contexts/host"
|
||||
"github.com/fleetdm/fleet/server/kolide"
|
||||
@ -21,7 +22,7 @@ func TestCarveBegin(t *testing.T) {
|
||||
RequestId: "carve_request",
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
expectedMetadata := kolide.CarveMetadata{
|
||||
ID: 7,
|
||||
HostId: host.ID,
|
||||
@ -40,8 +41,9 @@ func TestCarveBegin(t *testing.T) {
|
||||
metadata, err := svc.CarveBegin(ctx, payload)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, metadata.SessionId)
|
||||
metadata.SessionId = "" // Clear this before comparison
|
||||
metadata.Name = "" // Clear this before comparison
|
||||
metadata.SessionId = "" // Clear this before comparison
|
||||
metadata.Name = "" // Clear this before comparison
|
||||
metadata.CreatedAt = time.Time{} // Clear this before comparison
|
||||
assert.Equal(t, expectedMetadata, *metadata)
|
||||
}
|
||||
|
||||
@ -54,7 +56,7 @@ func TestCarveBeginNewCarveError(t *testing.T) {
|
||||
RequestId: "carve_request",
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.NewCarveFunc = func(metadata *kolide.CarveMetadata) (*kolide.CarveMetadata, error) {
|
||||
return nil, fmt.Errorf("ouch!")
|
||||
}
|
||||
@ -68,7 +70,7 @@ func TestCarveBeginNewCarveError(t *testing.T) {
|
||||
|
||||
func TestCarveBeginEmptyError(t *testing.T) {
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ctx := hostctx.NewContext(context.Background(), kolide.Host{})
|
||||
|
||||
_, err := svc.CarveBegin(ctx, kolide.CarveBeginPayload{})
|
||||
@ -78,7 +80,7 @@ func TestCarveBeginEmptyError(t *testing.T) {
|
||||
|
||||
func TestCarveBeginMissingHostError(t *testing.T) {
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
|
||||
_, err := svc.CarveBegin(context.Background(), kolide.CarveBeginPayload{})
|
||||
require.Error(t, err)
|
||||
@ -94,7 +96,7 @@ func TestCarveBeginBlockSizeMaxError(t *testing.T) {
|
||||
RequestId: "carve_request",
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
|
||||
ctx := hostctx.NewContext(context.Background(), host)
|
||||
|
||||
@ -112,7 +114,7 @@ func TestCarveBeginCarveSizeMaxError(t *testing.T) {
|
||||
RequestId: "carve_request",
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
|
||||
ctx := hostctx.NewContext(context.Background(), host)
|
||||
|
||||
@ -130,7 +132,7 @@ func TestCarveBeginCarveSizeError(t *testing.T) {
|
||||
RequestId: "carve_request",
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ctx := hostctx.NewContext(context.Background(), host)
|
||||
|
||||
// Too big
|
||||
@ -148,7 +150,7 @@ func TestCarveBeginCarveSizeError(t *testing.T) {
|
||||
func TestCarveCarveBlockGetCarveError(t *testing.T) {
|
||||
sessionId := "foobar"
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveBySessionIdFunc = func(sessionId string) (*kolide.CarveMetadata, error) {
|
||||
return nil, fmt.Errorf("ouch!")
|
||||
}
|
||||
@ -175,7 +177,7 @@ func TestCarveCarveBlockRequestIdError(t *testing.T) {
|
||||
SessionId: sessionId,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveBySessionIdFunc = func(sessionId string) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.SessionId, sessionId)
|
||||
return metadata, nil
|
||||
@ -204,7 +206,7 @@ func TestCarveCarveBlockBlockCountExceedError(t *testing.T) {
|
||||
SessionId: sessionId,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveBySessionIdFunc = func(sessionId string) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.SessionId, sessionId)
|
||||
return metadata, nil
|
||||
@ -235,7 +237,7 @@ func TestCarveCarveBlockBlockCountMatchError(t *testing.T) {
|
||||
MaxBlock: 3,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveBySessionIdFunc = func(sessionId string) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.SessionId, sessionId)
|
||||
return metadata, nil
|
||||
@ -266,7 +268,7 @@ func TestCarveCarveBlockBlockSizeError(t *testing.T) {
|
||||
MaxBlock: 3,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveBySessionIdFunc = func(sessionId string) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.SessionId, sessionId)
|
||||
return metadata, nil
|
||||
@ -297,12 +299,12 @@ func TestCarveCarveBlockNewBlockError(t *testing.T) {
|
||||
MaxBlock: 3,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveBySessionIdFunc = func(sessionId string) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.SessionId, sessionId)
|
||||
return metadata, nil
|
||||
}
|
||||
ms.NewBlockFunc = func(carveId int64, blockId int64, data []byte) error {
|
||||
ms.NewBlockFunc = func(carve *kolide.CarveMetadata, blockId int64, data []byte) error {
|
||||
return fmt.Errorf("kaboom!")
|
||||
}
|
||||
|
||||
@ -337,13 +339,13 @@ func TestCarveCarveBlock(t *testing.T) {
|
||||
BlockId: 4,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveBySessionIdFunc = func(sessionId string) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.SessionId, sessionId)
|
||||
return metadata, nil
|
||||
}
|
||||
ms.NewBlockFunc = func(carveId int64, blockId int64, data []byte) error {
|
||||
assert.Equal(t, metadata.ID, carveId)
|
||||
ms.NewBlockFunc = func(carve *kolide.CarveMetadata, blockId int64, data []byte) error {
|
||||
assert.Equal(t, metadata, carve)
|
||||
assert.Equal(t, int64(4), blockId)
|
||||
assert.Equal(t, payload.Data, data)
|
||||
return nil
|
||||
@ -367,13 +369,13 @@ func TestCarveGetBlock(t *testing.T) {
|
||||
MaxBlock: 3,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveFunc = func(carveId int64) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.ID, carveId)
|
||||
return metadata, nil
|
||||
}
|
||||
ms.GetBlockFunc = func(metadataId int64, blockId int64) ([]byte, error) {
|
||||
assert.Equal(t, metadata.ID, metadataId)
|
||||
ms.GetBlockFunc = func(carve *kolide.CarveMetadata, blockId int64) ([]byte, error) {
|
||||
assert.Equal(t, metadata.ID, carve.ID)
|
||||
assert.Equal(t, int64(3), blockId)
|
||||
return []byte("foobar"), nil
|
||||
}
|
||||
@ -396,7 +398,7 @@ func TestCarveGetBlockNotAvailableError(t *testing.T) {
|
||||
MaxBlock: 3,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveFunc = func(carveId int64) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.ID, carveId)
|
||||
return metadata, nil
|
||||
@ -421,13 +423,13 @@ func TestCarveGetBlockGetBlockError(t *testing.T) {
|
||||
MaxBlock: 3,
|
||||
}
|
||||
ms := new(mock.Store)
|
||||
svc := service{ds: ms}
|
||||
svc := service{carveStore: ms}
|
||||
ms.CarveFunc = func(carveId int64) (*kolide.CarveMetadata, error) {
|
||||
assert.Equal(t, metadata.ID, carveId)
|
||||
return metadata, nil
|
||||
}
|
||||
ms.GetBlockFunc = func(metadataId int64, blockId int64) ([]byte, error) {
|
||||
assert.Equal(t, metadata.ID, metadataId)
|
||||
ms.GetBlockFunc = func(carve *kolide.CarveMetadata, blockId int64) ([]byte, error) {
|
||||
assert.Equal(t, metadata.ID, carve.ID)
|
||||
assert.Equal(t, int64(3), blockId)
|
||||
return nil, fmt.Errorf("yow!!")
|
||||
}
|
||||
@ -437,5 +439,3 @@ func TestCarveGetBlockGetBlockError(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "yow!!")
|
||||
}
|
||||
|
||||
|
||||
|
@ -12,12 +12,12 @@ import (
|
||||
|
||||
func newTestService(ds kolide.Datastore, rs kolide.QueryResultStore, lq kolide.LiveQueryStore) (kolide.Service, error) {
|
||||
mailer := &mockMailService{SendEmailFn: func(e kolide.Email) error { return nil }}
|
||||
return NewService(ds, rs, kitlog.NewNopLogger(), config.TestConfig(), mailer, clock.C, nil, lq)
|
||||
return NewService(ds, rs, kitlog.NewNopLogger(), config.TestConfig(), mailer, clock.C, nil, lq, ds)
|
||||
}
|
||||
|
||||
func newTestServiceWithClock(ds kolide.Datastore, rs kolide.QueryResultStore, lq kolide.LiveQueryStore, c clock.Clock) (kolide.Service, error) {
|
||||
mailer := &mockMailService{SendEmailFn: func(e kolide.Email) error { return nil }}
|
||||
return NewService(ds, rs, kitlog.NewNopLogger(), config.TestConfig(), mailer, c, nil, lq)
|
||||
return NewService(ds, rs, kitlog.NewNopLogger(), config.TestConfig(), mailer, c, nil, lq, ds)
|
||||
}
|
||||
|
||||
func createTestAppConfig(t *testing.T, ds kolide.Datastore) *kolide.AppConfig {
|
||||
|
Loading…
Reference in New Issue
Block a user