
Streaming Data Analytics with Amazon MSK

Updated on Jul 02, 2024

In the era of big data, the ability to analyze streaming data in real-time is essential for gaining timely insights and making data-driven decisions. Amazon Managed Streaming for Apache Kafka (MSK) is a powerful solution for this purpose. This article guides you through setting up Amazon MSK, ingesting data from Twitter, and analyzing it using Apache Flink.

Setting Up Amazon MSK

Amazon MSK

First, we will create our MSK cluster configuration. Click on the Cluster configurations link.

Click on the Create cluster configuration button.

MSK Cluster Configuration

Add a name for the configuration and set the Configuration properties listed below. Next, click on the Create button.

New MSK Cluster Configuration
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
MSK Cluster Configuration
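If you prefer scripting the setup, the same cluster configuration can also be created with boto3. The following is a minimal sketch; the configuration name and description are placeholders, and the server properties mirror the list above.

import boto3

# The same broker properties as listed above, passed as a byte string
SERVER_PROPERTIES = b"""
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
"""

kafka = boto3.client('kafka')

# Placeholder name/description; the call returns the configuration ARN
response = kafka.create_configuration(
    Name='msk-bda-cluster-config',
    Description='Configuration for the streaming analytics cluster',
    ServerProperties=SERVER_PROPERTIES,
)
print(response['Arn'])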

Next click on the Clusters link, and then click on the Create cluster button.

MSK Cluster

Select the Custom create method. Next, add a name for the MSK cluster and choose the Apache Kafka version from the dropdown list.

MSK Cluster Custom Create

Under Configuration, select the MSK cluster configuration we created earlier from the dropdown list.

Next, select the VPC and Subnets in which to deploy the MSK cluster.

MSK Cluster Custom Configuration

Select the appropriate Subnets to deploy the Kafka Brokers.

MSK Cluster Networking

Next, select a Security Group. Choose the Broker instance type and the Number of Brokers per Zone.

MSK Cluster Brokers

Select the Broker Storage size.

An MSK cluster supports multiple access control methods. Let's choose unauthenticated access as well as IAM role-based authentication.

MSK Cluster Access Control

Choose Plaintext as the encryption-in-transit method for the MSK cluster. Use encryption of data at rest with an AWS managed key.

MSK Cluster Encryption

Choose Basic monitoring for our MSK Cluster.

MSK Cluster Monitoring

Finally, add Tags for management and click on the Create cluster button.

Create MSK Cluster

Wait until the Amazon MSK Cluster is ready.

Amazon MSK Cluster
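Instead of refreshing the console, we can also poll the cluster state with boto3 and fetch the plaintext bootstrap broker string once the cluster becomes ACTIVE. This is a minimal sketch; the cluster ARN is a placeholder.

import time
import boto3

# Placeholder ARN; copy the real one from the cluster details page
CLUSTER_ARN = 'arn:aws:kafka:ap-southeast-1:111111111111:cluster/msk-bda-cluster/...'

kafka = boto3.client('kafka')

# Poll until the cluster leaves the CREATING state
while True:
    state = kafka.describe_cluster(ClusterArn=CLUSTER_ARN)['ClusterInfo']['State']
    print('Cluster state:', state)
    if state == 'ACTIVE':
        break
    time.sleep(60)

# Plaintext listener endpoints (available because we enabled plaintext access)
brokers = kafka.get_bootstrap_brokers(ClusterArn=CLUSTER_ARN)
print(brokers['BootstrapBrokerString'])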

Ingesting Data from Twitter

Set up an EC2 instance to read tweets from Twitter streams and write them to an MSK topic. 

The Python script for this process uses the boto3, tweepy, and kafka-python packages.

requirements.txt

# Python Packages
boto3
tweepy
kafka-python
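
Because auto.create.topics.enable=true is set in our cluster configuration, the tweets topic will be created on first write. If you prefer to create it explicitly, here is a minimal sketch using kafka-python's admin client; the partition and replication values mirror the cluster configuration above.

import os

from kafka.admin import KafkaAdminClient, NewTopic

# Comma-separated plaintext broker list, e.g. from get_bootstrap_brokers
admin = KafkaAdminClient(bootstrap_servers=os.environ['BOOTSTRAP_SERVERS'])

# num.partitions=1 and default.replication.factor=3 per the cluster config
admin.create_topics([
    NewTopic(name='tweets', num_partitions=1, replication_factor=3)
])
admin.close()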

tweets.py

import json
import os

import tweepy
from kafka import KafkaProducer

# Twitter API credentials and the keyword/hashtag to track, from environment
consumer_key = os.environ['CONSUMER_KEY']
consumer_secret = os.environ['CONSUMER_SECRET']
access_token = os.environ['ACCESS_TOKEN']
access_token_secret = os.environ['ACCESS_TOKEN_SECRET']
twitter_filter_tag = os.environ['TWITTER_FILTER_TAG']

# MSK connection details: comma-separated broker list and the target topic
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']
kafka_topic_name = os.environ['KAFKA_TOPIC_NAME']


class StreamingTweets(tweepy.Stream):
    def on_status(self, status):
        # Flatten the fields we care about; keep the raw tweet as JSON too
        data = {
            'tweet_text': status.text,
            'created_at': str(status.created_at),
            'user_id': status.user.id,
            'user_name': status.user.name,
            'user_screen_name': status.user.screen_name,
            'user_description': status.user.description,
            'user_location': status.user.location,
            'user_followers_count': status.user.followers_count,
            'tweet_body': json.dumps(status._json)
        }

        # Publish the tweet to the MSK topic as UTF-8 encoded JSON;
        # send() returns a future holding the record metadata
        response = producer.send(
            kafka_topic_name, json.dumps(data).encode('utf-8'))

        print(response)

    def on_request_error(self, status_code):
        # tweepy v4 error callback (on_error was the v3 StreamListener hook)
        print(status_code)


# Kafka producer pointed at the MSK bootstrap brokers
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# tweepy v4 Stream takes the credentials directly in its constructor
stream = StreamingTweets(
    consumer_key, consumer_secret,
    access_token, access_token_secret
)

# Track tweets matching the configured keyword/hashtag
stream.filter(track=[twitter_filter_tag])
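
Before moving on to Flink, it is worth confirming that tweets are actually landing in the topic. The following is a minimal consumer sketch that prints the first few messages; it assumes the same BOOTSTRAP_SERVERS environment variable as the producer script.

import json
import os

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'tweets',
    bootstrap_servers=os.environ['BOOTSTRAP_SERVERS'],
    auto_offset_reset='earliest',   # read from the beginning of the topic
    consumer_timeout_ms=10000,      # stop iterating if idle for 10 seconds
)

# Print the first five tweets to verify ingestion
for i, message in enumerate(consumer):
    tweet = json.loads(message.value)
    print(tweet['user_screen_name'], '->', tweet['tweet_text'][:80])
    if i >= 4:
        break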

Finally, let's proceed to analyze the tweets in the MSK topic.

Real-Time Data Analysis with Apache Flink

Click on the Process data in real time button as seen in the previous image.

With Amazon Kinesis Data Analytics for Apache Flink, we can use Java, Scala, or SQL to process and analyze streaming data to perform time-series analytics, feed real-time dashboards, and create real-time metrics.

We can build Java and Scala applications in Kinesis Data Analytics using open-source libraries based on Apache Flink. Apache Flink is a popular framework and engine for processing data streams.

On the popup window, choose Apache Flink - Studio notebook and click on the Create button.

Apache Flink - Studio notebook

Add a name and description for the Studio notebook.

Create Studio Notebook

Finally click on the Create Studio notebook button.

Studio Notebook Settings

Once the Analytics Studio Notebook is created, click on the Run button to start the Studio notebook.

Studio Notebook

On the confirmation window, click on the Run button.

Run Studio Notebook

Once the Studio notebook is ready, click on the Open in Apache Zeppelin button.

Analytics Studio Application

From the Zeppelin notebook, click on the Create new note link.

Zeppelin Notebook

Add a Note Name and click on the Create button.

New Zeppelin

Create Flink Table

Next, we will create a table on top of the Kafka topic, followed by some analytics.

%flink.ssql(type=update)

CREATE TABLE tweets (
  tweet_text VARCHAR(5000),
  created_at VARCHAR(30),
  user_id BIGINT,
  user_name VARCHAR(256),
  user_screen_name VARCHAR(256),
  user_followers_count INT,
  tweet_body VARCHAR(40000),
  user_description VARCHAR(256),
  user_location VARCHAR(256),
  created_at_ts AS TO_TIMESTAMP(`created_at`, 'yyyy-MM-dd HH:mm:ssz'),
  WATERMARK FOR created_at_ts AS created_at_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'tweets',
  'properties.bootstrap.servers' = 'b-2.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092',
  'format' = 'json',
  'properties.group.id' = 'TweetConsumerGroup',
  'scan.startup.mode' = 'earliest-offset'
);
Flink SQL

Querying Data

Let's run a query to get the latest tweets from users with more than 10,000 followers.

%flink.ssql(type=update)

SELECT user_followers_count, user_name, user_screen_name, user_location, created_at_ts, tweet_text
FROM tweets
WHERE user_location IS NOT NULL
AND user_followers_count > 10000;

To aggregate tweet counts over one-minute tumbling windows:

%flink.ssql(type=update)

SELECT
  TUMBLE_ROWTIME(created_at_ts, INTERVAL '60' SECOND) AS window_end,
  COUNT(*) AS tweets
FROM tweets
GROUP BY
  TUMBLE(created_at_ts, INTERVAL '60' SECOND)
ORDER BY window_end, tweets;
Window Function

Amazon MSK, coupled with Apache Flink, provides a robust infrastructure for streaming data analytics. By following these steps, you can set up a powerful pipeline to process and analyze real-time data, enabling you to harness the full potential of your streaming data sources.

For automated deployment of the AWS resources using Terraform, you may refer to https://github.com/sarubhai/aws-big-data-analytics
