# read-chunked

The `read-chunked` operation splits an InputStream into manageable chunks, enabling memory-efficient processing of large binary files.

## XML Signature

```xml
<chunking:read-chunked
    config-ref="Chunking_Config"
    chunkSize="1048576"
    content="#[payload]"/>
```

## Parameters

| Parameter | Type | Required | Default | Description |
|-----------|------|----------|---------|-------------|
| `config-ref` | String | Yes | - | Reference to the chunking configuration |
| `chunkSize` | Integer | No | `65536` | Size of each chunk in bytes (64 KB) |
| `content` | InputStream | No | `#[payload]` | Binary content to chunk (defaults to the message payload) |

### `chunkSize` Guidelines

Choose chunk size based on your use case:

| Use Case | Recommended Size | Reason |
|----------|------------------|--------|
| S3 Multipart Upload | 5 MB - 10 MB | Meets the S3 minimum part size (5 MB) |
| Network Transfer | 1 MB - 5 MB | Balances memory and throughput |
| Checksum Calculation | 64 KB - 1 MB | Responsive progress tracking |
| Memory-Constrained | 10 KB - 64 KB | Minimizes memory footprint |

> **WARNING**
>
> Chunk size directly determines memory usage. A 10 MB chunk size means ~10 MB of RAM per concurrent flow execution.

## Output

Returns a stream of chunk objects. The stream can be consumed with `<foreach>`, filtered or transformed with DataWeave, and passed on to other components.

### Chunk Properties

Each chunk object in the `<foreach>` loop provides:

| Property | Type | Description |
|----------|------|-------------|
| `payload.data` | `byte[]` | Raw binary chunk data |
| `payload.index` | `int` | 0-based chunk number |
| `payload.offset` | `long` | Starting byte position in the source stream |
| `payload.length` | `int` | Number of bytes in this chunk |
| `payload.isFirst` | `boolean` | `true` for the first chunk |
| `payload.isLast` | `boolean` | `true` for the final chunk |

## Examples

### Basic Usage

Stream a file into 1 MB chunks and log each one:

```xml
<flow name="basic-chunking">
    <!-- Read file as InputStream -->
    <file:read path="/data/large-file.bin"/>

    <!-- Chunk into 1 MB pieces -->
    <chunking:read-chunked
        config-ref="Chunking_Config"
        chunkSize="1048576"/>

    <!-- Process each chunk -->
    <foreach>
        <logger level="INFO"
                message="Processing chunk #[payload.index]: #[payload.length] bytes at offset #[payload.offset]"/>
    </foreach>
</flow>
```

### S3 Multipart Upload Pattern

Upload a large file to S3 using multipart upload (5 MB chunks):

```xml
<flow name="s3-multipart-upload">
    <file:read path="/data/video.mp4"/>

    <!-- Initialize multipart upload -->
    <s3:create-multipart-upload
        bucket="my-bucket"
        key="uploads/video.mp4"/>
    <set-variable
        variableName="uploadId"
        value="#[payload.uploadId]"/>

    <!-- Chunk file into 5 MB parts (S3 minimum) -->
    <chunking:read-chunked
        config-ref="Chunking_Config"
        chunkSize="5242880"/>

    <!-- Upload each part (S3 part numbers are 1-based) -->
    <foreach>
        <s3:upload-part
            bucket="my-bucket"
            key="uploads/video.mp4"
            uploadId="#[vars.uploadId]"
            partNumber="#[payload.index + 1]"
            content="#[payload.data]"/>

        <logger message="Uploaded part #[payload.index + 1]"/>
    </foreach>

    <!-- Complete multipart upload -->
    <s3:complete-multipart-upload
        bucket="my-bucket"
        key="uploads/video.mp4"
        uploadId="#[vars.uploadId]"/>
</flow>
```

### Progress Tracking

Track chunking progress with percentage calculation:

```xml
<flow name="progress-tracking">
    <file:read path="/data/archive.zip"/>

    <!-- Capture the total file size from the read attributes -->
    <set-variable
        variableName="totalSize"
        value="#[attributes.size]"/>

    <chunking:read-chunked
        config-ref="Chunking_Config"
        chunkSize="1048576"/>

    <set-variable
        variableName="bytesProcessed"
        value="#[0]"/>

    <foreach>
        <!-- Update progress -->
        <set-variable
            variableName="bytesProcessed"
            value="#[vars.bytesProcessed + payload.length]"/>

        <set-variable
            variableName="percentComplete"
            value="#[(vars.bytesProcessed / vars.totalSize) * 100]"/>

        <logger level="INFO"
                message="Progress: #[vars.percentComplete as String {format: '0.00'}]% - Chunk #[payload.index]"/>

        <!-- Process chunk data -->
        <flow-ref name="process-chunk"/>
    </foreach>
</flow>
```

### Checksum Calculation

Calculate SHA-256 hash of a large file in chunks:

```xml
<flow name="calculate-checksum">
    <file:read path="/data/firmware.bin"/>

    <!-- Create a MessageDigest instance -->
    <set-variable
        variableName="digest"
        value="#[java!java::security::MessageDigest::getInstance('SHA-256')]"/>

    <chunking:read-chunked
        config-ref="Chunking_Config"
        chunkSize="524288"/>

    <foreach>
        <!-- Update the digest with this chunk's bytes (update is an instance method) -->
        <java:invoke
            instance="#[vars.digest]"
            class="java.security.MessageDigest"
            method="update(byte[])">
            <java:args>#[{arg0: payload.data}]</java:args>
        </java:invoke>

        <logger message="Hashed chunk #[payload.index]"/>
    </foreach>

    <!-- Compute the final hash and render it as a hex string -->
    <java:invoke
        instance="#[vars.digest]"
        class="java.security.MessageDigest"
        method="digest()"
        target="hash"/>
    <set-payload value="#[dw::core::Binaries::toHex(vars.hash)]"/>
</flow>
```

## Error Types

The connector throws these error types:

| Error Type | Description | Cause |
|------------|-------------|-------|
| `CHUNKING:INVALID_CHUNK_SIZE` | Invalid chunk size parameter | `chunkSize` ≤ 0 |
| `CHUNKING:READ_ERROR` | Error reading from the input stream | I/O failure, corrupt stream |
| `CHUNKING:CONNECTIVITY` | Connection/resource access failure | File not found, permission denied |

### Error Handling Example

```xml
<flow name="chunking-with-error-handling">
    <try>
        <file:read path="/data/file.dat"/>

        <chunking:read-chunked
            config-ref="Chunking_Config"
            chunkSize="1048576"/>

        <foreach>
            <flow-ref name="process-chunk"/>
        </foreach>

        <error-handler>
            <on-error-continue type="CHUNKING:INVALID_CHUNK_SIZE">
                <logger level="ERROR"
                        message="Invalid chunk size configuration"/>
            </on-error-continue>

            <on-error-continue type="CHUNKING:READ_ERROR">
                <logger level="ERROR"
                        message="Failed to read chunk: #[error.description]"/>
            </on-error-continue>

            <on-error-continue type="CHUNKING:CONNECTIVITY">
                <logger level="ERROR"
                        message="Cannot access file: #[error.description]"/>
            </on-error-continue>
        </error-handler>
    </try>
</flow>
```

## Memory Behavior

### Constant Memory Guarantee

The connector maintains `O(chunkSize)` memory usage by wrapping the source in a `PushbackInputStream` and applying a 1-byte probe technique:

1. **Read**: Allocates a buffer of exactly `chunkSize` bytes
2. **Probe**: Reads 1 additional byte to detect EOF
3. **Pushback**: If the byte exists, pushes it back for the next chunk
4. **Discard**: After `<foreach>` processes the chunk, the buffer is garbage collected

This ensures memory usage remains constant regardless of file size.
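
The following Java sketch illustrates the probe-and-pushback loop. It is a minimal illustration of the technique as described above, not the connector's actual source; the `ChunkReader` and `Chunk` names are invented for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

/** Illustrative reader: returns one chunk per call, or null once the source is exhausted. */
class ChunkReader {
    private final PushbackInputStream in;
    private final int chunkSize;
    private long offset = 0;
    private int index = 0;

    ChunkReader(InputStream source, int chunkSize) {
        this.in = new PushbackInputStream(source, 1); // 1-byte pushback buffer for the probe
        this.chunkSize = chunkSize;
    }

    Chunk next() throws IOException {
        byte[] buffer = new byte[chunkSize];               // 1. Read: allocate exactly chunkSize bytes
        int read = 0;
        while (read < chunkSize) {
            int n = in.read(buffer, read, chunkSize - read);
            if (n < 0) break;                              // source ended mid-chunk
            read += n;
        }
        if (read == 0) return null;                        // nothing left: iteration is over

        int probe = in.read();                             // 2. Probe: one extra byte to detect EOF
        boolean isLast = probe < 0;
        if (!isLast) in.unread(probe);                     // 3. Pushback: the byte belongs to the next chunk

        byte[] data = (read == chunkSize) ? buffer : Arrays.copyOf(buffer, read);
        Chunk chunk = new Chunk(data, index, offset, read, index == 0, isLast);
        offset += read;
        index++;
        return chunk;                                      // 4. Discard: caller drops its reference; GC reclaims the buffer
    }
}

/** Mirrors the documented chunk properties. */
record Chunk(byte[] data, int index, long offset, int length, boolean isFirst, boolean isLast) {}
```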

### Memory Examples

| File Size | Chunk Size | Memory Used | Chunks Created |
|-----------|------------|-------------|----------------|
| 10 MB | 1 MB | ~1 MB | 10 |
| 1 GB | 5 MB | ~5 MB | 200 |
| 10 GB | 5 MB | ~5 MB | 2,000 |
| 665 MB | 10 KB | ~10 KB | 66,500 |

The connector was validated by processing a 665 MB file in 66,500 chunks while holding only ~10 KB of chunk-buffer memory at a time.

### Lazy Evaluation

Chunks are created on demand during `<foreach>` iteration:

```xml
<chunking:read-chunked chunkSize="1048576"/> <!-- No memory allocated yet -->

<foreach> <!-- Chunks created one at a time as the loop iterates -->
    <logger message="Chunk #[payload.index]"/>
</foreach>
```

What happens:

1. The loop requests the first chunk → the connector reads 1 MB
2. The loop processes the chunk → the chunk becomes eligible for GC
3. The loop requests the second chunk → the connector reads the next 1 MB
4. The previous chunk is garbage collected
5. The process repeats for all chunks
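
In plain Java terms, iteration is what drives the reads. A usage sketch with the illustrative `ChunkReader` from the previous section:

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

class LazyChunkingDemo {
    public static void main(String[] args) throws Exception {
        try (InputStream source = Files.newInputStream(Path.of("/data/large-file.bin"))) {
            ChunkReader reader = new ChunkReader(source, 1_048_576); // 1 MB chunks
            for (Chunk chunk = reader.next(); chunk != null; chunk = reader.next()) {
                // Only the current chunk is reachable here; the previous
                // chunk's 1 MB buffer is already eligible for garbage collection.
                System.out.printf("chunk %d: %d bytes at offset %d%n",
                        chunk.index(), chunk.length(), chunk.offset());
            }
        }
    }
}
```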

## Anti-Patterns

### Don't Collect All Chunks

**WRONG**: This defeats the purpose of streaming:

```xml
<foreach>
    <!-- ❌ Never do this! Accumulates the entire file in memory -->
    <set-variable
        variableName="allChunks"
        value="#[(vars.allChunks default []) ++ [payload]]"/>
</foreach>
```

This will cause an `OutOfMemoryError` for large files.

### Correct Pattern

**RIGHT**: Process and discard each chunk:

```xml
<foreach>
    <!-- ✅ Process chunk and let it be garbage collected -->
    <flow-ref name="process-chunk"/>
</foreach>
```

Each chunk is processed and then discarded, maintaining constant memory usage.

## Performance Considerations

### Chunk Size vs. Throughput

| Chunk Size | Memory | I/O Calls | Best For |
|------------|--------|-----------|----------|
| 10 KB | Low | High | Memory-constrained environments |
| 1 MB | Medium | Medium | Balanced performance |
| 10 MB | High | Low | High-throughput network transfers |

**Rule of thumb**: Larger chunks mean fewer I/O operations but higher memory usage.
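
As a back-of-the-envelope illustration (an approximation, not a benchmark), the read-call count for a 1 GB file at the two extremes:

```java
// Read operations ≈ ceil(fileSize / chunkSize)
class ChunkMath {
    public static void main(String[] args) {
        long fileSize = 1_073_741_824L;                       // 1 GB
        long smallChunkReads = ceilDiv(fileSize, 10_240);     // 10 KB chunks -> 104,858 reads
        long largeChunkReads = ceilDiv(fileSize, 10_485_760); // 10 MB chunks -> 103 reads
        System.out.println(smallChunkReads + " vs " + largeChunkReads);
    }

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }
}
```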

### Concurrent Flows

Memory usage scales with concurrent executions:

| Chunk Size | 1 Flow | 10 Flows | 100 Flows |
|------------|--------|----------|-----------|
| 1 MB | 1 MB | 10 MB | 100 MB |
| 5 MB | 5 MB | 50 MB | 500 MB |
| 10 MB | 10 MB | 100 MB | 1 GB |

Configure chunk size based on expected concurrency and available heap memory.
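
A rough sizing formula follows from the table, assuming each in-flight flow execution holds one chunk buffer at a time (an approximation; actual heap usage also depends on what each flow does with its chunk):

```java
// Peak chunk-buffer memory ≈ chunkSize × concurrent flow executions
class HeapBudget {
    public static void main(String[] args) {
        long chunkSize = 5L * 1024 * 1024;            // 5 MB per chunk
        int concurrentFlows = 100;                    // expected peak concurrency
        long peakBytes = chunkSize * concurrentFlows; // chunk buffers alone, excluding other flow state
        System.out.println(peakBytes / (1024 * 1024) + " MB"); // prints: 500 MB
    }
}
```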
