1

I have a python AWS lambda function that takes JSON records, checks them to see if they have required keys, and then inserts into a MySQL db (AWS RDS Aurora). The function gets invoked whenever a new record comes into the stream def handler.

At the moment, Lambda is reporting some errors, but when I look at cloudwatch logs I don't see any errors, which leads me to believe that maybe I'm not handling or catching the exception. Can anyone tell me where the issue might be?

from __future__ import print_function
import base64
import json
import pymysql

RDS_HOST = 'host'

DB_USER = 'dummy_user'
DB_PASSWORD = 'password1234'
DB_NAME = 'crazy_name'
DB_TABLE = 'wow_table'

class MYSQL(object):
    '''
    This a wrapper Class for PyMySQL
    '''
    CONNECTION_TIMEOUT = 30

    def __init__(self, host, user, password, database, table):
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.table = table
        self.connection = self.connect()

    def connect(self):
        '''
        Connects to MySQL instance
        '''
        try:
            connection = pymysql.connect(
                host=self.host, 
                user=self.user, 
                password=self.password, 
                db=self.database, 
                connect_timeout=self.CONNECTION_TIMEOUT
                )

            return connection

        except Exception as ex:
            print(ex)
            print("ERROR: Unexpected error: Could not connect to AuroraDB instance")

    def execute(self, account_id, external_ref_id, timestamp):
        '''
        Executes command given a MySQL connection
        '''

        with self.connection.cursor() as cursor:
            sql = ('INSERT INTO ' + 
                   self.database + 
                   '.' + 
                   self.table +
                   '(`account_id`, `external_reference_id`, `registration`, `c_name`, `c_id`, `create_date`)' +
                   ' VALUES (%s, %s, DATE_FORMAT(STR_TO_DATE(%s,"%%Y-%%M-%%d %%H:%%i:%%s"),"%%Y-%%m-%%d %%H:%%i:%%s"), %s, %s, current_timestamp())' + 
                   ' ON DUPLICATE KEY UPDATE create_date = VALUES(create_date)')
            cursor.execute(sql, (
                account_id, 
                external_ref_id, 
                timestamp, 
                'bingo', 
                300)
                          )

            self.connection.commit()

    def close_connection(self):
        '''
        Closes connection to MySQL
        '''
        self.connection.close()

def get_data_from_kinesis_object(obj):
    '''
    Retrieves data from kinesis event
    '''
    return obj['kinesis']['data']

def decode_data(data):
    '''
    Decodes record via base64
    '''
    return base64.b64decode(data)

def split_records_into_record(records):
    '''
    Splits a record of records into an array of records
    '''
    return records.split('\n')

def parse_record(record):
    '''
    parses record into JSON
    '''

    if record:

        return json.loads(record)

def is_record_valid(record):
    '''
    Check for keys in event
    returns True if they all exist
    and False if they dont all exist

    '''
    return all(key in record for key in (
        'eventName', 
        'sourceType',
        'AccountId',
        'Timestamp',
        'ExternalReferenceId'
        ))

def handler(event, context):
    """
    This function inserts data into Aurora RDS instance
    """

    mysql = MYSQL(RDS_HOST, DB_USER, DB_PASSWORD, DB_NAME, DB_TABLE)

    for obj in event['Records']:
        records = decode_data(get_data_from_kinesis_object(obj))
        split_records = split_records_into_record(records)

        for record in split_records:
            parsed_record = parse_record(record)

            if is_record_valid(parsed_record):
                mysql.execute(
                    parsed_record['AccountId'],
                    parsed_record['ExternalReferenceId'],
                    str(parsed_record['Timestamp'])
                    )

    mysql.close_connection()
4

0 に答える 0