Streaming Data Analytics with Amazon MSK

Updated on Jul 02, 2024

In the era of big data, the ability to analyze streaming data in real time is essential for gaining timely insights and making data-driven decisions. Amazon Managed Streaming for Apache Kafka (MSK) is a managed Apache Kafka service suited to this purpose. This article walks through setting up Amazon MSK, ingesting data from Twitter, and analyzing it using Apache Flink.

Setting Up Amazon MSK

Amazon MSK

First, we create an MSK cluster configuration. Click on the Cluster configurations link.

Click on the Create cluster configuration button.

MSK Cluster Configuration

Add a name for the configuration and set the configuration properties. Then click on the Create button.

New MSK Cluster Configuration
MSK Cluster Configuration
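
The same configuration could also be created programmatically. The following is a minimal sketch, not the method used in this article: it assumes boto3 is installed and AWS credentials are configured, and the property values and the helper names `render_server_properties` and `create_msk_configuration` are hypothetical, chosen only for illustration.

```python
def render_server_properties(props: dict) -> bytes:
    """Render a dict as Kafka server.properties text (one key=value per line)."""
    lines = []
    for key in sorted(props):
        value = props[key]
        # Kafka expects lowercase true/false for boolean properties.
        text = str(value).lower() if isinstance(value, bool) else str(value)
        lines.append(f"{key}={text}")
    return "\n".join(lines).encode("utf-8")


# Hypothetical property values, shown only as an example.
EXAMPLE_PROPERTIES = {
    "auto.create.topics.enable": True,
    "default.replication.factor": 3,
    "num.partitions": 3,
}


def create_msk_configuration(name: str, kafka_version: str, props: dict) -> str:
    """Create an MSK cluster configuration and return its ARN (needs AWS credentials)."""
    import boto3  # imported lazily so the rendering helper works without boto3

    client = boto3.client("kafka")
    response = client.create_configuration(
        Name=name,
        KafkaVersions=[kafka_version],
        ServerProperties=render_server_properties(props),
    )
    return response["Arn"]
```

Rendering the properties separately makes it easy to review the exact text that would be submitted before any AWS call is made.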

Next click on the Clusters link, and then click on the Create cluster button.

MSK Cluster

Select the Custom create method. Next add a name for the MSK Cluster & choose the Apache Kafka version from the dropdown list.

MSK Cluster Custom Create

Under Configuration, select the MSK cluster configuration we created earlier from the dropdown list.

Next select the VPC & Subnets to deploy the MSK cluster.

MSK Cluster Custom Configuration

Select the appropriate Subnets to deploy the Kafka Brokers.

MSK Cluster Networking

Next select a Security Group. Choose the Broker Instance type and the Number of Brokers per Zone.

MSK Cluster Brokers

Select the Broker Storage size.

Amazon MSK supports multiple access control methods. Here we enable unauthenticated access as well as IAM role-based authentication.

MSK Cluster Access Control

Choose Plaintext as the encryption in transit method for the MSK cluster. Encrypt data at rest using an AWS managed key.

MSK Cluster Encryption

Choose Basic monitoring for our MSK Cluster.

MSK Cluster Monitoring

Finally, add tags for management and click on the Create cluster button.

Create MSK Cluster
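
For reference, the console choices above map roughly onto a single create-cluster request. The sketch below builds such a request as a plain dictionary; it is an illustration under stated assumptions rather than the article's procedure. The function name, the default instance type, and the storage size are hypothetical. `ClientBroker` is set to `TLS_PLAINTEXT` on the assumption that plaintext is allowed alongside TLS, since IAM authentication works over TLS. `EncryptionAtRest` is omitted on the assumption that MSK then falls back to an AWS managed key.

```python
def build_create_cluster_request(
    cluster_name,
    kafka_version,
    subnet_ids,
    security_group_ids,
    configuration_arn,
    configuration_revision=1,
    instance_type="kafka.m5.large",  # hypothetical default
    brokers_per_zone=1,
    volume_size_gib=100,             # hypothetical default
    tags=None,
):
    """Build keyword arguments for the boto3 kafka client's create_cluster call."""
    return {
        "ClusterName": cluster_name,
        "KafkaVersion": kafka_version,
        # One subnet per Availability Zone, so the total is brokers per zone times subnets.
        "NumberOfBrokerNodes": brokers_per_zone * len(subnet_ids),
        "BrokerNodeGroupInfo": {
            "InstanceType": instance_type,
            "ClientSubnets": list(subnet_ids),
            "SecurityGroups": list(security_group_ids),
            "StorageInfo": {"EbsStorageInfo": {"VolumeSize": volume_size_gib}},
        },
        "ConfigurationInfo": {
            "Arn": configuration_arn,
            "Revision": configuration_revision,
        },
        # Unauthenticated access plus IAM role-based authentication.
        "ClientAuthentication": {
            "Unauthenticated": {"Enabled": True},
            "Sasl": {"Iam": {"Enabled": True}},
        },
        "EncryptionInfo": {
            "EncryptionInTransit": {"ClientBroker": "TLS_PLAINTEXT", "InCluster": True},
        },
        # DEFAULT corresponds to Basic monitoring.
        "EnhancedMonitoring": "DEFAULT",
        "Tags": dict(tags or {}),
    }
```

With credentials in place, the dictionary could be passed as `boto3.client("kafka").create_cluster(**request)`.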

Wait until the Amazon MSK Cluster is ready.

Amazon MSK Cluster
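
Waiting for the cluster can also be scripted. The sketch below polls a state-reading function until the cluster reports `ACTIVE`; it is a minimal illustration, and the names `wait_until_active` and `msk_state_reader`, the polling interval, and the timeout are assumptions. The state reader is passed in as a function so the loop can be exercised without AWS access.

```python
import time


def wait_until_active(describe_state, poll_seconds=30.0, timeout_seconds=3600.0,
                      sleep=time.sleep):
    """Poll describe_state() until it returns 'ACTIVE'; raise on FAILED or timeout."""
    waited = 0.0
    while True:
        state = describe_state()
        if state == "ACTIVE":
            return state
        if state == "FAILED":
            raise RuntimeError("cluster creation failed")
        if waited >= timeout_seconds:
            raise TimeoutError(f"cluster still {state} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


def msk_state_reader(cluster_arn):
    """Return a function that reads the current MSK cluster state via boto3."""
    import boto3  # imported lazily; requires AWS credentials when called

    client = boto3.client("kafka")

    def read_state():
        return client.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]["State"]

    return read_state
```

Once the cluster is active, `client.get_bootstrap_brokers(ClusterArn=...)` returns the broker connection strings, including `BootstrapBrokerString` for plaintext clients.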

Ingesting Data from Twitter

Set up an EC2 instance to read tweets from the Twitter stream and write them to an MSK topic.

The Python script for this process relies on the boto3, tweepy, and kafka-python packages; the listing below imports only tweepy and kafka-python.


# Python Packages


import json
import os
import tweepy
from kafka import KafkaProducer

# Twitter API credentials and the filter keyword, read from environment variables
consumer_key = os.environ['CONSUMER_KEY']
consumer_secret = os.environ['CONSUMER_SECRET']
access_token = os.environ['ACCESS_TOKEN']
access_token_secret = os.environ['ACCESS_TOKEN_SECRET']
twitter_filter_tag = os.environ['TWITTER_FILTER_TAG']

# MSK bootstrap broker string and the target topic
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']
kafka_topic_name = os.environ['KAFKA_TOPIC_NAME']

class StreamingTweets(tweepy.Stream):
    def on_status(self, status):
        # Flatten the fields of interest; keep the raw tweet JSON in tweet_body
        data = {
            'tweet_text': status.text,
            'created_at': str(status.created_at),
            'user_id': status.user.id,
            'user_name': status.user.name,
            'user_screen_name': status.user.screen_name,
            'user_description': status.user.description,
            'user_location': status.user.location,
            'user_followers_count': status.user.followers_count,
            'tweet_body': json.dumps(status._json)
        }

        # Send the record to the MSK topic as UTF-8 encoded JSON
        response = producer.send(
            kafka_topic_name, json.dumps(data).encode('utf-8'))

    def on_request_error(self, status_code):
        # Tweepy 4.x calls this hook when the stream returns a non-200 HTTP status
        print(f'Stream request error: {status_code}')

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

stream = StreamingTweets(
    consumer_key, consumer_secret,
    access_token, access_token_secret
)

# Start streaming tweets that match the configured filter tag
stream.filter(track=[twitter_filter_tag])

Next, let's analyze the tweets in the MSK topic.

Real-Time Data Analysis with Apache Flink

Click on the Process data in real time button as seen in the previous image.

With Amazon Kinesis Data Analytics for Apache Flink, we can use Java, Scala, or SQL to process and analyze streaming data to perform time-series analytics, feed real-time dashboards, and create real-time metrics.

We can build Java and Scala applications in Kinesis Data Analytics using open-source libraries based on Apache Flink. Apache Flink is a popular framework and engine for processing data streams.

On the popup window, choose Apache Flink - Studio notebook and click on the Create button.

Apache Flink - Studio notebook

Add a name & description to the Studio Notebook.

Create Studio Notebook

Finally click on the Create Studio notebook button.

Studio Notebook Settings

Once the Analytics Studio Notebook is created, click on the Run button to start the Studio notebook.

Studio Notebook

On the confirmation window, click on the Run button.

Run Studio Notebook

Now once the Studio Notebook is ready, click on the Open in Apache Zeppelin button.

Analytics Studio Application

From the Zeppelin Notebook click on the Create new note link.

Zeppelin Notebook

Add a Note Name and click on the Create button.

New Zeppelin

Create Flink Table

Next, we create a Flink table on top of the Kafka topic and then run some analytics on it. In a Studio notebook, a Flink SQL paragraph typically begins with the %flink.ssql interpreter directive.


CREATE TABLE tweets (
  tweet_text VARCHAR(5000),
  created_at VARCHAR(30),
  user_id BIGINT,
  user_name VARCHAR(256),
  user_screen_name VARCHAR(256),
  user_followers_count INT,
  tweet_body VARCHAR(40000),
  user_description VARCHAR(256),
  user_location VARCHAR(256),
  created_at_ts AS TO_TIMESTAMP(`created_at`, 'yyyy-MM-dd HH:mm:ssz'),
  WATERMARK FOR created_at_ts AS created_at_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'tweets',
  'properties.bootstrap.servers' = 'b-2.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092',
  'format' = 'json',
  'properties.group.id' = 'TweetConsumerGroup',
  'scan.startup.mode' = 'earliest-offset'
);
Flink SQL
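
The WATERMARK clause in the table definition tells Flink how much out-of-order data to tolerate: the watermark trails the largest event time seen so far by 5 seconds. The sketch below is a simplified model of that idea in Python, not Flink's actual implementation (Flink emits watermarks periodically and handles lateness per operator). The function name `flag_late_events` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta

# Matches the 5-second interval in the WATERMARK clause.
DELAY = timedelta(seconds=5)


def flag_late_events(event_times):
    """Return (event_time, is_late) pairs under a max-event-time-minus-delay watermark."""
    watermark = None
    results = []
    for ts in event_times:
        # An event is treated as late if it is not newer than the current watermark.
        is_late = watermark is not None and ts <= watermark
        results.append((ts, is_late))
        # The watermark only moves forward, following the largest event time seen.
        candidate = ts - DELAY
        if watermark is None or candidate > watermark:
            watermark = candidate
    return results


base = datetime(2024, 7, 2, 10, 0, 0)
arrivals = [base + timedelta(seconds=s) for s in (0, 10, 7, 3)]
for ts, is_late in flag_late_events(arrivals):
    print(ts.time(), "late" if is_late else "on time")
```

In this model the event at 10:00:07 arrives out of order but within the 5-second tolerance, while the event at 10:00:03 arrives after the watermark has already reached 10:00:05 and is flagged as late.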

Querying Data

Let's run a query to list tweets from users who have a location set and more than 10,000 followers.


SELECT user_followers_count, user_name, user_screen_name, user_location, created_at_ts, tweet_text
FROM tweets
WHERE user_location IS NOT NULL
AND user_followers_count > 10000;

To count tweets in 60-second tumbling windows:


SELECT
  TUMBLE_ROWTIME(created_at_ts, INTERVAL '60' SECOND) AS window_end,
  COUNT(*) AS tweets
FROM tweets
GROUP BY TUMBLE(created_at_ts, INTERVAL '60' SECOND)
ORDER BY window_end, tweets;
Window Function
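
To make the tumbling-window aggregation concrete, the sketch below reproduces the same grouping in plain Python: each event time is assigned to a fixed, non-overlapping 60-second window and the events per window are counted. It is an illustration of the windowing logic only, with a hypothetical function name and sample timestamps. It reports the exclusive window end, whereas TUMBLE_ROWTIME in Flink returns the inclusive upper bound of the window.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=60)
EPOCH = datetime(1970, 1, 1)


def tumbling_counts(event_times, window=WINDOW):
    """Count events per tumbling window; returns sorted (window_end, count) pairs."""
    counts = Counter()
    for ts in event_times:
        # Windows are aligned to the epoch, so the offset locates the window start.
        offset = (ts - EPOCH) % window
        window_start = ts - offset
        counts[window_start + window] += 1
    return sorted(counts.items())


base = datetime(2024, 7, 2, 10, 0, 0)
events = [base + timedelta(seconds=s) for s in (5, 59, 60, 125)]
for window_end, tweets in tumbling_counts(events):
    print(window_end, tweets)
```

Because the windows do not overlap, every event contributes to exactly one count, which is what distinguishes a tumbling window from a sliding one.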

Amazon MSK, coupled with Apache Flink, provides a robust infrastructure for streaming data analytics. By following these steps, you can set up a powerful pipeline to process and analyze real-time data, enabling you to harness the full potential of your streaming data sources.

For automated deployment of the AWS resources using Terraform, you may refer to https://github.com/sarubhai/aws-big-data-analytics
