17.6. Kafka Command-Line Tools

The GeoMesa Kafka distribution includes a set of command-line tools for feature management, ingest, export and debugging.

To install the tools, see Setting up the Kafka Command Line Tools.

Once installed, the tools should be available through the command geomesa-kafka:

$ geomesa-kafka
INFO  Usage: geomesa-kafka [command] [command options]
  Commands:
    ...

Commands that are common to multiple back ends are described in Command-Line Tools. The commands here are Kafka-specific.

17.6.1. General Arguments

Most commands require you to specify the connection to Kafka. This generally includes the list of Kafka brokers and Zookeeper servers. Specify brokers with the --brokers (or -b) argument, and specify Zookeepers with --zookeepers (or -z).

GeoMesa stores schema metadata under a particular path in Zookeeper, which can be thought of as a namespace for feature types. Use --zkpath (or -p) to override the default path.

To connect to Confluent Schema Registry topics, use --schema-registry to provide the registry URL.
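
For example, a hypothetical invocation listing the feature types in a deployment might look like the following; the broker and Zookeeper host names, ports and Zookeeper path are placeholders:

$ geomesa-kafka get-type-names \
    --brokers broker1:9092,broker2:9092 \
    --zookeepers zoo1:2181,zoo2:2181 \
    --zkpath geomesa/ds/kafka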

17.6.2. Commands

17.6.2.1. create-schema

See create-schema for an overview of this command.

In addition to the regular options, Kafka allows the number of partitions and the replication factor of the Kafka topic to be specified.

Argument         Description
--partitions     The number of partitions used for the Kafka topic
--replication    The replication factor for the Kafka topic
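
For example, a sketch of creating a schema with four partitions and a replication factor of two; the feature type name and attribute specification are illustrative:

$ geomesa-kafka create-schema \
    -b broker1:9092 -z zoo1:2181 \
    -f example \
    -s 'name:String,dtg:Date,*geom:Point:srid=4326' \
    --partitions 4 \
    --replication 2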

17.6.2.2. export

See export for an overview of this command.

Unlike the standard export, this command will not terminate until it is cancelled (through a shell interrupt) or until the number of features specified by --max-features has been read. Thus, it can be used to monitor a topic.

This command differs from the listen command (below) in that it allows filtering and output in various formats. It will also ignore drop and clear messages generated by feature deletion.

In addition to the regular options, Kafka allows control over the consumer behavior:

Argument            Description
--from-beginning    Start reading messages from the beginning of the Kafka topic, instead of the end
--num-consumers     Number of consumers used to read the topic

The --num-consumers argument can be used to increase read speed. However, there can be at most one consumer per topic partition.

The --from-beginning argument can be used to start reading the Kafka topic from the start. Otherwise, only new messages that are sent after this command is invoked will be read.
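
For example, a sketch of monitoring a topic from the beginning of the queue, stopping after 100 features; the feature type name and CQL filter are illustrative:

$ geomesa-kafka export \
    -b broker1:9092 -z zoo1:2181 \
    -f example \
    --from-beginning \
    --max-features 100 \
    -q "bbox(geom,-80,35,-75,40)"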

17.6.2.3. ingest

See ingest for an overview of this command.

In addition to the regular options, Kafka allows the number of partitions and the replication factor of the Kafka topic to be specified. An artificial delay can also be inserted to simulate a live data stream.

Argument           Description
--partitions       The number of partitions used for the Kafka topic
--replication      The replication factor for the Kafka topic
--serialization    The serialization format to use
--delay            The delay inserted between messages

The --delay argument should be specified as a duration, in plain language, for example 100 millis or 1 second. The ingest will pause for the specified delay after creating each SimpleFeature, which can be used to simulate a live data stream.
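
For example, a sketch of ingesting a CSV file with a half-second delay between features; the feature type name, converter name and input file are placeholders:

$ geomesa-kafka ingest \
    -b broker1:9092 -z zoo1:2181 \
    -f example \
    -C example-converter \
    --delay '500 millis' \
    data.csv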

17.6.2.4. playback

The playback command can simulate a data stream by replaying features from a file directly onto a Kafka data store. Features are replayed based on a date attribute in each feature. For example, if replaying three features whose dates are each one second apart, each feature will be emitted after a delay of one second. The rate of replay can be modified to speed up or slow down the original time differences.

Argument                Description
-c, --catalog *         The catalog table containing schema metadata
-f, --feature-name *    The name of the schema
--dtg                   Date attribute to base playback on; if not specified, the default schema date field is used
--rate                  Rate multiplier to speed up (or slow down) features being returned, as a float
--live                  Modify the returned dates to match the current time
--config                Properties file used to configure the Kafka producer
-s, --spec              SimpleFeatureType specification as a GeoTools spec string, SFT config, or file with either
-C, --converter         GeoMesa converter specification as a config string, file name, or name of an available converter
--input-format          File format of input files (shp, csv, tsv, avro, etc.); optional, auto-detection will be attempted
--src-list              Input files are text files containing lists of files to ingest, one per line
--partitions            The number of partitions used for the Kafka topic
--replication           The replication factor for the Kafka topic
--serialization         The serialization format to use
--schema-registry       URL to a Confluent Schema Registry
<files>...              Input files to ingest

The playback command is an extension of the ingest command, and accepts all the parameters outlined there. However, playback cannot run in distributed mode.

Also, note that the input files (specified in --src-list or <files>...) must be time-ordered by the --dtg attribute before ingest, or the playback will not work as expected.
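
For example, if the date were in the third column of a CSV file, the input could be pre-sorted with standard shell tools; the column position here is an assumption about the data:

$ sort -t ',' -k 3,3 unsorted.csv > sorted.csv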

The --rate parameter can be used to speed up or slow down the replay. It is specified as a floating point number. For example --rate 10 will make replay ten times faster, while --rate 0.1 will make replay ten times slower.
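
For example, a sketch of replaying a file at ten times its original speed, rewriting dates to the current time; the feature type name, date attribute and input file are placeholders:

$ geomesa-kafka playback \
    -b broker1:9092 -z zoo1:2181 \
    -f example \
    --dtg dtg \
    --rate 10 \
    --live \
    data.csv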

The --src-list argument is useful when you have more files to ingest than the command line will allow you to specify. This argument instructs GeoMesa to treat the input files as newline-separated lists of files to ingest. As this makes it very easy to launch ingest jobs that can take days, it is recommended to split lists into reasonable chunks that can be completed in a couple of hours.
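
For example, a file list could be generated with standard shell tools and then passed to the command; the paths below are placeholders:

$ find /data/example -name '*.csv' | sort > files.txt
$ geomesa-kafka playback ... --src-list files.txt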

The --force argument can be used to suppress any confirmation prompts (generally from converter inference), which can be useful when scripting commands.

The <files>... argument specifies the files to be ingested. * may be used as a wildcard in file paths. GeoMesa can handle gzip, bzip and xz file compression, as long as the file extensions match the compression type. GeoMesa supports ingesting files from local disks or HDFS. In addition, Amazon's S3 and Microsoft's Azure file systems are supported with a few configuration changes; see Remote File System Support for details. Note that the behavior of this argument is changed by the --src-list argument.
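
For example, wildcards, compressed files and HDFS paths may be mixed in a single invocation; all paths below are placeholders:

$ geomesa-kafka playback ... '/data/2020-*.csv.gz' hdfs://namenode:9000/data/example.csv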

By using a single - for the input files, input data may be piped directly to the ingest command using standard shell redirection. Note that this will only work in local mode, and will only use a single thread for ingestion. Schema inference is disabled in this case, and progress indicators may not be entirely accurate, as the total size isn’t known up front.

For example:

$ cat foo.csv | geomesa-kafka ingest ... -
$ geomesa-kafka ingest ... - <foo.csv

17.6.2.5. listen

This command behaves similarly to the export command (above), but it does not provide options for filtering or output format. It will show each message on the Kafka topic, including the drop and clear messages generated by feature deletion.

This command will not terminate until it is cancelled (through a shell interrupt).

Argument                Description
-f, --feature-name *    The name of the schema
--from-beginning        Start reading messages from the beginning of the Kafka topic, instead of the end
--num-consumers         Number of consumers used to read the topic

The --num-consumers argument can be used to increase read speed. However, there can be at most one consumer per topic partition.

The --from-beginning argument can be used to start reading the Kafka topic from the start. Otherwise, only new messages that are sent after this command is invoked will be read.
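
For example, a sketch of listening to a topic from the beginning of the queue; the feature type name is a placeholder:

$ geomesa-kafka listen \
    -b broker1:9092 -z zoo1:2181 \
    -f example \
    --from-beginning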

17.6.2.6. migrate-zookeeper-metadata

This command will migrate schema metadata out of Zookeeper. For additional information, see Zookeeper-less Usage.

Argument    Description
--delete    Delete the metadata from Zookeeper after migrating it
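
For example, a hypothetical invocation that migrates the metadata and then removes it from Zookeeper; connection arguments are as described in General Arguments, above:

$ geomesa-kafka migrate-zookeeper-metadata \
    -b broker1:9092 -z zoo1:2181 \
    --delete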