JD-729: implement AWS SDK V2 client (#27)

add extracting filename logic, refactor
add configs for deploy local minio cluster
refactor, update tests
update readme
This commit is contained in:
Anatoly Karlov 2021-10-19 11:26:33 +07:00 committed by GitHub
parent c3bf1f5758
commit c863c6a346
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 995 additions and 138 deletions

View File

@ -1,3 +1,65 @@
# file-storage # file-storage
Прокси, связывающий rbkmoney сервисы и ceph. Имплементирует Amazon S3 клиент, который используется, как клиент для подключения к ceph.
Ceph используется для сохранения файлов. Сервис, обращающийся напрямую к s3 через AWS JAVA SDK. Используется для генерации pre-signed URL that can be used to
access an Amazon S3 resource without requiring the user of the URL to know the account's AWS security credentials.
## Параметры запуска
Для работы с 1 версией `AWS SDK S3`
```yaml
s3:
endpoint: 'http://127.0.0.1:32827'
bucket-name: 'files'
signing-region: 'RU'
client-protocol: 'http'
client-max-error-retry: 10
signer-override: 'S3SignerType'
# signer-override: 'AWSS3V4SignerType'
access-key: 'test'
secret-key: 'test'
s3-sdk-v2:
enabled: 'false'
```
дефолтная версия сигнера — `S3SignerType`, для использования более актуальной версии указывается `AWSS3V4SignerType`
Для работы с 2 версией `AWS SDK S3 V2`
```yaml
s3-sdk-v2:
enabled: 'true'
endpoint: 'http://127.0.0.1:9000'
bucket-name: 'files-v2'
region: 'RU'
access-key: 'minio'
secret-key: 'minio123'
```
Для работы сервиса может использоваться только одна из двух версий `AWS SDK S3`, переключение происходит
параметром `s3-sdk-v2.enabled=false`
## Minio
Если сервисом используется 2 версия `AWS SDK S3 V2`, и в качестве s3 кластера используется `minio`, то для поддержки
версионирования объектов __кластер должен использовать минимум несколько драйверов при старте__ для включения
механизма `Erasure Code`
Для включения механизма `Erasure Code` запуск сервера `minio` с использованием нескольких драйверов может выглядеть
следующим образом
```shell
minio server /data{1...12}
```
Цитата из официальной документации
> **Versioning feature is only available in erasure coded and distributed erasure coded setups.**
Источники
- [versioning-guide](https://docs.min.io/docs/minio-bucket-versioning-guide.html)
- [erasure-code-quickstart-guide](https://docs.min.io/docs/minio-erasure-code-quickstart-guide)
В репозитории в папке [minio-local-cluster](./minio-local-cluster/) содержатся примеры `docker-compose` манифестов
(спизж**ных из официальной репы https://github.com/minio/minio/tree/master/docs/orchestration/docker-compose)
для локального запуска сервера `minio` с включенным механизмом `Erasure Code`

View File

@ -0,0 +1,51 @@
version: '3.7'
# Settings and configurations that are common for all containers
x-minio-common: &minio-common
image: quay.io/minio/minio:RELEASE.2021-10-13T00-23-17Z
command: server --console-address ":9001" http://minio{1...4}/data{1...2}
expose:
- "9000"
- "9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
interval: 30s
timeout: 20s
retries: 3
# starts 4 docker containers running minio server instances.
# using nginx reverse proxy, load balancing, you can access
# it through port 9000.
services:
minio1:
<<: *minio-common
hostname: minio1
minio2:
<<: *minio-common
hostname: minio2
minio3:
<<: *minio-common
hostname: minio3
minio4:
<<: *minio-common
hostname: minio4
nginx:
image: nginx:1.19.2-alpine
hostname: nginx
volumes:
- ./nginx-minio-cluster.conf:/etc/nginx/nginx.conf:ro
ports:
- "9000:9000"
- "9001:9001"
depends_on:
- minio1
- minio2
- minio3
- minio4

View File

@ -0,0 +1,29 @@
version: '3.7'
services:
minio:
image: quay.io/minio/minio:RELEASE.2021-10-13T00-23-17Z
command: server --console-address ":9001" /data{1...12}
hostname: minio
expose:
- "9000"
- "9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: minio123
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
interval: 30s
timeout: 20s
retries: 3
nginx:
image: nginx:1.19.2-alpine
hostname: nginx
volumes:
- ./nginx-minio.conf:/etc/nginx/nginx.conf:ro
ports:
- "9000:9000"
- "9001:9001"
depends_on:
- minio

View File

@ -0,0 +1,104 @@
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 4096;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
keepalive_timeout 65;
# include /etc/nginx/conf.d/*.conf;
upstream minio {
server minio1:9000;
server minio2:9000;
server minio3:9000;
server minio4:9000;
}
upstream console {
ip_hash;
server minio1:9001;
server minio2:9001;
server minio3:9001;
server minio4:9001;
}
server {
listen 9000;
listen [::]:9000;
server_name localhost;
# To allow special characters in headers
ignore_invalid_headers off;
# Allow any size file to be uploaded.
# Set to a value such as 1000m; to restrict file size to a specific value
client_max_body_size 0;
# To disable buffering
proxy_buffering off;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_connect_timeout 300;
# Default is HTTP/1, keepalive is only enabled in HTTP/1.1
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding off;
proxy_pass http://minio;
}
}
server {
listen 9001;
listen [::]:9001;
server_name localhost;
# To allow special characters in headers
ignore_invalid_headers off;
# Allow any size file to be uploaded.
# Set to a value such as 1000m; to restrict file size to a specific value
client_max_body_size 0;
# To disable buffering
proxy_buffering off;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-NginX-Proxy true;
# This is necessary to pass the correct IP to be hashed
real_ip_header X-Real-IP;
proxy_connect_timeout 300;
# To support websocket
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
chunked_transfer_encoding off;
proxy_pass http://console;
}
}
}

View File

@ -0,0 +1,98 @@
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 4096;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
keepalive_timeout 65;
# include /etc/nginx/conf.d/*.conf;
upstream minio {
server minio:9000;
}
upstream console {
ip_hash;
server minio:9001;
}
server {
listen 9000;
listen [::]:9000;
server_name localhost;
# To allow special characters in headers
ignore_invalid_headers off;
# Allow any size file to be uploaded.
# Set to a value such as 1000m; to restrict file size to a specific value
client_max_body_size 0;
# To disable buffering
proxy_buffering off;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_connect_timeout 300;
# Default is HTTP/1, keepalive is only enabled in HTTP/1.1
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding off;
proxy_pass http://minio;
}
}
server {
listen 9001;
listen [::]:9001;
server_name localhost;
# To allow special characters in headers
ignore_invalid_headers off;
# Allow any size file to be uploaded.
# Set to a value such as 1000m; to restrict file size to a specific value
client_max_body_size 0;
# To disable buffering
proxy_buffering off;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-NginX-Proxy true;
# This is necessary to pass the correct IP to be hashed
real_ip_header X-Real-IP;
proxy_connect_timeout 300;
# To support websocket
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
chunked_transfer_encoding off;
proxy_pass http://console;
}
}
}

View File

@ -10,7 +10,7 @@
</parent> </parent>
<artifactId>file-storage</artifactId> <artifactId>file-storage</artifactId>
<version>2.0.0-SNAPSHOT</version> <version>2.1.0-SNAPSHOT</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>file-storage</name> <name>file-storage</name>
@ -93,6 +93,11 @@
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.17.56</version>
</dependency>
<!-- Test libs --> <!-- Test libs -->
<dependency> <dependency>
@ -109,7 +114,7 @@
<dependency> <dependency>
<groupId>com.rbkmoney</groupId> <groupId>com.rbkmoney</groupId>
<artifactId>testcontainers-annotations</artifactId> <artifactId>testcontainers-annotations</artifactId>
<version>1.3.0</version> <version>1.3.1</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -1,70 +0,0 @@
package com.rbkmoney.file.storage.configuration;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.rbkmoney.file.storage.configuration.properties.StorageProperties;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(StorageProperties.class)
public class AmazonS3ClientConfiguration {
private final StorageProperties storageProperties;
@Bean
public AmazonS3 storageClient(
AWSCredentialsProviderChain credentialsProviderChain,
ClientConfiguration clientConfiguration) {
return AmazonS3ClientBuilder.standard()
.withCredentials(credentialsProviderChain)
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
storageProperties.getEndpoint(),
storageProperties.getSigningRegion()
)
)
.withClientConfiguration(clientConfiguration)
.build();
}
@Bean
public AWSCredentialsProviderChain credentialsProviderChain() {
return new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
storageProperties.getAccessKey(),
storageProperties.getSecretKey()
)
)
);
}
@Bean
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration()
.withProtocol(storageProperties.getClientProtocol())
.withSignerOverride("S3SignerType")
.withMaxErrorRetry(storageProperties.getClientMaxErrorRetry());
}
@Bean
public TransferManager transferManager(AmazonS3 s3Client) {
return TransferManagerBuilder.standard()
.withS3Client(s3Client)
.build();
}
}

View File

@ -1,16 +0,0 @@
package com.rbkmoney.file.storage.configuration;
import com.rbkmoney.file.storage.FileStorageSrv;
import com.rbkmoney.file.storage.handler.FileStorageHandler;
import com.rbkmoney.file.storage.service.StorageService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HandlerConfiguration {
@Bean
public FileStorageSrv.Iface fileStorageHandler(StorageService storageService) {
return new FileStorageHandler(storageService);
}
}

View File

@ -0,0 +1,55 @@
package com.rbkmoney.file.storage.configuration;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.rbkmoney.file.storage.configuration.properties.S3Properties;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(S3Properties.class)
public class S3ClientConfig {
private final S3Properties s3Properties;
@Bean
public TransferManager transferManager(AmazonS3 s3Client) {
return TransferManagerBuilder.standard()
.withS3Client(s3Client)
.build();
}
@Bean
public AmazonS3 s3Client() {
return AmazonS3ClientBuilder.standard()
.withCredentials(
new AWSCredentialsProviderChain(
new EnvironmentVariableCredentialsProvider(),
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(
s3Properties.getAccessKey(),
s3Properties.getSecretKey()))))
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
s3Properties.getEndpoint(),
s3Properties.getSigningRegion()))
.withClientConfiguration(
new ClientConfiguration()
.withProtocol(s3Properties.getClientProtocol())
.withSignerOverride(s3Properties.getSignerOverride())
.withMaxErrorRetry(s3Properties.getClientMaxErrorRetry()))
.build();
}
}

View File

@ -0,0 +1,56 @@
package com.rbkmoney.file.storage.configuration;
import com.rbkmoney.file.storage.configuration.properties.S3SdkV2Properties;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import java.net.URI;
@Configuration
@RequiredArgsConstructor
@EnableConfigurationProperties(S3SdkV2Properties.class)
public class S3SdkV2ClientConfig {
private final S3SdkV2Properties s3SdkV2Properties;
@Bean(destroyMethod = "close")
public S3Presigner s3Presigner() {
return S3Presigner.builder()
.region(Region.of(s3SdkV2Properties.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
s3SdkV2Properties.getAccessKey(),
s3SdkV2Properties.getSecretKey())))
.endpointOverride(URI.create(s3SdkV2Properties.getEndpoint()))
.serviceConfiguration(S3Configuration.builder()
.pathStyleAccessEnabled(true)
.checksumValidationEnabled(false)
.build())
.build();
}
@Bean(destroyMethod = "close")
public S3Client s3SdkV2Client() {
return S3Client.builder()
.region(Region.of(s3SdkV2Properties.getRegion()))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(
s3SdkV2Properties.getAccessKey(),
s3SdkV2Properties.getSecretKey())))
.endpointOverride(URI.create(s3SdkV2Properties.getEndpoint()))
.serviceConfiguration(S3Configuration.builder()
.pathStyleAccessEnabled(true)
.build())
.build();
}
}

View File

@ -9,15 +9,16 @@ import org.springframework.stereotype.Component;
@Getter @Getter
@Setter @Setter
@Component @Component
@ConfigurationProperties("storage") @ConfigurationProperties("s3")
public class StorageProperties { public class S3Properties {
private String endpoint; private String endpoint;
private String bucketName;
private String signingRegion; private String signingRegion;
private String accessKey = "";
private String secretKey = "";
private Protocol clientProtocol; private Protocol clientProtocol;
private Integer clientMaxErrorRetry; private Integer clientMaxErrorRetry;
private String bucketName; private String signerOverride;
private String accessKey;
private String secretKey;
} }

View File

@ -0,0 +1,20 @@
package com.rbkmoney.file.storage.configuration.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Getter
@Setter
@Component
@ConfigurationProperties("s3-sdk-v2")
public class S3SdkV2Properties {
private String endpoint;
private String bucketName;
private String region;
private String accessKey;
private String secretKey;
}

View File

@ -14,6 +14,7 @@ import com.rbkmoney.woody.api.flow.error.WUndefinedResultException;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.thrift.TException; import org.apache.thrift.TException;
import org.springframework.stereotype.Service;
import java.net.URL; import java.net.URL;
import java.time.Instant; import java.time.Instant;
@ -21,6 +22,7 @@ import java.util.Map;
import static com.rbkmoney.file.storage.util.CheckerUtil.checkString; import static com.rbkmoney.file.storage.util.CheckerUtil.checkString;
@Service
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class FileStorageHandler implements FileStorageSrv.Iface { public class FileStorageHandler implements FileStorageSrv.Iface {

View File

@ -8,7 +8,7 @@ import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload; import com.amazonaws.services.s3.transfer.Upload;
import com.rbkmoney.file.storage.FileData; import com.rbkmoney.file.storage.FileData;
import com.rbkmoney.file.storage.NewFileResult; import com.rbkmoney.file.storage.NewFileResult;
import com.rbkmoney.file.storage.configuration.properties.StorageProperties; import com.rbkmoney.file.storage.configuration.properties.S3Properties;
import com.rbkmoney.file.storage.msgpack.Value; import com.rbkmoney.file.storage.msgpack.Value;
import com.rbkmoney.file.storage.service.exception.ExtractMetadataException; import com.rbkmoney.file.storage.service.exception.ExtractMetadataException;
import com.rbkmoney.file.storage.service.exception.FileNotFoundException; import com.rbkmoney.file.storage.service.exception.FileNotFoundException;
@ -19,6 +19,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.ToString; import lombok.ToString;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -34,9 +35,10 @@ import java.util.stream.Collectors;
import static java.lang.String.format; import static java.lang.String.format;
@Service @Service
@ConditionalOnProperty(value = "s3-sdk-v2.enabled", havingValue = "false")
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class AmazonS3StorageService implements StorageService { public class S3Service implements StorageService {
private static final String FILE_DATA_ID = "x-rbkmoney-file-data-id"; private static final String FILE_DATA_ID = "x-rbkmoney-file-data-id";
private static final String FILE_ID = "x-rbkmoney-file-id"; private static final String FILE_ID = "x-rbkmoney-file-id";
@ -46,12 +48,12 @@ public class AmazonS3StorageService implements StorageService {
private final TransferManager transferManager; private final TransferManager transferManager;
private final AmazonS3 s3Client; private final AmazonS3 s3Client;
private final StorageProperties storageProperties; private final S3Properties s3Properties;
private String bucketName; private String bucketName;
@PostConstruct @PostConstruct
public void init() { public void init() {
this.bucketName = storageProperties.getBucketName(); this.bucketName = s3Properties.getBucketName();
bucketInit(); bucketInit();
} }

View File

@ -0,0 +1,404 @@
package com.rbkmoney.file.storage.service;
import com.rbkmoney.file.storage.FileData;
import com.rbkmoney.file.storage.NewFileResult;
import com.rbkmoney.file.storage.configuration.properties.S3SdkV2Properties;
import com.rbkmoney.file.storage.msgpack.Value;
import com.rbkmoney.file.storage.service.exception.FileNotFoundException;
import com.rbkmoney.file.storage.service.exception.StorageException;
import com.rbkmoney.file.storage.util.DamselUtil;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;
import software.amazon.awssdk.services.s3.presigner.model.PutObjectPresignRequest;
import javax.annotation.PostConstruct;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@Service
@ConditionalOnProperty(value = "s3-sdk-v2.enabled", havingValue = "true")
@Slf4j
@RequiredArgsConstructor
public class S3V2Service implements StorageService {
private static final String FILE_ID = "x-rbkmoney-file-id";
private static final String CREATED_AT = "x-rbkmoney-created-at";
private static final String METADATA = "x-rbkmoney-metadata-";
private static final String FILENAME_PARAM = "filename=";
private final S3SdkV2Properties s3SdkV2Properties;
private final S3Client s3SdkV2Client;
private final S3Presigner s3Presigner;
@PostConstruct
public void init() {
if (!doesBucketExist()) {
createBucket();
enableBucketVersioning();
}
}
@Override
public NewFileResult createNewFile(Map<String, Value> metadata, Instant expirationTime) {
var fileId = UUID.randomUUID().toString();
uploadFileMetadata(metadata, fileId);
var url = presignUploadUrl(expirationTime, fileId);
return new NewFileResult(fileId, url.toString());
}
@Override
public URL generateDownloadUrl(String fileId, Instant expirationTime) {
var versions = getObjectVersions(fileId);
checkFileExist(fileId, versions);
var fileVersionId = getFileVersionId(fileId, versions);
var presignRequest = GetObjectPresignRequest.builder()
.signatureDuration(Duration.between(Instant.now(), expirationTime))
.getObjectRequest(GetObjectRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.versionId(fileVersionId)
.build())
.build();
var presignedRequest = s3Presigner.presignGetObject(presignRequest);
log.info("Download url was presigned, fileId={}, bucketName={}, isBrowserExecutable={}",
fileId, s3SdkV2Properties.getBucketName(), presignedRequest.isBrowserExecutable());
log.debug("Presigned http request={}", presignedRequest.httpRequest().toString());
return presignedRequest.url();
}
@Override
public FileData getFileData(String fileId) {
var versions = getObjectVersions(fileId);
checkFileExist(fileId, versions);
var fileMetadataVersionId = getFileMetadataVersionId(fileId, versions);
var fileMetadata = getFileMetadata(fileId, fileMetadataVersionId);
var fileVersionId = getFileVersionId(fileId, versions);
var fileName = getFileName(fileId, fileVersionId);
return new FileData(
fileMetadata.getFileId(),
fileName,
fileMetadata.getCreatedAt(),
fileMetadata.getMetadata());
}
// единственный доступный вариант проверки существования бакета на данный момент через catch
// в репе сдк висит таска https://github.com/aws/aws-sdk-java-v2/issues/392#issuecomment-880224831
// в первой версии сдк тоже через catch проверка на существование
// разница только в том, что проверка идет через метод S3Client#getBucketAcl
// во второй версии тоже есть этот метод, не уверен в чем разница с выбранным вариантом,
// но везде советуют его
private boolean doesBucketExist() {
try {
var request = HeadBucketRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.build();
var headBucketResponse = s3SdkV2Client.headBucket(request);
var response = headBucketResponse.sdkHttpResponse();
log.info(String.format("Check exist bucket result %d:%s",
response.statusCode(), response.statusText()));
if (response.isSuccessful()) {
log.info("Bucket is exist, bucketName={}", s3SdkV2Properties.getBucketName());
} else {
throw new StorageException(String.format(
"Failed to check bucket on exist, bucketName=%s", s3SdkV2Properties.getBucketName()));
}
return true;
} catch (NoSuchBucketException ex) {
log.info("Bucket does not exist, bucketName={}", s3SdkV2Properties.getBucketName());
return false;
} catch (S3Exception ex) {
throw new StorageException(
String.format("Failed to check bucket on exist, bucketName=%s", s3SdkV2Properties.getBucketName()),
ex);
}
}
private void createBucket() {
try {
var s3Waiter = s3SdkV2Client.waiter();
var createBucketRequest = CreateBucketRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.build();
s3SdkV2Client.createBucket(createBucketRequest);
var headBucketRequest = HeadBucketRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.build();
// Wait until the bucket is created and print out the response.
s3Waiter.waitUntilBucketExists(headBucketRequest)
.matched()
.response()
.ifPresent(headBucketResponse -> {
var response = headBucketResponse.sdkHttpResponse();
log.info(String.format("Check created bucket result %d:%s",
response.statusCode(), response.statusText()));
if (response.isSuccessful()) {
log.info("Bucket has been created, bucketName={}", s3SdkV2Properties.getBucketName());
} else {
throw new StorageException(String.format(
"Failed to create bucket, bucketName=%s", s3SdkV2Properties.getBucketName()));
}
});
} catch (S3Exception ex) {
throw new StorageException(
String.format("Failed to create bucket, bucketName=%s", s3SdkV2Properties.getBucketName()),
ex);
}
}
private void enableBucketVersioning() {
try {
var request = PutBucketVersioningRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.versioningConfiguration(VersioningConfiguration.builder()
.status(BucketVersioningStatus.ENABLED)
.build())
.build();
var putBucketVersioningResponse = s3SdkV2Client.putBucketVersioning(request);
var response = putBucketVersioningResponse.sdkHttpResponse();
log.info(String.format("Check enable versioning bucket result %d:%s",
response.statusCode(), response.statusText()));
if (response.isSuccessful()) {
log.info("Versioning bucket has been enabled, bucketName={}", s3SdkV2Properties.getBucketName());
} else {
throw new StorageException(String.format(
"Failed to enable bucket versioning, bucketName=%s", s3SdkV2Properties.getBucketName()));
}
} catch (S3Exception ex) {
throw new StorageException(
String.format("Failed to enable bucket versioning, " +
"bucketName=%s", s3SdkV2Properties.getBucketName()),
ex);
}
}
private void uploadFileMetadata(Map<String, Value> metadata, String fileId) {
try {
var s3Metadata = new HashMap<String, String>();
s3Metadata.put(FILE_ID, fileId);
s3Metadata.put(CREATED_AT, Instant.now().toString());
metadata.forEach((key, value) -> s3Metadata.put(METADATA + key, DamselUtil.toJsonString(value)));
var request = PutObjectRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.metadata(s3Metadata)
.build();
var putObjectResponse = s3SdkV2Client.putObject(request, RequestBody.empty());
var response = putObjectResponse.sdkHttpResponse();
log.info(String.format("Check upload object version with file metadata result %d:%s",
response.statusCode(), response.statusText()));
if (response.isSuccessful()) {
log.info("Object version with file metadata was uploaded, fileId={}, bucketName={}",
fileId, s3SdkV2Properties.getBucketName());
} else {
throw new StorageException(String.format(
"Failed to upload object version with file metadata, fileId=%s, bucketName=%s",
fileId, s3SdkV2Properties.getBucketName()));
}
} catch (S3Exception ex) {
throw new StorageException(
String.format("Failed to upload object version with file metadata, fileId=%s, bucketName=%s",
fileId, s3SdkV2Properties.getBucketName()),
ex);
}
}
private URL presignUploadUrl(Instant expirationTime, String fileId) {
var presignRequest = PutObjectPresignRequest.builder()
.signatureDuration(Duration.between(Instant.now(), expirationTime))
.putObjectRequest(PutObjectRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.build())
.build();
var presignedRequest = s3Presigner.presignPutObject(presignRequest);
log.info("Upload url was presigned, fileId={}, bucketName={}", fileId, s3SdkV2Properties.getBucketName());
log.debug("Presigned http request={}", presignedRequest.httpRequest().toString());
return presignedRequest.url();
}
private List<ObjectVersion> getObjectVersions(String fileId) {
try {
var request = ListObjectVersionsRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.prefix(fileId)
.build();
var listObjectVersionsResponse = s3SdkV2Client.listObjectVersions(request);
var response = listObjectVersionsResponse.sdkHttpResponse();
log.info(String.format("Check list object versions result %d:%s",
response.statusCode(), response.statusText()));
if (response.isSuccessful()) {
log.info("List object versions has been got, fileId={}, bucketName={}",
fileId, s3SdkV2Properties.getBucketName());
return listObjectVersionsResponse.versions();
} else {
throw new StorageException(String.format(
"Failed to get list object versions, fileId=%s, bucketName=%s",
fileId, s3SdkV2Properties.getBucketName()));
}
} catch (S3Exception ex) {
throw new StorageException(
String.format(
"Failed to get list object versions, fileId=%s, bucketName=%s",
fileId, s3SdkV2Properties.getBucketName()),
ex);
}
}
private void checkFileExist(String fileId, List<ObjectVersion> versions) {
if (!doesFileExist(versions)) {
throw new FileNotFoundException(String.format(
"Failed to check object version with file on exist, fileId=%s, bucketName=%s",
fileId, s3SdkV2Properties.getBucketName()));
}
}
private Boolean doesFileExist(List<ObjectVersion> versions) {
// должно быть 2 ревизии это метаданные, 2ая это сам загруженный файл
return versions.size() == 2;
// && versions.stream()
// .filter(v -> v.size() > 0)
// .map(v -> true)
// .findFirst()
// .orElse(false);
}
private String getFileMetadataVersionId(String fileId, List<ObjectVersion> versions) {
return versions.stream()
.filter(Predicate.not(ObjectVersion::isLatest))
.findFirst()
.orElseThrow(() -> new StorageException(String.format(
"Object version with file metadata not found, fileId=%s, bucketId=%s",
fileId, s3SdkV2Properties.getBucketName())))
.versionId();
}
private FileMetadata getFileMetadata(String fileId, String fileMetadataVersionId) {
try {
var request = GetObjectRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.versionId(fileMetadataVersionId)
.build();
return s3SdkV2Client.getObject(
request,
(getObjectResponse, inputStream) -> {
var response = getObjectResponse.sdkHttpResponse();
log.info(String.format("Check get object result %d:%s",
response.statusCode(), response.statusText()));
if (response.isSuccessful()) {
log.info("Object version with file metadata has been got, " +
"fileId={}, fileMetadataVersionId={}, bucketName={}",
fileId, fileMetadataVersionId, s3SdkV2Properties.getBucketName());
if (getObjectResponse.hasMetadata() && !getObjectResponse.metadata().isEmpty()) {
var s3Metadata = getObjectResponse.metadata();
var metadata = s3Metadata.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(METADATA)
&& entry.getValue() != null)
.collect(Collectors.toMap(
o -> o.getKey().substring(METADATA.length()),
o -> DamselUtil.fromJson(o.getValue(), Value.class)));
return new FileMetadata(fileId, s3Metadata.get(CREATED_AT), metadata);
} else {
throw new StorageException(String.format(
"Object version with file metadata is empty, " +
"fileId=%s, fileMetadataVersionId=%s, bucketId=%s",
fileId, fileMetadataVersionId, s3SdkV2Properties.getBucketName()));
}
} else {
throw new StorageException(String.format(
"Failed to get object version with file metadata," +
" fileId=%s, fileMetadataVersionId=%s, bucketName=%s",
fileId, fileMetadataVersionId, s3SdkV2Properties.getBucketName()));
}
});
} catch (S3Exception ex) {
throw new StorageException(
String.format(
"Failed to get object version with file metadata, " +
"fileId=%s, fileMetadataVersionId=%s, bucketName=%s",
fileId, fileMetadataVersionId, s3SdkV2Properties.getBucketName()),
ex);
}
}
private String getFileVersionId(String fileId, List<ObjectVersion> versions) {
return versions.stream()
.filter(ObjectVersion::isLatest)
.findFirst()
.orElseThrow(() -> new StorageException(String.format(
"Object version with file not found, fileId=%s, bucketId=%s",
fileId, s3SdkV2Properties.getBucketName())))
.versionId();
}
private String getFileName(String fileId, String fileVersionId) {
try {
var request = GetObjectRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.versionId(fileVersionId)
.build();
return s3SdkV2Client.getObject(
request,
(getObjectResponse, inputStream) -> {
var response = getObjectResponse.sdkHttpResponse();
log.info(String.format("Check get object result %d:%s",
response.statusCode(), response.statusText()));
if (response.isSuccessful()) {
log.info("Object version with file has been got, " +
"fileId={}, fileVersionId={}, bucketName={}",
fileId, fileVersionId, s3SdkV2Properties.getBucketName());
return Optional.ofNullable(getObjectResponse.contentDisposition())
.map(this::extractFileName)
.or(() -> response.firstMatchingHeader("Content-Disposition")
.map(this::extractFileName))
.orElseThrow(() -> new StorageException(String.format(
"Header 'Content-Disposition' in object version with file is empty, " +
"fileId=%s, fileVersionId=%s, bucketId=%s",
fileId, fileVersionId, s3SdkV2Properties.getBucketName())));
} else {
throw new StorageException(String.format(
"Failed to get object version with file, " +
"fileId=%s, fileVersionId=%s, bucketName=%s",
fileId, fileVersionId, s3SdkV2Properties.getBucketName()));
}
});
} catch (S3Exception ex) {
throw new StorageException(
String.format(
"Failed to get object version with file, " +
"fileId=%s, fileVersionId=%s, bucketName=%s",
fileId, fileVersionId, s3SdkV2Properties.getBucketName()),
ex);
}
}
private String extractFileName(String contentDisposition) {
int fileNameIndex = contentDisposition.lastIndexOf(FILENAME_PARAM) + FILENAME_PARAM.length();
return contentDisposition.substring(fileNameIndex).replaceAll("\"", "");
}
@RequiredArgsConstructor
@Getter
@ToString
private static class FileMetadata {
private final String fileId;
private final String createdAt;
private final Map<String, Value> metadata;
}
}

View File

@ -1,4 +1,4 @@
package com.rbkmoney.file.storage.resource; package com.rbkmoney.file.storage.servlet;
import com.rbkmoney.file.storage.FileStorageSrv; import com.rbkmoney.file.storage.FileStorageSrv;
import com.rbkmoney.woody.thrift.impl.http.THServiceBuilder; import com.rbkmoney.woody.thrift.impl.http.THServiceBuilder;
@ -12,15 +12,15 @@ import java.io.IOException;
@RequiredArgsConstructor @RequiredArgsConstructor
public class FileStorageServlet extends GenericServlet { public class FileStorageServlet extends GenericServlet {
private Servlet thriftServlet; private final FileStorageSrv.Iface fileStorageHandler;
private final FileStorageSrv.Iface requestHandler; private Servlet thriftServlet;
@Override @Override
public void init(ServletConfig config) throws ServletException { public void init(ServletConfig config) throws ServletException {
super.init(config); super.init(config);
thriftServlet = new THServiceBuilder() thriftServlet = new THServiceBuilder()
.build(FileStorageSrv.Iface.class, requestHandler); .build(FileStorageSrv.Iface.class, fileStorageHandler);
} }
@Override @Override

View File

@ -36,19 +36,29 @@ spring:
ansi: ansi:
enabled: always enabled: always
storage: s3:
endpoint: localhost:32827 endpoint: 'http://127.0.0.1:32827'
bucketName: files bucket-name: 'files'
signingRegion: RU signing-region: 'RU'
clientProtocol: HTTP client-protocol: 'http'
clientMaxErrorRetry: 10 client-max-error-retry: 10
signer-override: 'S3SignerType'
# signer-override: 'AWSS3V4SignerType'
access-key: 'test'
secret-key: 'test'
s3-sdk-v2:
enabled: 'false'
endpoint: 'http://127.0.0.1:9000'
bucket-name: 'files-v2'
region: 'RU'
access-key: 'test'
secret-key: 'test'
testcontainers: testcontainers:
ceph: ceph:
tag: 'v3.0.5-stable-3.0-luminous-centos-7'
accessKey: 'test' accessKey: 'test'
secretKey: 'test' secretKey: 'test'
minio: minio:
tag: 'latest' user: 'minio'
user: 'user' password: 'minio123'
password: 'password'

View File

@ -8,7 +8,6 @@ import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity; import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.thrift.TException; import org.apache.thrift.TException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -26,10 +25,7 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -41,11 +37,11 @@ import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource("classpath:application.yml") @TestPropertySource("classpath:application.yml")
@DirtiesContext @DirtiesContext
public abstract class AbstractFileStorageTest { public abstract class FileStorageTest {
private static final int TIMEOUT = 555000; private static final int TIMEOUT = 555000;
private static final String FILE_DATA = "test"; private static final String FILE_DATA = "test";
private static final String FILE_NAME = "rainbow-champion"; private static final String FILE_NAME = "asd123.asd";
protected FileStorageSrv.Iface fileStorageClient; protected FileStorageSrv.Iface fileStorageClient;
@ -63,9 +59,11 @@ public abstract class AbstractFileStorageTest {
@Test @Test
public void fileUploadWithHttpClientBuilderTest() throws IOException, URISyntaxException, TException { public void fileUploadWithHttpClientBuilderTest() throws IOException, URISyntaxException, TException {
String expirationTime = generateCurrentTimePlusDay().toString(); String expirationTime = generateCurrentTimePlusDay().toString();
HttpClient httpClient = HttpClientBuilder.create().build(); Map<String, com.rbkmoney.file.storage.msgpack.Value> metadata = new HashMap<>();
metadata.put("author", com.rbkmoney.file.storage.msgpack.Value.str("Mary Doe"));
metadata.put("version", com.rbkmoney.file.storage.msgpack.Value.str("1.0.0.0"));
NewFileResult fileResult = fileStorageClient.createNewFile(Collections.emptyMap(), expirationTime); NewFileResult fileResult = fileStorageClient.createNewFile(metadata, expirationTime);
Path path = getFileFromResources(); Path path = getFileFromResources();
@ -75,8 +73,9 @@ public abstract class AbstractFileStorageTest {
"attachment;filename=" + URLEncoder.encode(FILE_NAME, StandardCharsets.UTF_8.name())); "attachment;filename=" + URLEncoder.encode(FILE_NAME, StandardCharsets.UTF_8.name()));
requestPut.setEntity(new FileEntity(path.toFile())); requestPut.setEntity(new FileEntity(path.toFile()));
HttpClient httpClient = HttpClientBuilder.create().build();
HttpResponse response = httpClient.execute(requestPut); HttpResponse response = httpClient.execute(requestPut);
Assertions.assertEquals(response.getStatusLine().getStatusCode(), org.apache.http.HttpStatus.SC_OK); assertEquals(response.getStatusLine().getStatusCode(), org.apache.http.HttpStatus.SC_OK);
// генерация url с доступом только для загрузки // генерация url с доступом только для загрузки
String downloadUrl = fileStorageClient.generateDownloadUrl(fileResult.getFileDataId(), expirationTime); String downloadUrl = fileStorageClient.generateDownloadUrl(fileResult.getFileDataId(), expirationTime);
@ -95,7 +94,10 @@ public abstract class AbstractFileStorageTest {
try { try {
// создание нового файла // создание нового файла
String expirationTime = generateCurrentTimePlusDay().toString(); String expirationTime = generateCurrentTimePlusDay().toString();
NewFileResult fileResult = fileStorageClient.createNewFile(Collections.emptyMap(), expirationTime); Map<String, com.rbkmoney.file.storage.msgpack.Value> metadata = new HashMap<>();
metadata.put("author", com.rbkmoney.file.storage.msgpack.Value.str("Mary Doe"));
metadata.put("version", com.rbkmoney.file.storage.msgpack.Value.str("1.0.0.0"));
NewFileResult fileResult = fileStorageClient.createNewFile(metadata, expirationTime);
uploadTestData(fileResult, FILE_NAME, FILE_DATA); uploadTestData(fileResult, FILE_NAME, FILE_DATA);
// генерация url с доступом только для загрузки // генерация url с доступом только для загрузки

View File

@ -1,7 +0,0 @@
package com.rbkmoney.file.storage;
import com.rbkmoney.testcontainers.annotations.ceph.CephTestcontainer;
@CephTestcontainer
public class WithCeph extends AbstractFileStorageTest {
}

View File

@ -1,7 +0,0 @@
package com.rbkmoney.file.storage;
import com.rbkmoney.testcontainers.annotations.minio.MinioTestcontainer;
@MinioTestcontainer
public class WithMinio extends AbstractFileStorageTest {
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.file.storage.awssdks3v2;
import com.rbkmoney.file.storage.FileStorageTest;
import com.rbkmoney.testcontainers.annotations.ceph.CephTestcontainerSingleton;
@CephTestcontainerSingleton(
properties = {"s3-sdk-v2.enabled=true", "s3-sdk-v2.region=us-east-1"},
bucketName = "awssdks3v2")
public class WithCeph extends FileStorageTest {
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.file.storage.awssdks3v2;
import com.rbkmoney.file.storage.FileStorageTest;
import com.rbkmoney.testcontainers.annotations.minio.MinioTestcontainerSingleton;
@MinioTestcontainerSingleton(
properties = "s3-sdk-v2.enabled=true",
bucketName = "awssdks3v2")
public class WithMinio extends FileStorageTest {
}

View File

@ -0,0 +1,8 @@
package com.rbkmoney.file.storage.s3signer;
import com.rbkmoney.file.storage.FileStorageTest;
import com.rbkmoney.testcontainers.annotations.ceph.CephTestcontainerSingleton;
@CephTestcontainerSingleton(bucketName = "s3signer")
public class WithCeph extends FileStorageTest {
}

View File

@ -0,0 +1,8 @@
package com.rbkmoney.file.storage.s3signer;
import com.rbkmoney.file.storage.FileStorageTest;
import com.rbkmoney.testcontainers.annotations.minio.MinioTestcontainerSingleton;
@MinioTestcontainerSingleton(bucketName = "s3signer")
public class WithMinio extends FileStorageTest {
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.file.storage.s3v4signer;
import com.rbkmoney.file.storage.FileStorageTest;
import com.rbkmoney.testcontainers.annotations.ceph.CephTestcontainerSingleton;
@CephTestcontainerSingleton(
properties = "s3.signer-override=AWSS3V4SignerType",
bucketName = "s3v4signer")
public class WithCeph extends FileStorageTest {
}

View File

@ -0,0 +1,10 @@
package com.rbkmoney.file.storage.s3v4signer;
import com.rbkmoney.file.storage.FileStorageTest;
import com.rbkmoney.testcontainers.annotations.minio.MinioTestcontainerSingleton;
@MinioTestcontainerSingleton(
properties = "s3.signer-override=AWSS3V4SignerType",
bucketName = "s3v4signer")
public class WithMinio extends FileStorageTest {
}