はじめに
各種 AWS サービスの出力するアクセスログを正規表現でパースしてみました。ログ収集基盤や DWH などで雑多なログをある一定のフォーマットに整形するのはよくある話で、自分なりに汎用化しておくと捗る気がします。
- Python でコーディング
- pattern_1, 2 というふうに分かれているのは、歴史的経緯でログフォーマットが更新されているので、古いものも救おうという意図がある
- サンプルなので単純に JSON を返しているが、実際に使う場合はコネコネする想定
現時点で対応できている AWS サービス
- S3
- ALB
- CLB
- CloudFront
機会があれば対象サービスを増やしていきたい (NLB など)
S3 の場合
import json import re import sys import argparse parser = argparse.ArgumentParser("parse access logs") parser.add_argument("-p", "--path", help="set log file path", required=True) args = parser.parse_args() path = args.path pattern_1 = [ r"^(?P<bucket_owner>[\w -/:-@\[-~]+|-)", r"(?P<bucket>[\w -/:-@\[-~]+)", r"\[(?P<time>([\w -/:-@\[-~]+ [\w+]+))\]", r"(?P<remote_ip>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|-)", r"(?P<requester>[\w -/:-@\[-~]+|Anonymous)", r"(?P<request_id>[\w -/:-@\[-~]+|-)", r"(?P<operation>[\w -/:-@\[-~]+|-)", r"(?P<key>[\w -/:-@\[-~]+)", r"\"(?P<request_uri>([\w -/:-@\[-~]+|-) ([\w -/:-@\[-~]+))\"", r"(?P<http_status>\d{1,3}|-)", r"(?P<error_code>[\w -/:-@\[-~]+)", r"(?P<bytes_sent>[\d\-.]+|-)", r"(?P<object_size>[\d\-.]+|-)", r"(?P<total_time>[\d\-.]+|-)", r"(?P<turn_around_time>[\d\-.]+|-)", r"\"(?P<referer>(((https?|ftp|file)://)?([\w\-.]+)\.?([\w\-.]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)\"", r"\"(?P<user_agent>[^\"]+)\"", r"(?P<version_id>[\w -/:-@\[-~]+)", ] pattern_2 = [ r"(?P<host_id>([\w -/:-@\[-~]+))", r"(?P<signature_version>(SigV2|SigV4|-))", r"(?P<cipher_suite>[\w -/:-@\[-~]+)", r"(?P<authentication_type>(AuthHeader|QueryString|-))", r"(?P<host_header>[\w -/:-@\[-~]+)", ] pattern_3 = [ r"(?P<tls_version>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))", ] pattern_4 = [ r"(?P<access_point_arn>[\w 
-/:-@\[-~]+)", ] DELIMITER = " " pt_1 = DELIMITER.join(pattern_1) pt_2 = DELIMITER.join(pattern_1 + pattern_2) pt_3 = DELIMITER.join(pattern_1 + pattern_2 + pattern_3) pt_4 = DELIMITER.join(pattern_1 + pattern_2 + pattern_3 + pattern_4) re_1 = re.compile(pt_1) re_1_no_quotes = re.compile(pt_1.replace('\\"', "")) re_2 = re.compile(pt_2) re_2_no_quotes = re.compile(pt_2.replace('\\"', "")) re_3 = re.compile(pt_3) re_3_no_quotes = re.compile(pt_3.replace('\\"', "")) re_4 = re.compile(pt_4) re_4_no_quotes = re.compile(pt_4.replace('\\"', "")) with open(path) as f: logs = f.read() for log in logs.splitlines(): m = re_4.match(log) if not m: m = re_3.match(log) if not m: m = re_2.match(log) if not m: m = re_1.match(log) if not m: m = re_4_no_quotes.match(log) if not m: m = re_3_no_quotes.match(log) if not m: m = re_2_no_quotes.match(log) if not m: m = re_1_no_quotes.match(log) if m: print(json.dumps(m.groupdict(), indent=2)) else: print("ERROR: Log parsing failed!") sys.exit(1)
ALB の場合
import json
import re
import sys
import argparse

# CLI: -p/--path points at an ALB access-log file (one entry per line).
arg_parser = argparse.ArgumentParser("parse access logs")
arg_parser.add_argument("-p", "--path", help="set log file path", required=True)
cli_args = arg_parser.parse_args()
log_path = cli_args.path

# One regex fragment per space-separated ALB access-log field.
fields = [
    r"^(?P<type>https|http|h2|grpcs|wss|ws|-)",
    r"(?P<time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
    r"(?P<elb>[\w -/:-@\[-~]+)",
    r"(?P<client_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    r"(?P<target_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    r"(?P<request_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<target_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<response_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<elb_status_code>\d{1,3}|-)",
    r"(?P<target_status_code>\d{1,3}|-)",
    r"(?P<received_bytes>[\d\-.]+|-)",
    r"(?P<sent_bytes>[\d\-.]+|-)",
    r"\"(?P<request>([\w\-]+|-) ((((https?|ftp|file)://)?([\w\.-]+)\.?([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-) ([\w\-./]+))( ?)\"",
    r"\"(?P<user_agent>[^\"]+)\"",
    r"(?P<ssl_cipher>[\w -/:-@\[-~]+)",
    r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
    r"(?P<target_group_arn>[\w -/:-@\[-~]+)",
    r"\"(?P<trace_id>[\w -/:-@\[-~]+)\"",
    r"\"(?P<domain_name>[\w -/:-@\[-~]+)\"",
    r"\"(?P<chosen_cert_arn>[\w\-:./]+|session-reused)\"",
    r"(?P<matched_rule_priority>[\d]{1,5}|-1|-)",
    r"(?P<request_creation_time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
    r"\"(?P<actions_executed>[\w -/:-@\[-~]+|-)\"",
    r"\"(?P<redirect_url>(((https?|ftp|file)://)?([\w\-.]+)\.?([\w\-.]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)\"",
    r"\"(?P<error_reason>([a-zA-Z]+|-))\"",
    r"\"?(?P<target_port_list>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-))\"?",
    r"\"?(?P<target_status_code_list>(\d{3}|-))\"?",
    r"\"(?P<classification>(Acceptable|Ambiguous|Severe|-))\"",
    r"\"(?P<classification_reason>[a-zA-Z]+|-)\"",
]

SEP = " "
joined = SEP.join(fields)
strict_re = re.compile(joined)
# Fallback variant with the literal double quotes stripped from the pattern.
relaxed_re = re.compile(joined.replace('\\"', ""))

with open(log_path) as fp:
    lines = fp.read().splitlines()

for line in lines:
    # Prefer the strict (quoted) pattern, fall back to the relaxed one.
    hit = strict_re.match(line) or relaxed_re.match(line)
    if hit:
        print(json.dumps(hit.groupdict(), indent=2))
    else:
        # Abort on the first unparseable line.
        print("ERROR: Log parsing failed!")
        sys.exit(1)
CLB の場合
import json
import re
import sys
import argparse

# CLI: -p/--path points at a CLB (Classic Load Balancer) access-log file.
arg_parser = argparse.ArgumentParser("parse access logs")
arg_parser.add_argument("-p", "--path", help="set log file path", required=True)
cli_args = arg_parser.parse_args()
log_path = cli_args.path

# Fields common to every CLB access-log entry.
base_fields = [
    r"(?P<time>\d{4}-\d{2}-\d{2}((| |T)\d{2}:\d{2}:\d{2}(([.,])\d{1,6})?([+-]\d{2}(:)?\d{2})?)(|Z)|-)",
    r"(?P<elb>[\w -/:-@\[-~]+)",
    r"(?P<client_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    r"(?P<backend_port>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\:\d{1,5}|-)|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]\:\d{1,5})|-)",
    r"(?P<request_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<backend_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<response_processing_time>\d+(\.\d+)?|-1|-)",
    r"(?P<elb_status_code>\d{1,3}|-)",
    r"(?P<backend_status_code>\d{1,3}|-)",
    r"(?P<received_bytes>[\d\-.]+|-)",
    r"(?P<sent_bytes>[\d\-.]+|-)",
    r"\"(?P<request>([\w\-]+|-) ((((https?|ftp|file)://)?([\w\.-]+)\.?([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-) ([\w\-./]+))( ?)\"",
]
# Trailing fields present only on HTTPS/SSL listeners.
tls_fields = [
    r"\"(?P<user_agent>[^\"]+)\"",
    r"(?P<ssl_cipher>[\w -/:-@\[-~]+)",
    r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
]

SEP = " "
# Longest layout first so the most specific pattern wins.
layouts = [base_fields + tls_fields, base_fields]
strict_res = [re.compile(SEP.join(layout)) for layout in layouts]
# Fallback variants with the literal double quotes stripped from the pattern.
relaxed_res = [re.compile(SEP.join(layout).replace('\\"', "")) for layout in layouts]
matchers = strict_res + relaxed_res

with open(log_path) as fp:
    lines = fp.read().splitlines()

for line in lines:
    hit = None
    for regex in matchers:
        hit = regex.match(line)
        if hit:
            break
    if hit:
        print(json.dumps(hit.groupdict(), indent=2))
    else:
        # Abort on the first line no layout variant can parse.
        print("ERROR: Log parsing failed!")
        sys.exit(1)
CloudFront の場合
import json
import re
import sys
import argparse

# CLI: -p/--path points at a CloudFront standard (access) log file.
arg_parser = argparse.ArgumentParser("parse access logs")
arg_parser.add_argument("-p", "--path", help="set log file path", required=True)
cli_args = arg_parser.parse_args()
log_path = cli_args.path

# One regex fragment per tab-separated CloudFront log field.
fields = [
    r"^(?P<date>(\d{4}-\d{1,2}-\d{1,2})|-)",
    r"(?P<time>(\d{1,2}:\d{1,2}:\d{1,2})|-)",
    r"(?P<x_edge_location>[0-9a-zA-Z\-]+)",
    r"(?P<sc_bytes>\d+|-)",
    r"(?P<c_ip>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|((([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d]))))",
    r"(?P<cs_method>([\w\-]+|-))",
    r"(?P<cs_host>[\w -/:-@\[-~]+)",
    r"(?P<cs_uri_stem>[\w -/:-@\[-~]+)",
    r"(?P<sc_status>\d{1,3}|-)",
    r"(?P<cs_referer>((([\w -/:-@\[-~]+)?://)?([\w\.-]+)\.([\w\.-]+)(:[\d]{1,5})?([\w -/:-@\[-~]+)?)|-)",
    r"(?P<cs_user_agent>[^\t]+)",
    r"(?P<cs_uri_query>[\w -/:-@\[-~]+)",
    r"(?P<cs_cookie>\S+)",
    r"(?P<x_edge_result_type>(Hit|RefreshHit|Miss|LimitExceeded|CapacityExceeded|Error|Redirect|-))",
    r"(?P<x_edge_request_id>[\w -/:-@\[-~]+)",
    r"(?P<x_host_header>[\w -/:-@\[-~]+)",
    r"(?P<cs_protocol>(http|https|ws|wss|-))",
    r"(?P<cs_bytes>[\d\-.]+|-)",
    r"(?P<time_taken>[\d\-.]+|-)",
    r"(?P<x_forwarded_for>(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|([\da-fA-F]{1,4}:){7,7}[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,7}:|([\da-fA-F]{1,4}:){1,6}:[\da-fA-F]{1,4}|([\da-fA-F]{1,4}:){1,5}(:[\da-fA-F]{1,4}){1,2}|([\da-fA-F]{1,4}:){1,4}(:[\da-fA-F]{1,4}){1,3}|([\da-fA-F]{1,4}:){1,3}(:[\da-fA-F]{1,4}){1,4}|([\da-fA-F]{1,4}:){1,2}(:[\da-fA-F]{1,4}){1,5}|[\da-fA-F]{1,4}:((:[\da-fA-F]{1,4}){1,6})|:((:[\da-fA-F]{1,4}){1,7}|:)|fe80:(:[\da-fA-F]{0,4}){0,4}%[\da-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|([\da-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[\d]){0,1}[\d])|-)",
    r"(?P<ssl_protocol>(TLS(v|V)1.3|TLS(v|V)1.2|TLS(v|V)1.1|TLS(v|V)1|SSL(v|V)3|-))",
    r"(?P<ssl_cipher>[\w\-]+)",
    r"(?P<x_edge_response_result_type>(Hit|RefreshHit|Miss|LimitExceeded|CapacityExceeded|Error|Redirect|-))",
    r"(?P<cs_protocol_version>[\w\-./ ]+)",
    r"(?P<fle_status>([\w\-]+|-))",
    r"(?P<fle_encrypted_fields>\S+)",
    r"(?P<c_port>\d+|-)",
    r"(?P<time_to_first_byte>\d+\.\d+|-)",
    r"(?P<x_edge_detailed_result_type>[\w\-]+)",
    r"(?P<sc_content_type>[\w -/:-@\[-~]+)",
    r"(?P<sc_content_len>\d+|-)",
    r"(?P<sc_range_start>\d+|-)",
    r"(?P<sc_range_end>\d+|-)",
]

# CloudFront standard logs are tab-delimited, unlike the ELB/S3 formats.
SEP = "\t"
joined = SEP.join(fields)
strict_re = re.compile(joined)
# Fallback variant with the literal double quotes stripped from the pattern.
relaxed_re = re.compile(joined.replace('\\"', ""))

with open(log_path) as fp:
    lines = fp.read().splitlines()

for line in lines:
    # Prefer the strict pattern, fall back to the relaxed one.
    hit = strict_re.match(line) or relaxed_re.match(line)
    if hit:
        print(json.dumps(hit.groupdict(), indent=2))
    else:
        # Abort on the first unparseable line.
        print("ERROR: Log parsing failed!")
        sys.exit(1)
がんばったところ
たぶん、IPv6 がきても対応できる
免責事項
それなりの精度はあるはず (と思いたい) ですが、確実にパースできることを保証するものではありません。
おわりに
正規表現が横に長いので、フォーマッタの設定はちゃんとしておいたほうがストレスが少ないと思います。正規表現を書くときは、オンラインで動作確認できる正規表現テストサイトが便利でした。NLB 対応したら、go で書き直してシングルバイナリ化したい。