Come automatizzare il partizionamento delle tabelle AWS Athena con AWS Lambda

Come automatizzare il partizionamento delle tabelle AWS Athena con AWS Lambda

Pur avendo già visto come migrare i nostri dati su AWS Athena utilizzando AWS S3 come storage e, successivamente, come organizzarli per gestire al meglio le partizioni di Athena, manca ancora un passaggio: capire come automatizzare il processo di aggiornamento delle partizioni con AWS Lambda.

Creare una tabella con Athena, infatti, non significa che il sistema sarà in grado di leggere le nuove partizioni automaticamente. In realtà, dovremmo eseguire l’operazione “Load Partitions” manualmente ogni volta.

Per fortuna, AWS mette a disposizione alcuni strumenti molto utili per rendere automatico il riconoscimento di nuove partizioni e la modifica delle tabelle di Athena.

AWS SNS

AWS SNS (Amazon Simple Notification Service) è un servizio di messaggistica interamente “gestito”, che consente di inviare notifiche push istantanee o trasmettere messaggi ad un numero molto elevato di destinatari.

Con AWS SNS, è possibile inviare, non solo notifiche push, ma anche messaggi SMS (Short Message Service), messaggi e-mail e messaggi in-app.

Il servizio consente anche di creare temi, ovvero un canale di messaggistica per inviare messaggi a più endpoint, tra cui le applicazioni mobili, le caselle di posta elettronica e i server HTTP/HTTPS.

AWS SNS consente di creare un flusso di lavoro costruito sulla messaggistica altamente scalabile e affidabile. Fornisce, inoltre, funzionalità avanzate, come la conferma della ricezione dei messaggi, il controllo degli accessi e la sicurezza dei dati. Integrandosi nativamente con altri servizi AWS come, ad esempio, AWS Lambda, AWS CloudFormation e AWS CloudTrail, AWS SNS ne semplifica la configurazione e l’utilizzo.

Eventi di Amazon S3

Gli Eventi di Amazon S3 consentono di avviare un’azione automatizzata al verificarsi di uno specifico evento, ad esempio il caricamento di un oggetto in un bucket S3.

Per utilizzare gli Eventi in S3, è necessario creare una regola di notifica che, al verificarsi di uno specifico evento, definisca il tipo di evento da monitorare e l’azione da intraprendere.

AWS Lambda

AWS Lambda è un servizio fornito da Amazon Web Services che consente di eseguire codice in risposta a eventi senza la necessità di gestire l’infrastruttura sottostante.

Questa piattaforma di calcolo senza server è basata su un’infrastruttura ad alta disponibilità e gestisce automaticamente tutte le risorse necessarie per l’esecuzione del codice.

Ciò include l’amministrazione dei server e del sistema operativo, oltreché la gestione delle risorse di elaborazione. La piattaforma offre funzionalità di provisioning ed una scalabilità automatica della capacità, consentendo alle applicazioni di adattarsi dinamicamente al carico di lavoro.

Il codice di AWS Lambda viene eseguito in risposta a specifici eventi, ad esempio un caricamento di file o un’azione dell’utente.

Collegare AWS S3 ad AWS Lambda

Una delle caratteristiche dell’utilizzo combinato degli Eventi S3, delle Notifiche e delle Funzioni Lambda è quello di creare una regola di notifica in grado di monitorare un bucket S3 per il caricamento di nuovi oggetti e di inviare una notifica ad un’istanza di AWS Lambda.

La funzione Lambda può, quindi, elaborare l’oggetto appena caricato in modo del tutto automatizzato, ad esempio, eseguendo un’operazione di trasformazione sui dati o spostando l’oggetto in un altro bucket S3.

Nel nostro caso specifico, invece, provvederemo ad analizzare il percorso del file appena caricato su S3 e, qualora corrispondesse ad uno di quelli di nostro interesse (ossia di una tabelle di AWS Athena), verificheremo la presenza di possibili partizioni ed eseguiremo la query di modifica della tabella per aggiungere le nuove partizioni trovate.

Creazione della Notifica SNS

Per prima cosa, dobbiamo creare una nuova notifica Standard su Amazon SNS e collegarvi la policy che consenta al nostro bucket S3 di interesse di pubblicarvi i propri eventi.

Il modello di policy che possiamo utilizzare è la seguente:

{
  “Version”:
“2008-10-17”,
  “Id”: “__default_policy_ID”,
  “Statement”: [
      {
      “Sid”: “__default_statement_ID”,
      “Effect”: “Allow”,
      “Principal”: {
      “AWS”: “*”
      },
      “Action”: [
      “SNS:GetTopicAttributes”,
      “SNS:SetTopicAttributes”,
      “SNS:AddPermission”,
      “SNS:RemovePermission”,
      “SNS:DeleteTopic”,
      “SNS:Subscribe”,
      “SNS:ListSubscriptionsByTopic”,
      “SNS:Publish”,
      “SNS:Receive”
      ],
      “Resource”: “{MYSNSARN}”,
      “Condition”: {
      “StringEquals”: {
            “AWS:SourceOwner”: “{AWS-ACCOUNT-ID}”
      }
      }
      },
      {
      “Sid”: “enki-reports-access-policy”,
      “Effect”: “Allow”,
      “Principal”: {
      “AWS”: “*”
      },
      “Action”: “SNS:Publish”,
      “Resource”: “{MYSNSARN}”,
      “Condition”: {
      “ArnLike”: {
            “aws:SourceArn”: “arn:aws:s3:*:*:{S3-BUCKET-NAME}”
      }
      }
      }
  ]
}
Creazione della Notifica SNS

Creazione dell’Evento S3

Dopo aver creato la notifica SNS, dobbiamo generare l’Evento S3. Andiamo, quindi, nella sezione “Proprietà” del nostro bucket > “Notifica degli eventi”  e creiamo una nuova notifica degli eventi.

Selezioniamo “Tutti gli eventi di creazione oggetto” come Tipo di Evento. Come destinazione, “Argomento SNS” e in “Immetti l’ARN Argomento SNS” impostiamo l’ARN della notifica che abbiamo creato precedentemente.

Creazione della Funzione Lambda

L’ultimo passaggio è quello di creare la nostra Funzione Lambda.

Ecco, alcuni accorgimenti da seguire in fase di creazione della funzione:

  • E’ fondamentale impostare l’Argomento SNS come trigger della funzione, in modo che quando un nuovo file viene caricato sul bucket S3, il messaggio venga inoltrato alla Funzione Lambda;
  • La Funzione Lambda deve essere autorizzata ad utilizzare AWS Glue, strumento su cui AWS Athena si poggia per gestire i dati;
  • La Funzione Lambda deve poter accedere allo storage S3 ed avere i permessi di operare su Athena (deve poter leggere le informazioni dei file e deve eseguire delle query di alter delle tabelle);
  • L’Argomento SNS deve poter eseguire “invokeFunction” sulla Funzione Lambda.

Il Codice della Funzione Lambda

Le Funzioni Lambda possono essere scritte in diversi linguaggi di programmazione e la loro gestione può avvenire direttamente nella console AWS.

Amazon Web Services fornisce un framework di sviluppo in Python, chiamato Chalice, che gestisce tutte le fasi del deployment della funzione, compresa la creazione dei permessi base e della struttura dei file.

Per velocizzare il lavoro ed avere una struttura già pensata per il servizio, il consiglio è di partire proprio da Chalice.

import sys
import os
import re
import time
from pathlib import Path
import json
from collections import namedtuple
import logging
import boto3
from chalice import Chalice
from base64 import b64decode
# from chalice.app import Cron
from os import environ as env
import urllib.parse

LOG_LEVEL = env.get(‘LOG_LEVEL’)
DEBUG = bool(env.get(‘DEBUG’))

CLEANUP_ATHENA_OUTPUT = env.get(‘CLEANUP_ATHENA_OUTPUT’)

ATHENA_REGION = env.get(‘ATHENA_REGION’)
ATHENA_DATABASE = env.get(‘ATHENA_DATABASE’) # is the same for all tables
# outputs result
ATHENA_OUTPUT_BUCKET = env.get(‘ATHENA_OUTPUT_BUCKET’)
ATHENA_OUTPUT_BUCKET_PATH = env.get(‘ATHENA_OUTPUT_BUCKET_PATH’)

# sns topics to listen
NOTIFICATION_TOPIC_ARN_REPORTS_S3_FILE_UPDATED = env.get(
      ‘NOTIFICATION_TOPIC_ARN_REPORTS_S3_FILE_UPDATED’)

app = Chalice(app_name=’partition-magic’)
app.log.setLevel(LOG_LEVEL)
app.debug = DEBUG


#
# Athena database configuration
# For each s3 path, the corresponding Athena Table
#
db_table_config = {
      ‘my-bucket/googleads’: ‘googleads_reporting’,
      ‘my-bucket/meta’: ‘meta_reporting’,
}

@app.on_sns_message(topic=NOTIFICATION_TOPIC_ARN_REPORTS_S3_FILE_UPDATED)
def handle_sns_message(event):
      app.log.info(“Message received with subject: %s, message: %s”,
                  event.subject, event.message)

      # Parse JSON into an object with attributes corresponding to dict keys.
      m = json.loads(event.message)

      # m[‘Records’][0][‘s3’] MUST exists
      if ‘Records’ in m and len(m[‘Records’]) > 0 and ‘s3’ in m[‘Records’][0]:
      e = m[‘Records’][0][‘s3’]

      handleS3Event(e[‘bucket’][‘name’], urllib.parse.unquote(e[‘object’][‘key’]))
      else:
      app.log.warning(“Message does not contains valid s3 object, exit.”)


def handleS3Event(bucket_name, object_key):

      app.log.debug(“Event received for bucket: %s, key %s”, bucket_name,
                  object_key)
      try:
      session = boto3.Session()
      result_file = runAthenaProcess(session, bucket_name, object_key)

      app.log.info(“Query executed, result file: %s”, result_file)

      except Exception as e:
      app.log.debug(“Triggered Exception”)
      app.log.debug(e)
      app.log.debug(“Athena ALTER TABLE query failed”)
      pass

def runAthenaProcess(session, bucket, file_path):
      partitions = getPartitionFromPath(file_path)
      if len(partitions) == 0:
      app.log.debug(“Path with no partitions. So, nothing to do. File: %s”,
                        file_path)
      return False

      athenaTable = getAthenaTableFromS3Path(bucket, file_path)
      if not athenaTable:
      app.log.debug(“Athena Table not found for %s %s. Skipping!”, bucket,
                        file_path)
      return False

      athenaSql = “ALTER TABLE ” + athenaTable + ” ADD IF NOT EXISTS PARTITION
(” + ‘,’.join(
      partitions) + “);”

      app.log.debug(“Athena Alter Table: %s !”, athenaSql)

      #if athenaDb is None:
      athenaDb = ATHENA_DATABASE

      app.log.info(“Table “+athenaTable+” Database “+athenaDb)

      params = {
      ‘region’: ATHENA_REGION,
      ‘database’: athenaDb,
      ‘bucket’: ATHENA_OUTPUT_BUCKET,
      ‘path’: ATHENA_OUTPUT_BUCKET_PATH,
      ‘query’: athenaSql
      }
      app.log.debug(“Athena query to run: %s”, athenaSql)
      return athenaToS3(session, params)

#
# get Athena DB from bucket and prefix
#
def getAthenaTableFromS3Path(bucket, prefix):

      # find in dict keys the first match
      k = bucket + “/” + prefix
      for key in db_table_config:
      if k.find(key) != -1:
            return db_table_config.get(key, None)
      return None


def getAthenaDatabaseFromTable(table):

      # find in dict keys the first match
      for key in db_table_database_exception:
      if table.find(key) != -1:
            return db_table_database_exception.get(key, None)
      return None


def getPartitionFromPath(path):
      partitions = []
   
      for f in Path(path).parts:
      if f.find(“=”) != -1:
            app.log.debug(
                  “%s | Probably found a partition! ‘=’ is in the string!”, f)

             
            regex = r”^(.*)=(.*)$”
            subst = “\\1=’\\2′”  #add needing apices for Athena Query
             
            new_folder = re.sub(regex, subst, f, 0)
            if new_folder:
                  partitions.append(new_folder)
            else:
                  partitions.append(f)
      return partitions


def athenaQuery(client, params):

      response = client.start_query_execution(
      QueryString=params[“query”],
      QueryExecutionContext={‘Database’: params[‘database’]},
      ResultConfiguration={
            ‘OutputLocation’: ‘s3://’ + params[‘bucket’] + ‘/’ + params[‘path’]
      })
      return response


def athenaToS3(session, params, max_execution=15):
      client = session.client(‘athena’, region_name=params[“region”])
      execution = athenaQuery(client, params)
      execution_id = execution[‘QueryExecutionId’]
      state = ‘RUNNING’

      while (max_execution > 0 and state in [‘RUNNING’]):
      max_execution = max_execution – 1
      response = client.get_query_execution(QueryExecutionId=execution_id)

      if ‘QueryExecution’ in response and \
                  ‘Status’ in response[‘QueryExecution’] and \
                  ‘State’ in response[‘QueryExecution’][‘Status’]:
            state = response[‘QueryExecution’][‘Status’][‘State’]
            if state == ‘FAILED’:
                  return False
            elif state == ‘SUCCEEDED’:
                  s3_path = response[‘QueryExecution’][‘ResultConfiguration’][
                  ‘OutputLocation’]
                  filename = re.findall(r’.*\/(.*)’, s3_path)[0]
                  return filename
      time.sleep(1)

      return False

Il Codice della Funzione Lambda
Come migrare i propri dati su AWS Athena
Come si gestiscono le partizioni compatibili con AWS Athena su Amazon S3

Contattaci per maggiori informazioni

Vuoi imparare ad utilizzare AWS Lambda per il tuo business? Vuoi sapere come automatizzare il processo di aggiornamento delle partizioni? Contattaci per maggiori informazioni!


    Siamo anche su Facebook!
    Vieni a trovarci, cliccando QUI!


    Pubblicato

    in

    da