エムオーテックス株式会社が運営するテックブログです。

Security Hub CSPMの棚卸し運用を見直し:見える化と自動化の方針

Security Hub CSPMの棚卸し運用を見直し:見える化と自動化の方針

はじめに

こんにちは、SREチームの植松です。

エムオーテックスではAWS Security Hub CSPM(以下、Security Hub CSPM)の検知を月次で棚卸し、対応要否を精査する運用を行なっています。

しかし、AWS のアップデートによるコントロール追加やリリースに伴うリソース増加によって、月によっては検知件数が大きく増え、棚卸し作業の負荷が高くなるという課題がありました。

本記事では、Security Hub CSPMの検知棚卸しをオートメーション前提で見直すにあたり、どのような点を整理し、どのような検討を行なったのかをまとめます。

docs.aws.amazon.com

なお、本記事で触れている「オートメーション」は、Security Hub CSPMの自動化ルールを用いて、検知結果のステータス変更(抑制・通知済みへの変更など)を自動化することを指しています。 チケット起票や外部ツール連携まで含めた包括的な自動化については扱いません。まずは Kiro-CLI を用いて検知状況を整理し、現状把握から着手しました。

現状のSecurity Hub CSPMの棚卸し運用と課題

Security Hub CSPMを運用するにあたり、どのように検知内容を棚卸しするかについては、いくつかの方法が考えられます。 検知された内容をトリガーとして自動的にチケットを起票する、あるいはチャット通知するといった方法も候補として挙がりましたが、 検知件数が多い場合にはチケット数が膨らむ可能性があり、通知についてもノイズが増えることが懸念されました。

こうした点を踏まえ、エムオーテックスでは月次で検知内容を棚卸しするという運用を採用しています。 集計期間内にコントロールステータスが「NEW」(新たに作成)のリソースについて、 新しいコントロールかどうか、過去に判断済みのコントロールかどうか、個別に修正対応が必要かの確認を行なっています。 対応が必要だと判断したコントロールについては適宜チケットを起票し、それぞれのリソースを開発した担当チームに連携します。

シンプルな運用方式ではありますが、AWS 側のアップデートによるコントロール追加や、リリースに伴うリソース増加があると、検知件数が一気に増えることがあります。その結果、月次棚卸しの作業量が一時的に大きくなり、 過去に「現時点では対応不要」と判断して抑制しているコントロールについても、新しいリソースが追加されるたびに同様の判断を繰り返す必要がある点が課題になっていました。

こうした背景から、現在の月次棚卸し運用を維持しつつ、 オートメーションを前提とした形に見直せないかという検討を進めることにしました。

Security Hub CSPMのオートメーション

「はじめに」に記載の通り、Security Hub CSPMにはオートメーションの仕組みがあります。 オートメーションでできることは大きく2つあり、自動化ルール及び、自動対応と修正がサポートされています。 自動化ルールとは、定義した基準に基づいてリアルタイムで検出結果を更新するものです。 自動対応と修正は、検出された場合にLambda関数を実行するといったようなアクションを定義できます。

今回は、特定のコントロールIDのリソースが検出された場合に自動で通知済み、または抑制済みにするといったことをやりたいので「自動化ルール」を検討します。

docs.aws.amazon.com

Security Hub CSPMのコントロールの対応状況を確認する

Security Hub CSPMの基本的なセキュリティのベストプラクティス標準は300以上のコントロールがあります。 自動化ルールを実装するにあたり、それぞれのコントロールについてオートメーションが必要かどうかを整理しました。

コントロール対応状況確認のスクリプト

Security Hub CSPMのコントロールの対応状況確認のため、以下のスクリプトを作成、実行しました。

#!/usr/bin/env python3
"""
Security Hub CSPM Control Inventory Script

このスクリプトは、AWS Security Hub CSPMの指定されたスタンダードについて、
各コントロールの Finding 状況を集計し、CSV ファイルに出力します。
"""

import boto3
import csv
import time
import argparse
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch_findings_for_control(securityhub, control_id, account_id):
    """
    指定されたコントロールの Finding を取得
    
    Args:
        securityhub: boto3 Security Hub CSPMクライアント
        control_id: コントロール ID (例: "S3.1")
        account_id: AWS アカウント ID
    
    Returns:
        tuple: (control_id, findings のリスト)
    """
    findings = []
    paginator = securityhub.get_paginator('get_findings')
    max_retries = 5
    retry_count = 0
    
    while retry_count < max_retries:
        try:
            for page in paginator.paginate(
                Filters={
                    'ProductFields': [
                        {'Key': 'ControlId', 'Value': control_id, 'Comparison': 'EQUALS'}
                    ],
                    'AwsAccountId': [
                        {'Value': account_id, 'Comparison': 'EQUALS'}
                    ],
                    'RecordState': [
                        {'Value': 'ACTIVE', 'Comparison': 'EQUALS'}
                    ]
                }
            ):
                findings.extend(page['Findings'])
            break
        except Exception as e:
            if 'TooManyRequestsException' in str(e):
                retry_count += 1
                wait_time = 2 ** retry_count
                print(f"  Rate limit hit for {control_id}, retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
    
    return control_id, findings


def classify_control(control_status, status_count, total_findings):
    """
    コントロールの状況を分類
    
    分類カテゴリ:
    - 無効化: コントロール自体が無効
    - Finding なし: アクティブな Finding が存在しない
    - 全解決済: 全 Finding が RESOLVED
    - 全抑制: 全 Finding が SUPPRESSED
    - 全通知済: 全 Finding が NOTIFIED
    - 一部抑制: SUPPRESSED と他ステータスが混在
    - 未対応: 全 Finding が NEW
    - 混在: 上記以外の混在状態
    """
    if control_status == 'DISABLED':
        return '無効化'
    elif total_findings == 0:
        return 'Finding なし'
    elif status_count['RESOLVED'] == total_findings:
        return '全解決済'
    elif status_count['SUPPRESSED'] == total_findings:
        return '全抑制'
    elif status_count['NOTIFIED'] == total_findings:
        return '全通知済'
    elif status_count['SUPPRESSED'] > 0:
        return '一部抑制'
    elif status_count['NEW'] == total_findings:
        return '未対応'
    else:
        return '混在'


def main():
    parser = argparse.ArgumentParser(
        description='Security Hub CSPMのコントロール状況を棚卸しして CSV に出力します'
    )
    parser.add_argument(
        '--profile',
        default='default',
        help='AWS プロファイル名 (デフォルト: default)'
    )
    parser.add_argument(
        '--account-id',
        required=True,
        help='対象の AWS アカウント ID'
    )
    parser.add_argument(
        '--region',
        default='ap-northeast-1',
        help='AWS リージョン (デフォルト: ap-northeast-1)'
    )
    parser.add_argument(
        '--standard',
        default='aws-foundational-security-best-practices/v/1.0.0',
        help='スタンダード名 (デフォルト: aws-foundational-security-best-practices/v/1.0.0)'
    )
    parser.add_argument(
        '--output',
        default='control_inventory.csv',
        help='出力ファイル名 (デフォルト: control_inventory.csv)'
    )
    parser.add_argument(
        '--workers',
        type=int,
        default=3,
        help='並列処理のワーカー数 (デフォルト: 3)'
    )
    
    args = parser.parse_args()
    
    # AWS セッション作成
    session = boto3.Session(profile_name=args.profile, region_name=args.region)
    securityhub = session.client('securityhub')
    
    print(f"Profile: {args.profile}")
    print(f"Region: {args.region}")
    print(f"Account ID: {args.account_id}")
    print(f"Standard: {args.standard}")
    
    # 有効なスタンダードを取得
    standards = securityhub.get_enabled_standards()
    standard_arn = None
    for std in standards['StandardsSubscriptions']:
        if args.standard in std['StandardsArn']:
            standard_arn = std['StandardsSubscriptionArn']
            break
    
    if not standard_arn:
        print(f"エラー: スタンダード '{args.standard}' が見つかりません")
        print("有効なスタンダード:")
        for std in standards['StandardsSubscriptions']:
            print(f"  - {std['StandardsArn']}")
        return 1
    
    print(f"Processing: {standard_arn}")
    
    # コントロール一覧を取得
    controls = []
    paginator = securityhub.get_paginator('describe_standards_controls')
    for page in paginator.paginate(StandardsSubscriptionArn=standard_arn):
        controls.extend(page['Controls'])
    
    print(f"Total controls: {len(controls)}")
    
    # 並列処理で Findings 取得
    print(f"Fetching findings in parallel (workers: {args.workers})...")
    findings_by_control = {}
    
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {
            executor.submit(fetch_findings_for_control, securityhub, control['ControlId'], args.account_id): control 
            for control in controls
        }
        
        for idx, future in enumerate(as_completed(futures), 1):
            control_id, findings = future.result()
            findings_by_control[control_id] = findings
            print(f"[{idx}/{len(controls)}] Fetched {control_id}: {len(findings)} findings")
    
    # 結果を集計
    results = []
    
    for control in controls:
        control_id = control['ControlId']
        control_title = control['Title']
        control_status = control['ControlStatus']
        
        control_findings = findings_by_control.get(control_id, [])
        status_count = defaultdict(int)
        
        for finding in control_findings:
            workflow_status = finding['Workflow']['Status']
            status_count[workflow_status] += 1
        
        total_findings = sum(status_count.values())
        category = classify_control(control_status, status_count, total_findings)
        
        results.append({
            'ControlId': control_id,
            'Title': control_title,
            'ControlStatus': control_status,
            'Category': category,
            'Total': total_findings,
            'NEW': status_count['NEW'],
            'NOTIFIED': status_count['NOTIFIED'],
            'SUPPRESSED': status_count['SUPPRESSED'],
            'RESOLVED': status_count['RESOLVED']
        })
    
    # CSV 出力
    with open(args.output, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=[
            'ControlId', 'Title', 'ControlStatus', 'Category', 
            'Total', 'NEW', 'NOTIFIED', 'SUPPRESSED', 'RESOLVED'
        ])
        writer.writeheader()
        writer.writerows(results)
    
    print(f"\nCompleted! Output: {args.output}")
    
    # サマリー表示
    print("\n=== Summary ===")
    category_summary = defaultdict(int)
    for r in results:
        category_summary[r['Category']] += 1
    
    for category, count in sorted(category_summary.items()):
        print(f"{category}: {count}")
    
    return 0


if __name__ == '__main__':
    exit(main())

スクリプトの説明

このスクリプトは、指定された AWS Security Hub CSPMスタンダードの全コントロールについて、検知されたリソース(Finding )の状況を集計し、以下のカテゴリに分類します。

  • 無効化: コントロール自体が無効
  • Finding なし: アクティブな Finding が存在しない
  • 全解決済: 全 Finding が RESOLVED
  • 全抑制: 全 Finding が SUPPRESSED
  • 全通知済: 全 Finding が NOTIFIED
  • 一部抑制: SUPPRESSED と他ステータスが混在
  • 未対応: 全 Finding が NEW
  • 混在: 上記以外の混在状態

スクリプトは以下のサンプルのように実行します。--profileはAWS CLIのプロファイルを指します。--account-idはコントロールの状況を確認したいAWSアカウントのIDです。

python inventory_controls_generic.py --profile audit --account-id 123456789012

エムオーテックスではLANSCOPE エンドポイントマネージャー クラウド版の運用にあたりAWS Control Towerを有効にしています。 Control Tower有効化にあたり、セキュリティアカウント(Auditアカウント)が自動生成されますが、エムオーテックスではこのアカウントにSecurity Hub CSPMの結果を集約しているため、--profile auditと指定しています。 --profileのアカウントと--account-idが同一の場合でも問題なくスクリプトは動作します。

docs.aws.amazon.com

スクリプトを実行すると、以下のようなCSVファイルが出力されます。

ControlIdとTitle には、コントロールの識別子と説明が記録されます。例えば S3.1 というコントロール ID に対して "S3 Block Public Access setting should be enabled" といった説明が付きます。 ControlStatus は、コントロール自体が有効(ENABLED)か無効(DISABLED)かを示します。 Total には、そのコントロールで検知されているアクティブな Finding の総数が記録されます。

NEW、NOTIFIED、SUPPRESSED、RESOLVEDの各カラムには、それぞれのワークフローステータスごとのFinding 数が記録されます。これにより、例えば「100件の Finding のうち、80件は抑制済みで20件が未対応」といった詳細な状況を把握できます。

オートメーションの設計方針

スクリプト実行で得られたCSVを参考に、以下の方針でオートメーション対応するコントロールIDを決めました。

  • カテゴリが全抑制:過去に抑制済みで問題ないと判断したコントロールであるため、新しい検知がきた場合も抑制済みとする
  • カテゴリが全通知済:過去に対応必要と判断したコントロールであるため、新しい検知がきた場合は通知済みとする
  • カテゴリが全解決済:オートメーション対応しない
  • カテゴリが一部抑制、混在:一律にコントロールステータスを変更できないため、オートメーション対応しない

カテゴリが全抑制、全通知済のコントロール数は約40となりました。オートメーション実装はこれからですが、手動対応していたステータス変更が自動化されるため棚卸し作業の負荷が減る見込みです。

おわりに

本記事では、Security Hub CSPMの検知棚卸しについて、オートメーションを前提に運用を見直すにあたって行なった検討内容を整理しました。 まだオートメーションを本格的に導入した段階ではありませんが、事前に検知状況を可視化し、コントロールごとの扱いを整理することで、棚卸し業務の負荷を下げられる見通しを持つことができました。

今後は、今回整理した方針をもとにオートメーションを実装し、実際の運用の中で効果や課題を確認していく予定です。 Security Hub CSPMの棚卸し運用に悩んでいる方にとって、本記事の内容が検討の一助になれば幸いです。