March 15, 2019

Publishing multiple event types to a topic

How do you manage multiple schemas in a single topic? In many scenarios you will end up producing multiple event types into one topic, so how do you manage them efficiently?

Schema Registry is meant for exactly this: managing and governing data models across topics.

Here we have a use case where a stream processing app produces two different message types:

Address Event - Contains human-readable address fields.

Geo-Location Event - Contains GPS coordinates (latitude and longitude).

In this example, let's use a Kafka Streams app that takes parking events as input and produces both address and geo-location events to the 'parking-location' topic.

KStreams App

Step 1: Publish parking events to parking-lots topic:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data @sampledata/parking.json "http://localhost:8082/topics/parking-lots"

Parking event sample:

{
  "records": [
    {
      "key": "6191W",
      "value": {
        "st_marker_id": "6191W",
        "magic_id": 1,
        "bay_id": "3374",
        "location": {
          "latitude": "-37.80658679914876",
          "human_address": "{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}",
          "longitude": "144.9511118550584"
        },
        "lon": "144.9511118550584",
        "lat": "-37.80658679914876",
        "status": "Present"
      }
    }
  ]
}

Step 2: Create Avro schemas:

Address Event - address.avsc

{
     "type": "record",
     "namespace": "com.multievents",
     "name": "ParkingAddress",
     "version": "1",
     "fields": [
       { "name": "human_address", "type": "string", "doc": "Complete Address" },
       { "name": "status", "type": "string", "doc": "Parking present or unoccupied" }
     ]
}

Geo-Location - geolocation.avsc

{
     "type": "record",
     "namespace": "com.multievents",
     "name": "ParkingGeolocation",
     "version": "1",
     "fields": [
       { "name": "latitude", "type": "double", "doc": "Latitude of com.multievents space" },
       { "name": "longitude", "type": "double", "doc": "Longitude of com.multievents space" },
       { "name": "status", "type": "string", "doc": "Parking present or unoccupied" }
     ]
}

Step 3: Generate POJOs from Avro schemas

I used the gradle-avro-plugin to generate the POJOs.

Note: Refer to the complete build.gradle in the multi-events repository.

// Register a task that generates Java classes from the Avro schemas.
def generateAvro = tasks.register("generateAvro", com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
    source("src/main/resources/avro")      // where address.avsc and geolocation.avsc live
    outputDir = file("src/main/java/avro") // where the generated POJOs land
}

// Run the generation before compilation and include the generated sources.
tasks.named("compileJava").configure {
    source(generateAvro)
}
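
With this wired up, compileJava first runs generateAvro, so the ParkingAddress and ParkingGeolocation classes are generated from the two schemas above before the rest of the app compiles.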

Step 4: Build the KStreams app to produce Address events and Geolocation events to the 'parking-location' topic:

Step 4.1: Create serdes


// KafkaAvroSerializer/KafkaAvroDeserializer pick up the Schema Registry URL
// from the stream configuration; isKey = false configures them for record values.
Serializer serializer = new KafkaAvroSerializer();
serializer.configure(streamConfigurations, false);
Deserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(streamConfigurations, false);

// One serde per event type, both backed by the same Avro serializer/deserializer.
Serde<ParkingAddress> parkingAddressSerde = Serdes.serdeFrom(serializer, deserializer);
Serde<ParkingGeolocation> parkingGeolocationSerde = Serdes.serdeFrom(serializer, deserializer);
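
The serdes above rely on streamConfigurations carrying the Schema Registry location. A minimal sketch of that map, assuming Kafka and Schema Registry on their default local ports (the application id here is a placeholder; see the repository for the exact values):

Map<String, Object> streamConfigurations = new HashMap<>();
streamConfigurations.put(StreamsConfig.APPLICATION_ID_CONFIG, "parking-multi-events");
streamConfigurations.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Needed by KafkaAvroSerializer/KafkaAvroDeserializer to register and fetch schemas.
streamConfigurations.put("schema.registry.url", "http://localhost:8081");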

Step 4.2: Map values

This part of the KStreams app splits the incoming stream (parking events from the 'parking-lots' topic) and produces the two new event types.


final KStream<String, String> parkingDataStream = parkingDatastreamsBuilder.stream("parking-lots",
                Consumed.with(Serdes.String(), Serdes.String()));

parkingDataStream.map((key, value) -> processAddressDataStream(value))
        .peek((k, v) -> System.out.println(k + " => " + v))
        .to("parking-location", Produced.with(Serdes.String(), parkingAdressSerde));

parkingDataStream.map((key, value) -> processGeolocationDataStream(value))
        .peek((k, v) -> System.out.println(k + " => " + v))
        .to("parking-location", Produced.with(Serdes.String(), parkingGeolocationSerde));

Note: Refer to the repository for more.

Step 5: Set value.subject.name.strategy

By default the configuration value.subject.name.strategy is set to TopicNameStrategy, which derives the subject name from the topic alone (here, parking-location-value), so only one schema can evolve per topic.

Change it to TopicRecordNameStrategy, which derives the subject from both the topic and the fully qualified record name (e.g. parking-location-com.multievents.ParkingAddress), giving each event type its own compatibility lineage:

streamConfigurations.put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());

Note: You will see an error like this if TopicRecordNameStrategy is not set, because both record types then register against the same subject and the second schema is rejected as incompatible:

stacktrace=org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"ParkingGeolocation","namespace":"com.parking","fields":[{"name":"latitude","type":"double","doc":"Latitude of com.parking space"},{"name":"longitude","type":"double","doc":"Longitude of com.parking space"},{"name":"status","type":"string","doc":"Parking present or unoccupied"}],"version":"1"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409

This blog post is a good reference for understanding the concept in more detail: https://www.confluent.io/blog/put-several-event-types-kafka-topic/

Step 6: Verify published events

Verify that both Address events and Geo-location events land on the parking-location topic.
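
A minimal sketch of a verification consumer, assuming default local ports (the group id is a placeholder); without specific.avro.reader the Avro deserializer returns GenericRecords, and the schema's full name tells the two event types apart:

// Uses org.apache.kafka.clients.consumer.* and org.apache.avro.generic.GenericRecord.
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "parking-location-verifier");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
config.put("schema.registry.url", "http://localhost:8081");

try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(config)) {
    consumer.subscribe(Collections.singletonList("parking-location"));
    for (ConsumerRecord<String, GenericRecord> record : consumer.poll(Duration.ofSeconds(5))) {
        // Prints ParkingAddress or ParkingGeolocation, showing both types on one topic.
        System.out.println(record.value().getSchema().getFullName() + " => " + record.value());
    }
}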

Note: Refer to the multi-events GitHub repository for more details.