In a previous blog post, we explained how you can use PySpark CLI to jumpstart your PySpark projects. Here, I’ll explain how it can be used to build an end-to-end real-time streaming application.

The platform that I’m about to discuss here was created to help understand global trends related to Covid-19, the biggest health crisis of our lifetime. It collects Twitter data related to Covid-19 based on hashtags, categorizes it into new infections, deaths, recovered cases, vaccines, etc., using custom logic, and saves it in MongoDB. The APIs for accessing the Twitter data are written in Django, and the UI is developed using React. In the end, we’ll have a single-page application with tweets that are updated every minute. For listening to the Twitter API, I used Tweepy, an open-source PyPI package.

You can use this platform to create custom categories with a set of hashtags to identify Twitter data relevant to your campaign or research interest. 

Technologies Used

  • PySpark for big data processing
  • Django for API implementation
  • React for frontend

Backend APIs

  • api/list_categories
  • api/list_tweets?category=<category_id>
  • api/add_spam_count

Implementation Plan

  • Develop a PySpark streaming project to categorize tweets
  • Save data in MongoDB and visualize data using a web platform
  • Add functionality for users to report tweets on the platform

Setting Up the Project

Let’s create the initial structure for our streaming project using PySpark CLI. We can use the pysparkcli create command to do that.

I used the following command to create the project covidTDP (Covid Twitter Data Platform) with the Twitter streaming template, pointed at a local Spark master:

pysparkcli create covidTDP -m 'local[*]' -c 2 -t streaming

It gives the following message:

Using CLI Version: 1.0.3

Completed building project:  covidTDP

Let’s check the generated folder structure using the following command:

➜  tree covidTDP

covidTDP
├── __init__.py
├── requirements.txt
├── src
│   ├── app.py
│   ├── configs
│   │   ├── __init__.py
│   │   ├── spark_config.py
│   │   └── stream_source_config.py
│   ├── __init__.py
│   ├── jobs
│   │   ├── __init__.py
│   │   └── transformation_job.py
│   ├── settings
│   │   ├── default.py
│   │   ├── __init__.py
│   │   ├── local.py
│   │   └── production.py
│   └── streaming
│       ├── __init__.py
│       └── twitter_stream.py
└── tests
    ├── __pycache__
    │   └── test_stream.cpython-36.pyc
    └── test_stream.py

7 directories, 17 files

Now the initial project structure is ready. 

Project Coding Explained

It's time to dive into the code and get our hands dirty.

  1. For this project, we need to fetch all the tweets related to Coronavirus and Covid-19. To do that, we add hashtags such as #corona and #covid19 to the twitter_stream.py file in the generated project so that our streaming pipeline pulls the relevant data from Twitter. You can find the file at the path covidTDP/src/streaming/twitter_stream.py.

In our case, there are a lot of hashtags to add, so we defined them as a Python list and assigned it to the relevant variable. Check the raw code here.

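For illustration only, here is a minimal sketch of what that list could look like; the variable name and the exact hashtags are assumptions, not the project's actual code:

# Hypothetical hashtag list for the Twitter stream listener.
# The real list in twitter_stream.py is much longer and may use a different name.
TRACK_HASHTAGS = [
    "#corona",
    "#coronavirus",
    "#covid19",
    "#covid",
]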

2. The next step is to transform the tweet data coming in from the Spark streaming feed. Each tweet arrives as a large raw JSON payload; before saving it to MongoDB, we reduce it to a simpler JSON document containing only the fields we need.
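
For illustration, assuming the fields used in the save step later in this walkthrough, the transformed document might look roughly like this (all values are made up):

{
    "text": "Three new cases reported in the city today #covid19",
    "hashtags": ["covid19", "corona"],
    "user": {"name": "example_user"},
    "country": "India",
    "category": ["new_infections"],
    "url": "https://twitter.com/user/status/1234567890"
}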

3. Now we need to set up a database to save the data. For this project, I used MongoDB, hosted on AWS EC2 instances. To save the transformed data to MongoDB, you need to create a MongoDB model. I created the following model for saving Twitter data. See the model here in code.

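As a rough sketch only, assuming MongoEngine (whose connect() call appears later in the app logic) and inferring the field names from how they are used in the save step, the models could look something like this:

# Hypothetical MongoEngine models; the project's actual models may differ.
from mongoengine import Document, StringField, ListField, DictField

class Category(Document):
    # The category name doubles as the document's _id.
    name = StringField(primary_key=True)

class TwitterData(Document):
    text = StringField(required=True)
    hashtags = ListField(StringField())
    user = DictField()
    country = StringField()
    category = ListField(StringField())
    url = StringField()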

4. Now let's modify the app logic to transform the Twitter data and save it to the MongoDB cluster. Let's examine the app logic in depth, following the order in which the code executes when we call the pysparkcli run command.

In the main app file, the code execution starts at the code block shown below. The df variable is the streaming DataFrame into which all the data from the Twitter stream is read.

if __name__ == "__main__":
    df = spark_config.spark \
            .readStream \
            .format("socket") \
            .option("host", spark_config.IP) \
            .option("port", spark_config.Port) \
            .load()

    transform = df.writeStream \
            .foreach(saveMongo) \
            .start()

The awaitTermination call waits on the write stream until an interrupt signal is received.

transform.awaitTermination()

The function given below will take data as input and transform it to the expected format.

def saveMongo(data):
    # Each streamed row has a single 'value' column holding the raw tweet JSON
    # string; `loads` here is json.loads.
    data = loads(data.asDict()['value'])
    text = data.get("text", '--NA--')
    print("Processing data from user: " + data.get('user', {}).get('name', '--NA--'))

Here we’ve added checks to:

  • Skip retweets (they start with RT @ or carry a retweeted_status key).
  • Skip tweets with empty text (yes, that happens!).
  • Keep only texts that contain the set of keywords we track within the code.
if not text.startswith("RT @") and filterKeyword(text) and "retweeted_status" not in data.keys() and text != '--NA--':
    hashtags = data.get('entities', {}).get('hashtags', [])
    if filterHash(hashtags):
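
The filterKeyword and filterHash helpers come from the project's utils module. Purely as an illustration of the idea, simple versions could look like this (the keyword list is an assumption):

# Hypothetical sketches of the filtering helpers; the real implementations
# in the project's utils module may differ.
KEYWORDS = ["corona", "covid"]  # assumed keyword list

def filterKeyword(text):
    """Return True if the tweet text mentions any tracked keyword."""
    lowered = text.lower()
    return any(keyword in lowered for keyword in KEYWORDS)

def filterHash(hashtags):
    """Return True if any hashtag matches a tracked keyword."""
    return any(
        keyword in hashtag.get("text", "").lower()
        for hashtag in hashtags
        for keyword in KEYWORDS
    )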

Here we are creating the MongoDB client to save the new data to the cluster:

db_client = connect(db=DB_NAME)

Preparing the categories from the text:

categories = getCategory(text)
cat_objs = Category.objects.in_bulk(categories)
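
The getCategory helper is also part of the project's utils. As an illustration only, a keyword-based version might look like this (the category names and keyword mapping are assumptions based on the categories mentioned earlier):

# Hypothetical sketch of getCategory; the project's real helper may differ.
CATEGORY_KEYWORDS = {
    "new_infections": ["positive", "new case", "infected"],
    "deaths": ["death", "died"],
    "vaccines": ["vaccine", "vaccination"],
}

def getCategory(text):
    """Return the list of category names whose keywords appear in the text."""
    lowered = text.lower()
    return [
        name
        for name, keywords in CATEGORY_KEYWORDS.items()
        if any(keyword in lowered for keyword in keywords)
    ]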

Here we have custom logic to identify newly encountered categories and save them in MongoDB.

for cat_name in list(set(categories) - set(cat_objs)):
    category = Category(_id=cat_name)
    category.save()

Now we are ready to save Twitter data in MongoDB.

tweet = TwitterData(text=text)
tweet.hashtags = [hashtag["text"] for hashtag in hashtags]
tweet.user = data.get('user')
tweet.country = getCountry(text)
tweet.category = categories
tweet.url = "https://twitter.com/user/status/" + data.get("id_str")
tweet.save()
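
The getCountry helper, like the others, lives in the project's utils. A naive illustrative version might simply scan the text for country names (the country list is an assumption):

# Hypothetical sketch of getCountry; the real helper may differ.
COUNTRIES = ["India", "USA", "Italy", "Spain", "China"]  # assumed list

def getCountry(text):
    """Return the first country mentioned in the tweet text, if any."""
    lowered = text.lower()
    for country in COUNTRIES:
        if country.lower() in lowered:
            return country
    return None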

Time to close the MongoDB connection. We don’t want a lot of connections to crash our server.

  db_client.close()

Steps to take before running the project:

  • Copy the utils folder into the covidTDP folder.
  • Copy the requirements file.
  • Make sure JAVA_HOME points to a Java 8 installation (e.g., /usr/lib/jvm/java-8-openjdk-amd64).
  • Install MongoDB. 
  • Install npm.

Now it's time to run the application.

Start the Twitter stream using the pysparkcli stream command and then run the PySpark project with the pysparkcli run command.

Twitter Stream Initiation

Initiate a stream with the following command:

(.env) ➜  BigData pysparkcli stream covidTDP twitter_stream


Running PySpark Project 

It’s time to start running the project.

(.env) ➜  BigData pysparkcli run covidTDP

Once the command is running and the logs start coming in, you can see the raw JSON data that will be consumed by our PySpark streaming application.
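
Each tweet arrives as a large JSON payload. Abbreviated, and with made-up values, the parts our app logic relies on look roughly like this:

{
    "id_str": "1234567890",
    "text": "Three new cases reported in the city today #covid19",
    "user": {"name": "example_user"},
    "entities": {"hashtags": [{"text": "covid19"}]}
}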

Setting Up Django API

The next step is to develop the Django project that provides the API for tweets. The code for the Django project is available here.

To run the project, use the following commands:

  • git clone git@github.com:qburst/dashboard-for-socialmedia-trend.git
  • cd dashboard-for-socialmedia-trend/backend
  • cd api
  • pip3 install -r requirements.txt
  • python manage.py makemigrations
  • python manage.py migrate
  • python manage.py collectstatic
  • python manage.py runserver

If everything is configured correctly, the Django development server starts at http://localhost:8000.

Setting Up the UI

Next is setting up the UI, built using React. The frontend code is available here.

Change BASE_URL in frontend/src/utils/withBase/index.js so that the UI connects to the local Django API server.

const BASE_URL = "http://localhost:8000";

Follow these commands to get the UI running in a development environment.

  • cd dashboard-for-socialmedia-trend/frontend
  • npm install
  • npm start

The above commands install the dependencies and start the React development server.

So now we have the entire platform running in the development environment.

The result is the UI we built: a dashboard for social media trends.

Our platform can now:

  • Collect tweets related to Coronavirus and Covid-19.
  • Filter these tweets based on categories (New infections/Deaths/Vaccines, etc.).
  • Filter these tweets based on country of origin.
  • Mark unwanted tweets as spam.

Conclusion

As you have seen, we can create complex streaming projects with ease using PySpark CLI. This entire project is open source and available on GitHub. If you face any issues while setting up this project in your development environment, you can raise them here.