Skip to content

Client cut

Client cut allows for splitting the uploaded object into chunks on the client side, with the ability to pause and resume uploads, as well as control the size of the chunks.

There is currently no way to resume downloading in this implementation.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func CreateObjectClientCut(ctx context.Context, p pool.Pool, cnrID cid.ID) error {
    attributes := make([]object.Attribute, 0)
    filename := object.NewAttribute()
    filename.SetKey(object.AttributeFileName)
    filename.SetValue("name.txt")

    attributes = append(attributes, *filename)

    obj := object.New()
    obj.SetOwnerID(owner)
    obj.SetContainerID(cnrID)
    obj.SetAttributes(attributes...)

    var prmPut pool.PrmObjectPut
    prmPut.SetHeader(*obj)
    prmPut.SetPayload(bytes.NewReader([]byte("testing")))
    prmPut.SetClientCut(true)
    objID, err := p.PutObject(ctx, prmPut)
}

Arguments:

  1. Execution context
  2. Method arguments, which includes:
    • Container ID
    • Payload
    • bufferMaxSize defines the size of the uploaded chunk. It is compared with the maxObjectSize parameter in the network settings, and the smaller value is selected
    • objectHeader the object header
    • isPayloadPositioned a parameter required for resuming uploads, indicating whether the data stream has been positioned correctly for resumption or not
    • listener
    • Additional headers (optional)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static ObjectId resumableUpload(FrostFSClient client, ContainerId containerId,
                                        String filePath, int bufferMaxSize, CallContext callContext,
                                        boolean payloadPositionedByClient) {
    var attribute = new ObjectAttribute("Filename", Paths.get(filePath).getFileName().toString());
    var header = new ObjectHeader(containerId, ObjectType.REGULAR, attribute);

    var prmObjectClientCutPut = PrmObjectClientCutPut.builder()
            .objectHeader(header)
            .bufferMaxSize(bufferMaxSize)
            .isPayloadPositioned(payloadPositionedByClient)
            .listener((uploadProgressInfo, objectPartInfo) ->
                    System.out.printf("Uploaded: %d/%d%n",
                            uploadProgressInfo.getParts().size(),
                            uploadProgressInfo.getTotalParts()))
            .build();
    try (UnstableFileInputStream file = new UnstableFileInputStream(filePath, FAIL_AFTER_READ_BYTES)) {
        prmObjectClientCutPut.setPayload(file);
        return client.putClientCutObject(prmObjectClientCutPut, callContext);
    } catch (IOException | FrostFSException e) {
        try (FileInputStream file = new FileInputStream(filePath)) {
            if (payloadPositionedByClient) {
                var progressInfo = prmObjectClientCutPut.getUploadProgressInfo();
                var lastPart = progressInfo.getParts().get(progressInfo.getParts().size() - 1);
                long offset = lastPart.getOffset() + lastPart.getLength();
                long skippedBytes = file.skip(offset);
                if (skippedBytes != offset) {
                    throw new RuntimeException("Skipped less bytes than expected");
                }
            }
            prmObjectClientCutPut.setPayload(file);
            return client.putClientCutObject(prmObjectClientCutPut, callContext);
        } catch (IOException ex) {
            throw new RuntimeException("Failed to upload object after retry", ex);
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public static async Task<FrostFsObjectId> CreateObjectClientCut(
    IFrostFSClient client, 
    FrostFsContainerId containerId, 
    byte[] bytes)
{
    var header = new FrostFsObjectHeader(
                    containerId: containerId,
                    type: FrostFsObjectType.Regular,
                    [new FrostFsAttributePair("fileName", "test")]);

    var param = new PrmObjectClientCutPut(header, payload: new MemoryStream(bytes));

    return await client.PutClientCutObjectAsync(param, default).ConfigureAwait(true);
}

public static async Task<FrostFsObjectId?> CreateObjectClientCut(
    IFrostFSClient client, 
    FrostFsContainerId containerId, 
    byte[] bytes)
{
    var header = new FrostFsObjectHeader(
        containerId: containerId,
        type: FrostFsObjectType.Regular,
        [new FrostFsAttributePair("fileName", "test")]);

    var splitId = Guid.NewGuid();
    var progress = new UploadProgressInfo(splitId);

    var param = new PrmObjectClientCutPut(
        header,
        payload: new MemoryStream(bytes),
        progress: progress);

    FrostFsObjectId? objectId = null;
    bool interrupted = false;

    do
    {
        try
        {
            interrupted = false;
            objectId = await GetClient().PutClientCutObjectAsync(param, default);
        }
        catch (FrostFsException ex)
        {
            interrupted = true;
        }
    }
    while (interrupted);

    return objectId;
}

Result:

  • Object ID