Contents

AWS S3 ALB Log Parse

Contents
Note

Lately I started working on an AWS Lambda function that is triggered by an S3 object upload: whenever the ALB uploads traffic logs to S3, the Lambda function will push those logs to Splunk using the HTTP Event Collector. This post is a preparation step for that Lambda function.

This piece of code gets a “gzip” file from S3 and attempts to parse each line into the AWS ALB log fields.

Dataset

Here is my fake ALB log.

A B C D E F G H I J K L M N O P Q R S T U V W X Y Z AA BB CC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29

PoC

Here is my PoC code.

#!/usr/bin/env python3

import csv
import gzip
import io
import json

import boto3

#client_worker_s3 = boto3.client('s3')
resource_worker_s3 = boto3.resource('s3')

gBucket = 'myS3BucketName'
gKey = 'my_poc_log_file.gz'


obj = resource_worker_s3.Object(gBucket,gKey)
# Load file in memory
buf = io.BytesIO(obj.get()["Body"].read())

# Start reading line by line
for line in gzip.GzipFile(fileobj=buf):
    #print(line.decode('utf-8'))
	# Split line items by space and create a list
    line_list = (line.decode('utf-8')).split(" ")
    gData = {} 
    #
    gType = line_list[0]
    gData.update({"type": gType})
    #
    gTime = line_list[1]
    gData.update({"time": gTime})
    #
    gElb  = line_list[2]
    gData.update({"elb": gElb})
    #
    gClientPort  = line_list[3]
    gData.update({"client_port": gClientPort})
    #
    gTargetPort  = line_list[4]
    gData.update({"target_port": gTargetPort})
    #
    gRequestProcessingTime  = line_list[5]
    gData.update({"request_processing_time": gRequestProcessingTime})
    #
    gTargetProcessingTime  = line_list[6]
    gData.update({"target_processing_time": gTargetProcessingTime})
    #
    gResponseProcessingTime  = line_list[7]
    gData.update({"response_processing_time": gResponseProcessingTime})
    #
    gElbStatusCode  = line_list[8]
    gData.update({"elb_status_code": gElbStatusCode})
    #
    gTargetStatusCode  = line_list[9]
    gData.update({"target_status_code": gTargetStatusCode})
    #
    gReceivedBytes  = line_list[10]
    gData.update({"received_bytes": gReceivedBytes})
    #
    gSentBytes  = line_list[11]
    gData.update({"sent_bytes": gSentBytes})
    #
    gRequest  = line_list[12]
    gData.update({"request": gRequest})
    #
    gUserAgent  = line_list[13]
    gData.update({"user_agent": gUserAgent})
    #
    gSslCipher  = line_list[14]
    gData.update({"ssl_cipher": gSslCipher})
    #
    gSslProtocol  = line_list[15]
    gData.update({"ssl_protocol": gSslProtocol})
    #
    gTargetGroupArn = line_list[16]
    gData.update({"target_group_arn": gTargetGroupArn})
    #
    gTraceId = line_list[17]
    gData.update({"trace_id": gTraceId})
    #
    gDomainName = line_list[18]
    gData.update({"domain_name": gDomainName})
    #
    gChosenCertArn = line_list[19]
    gData.update({"chosen_cert_arn": gChosenCertArn})
    #
    gMatchedRulePriority = line_list[20]
    gData.update({"matched_rule_priority": gMatchedRulePriority})
    #
    gRequestCreationTime = line_list[21]
    gData.update({"request_creation_time": gRequestCreationTime})
    #
    gActionsExecuted = line_list[22]
    gData.update({"actions_executed": gActionsExecuted})
    #
    gRedirectUrl = line_list[23]
    gData.update({"redirect_url": gRedirectUrl})
    #
    gErrorReason = line_list[24]
    gData.update({"error_reason": gErrorReason})
    #
    gTargetPortList = line_list[25]
    gData.update({"target_port_list": gTargetPortList})
    #
    gTargetStatusCodeList = line_list[26]
    gData.update({"target_status_code_list": gTargetStatusCodeList})
    #
    gClassification = line_list[27]
    gData.update({"classification": gClassification})
    #
    gClassificationReason = line_list[28]
    gData.update({"classification_reason": gClassificationReason.strip()})
    #print(gData)
    gDataJson = json.dumps(gData, indent = 4)
    print(gDataJson)

Output

Here is the output. As the next step, we can take this JSON output and push it to Splunk.

{
    "type": "A",
    "time": "B",
    "elb": "C",
    "client_port": "D",
    "target_port": "E",
    "request_processing_time": "F",
    "target_processing_time": "G",
    "response_processing_time": "H",
    "elb_status_code": "I",
    "target_status_code": "J",
    "received_bytes": "K",
    "sent_bytes": "L",
    "request": "M",
    "user_agent": "N",
    "ssl_cipher": "O",
    "ssl_protocol": "P",
    "target_group_arn": "Q",
    "trace_id": "R",
    "domain_name": "S",
    "chosen_cert_arn": "T",
    "matched_rule_priority": "U",
    "request_creation_time": "V",
    "actions_executed": "W",
    "redirect_url": "X",
    "error_reason": "Y",
    "target_port_list": "Z",
    "target_status_code_list": "AA",
    "classification": "BB",
    "classification_reason": "CC"
}
{
    "type": "1",
    "time": "2",
    "elb": "3",
    "client_port": "4",
    "target_port": "5",
    "request_processing_time": "6",
    "target_processing_time": "7",
    "response_processing_time": "8",
    "elb_status_code": "9",
    "target_status_code": "10",
    "received_bytes": "11",
    "sent_bytes": "12",
    "request": "13",
    "user_agent": "14",
    "ssl_cipher": "15",
    "ssl_protocol": "16",
    "target_group_arn": "17",
    "trace_id": "18",
    "domain_name": "19",
    "chosen_cert_arn": "20",
    "matched_rule_priority": "21",
    "request_creation_time": "22",
    "actions_executed": "23",
    "redirect_url": "24",
    "error_reason": "25",
    "target_port_list": "26",
    "target_status_code_list": "27",
    "classification": "28",
    "classification_reason": "29"
}