Finding a nearby parking space using Event Stream Processing
Finding a public parking space in the Melbourne CBD can be hard. I always wanted to build an app that helps me find a parking space easily, and then I realised it's an interesting real-time data processing use case.
Data Stream A: Melbourne City Council exposes parking bay updates through a REST API. The data is updated every 2 minutes.
Data Stream B: The driver's current GPS location (latitude, longitude) in real time. Basically, my car streams LAT, LON as I keep moving.
Problem: I need to keep matching the nearest available parking lot as the car moves.
I am trying to solve this problem using Kafka, with a combination of KStreams & KSQL for stream processing.
Note: This is not an ideal solution yet. I hope to publish a sequence of blogs as I continuously improve this project.
Note: KSQL is one of the simplest tools for tackling such problems, but it is an evolving project. It currently lacks sub-queries, cross joins, etc., which are important if we need to compare streams without a common key (e.g. parking lots and the driver's location have no common key to correlate on). On the other hand, it has interesting functions like GEO_DISTANCE() to calculate the distance between two points.
Step 1: Ingest Parking lot events into a Kafka Topic
The City of Melbourne exposes parking data over a REST interface, PARKINGDATA-OPENAPI. The data is updated every 2 minutes.
I am using the kafka-connect-rest connector to pull data from this source.
Add connector to Kafka Connect
The following command spins up a new connector that pulls data from the REST API:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d @config/parkingservice.json
Connector configuration to publish parking data to PARKING-RAW topic:
{
"name": "parking-availability",
"config": {
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
"tasks.max": "1",
"rest.source.poll.interval.ms": "10000",
"rest.source.method": "GET",
"rest.source.url": "https://data.melbourne.vic.gov.au/resource/vh2v-4nfs.json",
"rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
"rest.source.properties": "Content-Type:application/json,Accept:application/json",
"rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
"rest.source.destination.topics": "parking-raw"
}
}
Step 2: Transformation - Split the JSON Array into Separate events
The REST API returns the details of all parking spaces in a single JSON array. Since each JSON object represents one parking space, it is best to split them into independent events.
A KStreams snippet that splits events from the parking-raw topic into the parking-data topic:
StreamsBuilder customerStreamsBuilder = new StreamsBuilder();

// Read the raw payload (one big JSON array per poll) from parking-raw
final KStream<String, String> parkingRawStream =
        customerStreamsBuilder.stream("parking-raw", Consumed.with(Serdes.String(), Serdes.String()));

// Fan each array element out as an independent keyed event
parkingRawStream
        .flatMap((k, v) -> splitAndCreateParkingKeyValueList(v))
        .to("parking-data", Produced.with(Serdes.String(), Serdes.String()));

....

private static List<KeyValue<String, String>> splitAndCreateParkingKeyValueList(String parkingStreamValue)
        throws ParseException {
    List<KeyValue<String, String>> parkingKeyValueList = new ArrayList<>();
    JSONParser parser = new JSONParser();
    JSONArray jsonArray = (JSONArray) parser.parse(parkingStreamValue);
    for (Object obj : jsonArray) {
        JSONObject jsonObject = (JSONObject) obj;
        // Add a common hardcoded field so the stream can later be joined in KSQL
        jsonObject.put("magic_id", 1);
        // Key each event by its marker id (concatenated twice, e.g. "793W793W")
        String parkingKey = jsonObject.get("st_marker_id").toString()
                + jsonObject.get("st_marker_id").toString();
        parkingKeyValueList.add(new KeyValue<>(parkingKey, jsonObject.toString()));
    }
    return parkingKeyValueList;
}
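Stripped of the Kafka machinery, the heart of this step is just fanning a JSON array out into one string per element. The sketch below is a stdlib-only illustration (a hypothetical JsonArraySplitter, not the json-simple code used above): it walks the text once, tracking brace depth and skipping braces that occur inside quoted strings, such as the escaped human_address value in the payload.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a top-level JSON array into one string per object by tracking
// brace depth. Braces inside quoted strings (and escaped characters)
// are ignored so that values like "{\"address\": \"\"}" do not break
// the count. Illustrative only; the real stream uses a JSON parser.
public class JsonArraySplitter {

    public static List<String> split(String jsonArray) {
        List<String> objects = new ArrayList<>();
        int depth = 0;
        int start = -1;
        boolean inString = false;
        for (int i = 0; i < jsonArray.length(); i++) {
            char c = jsonArray.charAt(i);
            if (inString) {
                if (c == '\\') i++;               // skip the escaped character
                else if (c == '"') inString = false;
                continue;
            }
            if (c == '"') {
                inString = true;
            } else if (c == '{') {
                if (depth == 0) start = i;        // beginning of a top-level object
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) objects.add(jsonArray.substring(start, i + 1));
            }
        }
        return objects;
    }
}
```

Each returned string then only needs a key attached before being forwarded to parking-data.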
Transformed data in parking-data topic:
[
{
"topic": "parking-data",
"partition": 0,
"offset": 10000,
"timestamp": 1583700144963,
"timestampType": "CREATE_TIME",
"headers": [],
"key": "793W793W",
"value": {
":@computed_region_evbi_jbp8": "1",
"st_marker_id": "793W",
"magic_id": 1,
"bay_id": "884",
"location": {
"latitude": "-37.811440710274084",
"human_address": "{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}",
"longitude": "144.96734907583001"
},
"lon": "144.96734907583001",
"lat": "-37.811440710274084",
"status": "Present"
},
"__confluent_index": 0
}
]
Step 3: Publishing customer location
Ideally this would be fed from the driver's/customer's mobile device; in this example we will publish mock data through the Kafka REST Proxy.
Publishing customer location to customer-location topic:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[ {"key": "45678","value": {"customer_id": "45678","lon":"144.94567018233198","lat":"-37.816402032829835","magic_id": "1"}}]}' "http://localhost:8082/topics/customer-location"
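The same mock record can be assembled in Java before POSTing it to the REST Proxy. CustomerLocationPayload.buildRecord is a hypothetical helper, not part of any Kafka API; it only builds the v2 JSON envelope shown in the curl call above.

```java
// Builds the REST Proxy v2 produce envelope for a customer-location
// record: {"records":[{"key":..., "value":{...}}]}. Field names
// (customer_id, lon, lat, magic_id) match the mock record above.
public class CustomerLocationPayload {

    public static String buildRecord(String customerId, String lon, String lat) {
        return "{\"records\":[{\"key\":\"" + customerId + "\","
             + "\"value\":{\"customer_id\":\"" + customerId + "\","
             + "\"lon\":\"" + lon + "\",\"lat\":\"" + lat + "\","
             + "\"magic_id\":\"1\"}}]}";   // magic_id hardcoded for the later join
    }
}
```

POST the returned string to http://localhost:8082/topics/customer-location with Content-Type: application/vnd.kafka.json.v2+json, exactly as the curl command does.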
Step 4: Stream Processing
4.1 Join the streams
Now that we have both the customer location and the parking location events in two different Kafka topics, we can join them to get the nearest parking spaces for each customer.
Create parking_availability stream:
CREATE STREAM parking_availability (st_marker_id VARCHAR, bay_id VARCHAR, lon Double, lat Double, status VARCHAR, magic_id INT) WITH (KAFKA_TOPIC='parking-data', VALUE_FORMAT='json');
Create customer_location stream:
CREATE STREAM customer_location (customer_id VARCHAR, lon DOUBLE, lat DOUBLE, magic_id INT) WITH (KAFKA_TOPIC='customer-location', VALUE_FORMAT='json');
Calculate distance
Note: We are trying to join two streams without any common key. KSQL at the moment doesn't support sub-queries or cross joins, which would make this exercise a whole lot easier. As a workaround, we introduce a new field MAGIC_ID in both streams, hardcoded to 1, so that every customer event joins with every parking event; we can then use the GEO_DISTANCE() function to calculate the distance between the customer/driver and the parking lots. This is not an efficient approach, and I will keep improving it; for the moment the idea is to evaluate KSQL for such use cases.
I am using the GEO_DISTANCE() function to calculate the distance between the customer location and all the entries in the parking stream where the status is Unoccupied:
CREATE STREAM NEARBY_PARKING_SPACE AS
SELECT GEO_DISTANCE(customer.LAT, customer.LON, parking.LAT, parking.LON, 'KM') AS DISTANCE,
       customer.LAT AS CUSTOMER_LAT, customer.LON AS CUSTOMER_LON, customer.customer_id,
       parking.LAT AS PARKING_LAT, parking.LON AS PARKING_LON
FROM customer_location customer
LEFT JOIN parking_availability parking
  WITHIN (0 SECONDS, 1 DAYS)
  ON customer.magic_id = parking.magic_id
WHERE GEO_DISTANCE(customer.LAT, customer.LON, parking.LAT, parking.LON, 'KM') < 1
  AND parking.status = 'Unoccupied'
EMIT CHANGES;
Now we have a stream called NEARBY_PARKING_SPACE, which contains all the parking lots that are unoccupied and within 1 KM of the customer.
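For intuition, GEO_DISTANCE computes a great-circle distance between two latitude/longitude points. A haversine sketch in plain Java, assuming a spherical Earth of radius 6371 km (KSQL's exact implementation may differ):

```java
// Great-circle distance between two points via the haversine formula.
// Assumes a spherical Earth of radius 6371 km; good enough for the
// sub-kilometre filtering done in the KSQL query above.
public class GeoDistance {

    private static final double EARTH_RADIUS_KM = 6371.0;

    public static double distanceKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                 + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                 * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }
}
```

For the mock customer above and parking bay 884 from the sample event, this comes out to roughly 2 km, so that bay would be dropped by the < 1 KM predicate.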
4.2 Reassign the partition key
The end goal is to make this data available to a front end / app. KSQL Server has a REST interface which lets you query the data as JSON. Here we reassign the ROWKEY to CUSTOMER_ID so that we can scope data access to a specific user.
CREATE STREAM customer_parking_list AS SELECT * FROM NEARBY_PARKING_SPACE PARTITION BY customer_id;
Step 5: Access the Kafka Stream using REST
KSQL Server has a REST interface to accept queries, which is quite useful if you want to build a UI on top of it (food for thought).
curl -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-H "Accept: application/vnd.ksql.v1+json" \
-d $'{
"ksql": "SELECT * FROM customer_parking_list WHERE ROWKEY=\'45678\' EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}'
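The same push query can be issued from application code. A sketch using Java 11's built-in HttpClient; KsqlRestQuery.buildQuery is a hypothetical helper that assembles the JSON body from the curl call above (note the single quotes around the ROWKEY literal):

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds the /query request sent to KSQL Server, mirroring the curl
// call above: same endpoint, content types, and streamsProperties.
public class KsqlRestQuery {

    public static String buildQuery(String customerId) {
        return "{\"ksql\":\"SELECT * FROM customer_parking_list WHERE ROWKEY='"
             + customerId + "' EMIT CHANGES;\","
             + "\"streamsProperties\":{\"ksql.streams.auto.offset.reset\":\"earliest\"}}";
    }

    public static HttpRequest request(String customerId) {
        return HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8088/query"))
            .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
            .header("Accept", "application/vnd.ksql.v1+json")
            .POST(HttpRequest.BodyPublishers.ofString(buildQuery(customerId)))
            .build();
    }
}
```

Sending the request with HttpClient.newHttpClient().send(KsqlRestQuery.request("45678"), HttpResponse.BodyHandlers.ofLines()) would then stream result rows line by line until the client disconnects, since EMIT CHANGES is a continuous push query.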
Refer to the repository for more details - https://github.com/vidyavenu/parking-space