March 11, 2019


Finding nearby parking space using Event Stream Processing


Finding a public parking space in the Melbourne CBD can sometimes be hard. I have always wanted to build an app that helps me find a parking space easily, and then I realised it’s an interesting real-time data processing use case.

Data Stream A: Melbourne city council exposes parking lot updates through a REST API. The data is updated every 2 mins.

Data Stream B: The driver’s current GPS location (latitude, longitude) in real time. Basically, my car streams its LAT and LON as I keep moving.

Problem: I need to keep matching the nearest available parking lot as the car moves.

I am trying to solve this problem using Kafka, with a combination of Kafka Streams (KStreams) and KSQL for stream processing.

Note: This is not an ideal solution yet. I hope to publish a series of blog posts as I continue to improve this project.

Note: KSQL is one of the simplest tools for tackling such problems, but it is an evolving project. It currently lacks sub-queries, cross joins, etc., which are important when we need to compare streams that have no common key (e.g. parking lots and the driver’s location have no common key to correlate). It does, however, have useful functions such as GEO_DISTANCE() to calculate the distance between two points.
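GEO_DISTANCE() returns the great-circle distance between two latitude/longitude points. To make that distance check concrete, here is a minimal Java sketch of the haversine formula. The class name GeoDistance and the 6371 km mean Earth radius are assumptions for illustration; KSQL’s own implementation may use a slightly different formula or constant.

```java
// Hypothetical helper: great-circle (haversine) distance in kilometres,
// approximating what KSQL's GEO_DISTANCE(lat1, lon1, lat2, lon2, 'KM') returns.
public final class GeoDistance {
    // Mean Earth radius in km (assumed; KSQL's exact constant may differ).
    private static final double EARTH_RADIUS_KM = 6371.0;

    private GeoDistance() {}

    public static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        // Central angle between the points, scaled by the Earth's radius
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    public static void main(String[] args) {
        // Parking bay 793W vs. the mock customer location used in this post
        double km = haversineKm(-37.811440710274084, 144.96734907583001,
                                -37.816402032829835, 144.94567018233198);
        System.out.printf("%.3f km%n", km);
    }
}
```

Running main compares parking bay 793W with the mock customer location published in Step 3; the two points come out roughly 2 km apart, so that particular bay would not pass a 1 km filter.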

Step 1: Ingest Parking lot events into a Kafka Topic

The City of Melbourne exposes parking data over a REST interface (PARKINGDATA-OPENAPI). The data is updated every 2 mins.

I am using the kafka-connect-rest connector to pull data from this source.

Add connector to Kafka Connect

The following command registers a new connector with Kafka Connect (listening on localhost:8083), using the configuration in config/parkingservice.json, to pull data from the REST API:

curl -i -X POST -H "Accept:application/json"  -H  "Content-Type:application/json" http://localhost:8083/connectors -d @config/parkingservice.json

Connector configuration that publishes the parking data to the parking-raw topic:

{
  "name": "parking-availability",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
    "tasks.max": "1",
    "rest.source.poll.interval.ms": "10000",
    "rest.source.method": "GET",
    "rest.source.url": "https://data.melbourne.vic.gov.au/resource/vh2v-4nfs.json",
    "rest.source.payload.converter.class": "com.tm.kafka.connect.rest.converter.StringPayloadConverter",
    "rest.source.properties": "Content-Type:application/json,Accept:application/json",
    "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
    "rest.source.destination.topics": "parking-raw"
  }
}

Step 2: Transformation - Split the JSON Array into Separate events

The REST API returns the details of all parking spaces in a single JSON array. Each JSON object represents one parking space, so it is best to split them into independent events.

Kafka Streams snippet that splits the events from the parking-raw topic into the parking-data topic:

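import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

StreamsBuilder customerStreamsBuilder = new StreamsBuilder();
final KStream<String, String> parkingRawStream = customerStreamsBuilder.stream("parking-raw",
        Consumed.with(Serdes.String(), Serdes.String()));

// Fan each raw JSON array out into one record per parking space
parkingRawStream.flatMap((k, v) -> splitAndCreateParkingKeyValueList(v))
        .to("parking-data", Produced.with(Serdes.String(), Serdes.String()));
// ....

private static List<KeyValue<String, String>> splitAndCreateParkingKeyValueList(String parkingStreamValue) {
    List<KeyValue<String, String>> parkingKeyValueList = new ArrayList<>();
    JSONArray jsonArray;
    try {
        jsonArray = (JSONArray) new JSONParser().parse(parkingStreamValue);
    } catch (ParseException e) {
        return parkingKeyValueList; // drop payloads that are not valid JSON
    }
    for (Object obj : jsonArray) {
        JSONObject jsonObject = (JSONObject) obj;
        // Constant field shared with customer-location events so KSQL can join on it
        jsonObject.put("magic_id", 1);
        // Record key: st_marker_id concatenated with itself (e.g. "793W793W")
        String parkingKey = jsonObject.get("st_marker_id").toString() + jsonObject.get("st_marker_id").toString();
        parkingKeyValueList.add(KeyValue.pair(parkingKey, jsonObject.toString()));
    }
    return parkingKeyValueList;
}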

A transformed record in the parking-data topic:

[
  {
    "topic": "parking-data",
    "partition": 0,
    "offset": 10000,
    "timestamp": 1583700144963,
    "timestampType": "CREATE_TIME",
    "headers": [],
    "key": "793W793W",
    "value": {
      ":@computed_region_evbi_jbp8": "1",
      "st_marker_id": "793W",
      "magic_id": 1,
      "bay_id": "884",
      "location": {
        "latitude": "-37.811440710274084",
        "human_address": "{\"address\": \"\", \"city\": \"\", \"state\": \"\", \"zip\": \"\"}",
        "longitude": "144.96734907583001"
      },
      "lon": "144.96734907583001",
      "lat": "-37.811440710274084",
      "status": "Present"
    },
    "__confluent_index": 0
  }
]

Step 3: Publishing customer location

Ideally this would be fed from the driver’s (customer’s) mobile device; in this example we publish mock data through the Kafka REST Proxy.

Publishing a customer location to the customer-location topic:

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[ {"key": "45678","value": {"customer_id": "45678","lon":"144.94567018233198","lat":"-37.816402032829835","magic_id": "1"}}]}' "http://localhost:8082/topics/customer-location"
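In a real app, the driver’s device (or a back end acting for it) would post to the same REST Proxy endpoint instead of using curl. A minimal sketch with the JDK’s java.net.http API (Java 11+); the class and method names are hypothetical, and the payload deliberately mirrors the curl body above, including sending lat, lon and magic_id as strings.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper mirroring the curl call above (Kafka REST Proxy, v2 JSON format).
public final class CustomerLocationPublisher {
    private CustomerLocationPublisher() {}

    // Wraps one location event in the REST Proxy {"records":[...]} envelope.
    // Assumes customerId, lat and lon contain no characters that need JSON escaping.
    public static String payload(String customerId, String lat, String lon) {
        return String.format(
                "{\"records\":[{\"key\":\"%s\",\"value\":{\"customer_id\":\"%s\","
                        + "\"lon\":\"%s\",\"lat\":\"%s\",\"magic_id\":\"1\"}}]}",
                customerId, customerId, lon, lat);
    }

    // Builds (but does not send) the POST to /topics/customer-location.
    public static HttpRequest request(String restProxyBaseUrl, String body) {
        return HttpRequest.newBuilder(URI.create(restProxyBaseUrl + "/topics/customer-location"))
                .header("Content-Type", "application/vnd.kafka.json.v2+json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        String body = payload("45678", "-37.816402032829835", "144.94567018233198");
        HttpRequest req = request("http://localhost:8082", body);
        // To actually send it: HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString())
        System.out.println(req.method() + " " + req.uri() + "\n" + body);
    }
}
```

Keeping request construction separate from sending makes the payload easy to check without a running REST Proxy.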

Step 4: Stream Processing

4.1 Join the streams

Now that we have both the customer location events and the parking events in two different Kafka topics (customer-location and parking-data), we can join them to find the nearest parking spaces for each customer.

Create parking_availability stream:

CREATE STREAM parking_availability (st_marker_id VARCHAR, bay_id VARCHAR, lon DOUBLE, lat DOUBLE, status VARCHAR, magic_id INT) WITH (KAFKA_TOPIC='parking-data', VALUE_FORMAT='json');

Create customer_location stream:

CREATE STREAM customer_location (customer_id VARCHAR, lon DOUBLE, lat DOUBLE, magic_id INT) WITH (KAFKA_TOPIC='customer-location', VALUE_FORMAT='json');

Calculate distance

Note: We are trying to join two streams that have no common key. KSQL doesn’t currently support sub-queries or cross joins, which would make this exercise a whole lot easier. As a workaround, we introduce a new field, MAGIC_ID, in both streams (hard-coded to 1 in both). Joining on it pairs every customer event with the parking events, which lets us use the GEO_DISTANCE() function to calculate the distance between the customer/driver and each parking lot. This is not an efficient approach, and I will keep improving it; for the moment the idea is to evaluate KSQL for such use cases.
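To see why a hard-coded MAGIC_ID behaves like a cross join, here is a small in-memory Java sketch (an illustration, not KSQL itself; the record and method names are hypothetical) of an equi-join on a key that every record shares.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: an equi-join on a constant key pairs every
// customer event with every parking event, i.e. it acts as a cross join.
public final class MagicIdJoin {
    public record Customer(String customerId, int magicId) {}
    public record Parking(String stMarkerId, int magicId) {}
    public record Pair(Customer customer, Parking parking) {}

    private MagicIdJoin() {}

    // Hash join: index parking events by magicId, then probe with each customer.
    public static List<Pair> join(List<Customer> customers, List<Parking> parking) {
        Map<Integer, List<Parking>> byKey = new HashMap<>();
        for (Parking p : parking) {
            byKey.computeIfAbsent(p.magicId(), k -> new ArrayList<>()).add(p);
        }
        List<Pair> out = new ArrayList<>();
        for (Customer c : customers) {
            for (Parking p : byKey.getOrDefault(c.magicId(), List.of())) {
                out.add(new Pair(c, p));
            }
        }
        return out;
    }
}
```

With C customer events and P parking events inside the join window, the join emits C × P rows before any WHERE clause filters them, which is why this workaround scales poorly as the number of parking bays grows.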

I am using the GEO_DISTANCE() function to calculate the distance between the customer location and every entry in the parking stream whose status is Unoccupied:

CREATE STREAM NEARBY_PARKING_SPACE AS
SELECT GEO_DISTANCE(customer.LAT, customer.LON, parking.LAT, parking.LON, 'KM') AS DISTANCE,
       customer.LAT AS CUSTOMER_LAT, customer.LON AS CUSTOMER_LON, customer.customer_id AS CUSTOMER_ID,
       parking.LAT AS PARKING_LAT, parking.LON AS PARKING_LON
FROM customer_location customer
LEFT JOIN parking_availability parking
WITHIN (0 SECONDS, 1 DAYS)
ON customer.magic_id = parking.magic_id
WHERE GEO_DISTANCE(customer.LAT, customer.LON, parking.LAT, parking.LON, 'KM') < 1 AND parking.status = 'Unoccupied'
EMIT CHANGES;

We now have a stream called NEARBY_PARKING_SPACE that contains every unoccupied parking lot within 1 km of each customer.
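NEARBY_PARKING_SPACE can hold several candidate lots per customer. If the app only needs the single nearest lot, the client could reduce the rows itself. A minimal Java sketch, assuming each row has already been read into a hypothetical NearbyRow record carrying the customer ID, DISTANCE and the parking coordinates:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical client-side reduction of NEARBY_PARKING_SPACE rows.
public final class NearestParking {
    // Assumed shape of one row: customer ID, DISTANCE (km) and the lot's coordinates.
    public record NearbyRow(String customerId, double distanceKm, double parkingLat, double parkingLon) {}

    private NearestParking() {}

    // Keep only the closest row per customer; ties keep the row seen first.
    public static Map<String, NearbyRow> nearestPerCustomer(List<NearbyRow> rows) {
        return rows.stream().collect(Collectors.toMap(
                NearbyRow::customerId,
                Function.identity(),
                (a, b) -> a.distanceKm() <= b.distanceKm() ? a : b));
    }
}
```

Doing this on the client keeps the KSQL query unchanged; the trade-off is that every candidate row within 1 km still has to be delivered to the client.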

4.2 Reassign the partition key

The end goal is to make this data available to a front end or app. KSQL Server has a REST API that lets you run queries and receive the results as JSON. Here we re-key the stream so that ROWKEY is CUSTOMER_ID, which lets us fetch (and control access to) the results for a specific user.

CREATE STREAM customer_parking_list AS SELECT * FROM NEARBY_PARKING_SPACE PARTITION BY customer_id;

Step 5: Access the Kafka Stream using REST

KSQL Server has a REST interface that accepts queries, which is quite useful if you want to build a UI on top of it (food for thought). The following request streams the parking list for customer 45678:

curl -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -H "Accept: application/vnd.ksql.v1+json" \
     -d $'{
  "ksql": "SELECT * FROM customer_parking_list WHERE ROWKEY=\'45678\' EMIT CHANGES;",
  "streamsProperties": {
      "ksql.streams.auto.offset.reset": "earliest"
    }
}'
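The same query could be issued from a Java back end for the UI. A minimal sketch with the JDK’s java.net.http API (Java 11+); KsqlQueryRequest and its methods are hypothetical names, and the body reproduces the curl request above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that rebuilds the curl request above for the KSQL /query endpoint.
public final class KsqlQueryRequest {
    private KsqlQueryRequest() {}

    // JSON body with the push query and the offset-reset property.
    // Assumes customerId contains no quotes or other characters that need escaping.
    public static String body(String customerId) {
        return "{\"ksql\":\"SELECT * FROM customer_parking_list WHERE ROWKEY='" + customerId
                + "' EMIT CHANGES;\",\"streamsProperties\":{\"ksql.streams.auto.offset.reset\":\"earliest\"}}";
    }

    // Builds (but does not send) the POST request with the same headers as the curl call.
    public static HttpRequest request(String ksqlServerUrl, String customerId) {
        return HttpRequest.newBuilder(URI.create(ksqlServerUrl + "/query"))
                .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
                .header("Accept", "application/vnd.ksql.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(body(customerId)))
                .build();
    }
}
```

Because EMIT CHANGES makes this a continuous query, the server keeps the response open and streams rows as they arrive. Sending the request with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofLines())` returns a `Stream<String>` that can be consumed as lines arrive.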

Refer to the repository for more details - https://github.com/vidyavenu/parking-space