AWS Analytics

Streaming Data Analytics using Amazon MSK

Updated on Oct 31, 2021

Amazon MSK

First of all, we will create our MSK cluster configuration. Click on the Cluster configurations link.

Click on the Create cluster configuration button.

MSK Cluster Configuration

Add a name for our configuration & set the Configuration properties. Next click on the Create button.

New MSK Cluster Configuration
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000
MSK Cluster Configuration
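
The same configuration can also be created from the command line. Below is a minimal AWS CLI sketch, assuming the properties above are saved to a local file named cluster-config.txt (the configuration name and Kafka version are placeholders):

# Create the MSK cluster configuration from the local properties file.
aws kafka create-configuration \
  --name msk-bda-cluster-config \
  --description "Configuration for the streaming analytics MSK cluster" \
  --kafka-versions "2.8.1" \
  --server-properties fileb://cluster-config.txt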

Next click on the Clusters link, and then click on the Create cluster button.

MSK Cluster

Select the Custom create method. Next add a name for the MSK Cluster & choose the Apache Kafka version from the dropdown list.

MSK Cluster Custom Create

Under Configuration, select the MSK Cluster configuration we created earlier from the dropdown list.

Next select the VPC & Subnets to deploy the MSK cluster.

MSK Cluster Custom Configuration

Select the appropriate Subnets to deploy the Kafka Brokers.

MSK Cluster Networking

Next select a Security Group. Choose the Broker Instance type & the Number of Brokers per Zone.

MSK Cluster Brokers

Select the Broker Storage size.

MSK Cluster allows multiple access control methods. Let's choose unauthenticated access as well as IAM role-based authentication.

MSK Cluster Access Control

Choose Plaintext as the encryption in transit method for the MSK Cluster. Encrypt data at rest using an AWS managed key.

MSK Cluster Encryption

Choose Basic monitoring for our MSK Cluster.

MSK Cluster Monitoring

Finally add Tags for management & click on the Create cluster button.

Create MSK Cluster
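
If you prefer to script the cluster creation, an equivalent AWS CLI call looks roughly like the sketch below. The cluster name, Kafka version and the JSON files (holding the broker node group, the configuration ARN/revision and the encryption settings chosen above) are placeholders:

# brokernodegroupinfo.json: subnets, security group, instance type & storage
# configuration-info.json: ARN & revision of the cluster configuration created earlier
# encryption-info.json: PLAINTEXT in transit, AWS managed key at rest
aws kafka create-cluster \
  --cluster-name msk-bda-cluster \
  --kafka-version "2.8.1" \
  --number-of-broker-nodes 3 \
  --broker-node-group-info file://brokernodegroupinfo.json \
  --configuration-info file://configuration-info.json \
  --encryption-info file://encryption-info.json \
  --enhanced-monitoring DEFAULT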

Once the Amazon MSK Cluster is ready, we will set up an EC2 instance to read tweets from the Twitter stream & write them to an MSK topic.

Amazon MSK Cluster
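
The producer will need the broker connection string of the new cluster, which can be fetched with the AWS CLI (the cluster ARN below is a placeholder):

# Returns the bootstrap broker string(s) for the cluster.
aws kafka get-bootstrap-brokers \
  --cluster-arn arn:aws:kafka:ap-southeast-1:123456789012:cluster/msk-bda-cluster/xxxx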

requirements.txt

# Python Packages
boto3
tweepy
kafka-python

tweets.py

import json
import os
import tweepy
from kafka import KafkaProducer

consumer_key = os.environ['CONSUMER_KEY']
consumer_secret = os.environ['CONSUMER_SECRET']
access_token = os.environ['ACCESS_TOKEN']
access_token_secret = os.environ['ACCESS_TOKEN_SECRET']
twitter_filter_tag = os.environ['TWITTER_FILTER_TAG']

bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']
kafka_topic_name = os.environ['KAFKA_TOPIC_NAME']


class StreamingTweets(tweepy.Stream):
    def on_status(self, status):
        data = {
            'tweet_text': status.text,
            'created_at': str(status.created_at),
            'user_id': status.user.id,
            'user_name': status.user.name,
            'user_screen_name': status.user.screen_name,
            'user_description': status.user.description,
            'user_location': status.user.location,
            'user_followers_count': status.user.followers_count,
            'tweet_body': json.dumps(status._json)
        }

        # Publish the tweet as a JSON message to the Kafka topic;
        # send() returns a FutureRecordMetadata.
        response = producer.send(
            kafka_topic_name, json.dumps(data).encode('utf-8'))

        print(response)

    # tweepy v4 callback invoked when the Twitter API returns an HTTP error
    def on_request_error(self, status_code):
        print(status_code)


producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

stream = StreamingTweets(
    consumer_key, consumer_secret,
    access_token, access_token_secret
)

stream.filter(track=[twitter_filter_tag])
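
A typical way to run the producer on the EC2 instance is to export the required environment variables and start the script. All values below are placeholders; the broker list comes from the get-bootstrap-brokers call above, and the Twitter credentials come from your Twitter developer account. Since auto.create.topics.enable is set to true in the cluster configuration, the tweets topic is created automatically on the first write.

# Install the Python dependencies & run the producer (illustrative values)
pip install -r requirements.txt

export CONSUMER_KEY=xxxx
export CONSUMER_SECRET=xxxx
export ACCESS_TOKEN=xxxx
export ACCESS_TOKEN_SECRET=xxxx
export TWITTER_FILTER_TAG=aws
export BOOTSTRAP_SERVERS=b-1.msk-bda-cluster...:9092,b-2.msk-bda-cluster...:9092
export KAFKA_TOPIC_NAME=tweets

python3 tweets.py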

Finally let's proceed to analyze our tweets in the MSK topic. Click on the Process data in real time button.

With Amazon Kinesis Data Analytics for Apache Flink, we can use Java, Scala, or SQL to process and analyze streaming data to perform time-series analytics, feed real-time dashboards, and create real-time metrics.

We can build Java and Scala applications in Kinesis Data Analytics using open-source libraries based on Apache Flink. Apache Flink is a popular framework and engine for processing data streams.

On the popup window, choose Apache Flink - Studio notebook & click on the Create button.

Apache Flink - Studio notebook

Add a name & description to the Studio Notebook.

Create Studio Notebook

Finally click on the Create Studio notebook button.

Studio Notebook Settings
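
The Studio notebook can also be created with the AWS CLI. This is only a sketch; the application name and service execution role are placeholders, and additional application configuration (such as the Glue database used by the notebook) may be required depending on your setup:

# Create an interactive (Studio notebook) Kinesis Data Analytics application.
aws kinesisanalyticsv2 create-application \
  --application-name msk-tweets-studio \
  --runtime-environment ZEPPELIN-FLINK-2_0 \
  --application-mode INTERACTIVE \
  --service-execution-role arn:aws:iam::123456789012:role/kda-studio-role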

Once the Analytics Studio Notebook is created, click on the Run button to start the Studio notebook.

Studio Notebook

On the confirmation window, click on the Run button.

Run Studio Notebook

Now once the Studio Notebook is ready, click on the Open in Apache Zeppelin button.

Analytics Studio Application

From the Zeppelin Notebook click on the Create new note link.

Zeppelin Notebook

Add a Note Name and click on the Create button.

New Zeppelin

Next we will create a table on top of the Kafka Topic followed by some analytics.

Flink SQL
%flink.ssql(type=update)

CREATE TABLE tweets (
  tweet_text VARCHAR(5000),
  created_at VARCHAR(30),
  user_id BIGINT,
  user_name VARCHAR(256),
  user_screen_name VARCHAR(256),
  user_followers_count INT,
  tweet_body VARCHAR(40000),
  user_description VARCHAR(256),
  user_location VARCHAR(256),
  created_at_ts AS TO_TIMESTAMP(`created_at`, 'yyyy-MM-dd HH:mm:ssz'),
  WATERMARK FOR created_at_ts AS created_at_ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'tweets',
  'properties.bootstrap.servers' = 'b-2.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-3.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk-bda-cluster.laur9p.c4.kafka.ap-southeast-1.amazonaws.com:9092',
  'format' = 'json',
  'properties.group.id' = 'TweetConsumerGroup',
  'scan.startup.mode' = 'earliest-offset'
);

Let's run a query to get the latest tweets from users who have a non-empty location and a followers count of more than 10,000.

%flink.ssql(type=update)

SELECT user_followers_count, user_name, user_screen_name, user_location, created_at_ts, tweet_text
FROM tweets
WHERE user_location IS NOT NULL
AND user_followers_count > 10000;
Window Function

Next, let's use a tumbling window to count the number of tweets per 60-second interval.
%flink.ssql(type=update)

SELECT
  TUMBLE_ROWTIME(created_at_ts, INTERVAL '60' SECOND) AS window_end,
  COUNT(*) AS tweets
FROM tweets
GROUP BY
  TUMBLE(created_at_ts, INTERVAL '60' SECOND)
ORDER BY window_end, tweets;
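
The tumbling window can also be combined with other grouping columns. As an illustrative sketch along the same lines as the queries above, the following counts tweets per user location over the same 60-second window:

%flink.ssql(type=update)

SELECT
  TUMBLE_ROWTIME(created_at_ts, INTERVAL '60' SECOND) AS window_end,
  user_location,
  COUNT(*) AS tweets
FROM tweets
WHERE user_location IS NOT NULL
GROUP BY
  TUMBLE(created_at_ts, INTERVAL '60' SECOND),
  user_location;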

For automated deployment of the AWS resources using Terraform, you may refer to https://github.com/sarubhai/aws-big-data-analytics