Since the author's code is written in Java, we also provide a Python version to make the demonstration clearer for this section and the parts that follow.
import csv
import json

from confluent_kafka import Producer, Consumer, KafkaError


class RidesDAO:
    """
    Data Access Object for handling ride-sharing data with Kafka.

    This class provides methods to:
    1. Read ride data from a CSV file
    2. Publish ride data to a Kafka topic
    3. Consume ride data from a Kafka topic
    """

    def __init__(self, csv_file_path):
        """
        Initialize RidesDAO by reading ride data from a CSV file.

        :param csv_file_path: Path to the CSV file containing ride data
        """
        self.rides = []
        with open(csv_file_path, 'r') as csvfile:
            csvreader = csv.reader(csvfile)
            next(csvreader)  # Skip the header row
            for row in csvreader:
                self.rides.append(row)
    def publish_rides(self, topic='rides'):
        """
        Publish ride data to a Kafka topic.

        :param topic: Kafka topic name to publish rides to
        """
        # Kafka Producer configuration
        conf = {
            'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka server address
            'client.id': 'rides-producer'
        }
        # Create Kafka Producer
        producer = Producer(conf)
        try:
            # Iterate through rides and publish to Kafka
            for ride in self.rides:
                # Choose a key for the message (using the vendor id)
                key = ride[0]
                # Convert ride data to a JSON string; the pickup and dropoff
                # locations are latitude/longitude pairs in the CSV
                value = json.dumps({
                    'vendorId': ride[0],
                    'pickupTime': ride[1],
                    'dropoffTime': ride[2],
                    'passengerCount': int(ride[3]),
                    'tripDistance': float(ride[4]),
                    'pickupLocation': {'lat': float(ride[5]), 'lon': float(ride[6])},
                    'dropoffLocation': {'lat': float(ride[7]), 'lon': float(ride[8])}
                })
                # Produce message to Kafka topic
                producer.produce(topic, key=key, value=value)
            # Ensure all messages are sent
            producer.flush()
            print(f"Successfully published {len(self.rides)} rides to topic {topic}")
        except Exception as e:
            print(f"Error publishing messages: {e}")
    def consume_rides(self, topic='rides'):
        """
        Consume ride data from a Kafka topic.

        :param topic: Kafka topic name to consume rides from
        """
        # Kafka Consumer configuration
        conf = {
            'bootstrap.servers': 'localhost:9092',  # Replace with your Kafka server address
            'group.id': 'rides-consumer-group',
            'auto.offset.reset': 'earliest'  # Start reading from the earliest offset
        }
        # Create Kafka Consumer
        consumer = Consumer(conf)
        try:
            # Subscribe to the topic
            consumer.subscribe([topic])
            print(f"Starting to consume messages from topic {topic}...")
            while True:
                # Poll for messages
                msg = consumer.poll(1.0)  # Timeout of 1 second
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print('Reached end of partition')
                    else:
                        print(f'Error: {msg.error()}')
                    continue
                # Parse and print message
                try:
                    ride_data = json.loads(msg.value().decode('utf-8'))
                    print("Received ride data:")
                    print(json.dumps(ride_data, indent=2))
                except json.JSONDecodeError:
                    print("Unable to parse message")
        except KeyboardInterrupt:
            print("Consumer interrupted by user")
        finally:
            # Close the consumer
            consumer.close()
# Example usage
if __name__ == '__main__':
    # Path to your CSV file
    rides_dao = RidesDAO('rides.csv')
    # Publish rides to Kafka
    rides_dao.publish_rides()
    # Consume rides from Kafka (runs until interrupted)
    rides_dao.consume_rides()
# Important Notes:
# 1. Install required library: pip install confluent-kafka
# 2. Ensure Kafka server is running
# 3. Adjust bootstrap.servers parameter based on your Kafka configuration
# 4. Ensure your CSV file matches the expected format
# Learning Objectives:
# - Understanding Kafka Producer and Consumer in Python
# - Processing streaming data
# - Working with CSV and JSON data
# - Basic error handling in message streaming
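Note that produce() is asynchronous: a message can still fail on the broker side after the call returns, which the simple try/except in publish_rides will not catch. confluent-kafka supports a per-message delivery callback for exactly this case. The sketch below is an optional variant of the producer logic; the delivery_report function is our own illustration, not part of the original code.

from confluent_kafka import Producer

def delivery_report(err, msg):
    # Called once per message, from poll() or flush(), with the delivery result
    if err is not None:
        print(f"Delivery failed for key {msg.key()}: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}] at offset {msg.offset()}")

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # adjust to your cluster
producer.produce('rides', key='1', value='{"vendorId": "1"}', callback=delivery_report)
producer.flush()  # waits for outstanding messages; delivery callbacks fire here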
Sample rides.csv
vendor_id,pickup_time,dropoff_time,passenger_count,trip_distance,pickup_lat,pickup_lon,dropoff_lat,dropoff_lon
1,2023-06-15T14:30:00,2023-06-15T15:00:00,2,5.2,40.7128,-74.0060,40.7282,-73.7949
2,2023-06-15T15:45:00,2023-06-15T16:10:00,1,3.8,40.7282,-73.7949,40.7595,-73.9845
3,2023-06-15T16:20:00,2023-06-15T16:50:00,3,7.5,40.7595,-73.9845,40.7589,-73.9851
4,2023-06-15T17:00:00,2023-06-15T17:25:00,2,4.3,40.7589,-73.9851,40.7128,-74.0060
5,2023-06-15T17:35:00,2023-06-15T18:05:00,1,6.1,40.7128,-74.0060,40.7282,-73.7949
6,2023-06-15T18:15:00,2023-06-15T18:45:00,4,8.2,40.7282,-73.7949,40.7595,-73.9845
7,2023-06-15T19:00:00,2023-06-15T19:30:00,2,5.7,40.7595,-73.9845,40.7589,-73.9851
8,2023-06-15T19:40:00,2023-06-15T20:10:00,1,4.9,40.7589,-73.9851,40.7128,-74.0060
9,2023-06-15T20:20:00,2023-06-15T20:50:00,3,6.5,40.7128,-74.0060,40.7282,-73.7949
10,2023-06-15T21:00:00,2023-06-15T21:30:00,2,5.1,40.7282,-73.7949,40.7595,-73.9845
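Given the field mapping in publish_rides, the first data row above would be published with key "1" and a JSON value roughly like this:

{
  "vendorId": "1",
  "pickupTime": "2023-06-15T14:30:00",
  "dropoffTime": "2023-06-15T15:00:00",
  "passengerCount": 2,
  "tripDistance": 5.2,
  "pickupLocation": {"lat": 40.7128, "lon": -74.006},
  "dropoffLocation": {"lat": 40.7282, "lon": -73.7949}
}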
🔍 What We're Importing
import csv
import json

from confluent_kafka import Producer, Consumer, KafkaError

csv: Reads data from CSV files
confluent_kafka: Provides Kafka producer and consumer functionality
json: Serializes and deserializes data between Python objects and JSON strings

💡 Class Purpose
A data access object (DAO) that manages ride-sharing data processing through Kafka. It handles three main tasks: reading ride records from a CSV file, publishing them to a Kafka topic, and consuming them back from that topic.
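For a quick sense of how those three tasks map onto the class, here is a minimal usage sketch (it assumes a broker at localhost:9092 and the rides.csv shown above):

dao = RidesDAO('rides.csv')  # task 1: load ride rows from the CSV file
dao.publish_rides('rides')   # task 2: publish each row as a JSON message
dao.consume_rides('rides')   # task 3: read messages back; blocks until Ctrl+C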
🔑 Key Method: __init__() - Data Loading
def __init__(self, csv_file_path):
    self.rides = []
    with open(csv_file_path, 'r') as csvfile:
        csvreader = csv.reader(csvfile)
        next(csvreader)  # Skip header row
        for row in csvreader:
            self.rides.append(row)
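As an aside, a common variant of this loader (not the author's code) uses csv.DictReader so that fields are accessed by column name instead of position, which makes the mapping in publish_rides less fragile:

import csv

def load_rides(csv_file_path):
    """Hypothetical helper: load rides as dicts keyed by the CSV header row."""
    with open(csv_file_path, 'r') as csvfile:
        return list(csv.DictReader(csvfile))  # header row becomes the dict keys

rides = load_rides('rides.csv')
print(rides[0]['vendor_id'])  # access by column name, e.g. '1'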
What's Happening?