Publishing multiple event types to a topic
How do you manage multiple schemas in a single topic? In many scenarios you end up producing several event types into one topic, and you need an efficient way to manage their schemas.
Schema Registry is designed for exactly this: managing and governing data models across topics.
Here is a use case: a stream processing app that produces two different message types:
Address Event - contains human-readable address fields.
Geo-Location Event - contains GPS coordinates (latitude and longitude).
In this example, a Kafka Streams app takes parking events as input and produces both address and geo-location events to the parking-location topic.
Step 1: Publish parking events to the parking-lots topic:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data @sampledata/parking.json "http://localhost:8082/topics/parking-lots"
Sample parking event (sampledata/parking.json):
{
  "records": [
    {
      "key": "6191W",
      "value": {
        "st_marker_id": "6191W",
        "magic_id": 1,
        "bay_id": "3374",
        "location": {
          "latitude": "-37.80658679914876",
          "human_address": "{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}",
          "longitude": "144.9511118550584"
        },
        "lon": "144.9511118550584",
        "lat": "-37.80658679914876",
        "status": "Present"
      }
    }
  ]
}
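For reference, the same request can also be sent from Java with the JDK's built-in java.net.http client (Java 11+). This is a minimal sketch, assuming REST Proxy on localhost:8082 and the sampledata/parking.json file used by the curl command; buildRequest is a hypothetical helper name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: the Step 1 curl call expressed with java.net.http.
// Assumes REST Proxy on localhost:8082, as in the curl example.
class PublishParkingEvents {

    // Same embedded-format content type as the curl command.
    static final String CONTENT_TYPE = "application/vnd.kafka.json.v1+json";

    // Builds (but does not send) the POST request for a topic and JSON payload.
    static HttpRequest buildRequest(String restProxyUrl, String topic, String jsonPayload) {
        return HttpRequest.newBuilder()
                .uri(URI.create(restProxyUrl + "/topics/" + topic))
                .header("Content-Type", CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String payload = Files.readString(Path.of("sampledata/parking.json"));
        HttpRequest request = buildRequest("http://localhost:8082", "parking-lots", payload);
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Print the status code and the REST Proxy response body.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```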
Step 2: Create Avro schemas:
Address Event - address.avsc
{
  "type": "record",
  "namespace": "com.multievents",
  "name": "ParkingAddress",
  "version": "1",
  "fields": [
    { "name": "human_address", "type": "string", "doc": "Complete Address" },
    { "name": "status", "type": "string", "doc": "Parking present or unoccupied" }
  ]
}
Geo-Location Event - geolocation.avsc
{
  "type": "record",
  "namespace": "com.multievents",
  "name": "ParkingGeolocation",
  "version": "1",
  "fields": [
    { "name": "latitude", "type": "double", "doc": "Latitude of the parking space" },
    { "name": "longitude", "type": "double", "doc": "Longitude of the parking space" },
    { "name": "status", "type": "string", "doc": "Parking present or unoccupied" }
  ]
}
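Note that the parking event carries lat and lon as JSON strings, while geolocation.avsc declares latitude and longitude as double, so the mapping code has to parse them. The sketch below illustrates that conversion in plain Java; ParkingAddressView and ParkingGeolocationView are hypothetical stand-in records that mirror the two schemas, not the classes generated in Step 3.

```java
// Hypothetical stand-ins mirroring address.avsc and geolocation.avsc field for field.
class ParkingEventConversion {
    record ParkingAddressView(String humanAddress, String status) {}
    record ParkingGeolocationView(double latitude, double longitude, String status) {}

    // lat/lon arrive as JSON strings, but geolocation.avsc declares them as double.
    static ParkingGeolocationView toGeolocation(String lat, String lon, String status) {
        return new ParkingGeolocationView(Double.parseDouble(lat), Double.parseDouble(lon), status);
    }

    // human_address is already a string in both the event and address.avsc.
    static ParkingAddressView toAddress(String humanAddress, String status) {
        return new ParkingAddressView(humanAddress, status);
    }

    public static void main(String[] args) {
        System.out.println(toGeolocation("-37.80658679914876", "144.9511118550584", "Present"));
        System.out.println(toAddress(
                "{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}", "Present"));
    }
}
```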
Step 3: Generate POJOs from the Avro schemas
I used the gradle-avro-plugin to generate the POJOs.
Note: Refer to the complete build.gradle in the multi-events repository.
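// Generate Java classes from the .avsc files in src/main/resources/avro
def generateAvro = tasks.register("generateAvro", com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask) {
    source("src/main/resources/avro")
    outputDir = file("src/main/java/avro")
}
// Compile the generated sources together with the rest of the project
tasks.named("compileJava").configure {
    source(generateAvro)
}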
Step 4: Build a KStreams app that produces Address events and Geolocation events to the parking-location topic:
Step 4.1: Create serdes
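import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.*;

// streamConfigurations (a Map<String, Object>) must contain schema.registry.url and the Step 5 strategy.
// Raw types let one Avro serializer/deserializer pair back the serdes for both event types.
Serializer serializer = new KafkaAvroSerializer();
serializer.configure(streamConfigurations, false); // false = configure for record values, not keys
Deserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(streamConfigurations, false);
Serde<ParkingAddress> parkingAdressSerde = Serdes.serdeFrom(serializer, deserializer);
Serde<ParkingGeolocation> parkingGeolocationSerde = Serdes.serdeFrom(serializer, deserializer);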
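The serializers above expect streamConfigurations to be a map keyed by String: configure(...) takes a Map<String, ?>, so a java.util.Properties object (a Map<Object, Object>) would not compile there. A minimal sketch of such a map, assuming a broker on localhost:9092 and Schema Registry on its default port 8081; the application id is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical streamConfigurations for Steps 4.1 and 5.
// The application id and localhost addresses are assumptions.
class StreamConfigSketch {
    static Map<String, Object> streamConfigurations() {
        Map<String, Object> config = new HashMap<>();
        config.put("application.id", "multi-events-app");           // hypothetical id
        config.put("bootstrap.servers", "localhost:9092");          // assumed broker address
        config.put("schema.registry.url", "http://localhost:8081"); // read by the Avro serdes
        // Step 5 setting, given as a class-name string so it is present
        // before serializer.configure(...) reads it.
        config.put("value.subject.name.strategy",
                "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
        return config;
    }

    public static void main(String[] args) {
        streamConfigurations().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```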
Step 4.2: Map values
This part of the KStreams app splits each incoming parking event (from the parking-lots topic) into two new event types.
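import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.*;

// Read the raw JSON parking events from parking-lots as strings.
final KStream<String, String> parkingDataStream = parkingDatastreamsBuilder.stream("parking-lots",
        Consumed.with(Serdes.String(), Serdes.String()));
// Branch 1: map each event to a ParkingAddress record and write it to parking-location.
parkingDataStream.map((key, value) -> processAddressDataStream(value))
        .peek((k, v) -> System.out.println(k + " => " + v))
        .to("parking-location", Produced.with(Serdes.String(), parkingAdressSerde));
// Branch 2: map the same event to a ParkingGeolocation record and write it to the same topic.
parkingDataStream.map((key, value) -> processGeolocationDataStream(value))
        .peek((k, v) -> System.out.println(k + " => " + v))
        .to("parking-location", Produced.with(Serdes.String(), parkingGeolocationSerde));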
Note: Refer to the repository for the full code, including processAddressDataStream and processGeolocationDataStream.
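To make the fan-out concrete without a Kafka cluster, here is a plain-Java model of what the two map(...).to(...) chains do for each input event. It is only a sketch: the ParkingEvent, AddressEvent, GeolocationEvent and Output records are hypothetical, and keying both outputs by st_marker_id is an assumption (in the app, the key comes from processAddressDataStream and processGeolocationDataStream).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of Step 4.2 (no Kafka involved): one input event fans out into
// two differently typed records that share one output topic. All types are hypothetical.
class FanOutSketch {
    static final String OUTPUT_TOPIC = "parking-location";

    // Already-parsed parking event; the real app parses the JSON string value.
    record ParkingEvent(String markerId, String humanAddress, String lat, String lon, String status) {}
    record AddressEvent(String humanAddress, String status) {}
    record GeolocationEvent(double latitude, double longitude, String status) {}
    // One record bound for a topic; value holds either event type.
    record Output(String topic, String key, Object value) {}

    static List<Output> process(List<ParkingEvent> events) {
        List<Output> out = new ArrayList<>();
        for (ParkingEvent e : events) {
            // Counterpart of the first chain (keying by marker id is an assumption).
            out.add(new Output(OUTPUT_TOPIC, e.markerId(),
                    new AddressEvent(e.humanAddress(), e.status())));
            // Counterpart of the second chain: same input, different type, same topic.
            out.add(new Output(OUTPUT_TOPIC, e.markerId(), new GeolocationEvent(
                    Double.parseDouble(e.lat()), Double.parseDouble(e.lon()), e.status())));
        }
        return out;
    }

    public static void main(String[] args) {
        ParkingEvent sample = new ParkingEvent("6191W",
                "{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}",
                "-37.80658679914876", "144.9511118550584", "Present");
        for (Output o : process(List.of(sample))) {
            System.out.println(o.topic() + " " + o.key() + " => " + o.value());
        }
    }
}
```

Because both record types land in one topic, the subject naming in Step 5 decides whether Schema Registry accepts them side by side.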
Step 5: Set value.subject.name.strategy
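By default, value.subject.name.strategy is set to TopicNameStrategy, which registers every value schema written to a topic under a single subject, <topic>-value (here parking-location-value). The second event type is then checked against the first and, under the default BACKWARD compatibility level, rejected as incompatible, because ParkingGeolocation is not an evolution of ParkingAddress.
Change it to TopicRecordNameStrategy, which uses the subject <topic>-<fully qualified record name>, so each event type gets its own subject and its own compatibility history. The serializer reads this setting when it is configured, so add it to streamConfigurations before the serdes in Step 4.1 are created: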
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
streamConfigurations.put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());
Note: Without TopicRecordNameStrategy, producing the second event type fails with an error like this:
stacktrace=org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"ParkingGeolocation","namespace":"com.parking","fields":[{"name":"latitude","type":"double","doc":"Latitude of com.parking space"},{"name":"longitude","type":"double","doc":"Longitude of com.parking space"},{"name":"status","type":"string","doc":"Parking present or unoccupied"}],"version":"1"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
This Confluent blog post explains the concept in detail: https://www.confluent.io/blog/put-several-event-types-kafka-topic/
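The difference between the two strategies comes down to how the subject name is derived. The plain-Java sketch below mirrors the documented naming rules for record values; it is an illustration, not Confluent's implementation, and it assumes the record has a non-empty namespace.

```java
// Illustration of the subject-naming rules for record values (not Confluent's code).
class SubjectNaming {
    // TopicNameStrategy: every value schema on a topic shares "<topic>-value".
    static String topicNameStrategy(String topic) {
        return topic + "-value";
    }

    // TopicRecordNameStrategy: "<topic>-<namespace>.<record name>", one subject per event type.
    // Assumes a non-empty namespace, as in both schemas from Step 2.
    static String topicRecordNameStrategy(String topic, String namespace, String recordName) {
        return topic + "-" + namespace + "." + recordName;
    }

    public static void main(String[] args) {
        String topic = "parking-location";
        System.out.println(topicNameStrategy(topic));
        System.out.println(topicRecordNameStrategy(topic, "com.multievents", "ParkingAddress"));
        System.out.println(topicRecordNameStrategy(topic, "com.multievents", "ParkingGeolocation"));
    }
}
```

With the default strategy both schemas compete for parking-location-value, which is what produces the 409 error above; with TopicRecordNameStrategy each schema is registered and checked under its own subject.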
Step 6: Verify published events
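One way to check the result is to list the registered subjects through Schema Registry's REST API (GET /subjects), which returns a JSON array of subject names. This is a minimal sketch, assuming Schema Registry on localhost:8081 and the com.multievents namespace from Step 2; with TopicRecordNameStrategy both subjects listed in EXPECTED should be present.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

// Sketch: confirm both event types were registered as separate subjects.
// Assumes Schema Registry on localhost:8081.
class VerifySubjects {
    static final List<String> EXPECTED = List.of(
            "parking-location-com.multievents.ParkingAddress",
            "parking-location-com.multievents.ParkingGeolocation");

    static HttpRequest listSubjectsRequest(String registryUrl) {
        return HttpRequest.newBuilder().uri(URI.create(registryUrl + "/subjects")).GET().build();
    }

    // GET /subjects returns a JSON array of strings; matching the quoted name
    // avoids pulling in a JSON library for this check.
    static List<String> missingSubjects(String responseBody) {
        return EXPECTED.stream().filter(s -> !responseBody.contains("\"" + s + "\"")).toList();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(listSubjectsRequest("http://localhost:8081"), HttpResponse.BodyHandlers.ofString());
        List<String> missing = missingSubjects(response.body());
        System.out.println(missing.isEmpty() ? "Both subjects registered" : "Missing: " + missing);
    }
}
```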
Note: Refer to the GitHub repository multi-events for more details.