Hola a todos,

En el post que os traemos hoy vamos a ver como crear (con Visual studio 2017) mediante un script en python un programa que podremos ejecutar como un servicio de windows y que extraiga en tiempo real los twitts relacionados con determinadas palabras o hashtags, los almacene en una base de datos sql server, para luego explotarlos con powerbi.

Las tareas que va a realizar el script son las de conectar al API de streaming de twitter mediante unos tokens generados desde la página oficial de twitter para desarrolladores. Al API le haremos una petición y le pasaremos una lista de hashtags o términos y nos irá devolviendo de forma indefinida en “tiempo real” los twitts que se van publicando y que contienen estos términos. Una vez recuperados los twitts se almacenan en un buffer (de tamaño parametrizable) “n” twitts que al alcanzar el tamaño máximo del buffer se volcarán a una tabla en Sql Server.

Para ello lo primero que debemos hacer es crear un nuevo proyecto de Python en el visual studio:

 

El siguiente paso será añadir las librerías que vamos a usar en nuestro script, y que nos facilitarán el trabajo en gran medida, en este caso nosotros haremos uso de las siguientes:

  • La librería para poder manejar archivos json (ya que el API de Twitter devuelve los datos en este formato).
  • Usamos también la librería de pandas para poder almacenar datos en dataframes (estructuras de datos en memoria, parecidas a las tablas de SQL).
  • Necesitamos usar tweepy que nos facilita el acceso al API de Twitter.
  • pyodbc y sqlAlquiemy nos proporciona herramientas para conectar a una base de datos SQL Server.

Para poder hacer uso de estas librerías antes debemos añadirlas a nuestro entorno de Python desde el explorador de soluciones:

Buscamos la librería y la instalamos:

Luego añadiremos el siguiente código al script principal:

import json
import pandas as pd
import pyodbc
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from pandas.io import sql
from sqlalchemy import create_engine
from pandas.io.json import json_normalize

Para poder acceder al api de twitter necesitamos crear una aplicación en el portal de twitter en la página https://apps.twitter.com/ con cualquier usuario de twitter podemos dar de alta aplicaciones, al dar de alta una aplicación se generan unos tokens que son necesarios para acceder al api, estos tokens los almacenaremos en variables en nuestro script:

#Declare variables that contains the user credentials to access Twitter API
#You can get your own keys in https://apps.twitter.com/
#--------------------------------------------------------------------------------
aToken =       "ENTER YOUR ACCESS TOKEN"
aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET"
cKey =         "ENTER YOUR API KEY"
cSecret =      "ENTER YOUR API SECRET"

En mi caso obtengo los tokens de la app que previamente ya he credo

El funcionamiento de nuestro script será el siguiente:

El código es el siguiente:

#Import libraries

import json
import pandas as pd
import pyodbc
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from pandas.io import sql
from sqlalchemy import create_engine
from pandas.io.json import json_normalize


#Declare variables that contains the user credentials to access Twitter API
#You can get your own keys in https://apps.twitter.com/
#--------------------------------------------------------------------------------
aToken =       "ENTER YOUR ACCESS TOKEN"
aTokenSecret = "ENTER YOUR ACCESS TOKEN SECRET"
cKey =         "ENTER YOUR API KEY"
cSecret =      "ENTER YOUR API SECRET"
#--------------------------------------------------------------------------------


#Define after how many twitts we do a insert in the data base.
bufferSize = 5
#Defina an array to store the tweets readed from the stream api
twittsBuffer = []
#Define a connectiont to read-write to the Sql server Database 
engine = create_engine("mssql+pyodbc://MyDbUser:MyPassword@MySQLServer/TwitterDB?driver=SQL+Server+Native+Client+11.0")

#Define a function that receive a twitt by parameter and store it into the twittBuffer variable
#if the twittBuffer reach the buffersize defined lenght then call the function AddTwittsToDB that insert the twitts into
#the twittsBuffer array into the SQL Server database and clean the buffer
#--------------------------------------------------------------------------------
def AddTwittToBuffer(twitt):
    
    global twittsBuffer
    twittsBuffer.append(twitt)
    
    if (len(twittsBuffer) == bufferSize):
        AddTwittsToDB(twittsBuffer)
        twittsBuffer = []
    print(twitt['coordinates'] if twitt['coordinates']!= None else 'no coordinates')

    return 

#This function write the twitts stored in the variable twitBuffer to the SQL Database
#--------------------------------------------------------------------------------
def AddTwittsToDB(twitts):
    tData = {'id': [], 
             'text': [], 
             'screen_name': [], 
             'created_at': [],
             'retweet_count': [], 
             'favorite_count': [],
             'friends_count': [], 
             'followers_count': [], 
             'lang':[], 
             'country':[], 
             'latitude':[], 
             'lontitude':[]}
    
    for t in twitts:
        tData['id'].append(t['id'])
        tData['text'].append(t['text'])
        tData['screen_name'].append(t['user']['screen_name'])
        tData['created_at'].append(t['created_at'])
        tData['retweet_count'].append(t['retweet_count'])
        tData['favorite_count'].append(t['favorite_count'])
        tData['friends_count'].append(t['user']['friends_count'])
        tData['followers_count'].append(t['user']['followers_count'])
        tData['lang'].append(t['lang'])

        if t['place'] != None :
            tData['country'].append(t['place']['country'])
        else :
            tData['country'].append(None)
        
        if t['coordinates'] != None :
            tData['lontitude'].append(t['coordinates']['coordinates'][0])
            tData['latitude'].append(t['coordinates']['coordinates'][1])
        else :
            tData['lontitude'].append(None)
            tData['latitude'].append(None)
    tweets = pd.DataFrame(tData)
    tweets.set_index('id', inplace=True)
    tweets.to_sql("Tweets",engine,None,if_exists='append')
    return True
#--------------------------------------------------------------------------------
#Create a listener class that process received tweets
#On error print status
#--------------------------------------------------------------------------------
class StdOutListener(StreamListener):

    def on_data(self, data):
        t= json.loads(data)
        AddTwittToBuffer(t)
        return True

    def on_error(self, status):
        print(status)
#--------------------------------------------------------------------------------
#Define a main function, the entry point of the program
if __name__ == '__main__':

    #This object handles Twitter authetification and the connection to Twitter Streaming API
    myListener = StdOutListener()
    authenticator = OAuthHandler(cKey, cSecret)
    authenticator.set_access_token(aToken, aTokenSecret)
    stream = Stream(authenticator, myListener)

    #This line filter Twitter Streams to capture data tweets with the included text: 'Microsoft' or 'SolidQ' or 'Visual Studio'
    stream.filter(track=['PowerBI', 'Tableau', 'Qlikview','Microstrategy','Pyramid Analytics','Business Objects', 'Ibm cognos'])
#--------------------------------------------------------------------------------

Durante la ejecución del script, si no existe la tabla se creará e irá añadiendo los twitts a la tabla.

La tabla que se crea tendrá los siguientes campos:

  1. El id del twitt
  2. El país de origen
  3. La fecha de creación en formato texto (luego la transformaremos a un formato fecha
  4. El numero de favoritos del usuario que twittea (en el momento que twitteó)
  5. El número de followers del usuario que twittea (en el momento que twitteó)
  6. El número de amigos del usuario que twittea (en el momento que twitteó)
  7. El lenguaje detectado por el api de twitter
  8. La latitud (si la facilitaron a la hora de twitear)
  9. La longitud (si la facilitaron a la hora de twitear)
  10. El numero de retwitts que tiene el mensaje
  11. El nombre del usuario que twittea
  12. El texto del twitt

Si hacemos una consulta a base de datos veremos lo siguiente:

En nuestro caso estamos capturando los tweets que contengan los siguientes términos ‘PowerBI’, ‘Tableau’, ‘Qlikview’, ‘Microstrategy’, ‘Pyramid Analytics’, ‘Business Objects’, ‘Ibm cognos’  para hacer una comparativa de lo que más se twittea sobre estos términos.

Una vez tenemos el script creado y probado, podemos ejecutarlo como un servicio de windows para que este continuamente descargando los tweets, esto lo podemos hacer de forma sencilla con herramientas como Non Suck Service Manager que podéis descargar de su página https://nssm.cc/

Una vez descargado el nssm, vamos la carpeta donde esta el ejecutable de 64 bits (o el de 32 si es vuestro caso) y ejecutamos desde línea de comando lo siguiente:

Se nos abrirá una ventana de configuración como esta:

En path introducimos la ruta donde está el ejecutable de python junto con el ejecutable python.exe, en startup directory introducimos el directorio (solo la ruta) donde esta el ejecutable de Python y en argument la ruta y nombre de fichero del script de Python que hemos creado con Visual Studio.

Con esto ya tendremos el servicio creado, lo iniciamos y veremos que comienzan a aparecer twitts en nuestra tabla de SQL Server.

Ahora que tenemos nuestra tabla con los tweets almacenados creamos un par de tablas para relacionar los términos con la tabla de tweets y crear un modelo que podamos explotar en power bi, esto lo hacemos en varios pasos:

Primero, creamos una dimension con los términos y un código para cada uno en formato de mascara de bits, son 7 terminos por lo que la tabla tendrá 7 registros y los ids seran los numero del 1 al 64 en potencias de 2 (1,2,4,8,16,32, y 64), de este modo podremos relacionar cada twitt con sus términos mediante una mascara de bits.

Por ejemplo si un twitt contiene las palabras PowerBI y QlickView, el id que lo relacione con sus términos sera la suma de los dos ids de esos términos, en este caso el id de relación seria el 5, la suma del id 1 correspondiente a PowerBi y de 4 que es el id del termino QlickView. Si un twitt contiene todos los 7 términos su id de relacion será el 128 (la suma de todos los ids)

with Dim_Terms as(
SELECT 1  as Id, 'PowerBI'			AS Term union all 
SELECT 2  as Id, 'Tableau'			AS Term union all 
SELECT 4  as Id, 'Qlikview'			AS Term union all 
SELECT 8  as Id, 'Microstrategy'		AS Term union all 
SELECT 16 as Id, 'Pyramid Analytics'	        AS Term union all 
SELECT 32 as Id, 'Business Objects'		AS Term union all 
SELECT 64 as Id, 'Ibm cognos'			AS Term )
select * into DimTerm from Dim_Terms

La tabla se vería del siguiente modo:

La relacion que se establece entre esta dimensión de términos y la tabla de “hechos” es una relación “many to many” por lo que vamos a necesitar otras 2 tablas mas que consoliden la relación y poder usarla en powerBI

La primera de estas tablas de relación es la que contiene todas las posibles combinaciones de términos posibles desde un único termino hasta los 7 a la vez, esta tabla tendrá 127 registros (el 0 que correspondería con ningún termino no lo contemplamos de momento)

La query que nos da esta tabla es esta:

with nums as
(
  select 1 as id
  union all 
  select id + 1
    from nums
    where id + 1 < 128
)
,
datos as(
select t.id,
  REPLACE(REPLACE( 
  ltrim(
  iif(cast(t.id & 64 as bit)  = 1, 'Ibm_cognos','') + 
    iif(cast(t.id & 32 as bit)  = 1, ' Business_Objects','') +
    iif(cast(t.id & 16 as bit) 	= 1, ' Pyramid_Analytics','') +
    iif(cast(t.id & 8 as bit)	= 1, ' Microstrategy','') +
    iif(cast(t.id & 4 as bit)	= 1, ' Qlikview','') +
    iif(cast(t.id & 2 as bit)  	= 1, ' Tableau','') +
    iif(cast(t.id & 1 as bit)	= 1, ' Microsoft','')),' ', ' - '), '_',' ')  as Terms
from nums t
)
select * into Dim_AllTerms from datos option (maxrecursion 0)

y tendrá un aspecto como este:

Ahora relacionamos mediante una tabla de relación “Many to Many” la dimensión con la tabla que mapea cada “combinación de términos” (Dim_AllTerms) con sus términos de la dimensión (DimTerm) esto lo hacemos con la query:

with nums as
(
 select 1 as id
 union all 
 select id + 1
 from nums
 where id + 1 < 128
),
datos as(
select t.id, 
 cast(t.id & 64 as bit) as bit7
 ,cast(t.id & 32 as bit) as bit6
 ,cast(t.id & 16 as bit) as bit5
 ,cast(t.id & 8 as bit) as bit4
 ,cast(t.id & 4 as bit) as bit3
 ,cast(t.id & 2 as bit) as bit2
 ,cast(t.id & 1 as bit) as bit1
from nums t
)
 select 
 id, 
 case trend
 when 'bit1' then 1
 when 'bit2' then 2
 when 'bit3' then 4
 when 'bit4' then 8
 when 'bit5' then 16
 when 'bit6' then 32
 when 'bit7' then 64
 else 0
 end as IdTrend
 into F_MtM_Terms
 From 
 (select * from datos ) p
 UNPIVOT 
 (
 valor for trend in (bit1, bit2, bit3, bit4,bit5,bit6,bit7)
 ) as unpivo
 where
 valor = 1
option (maxrecursion 0)

La tabla tendrá la siguiente apariencia, como veis el Id de “Términos” 5, esta repetido 2 veces, uno por cada termino individual que contiene, el 1 de Microsoft y el 4 de QlickView:

Por Ultimo creamos una vista que nos devolverá los datos de los twitts en forma de una tabla de hechos:

SELECT 
  country
  ,favorite_count
  ,followers_count
  ,friends_count
  ,lang
  ,latitude
  ,lontitude
  ,retweet_count
  ,screen_name as UserName
  ,[text]
  , CONVERT(DATETIME, SUBSTRING(created_at,9,2)+'/'
  + CASE SUBSTRING(created_at,5,3)
    WHEN 'Jan' then '01/'
    WHEN 'Feb' then '02/'
    WHEN 'Mar' then '03/'
    WHEN 'Apr' then '04/'
    WHEN 'May' then '05/'
    WHEN 'Jun' then '06/'
    WHEN 'Jul' then '07/'
    WHEN 'Aug' then '08/'
    WHEN 'Sep' then '09/'
    WHEN 'Oct' then '10/'
    WHEN 'Nov' then '11/'
    WHEN 'Dec' then '12/'
  else '' end
  +RIGHT(created_at,4) + ' '
  + SUBSTRING(created_at,12,8), 105) AS Created_Date
  , iif([text] like '%PowerBI%',1,0) 
  + iif([text] like '%Tableau%',2,0) 
  + iif([text] like '%Qlikview%',4,0) 
  + iif([text] like '%Microstrategy%',8,0) 
  + iif([text] like '%Pyramid Analytics%',16,0) 
  + iif([text] like '%Business Objects%',32,0) 
  + iif([text] like '%Ibm cognos%',64,0) IdTrend
FROM 
  [TwitterDB].[dbo].[Tweets]

El modelo en PowerBI quedaría del siguiente modo:

* Importante tener en cuenta que las relaciones many to many deben tener el cross filter direcction a establecido en “Both” para que funcionen bien.

Si vemos las métricas son muy sencillas, un sum de los favoritos, followers y friends y un count del numero de twitts.

El resultado final de un par de dashboards que se me ocurrieron de ejemplo uno mostrando los twits por región y lenguaje y otro mostrando el evolutivo de los 2 últimos días en formato “numDia-NumHora”

Espero que os resulte útil o al menos sirva para animaros a experimentar un poco con Python, es una herramienta muy útil y con una curva de aprendizaje muy asequible.

Un Saludo