SQR-029: DM-EFD prototype implementation

  • Frossie Economou,
  • Simon Krughoff,
  • Jonathan Sick,
  • Angelo Fausti,
  • Adam Thornton and
  • Joshua Hoblitt

Latest Revision: 2019-01-17

DM-EFD prototype implementation based on Kafka and InfluxDB.

Note

This technote is not yet published.

1   Introduction

2   The adopted technologies

2.1   The Confluent Kafka platform

2.2   The InfluxData stack

2.2.1   Why a time series database?

2.2.2   Why InfluxData?

The InfluxData stack provides a complete solution for storing, visualizing and processing time series data:

  • InfluxDB is an open-source time series database written in Go. It has an SQL-like query language (InfluxQL) and provides an HTTP API as well as API client libraries.
  • Chronograf is an open-source web application for time series visualization written in Go and React.js.
  • Kapacitor is an open-source framework written in Go for processing, monitoring, and alerting on time series data.

3   Deploying Confluent Kafka and InfluxData

3.1   The GKE cluster specs

  • Machine type: n1-standard-2 (2 vCPUs, 7.5 GB memory)
  • Size: 3 nodes
  • Total cores: 6 vCPUs
  • Total memory: 22.50 GB

The above specs are sufficient for running the JVM, but are not recommended for production. See Running Kafka in Production and InfluxDB hardware guidelines for single node for more information.

The performance requirements for InfluxDB, based on the expected throughput (see below), fall into the moderate load category, so a single InfluxDB node should be enough for the DM-EFD.

3.2   Terraform and Helm

3.3   Monitoring

4   Connecting Kafka and InfluxDB

At the time of this implementation, the Confluent InfluxDB connector was still in preview and did not have the functionality we needed. Instead of the Confluent InfluxDB connector, we used the InfluxDB Sink connector developed by Landoop.

We added the Landoop InfluxDB Sink connector 1.1 plugin to the cp-kafka-connect container, and implemented scripts to facilitate its configuration.

A limitation of version 1.1, though, was the lack of support for the Avro array data type, which was solved by contributing to the plugin development.

4.1   The InfluxDB schema

One of the characteristics of InfluxDB is that the schema is created as the data is written to the database. This is commonly referred to as schemaless or schema-on-write. The advantage is that no schema creation or database migrations are needed, which greatly simplifies database management. However, it also means that it is not possible to enforce a schema with InfluxDB alone.

In the proposed architecture, the schema is controlled by Kafka through Avro and the Schema Registry. As the schema may need to evolve over time, it is important for InfluxDB, and for other consumers, to be able to handle data encoded with both old and new schemas seamlessly. While schema evolution is not explored in this report, it is certainly important and must be revisited.

The data written to InfluxDB, however, does not necessarily need to follow the Avro schema. The InfluxDB Sink Connector supports KCQL, the Kafka Connect Query Language, which can be used to select fields, define the target measurement, and set tags to annotate the measurements.

In the current implementation, the InfluxDB schema is the simplest possible. We create an InfluxDB measurement with the same name as the topic and select all fields from the topic.
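The mapping described above can be sketched as a small helper that builds one KCQL statement per topic. The function name kcql_statement is hypothetical, and the statement follows the general KCQL INSERT INTO ... SELECT * FROM ... pattern. The exact clauses and any quoting required for topic names depend on the connector version, so this is an illustration under those assumptions rather than the configuration used in the prototype.

```python
def kcql_statement(topic):
    """Build a KCQL statement that writes all fields of a Kafka topic
    to an InfluxDB measurement with the same name (hypothetical helper)."""
    # The measurement name and the topic name are deliberately identical.
    return "INSERT INTO {0} SELECT * FROM {0}".format(topic)


# One statement per topic; the topic name is taken from the example below.
statements = {
    topic: kcql_statement(topic)
    for topic in ["mtm1m3-accelerometerdata"]
}
print(statements["mtm1m3-accelerometerdata"])
```

Selecting a subset of fields or setting tags would replace the SELECT * part of the statement.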

Example of an Avro schema for the MTM1M3_accelerometerData SAL topic, and the corresponding InfluxDB schema:

{
  "fields": [
    {
      "doc": "Timestamp when the Kafka message was created.",
      "name": "kafka_timestamp",
      "type": {
        "logicalType": "timestamp-millis",
        "type": "long"
      }
    },
    {
      "name": "timestamp",
      "type": "double"
    },
    {
      "name": "rawAccelerometers",
      "type": {
        "items": "float",
        "type": "array"
      }
    },
    {
      "name": "accelerometers",
      "type": {
        "items": "float",
        "type": "array"
      }
    },
    {
      "name": "angularAccelerationX",
      "type": "float"
    },
    {
      "name": "angularAccelerationY",
      "type": "float"
    },
    {
      "name": "angularAccelerationZ",
      "type": "float"
    }
  ],
  "name": "MTM1M3_accelerometerData",
  "namespace": "lsst.sal",
  "sal_subsystem": "MTM1M3",
  "sal_topic_type": "SALTelemetry",
  "sal_version": "3.8.35",
  "type": "record"
}

The corresponding InfluxDB field keys:

> SHOW FIELD KEYS FROM "mtm1m3-accelerometerdata"
name: mtm1m3-accelerometerdata
fieldKey             fieldType
--------             ---------
accelerometers0      float
accelerometers1      float
angularAccelerationX float
angularAccelerationY float
angularAccelerationZ float
kafka_timestamp      integer
rawAccelerometers0   float
rawAccelerometers1   float
timestamp            float

Note

  1. InfluxDB does not have double or long data types. Avro double and long fields are stored as float and integer, respectively.
  2. InfluxDB does not support derived data types like arrays. Fields named like <field name>0, <field name>1, ... were extracted from arrays in the Avro message.
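The array handling described in the note can be illustrated with a minimal sketch. The function flatten_record and the message values are hypothetical and only mimic the naming convention seen in the field keys above; the connector's actual implementation is not shown in this report.

```python
def flatten_record(record):
    """Flatten a decoded Avro record (a dict) into InfluxDB-style fields.

    List values are expanded into <field name>0, <field name>1, ...;
    scalar values are copied unchanged.
    """
    fields = {}
    for name, value in record.items():
        if isinstance(value, (list, tuple)):
            for index, item in enumerate(value):
                fields["{}{}".format(name, index)] = item
        else:
            fields[name] = value
    return fields


# Hypothetical message values, for illustration only.
message = {
    "timestamp": 1547683200.0,
    "accelerometers": [0.1, 0.2],
    "angularAccelerationX": 0.01,
}
fields = flatten_record(message)
print(sorted(fields))
```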

4.2   Visualizing SAL Topics with Chronograf

In Chronograf, the SAL topics are listed as InfluxDB measurements. One can use the Explore tool to browse and visualize them.

_images/chronograf.png

Figure 1 Visualization using the Chronograf Explore tool.

These visualizations can be organized in Dashboards for monitoring the different telescope and observatory subsystems.

5   The SAL mock experiment

With the SAL mock experiment, we want to assess the performance of our prototype implementation of the DM-EFD.

In the following sections, we explain the design of the experiment and how we produced messages for the SAL topics. We then characterize the latency of a message from the time it was produced to the time it was written to InfluxDB, and finally we measure the InfluxDB throughput during the experiment.

5.1   Designing the experiment

To run a realistic experiment that emulates the EFD, in addition to producing messages for each SAL topic, one would need to know the frequency at which each topic is produced, which is not available in the SAL schema.

From the current SAL XML schema we have a total of 1051 topics, of which 274 are commands, 541 are log events, and 236 are telemetry. For simplicity, we assume a distribution of frequencies for the different types of topics, as shown in the table below.

Producer ID   Topic type       # of topics   Frequency (Hz)   Expected throughput (messages/s)
0             SAL Commands     274           1                274
1             SAL Log Events   541           10               5410
2             SAL Telemetry    236           100              23600
  • Total number of topics: 1051
  • Total expected throughput: 29284 messages/s
  • Experiment Duration: 16h
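The totals above follow directly from the assumed frequencies. A short sketch of the arithmetic, using only the numbers from the table (the variable and function names are illustrative):

```python
# (topic type, number of topics, assumed frequency in Hz), from the table.
PRODUCERS = [
    ("SAL Commands", 274, 1),
    ("SAL Log Events", 541, 10),
    ("SAL Telemetry", 236, 100),
]


def expected_throughput(producers):
    """Total messages/s if every topic is produced at its assumed frequency."""
    return sum(n_topics * frequency for _, n_topics, frequency in producers)


total_topics = sum(n_topics for _, n_topics, _ in PRODUCERS)
total_throughput = expected_throughput(PRODUCERS)
print(total_topics, total_throughput)
```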

5.2   Producing SAL topics

  • Converting SAL XML schema to Apache Avro
  • The AIOKafkaProducer

5.2.1   The measured throughput

_images/salmock_produced_total.png

Figure 2 The producer throughput as measured by the salmock_produced_total Prometheus metric.

  • Number of topics produced: 1051
  • Maximum measured throughput for the producers: 1330 messages/s

Another Prometheus metric of interest is cp_kafka_server_brokertopicmetrics_bytesinpersec, which gives us a mean throughput at the brokers, for all topics, of 40 KB/s. The same value is observed in the network traffic as monitored by the InfluxDB Telegraf client.

As a point of comparison, this throughput is lower than the 1.9 MB/s specified in OCS-REQ-0048 as the “Long-term mean ingest rate to the Engineering and Facilities Database of non-science images required to be supported”.

We can do better by improving our producers' throughput. We demonstrate that a higher throughput can be reached with a simple test when assessing the maximum InfluxDB throughput for the current setup.

5.3   Latency measurements

_images/latency.png

Figure 3 The roundtrip latency for a telemetry topic during the experiment, measured as the difference between the producer and InfluxDB (consumer) timestamps.

We characterize the roundtrip latency as the difference between the time when the message was produced and the time when it was written to InfluxDB.

The median roundtrip latency for a telemetry topic produced over the duration of the experiment was 183 ms, with 99% of the messages having a latency smaller than 1.34 s.
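As an illustration of how these two statistics can be derived from pairs of timestamps, here is a minimal sketch. The function name, the nearest-rank percentile method, and the sample timestamps are assumptions made for the example; they are not the analysis code or the data of the experiment.

```python
import math
import statistics


def latency_summary(produced_ms, written_ms):
    """Return (median, 99th percentile) of the roundtrip latency in ms.

    produced_ms and written_ms are matching sequences of producer and
    InfluxDB timestamps; the percentile uses the nearest-rank method.
    """
    latencies = sorted(w - p for p, w in zip(produced_ms, written_ms))
    rank = math.ceil(0.99 * len(latencies))
    return statistics.median(latencies), latencies[rank - 1]


# Hypothetical timestamps in milliseconds, for illustration only.
produced = [0, 1000, 2000, 3000]
written = [100, 1200, 2300, 3400]
median_ms, p99_ms = latency_summary(produced, written)
print(median_ms, p99_ms)
```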

This result would allow for quasi-realtime access to the telemetry stream from resources at the LDF. This would not be possible with the current baseline design (see discussion in DMTN-082).

5.4   The InfluxDB throughput

_images/influxdb.png

Figure 4 InfluxDB throughput measured as number of points per minute.

Because of the current InfluxDB schema, an InfluxDB point is equivalent to a message. The measured InfluxDB throughput during the experiment was ~80k points/min or 1333 messages/s, which is basically the producer throughput (see above). This result is supported by the very low latency observed.

InfluxDB provides a metric write_error that counts the number of errors when writing points, and it was write_error=0 during the whole experiment.

During the experiment, we also saw the InfluxDB disk filling up at a rate of 682MB/h or 16GB/day. Even with InfluxDB data compression, that means 5.7TB/year, which seems too much, especially if we want to query over longer periods of time as OCS-REQ-0047 suggests, e.g. “raft 13 temperatures for past 2 years”. For the DM-EFD, we are considering downsampling the time series and using a retention policy, as discussed in the Lessons Learned.
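The daily and yearly figures can be reproduced from the hourly rate with simple arithmetic. The sketch below assumes a constant fill rate and binary prefixes (1 GB = 1024 MB), an assumption that happens to match the rounded numbers quoted above; the function name is illustrative.

```python
DISK_FILL_RATE_MB_PER_HOUR = 682  # measured rate quoted in the text


def disk_usage(rate_mb_per_hour):
    """Return (GB/day, TB/year) for a constant fill rate.

    Assumes binary prefixes: 1 GB = 1024 MB and 1 TB = 1024 GB.
    """
    mb_per_day = rate_mb_per_hour * 24
    mb_per_year = mb_per_day * 365
    return mb_per_day / 1024, mb_per_year / 1024 ** 2


gb_per_day, tb_per_year = disk_usage(DISK_FILL_RATE_MB_PER_HOUR)
print("{:.0f} GB/day, {:.1f} TB/year".format(gb_per_day, tb_per_year))
```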

Finally, a simple test can be done to assess the maximum InfluxDB throughput for the current setup.

We stopped the InfluxDB Sink connector and let the producer run for a period of time T. The messages produced during T were cached at the Kafka brokers. As soon as the connector was restarted, all the messages were flushed to InfluxDB as if they had been produced at a much higher throughput.

The result of this test is shown in the figure below, where we see a measured throughput of 1M points/min or 16k messages/s, a factor of 12 better than the previous result. We also had write_error=0 during this test.

_images/influxdb_max.png

Figure 5 InfluxDB maximum throughput measured as number of points/min.
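For reference, the conversion between points per minute and messages per second used in this section is sketched below. It relies on the one-point-per-message equivalence of the current schema, and the function name is illustrative. With unrounded inputs the ratio comes out slightly above the rounded factor quoted in the text.

```python
def points_per_min_to_messages_per_s(points_per_min):
    """Convert an InfluxDB rate in points/min to messages/s.

    Valid only while one InfluxDB point corresponds to one message.
    """
    return points_per_min / 60


steady = points_per_min_to_messages_per_s(80_000)    # during the experiment
peak = points_per_min_to_messages_per_s(1_000_000)   # after the restart
ratio = peak / steady
print(round(steady), round(peak), ratio)
```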

These results are very encouraging, in particular because both Kafka and InfluxDB were deployed on modest hardware and with default configurations. There is certainly room for improvement, and there are many aspects to explore in both the Kafka and InfluxDB deployments.

6   The SAL Kafka writer

7   Lessons Learned

7.1   Downsampling and data retention

It was clear during the experiments that the disks fill up pretty quickly. The InfluxDB disk was filling up at a rate of ~700M/h, which means that the 128G disk would be filled up in ~7 days. Similarly, for Kafka, we filled up the 5G disk of each broker in a few days. That means we need to downsample the data if we do not want to lose it, and to configure retention policies that automatically discard data once it is no longer useful.

Both downsampling and data retention can be easily configured in InfluxDB.

Time Series data is organized in shards, and InfluxDB will drop an entire shard when the retention policy is enforced. That means the retention policy’s duration must be longer than the shard duration.

For the experiments, we created our kafka database in InfluxDB with a default retention policy of 24h and a shard duration of 1h, following the retention policy documentation.

Retention policies are created per database, and it is possible to have multiple retention policies for the same database. In order to preserve data for a longer time period, we created another retention policy with a duration of 1 year and a Continuous Query to average the time series every 30s.
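The statements used to set this up are not reproduced in this report. As a sketch, equivalent InfluxQL could be generated as follows; the helper names are hypothetical, the policy names autogen and one_year are taken from the queries shown elsewhere in this section, and expressing one year as 365d is an assumption.

```python
def create_database(database, duration, shard_duration, policy="autogen"):
    """Build an InfluxQL statement that creates a database together with
    its default retention policy (hypothetical helper)."""
    return (
        'CREATE DATABASE "{}" WITH DURATION {} REPLICATION 1 '
        'SHARD DURATION {} NAME "{}"'
    ).format(database, duration, shard_duration, policy)


def create_retention_policy(name, database, duration):
    """Build an InfluxQL statement that adds a retention policy to an
    existing database (hypothetical helper)."""
    return 'CREATE RETENTION POLICY "{}" ON "{}" DURATION {} REPLICATION 1'.format(
        name, database, duration)


# 24h of raw data in 1h shards, plus a one-year policy for downsampled data.
default_policy = create_database("kafka", "24h", "1h")
long_policy = create_retention_policy("one_year", "kafka", "365d")
print(default_policy)
print(long_policy)
```

The resulting strings can be run in the influx shell or sent to the HTTP API.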

_images/downsampling.png

Example of a continuous query for the mtm1m3-accelerometerdata topic. If the topic is produced at 100Hz and the time series is averaged in time intervals of 30s, the downsampling factor is 3000.

CREATE CONTINUOUS QUERY "mtm1m3-accelerometerdata" ON kafka
BEGIN
  SELECT   Mean(accelerometers0) AS mean_accelerometers0,
           Mean(accelerometers1) AS mean_accelerometers1
  INTO     "kafka"."one_year"."mtm1m3-accelerometerdata"
  FROM     "kafka"."autogen"."mtm1m3-accelerometerdata"
  GROUP BY time(30s)
END
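To make the effect of the query concrete, the averaging over fixed time windows can be sketched in plain Python. This only illustrates what grouping by time and taking the mean does; it is not how InfluxDB implements continuous queries, and the function name and sample values are hypothetical.

```python
from collections import defaultdict


def downsample_mean(samples, window=30.0):
    """Average (time, value) samples in fixed windows of `window` seconds.

    Windows are aligned to multiples of `window`. Returns a list of
    (window start time, mean value) sorted by time.
    """
    buckets = defaultdict(list)
    for t, value in samples:
        start = (t // window) * window
        buckets[start].append(value)
    return [
        (start, sum(values) / len(values))
        for start, values in sorted(buckets.items())
    ]


# Hypothetical samples as (seconds, value) pairs, for illustration only.
samples = [(0, 1.0), (10, 3.0), (30, 5.0), (59, 7.0)]
downsampled = downsample_mean(samples)
print(downsampled)
```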

The retention policy of 24h in InfluxDB suggests that we configure a Kafka retention policy for the logs and topic offsets with the same duration. It means that InfluxDB can be unavailable for 24h and still recover the messages from the Kafka brokers. The following configuration parameters were added to the cp-kafka helm chart:

## Kafka Server properties
## ref: https://kafka.apache.org/documentation/#configuration
configurationOverrides:
  offsets.retention.minutes: 1440
  log.retention.hours: 24

7.2   The InfluxDB HTTP API

InfluxDB provides an HTTP API for accessing the data. When using the HTTP API, we set max_row_limit=0 in the InfluxDB configuration to avoid data truncation.

A code snippet to retrieve data from a particular topic would look like:

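import requests

INFLUXDB_API_URL = "https://kafka-influxdb-demo.lsst.codes"
INFLUXDB_DATABASE = "kafka"


def get_topic_data(topic):
    """Return the last 24h of data for a topic as parsed JSON."""
    # Fully qualified measurement name: "database"."retention policy"."topic"
    query = 'SELECT * FROM "{}"."autogen"."{}" WHERE time > now() - 24h'.format(
        INFLUXDB_DATABASE, topic)
    r = requests.post(url=INFLUXDB_API_URL + "/query", params={'q': query})
    # Fail early on HTTP errors instead of parsing an error page.
    r.raise_for_status()

    return r.json()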
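The JSON returned by the /query endpoint groups the data under results and series, with separate columns and values lists. A minimal sketch of turning such a response into one dictionary per point is shown below; the helper name and the sample response are hypothetical.

```python
def series_to_rows(response):
    """Convert an InfluxDB /query JSON response into a list of dicts,
    one per point, keyed by column name."""
    rows = []
    for result in response.get("results", []):
        # A statement that matches no data has no "series" key.
        for series in result.get("series", []):
            columns = series["columns"]
            for values in series["values"]:
                rows.append(dict(zip(columns, values)))
    return rows


# Hypothetical response with made-up values, for illustration only.
response = {
    "results": [{
        "statement_id": 0,
        "series": [{
            "name": "mtm1m3-accelerometerdata",
            "columns": ["time", "accelerometers0"],
            "values": [["2019-01-17T00:00:00Z", 0.1],
                       ["2019-01-17T00:00:01Z", 0.2]],
        }],
    }]
}
rows = series_to_rows(response)
print(rows[0])
```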

7.3   Backing up an InfluxDB database

InfluxDB supports backup and restore functions on online databases. A backup of a database holding 24h worth of data took less than 10 minutes in our current setup, while running the SAL Mock Experiment and ingesting data at 80k points/min.

Backup files are split by shard. Because we configured our retention policy to 24h and the shard duration to 1h (see Downsampling and data retention), the resulting backup has 24 files.

We do observe a drop in the ingestion rate to 50k points/min during the backup but no write errors, and Kafka's design ensures that nothing gets lost even if the InfluxDB ingestion rate slows down.

_images/influxdb_backup.png

7.4   User Defined Functions

8   APPENDIX

8.1   Kafka Terminology

  • Each server in the Kafka cluster is called a broker.
  • Kafka messages are stored and published in a category called a topic.
  • A Kafka message is a key-value pair, and the key, the value, or both can be serialized as Avro.
  • A schema defines the structure of the Avro data format.
  • A subject is defined in the Schema Registry as a scope where a schema can evolve. The name of the subject depends on the configured subject name strategy, which by default is set to derive subject name from topic name.
  • The processes that publish messages to specific Kafka topics are called producers.
  • The processes that subscribe to topics are called consumers.
  • The offset is the position of a consumer in the log, and it is retained on a per-consumer basis.
  • A Kafka connector makes it possible to build and run reusable consumers or producers that connect existing applications to Kafka topics.

8.2   InfluxDB Terminology

  • A measurement is conceptually similar to an SQL table. The measurement name describes the data stored in the associated fields.
  • A field holds the actual data and is not indexed.
  • A tag is used to annotate your data (metadata) and is automatically indexed.
  • A point contains the field-set of a series for a given tag-set and timestamp. Points are equivalent to messages in Kafka.
  • A series contains Points and is defined by a measurement and a tag-set.
  • The series cardinality depends essentially on how the tag-set is designed. A rule of thumb for InfluxDB is to have fewer series with more points than more series with fewer points to improve performance.
  • A database stores one or more series.
  • A database can have one or more retention policies.
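As an illustration of the series and cardinality definitions above, the sketch below counts the distinct combinations of measurement and tag-set in a collection of points. The function name, the tag key, and the sample points are hypothetical.

```python
def series_cardinality(points):
    """Count the distinct series in an iterable of (measurement, tags) pairs.

    A series is identified by its measurement name and its tag-set;
    field values and timestamps do not affect the count.
    """
    series = set()
    for measurement, tags in points:
        series.add((measurement, tuple(sorted(tags.items()))))
    return len(series)


# Hypothetical points: two tag values on one measurement, none on the other.
points = [
    ("mtm1m3-accelerometerdata", {"sensor": "a"}),
    ("mtm1m3-accelerometerdata", {"sensor": "a"}),
    ("mtm1m3-accelerometerdata", {"sensor": "b"}),
    ("another-topic", {}),
]
cardinality = series_cardinality(points)
print(cardinality)
```

With no tags at all, each measurement contributes exactly one series.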

9   References