import argparse
import datetime
import time
import csv
import json
import logging
import urllib3
import os
from gzip import GzipFile
from io import BytesIO, StringIO
import boto3
import botocore
from botocore.config import Config
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Global Elasticsearch settings. Each value can be overridden through a
# (Lambda) environment variable with the same name as the key, e.g.
# ``esIndexPrefix``; when the variable is absent the default below is used.
globalVars = {
    'esIndexPrefix': os.environ.get('esIndexPrefix', 's3-to-es-'),
    'esIndexDocType': os.environ.get('esIndexDocType', 's3_to_es_docs'),
}
def _is_unset(value):
    """Return True if *value* is None or the literal string "None".

    Environment variables can only carry strings, so "None" is accepted as the
    textual spelling of "not configured".
    """
    return value is None or value == "None"


def s3_connector(aws_auth):
    """Create a boto3 client from either an AWS profile or an assumed IAM role.

    Two mutually exclusive modes are supported:

    * profile mode: ``role_name`` and ``role_session`` are both unset. A
      session is opened with ``profile_name``, or with no explicit profile
      when that is unset too.
    * role mode: ``profile_name`` is unset and both ``role_name`` (the role
      ARN) and ``role_session`` are set. STS ``assume_role`` is called and
      the temporary credentials are used for the client.

    Any other combination prints a usage hint and returns False.

    Args:
        aws_auth: dict with keys ``client`` (boto3 service name), ``region``,
            ``profile_name``, ``role_name`` and ``role_session``. Unset values
            may be ``None`` or the string ``"None"``.

    Returns:
        The boto3 client, or ``False`` on failure / invalid configuration.
    """
    profile_name = aws_auth['profile_name']
    role_name = aws_auth['role_name']
    role_session = aws_auth['role_session']

    if _is_unset(role_name) and _is_unset(role_session):
        try:
            # Normalise the string "None" to None so boto3 does not look for a
            # profile literally called "None".
            session = boto3.session.Session(
                profile_name=None if _is_unset(profile_name) else profile_name)
            return session.client(service_name=aws_auth['client'],
                                  region_name=aws_auth['region'],
                                  config=Config(retries={'max_attempts': 3}))
        except Exception as err:
            print("Failed to create a boto3 client connection to S3:\n", str(err))
            logger.error('ERROR: Failed to create a boto3 client connection to S3')
            return False

    # Role mode requires BOTH the role ARN and the session name to be set.
    if (_is_unset(profile_name)
            and not _is_unset(role_name)
            and not _is_unset(role_session)):
        try:
            session = boto3.session.Session()
            sts = session.client(service_name='sts',
                                 region_name=aws_auth['region'],
                                 config=Config(retries={'max_attempts': 3}))
            assumed_role_object = sts.assume_role(
                RoleArn=str(role_name),
                RoleSessionName=str(role_session),
            )
            credentials = assumed_role_object['Credentials']
            return session.client(aws_access_key_id=credentials['AccessKeyId'],
                                  aws_secret_access_key=credentials['SecretAccessKey'],
                                  aws_session_token=credentials['SessionToken'],
                                  service_name=aws_auth['client'],
                                  region_name=aws_auth['region'],
                                  config=Config(retries={'max_attempts': 3}))
        except Exception as err:
            print("Failed to create a boto3 client connection to S3:\n", str(err))
            logger.error('ERROR: Failed to create a boto3 client connection to S3')
            return False

    print('Please use/set [--profile-name] or [--role-name] with [--role-session]')
    return False
def s3_bucket(aws_auth, s3_bucket_name):
    """Check that *s3_bucket_name* exists and is reachable with *aws_auth*.

    Args:
        aws_auth: auth dict accepted by ``s3_connector``.
        s3_bucket_name: name of the bucket to probe with ``head_bucket``.

    Returns:
        True if ``head_bucket`` succeeds. False if no client could be created,
        or if the bucket answers 403 (forbidden) or 404 (missing).

    Raises:
        SystemExit(-1): for any other ``ClientError`` from ``head_bucket``.
    """
    s3 = s3_connector(aws_auth)
    if not s3:
        return False

    try:
        s3.head_bucket(Bucket=s3_bucket_name)
    except botocore.exceptions.ClientError as e:
        # The error code is compared as a string, so a non-numeric code
        # cannot raise ValueError the way int() would.
        error_code = str(e.response.get('Error', {}).get('Code', ''))
        if error_code == '403':
            print("Private {} bucket. Forbidden Access!".format(s3_bucket_name))
            logger.error('ERROR: Private {0} Bucket. Forbidden Access!'.format(s3_bucket_name))
            return False
        if error_code == '404':
            print("The {} bucket does not exist!".format(s3_bucket_name))
            logger.error('ERROR: The {0} bucket does not exist!'.format(s3_bucket_name))
            return False
        logger.error('ERROR: Unexpected error [{0}] while checking {1} bucket: {2}'.format(
            error_code, s3_bucket_name, e))
        raise SystemExit(-1)

    print("A bucket {} is already exists!".format(s3_bucket_name))
    return True
def s3_objects(aws_auth, s3_bucket_name):
    """Download the content of every object in *s3_bucket_name*.

    Objects whose key ends in ``.gz`` (this includes ``.tar.gz``) are
    gunzipped and decoded as UTF-8, and are returned as ``str``. All other
    objects are returned as raw ``bytes``, so callers must handle both types.

    Args:
        aws_auth: auth dict accepted by ``s3_connector``.
        s3_bucket_name: bucket to read.

    Returns:
        A list of object contents (``str`` or ``bytes``). The list is empty
        when the bucket is unavailable or empty. If an error occurs midway,
        the objects collected so far are returned and the error is logged.
    """
    s3objects = []
    if not s3_bucket(aws_auth, s3_bucket_name):
        return s3objects
    s3 = s3_connector(aws_auth)
    if not s3:
        return s3objects

    try:
        # Paginate: a single list call returns at most 1000 keys.
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=s3_bucket_name):
            # 'Contents' is absent when the bucket (or page) has no objects.
            for entry in page.get('Contents', []):
                key_name = entry['Key']
                body = s3.get_object(Bucket=s3_bucket_name, Key=key_name)['Body'].read()
                if key_name.endswith('.gz'):
                    content = GzipFile(None, 'rb', fileobj=BytesIO(body)).read().decode('utf-8')
                    s3objects.append(content)
                else:
                    s3objects.append(body)
        logger.info('SUCCESS: Retrieved object(s) from S3 {0} bucket'.format(s3_bucket_name))
    except Exception as e:
        print(e)
        logger.error('ERROR: I could not retrieved object(s) from S3 {0} bucket'.format(s3_bucket_name))
    return s3objects
def sending_data_to_elastisearch(es_url, docData):
    """POST one document to Elasticsearch.

    The document goes to ``<es_url>/<index>/<doc type>``, where ``<index>`` is
    ``globalVars['esIndexPrefix']`` followed by ``<year>-<month>`` of today's
    date, and ``<doc type>`` is ``globalVars['esIndexDocType']``.

    Args:
        es_url: base Elasticsearch URL, e.g. ``http://localhost:9200``.
        docData: JSON-serialisable dict; its ``content`` entry is logged if
            the request fails.

    Returns:
        This function object itself, kept for backward compatibility. The
        callers in this file ignore the return value.

    Raises:
        SystemExit(1): if the request fails, or if Elasticsearch answers with
            any status other than 200/201.
    """
    # Read the date once so year and month cannot straddle a month boundary.
    today = datetime.date.today()
    index_name = '{0}{1}-{2}'.format(globalVars['esIndexPrefix'], today.year, today.month)
    elastic_search_url = '{0}/{1}/{2}'.format(es_url.rstrip('/'), index_name,
                                              globalVars['esIndexDocType'])
    headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}

    try:
        http = urllib3.PoolManager()
        response = http.request('POST',
                                elastic_search_url,
                                body=json.dumps(docData),
                                headers=headers,
                                retries=False,
                                timeout=30)
    except Exception as e:
        content = docData.get('content') if isinstance(docData, dict) else docData
        logger.error('ERROR: {0}'.format(str(e)))
        logger.error('ERROR: Unable to index line:"{0}"'.format(str(content)))
        print(e)
        raise SystemExit(1)

    # Decode leniently so an odd response body cannot mask the real status.
    response_text = response.data.decode('utf-8', errors='replace')
    logger.debug('Response status: {0}'.format(response.status))
    if response.status in (200, 201):
        logger.info('INFO: Response status: {0}\nResponse data: {1}'.format(response.status,
                                                                            response_text))
    elif response.status == 405:
        logger.error('ERROR: Something is wrong with sending DATA: \n\t {}'.format(response_text))
        raise SystemExit(1)
    else:
        logger.error('FAILURE: Got an error: \n\t {}'.format(response_text))
        raise SystemExit(1)
    return sending_data_to_elastisearch
def pushing_locally(aws_auth, s3_bucket_name, es_url):
    """Index every CSV row found in the bucket's objects (CLI entry path).

    ``s3_objects`` yields ``str`` for gunzipped objects and ``bytes`` for all
    other objects, so ``bytes`` are decoded as UTF-8 before CSV parsing. Each
    CSV row (header row = field names) becomes one document:

    * ``content``: the row serialised as JSON;
    * ``createdDate``: the current UTC time, ``%Y-%m-%dT%H:%M:%S.%fZ``.

    Returns:
        This function object itself, kept for backward compatibility.
    """
    for obj in s3_objects(aws_auth, s3_bucket_name):
        text = obj.decode('utf-8') if isinstance(obj, bytes) else obj
        reader = csv.DictReader(StringIO(text), fieldnames=None, restkey=None,
                                restval=None, dialect='excel')
        for row in reader:
            docData = {
                'content': json.dumps(row),
                # The trailing 'Z' declares UTC, so take the time in UTC.
                'createdDate': datetime.datetime.now(datetime.timezone.utc).strftime(
                    "%Y-%m-%dT%H:%M:%S.%fZ"),
            }
            sending_data_to_elastisearch(es_url, docData)
    return pushing_locally
def lambda_handler(event, context):
    """AWS Lambda entry point: index every CSV row of the configured bucket.

    Configuration comes from environment variables:

    * ``aws_s3_bucket_name`` and ``elasticsearch_url``: required.
    * ``aws_boto3_client`` and ``aws_region``: optional, defaulting to ``s3``
      and ``us-east-1``, the same defaults as the CLI.
    * ``aws_profile_name``, ``aws_role_name`` and ``aws_role_session``:
      optional. Missing values become None, which ``s3_connector`` treats
      the same as the string "None".

    Each CSV row becomes one document whose ``content`` is the row as JSON
    and whose ``createdDate`` is the current UTC time.

    Args:
        event: Lambda event; only logged.
        context: Lambda context; unused.

    Returns:
        A small status dict.
    """
    aws_auth = {
        "client": os.environ.get('aws_boto3_client', 's3'),
        "region": os.environ.get('aws_region', 'us-east-1'),
        "profile_name": os.environ.get('aws_profile_name'),
        "role_name": os.environ.get('aws_role_name'),
        "role_session": os.environ.get('aws_role_session'),
    }
    s3_bucket_name = os.environ['aws_s3_bucket_name']
    es_url = os.environ['elasticsearch_url']
    logger.info("Received event: " + json.dumps(event, indent=2))

    for obj in s3_objects(aws_auth, s3_bucket_name):
        # s3_objects returns str for gunzipped objects and bytes otherwise.
        text = obj.decode('utf-8') if isinstance(obj, bytes) else obj
        reader = csv.DictReader(StringIO(text),
                                fieldnames=None,
                                restkey=None,
                                restval=None,
                                dialect='excel')
        for row in reader:
            docData = {
                'content': json.dumps(row),
                # The trailing 'Z' declares UTC, so take the time in UTC.
                'createdDate': datetime.datetime.now(datetime.timezone.utc).strftime(
                    "%Y-%m-%dT%H:%M:%S.%fZ"),
            }
            sending_data_to_elastisearch(es_url, docData)

    logger.info('SUCCESS: Successfully indexed the entire doc into ElastiSearch')
    return {"Status": "AWS Lambda handler has been finished"}
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        prog='python3 script_name.py -h',
        usage='python3 script_name.py {ARGS}',
        description='Index CSV rows stored in an S3 bucket into Elasticsearch.',
        epilog='''created by Vitalii Natarov''',
        add_help=True,
    )
    parser.add_argument('--version', action='version', version='v0.1.0')
    parser.add_argument('--bclient', dest='boto3_client', help='Set boto3 client', default='s3')
    parser.add_argument('--region', dest='region', help='Set AWS region for boto3', default='us-east-1')
    parser.add_argument('--pname', '--profile-name', dest='profile_name', help='Set profile name of AWS',
                        default=None)
    parser.add_argument('--rname', '--role-name', dest='role_name', help='Set role ARN name',
                        default=None)
    parser.add_argument('--rsession', '--role-session', dest='role_session', help='Set role session name',
                        default=None)
    parser.add_argument('--s3-bucket', '--s3bucket', dest='s3_bucket', help='Set S3 bucket name',
                        default="test-s3-to-elastisearch")
    parser.add_argument('--es-url', '--esurl', dest='es_url', help='Set ElastiSerch URL',
                        default="http://localhost:9200")
    # Only the literal value "True" selects Lambda mode. The default is the
    # string 'False' so that it is the same type as the comparison below;
    # omitting the flag runs the local indexing path.
    parser.add_argument('--lambda', dest='aws_lambda', help='Set lambda usage', default='False')
    results = parser.parse_args()

    if results.aws_lambda == 'True':
        lambda_handler(None, None)
    else:
        start_time = time.time()
        aws_auth = {
            "client": results.boto3_client,
            "region": results.region,
            "profile_name": results.profile_name,
            "role_name": results.role_name,
            "role_session": results.role_session,
        }
        pushing_locally(aws_auth, results.s3_bucket, results.es_url)
        elapsed = round(time.time() - start_time, 2)
        print("--- %s seconds ---" % elapsed)