Creating a Stream and Ingesting Data Tutorial

This tutorial will walk you through creating a data stream, then ingesting data into the stream.

Creating a Data Stream (topic)

Before we can ingest data, we need to create a stream for the data. A stream is created by submitting the stream metadata to Kafka. For the exact semantics surrounding the metadata, see the Metadata Management Overview.

Here's an example of using `curl` to POST the metadata payload to Kafka:

```shell
curl -X POST \
  http://{{host}}:{{port}}/topics \
  -H 'Content-Type: application/json' \
  -d '{
    "subject": "exp.data-platform.dvs-sandbox.Test",
    "streamType": "History",
    "derived": false,
    "dataClassification": "Public",
    "contact": "bob@pluralsight.com",
    "notes": "lorem ipsum",
    "schema": {
      "type": "record",
      "name": "Test",
      "namespace": "exp.data-platform.dvs-sandbox",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "messageNumber",
          "type": "int"
        },
        {
          "name": "messagePublishedAt",
          "type": {
            "type": "string",
            "logicalType": "iso-datetime"
          }
        }
      ]
    }
  }'
```


For additional details on the required format of the `schema` definition, see Schemas (Avro).
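Before POSTing, it can help to sanity-check the metadata payload locally. A minimal sketch, assuming the payload is saved to a local file named `topic-metadata.json` (the file name and the checks below are illustrative, not part of the DVS API):

```shell
#!/bin/sh
# Save the metadata payload from the example above to a local file.
cat > topic-metadata.json <<'EOF'
{
  "subject": "exp.data-platform.dvs-sandbox.Test",
  "streamType": "History",
  "derived": false,
  "dataClassification": "Public",
  "contact": "bob@pluralsight.com",
  "notes": "lorem ipsum",
  "schema": {
    "type": "record",
    "name": "Test",
    "namespace": "exp.data-platform.dvs-sandbox",
    "fields": [
      {"name": "id", "type": "string"},
      {"name": "messageNumber", "type": "int"},
      {"name": "messagePublishedAt",
       "type": {"type": "string", "logicalType": "iso-datetime"}}
    ]
  }
}
EOF

# Check the payload is valid JSON and that the subject matches the
# schema's fully qualified name (namespace + "." + name).
python3 - <<'EOF'
import json
meta = json.load(open("topic-metadata.json"))
schema = meta["schema"]
full_name = schema["namespace"] + "." + schema["name"]
assert meta["subject"] == full_name, "subject should be namespace.name"
print("payload OK:", meta["subject"])
EOF

# Then POST it with the same curl command as above:
# curl -X POST http://{{host}}:{{port}}/topics \
#   -H 'Content-Type: application/json' -d @topic-metadata.json
```

Note that curl's `-d @file` reads the request body from a file, which avoids the quoting issues that come with inline JSON.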

Ingesting Data

Next, we can start POSTing data to `/ingest`. Some of the headers in the example below control how the data is stored in the DVS; for more information on the headers and what they mean, see the ingest documentation.

```shell
curl -X POST \
  http://{{host}}:{{port}}/ingest \
  -H 'Content-Type: application/json' \
  -H 'hydra-ack: replicated' \
  -H 'hydra-kafka-topic: exp.data-platform.dvs-sandbox.Test' \
  -d '{
    "id": "5de47f5a-1c4f-4128-b537-4f44faabaaa1",
    "messageNumber": 1,
    "messagePublishedAt": "2018-11-08T01:00:00+00:00"
  }'
```
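When scripting ingestion, you will usually generate fields such as the id and timestamp rather than hard-coding them. A sketch of building the payload (the variable names are illustrative; the curl call is the same as above and is shown commented out so the sketch runs without a server):

```shell
#!/bin/sh
# Build a record payload matching the schema from the first section.
msg_id=$(python3 -c 'import uuid; print(uuid.uuid4())')
published_at=$(date -u +"%Y-%m-%dT%H:%M:%S+00:00")

payload=$(printf '{"id": "%s", "messageNumber": %d, "messagePublishedAt": "%s"}' \
  "$msg_id" 1 "$published_at")
echo "$payload"

# Then POST it with the same headers as above, capturing the HTTP status
# code so a script can react to failures:
# curl -s -o /dev/null -w '%{http_code}' -X POST \
#   http://{{host}}:{{port}}/ingest \
#   -H 'Content-Type: application/json' \
#   -H 'hydra-ack: replicated' \
#   -H 'hydra-kafka-topic: exp.data-platform.dvs-sandbox.Test' \
#   -d "$payload"
```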

To verify the data was successfully ingested, there are a few different ways to check:

    1. The HTTP endpoint (http://{{host}}:{{port}}/streams/{{topic_name}}?start=latest&groupId=test) is probably the easiest. This endpoint uses chunked transfer encoding, so the connection stays open as long as a client is connected, and closes if no records are received within the timeout (the `idleTimeout` request parameter, default 60s).
    2. If you want a point-in-time snapshot, you can download a JSON file to your local machine using the `download` parameter. For example: http://{{host}}:{{port}}/streams/{{topic_name}}?groupId=test&start=earliest&download=true
    3. You could also submit a streaming job to replicate the data and verify that the messages you sent to ingest are replicated (see the Streaming Data Tutorial).
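If you use option 2, you can then look for your message in the downloaded file locally. A sketch, assuming the snapshot is a JSON array of records saved as `snapshot.json` (both the file name and the sample content below are stand-ins — check the shape of the actual download output):

```shell
#!/bin/sh
# Stand-in for a downloaded snapshot (assumed format: JSON array of records).
cat > snapshot.json <<'EOF'
[{"id": "5de47f5a-1c4f-4128-b537-4f44faabaaa1", "messageNumber": 1,
  "messagePublishedAt": "2018-11-08T01:00:00+00:00"}]
EOF

# Check that the message we ingested earlier is present.
python3 - <<'EOF'
import json
records = json.load(open("snapshot.json"))
wanted = "5de47f5a-1c4f-4128-b537-4f44faabaaa1"
assert any(r.get("id") == wanted for r in records), "message not found"
print("found message", wanted)
EOF
```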

Troubleshooting

If you're getting errors posting to `/ingest`, check the following:

  • Make sure your headers are correct. Hydra ingestion relies on several header values to validate your data, route it to the correct topic, and determine whether you can update and delete messages.
  • Has your Avro schema been evolved since the stream was created?
  • Does your Avro schema match the data you're sending?
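For the last two points, a quick local check is to compare your record against the schema's field list before posting. A rough sketch that checks only field names and primitive types — this is not full Avro validation, and the variable names are illustrative:

```shell
#!/bin/sh
result=$(python3 - <<'EOF'
import json

# The schema and record from the examples above.
schema = {"fields": [
    {"name": "id", "type": "string"},
    {"name": "messageNumber", "type": "int"},
    {"name": "messagePublishedAt",
     "type": {"type": "string", "logicalType": "iso-datetime"}},
]}
record = {"id": "5de47f5a-1c4f-4128-b537-4f44faabaaa1",
          "messageNumber": 1,
          "messagePublishedAt": "2018-11-08T01:00:00+00:00"}

# Map primitive Avro types to Python types (partial, for illustration).
avro_py = {"string": str, "int": int, "boolean": bool, "double": float}

for field in schema["fields"]:
    name, ftype = field["name"], field["type"]
    if isinstance(ftype, dict):  # logical types wrap a primitive type
        ftype = ftype["type"]
    assert name in record, "missing field: " + name
    assert isinstance(record[name], avro_py[ftype]), "wrong type for " + name
print("record matches schema fields")
EOF
)
echo "$result"
```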