How to extract Twitter data with a Windows service created with Python in Visual Studio 2017


Hi everyone,

In this post, we will write a Python script (with Visual Studio 2017) that we can run as a Windows service in order to extract, in near real time, the tweets related to certain words or hashtags, store them in a SQL Server database, and then consume them with Power BI.

The script's first task is to connect to the Twitter Streaming API using tokens generated from the official Twitter page for developers. It then makes a request to the API, passing a list of hashtags or terms as a parameter, and the API returns indefinitely, in near real time, the tweets that are published containing those terms. As the tweets arrive, they are stored in a buffer of parameterizable size; when the buffer reaches its maximum size of "n" tweets, its contents are written to a SQL Server table.

In order to do this, the first thing we must do is to create a new Python project in the Visual Studio 2017 Environment:

The next step is to add the libraries that we will use in our script and that will do much of the heavy lifting for us. In this case we will make use of the following:

  • json, to handle JSON data (the format in which the Twitter API returns its results).
  • pandas, to store the data in DataFrames (in-memory data structures similar to SQL tables).
  • tweepy, which gives us access to the Twitter API.
  • pyodbc and SQLAlchemy, which provide the tools to connect to a SQL Server database.

To be able to make use of these libraries, we must add them to our Python environment from the Solution Explorer:

Search for each library in the Python environment:
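
Alternatively, if you prefer the command line, the same packages can be installed with pip into the project's environment (a minimal sketch; package names assumed to match the libraries listed above):

pip install tweepy pandas pyodbc sqlalchemy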

Then we will add the following code to the main Python script:

import json
import pandas as pd
import pyodbc
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from pandas.io import sql
from sqlalchemy import create_engine
from pandas.io.json import json_normalize

In order to access the Twitter API, we need to create an application in the Twitter developer portal at https://apps.twitter.com/ (applications can be registered with any Twitter user). When registering an application, a set of tokens is generated that is necessary to access the API; these tokens will be stored in variables in our script:

#Declare variables that contain the user credentials to access the Twitter API
#You can get your own keys in https://apps.twitter.com/
#--------------------------------------------------------------------------------
aToken =       "ENTER YOUR ACCESS TOKEN"
aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET"
cKey =         "ENTER YOUR API KEY"
cSecret =      "ENTER YOUR API SECRET"

 

In our example, we use the tokens of the app we created earlier:

The workflow of our script will be this:

The whole code is here:

#Import libraries

import json
import pandas as pd
import pyodbc
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from pandas.io import sql
from sqlalchemy import create_engine
from pandas.io.json import json_normalize


#Declare variables that contain the user credentials to access the Twitter API
#You can get your own keys in https://apps.twitter.com/
#--------------------------------------------------------------------------------
aToken =       "ENTER YOUR ACCESS TOKEN"
aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET"
cKey =         "ENTER YOUR API KEY"
cSecret =      "ENTER YOUR API SECRET"
#--------------------------------------------------------------------------------


#Define after how many tweets we do an insert into the database.
bufferSize = 5
#Define a list to store the tweets read from the streaming API
twittsBuffer = []
#Define a connection to read/write to the SQL Server database
engine = create_engine("mssql+pyodbc://MyDbUser:MyPassword@MySQLServer/TwitterDB?driver=SQL+Server+Native+Client+11.0")

#Define a function that receives a tweet as a parameter and stores it in the twittsBuffer variable.
#When twittsBuffer reaches the defined buffer size, call AddTwittsToDB, which inserts the buffered
#tweets into the SQL Server database and clears the buffer.
#--------------------------------------------------------------------------------
def AddTwittToBuffer(twitt):
    
    global twittsBuffer
    twittsBuffer.append(twitt)
    
    if (len(twittsBuffer) == bufferSize):
        AddTwittsToDB(twittsBuffer)
        twittsBuffer = []
    print(twitt['coordinates'] if twitt['coordinates']!= None else 'no coordinates')

    return 

#This function writes the tweets stored in the twittsBuffer variable to the SQL Server database
#--------------------------------------------------------------------------------
def AddTwittsToDB(twitts):
    tData = {'id': [], 
             'text': [], 
             'screen_name': [], 
             'created_at': [],
             'retweet_count': [], 
             'favorite_count': [],
             'friends_count': [], 
             'followers_count': [], 
             'lang':[], 
             'country':[], 
             'latitude':[], 
             'longitude':[]}
    
    for t in twitts:
        tData['id'].append(t['id'])
        tData['text'].append(t['text'])
        tData['screen_name'].append(t['user']['screen_name'])
        tData['created_at'].append(t['created_at'])
        tData['retweet_count'].append(t['retweet_count'])
        tData['favorite_count'].append(t['favorite_count'])
        tData['friends_count'].append(t['user']['friends_count'])
        tData['followers_count'].append(t['user']['followers_count'])
        tData['lang'].append(t['lang'])

        if t['place'] is not None:
            tData['country'].append(t['place']['country'])
        else:
            tData['country'].append(None)

        if t['coordinates'] is not None:
            tData['longitude'].append(t['coordinates']['coordinates'][0])
            tData['latitude'].append(t['coordinates']['coordinates'][1])
        else:
            tData['longitude'].append(None)
            tData['latitude'].append(None)
    tweets = pd.DataFrame(tData)
    tweets.set_index('id', inplace=True)
    tweets.to_sql("Tweets",engine,None,if_exists='append')
    return True
#--------------------------------------------------------------------------------
#Create a listener class that processes received tweets
#On error, print the status code
#--------------------------------------------------------------------------------
class StdOutListener(StreamListener):

    def on_data(self, data):
        t= json.loads(data)
        AddTwittToBuffer(t)
        return True

    def on_error(self, status):
        print(status)
#--------------------------------------------------------------------------------
#Program entry point
if __name__ == '__main__':

    #These objects handle Twitter authentication and the connection to the Twitter Streaming API
    myListener = StdOutListener()
    authenticator = OAuthHandler(cKey, cSecret)
    authenticator.set_access_token(aToken, aTokenSecret)
    stream = Stream(authenticator, myListener)

    #This line filters the Twitter stream to capture tweets containing any of the tracked terms listed below
    stream.filter(track=['PowerBI', 'Tableau', 'Qlikview','Microstrategy','Pyramid Analytics','Business Objects', 'Ibm cognos'])
#--------------------------------------------------------------------------------

 

During the execution of the script, if the destination table does not exist, it will be created and the tweets will be inserted into the table.

The table we created will have the following columns:

  1. The id of the tweet
  2. The tweet origin country
  3. The date of creation in text format (we will transform it to a date format later; see the parsing sketch after this list)
  4. The number of times the tweet had been marked as favorite (at the time it was captured)
  5. The number of followers of the user who tweeted (at the time of the tweet)
  6. The number of friends of the user who tweeted (at the time of the tweet)
  7. The language detected by the Twitter API
  8. The latitude (if it is informed)
  9. The longitude (if it is informed)
  10. The number of retweets of the message.
  11. The name of the user who tweets
  12. The text of the tweet
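
Regarding column 3: created_at arrives as text in Twitter's fixed, English-locale format. In this example we convert it later in a SQL view, but a minimal Python sketch of the equivalent parsing (illustrative only) would be:

from datetime import datetime

#Twitter's created_at field uses a fixed text format, e.g. 'Tue Mar 29 08:11:25 +0000 2011'
created = datetime.strptime('Tue Mar 29 08:11:25 +0000 2011',
                            '%a %b %d %H:%M:%S %z %Y')
print(created.isoformat())  #2011-03-29T08:11:25+00:00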

If we query the database we will see the following:

In our example, we are getting tweets that contain the terms 'PowerBI', 'Tableau', 'Qlikview', 'Microstrategy', 'Pyramid Analytics', 'Business Objects' and 'IBM Cognos', in order to compare tweet counts across these terms.

Once we have the script created and tested, we can run it as a Windows service so that it keeps downloading tweets continuously. This can be done easily with a tool such as NSSM (the Non-Sucking Service Manager), which you can download from https://nssm.cc

Once NSSM is downloaded, we go to the folder that contains the 64-bit executable (or the 32-bit one, if that is your case) and execute the following from the command line:
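
Typically the command is of the form below, where the service name is our own choice ("TwitterStreamService" here is just an assumed example):

nssm install TwitterStreamService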

This window will be opened:

In Path, we enter the full path to the Python executable, including the python.exe file name; in Startup directory, we enter only the directory where the Python executable is stored; and in Arguments, we enter the path and file name of the Python script that we created with Visual Studio 2017.
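
For example, the fields could look like this (hypothetical paths; adjust them to your Python installation and to wherever you saved the script):

Path:              C:\Python36\python.exe
Startup directory: C:\Python36
Arguments:         C:\Scripts\TwitterStreamService.py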

With this, the service is created; we just have to start it and we will see tweets begin to appear in our SQL Server table.

Now that we have our table with the stored tweets, we must create a couple of tables that relate the terms to the tweets table and build a model we can consume with Power BI. We do this in several steps:

First, we create a dimension with the terms and a code for each one in bit-mask format. There are 7 terms, so the table will have 7 rows and the ids will be powers of 2 from 1 to 64 (1, 2, 4, 8, 16, 32 and 64); this way we can relate each tweet to its terms by means of a bit mask.

For example, if a tweet contains the words PowerBI and Qlikview, the id that relates it to its terms will be the sum of the ids of those two terms; in this case the relationship id would be 5, the sum of id 1 (PowerBI) and id 4 (Qlikview). If a tweet contains all 7 terms, its relationship id will be 127 (the sum of all the ids: 1+2+4+8+16+32+64).
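
The same idea, expressed as a minimal Python sketch (illustrative only; it roughly mirrors the LIKE-based matching used later in the SQL view):

TERM_IDS = {'PowerBI': 1, 'Tableau': 2, 'Qlikview': 4, 'Microstrategy': 8,
            'Pyramid Analytics': 16, 'Business Objects': 32, 'Ibm cognos': 64}

def relationship_id(text):
    #Sum (bitwise OR) the ids of every tracked term found in the tweet text
    rid = 0
    for term, term_id in TERM_IDS.items():
        if term.lower() in text.lower():
            rid |= term_id
    return rid

print(relationship_id('Comparing PowerBI and Qlikview dashboards'))  #prints 5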

The script that generates this dimension table is:

with Dim_Terms as(
SELECT 1  as Id, 'PowerBI'			AS Term union all 
SELECT 2  as Id, 'Tableau'			AS Term union all 
SELECT 4  as Id, 'Qlikview'			AS Term union all 
SELECT 8  as Id, 'Microstrategy'		AS Term union all 
SELECT 16 as Id, 'Pyramid Analytics'	        AS Term union all 
SELECT 32 as Id, 'Business Objects'		AS Term union all 
SELECT 64 as Id, 'Ibm cognos'			AS Term )
select * into DimTerm from Dim_Terms

 

If we query the table, it will look like this:

The relationship between this terms dimension and the table of "facts" is many-to-many, so we are going to need another two tables that materialize the relationship in order to use it in Power BI.

The first of these relationship tables contains all the possible combinations of terms, from a single term up to all 7 at a time; this table will have 127 records (the 0, which would correspond to no term at all, is not contemplated for the moment).

The query that will provide us this table is:

with nums as
(
  select 1 as id
  union all 
  select id + 1
    from nums
    where id + 1 < 128
)
,
datos as(
select t.id,
  REPLACE(REPLACE( 
  ltrim(
  iif(cast(t.id & 64 as bit)  = 1, 'Ibm_cognos','') + 
    iif(cast(t.id & 32 as bit)  = 1, ' Business_Objects','') +
    iif(cast(t.id & 16 as bit) 	= 1, ' Pyramid_Analytics','') +
    iif(cast(t.id & 8 as bit)	= 1, ' Microstrategy','') +
    iif(cast(t.id & 4 as bit)	= 1, ' Qlikview','') +
    iif(cast(t.id & 2 as bit)  	= 1, ' Tableau','') +
    iif(cast(t.id & 1 as bit)  = 1, ' PowerBI','')),' ', ' - '), '_',' ')  as Terms
from nums t
)
select * into Dim_AllTerms from datos option (maxrecursion 0)

 

And its structure will be:

Now we must build the many-to-many bridge table that relates each "combination of terms" (Dim_AllTerms) to its individual terms in the dimension (DimTerm). We do this with the following query:

with nums as
(
 select 1 as id
 union all 
 select id + 1
 from nums
 where id + 1 < 128
),
datos as(
select t.id, 
 cast(t.id & 64 as bit) as bit7
 ,cast(t.id & 32 as bit) as bit6
 ,cast(t.id & 16 as bit) as bit5
 ,cast(t.id & 8 as bit) as bit4
 ,cast(t.id & 4 as bit) as bit3
 ,cast(t.id & 2 as bit) as bit2
 ,cast(t.id & 1 as bit) as bit1
from nums t
)
 select 
 id, 
 case trend
 when 'bit1' then 1
 when 'bit2' then 2
 when 'bit3' then 4
 when 'bit4' then 8
 when 'bit5' then 16
 when 'bit6' then 32
 when 'bit7' then 64
 else 0
 end as IdTrend
 into F_MtM_Terms
 From 
 (select * from datos ) p
 UNPIVOT 
 (
 valor for trend in (bit1, bit2, bit3, bit4,bit5,bit6,bit7)
 ) as unpivo
 where
 valor = 1
option (maxrecursion 0)

 

The table will look like this; as you can see, the "Terms" id 5 is repeated twice, once for each individual term it contains: the 1 of PowerBI and the 4 of Qlikview:
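
For example, querying the bridge table for that id should return one row per individual term (a quick sanity check; the expected output is sketched in the comments):

SELECT id, IdTrend FROM F_MtM_Terms WHERE id = 5
-- id  IdTrend
-- 5   1
-- 5   4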

Finally, we create a view that returns the tweet data in the form of a fact table:

SELECT 
  country
  ,favorite_count
  ,followers_count
  ,friends_count
  ,lang
  ,latitude
  ,longitude
  ,retweet_count
  ,screen_name as UserName
  ,[text]
  , CONVERT(DATETIME, SUBSTRING(created_at,9,2)+'/'
  + CASE SUBSTRING(created_at,5,3)
    WHEN 'Jan' then '01/'
    WHEN 'Feb' then '02/'
    WHEN 'Mar' then '03/'
    WHEN 'Apr' then '04/'
    WHEN 'May' then '05/'
    WHEN 'Jun' then '06/'
    WHEN 'Jul' then '07/'
    WHEN 'Aug' then '08/'
    WHEN 'Sep' then '09/'
    WHEN 'Oct' then '10/'
    WHEN 'Nov' then '11/'
    WHEN 'Dec' then '12/'
  else '' end
  +RIGHT(created_at,4) + ' '
  + SUBSTRING(created_at,12,8), 105) AS Created_Date
  , iif([text] like '%PowerBI%',1,0) 
  + iif([text] like '%Tableau%',2,0) 
  + iif([text] like '%Qlikview%',4,0) 
  + iif([text] like '%Microstrategy%',8,0) 
  + iif([text] like '%Pyramid Analytics%',16,0) 
  + iif([text] like '%Business Objects%',32,0) 
  + iif([text] like '%Ibm cognos%',64,0) IdTrend
FROM 
  [TwitterDB].[dbo].[Tweets]

 

The model in Power BI has the following structure:

* It's important to note that the many-to-many relationships must have the cross filter direction set to "Both" to work properly.

The measures used are very simple: a sum of favorites, followers and friends, and a count of the number of tweets.

The final result is a pair of example dashboards: one showing the tweets by region and language, and another showing the evolution over the last 2 days in "numDia-NumHora" (day number - hour number) format.

I hope this is useful, or at least encourages you to experiment a bit with Python; it is a very useful tool with a very gentle learning curve.

Regards

 

How to refresh a Power BI dataset from an on-premises PowerShell script


Hi friends,

Today we will show you how to refresh a dataset published in Power BI from a PowerShell script that we can invoke at the end of our ETL process.

We will use the Power BI libraries for PowerShell to connect to our Power BI portal and send an instruction to refresh a dataset. This can be useful to improve our ETL processes, refreshing the online datasets used in the Power BI portal before loading data into our data warehouse and/or our OLAP/Tabular database.

Note that if the dataset is not in a workspace assigned to Premium capacity, you will be limited to eight refreshes per day; datasets in workspaces assigned to Premium capacity support up to 48 refreshes a day.

In order to perform this operation, we will need to create an Azure application with access to our Power BI portal. To do so, follow these steps:

Connect to https://dev.powerbi.com/apps and log in as a Power BI admin user:

Create a new application using this information:
-App Type: Native app
-Redirect URL: “urn:ietf:wg:oauth:2.0:oob” (without quotes)
-Level of access: check all dataset APIs permissions

Then we need to give our Power BI admin user permission to use the application. To do that, connect to the Azure portal and open the "Azure Active Directory" tool, following these steps:
1.- Enter in the App Registry (marked in yellow)

2.- Then click on “All applications” to see all the applications you own, and  select the application that will refresh our dataset (marked in yellow):

3.- Add our user as the owner of the application:

4.- At this point we must grant permissions to our application to access active directory:

We must grant access to the datasets of Power BI too:

Once these steps are done, it's time to code (let's rock!)

To create the script we must install and add the following libraries to our script:
Microsoft.IdentityModel.Clients.ActiveDirectory.dll
Microsoft.IdentityModel.Clients.ActiveDirectory.WindowsForms.dll

We do that with this command (installation guide: https://docs.microsoft.com/es-es/powershell/azure/install-azurerm-ps?view=azurermps-5.5.0):

# Install the Azure Resource Manager modules from the PowerShell Gallery
Install-Module -Name AzureRM -AllowClobber

In our code we need to parameterize the client ID (our app's client ID) and the dataset ID, which we can take from the Power BI portal when we open the dataset options (app.powerbi.com/groups/{groupID}/settings/datasets/{datasetID}). Note that the group ID must be "me" in the script because we are using the "My Workspace" workspace.

The entire code, with comments, is here:

# Parameters - fill these in before running the script!
# =====================================================

# An easy way to get group and dataset ID is to go to dataset settings and click on the dataset
# that you'd like to refresh. Once you do, the URL in the address bar will show the group ID and 
# dataset ID, in the format: 
# app.powerbi.com/groups/{groupID}/settings/datasets/{datasetID} 

$groupID = "me" # the ID of the group that hosts the dataset. Use "me" if this is your My Workspace
$datasetID = "d9b4fd0c-7ac9-4e5d-be42-76686ce6b2db" # the ID of the dataset to refresh

# AAD Client ID
# To get this, go to the following page and follow the steps to provision an app
# https://dev.powerbi.com/apps
# To get the sample to work, ensure that you have the following fields:
# App Type: Native app
# Redirect URL: urn:ietf:wg:oauth:2.0:oob
#  Level of access: all dataset APIs
$clientId = "Client ID" 

# End Parameters =======================================

# Calls the Active Directory Authentication Library (ADAL) to authenticate against AAD
function GetAuthToken
{
       $adal = "C:\Program Files\WindowsPowerShell\Modules\Azure\5.1.1\Automation\Microsoft.IdentityModel.Clients.ActiveDirectory.dll"
       
 
       $adalforms = "C:\Program Files\WindowsPowerShell\Modules\Azure\5.1.1\Networking\Microsoft.IdentityModel.Clients.ActiveDirectory.WindowsForms.dll"
 
       [System.Reflection.Assembly]::LoadFrom($adal) | Out-Null
 
       [System.Reflection.Assembly]::LoadFrom($adalforms) | Out-Null
 
       $redirectUri = "urn:ietf:wg:oauth:2.0:oob"
 
       $resourceAppIdURI = "https://analysis.windows.net/powerbi/api"
 
       $authority = "https://login.microsoftonline.com/common/oauth2/authorize";
 
       $authContext = New-Object "Microsoft.IdentityModel.Clients.ActiveDirectory.AuthenticationContext" -ArgumentList $authority

       $UserCred = New-Object "Microsoft.IdentityModel.Clients.ActiveDirectory.UserCredential" -ArgumentList "UserName@domain.com", "myPassword"
 
       $authResult = $authContext.AcquireTokenAsync($resourceAppIdURI, $clientId, $UserCred).Result
 
       return $authResult
}

# Get the auth token from AAD
$token = GetAuthToken

# Building Rest API header with authorization token
$authHeader = @{
   'Content-Type'='application/json'
   'Authorization'=$token.CreateAuthorizationHeader()
}

# properly format groups path
$groupsPath = ""
if ($groupID -eq "me") {
    $groupsPath = "myorg"
} else {
    $groupsPath = "myorg/groups/$groupID"
}

# Refresh the dataset
$uri = "https://api.powerbi.com/v1.0/$groupsPath/datasets/$datasetID/refreshes"
Invoke-RestMethod -Uri $uri -Headers $authHeader -Method POST -Verbose

# Check the refresh history
$uri = "https://api.powerbi.com/v1.0/$groupsPath/datasets/$datasetID/refreshes"
Invoke-RestMethod -Uri $uri -Headers $authHeader -Method GET -Verbose

And you’re all set!

This post was possible thanks to PatAltimore and his work at https://github.com/Azure-Samples/powerbi-powershell/blob/master/manageRefresh.ps1. We only made some changes to his code to make it work with our libraries. Thanks, Pat!

Subscribe to our blog if you enjoyed this post 😉

SQL Server performance with Spectre and Meltdown patches


As you know, unless you are totally oblivious to the technological world, you will have heard about one of the biggest bugs in the history of computer science (Spectre and Meltdown), and its effects are real. So real that we at SolidQ have experienced them ourselves in our own Query Analytics software. In this post I will try to shed some light on how to proceed if you detect a performance regression in your SQL Server solution, explaining how I solved it in my own system.

(more…)

How to Better Evaluate the Goodness-of-Fit of Regressions

Most of the content of this post is platform-agnostic. Since these days I'm using Azure Machine Learning, I take it as the starting point of my studies.

It's quite simple for an average Azure Machine Learning user to create a regression experiment, make the data flow through it and get the predicted values. It's also easy to obtain some metrics to evaluate the implemented model. Once you get them, the following questions arise:

  • How can I interpret these numbers?
  • Are these metrics enough to assess the goodness-of-fit of the model?

This post aims to provide you with the statistical foundation behind these metrics and with some additional tools that will help you better understand how well the model has fitted. These tools are implemented in an R script you can simply copy and paste into an Execute R Script module.

Read the rest of the article here:

https://medium.com/wandering-in-advanced-analytics/how-to-better-evaluate-the-goodness-of-fit-of-regressions-990dbf1c0091

I use columnstore indexes on SQL 2014. What would be the advantages of upgrading to SQL 2016?


SQL Server 2016 really represents a drastic change for the better with regard to general performance. And the proof is that, thanks to the engine changes, this is the first SQL Server version where Microsoft actively promotes a widespread increase in performance levels simply by upgrading. But what impact can it have on your daily loads if you use columnstore indexes in a SQL Server 2014 data warehouse? There was already an amazing increase in performance with SQL Server 2014 and columnstore indexes. Is it worth upgrading to SQL 2016? (more…)

Integration Services (SSIS) vs. PowerCenter. ETL tool comparison (I)


Below is a comparison table between two ETL tools: SQL Server Integration Services, a.k.a. SSIS, and Informatica PowerCenter. Which tool should I choose? What are the differences? What are the advantages and disadvantages? Do they meet my requirements and particular needs? These are some of the questions that you may have asked yourself at the start of any new project, and we will aim to answer them in this post by providing some key facts about these two very powerful tools.

(more…)

How to bulk copy Azure ML experiments from one workspace to another, or back them up as physical files


Hi all,

Today I want to tackle the issue of bulk copying more than one Azure ML experiment at a time between different workspaces.

Maybe you already know that you can partially solve this task by copying experiments one at a time, but to do so you need access to both workspaces with your user. If you don't have it, you can simply share a workspace as described here:

https://docs.microsoft.com/en-us/azure/machine-learning/machine-learning-create-workspace

Once you can see both workspaces in your Azure Machine Learning Studio, you can simply select an experiment and then "Copy to workspace":

AzureML_Copy_001

and then you can choose the destination workspace:

AzureML_Copy_002

As you can imagine… you can't simply select more than one experiment and copy them all at once:

AzureML_Copy_000

Now suppose you have dozens of experiments and you simply don't want to waste your time copying them all manually, or that you can't get access to a shared workspace for security reasons. Is there a way to bulk copy your experiments? I'll show you how to do that using a few lines of PowerShell.

(more…)