SQR-029: DM-EFD prototype implementation

  • Frossie Economou,
  • Simon Krughoff,
  • Jonathan Sick,
  • Angelo Fausti,
  • Adam Thornton and
  • Joshua Hoblitt

Latest Revision: 2019-01-22

DM-EFD prototype implementation based on Kafka and InfluxDB.

Note

This technote is not yet published.

1   Introduction

2   The adopted technologies

2.1   The Confluent Kafka platform

2.2   The InfluxData stack

2.2.1   Why a time series database?

2.2.2   Why InfluxData?

The InfluxData stack provides a complete solution for storing, visualizing and processing time series data:

  • InfluxDB is an open-source time series database written in Go. It has an SQL-like query language (InfluxQL) and provides an HTTP API as well as API client libraries.
  • Chronograf is an open-source web application for time series visualization written in Go and React.js.
  • Kapacitor is an open-source framework written in Go for processing, monitoring, and alerting on time series data.

3   Deploying Confluent Kafka and InfluxData

3.1   The GKE cluster specs

  • Machine type: n1-standard-2 (2 vCPUs, 7.5 GB memory)
  • Size: 3 nodes
  • Total cores: 6 vCPUs
  • Total memory: 22.50 GB

The above specs are sufficient for running JVM, but not recommended for production. See Running Kafka in Production and InfluxDB hardware guidelines for single node for more information.

The performance requirements for InfluxDB, based on the expected throughput (see below), fall into the moderate load category. Thus, a single InfluxDB node should be enough for the DM-EFD.

3.2   Terraform and Helm

3.3   Monitoring

4   Connecting Kafka and InfluxDB

At the time of this implementation, the Confluent InfluxDB connector was still in preview and did not have the functionality we needed. Instead of the Confluent InfluxDB connector, we used the InfluxDB Sink connector developed by Landoop.

We added the Landoop InfluxDB Sink connector plugin version 1.1 to the cp-kafka-connect container and implemented scripts to facilitate its configuration.

A limitation of version 1.1, though, was the lack of support for the Avro array data type, which was solved by contributing to the plugin development.

4.1   The InfluxDB schema

One of the characteristics of InfluxDB is that it creates the database schema when it writes the data to the database. This is commonly known as schemaless or schema-on-write. The advantage is that no schema creation and database migrations are needed, which greatly simplifies database management. However, it also means that it is not possible to enforce a schema with InfluxDB alone.

In the proposed architecture, the schema is controlled by Kafka through Avro and the Schema Registry. As the schema may need to evolve, it is important for InfluxDB, and for other consumers, to be able to handle data encoded with both old and new schemas seamlessly. While this report does not explore schema evolution, it is undoubtedly important and we will revisit it.

The data in InfluxDB, however, does not necessarily need to follow the Avro schema. The InfluxDB Sink Connector supports KCQL, the Kafka Connect Query Language, which can be used to select fields, define the target measurement, and set tags to annotate the measurements.

In the current implementation, the InfluxDB schema is the simplest possible. We create an InfluxDB measurement with the same name as the topic and select all fields from the topic.
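
As an illustration of this one-to-one mapping, the sketch below builds one KCQL statement per topic. The helper name make_kcql is hypothetical, and the sketch assumes the basic INSERT INTO ... SELECT * FROM ... form of KCQL with statements separated by semicolons. It does not handle quoting of names with special characters, and it is not the configuration script used in our deployment.

```python
def make_kcql(topics):
    """Build a KCQL string mapping each topic to a measurement of the same name.

    Assumption: statements are joined with ';' and names need no extra quoting.
    """
    statements = [
        # Select all fields from the topic into a measurement with the same name
        "INSERT INTO {0} SELECT * FROM {0}".format(topic)
        for topic in topics
    ]
    return ";".join(statements)


print(make_kcql(["mtm1m3-accelerometerdata"]))
```

A selective mapping would replace the * with a list of field names, which is how the InfluxDB schema could diverge from the Avro schema.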

Example of an Avro schema for the MTM1M3_accelerometerData SAL topic, and the corresponding InfluxDB schema:

{
  "fields": [
    {
      "doc": "Timestamp when the Kafka message was created.",
      "name": "kafka_timestamp",
      "type": {
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    },
    {
      "name": "timestamp",
      "type": "double"
    },
    {
      "name": "rawAccelerometers",
      "type": {
        "items": "float",
        "type": "array"
      }
    },
    {
      "name": "accelerometers",
      "type": {
        "items": "float",
        "type": "array"
      }
    },
    {
      "name": "angularAccelerationX",
      "type": "float"
    },
    {
      "name": "angularAccelerationY",
      "type": "float"
    },
    {
      "name": "angularAccelerationZ",
      "type": "float"
    }
  ],
  "name": "MTM1M3_accelerometerData",
  "namespace": "lsst.sal",
  "sal_subsystem": "MTM1M3",
  "sal_topic_type": "SALTelemetry",
  "sal_version": "3.8.35",
  "type": "record"
}

> SHOW FIELD KEYS FROM "mtm1m3-accelerometerdata"
name: mtm1m3-accelerometerdata
fieldKey             fieldType
--------             ---------
accelerometers0      float
accelerometers1      float
angularAccelerationX float
angularAccelerationY float
angularAccelerationZ float
kafka_timestamp      integer
rawAccelerometers0   float
rawAccelerometers1   float
timestamp            float

Note

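  1. InfluxDB does not have double or long datatypes; the Avro double and long fields are stored as InfluxDB float and integer fields, as the output above shows.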
  2. InfluxDB does not support array data type. Fields named like <field name>0, <field name>1, ... were extracted from arrays in the Avro message.
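
The array flattening described in the note can be sketched as follows. This is only an illustration of the naming convention in Python, not the connector's actual code, and the function name flatten_arrays and the sample values are hypothetical.

```python
def flatten_arrays(message):
    """Expand list-valued fields into scalar fields named <field name><index>."""
    flat = {}
    for name, value in message.items():
        if isinstance(value, (list, tuple)):
            # One scalar field per array element: accelerometers0, accelerometers1, ...
            for index, item in enumerate(value):
                flat["{}{}".format(name, index)] = item
        else:
            flat[name] = value
    return flat


# Made-up values for a subset of the MTM1M3_accelerometerData fields
message = {"accelerometers": [0.1, 0.2], "angularAccelerationX": 0.5}
print(flatten_arrays(message))
```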

5   The SAL mock experiment

With the SAL mock experiment, we want to assess the performance of our prototype implementation of the DM-EFD.

In the following sections we explain the experiment we designed and how we produced messages for the SAL topics, and we characterize the latency of a message from the time it was produced to the time InfluxDB writes it to the disk. Finally, we measure the InfluxDB ingestion rate during the experiment.

5.1   Designing the experiment

To run a realistic experiment that emulates the EFD, besides producing messages for each SAL topic, one would need to know the frequency of every topic, which is not available in the SAL schema.

From the current SAL XML schema we have a total of 1051 topics, of which 274 are commands, 541 are log events, and 236 are telemetry. For simplicity, we assume a distribution of frequencies for the different types of topics, as shown in the table below.

Producer ID  Topic type      # of topics  Frequency (Hz)  Expected throughput (messages/s)
-----------  ----------      -----------  --------------  --------------------------------
0            SAL Commands    274          1               274
1            SAL Log Events  541          10              5410
2            SAL Telemetry   236          100             23600

  • Total number of topics: 1051
  • Total expected throughput: 29284 messages/s
  • Experiment Duration: 16h
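
The totals above follow directly from the assumed frequencies. A minimal check, using only the numbers from the table (the variable and function names are ours):

```python
# (topic type, number of topics, assumed frequency in Hz)
PRODUCERS = [
    ("SAL Commands", 274, 1),
    ("SAL Log Events", 541, 10),
    ("SAL Telemetry", 236, 100),
]


def expected_throughput(producers):
    """Sum of (number of topics x frequency) over all producers, in messages/s."""
    return sum(n_topics * frequency for _, n_topics, frequency in producers)


total_topics = sum(n_topics for _, n_topics, _ in PRODUCERS)
print(total_topics, expected_throughput(PRODUCERS))  # prints: 1051 29284
```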

5.2   Producing SAL topics

  • Converting SAL XML schema to Apache Avro
  • The AIOKafkaProducer

5.2.1   The measured throughput

_images/salmock_produced_total.png

Figure 1 The figure shows the producer throughput measured by the salmock_produced_total Prometheus metric.

  • Number of topics produced: 1051
  • Maximum measured throughput for the producers: 1330 messages/s

Another Prometheus metric of interest is cp_kafka_server_brokertopicmetrics_bytesinpersec, which gives us a mean throughput at the brokers, for all topics, of 40 KB/s. We observe the same value when looking at the network traffic as monitored by the InfluxDB Telegraf client.

As a point of comparison, this throughput is lower than the 1.9 MB/s that OCS-REQ-0048 requires the EFD to support as the “long-term mean ingest rate to the Engineering and Facilities Database of non-science images”.

We can do better by improving the producer throughput. A simple test that assesses the maximum InfluxDB ingestion rate for the current setup demonstrates that we can reach a higher performance.

5.3   Latency measurements

_images/latency.png

Figure 2 The figure shows the roundtrip latency for a telemetry topic during the experiment, measured as the difference between the producer and consumer timestamps.

We characterize the roundtrip latency as the difference between the time the message was produced and the time InfluxDB writes it to the disk.

The median roundtrip latency for a telemetry topic produced over the duration of the experiment was 183 ms, and 99% of the messages had a latency smaller than 1.34 s.
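
For reference, the two statistics quoted above can be computed from pairs of timestamps as in the sketch below. The function name latency_summary is hypothetical, the percentile uses the nearest-rank definition (an assumption; the experiment's own tooling may use a different estimator), and the example timestamps are synthetic.

```python
import statistics


def latency_summary(produced, written):
    """Return (median, 99th percentile) of written - produced, in the input units."""
    latencies = sorted(w - p for p, w in zip(produced, written))
    median = statistics.median(latencies)
    # Nearest-rank 99th percentile: smallest value with at least 99% of samples at or below it
    rank = (99 * len(latencies) + 99) // 100
    return median, latencies[rank - 1]


# Synthetic example: 100 messages with latencies of 0.01 s, 0.02 s, ..., 1.00 s
produced = [0.0] * 100
written = [i / 100 for i in range(1, 101)]
median, p99 = latency_summary(produced, written)
```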

This result would allow for quasi-realtime access to the telemetry stream from resources at the LDF. That would not be possible with the current baseline design (see discussion in DMTN-082).

5.4   The InfluxDB ingestion rate

_images/influxdb.png

Figure 3 The figure shows the InfluxDB ingestion rate in units of points per minute.

Because of the current InfluxDB schema, an InfluxDB point is equivalent to a message. The measured InfluxDB ingestion rate during the experiment was ~80k points/min or 1333 messages/s, which matches the producer throughput (see above). This result is consistent with the very low latency observed.

InfluxDB provides a metric write_error that counts the number of errors when writing points, and it was write_error=0 during the whole experiment.

During the experiment, we also saw the InfluxDB disk filling up at a rate of 682 MB/h or 16 GB/day. Even with InfluxDB data compression, that means 5.7 TB/year, which seems too much, especially if we want to query over longer periods as OCS-REQ-0047 suggests, e.g., “raft 13 temperatures for past two years”. For the DM-EFD, we are considering downsampling the time series and using a retention policy, as discussed in the Lessons Learned.
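
The extrapolation from the hourly rate to the yearly volume is simple arithmetic. The sketch below assumes binary units (1 GB = 1024 MB, 1 TB = 1024 GB), which is the convention that reproduces the figures quoted above.

```python
RATE_MB_PER_HOUR = 682  # measured InfluxDB disk growth during the experiment

gb_per_day = RATE_MB_PER_HOUR * 24 / 1024   # 24 hours per day
tb_per_year = gb_per_day * 365 / 1024       # 365 days per year

print("{:.1f} GB/day, {:.1f} TB/year".format(gb_per_day, tb_per_year))
# prints: 16.0 GB/day, 5.7 TB/year
```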

Finally, a simple test can be done to assess the maximum InfluxDB ingestion rate for the current setup.

We stopped the InfluxDB Sink connector and left the producer running for a period T. The Kafka brokers cached the messages produced during T, and as soon as the connector was restarted, all the messages were flushed to InfluxDB as if they had been produced at a much higher throughput.

The figure below shows the result of this test, where we see a measured ingestion rate of 1M points per minute or 16k messages per second, a factor of 12 better than the previous result. Also, we had write_error=0 during this test.

_images/influxdb_max.png

Figure 4 The figure shows the InfluxDB maximum ingestion rate measured in units of points per minute.

These results are very encouraging, in particular because both Kafka and InfluxDB were deployed on modest hardware and with default configurations. There is indeed room for improvement, and there are many aspects to explore in both the Kafka and InfluxDB deployments.

5.5   Visualizing SAL Topics with Chronograf

Chronograf presents the SAL topics as InfluxDB measurements. One can use the Explore tool to browse and visualize them.

_images/chronograf.png

Figure 5 Visualization using the Chronograf Explore tool.

For monitoring the different telescope and observatory subsystems, it is possible to organize these visualizations in Dashboards.

6   The SAL Kafka writer

7   Lessons Learned

7.1   Downsampling and data retention

It was evident during the experiment that the disks fill up quickly. The InfluxDB disk was filling up at a rate of ~700 MB/h, which means that the 128 GB storage would be filled up in ~7 days. Similarly, for Kafka, we filled up the 5 GB disk of each broker in a few days. That means we need to downsample the data if we don’t want to lose it, and to configure retention policies that automatically discard data once it is no longer useful.

In InfluxDB it is easy to configure both downsampling and data retention.

InfluxDB organizes time series data in shards and will drop an entire shard when it enforces the retention policy. That means the retention policy’s duration must be longer than the shard duration.

For the experiments, we created a kafka database in InfluxDB with a default retention policy of 24h and a shard duration of 1h, following the retention policy documentation.

InfluxDB creates retention policies per database, and it is possible to have multiple retention policies for the same database. To preserve data for a more extended period, we have created another retention policy with a duration of 1 year and a Continuous Query to average the time series every 30s.
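
The InfluxQL statements for such a setup could look like the output of the sketch below. The helper names are hypothetical and the statements are our reconstruction, not a record of the commands actually run. The policy names autogen and one_year are the ones used in the continuous query of this section, and 52w is used as an approximation of one year because InfluxQL durations have no year unit.

```python
def create_database(name, duration, shard_duration, policy="autogen"):
    """InfluxQL to create a database whose default retention policy is customized."""
    return (
        'CREATE DATABASE "{}" WITH DURATION {} REPLICATION 1 '
        'SHARD DURATION {} NAME "{}"'
    ).format(name, duration, shard_duration, policy)


def create_retention_policy(policy, database, duration):
    """InfluxQL to add a further (non-default) retention policy to a database."""
    return 'CREATE RETENTION POLICY "{}" ON "{}" DURATION {} REPLICATION 1'.format(
        policy, database, duration
    )


print(create_database("kafka", "24h", "1h"))
print(create_retention_policy("one_year", "kafka", "52w"))
```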

_images/downsampling.png

Figure 6 The figure shows a raw time series (top) and an averaged time series by a continuous query (bottom).

Example of a continuous query for the mtm1m3-accelerometerdata topic. If we produce topics at 100 Hz and average the time series in intervals of 30 seconds, each averaged point replaces 100 × 30 = 3000 raw points, so the downsampling factor is 3000.

CREATE CONTINUOUS QUERY "mtm1m3-accelerometerdata" ON kafka
BEGIN
  SELECT   mean(accelerometers0) AS mean_accelerometers0,
           mean(accelerometers1) AS mean_accelerometers1
  INTO     "kafka"."one_year"."mtm1m3-accelerometerdata"
  FROM     "kafka"."autogen"."mtm1m3-accelerometerdata"
  GROUP BY time(30s)
END
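
To make the effect of GROUP BY time(30s) concrete, the sketch below reproduces the same windowed mean in plain Python on a synthetic 100 Hz series. It only illustrates the arithmetic, not how InfluxDB executes continuous queries, and the function name downsample_mean is hypothetical.

```python
from collections import defaultdict


def downsample_mean(points, window=30.0):
    """Average (timestamp in s, value) points in fixed windows of `window` seconds."""
    buckets = defaultdict(list)
    for timestamp, value in points:
        # All points in [k * window, (k + 1) * window) fall into bucket k
        buckets[int(timestamp // window)].append(value)
    return [
        (index * window, sum(values) / len(values))
        for index, values in sorted(buckets.items())
    ]


# 60 s of a synthetic 100 Hz series: 6000 raw points alternating between 0 and 1
raw = [(i / 100.0, float(i % 2)) for i in range(6000)]
averaged = downsample_mean(raw)
print(len(raw), len(averaged), len(raw) // len(averaged))  # prints: 6000 2 3000
```

Each 30 s window of a 100 Hz series holds 3000 raw points, so one averaged point replaces 3000 raw ones.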

The retention policy of 24h in InfluxDB suggests that we configure a Kafka retention policy for the logs and topic offsets with the same duration. It means that InfluxDB can be unavailable for 24h and still recover the messages from the Kafka brokers. We added the following configuration parameters to the cp-kafka helm chart:

## Kafka Server properties
## ref: https://kafka.apache.org/documentation/#configuration
configurationOverrides:
  offsets.retention.minutes: 1440
  log.retention.hours: 24

7.2   The InfluxDB HTTP API

InfluxDB provides an HTTP API for accessing the data. When using the HTTP API, we set max_row_limit=0 in the InfluxDB configuration to avoid data truncation.

A code snippet to retrieve data from a particular topic would look like:

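import requests

INFLUXDB_API_URL = "https://kafka-influxdb-demo.lsst.codes"
INFLUXDB_DATABASE = "kafka"


def get_topic_data(topic):
    """Return the last 24h of data for a topic from the autogen retention policy."""
    query = 'SELECT * FROM "{}"."autogen"."{}" WHERE time > now() - 24h'.format(
        INFLUXDB_DATABASE, topic)
    # Send the InfluxQL query to the /query endpoint of the HTTP API
    r = requests.post(url=INFLUXDB_API_URL + "/query", params={"q": query})
    r.raise_for_status()  # raise on HTTP errors instead of parsing an error body

    return r.json()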
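
The JSON returned by the /query endpoint nests the data under results and series, with separate columns and values lists. A minimal sketch for turning such a response into one dictionary per point is shown below. The function name series_to_rows is hypothetical and the sample payload is made up for illustration, not taken from the experiment.

```python
def series_to_rows(response):
    """Flatten an InfluxDB 1.x query response into a list of {column: value} dicts."""
    rows = []
    for result in response.get("results", []):
        # A statement that matches no data has no "series" key
        for series in result.get("series", []):
            columns = series["columns"]
            for values in series["values"]:
                rows.append(dict(zip(columns, values)))
    return rows


# Made-up payload with the same structure as a real response
sample = {
    "results": [{
        "statement_id": 0,
        "series": [{
            "name": "mtm1m3-accelerometerdata",
            "columns": ["time", "accelerometers0"],
            "values": [["2019-01-22T00:00:00Z", 0.1], ["2019-01-22T00:00:01Z", 0.2]],
        }],
    }]
}
rows = series_to_rows(sample)
```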

7.3   Backing up an InfluxDB database

InfluxDB supports backup and restore operations on online databases. A backup of 24h worth of data took less than 10 minutes in our current setup while running the SAL mock experiment and ingesting data at 80k points/min.

Backup files are split by shard. In Downsampling and data retention we configured our retention policy to 24h and the shard duration to 1h, so the resulting backup has 24 files.

We do observe a drop in the ingestion rate to 50k points/min during the backup, but no write errors, and Kafka’s design ensures nothing gets lost even if the InfluxDB ingestion rate slows down.

_images/influxdb_backup.png

Figure 7 The figure shows how the InfluxDB ingestion rate is affected during a backup.

7.4   User Defined Functions

8   APPENDIX

8.1   Kafka Terminology

  • Each server in the Kafka cluster is called a broker.
  • Kafka stores messages in categories called topics.
  • A Kafka message is a key-value pair, and the key, message, or both, can be serialized as Avro.
  • A schema defines the structure of the Avro data format.
  • The Schema Registry defines a subject as a scope where a schema can evolve. The name of the subject depends on the configured subject name strategy, which by default is set to derive the subject name from the topic name.
  • The processes that publish messages to Kafka are called producers. Producers publish data to specific topics.
  • Consumers are the processes that subscribe to topics.
  • The position of the consumer in the log is called the offset. Kafka retains it on a per-consumer basis.
  • Kafka connectors make it possible to build and run reusable consumers or producers that connect existing applications to Kafka topics.

8.2   InfluxDB Terminology

  • A measurement is conceptually similar to an SQL table. The measurement name describes the data stored in the associated fields.
  • A field holds the actual data and is not indexed.
  • A tag is used to annotate your data (metadata) and is automatically indexed.
  • A point contains the field-set of a series for a given tag-set and timestamp. Points are equivalent to messages in Kafka.
  • A measurement and a tag-set define a series. A series contains points.
  • The series cardinality depends mostly on how the tag-set is designed. A rule of thumb for InfluxDB is to have fewer series with more points than more series with fewer points to improve performance.
  • A database stores one or more series.
  • A database can have one or more retention policies.

9   References