kafkaload

Synopsis

starlake kafkaload [options]

Description

Two modes are available: batch mode and streaming mode.

Batch mode

In batch mode, you start the Kafka (off)loader regularly, and the last consumed offset is stored in the comet_offsets topic config (see reference-kafka.conf for an example).
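The offset bookkeeping described above can be illustrated with a minimal, self-contained sketch. It is not Starlake's implementation: the in-memory `topic` and `offsets` structures and the `run_batch` function are hypothetical stand-ins for a Kafka topic and the comet_offsets store. It only shows why restarting the loader regularly does not re-read messages that were already consumed.

```python
from typing import Dict, List, Tuple

# Hypothetical in-memory stand-ins (assumptions, not Starlake types):
# a topic maps a partition number to its list of messages, and the
# offset store maps (topic name, partition) to the next offset to read.
Topic = Dict[int, List[str]]
OffsetStore = Dict[Tuple[str, int], int]


def run_batch(name: str, topic: Topic, offsets: OffsetStore) -> List[str]:
    """Read every message not yet consumed, then record the new offsets."""
    consumed: List[str] = []
    for partition, messages in sorted(topic.items()):
        start = offsets.get((name, partition), 0)  # first run starts at 0
        consumed.extend(messages[start:])
        offsets[(name, partition)] = len(messages)  # next offset to read
    return consumed


topic: Topic = {0: ["a", "b"], 1: ["c"]}
offsets: OffsetStore = {}

first_run = run_batch("events", topic, offsets)   # reads a, b, c
topic[0].append("d")                              # a new message arrives
second_run = run_batch("events", topic, offsets)  # reads only d
```

The second run returns only the message that arrived after the first run, because the stored offsets tell it where to resume.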

When offloading data from Kafka to a file, you may ask to coalesce the result into a specific number of files / partitions. If you ask to coalesce to a single partition, the offloader stores the data under the exact filename you provided in the path argument.

The figure below describes the batch offloading process

The figure below describes the batch offloading process with comet-offsets-mode = "FILE"

Streaming mode

In this mode, the program keeps running and the comet_offsets topic is not used. The (off)loader uses the consumer group id you specify in the access options of the topic configuration you are dealing with.

Parameters

| Parameter | Cardinality | Description |
|---|---|---|
| --config:<value> | Optional | Topic name declared in the reference.conf file |
| --connectionRef:<value> | Optional | Connection to any specific sink |
| --format:<value> | Optional | Read/write format, e.g. parquet, json, csv ... Defaults to parquet. |
| --path:<value> | Optional | Source file for load and target file for store |
| --options:<value> | Optional | Options to pass to the Spark reader |
| --write-config:<value> | Optional | Topic name declared in the reference.conf file |
| --write-path:<value> | Optional | Source file for load and target file for store |
| --write-mode:<value> | Optional | When offload is true, describes how data should be stored on disk. Ignored if offload is false. |
| --write-options:<value> | Optional | Options to pass to the Spark writer |
| --write-format:<value> | Optional | Streaming format, e.g. kafka, console ... |
| --write-coalesce:<value> | Optional | Should we coalesce the resulting dataframe? |
| --transform:<value> | Optional | Any transformation to apply to the message before loading / offloading it |
| --stream:<value> | Optional | Should we use streaming mode? |
| --streaming-trigger:<value> | Optional | Once / Continuous / ProcessingTime |
| --streaming-trigger-option:<value> | Optional | 10 seconds, for example. See https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/Trigger.html#ProcessingTime-java.lang.String- |
| --streaming-to-table:<value> | Optional | Table name to sink to |
| --streaming-partition-by:<value> | Optional | List of columns to use for partitioning |
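
As an illustration of how these parameters combine, here is a small sketch that assembles an argument list for a streaming run. The helper `kafkaload_args` and the topic name `my_topic` are hypothetical. The sketch also assumes each option is passed as `--name value`, which this page does not confirm, so check the command-line help of your Starlake version for the exact syntax.

```python
from typing import List, Mapping, Union

Value = Union[str, int, bool]


def kafkaload_args(options: Mapping[str, Value]) -> List[str]:
    """Build an argument list for `starlake kafkaload`.

    Assumption: each option is passed as `--name value`. Verify this
    against the CLI help of your Starlake version.
    """
    args = ["starlake", "kafkaload"]
    for name, value in options.items():
        if isinstance(value, bool):
            # Render booleans in lowercase rather than Python's True/False.
            value = "true" if value else "false"
        args += [f"--{name}", str(value)]
    return args


# Hypothetical topic name, used only for illustration.
stream_cmd = kafkaload_args({
    "config": "my_topic",
    "stream": True,
    "streaming-trigger": "ProcessingTime",
    "streaming-trigger-option": "10 seconds",
    "write-format": "console",
})
```

The resulting list can be handed to `subprocess.run` once the option syntax has been checked. Keeping the arguments as a list avoids shell-quoting problems with values such as `10 seconds`.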