はじめに

各種 AWS サービスの出力するアクセスログを正規表現でパースしてみました。ログ収集基盤や DWH などで雑多なログをある一定のフォーマットに整形するのはよくある話で、自分なりに汎用化しておくと捗る気がします。

  • Python でコーディング
  • pattern_1, 2 というふうに分かれているのは、歴史的経緯でログフォーマットが更新されているので、古いものも救おうという意図がある
  • サンプルなので単純に JSON を返しているが、実際に使う場合はコネコネする想定

現時点で対応できている AWS サービス

  • S3
  • ALB
  • CLB
  • CloudFront

機会があれば対象サービスを増やしていきたいです (NLB など)。

S3 の場合

import json
import re
import sys
import argparse

# CLI: a single required -p/--path option pointing at the log file to parse.
arg_parser = argparse.ArgumentParser("parse access logs")
arg_parser.add_argument("-p", "--path", help="set log file path", required=True)
path = arg_parser.parse_args().path

# Field-by-field regex fragments for an S3 server access log line, listed in
# the order the fields appear on disk. The class [\w -/:-@\[-~] spans the
# printable ASCII ranges (word chars, space..'/', ':'..'@', '['..'~'); the
# "|-" alternatives accept the empty-field marker.
pattern_1 = [
    r"^(?P<bucket_owner>[\w -/:-@\[-~]+|-)",
    r"(?P<bucket>[\w -/:-@\[-~]+)",
    r"\[(?P<time>([\w -/:-@\[-~]+ [\w+]+))\]",
    # IPv4 dotted quad, any textual IPv6 form (incl. zone index and
    # IPv4-embedded forms), or "-".
    r"(?P<remote_ip>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|-)",
    r"(?P<requester>[\w -/:-@\[-~]+|Anonymous)",
    r"(?P<request_id>[\w -/:-@\[-~]+|-)",
    r"(?P<operation>[\w -/:-@\[-~]+|-)",
    r"(?P<key>[\w -/:-@\[-~]+)",
    # Quoted "METHOD URI" pair; the quote characters are stripped for the
    # *_no_quotes fallback patterns built later in this script.
    r"\"(?P<request_uri>([\w -/:-@\[-~]+|-) ([\w -/:-@\[-~]+))\"",
    r"(?P<http_status>\d{1,3}|-)",
    r"(?P<error_code>[\w -/:-@\[-~]+)",
    r"(?P<bytes_sent>[\d\-.]+|-)",
    r"(?P<object_size>[\d\-.]+|-)",
    r"(?P<total_time>[\d\-.]+|-)",
    r"(?P<turn_around_time>[\d\-.]+|-)",
    r"\"(?P<referer>(((https?|ftp|file)://)?([\w\-.]+)\.?([\w\-.]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)\"",
    r"\"(?P<user_agent>[^\"]+)\"",
    r"(?P<version_id>[\w -/:-@\[-~]+)",
]
# Fields appended by a newer S3 log format revision (request auth metadata).
pattern_2 = [
    r"(?P<host_id>([\w -/:-@\[-~]+))",
    r"(?P<signature_version>(SigV2|SigV4|-))",
    r"(?P<cipher_suite>[\w -/:-@\[-~]+)",
    r"(?P<authentication_type>(AuthHeader|QueryString|-))",
    r"(?P<host_header>[\w -/:-@\[-~]+)",
]
# NOTE(review): the dots in the TLS version names are unescaped and so match
# any character — presumably intended as literal dots; confirm before
# tightening.
pattern_3 = [
    r"(?P<tls_version>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
]
# Newest revision additionally logs the S3 access point ARN.
pattern_4 = [
    r"(?P<access_point_arn>[\w -/:-@\[-~]+)",
]

# S3 access log fields are separated by a single space.
DELIMITER = " "

# One full-line pattern per historical format revision (oldest to newest).
pt_1 = DELIMITER.join(pattern_1)
pt_2 = DELIMITER.join(pattern_1 + pattern_2)
pt_3 = DELIMITER.join(pattern_1 + pattern_2 + pattern_3)
pt_4 = DELIMITER.join(pattern_1 + pattern_2 + pattern_3 + pattern_4)


def _compiled_pair(joined):
    """Compile a pattern plus a fallback variant with the literal quotes stripped."""
    return re.compile(joined), re.compile(joined.replace('\\"', ""))


re_1, re_1_no_quotes = _compiled_pair(pt_1)
re_2, re_2_no_quotes = _compiled_pair(pt_2)
re_3, re_3_no_quotes = _compiled_pair(pt_3)
re_4, re_4_no_quotes = _compiled_pair(pt_4)

with open(path) as f:
    logs = f.read()

# Try the newest (longest) format first, fall back to older revisions, and
# as a last resort retry each one with the quote-stripped variants.
_matchers = (
    re_4, re_3, re_2, re_1,
    re_4_no_quotes, re_3_no_quotes, re_2_no_quotes, re_1_no_quotes,
)

for log in logs.splitlines():
    m = next((hit for hit in (rx.match(log) for rx in _matchers) if hit), None)
    if m is None:
        print("ERROR: Log parsing failed!")
        sys.exit(1)
    print(json.dumps(m.groupdict(), indent=2))

ALB の場合

import json
import re
import sys
import argparse

# CLI: a single required -p/--path option pointing at the log file to parse.
arg_parser = argparse.ArgumentParser("parse access logs")
arg_parser.add_argument("-p", "--path", help="set log file path", required=True)
path = arg_parser.parse_args().path

# Field-by-field regex fragments for an ALB access log line, listed in the
# order the fields appear on disk (space-separated). The class
# [\w -/:-@\[-~] spans the printable ASCII ranges; "|-" alternatives accept
# the empty-field marker.
pattern_1 = [
    r"^(?P<type>https|http|h2|grpcs|wss|ws|-)",
    # ISO-8601-ish timestamp with optional fractional seconds / offset / Z.
    r"(?P<time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
    r"(?P<elb>[\w -/:-@\[-~]+)",
    # "ip:port" for IPv4, any textual IPv6 form, or "-".
    r"(?P<client_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    r"(?P<target_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    # "-1" is ALB's marker for "no response received from the target".
    r"(?P<request_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<target_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<response_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<elb_status_code>\d{1,3}|-)",
    r"(?P<target_status_code>\d{1,3}|-)",
    r"(?P<received_bytes>[\d\-.]+|-)",
    r"(?P<sent_bytes>[\d\-.]+|-)",
    # Quoted "METHOD URL PROTOCOL" triple.
    r"\"(?P<request>([\w\-]+|-) ((((https?|ftp|file)://)?([\w\.-]+)\.?([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-) ([\w\-./]+))( ?)\"",
    r"\"(?P<user_agent>[^\"]+)\"",
    r"(?P<ssl_cipher>[\w -/:-@\[-~]+)",
    # NOTE(review): the dots in the TLS version names are unescaped (match any char).
    r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
    r"(?P<target_group_arn>[\w -/:-@\[-~]+)",
    r"\"(?P<trace_id>[\w -/:-@\[-~]+)\"",
    r"\"(?P<domain_name>[\w -/:-@\[-~]+)\"",
    r"\"(?P<chosen_cert_arn>[\w\-:./]+|session-reused)\"",
    r"(?P<matched_rule_priority>[\d]{1,5}|-1|-)",
    r"(?P<request_creation_time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
    r"\"(?P<actions_executed>[\w -/:-@\[-~]+|-)\"",
    r"\"(?P<redirect_url>(((https?|ftp|file)://)?([\w\-.]+)\.?([\w\-.]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)\"",
    r"\"(?P<error_reason>([a-zA-Z]+|-))\"",
    # Trailing list fields may or may not be quoted depending on log version.
    r"\"?(?P<target_port_list>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-))\"?",
    r"\"?(?P<target_status_code_list>(\d{3}|-))\"?",
    r"\"(?P<classification>(Acceptable|Ambiguous|Severe|-))\"",
    r"\"(?P<classification_reason>[a-zA-Z]+|-)\"",
]

# ALB access log fields are separated by a single space.
DELIMITER = " "

# Full-line ALB pattern plus a fallback with the literal quotes stripped.
pt_1 = DELIMITER.join(pattern_1)
re_1, re_1_no_quotes = (re.compile(p) for p in (pt_1, pt_1.replace('\\"', "")))

with open(path) as f:
    logs = f.read()

for log in logs.splitlines():
    # Fall back to the quote-stripped variant before giving up on a line.
    m = re_1.match(log) or re_1_no_quotes.match(log)
    if m is None:
        print("ERROR: Log parsing failed!")
        sys.exit(1)
    print(json.dumps(m.groupdict(), indent=2))

CLB の場合

import json
import re
import sys
import argparse

# CLI: a single required -p/--path option pointing at the log file to parse.
arg_parser = argparse.ArgumentParser("parse access logs")
arg_parser.add_argument("-p", "--path", help="set log file path", required=True)
path = arg_parser.parse_args().path

# Field-by-field regex fragments for a CLB (classic ELB) access log line,
# listed in on-disk order (space-separated). The class [\w -/:-@\[-~] spans
# the printable ASCII ranges; "|-" alternatives accept the empty-field marker.
pattern_1 = [
    # ISO-8601-ish timestamp with optional fractional seconds / offset / Z.
    r"(?P<time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
    r"(?P<elb>[\w -/:-@\[-~]+)",
    # "ip:port" for IPv4, any textual IPv6 form, or "-".
    r"(?P<client_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    r"(?P<backend_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    # "-1" is the marker for "no response received from the backend".
    r"(?P<request_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<backend_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<response_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<elb_status_code>\d{1,3}|-)",
    r"(?P<backend_status_code>\d{1,3}|-)",
    r"(?P<received_bytes>[\d\-.]+|-)",
    r"(?P<sent_bytes>[\d\-.]+|-)",
    # Quoted "METHOD URL PROTOCOL" triple.
    r"\"(?P<request>([\w\-]+|-) ((((https?|ftp|file)://)?([\w\.-]+)\.?([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-) ([\w\-./]+))( ?)\"",
]
# Trailing fields (user agent / TLS info) present only in newer CLB lines.
pattern_2 = [
    r"\"(?P<user_agent>[^\"]+)\"",
    r"(?P<ssl_cipher>[\w -/:-@\[-~]+)",
    # NOTE(review): the dots in the TLS version names are unescaped (match any char).
    r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
]

# CLB access log fields are separated by a single space.
DELIMITER = " "

# pt_1 = base format, pt_2 = base plus the trailing user-agent/TLS fields.
pt_1 = DELIMITER.join(pattern_1)
pt_2 = DELIMITER.join(pattern_1 + pattern_2)

# Each pattern also gets a fallback variant with the literal quotes removed.
re_1, re_1_no_quotes = (re.compile(p) for p in (pt_1, pt_1.replace('\\"', "")))
re_2, re_2_no_quotes = (re.compile(p) for p in (pt_2, pt_2.replace('\\"', "")))

with open(path) as f:
    logs = f.read()

for log in logs.splitlines():
    # Try the longer (newer) format first, then fall back to the older one
    # and finally to the quote-stripped variants of each.
    m = (
        re_2.match(log)
        or re_1.match(log)
        or re_2_no_quotes.match(log)
        or re_1_no_quotes.match(log)
    )
    if m is None:
        print("ERROR: Log parsing failed!")
        sys.exit(1)
    print(json.dumps(m.groupdict(), indent=2))

CloudFront の場合

import json
import re
import sys
import argparse

# CLI: a single required -p/--path option pointing at the log file to parse.
arg_parser = argparse.ArgumentParser("parse access logs")
arg_parser.add_argument("-p", "--path", help="set log file path", required=True)
path = arg_parser.parse_args().path

# Field-by-field regex fragments for a CloudFront standard access log record,
# one fragment per tab-separated column, in on-disk order. The class
# [\w -/:-@\[-~] spans the printable ASCII ranges; "|-" alternatives accept
# the empty-field marker.
pattern = [
    r"^(?P<date>(\d{4}-\d{1,2}-\d{1,2})|-)",
    r"(?P<time>(\d{1,2}:\d{1,2}:\d{1,2})|-)",
    r"(?P<x_edge_location>[0-9a-zA-Z\-]+)",
    r"(?P<sc_bytes>\d+|-)",
    # IPv4 dotted quad or any textual IPv6 form (incl. zone index and
    # IPv4-embedded forms).
    r"(?P<c_ip>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|((([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]))))",
    r"(?P<cs_method>([\w\-]+|-))",
    r"(?P<cs_host>[\w -/:-@\[-~]+)",
    r"(?P<cs_uri_stem>[\w -/:-@\[-~]+)",
    r"(?P<sc_status>\d{1,3}|-)",
    r"(?P<cs_referer>((([\w -/:-@\[-~]+)?://)?([\w\.-]+)\.([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)",
    # User agent runs to the next tab (fields are tab-delimited, not quoted).
    r"(?P<cs_user_agent>[^\t]+)",
    r"(?P<cs_uri_query>[\w -/:-@\[-~]+)",
    r"(?P<cs_cookie>\S+)",
    r"(?P<x_edge_result_type>(Hit|RefreshHit|Miss|LimitExceeded|CapacityExceeded|Error|Redirect|-))",
    r"(?P<x_edge_request_id>[\w -/:-@\[-~]+)",
    r"(?P<x_host_header>[\w -/:-@\[-~]+)",
    r"(?P<cs_protocol>(http|https|ws|wss|-))",
    r"(?P<cs_bytes>[\d\-.]+|-)",
    r"(?P<time_taken>[\d\-.]+|-)",
    r"(?P<x_forwarded_for>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|-)",
    # NOTE(review): the dots in the TLS version names are unescaped (match any char).
    r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
    r"(?P<ssl_cipher>[\w\-]+)",
    r"(?P<x_edge_response_result_type>(Hit|RefreshHit|Miss|LimitExceeded|CapacityExceeded|Error|Redirect|-))",
    r"(?P<cs_protocol_version>[\w\-./ ]+)",
    r"(?P<fle_status>([\w\-]+|-))",
    r"(?P<fle_encrypted_fields>\S+)",
    r"(?P<c_port>\d+|-)",
    r"(?P<time_to_first_byte>\d+\.\d+|-)",
    r"(?P<x_edge_detailed_result_type>[\w\-]+)",
    r"(?P<sc_content_type>[\w -/:-@\[-~]+)",
    r"(?P<sc_content_len>\d+|-)",
    r"(?P<sc_range_start>\d+|-)",
    r"(?P<sc_range_end>\d+|-)",
]
# CloudFront standard logs are tab-separated (W3C extended log format).
DELIMITER = "\t"

pt_1 = DELIMITER.join(pattern)

# Reuse pt_1 instead of re-joining `pattern` a second time (the original
# compiled DELIMITER.join(pattern) again here, duplicating the work and
# diverging from the sibling S3/ALB/CLB scripts).
re_1 = re.compile(pt_1)
# Fallback with escaped double quotes stripped; the CloudFront fragments
# contain no \" sequences, so this variant is normally identical to re_1.
re_1_no_quotes = re.compile(pt_1.replace('\\"', ""))

with open(path) as f:
    logs = f.read()

for log in logs.splitlines():
    # Fall back to the quote-stripped variant before giving up on a line.
    m = re_1.match(log) or re_1_no_quotes.match(log)
    if m is None:
        print("ERROR: Log parsing failed!")
        sys.exit(1)
    print(json.dumps(m.groupdict(), indent=2))

がんばったところ

たぶん、IPv6 がきても対応できる

免責事項

それなりの精度はあるはず (と思いたい) ですが、確実にパースできることを保証するものではありません。

おわりに

正規表現が横に長いので、フォーマッタの設定はちゃんとしておいたほうがストレスが少ないと思います。正規表現を書くとき、こちらのサイトが便利でした。NLB 対応したら、Go で書き直してシングルバイナリ化したい。