Hi everyone,
In this post, we will write a Python script (with Visual Studio 2017) that we can run as a Windows service in order to extract, in almost real time, the tweets related to certain words or hashtags, store them in a SQL Server database, and then consume them with Power BI.
The script will connect to the Twitter streaming API using tokens generated from the official Twitter page for developers. It will then make a request to the API, passing as a parameter a list of hashtags or terms, and will receive indefinitely, in “real time”, the tweets that are published and that contain those terms. Once the tweets are obtained, they are stored in a buffer of parameterizable size (“n” tweets); when the buffer reaches its maximum size, its contents are inserted into a SQL Server table.
In order to do this, the first thing we must do is to create a new Python project in the Visual Studio 2017 Environment:
The next step will be to add the libraries that we will use in our script and that will do a great deal of the work for us. In this case we will make use of the following:
- json, to be able to handle JSON data (since the Twitter API returns data in this format).
- pandas, to store the data in DataFrames (in-memory data structures similar to SQL tables).
- tweepy, which gives us access to the Twitter API.
- pyodbc and SQLAlchemy, which provide us with the tools to connect to a SQL Server database.
To be able to make use of these libraries, we must add them to our Python environment from the Solution Explorer:
Search for each library in the Python environment:
Then we will add the following code to the main Python script:
import json
import pandas as pd
import pyodbc
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from pandas.io import sql
from sqlalchemy import create_engine
from pandas.io.json import json_normalize
In order to access the Twitter API, we need to create an application in the Twitter developer portal at https://apps.twitter.com/, where we can register applications with any Twitter user. When registering an application, some tokens are generated that are necessary to access the API; these tokens will be stored in variables in our script:
#Declare variables that contain the user credentials to access the Twitter API
#You can get your own keys in https://apps.twitter.com/
#--------------------------------------------------------------------------------
aToken       = "ENTER YOUR ACCESS TOKEN"
aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET"
cKey         = "ENTER YOUR API KEY"
cSecret      = "ENTER YOUR API SECRET"
In our example we use the tokens of the app we created earlier:
The workflow of our script will be this:
The whole code is here:
#Import libraries
import json
import pandas as pd
import pyodbc
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from pandas.io import sql
from sqlalchemy import create_engine
from pandas.io.json import json_normalize

#Declare variables that contain the user credentials to access the Twitter API
#You can get your own keys in https://apps.twitter.com/
#--------------------------------------------------------------------------------
aToken       = "ENTER YOUR ACCESS TOKEN"
aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET"
cKey         = "ENTER YOUR API KEY"
cSecret      = "ENTER YOUR API SECRET"
#--------------------------------------------------------------------------------

#Define after how many tweets we do an insert into the database
bufferSize = 5
#Define an array to store the tweets read from the streaming API
twittsBuffer = []
#Define a connection to read/write to the SQL Server database
engine = create_engine("mssql+pyodbc://MyDbUser:MyPassword@MySQLServer/TwitterDB?driver=SQL+Server+Native+Client+11.0")

#Define a function that receives a tweet by parameter and stores it in the twittsBuffer variable.
#If twittsBuffer reaches the defined buffer size, it calls the function AddTwittsToDB, which inserts
#the tweets of the twittsBuffer array into the SQL Server database, and then cleans the buffer
#--------------------------------------------------------------------------------
def AddTwittToBuffer(twitt):
    global twittsBuffer
    twittsBuffer.append(twitt)
    if (len(twittsBuffer) == bufferSize):
        AddTwittsToDB(twittsBuffer)
        twittsBuffer = []
    print(twitt['coordinates'] if twitt['coordinates'] != None else 'no coordinates')
    return

#This function writes the tweets stored in the twittsBuffer variable to the SQL database
#--------------------------------------------------------------------------------
def AddTwittsToDB(twitts):
    tData = {'id': [],
             'text': [],
             'screen_name': [],
             'created_at': [],
             'retweet_count': [],
             'favorite_count': [],
             'friends_count': [],
             'followers_count': [],
             'lang': [],
             'country': [],
             'latitude': [],
             'longitude': []}
    for t in twitts:
        tData['id'].append(t['id'])
        tData['text'].append(t['text'])
        tData['screen_name'].append(t['user']['screen_name'])
        tData['created_at'].append(t['created_at'])
        tData['retweet_count'].append(t['retweet_count'])
        tData['favorite_count'].append(t['favorite_count'])
        tData['friends_count'].append(t['user']['friends_count'])
        tData['followers_count'].append(t['user']['followers_count'])
        tData['lang'].append(t['lang'])
        if t['place'] != None:
            tData['country'].append(t['place']['country'])
        else:
            tData['country'].append(None)
        if t['coordinates'] != None:
            tData['longitude'].append(t['coordinates']['coordinates'][0])
            tData['latitude'].append(t['coordinates']['coordinates'][1])
        else:
            tData['longitude'].append(None)
            tData['latitude'].append(None)
    tweets = pd.DataFrame(tData)
    tweets.set_index('id', inplace=True)
    tweets.to_sql("Tweets", engine, None, if_exists='append')
    return True
#--------------------------------------------------------------------------------

#Create a listener class that processes the received tweets
#On error print the status code
#--------------------------------------------------------------------------------
class StdOutListener(StreamListener):
    def on_data(self, data):
        t = json.loads(data)
        AddTwittToBuffer(t)
        return True
    def on_error(self, status):
        print(status)
#--------------------------------------------------------------------------------

#Define a main function, the entry point of the program
if __name__ == '__main__':
    #This object handles Twitter authentication and the connection to the Twitter Streaming API
    myListener = StdOutListener()
    authenticator = OAuthHandler(cKey, cSecret)
    authenticator.set_access_token(aToken, aTokenSecret)
    stream = Stream(authenticator, myListener)
    #This line filters the Twitter stream to capture tweets that contain any of the tracked terms
    stream.filter(track=['PowerBI', 'Tableau', 'Qlikview', 'Microstrategy', 'Pyramid Analytics', 'Business Objects', 'Ibm cognos'])
#--------------------------------------------------------------------------------
During the execution of the script, if the destination table does not exist, it will be created and the tweets will be inserted into the table.
The table we created will have the following columns:
- The id of the tweet
- The tweet origin country
- The date of creation in text format (then we will transform it to a date format)
- The number of favorites (likes) of the tweet (at the time it was captured)
- The number of followers of the user who tweets (at the time of the tweet)
- The number of friends of the user who tweets (at the time of the tweet)
- The language detected by the Twitter API
- The latitude (if it is provided)
- The longitude (if it is provided)
- The number of retweets of the message.
- The name of the user who tweets
- The text of the tweet
If we query the database we will see the following:
In our example, we are getting tweets that contain the following terms ‘PowerBI’, ‘Tableau’, ‘Qlikview’, ‘Microstrategy’, ‘Pyramid Analytics’, ‘Business Objects’, ‘IBM Cognos’ to make a comparison of tweet counts about these terms.
Once we have the script created and tested, we can run it as a Windows service so that it is continuously downloading tweets. This can be done easily with tools such as NSSM (the Non-Sucking Service Manager), which you can download from its page https://nssm.cc
Once NSSM is downloaded, we go to the folder where the 64-bit executable is (or the 32-bit executable if that is your case) and execute the following from the command line:
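The command is nssm install followed by the name we want to give the service; assuming, for example, that we call it TwitterStream, that would be nssm install TwitterStream.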
This window will be opened:
In Path we enter the full path of the Python executable, including the python.exe file name; in Startup directory we enter only the directory where the Python executable is stored; and in Arguments we enter the path and file name of the Python script that we created with Visual Studio 2017.
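For example, the values might look like this (the paths are only placeholders; they depend on where Python is installed and where you saved the script):
- Path: C:\Python36\python.exe
- Startup directory: C:\Python36
- Arguments: C:\Scripts\TwitterStreamService.py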
Doing this we will have created the service; once we start it, we will see that tweets begin to appear in our SQL Server table.
Now that we have our table with the stored tweets, we must create a couple of tables to relate the terms to the table of tweets and build a model that we can consume with Power BI. We do this in several steps:
First, we create a dimension with the terms and a code for each one in bitmask format. There are 7 terms, so the table will have 7 rows and the ids will be the powers of 2 from 1 to 64 (1, 2, 4, 8, 16, 32, and 64); in this way we can relate each tweet to its terms by means of a bit mask.
For example, if a tweet contains the words PowerBI and Qlikview, the id that relates it to its terms will be the sum of the ids of those two terms; in this case, the relationship id would be 5, the sum of id 1 (PowerBI) and id 4 (Qlikview). If a tweet contains all 7 terms, its relationship id will be 127 (the sum of all the ids: 1+2+4+8+16+32+64).
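To make the bitmask idea concrete, here is a minimal Python sketch (illustrative only, not part of the script above; the names are my own) that computes the relationship id for a tweet text, in the same spirit as the fact view defined later:

# Illustrative only: compute the bitmask id that links a tweet to its terms.
# The term/id pairs mirror the DimTerm dimension created in the next script.
TERM_IDS = {
    'PowerBI': 1,
    'Tableau': 2,
    'Qlikview': 4,
    'Microstrategy': 8,
    'Pyramid Analytics': 16,
    'Business Objects': 32,
    'Ibm cognos': 64,
}

def relationship_id(text):
    # Sum the ids of every term that appears in the tweet text (case-insensitive)
    return sum(term_id for term, term_id in TERM_IDS.items()
               if term.lower() in text.lower())

print(relationship_id('Comparing PowerBI and Qlikview dashboards'))  # prints 5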
The script that generates this dimension table is:
with Dim_Terms as(
    SELECT 1  as Id, 'PowerBI'           AS Term union all
    SELECT 2  as Id, 'Tableau'           AS Term union all
    SELECT 4  as Id, 'Qlikview'          AS Term union all
    SELECT 8  as Id, 'Microstrategy'     AS Term union all
    SELECT 16 as Id, 'Pyramid Analytics' AS Term union all
    SELECT 32 as Id, 'Business Objects'  AS Term union all
    SELECT 64 as Id, 'Ibm cognos'        AS Term
)
select * into DimTerm from Dim_Terms
The table, once queried, will look like this:
The relationship established between this dimension of terms and the table of “facts” is a many-to-many relationship, so we are going to need another 2 tables that resolve the relationship in order to be able to use it in Power BI.
The first of these relationship tables is the one that contains all the possible combinations of terms, from a single term up to all 7 at a time. This table will have 127 records (the 0 that would correspond to no term at all is not contemplated for the moment).
The query that will provide us this table is:
with nums as (
    select 1 as id
    union all
    select id + 1 from nums where id + 1 < 128
)
, datos as(
    select t.id,
        REPLACE(REPLACE(
            ltrim(
                iif(cast(t.id & 64 as bit) = 1, 'Ibm_cognos','')
              + iif(cast(t.id & 32 as bit) = 1, ' Business_Objects','')
              + iif(cast(t.id & 16 as bit) = 1, ' Pyramid_Analytics','')
              + iif(cast(t.id & 8  as bit) = 1, ' Microstrategy','')
              + iif(cast(t.id & 4  as bit) = 1, ' Qlikview','')
              + iif(cast(t.id & 2  as bit) = 1, ' Tableau','')
              + iif(cast(t.id & 1  as bit) = 1, ' PowerBI','')
            ),' ', ' - '), '_',' ') as Terms
    from nums t
)
select * into Dim_AllTerms from datos
option (maxrecursion 0)
And its structure will be:
Now we must relate, through a many-to-many relationship table, the table that maps each “combination of terms” (Dim_AllTerms) to the individual terms of the dimension (DimTerm). We do this with the next query:
with nums as (
    select 1 as id
    union all
    select id + 1 from nums where id + 1 < 128
),
datos as(
    select t.id,
         cast(t.id & 64 as bit) as bit7
        ,cast(t.id & 32 as bit) as bit6
        ,cast(t.id & 16 as bit) as bit5
        ,cast(t.id & 8  as bit) as bit4
        ,cast(t.id & 4  as bit) as bit3
        ,cast(t.id & 2  as bit) as bit2
        ,cast(t.id & 1  as bit) as bit1
    from nums t
)
select id,
    case trend
        when 'bit1' then 1
        when 'bit2' then 2
        when 'bit3' then 4
        when 'bit4' then 8
        when 'bit5' then 16
        when 'bit6' then 32
        when 'bit7' then 64
        else 0
    end as IdTrend
into F_MtM_Terms
From (select * from datos) p
UNPIVOT
(
    valor for trend in (bit1, bit2, bit3, bit4, bit5, bit6, bit7)
) as unpivo
where valor = 1
option (maxrecursion 0)
The table will have the following appearance; as you can see, the id of “Terms” 5 is repeated 2 times, once for each individual term it contains: the 1 of PowerBI and the 4 of Qlikview:
Finally, we create a view that will return the tweet data in the form of a fact table:
SELECT country
      ,favorite_count
      ,followers_count
      ,friends_count
      ,lang
      ,latitude
      ,longitude
      ,retweet_count
      ,screen_name as UserName
      ,[text]
      ,CONVERT(DATETIME,
            SUBSTRING(created_at,9,2) + '/'
          + CASE SUBSTRING(created_at,5,3)
                WHEN 'Jan' then '01/'
                WHEN 'Feb' then '02/'
                WHEN 'Mar' then '03/'
                WHEN 'Apr' then '04/'
                WHEN 'May' then '05/'
                WHEN 'Jun' then '06/'
                WHEN 'Jul' then '07/'
                WHEN 'Aug' then '08/'
                WHEN 'Sep' then '09/'
                WHEN 'Oct' then '10/'
                WHEN 'Nov' then '11/'
                WHEN 'Dec' then '12/'
                else ''
            end
          + RIGHT(created_at,4) + ' ' + SUBSTRING(created_at,12,8), 105) AS Created_Date
      ,iif([text] like '%PowerBI%',1,0)
      + iif([text] like '%Tableau%',2,0)
      + iif([text] like '%Qlikview%',4,0)
      + iif([text] like '%Microstrategy%',8,0)
      + iif([text] like '%Pyramid Analytics%',16,0)
      + iif([text] like '%Business Objects%',32,0)
      + iif([text] like '%Ibm cognos%',64,0) as IdTrend
FROM [TwitterDB].[dbo].[Tweets]
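As a side note, the created_at field arrives from the streaming API as text in the form “Sat Mar 10 12:34:56 +0000 2018”, which is why the view rebuilds it piece by piece before converting it to DATETIME. If you preferred to do that conversion in Python before inserting the rows, a minimal sketch (an alternative approach, not what the script above does) could be:

from datetime import datetime

# Twitter's created_at arrives as e.g. "Sat Mar 10 12:34:56 +0000 2018"
def parse_created_at(created_at):
    # %z parses the "+0000" UTC offset
    return datetime.strptime(created_at, '%a %b %d %H:%M:%S %z %Y')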
The model in Power BI has the following structure:
* It’s important to note that the many-to-many relationships must have the cross filter direction set to “Both” to work properly.
As you can see, the measures used are very simple: a sum of favorites, followers and friends, and a count of the number of tweets.
The final result is a pair of dashboards as an example: one showing the tweets by region and language, and another showing the evolution over the last 2 days in “numDia-NumHora” (day number-hour number) format.
I hope this is useful, or at least encourages you to experiment a bit with Python; it is a very useful tool with a very easy learning curve.
Greetings
Great work
Thanks Alfred, I’m glad to be helpful. 🙂