add multipart upload (#19)

Co-authored-by: ggmaleva <ggmaleva@yandex.ru>
This commit is contained in:
Gregory 2024-01-18 12:31:06 +03:00 committed by GitHub
parent 68039786e1
commit 9c8824bc81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 27439 additions and 13 deletions

View File

@ -30,7 +30,7 @@
<dependency>
<groupId>dev.vality</groupId>
<artifactId>file-storage-proto</artifactId>
<version>1.45-321e0e8</version>
<version>1.46-442b17d</version>
</dependency>
<dependency>
<groupId>dev.vality</groupId>

View File

@ -1,9 +1,6 @@
package dev.vality.file.storage.handler;
import dev.vality.file.storage.FileData;
import dev.vality.file.storage.FileNotFound;
import dev.vality.file.storage.FileStorageSrv;
import dev.vality.file.storage.NewFileResult;
import dev.vality.file.storage.*;
import dev.vality.file.storage.service.StorageService;
import dev.vality.file.storage.service.exception.FileNotFoundException;
import dev.vality.file.storage.util.CheckerUtil;
@ -54,6 +51,35 @@ public class FileStorageHandler implements FileStorageSrv.Iface {
}
}
@Override
public CreateMultipartUploadResult createMultipartUpload(Map<String, Value> metadata) {
log.info("Receive request for create multipart upload with metadata={}", metadata);
CreateMultipartUploadResult result = storageService.createMultipartUpload(metadata);
log.info("Successfully create multipart upload, fileId={}, uploadId={}",
result.getFileDataId(), result.getMultipartUploadId());
return result;
}
@Override
public UploadMultipartResult uploadMultipart(UploadMultipartRequestData request) {
log.debug("Receive request for upload file part, fileId={}, uploadId={}, sequencePart={}",
request.getFileDataId(), request.getMultipartUploadId(), request.getSequencePart());
UploadMultipartResult result = storageService.uploadMultipart(request);
log.debug("Successfully upload file part, fileId={}, uploadId={}, partId={}",
request.getFileDataId(), request.getMultipartUploadId(), result.getPartId());
return result;
}
@Override
public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) {
log.info("Receive request for complete multipart upload, fileId={}, uploadId={}",
request.getFileDataId(), request.getMultipartUploadId());
CompleteMultipartUploadResult result = storageService.completeMultipartUpload(request);
log.info("Successfully complete multipart upload, fileId={}, url={}",
request.getFileDataId(), result.getUploadUrl());
return result;
}
private FileNotFound fileNotFound(FileNotFoundException e) {
log.warn("File not found", e);
return new FileNotFound();

View File

@ -6,8 +6,9 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import dev.vality.file.storage.FileData;
import dev.vality.file.storage.NewFileResult;
import dev.vality.file.storage.*;
import dev.vality.file.storage.CompleteMultipartUploadRequest;
import dev.vality.file.storage.CompleteMultipartUploadResult;
import dev.vality.file.storage.configuration.properties.S3Properties;
import dev.vality.file.storage.service.exception.ExtractMetadataException;
import dev.vality.file.storage.service.exception.FileNotFoundException;
@ -112,6 +113,21 @@ public class S3Service implements StorageService {
return new FileData(fileDto.getFileDataId(), fileName, fileDto.getCreatedAt(), fileDto.getMetadata());
}
@Override
public CreateMultipartUploadResult createMultipartUpload(Map<String, Value> metadata) {
throw new UnsupportedOperationException("Method not supported");
}
@Override
public UploadMultipartResult uploadMultipart(UploadMultipartRequestData requestData) {
throw new UnsupportedOperationException("Method not supported");
}
@Override
public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) {
throw new UnsupportedOperationException("Method not supported");
}
@PreDestroy
public void terminate() {
transferManager.shutdownNow(true);

View File

@ -1,7 +1,7 @@
package dev.vality.file.storage.service;
import dev.vality.file.storage.FileData;
import dev.vality.file.storage.NewFileResult;
import dev.vality.file.storage.*;
import dev.vality.file.storage.CompleteMultipartUploadRequest;
import dev.vality.file.storage.configuration.properties.S3SdkV2Properties;
import dev.vality.file.storage.service.exception.FileNotFoundException;
import dev.vality.file.storage.service.exception.StorageException;
@ -12,6 +12,9 @@ import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
@ -398,6 +401,135 @@ public class S3V2Service implements StorageService {
return contentDisposition.substring(fileNameIndex).replaceAll("\"", "");
}
@Override
public CreateMultipartUploadResult createMultipartUpload(Map<String, Value> metadata) {
var fileId = UUID.randomUUID().toString();
try {
var s3Metadata = new HashMap<String, String>();
s3Metadata.put(FILE_ID, fileId);
s3Metadata.put(CREATED_AT, Instant.now().toString());
metadata.forEach((key, value) -> s3Metadata.put(METADATA + key, DamselUtil.toJsonString(value)));
var createRequest = CreateMultipartUploadRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.metadata(s3Metadata)
.build();
CreateMultipartUploadResponse createResponse = s3SdkV2Client.createMultipartUpload(createRequest);
var response = createResponse.sdkHttpResponse();
log.info("Check create multipart upload object with file metadata result {}:{}",
response.statusCode(), response.statusText());
if (response.isSuccessful()) {
log.info("Multipart upload was created, fileId={}, bucketName={}, uploadId={}",
fileId, s3SdkV2Properties.getBucketName(), createResponse.uploadId());
} else {
throw new StorageException(String.format(
"Failed to create multipart upload, fileId=%s, bucketName=%s",
fileId, s3SdkV2Properties.getBucketName()));
}
return new CreateMultipartUploadResult()
.setFileDataId(fileId)
.setMultipartUploadId(createResponse.uploadId());
} catch (SdkException ex) {
throw new StorageException(
String.format("Failed to create multipart upload, fileId=%s, bucketName=%s",
fileId, s3SdkV2Properties.getBucketName()),
ex);
}
}
@Override
public UploadMultipartResult uploadMultipart(UploadMultipartRequestData requestData) {
String fileId = requestData.getFileDataId();
String multipartUploadId = requestData.getMultipartUploadId();
try {
var uploadPartRequest = UploadPartRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.uploadId(multipartUploadId)
.partNumber(requestData.getSequencePart())
.contentLength((long) requestData.getContentLength())
.build();
RequestBody requestBody = RequestBody.fromBytes(requestData.getContent());
UploadPartResponse uploadPartResponse = s3SdkV2Client.uploadPart(uploadPartRequest, requestBody);
var response = uploadPartResponse.sdkHttpResponse();
log.info("Check file part upload result {}:{}",
response.statusCode(), response.statusText());
if (response.isSuccessful()) {
log.info("File part was uploaded, fileId={}, bucketName={}, uploadId={}, partId={}",
fileId, s3SdkV2Properties.getBucketName(), multipartUploadId, uploadPartResponse.eTag());
} else {
throw new StorageException(String.format(
"Failed to upload file part, fileId=%s, bucketName=%s, uploadId=%s",
fileId, s3SdkV2Properties.getBucketName(), multipartUploadId));
}
return new UploadMultipartResult()
.setPartId(uploadPartResponse.eTag())
.setSequencePart(requestData.getSequencePart());
} catch (SdkException ex) {
throw new StorageException(
String.format("Failed to upload file part, fileId=%s, bucketName=%s, uploadId=%s",
fileId, s3SdkV2Properties.getBucketName(), multipartUploadId),
ex);
}
}
@Override
public CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request) {
String fileId = request.getFileDataId();
String multipartUploadId = request.getMultipartUploadId();
try {
var completeRequest = buildRequest(request, fileId, multipartUploadId);
CompleteMultipartUploadResponse completeResponse = s3SdkV2Client.completeMultipartUpload(completeRequest);
var response = completeResponse.sdkHttpResponse();
log.info("Check complete multipart upload result {}:{}",
response.statusCode(), response.statusText());
if (response.isSuccessful()) {
log.info("Multipart upload was completed, fileId={}, bucketName={}, uploadId={}",
fileId, s3SdkV2Properties.getBucketName(), multipartUploadId);
} else {
throw new StorageException(String.format(
"Failed to complete multipart upload, fileId=%s, bucketName=%s, uploadId=%s",
fileId, s3SdkV2Properties.getBucketName(), multipartUploadId));
}
String objectUrl = s3SdkV2Client.utilities().getUrl(GetUrlRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.build())
.toExternalForm();
log.info("Create url for multipart uploaded file, url={}, fileId={}, bucketName={}, uploadId={}",
objectUrl, fileId, s3SdkV2Properties.getBucketName(), multipartUploadId);
return new CompleteMultipartUploadResult()
.setUploadUrl(objectUrl);
} catch (SdkException ex) {
throw new StorageException(
String.format("Failed to complete multipart upload, fileId=%s, bucketName=%s, uploadId=%s",
fileId, s3SdkV2Properties.getBucketName(), multipartUploadId),
ex);
}
}
private software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest buildRequest(
CompleteMultipartUploadRequest request,
String fileId,
String multipartUploadId) {
List<CompletedPart> completedParts = request.getCompletedParts().stream()
.map(completedMultipart -> CompletedPart.builder()
.eTag(completedMultipart.getPartId())
.partNumber(completedMultipart.getSequencePart())
.build())
.toList();
CompletedMultipartUpload completedUpload = CompletedMultipartUpload.builder()
.parts(completedParts)
.build();
return software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest.builder()
.bucket(s3SdkV2Properties.getBucketName())
.key(fileId)
.uploadId(multipartUploadId)
.multipartUpload(completedUpload)
.build();
}
@RequiredArgsConstructor
@Getter
@ToString

View File

@ -1,7 +1,6 @@
package dev.vality.file.storage.service;
import dev.vality.file.storage.FileData;
import dev.vality.file.storage.NewFileResult;
import dev.vality.file.storage.*;
import dev.vality.msgpack.Value;
import java.net.URL;
@ -16,4 +15,10 @@ public interface StorageService {
FileData getFileData(String fileDataId);
CreateMultipartUploadResult createMultipartUpload(Map<String, Value> metadata);
UploadMultipartResult uploadMultipart(UploadMultipartRequestData requestData);
CompleteMultipartUploadResult completeMultipartUpload(CompleteMultipartUploadRequest request);
}

View File

@ -19,7 +19,9 @@ import org.springframework.test.context.TestPropertySource;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@ -65,7 +67,7 @@ public abstract class FileStorageTest {
NewFileResult fileResult = fileStorageClient.createNewFile(metadata, expirationTime);
Path path = getFileFromResources();
Path path = getFileFromResources("respect");
HttpPut requestPut = new HttpPut(fileResult.getUploadUrl());
requestPut.setHeader(
@ -260,7 +262,7 @@ public abstract class FileStorageTest {
NewFileResult fileResult = fileStorageClient.createNewFile(Collections.emptyMap(), expirationTime);
Path path = getFileFromResources();
Path path = getFileFromResources("respect");
HttpPut requestPut = new HttpPut(fileResult.getUploadUrl());
requestPut.setHeader(
@ -318,6 +320,13 @@ public abstract class FileStorageTest {
return Paths.get(url.toURI());
}
private Path getFileFromResources(String name) throws URISyntaxException {
ClassLoader classLoader = this.getClass().getClassLoader();
URL url = Objects.requireNonNull(classLoader.getResource(name));
return Paths.get(url.toURI());
}
public static HttpURLConnection getHttpURLConnection(URL url, String method, boolean doOutput) throws IOException {
return getHttpURLConnection(url, method, null, doOutput);
}
@ -334,4 +343,53 @@ public abstract class FileStorageTest {
}
return connection;
}
@Test
public void multipartUploadTest() throws Exception {
CreateMultipartUploadResult createResult = fileStorageClient.createMultipartUpload(Collections.emptyMap());
assertNotNull(createResult.getFileDataId());
assertNotNull(createResult.getMultipartUploadId());
List<CompletedMultipart> completedParts = new ArrayList<>();
int partNumber = 1;
ByteBuffer buffer = ByteBuffer.allocate(5 * 1024 * 1024);
Path path = getFileFromResources("test_registry.csv");
try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
long fileSize = file.length();
long position = 0;
while (position < fileSize) {
file.seek(position);
int bytesRead = file.getChannel().read(buffer);
buffer.flip();
var requestData = new UploadMultipartRequestData()
.setFileDataId(createResult.getFileDataId())
.setMultipartUploadId(createResult.getMultipartUploadId())
.setContent(buffer)
.setContentLength(bytesRead)
.setSequencePart(partNumber);
UploadMultipartResult response = fileStorageClient.uploadMultipart(requestData);
completedParts.add(new CompletedMultipart()
.setSequencePart(partNumber)
.setPartId(response.getPartId()));
buffer.clear();
position += bytesRead;
partNumber++;
}
} catch (IOException e) {
e.printStackTrace();
}
var completeRequest = new CompleteMultipartUploadRequest()
.setMultipartUploadId(createResult.getMultipartUploadId())
.setFileDataId(createResult.getFileDataId())
.setCompletedParts(completedParts);
CompleteMultipartUploadResult result = fileStorageClient.completeMultipartUpload(completeRequest);
assertNotNull(result);
assertNotNull(result.getUploadUrl());
}
}

File diff suppressed because it is too large Load Diff