はじめに
各種 AWS サービスの出力するアクセスログを正規表現でパースしてみました。ログ収集基盤や DWH などで雑多なログをある一定のフォーマットに整形するのはよくある話で、自分なりに汎用化しておくと捗る気がします。
- Python でコーディング
- pattern_1, 2 というふうに分かれているのは、歴史的経緯でログフォーマットが更新されているので、古いものも救おうという意図がある
- サンプルなので単純に JSON を返しているが、実際に使う場合はコネコネする想定
現時点で対応できている AWS サービス
- S3
- ALB
- CLB
- CloudFront
機会があれば対象サービスを増やしていきたい (NLB など)
S3 の場合
"""Parse Amazon S3 server access logs and print each record as JSON."""
import argparse
import json
import re
import sys

# The only input is the path to the downloaded log file.
# NOTE: the original passed "parse access logs" positionally, which sets
# ArgumentParser's `prog` (the program name shown in usage), not the
# description — `description=` is what was intended.
parser = argparse.ArgumentParser(description="parse access logs")
parser.add_argument("-p", "--path", help="set log file path", required=True)
args = parser.parse_args()
path = args.path
# Base fields present in every S3 server access log revision.
# The class "[\w -/:-@\[-~]" is printable ASCII spelled as the ranges
# space-"/", ":"-"@" and "["-"~" plus \w; "-" alone marks an absent field.
pattern_1 = [
r"^(?P<bucket_owner>[\w -/:-@\[-~]+|-)",
r"(?P<bucket>[\w -/:-@\[-~]+)",
# Timestamp is bracketed, e.g. [06/Feb/2019:00:00:38 +0000].
r"\[(?P<time>([\w -/:-@\[-~]+ [\w+]+))\]",
# Dotted-quad IPv4, any textual IPv6 form (full, compressed, link-local
# with zone, IPv4-mapped), or "-".
r"(?P<remote_ip>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|-)",
r"(?P<requester>[\w -/:-@\[-~]+|Anonymous)",
r"(?P<request_id>[\w -/:-@\[-~]+|-)",
r"(?P<operation>[\w -/:-@\[-~]+|-)",
r"(?P<key>[\w -/:-@\[-~]+)",
# The whole "METHOD uri protocol" request line, double-quoted in the log.
r"\"(?P<request_uri>([\w -/:-@\[-~]+|-) ([\w -/:-@\[-~]+))\"",
r"(?P<http_status>\d{1,3}|-)",
r"(?P<error_code>[\w -/:-@\[-~]+)",
r"(?P<bytes_sent>[\d\-.]+|-)",
r"(?P<object_size>[\d\-.]+|-)",
r"(?P<total_time>[\d\-.]+|-)",
r"(?P<turn_around_time>[\d\-.]+|-)",
# Quoted referer URL (scheme optional) or "-".
r"\"(?P<referer>(((https?|ftp|file)://)?([\w\-.]+)\.?([\w\-.]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)\"",
r"\"(?P<user_agent>[^\"]+)\"",
r"(?P<version_id>[\w -/:-@\[-~]+)",
]
# Fields appended in a later log format revision (auth / request details);
# kept separate so older log lines can still match pattern_1 alone.
pattern_2 = [
r"(?P<host_id>([\w -/:-@\[-~]+))",
r"(?P<signature_version>(SigV2|SigV4|-))",
r"(?P<cipher_suite>[\w -/:-@\[-~]+)",
r"(?P<authentication_type>(AuthHeader|QueryString|-))",
r"(?P<host_header>[\w -/:-@\[-~]+)",
]
# Appended later again: negotiated TLS version.
# NOTE(review): the "." in "TLS(v|V)1.3" etc. is unescaped, so it matches
# any character in that position — confirm whether "\." was intended.
pattern_3 = [
r"(?P<tls_version>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
]
# Newest appended field: the access point ARN (when the request went
# through an S3 access point).
pattern_4 = [
r"(?P<access_point_arn>[\w -/:-@\[-~]+)",
]
DELIMITER = " "

# One field set per historical log format revision.  Newer revisions only
# append fields, so try the longest variant first and fall back to the
# progressively shorter (older) ones.
_field_sets = (
    pattern_1 + pattern_2 + pattern_3 + pattern_4,
    pattern_1 + pattern_2 + pattern_3,
    pattern_1 + pattern_2,
    pattern_1,
)
# Quoted variants first, then the same patterns with the literal quote
# anchors stripped, preserving the original fallback order.
_matchers = [re.compile(DELIMITER.join(fields)) for fields in _field_sets]
_matchers += [
    re.compile(DELIMITER.join(fields).replace('\\"', ""))
    for fields in _field_sets
]

# Explicit encoding so parsing does not depend on the platform default.
with open(path, encoding="utf-8") as f:
    logs = f.read()

for log in logs.splitlines():
    for matcher in _matchers:
        m = matcher.match(log)
        if m:
            print(json.dumps(m.groupdict(), indent=2))
            break
    else:
        # No variant matched: report and stop so malformed input is noticed.
        print("ERROR: Log parsing failed!")
        sys.exit(1)
ALB の場合
"""Parse AWS ALB access logs and print each record as JSON."""
import argparse
import json
import re
import sys

# The only input is the path to the downloaded log file.
# NOTE: the original passed "parse access logs" positionally, which sets
# ArgumentParser's `prog` (the program name shown in usage), not the
# description — `description=` is what was intended.
parser = argparse.ArgumentParser(description="parse access logs")
parser.add_argument("-p", "--path", help="set log file path", required=True)
args = parser.parse_args()
path = args.path
# ALB access log fields, in on-disk order.  The class "[\w -/:-@\[-~]" is
# printable ASCII spelled as ranges; "-" alone marks an absent field.
pattern_1 = [
# Listener protocol.
r"^(?P<type>https|http|h2|grpcs|wss|ws|-)",
# ISO-8601 timestamp, with optional fractional seconds / offset / "Z".
r"(?P<time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
r"(?P<elb>[\w -/:-@\[-~]+)",
# "ip:port" — dotted-quad IPv4 with port, any textual IPv6 form, or "-".
r"(?P<client_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
r"(?P<target_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
# Processing times: seconds with optional fraction, or -1 / "-" on error.
r"(?P<request_processing_time>\d+(\.\d+)?|-1|-)",
r"(?P<target_processing_time>\d+(\.\d+)?|-1|-)",
r"(?P<response_processing_time>\d+(\.\d+)?|-1|-)",
r"(?P<elb_status_code>\d{1,3}|-)",
r"(?P<target_status_code>\d{1,3}|-)",
r"(?P<received_bytes>[\d\-.]+|-)",
r"(?P<sent_bytes>[\d\-.]+|-)",
# Quoted "METHOD url protocol" request line.
r"\"(?P<request>([\w\-]+|-) ((((https?|ftp|file)://)?([\w\.-]+)\.?([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-) ([\w\-./]+))( ?)\"",
r"\"(?P<user_agent>[^\"]+)\"",
r"(?P<ssl_cipher>[\w -/:-@\[-~]+)",
# NOTE(review): "." in "TLS(v|V)1.3" etc. is unescaped — matches any char.
r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
r"(?P<target_group_arn>[\w -/:-@\[-~]+)",
r"\"(?P<trace_id>[\w -/:-@\[-~]+)\"",
r"\"(?P<domain_name>[\w -/:-@\[-~]+)\"",
r"\"(?P<chosen_cert_arn>[\w\-:./]+|session-reused)\"",
r"(?P<matched_rule_priority>[\d]{1,5}|-1|-)",
r"(?P<request_creation_time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
r"\"(?P<actions_executed>[\w -/:-@\[-~]+|-)\"",
r"\"(?P<redirect_url>(((https?|ftp|file)://)?([\w\-.]+)\.?([\w\-.]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)\"",
r"\"(?P<error_reason>([a-zA-Z]+|-))\"",
# Quotes optional on the two *_list fields.
r"\"?(?P<target_port_list>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-))\"?",
r"\"?(?P<target_status_code_list>(\d{3}|-))\"?",
r"\"(?P<classification>(Acceptable|Ambiguous|Severe|-))\"",
r"\"(?P<classification_reason>[a-zA-Z]+|-)\"",
]
DELIMITER = " "
_pattern = DELIMITER.join(pattern_1)
re_1 = re.compile(_pattern)
# Fallback with the literal quote anchors stripped, for lines whose quoted
# fields arrive without quotes.
re_1_no_quotes = re.compile(_pattern.replace('\\"', ""))

# Explicit encoding so parsing does not depend on the platform default.
with open(path, encoding="utf-8") as f:
    logs = f.read()

for log in logs.splitlines():
    # Quoted pattern first, quote-stripped fallback second.
    m = re_1.match(log) or re_1_no_quotes.match(log)
    if m:
        print(json.dumps(m.groupdict(), indent=2))
    else:
        # No pattern matched: report and stop so malformed input is noticed.
        print("ERROR: Log parsing failed!")
        sys.exit(1)
CLB の場合
"""Parse AWS CLB (Classic Load Balancer) access logs and print each record as JSON."""
import argparse
import json
import re
import sys

# The only input is the path to the downloaded log file.
# NOTE: the original passed "parse access logs" positionally, which sets
# ArgumentParser's `prog` (the program name shown in usage), not the
# description — `description=` is what was intended.
parser = argparse.ArgumentParser(description="parse access logs")
parser.add_argument("-p", "--path", help="set log file path", required=True)
args = parser.parse_args()
path = args.path
# CLB access log fields common to HTTP and TCP listeners.  The class
# "[\w -/:-@\[-~]" is printable ASCII spelled as ranges; "-" = absent field.
pattern_1 = [
# ISO-8601 timestamp, with optional fractional seconds / offset / "Z".
r"(?P<time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
r"(?P<elb>[\w -/:-@\[-~]+)",
# "ip:port" — dotted-quad IPv4 with port, any textual IPv6 form, or "-".
r"(?P<client_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
r"(?P<backend_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
# Processing times: seconds with optional fraction, or -1 / "-" on error.
r"(?P<request_processing_time>\d+(\.\d+)?|-1|-)",
r"(?P<backend_processing_time>\d+(\.\d+)?|-1|-)",
r"(?P<response_processing_time>\d+(\.\d+)?|-1|-)",
r"(?P<elb_status_code>\d{1,3}|-)",
r"(?P<backend_status_code>\d{1,3}|-)",
r"(?P<received_bytes>[\d\-.]+|-)",
r"(?P<sent_bytes>[\d\-.]+|-)",
# Quoted "METHOD url protocol" request line.
r"\"(?P<request>([\w\-]+|-) ((((https?|ftp|file)://)?([\w\.-]+)\.?([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-) ([\w\-./]+))( ?)\"",
]
# Trailing fields that only HTTPS/SSL listener logs carry; lines without
# them fall back to pattern_1 alone.
pattern_2 = [
r"\"(?P<user_agent>[^\"]+)\"",
r"(?P<ssl_cipher>[\w -/:-@\[-~]+)",
# NOTE(review): "." in "TLS(v|V)1.3" etc. is unescaped — matches any char.
r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
]
DELIMITER = " "
pt_1 = DELIMITER.join(pattern_1)
pt_2 = DELIMITER.join(pattern_1 + pattern_2)
# Longest (newest) format first, quoted variants before the fallbacks with
# the literal quote anchors stripped — same order as the original chain.
_matchers = (
    re.compile(pt_2),
    re.compile(pt_1),
    re.compile(pt_2.replace('\\"', "")),
    re.compile(pt_1.replace('\\"', "")),
)

# Explicit encoding so parsing does not depend on the platform default.
with open(path, encoding="utf-8") as f:
    logs = f.read()

for log in logs.splitlines():
    for matcher in _matchers:
        m = matcher.match(log)
        if m:
            print(json.dumps(m.groupdict(), indent=2))
            break
    else:
        # No variant matched: report and stop so malformed input is noticed.
        print("ERROR: Log parsing failed!")
        sys.exit(1)
CloudFront の場合
"""Parse Amazon CloudFront standard access logs and print each record as JSON."""
import argparse
import json
import re
import sys

# The only input is the path to the downloaded log file.
# NOTE: the original passed "parse access logs" positionally, which sets
# ArgumentParser's `prog` (the program name shown in usage), not the
# description — `description=` is what was intended.
parser = argparse.ArgumentParser(description="parse access logs")
parser.add_argument("-p", "--path", help="set log file path", required=True)
args = parser.parse_args()
path = args.path
# CloudFront standard log fields, in on-disk order.  Fields are joined with
# a tab delimiter below (see DELIMITER = "\t" and the [^\t] user-agent
# class).  "-" alone marks an absent field.
pattern = [
r"^(?P<date>(\d{4}-\d{1,2}-\d{1,2})|-)",
r"(?P<time>(\d{1,2}:\d{1,2}:\d{1,2})|-)",
# Edge location code, e.g. "LAX1".
r"(?P<x_edge_location>[0-9a-zA-Z\-]+)",
r"(?P<sc_bytes>\d+|-)",
# Client IP: dotted-quad IPv4 or any textual IPv6 form.
r"(?P<c_ip>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|((([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]))))",
r"(?P<cs_method>([\w\-]+|-))",
r"(?P<cs_host>[\w -/:-@\[-~]+)",
r"(?P<cs_uri_stem>[\w -/:-@\[-~]+)",
r"(?P<sc_status>\d{1,3}|-)",
r"(?P<cs_referer>((([\w -/:-@\[-~]+)?://)?([\w\.-]+)\.([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)",
# User agent may contain spaces, so match anything up to the next tab.
r"(?P<cs_user_agent>[^\t]+)",
r"(?P<cs_uri_query>[\w -/:-@\[-~]+)",
r"(?P<cs_cookie>\S+)",
r"(?P<x_edge_result_type>(Hit|RefreshHit|Miss|LimitExceeded|CapacityExceeded|Error|Redirect|-))",
r"(?P<x_edge_request_id>[\w -/:-@\[-~]+)",
r"(?P<x_host_header>[\w -/:-@\[-~]+)",
r"(?P<cs_protocol>(http|https|ws|wss|-))",
r"(?P<cs_bytes>[\d\-.]+|-)",
r"(?P<time_taken>[\d\-.]+|-)",
r"(?P<x_forwarded_for>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|-)",
# NOTE(review): "." in "TLS(v|V)1.3" etc. is unescaped — matches any char.
r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
r"(?P<ssl_cipher>[\w\-]+)",
r"(?P<x_edge_response_result_type>(Hit|RefreshHit|Miss|LimitExceeded|CapacityExceeded|Error|Redirect|-))",
r"(?P<cs_protocol_version>[\w\-./ ]+)",
r"(?P<fle_status>([\w\-]+|-))",
r"(?P<fle_encrypted_fields>\S+)",
r"(?P<c_port>\d+|-)",
r"(?P<time_to_first_byte>\d+\.\d+|-)",
r"(?P<x_edge_detailed_result_type>[\w\-]+)",
r"(?P<sc_content_type>[\w -/:-@\[-~]+)",
r"(?P<sc_content_len>\d+|-)",
r"(?P<sc_range_start>\d+|-)",
r"(?P<sc_range_end>\d+|-)",
]
DELIMITER = "\t"  # CloudFront standard log fields are tab-separated
pt_1 = DELIMITER.join(pattern)
# Reuse the already-joined pattern instead of joining it a second time
# (the original recompiled DELIMITER.join(pattern) redundantly).
re_1 = re.compile(pt_1)
# NOTE(review): `pattern` contains no \" anchors, so this fallback is
# currently identical to re_1; kept for symmetry with the other parsers.
re_1_no_quotes = re.compile(pt_1.replace('\\"', ""))

# Explicit encoding so parsing does not depend on the platform default.
with open(path, encoding="utf-8") as f:
    logs = f.read()

for log in logs.splitlines():
    # CloudFront standard log files begin with "#Version: ..." and
    # "#Fields: ..." directive lines; skip them instead of failing.
    if log.startswith("#"):
        continue
    m = re_1.match(log) or re_1_no_quotes.match(log)
    if m:
        print(json.dumps(m.groupdict(), indent=2))
    else:
        # No pattern matched: report and stop so malformed input is noticed.
        print("ERROR: Log parsing failed!")
        sys.exit(1)
がんばったところ
たぶん、IPv6 がきても対応できる
免責事項
それなりの精度はあるはず (と思いたい) ですが、確実にパースできることを保証するものではありません。
おわりに
正規表現が横に長いので、フォーマッタの設定はちゃんとしておいたほうがストレスが少ないと思います。正規表現を書くとき、こちらのサイトが便利でした。NLB 対応したら、go で書き直してシングルバイナリ化したい。