Streaming Junos Telemetry To Apache Kafka Via Telegraf

As service providers and enterprises begin to enable streaming telemetry from various networking and IoT vendors’ equipment and devices, it quickly becomes painfully clear that a common infrastructure or amalgamation point for telemetry collection is sorely needed within the telemetry consumer’s back office. Unfortunately, there is no “universal collector” available today that can consume telemetry data from each and every producer. As a result, several different types of collectors are often deployed within a back office to handle data from various sources, e.g., using Telegraf to ingest Juniper telemetry data, using Pipeline to ingest Cisco telemetry data, or, more generally, using “Collector X” to ingest telemetry data from “Vendor Y”.

Most telemetry consumers have sophisticated goals in mind and are not simply looking to collect telemetry data and throw it onto a graph.  Instead, they are pursuing advanced use cases in which further processing is performed on the data (e.g., analytics, machine learning) and intelligent actioning is triggered based on this analysis.  To accomplish this, a common infrastructure is needed to which the various collectors push their ingested data and from which “listeners” pick up data of interest for further processing.  Many consumers are looking at leveraging a high-throughput data streaming platform, like Apache Kafka, precisely for this purpose.  Apache Kafka is a massively scalable pub/sub message queue, where “Producers” publish messages to Kafka “Topics” and, on the receiving end, “Consumers” receive messages from the Topics to which they subscribe.
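To make that model concrete, here is a minimal sketch of both sides of the bus using the console tools that ship with Kafka (installed later in this post); it assumes a broker already running on localhost:9092 and a topic named “test”:

# Producer side: publish a single message to the "test" topic
echo "hello, telemetry" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

# Consumer side: subscribe to the "test" topic and print every message received
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning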

This blog post is meant to serve as a Quick Start Guide on how to get Junos streaming telemetry data pushed to a Kafka bus by using Telegraf as an intermediate collector, as depicted in Figure 1 below. This will be the first of many such posts about “closing the loop” with streamed network telemetry data, with the aim of implementing downstream analytics and actioning on that data.

Figure 1:  Streaming Junos Telemetry To Apache Kafka Via Telegraf

Configuring gRPC Telemetry Streaming On The Juniper Router

The process to set up and configure the Juniper router for gRPC telemetry streaming is covered in depth in the “Prerequisites: OpenConfig & Network Agent Packages” and “Junos Configuration” sections of the following blog post.  Rather than duplicate that content here, the Reader is encouraged to peruse the post for further details.
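For orientation only, the heart of that configuration is a single stanza that enables the gRPC service on the router. The sketch below assumes the OpenConfig and Network Agent packages are already installed, and uses clear-text gRPC on port 32767 (adjust the port to suit your environment; see the referenced post for the full procedure):

set system services extension-service request-response grpc clear-text port 32767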

Installing Apache Kafka

Installing Apache Kafka is a fairly straightforward process.  The first step is to install a couple of required packages, namely Java and ZooKeeper.  Apache Kafka is written in Scala and Java, and so relies on the Java Runtime Environment (JRE) in order to run.  ZooKeeper is used by Kafka to coordinate and synchronize the individual Kafka nodes that belong to a cluster.  It is used for various operations such as detecting when nodes have failed and electing leaders within the cluster.

Before installing the required packages, update the list of available packages to ensure that we install the newest versions of the packages and their dependencies, as follows:

root@ubuntu:~# sudo apt-get update
Hit:1 http://us.archive.ubuntu.com/ubuntu xenial InRelease
Get:2 http://us.archive.ubuntu.com/ubuntu xenial-updates InRelease [102 kB]
[... CONTENT OMITTED FOR BREVITY ...]
Get:10 http://security.ubuntu.com/ubuntu xenial-security/main i386 Packages [412 kB]
Fetched 3,730 kB in 9s (392 kB/s)
Reading package lists... Done

Next, we install the version of Java Runtime that comes packaged with Ubuntu:

root@ubuntu:~# sudo apt-get install default-jre
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following additional packages will be installed:
[... CONTENT OMITTED FOR BREVITY ...]
After this operation, 125 MB of additional disk space will be used.
Do you want to continue? [Y/n] y
Get:1 http://us.archive.ubuntu.com/ubuntu xenial/main amd64 liblcms2-2 amd64 2.6-3ubuntu2 [137 kB]
Get:2 http://us.archive.ubuntu.com/ubuntu xenial/main amd64 libjbig0 amd64 2.1-3.1 [26.6 kB]
Get:3 http://us.archive.ubuntu.com/ubuntu xenial/main amd64 java-common all 0.56ubuntu2 [7,742 B]
[... CONTENT OMITTED FOR BREVITY ...]
Setting up default-jre-headless (2:1.8-56ubuntu2) ...
Setting up openjdk-8-jre:amd64 (8u151-b12-0ubuntu0.16.04.2) ...
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/policytool to provide /usr/bin/policytool (policytool) in auto mode
Setting up default-jre (2:1.8-56ubuntu2) ...
Processing triggers for libc-bin (2.23-0ubuntu10) ...
Processing triggers for ca-certificates (20170717~16.04.1) ...
Updating certificates in /etc/ssl/certs...
0 added, 0 removed; done.
Running hooks in /etc/ca-certificates/update.d...
done.
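Once the installation completes, a quick sanity check confirms that the Java runtime is in place (the version string should match the OpenJDK 8 packages installed above):

root@ubuntu:~# java -version
openjdk version "1.8.0_151"
[... CONTENT OMITTED FOR BREVITY ...]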

Finally, we install Apache ZooKeeper, as follows:

root@ubuntu:~# sudo apt-get install zookeeperd
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following additional packages will be installed:
  libjline-java liblog4j1.2-java libnetty-3.9-java libservlet2.5-java libslf4j-java libxerces2-java libxml-commons-external-java
  libxml-commons-resolver1.1-java libzookeeper-java zookeeper
[... CONTENT OMITTED FOR BREVITY ...]
After this operation, 125 MB of additional disk space will be used.
Do you want to continue? [Y/n] y
Get:1 http://us.archive.ubuntu.com/ubuntu xenial/main amd64 liblcms2-2 amd64 2.6-3ubuntu2 [137 kB]
Get:2 http://us.archive.ubuntu.com/ubuntu xenial/main amd64 libjbig0 amd64 2.1-3.1 [26.6 kB]
Get:3 http://us.archive.ubuntu.com/ubuntu xenial/main amd64 java-common all 0.56ubuntu2 [7,742 B]
[... CONTENT OMITTED FOR BREVITY ...]
Setting up zookeeperd (3.4.8-1) ...
Processing triggers for systemd (229-4ubuntu21.1) ...
Processing triggers for ureadahead (0.100.0-19) ...

Before continuing, let’s perform a quick test to make sure that ZooKeeper is indeed up and running. By default, ZooKeeper listens on port 2181, so we can connect to that port using Telnet and issue the administrative command “ruok”. If the ZooKeeper server is running in a non-error state, it will respond with “imok” and automatically close the Telnet session, as depicted below:

root@ubuntu:~# telnet localhost 2181
Trying ::1...
Connected to localhost.
Escape character is '^]'.
ruok
imokConnection closed by foreign host.
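If Telnet is not installed on the server, netcat can be used to issue the same four-letter administrative command in a single shot:

root@ubuntu:~# echo ruok | nc localhost 2181
imok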

Now that all the prerequisites are installed, we are ready to download (using “wget”) and extract (untar) the Kafka package, as shown below.  Note that we also rename the extracted directory to “kafka” for simplicity.

root@ubuntu:~# wget http://apache.claz.org/kafka/1.0.0/kafka_2.12-1.0.0.tgz
--2018-02-22 13:31:41--  http://apache.claz.org/kafka/1.0.0/kafka_2.12-1.0.0.tgz
Resolving apache.claz.org (apache.claz.org)... 74.63.227.45
Connecting to apache.claz.org (apache.claz.org)|74.63.227.45|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 49475271 (47M) [application/x-gzip]
Saving to: ‘kafka_2.12-1.0.0.tgz’
kafka_2.12-1.0.0.tgz                 100%[=====================================================================>]  47.18M   448KB/s    in 1m 45s
2018-02-22 13:33:26 (461 KB/s) - ‘kafka_2.12-1.0.0.tgz’ saved [49475271/49475271]
root@ubuntu:~# tar -xzf kafka_2.12-1.0.0.tgz
root@ubuntu:~# ls
kafka_2.12-1.0.0  kafka_2.12-1.0.0.tgz
root@ubuntu:~# mv kafka_2.12-1.0.0 kafka
root@ubuntu:~# ls
kafka

And that’s it!  To start the Kafka server (i.e., Kafka broker) as a background process that is independent of your shell session, run the following command:

nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &
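Before creating any topics, it is worth confirming that the broker actually came up. A few quick checks, assuming the default broker port of 9092 (the broker id listed under /brokers/ids may differ on your system):

# The tail of the broker log should report a successful startup
tail ~/kafka/kafka.log

# The broker should be accepting connections on its default port
nc -z localhost 9092 && echo "broker is up"

# Kafka registers each live broker under /brokers/ids in ZooKeeper
~/kafka/bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids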

Now that the Kafka server is up and running, let’s create a Kafka topic, e.g., “juniper”, to which we will push our telemetry data, using the following command (assuming Kafka has been installed in your home directory):

root@ubuntu:# ~/kafka/bin/kafka-topics.sh --create \
>   --zookeeper localhost:2181 \
>   --replication-factor 1 --partitions 1 \
>   --topic juniper
Created topic "juniper".

Verify that the topic has been successfully created:

root@ubuntu:# ~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
juniper
root@ubuntu:#
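For a bit more detail about the topic’s partitioning and replication, use the “--describe” flag (the leader’s broker id may differ on your system):

root@ubuntu:# ~/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic juniper
Topic:juniper	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: juniper	Partition: 0	Leader: 0	Replicas: 0	Isr: 0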

Installing Telegraf

Telegraf is an open-source collector that can readily be used to ingest streaming telemetry data from Juniper devices.  Telegraf can either be installed as a subcomponent within an OpenNTI deployment or as a standalone instance.  To install OpenNTI, please refer to this introductory blog post.  For simplicity, the author assumes that the Reader has elected to install Telegraf as a standalone instance; the installation instructions for this can be found in the “Installing Telegraf” section of the following blog post.  Also for simplicity, in this post we are going to install Telegraf on the same server as Kafka.

Enabling The Kafka Output Plugin Within Telegraf

Now that Telegraf is installed, it is a relatively easy step to get ingested telemetry data pushed out to a Kafka bus.  This is because Telegraf natively supports a Kafka Output Plugin.  We simply have to include this plugin within the “OUTPUT PLUGINS” section of the Telegraf configuration file (i.e., “telegraf.conf”) and tweak a few of the plugin’s default parameters.

The first step is to generate an openconfig-telemetry-specific Telegraf configuration file that also supports the Kafka output plugin; to do this, we issue the following command from the directory where the Telegraf executable is located (i.e., “$HOME/go/src/github.com/influxdata/telegraf”):

telegraf --input-filter jti_openconfig_telemetry --output-filter kafka config > telegraf.conf

Now, we edit the auto-generated “telegraf.conf” file and configure a few parameters for the Kafka output plugin, as shown below:

###############################################################################
#                               OUTPUT PLUGINS                                #
###############################################################################

[[outputs.kafka]]
  ## URLs of kafka brokers
  brokers = ["localhost:9092"]
  ## Kafka topic for producer messages
  topic = "juniper"

  [... CONTENT OMITTED FOR BREVITY ...]

  ## Telegraf tag to use as a routing key
  ##  ie, if this tag exists, its value will be used as the routing key
  ##routing_tag = "host"

  ## CompressionCodec represents the various compression codecs recognized by
  ## Kafka in messages.
  ##  0 : No compression
  ##  1 : Gzip compression
  ##  2 : Snappy compression
  compression_codec = 0

  ##  RequiredAcks is used in Produce Requests to tell the broker how many
  ##  replica acknowledgements it must see before responding
  ##   0 : the producer never waits for an acknowledgement from the broker.
  ##       This option provides the lowest latency but the weakest durability
  ##       guarantees (some data will be lost when a server fails).
  ##   1 : the producer gets an acknowledgement after the leader replica has
  ##       received the data. This option provides better durability as the
  ##       client waits until the server acknowledges the request as successful
  ##       (only messages that were written to the now-dead leader but not yet
  ##       replicated will be lost).
  ##   -1: the producer gets an acknowledgement after all in-sync replicas have
  ##       received the data. This option provides the best durability, we
  ##       guarantee that no messages will be lost as long as at least one in
  ##       sync replica remains.
  required_acks = -1

  ##  The total number of times to retry sending a message
  max_retry = 3

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Optional SASL Config
  # sasl_username = "kafka"
  # sasl_password = "secret"

  data_format = "json"

The three main parameters that need to be changed from their default values are:

    1. brokers: Here, we specify the management IP address and port number (typically 9092) of the target Kafka brokers to which we will be streaming our telemetry data.  Although a single broker is shown in the example, multiple brokers can be specified simply by listing them in a comma-separated list (e.g., [“1.1.1.1:9092”, “2.2.2.2:9092”]).
    2. topic: This is the Kafka topic to which Telegraf will publish its collected telemetry data.  Listeners on this topic are then able to pick this data up off the bus and further process it or action on it.
    3. data_format: This is where we specify the data format to generate.  By default, this parameter is set to “influx”.  Change it to “json” so we can push the data in basic key/value pair format.

The remaining parameters from above, namely “compression_codec“, “required_acks” and “max_retry” can be left at their default values.

Enabling The OpenConfig_Telemetry Input Plugin Within Telegraf

Within the Telegraf configuration file, there is a section called “INPUT PLUGINS“, where we define the specifics about the “jti_openconfig_telemetry” plugin used to initiate telemetry subscription requests and ingest the incoming data via gRPC.  Details about the specific parameters that need to be tweaked can be found within the “Telegraf Collector Configuration” section of the following blog post.
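For orientation, a minimal sketch of that input section is shown below.  The device address matches the router used later in this post, while the credentials, gRPC port and sample frequency are placeholders to be adapted to your environment (see the referenced post for the full set of parameters):

[[inputs.jti_openconfig_telemetry]]
  ## Device addresses to collect telemetry from (management IP and gRPC port)
  servers = ["10.49.114.164:32767"]

  ## Authentication details, if the device expects them (placeholders)
  username = "user"
  password = "password"
  client_id = "telegraf"

  ## Frequency at which the device should report each sensor, in milliseconds
  sample_frequency = "2000ms"

  ## OpenConfig sensor paths to subscribe to
  sensors = [
    "/interfaces/",
    "/components/",
  ]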

Finally, start Telegraf (using the freshly modified configuration file) as a background process using the following command:

telegraf --config $HOME/go/src/github.com/influxdata/telegraf/telegraf.conf &

NOTE: To stop Telegraf, do a “ps -ef | grep telegraf”, find the associated PID for the process and terminate it with “kill <PID>”.  (Beware that “kill -HUP <PID>” will cause Telegraf to reload its configuration rather than stop.)

Verifying The Telemetry Data Push To Kafka

After following all of the steps above, you should be streaming gRPC telemetry data from the Juniper router to the Telegraf collector, which in turn should be sending the same data in JSON format to the Kafka bus.  To verify that this is in fact working, we can launch a Kafka listener using the “kafka-console-consumer.sh” shell script that comes with the Kafka installation, as follows:

root@ubuntu:~# ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic juniper

{
   "fields": {
      "/interfaces/interface/state/admin-status": "UP",
      "/interfaces/interface/state/description": "",
      "/interfaces/interface/state/enabled": true,
      "/interfaces/interface/state/ifindex": 519,
      "/interfaces/interface/state/last-change": 9083,
      "/interfaces/interface/state/mtu": 1514,
      "/interfaces/interface/state/name": "ge-0/0/0",
      "/interfaces/interface/state/oper-status": "UP",
      "/interfaces/interface/state/type": "ethernetCsmacd",
      "_component_id": 65535,
      "_sequence": 2,
      "_subcomponent_id": 0,
      "_timestamp": 1520286432878
   },
   "name": "/interfaces/interface[name='ge-0/0/0']/",
   "tags": {
      "/interfaces/interface/@name": "ge-0/0/0",
      "device": "10.49.114.164",
      "host": "ubuntu",
      "path": "sensor_1000_4_1:/interfaces/interface[name='ge-0/0/0']/:/interfaces/interface[name='ge-0/0/0']/:mib2d",
      "system_id": "mx2_re0"
   },
   "timestamp": 1520286432
}

{
   "fields": {
      "/components/component/state/description": "MX960",
      "/components/component/state/id": "VMX4edb",
      "/components/component/state/mfg-name": "",
      "/components/component/state/name": "Chassis",
      "/components/component/state/part-no": "",
      "/components/component/state/serial-no": "VMX4edb",
      "/components/component/state/type": "CHASSIS",
      "/components/component/state/version": "",
      "_component_id": 65535,
      "_sequence": 70,
      "_subcomponent_id": 0,
      "_timestamp": 1520289067790
   },
   "name": "/components/",
   "tags": {
      "/components/component/@name": "Chassis",
      "device": "10.49.114.164",
      "host": "ubuntu",
      "path": "sensor_1001:/components/:/components/:chassisd",
      "system_id": "mx2_re0"
   },
   "timestamp": 1520289067
}
[... CONTENT OMITTED FOR BREVITY ...]
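As a small preview of downstream processing, the same console consumer can be piped through a JSON processor such as “jq” (assuming it is installed) to pluck individual fields off the bus, for example, the operational status reported in each interface record:

root@ubuntu:~# ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic juniper \
>   | jq -r 'select(.name | startswith("/interfaces")) | .fields."/interfaces/interface/state/oper-status"'
UP
[... CONTENT OMITTED FOR BREVITY ...]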
