Hay ocasiones, cuando estamos construyendo pipelines con Azure Data Factory, que queremos repetir patrones para extraer y procesar la información cambiando de manera dinámica, en tiempo de ejecución, valores, orígenes/destinos de los datasets, incluso los mismos linked services. Esto es posible mediante el uso de parámetros, expresiones y funciones.

Vamos a ver cómo implementarlo con un ejemplo práctico en el que se nos plantea el siguiente supuesto. Se nos ha pedido que extraigamos todos los días los datos del día anterior de distintas tablas del DW a ficheros en un blob storage que además se nombre como la tabla de origen. Si no pudiéramos utilizar contenido dinámico tendríamos que crear dos datasets (uno de origen y otro de destino) y añadir una actividad de copia por cada tabla a exportar.

La alternativa sería añadir a ADF dos datasets genéricos, que iremos modificando en tiempo de ejecución, para seleccionar la tabla de origen y el fichero de destino. Además, cambiaremos la consulta SQL para seleccionar del DW solo los datos del día anterior a la ejecución del trigger de ADF.

Inicialmente tendremos dos linked services, uno para un SQL Server y otro para el Azure Blob Storage (aunque es posible crearlos durante la definición de los datasets).

En la BBDD de origen se ha generado una tabla auxiliar llamada ExtractMetadata con la lista de tablas de las que se quiere extraer la información diaria y si están activas.

La idea es ir iterando sobre los elementos que aparecen listados en ella e ir haciendo la exportación del dato.

A continuación, crearemos el dataset del SQL Server de origen. Cuando lo hagamos no debemos especificar ninguna tabla puesto que usaremos el mismo dataset para obtener datos de la tabla de metadata y de las tablas de datos a exportar.

Para generar el siguiente dataset, seleccionaremos uno del tipo Azure Blob Storage y DelimitedText como formato de archivo. Lo importante en este caso es no importar el esquema puesto que debe cambiar según las columnas que tenga la tabla de origen.

Parte del Path se ha definido ya que es estático, pero el nombre del fichero debemos cambiarlo en tiempo de ejecución para poder ponerle el nombre de la tabla que se está exportando. Para ello, una vez creado el dataset de destino, lo abriremos y, en la pestaña Parameters, crearemos un nuevo parámetro llamado FileName a través del cuál le indicaremos al dataset, cuando sea llamado desde otras partes del pipeline, el nombre del fichero.

Este parámetro, a su vez, debemos vincularlo con el nombre del fichero en la propiedad de Path del mismo dataset. Al situarnos sobre este campo nos aparecerá la opción de añadir contenido dinámico y deberemos pulsar sobre el enlace.

En la nueva sección que nos aparece es donde podemos asignar el parámetro a ese campo. En la parte inferior tendremos un listado de funciones y expresiones. En nuestro caso queremos añadir la expresión del parámetro FileName, por lo que pulsaremos sobre este (1) y aparecerá en cuadro de texto (2).

Todas las expresiones empiezan con “@”, y si las vamos a concatenar con cadenas de texto debemos utilizar llaves justo después de la “@” e introducir dentro la expresión. Por ejemplo podríamos escribir @{dataset().FileName}.zip y añadiría al texto que reciba por parámetro la extensión “.zip”. En esta ocasión no lo vamos a hacer, vamos a dejar que la extensión se añada junto con la lógica para darle el nombre al fichero y que quede todo junto para facilitar el mantenimiento en un futuro.

En este punto ya tenemos los linked services y los dataset de origen y destino creados. Nos falta añadir la lógica para obtener una lista de tablas a copiar de la tabla auxiliar de metadata e ir copiando los datos de cada elemento de la lista. Por lo tanto, el siguiente paso será leer la tabla auxiliar. Esto se hará añadiendo una actividad del tipo Lookup a un nuevo pipeline y configurando sus propiedades.

Como Source Dataset, le indicaremos el dataset de SQL y marcaremos la opción Query para escribir nuestra propia consulta. Recordemos que ahora mismo el dataset no está vinculado a ninguna tabla, es por esto que debemos indicarle de dónde leer utilizando el campo from de la consulta, y lo haremos de la tabla auxiliar. Es importante desmarcar la opción de First row only, ya que necesitamos el listado completo.

SELECT SchemaTableName FROM dbo.ExtractMetadata WHERE IsActive = 1

El siguiente paso será añadir una actividad de Foreach y conectar la salida del Lookup a esta.

Queremos iterar por la lista de tablas que hemos obtenido en el paso anterior y para ello, en las propiedades del Foreach, en la pestaña de Settings, debemos cambiar el campo Items para añadir una nueva expresión, al igual que hicimos antes con Add dynamic content. En la lista de sugerencias ya tenemos la salida del Lookup (en nuestro ejemplo llamado TableList). El problema en este caso es que la sugerencia no está completa. Hay que revisar la documentación para saber que al final hay que añadir un “.value” quedando de la siguiente manera:

@activity(‘TableList’).output.value

Pulsando dos veces sobre el Foreach abriremos su pipeline interno y podremos añadir nuestra última actividad, un Copy Data. Al igual que hicimos anteriormente, definiremos como Source dataset de la actividad de copia el dataset de SQL y utilizaremos una nueva consulta para definir la tabla de origen, la cual viene indicada en el “item” que nos proporciona el Foreach en cada iteración. Para ello volveremos a pulsar sobre Add Dynamic Content y usaremos la expresión “@{item().SchemaTableName”. La segunda parte de la expresión, SchemaTableName, es el nombre de la columna que se ha usado como origen en la consulta del Lookup. Como la expresión la vamos a utilizar en medio de una cadena de texto debemos añadir las llaves a la “@”. Además, vamos a usar un poco de lógica en TSQL para quedarnos solo con los datos del día anterior, pero esto no es problema porque es el motor relacional es el que se encarga de ello, ADF solo le pasa la consulta tras reemplazar la expresión por el valor que corresponde. El resultado es el siguiente:

SELECT * FROM @{item().SchemaTableName} WHERE DateKey = cast(dateadd(day, -1, getdate()) as date)

En la pestaña Sink, en la propiedad Sink dataset, le indicaremos que queremos utilizar el dataset del Blob Storage. También aparece expuesto el parámetro FileName que añadimos al definir este dataset, y es precisamente en este campo donde debemos añadir la lógica para asignarle un nombre a cada uno de los ficheros de salida.

@{item().SchemaTableName}_@{formatDateTime(subtractFromTime(pipeline().TriggerTime,1,'Day'),'yyyy-MM-dd')}.zip

La primera parte de la expresión es la misma que teníamos en el origen, y en la segunda hemos añadido un trío de funciones para conseguir la fecha del día anterior al que se está ejecutando.

  • Pipeline().TriggerTime nos da la fecha de ejecución del pipeline
  • SubstractFromTime(Fecha, número de intervalos, intervalo) quita el número de intervalos a una fecha, en este caso “-1 día”
  • FormatDateTime(Fecha, formato) aplica un formato a una fecha, para evitar que nos de horas, minutos, segundos, etc

Solo queda añadir el “.zip” para darle una extensión y dejar toda la lógica para generar el nombre junta. Si lanzamos un trigger del pipeline el resultado será el siguiente:

A modo de recapitulación, hemos:

  • Creado dos datasets sin vincular a un esquema de datos concreto.
  • En el dataset de salida hemos creado un parámetro para recibir el nombre del fichero y lo hemos vinculado a la propiedad del nombre del fichero.
  • Utilizado un Lookup para obtener la lista de ítems (tablas) a recorrer.
  • Recorrido estos ítems con un Foreach utilizando una expresión para obtener el nombre de la tabla en cada iteración.
  • Para cada ítem del Foreach hemos copiado los datos generando una consulta de origen dinámica con el valor del ítem (nombre de la tabla) y un nombre de salida del fichero, también dinámico, con expresiones. En el caso de la salida además hemos utilizado funciones de tiempo y formato para complementar el nombre del archivo.

 

Como se ha podido ver, el utilizar expresiones junto con parámetros y funciones nos aporta muchísima versatilidad junto con un ahorro importante de tiempo/recursos. La posibilidad de reaprovechar código o estructuras sería mucho más compleja o prácticamente imposible sin estas características.

 

¿Quieres seguir aprendiendo sobre Azure Data Factory u otros aspectos relacionados con la administración de bases de datos, business intelligence o machine learning? ¡Consulta nuestra agenda de próximos cursos públicos! No olvides suscribirte al blog para estar informado de nuevos artículos y otros cursos de formación.

Daniel Gil

Data Platform Specialist at SolidQ
I am a Data Platform Specialist at SolidQ.

I completed a degree in Telecommunications, Sound and Image at University of Alicante in 2008. In 2009, I worked as a Computer technician in San Vicente’s Town Council and collaborated with the team for seismic research of the University of Alicante, creating a software in Matlab.

When I completed my studies in Computer Science at Universidad de Alicante, I started at SolidQ through a University internship, developing a platform for SolidQ Analytics. Later, I started working for SolidQ in the BI sector, working in several projects with companies such as Ferrovial, CIE, etc.
Daniel Gil