
Ship Amazon CloudWatch logs to Amazon OpenSearch Serverless


Amazon CloudWatch Logs collects, aggregates, and analyzes logs from different systems in one place. CloudWatch offers subscriptions as a real-time feed of these logs to other services such as Amazon Kinesis Data Streams, AWS Lambda, and Amazon OpenSearch Service. These subscriptions are a popular mechanism to enable custom processing and advanced analysis of log data to gain additional valuable insights. At the time of publishing this blog post, these subscription filters support delivering logs to Amazon OpenSearch Service provisioned clusters only. Customers are increasingly adopting Amazon OpenSearch Serverless as a cost-effective option for infrequent, intermittent, and unpredictable workloads.

In this blog post, we show how to use Amazon OpenSearch Ingestion to deliver CloudWatch logs to OpenSearch Serverless in near real time. We outline a mechanism to connect a Lambda subscription filter with OpenSearch Ingestion and deliver logs to OpenSearch Serverless without explicitly needing a separate subscription filter for it.

Solution overview

The following diagram illustrates the solution architecture.

  1. CloudWatch Logs: Collects and stores logs from various AWS sources and applications. It serves as the source of log data in this solution.
  2. Subscription filter: A CloudWatch Logs subscription filter filters and routes specific log data from CloudWatch Logs to the next component in the pipeline.
  3. CloudWatch exporter Lambda function: This is a Lambda function that receives the filtered log data from the subscription filter. Its role is to transform and prepare the log data for ingestion into the OpenSearch Ingestion pipeline.
  4. OpenSearch Ingestion: This is a component of OpenSearch Service. The ingestion pipeline is responsible for processing and enriching the log data received from the CloudWatch exporter Lambda function before storing it in the OpenSearch Serverless collection.
  5. OpenSearch Service: This is a fully managed service that stores and indexes log data, making it searchable and available for analysis and visualization. OpenSearch Service offers two configurations: provisioned domains and serverless. In this setup, we use serverless, which is an auto-scaling configuration for OpenSearch Service.

Prerequisites

Deploy the solution

With the prerequisites in place, you can create and deploy the pieces of the solution.

Step 1: Create PipelineRole for ingestion

  • Open the AWS Management Console for AWS Identity and Access Management (IAM).
  • Choose Policies, and then choose Create policy.
  • Select JSON and paste the following policy into the editor:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "aoss:BatchGetCollection",
                "aoss:APIAccessAll"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:aoss:us-east-1:{accountId}:collection/{collectionId}"
        },
        {
            "Action": [
                "aoss:CreateSecurityPolicy",
                "aoss:GetSecurityPolicy",
                "aoss:UpdateSecurityPolicy"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aoss:collection": "{collection}"
                }
            }
        }
    ]
}

// Replace {accountId}, {collectionId}, and {collection} with your own values

  • Choose Next, choose Next, and name your policy collection-pipeline-policy.
  • Choose Create policy.
  • Next, create a role and attach the policy to it. Choose Roles, and then choose Create role.
  • Select Custom trust policy and paste the following policy into the editor:
{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Effect":"Allow",
         "Principal":{
            "Service":"osis-pipelines.amazonaws.com"
         },
         "Action":"sts:AssumeRole"
      }
   ]
}

  • Choose Next, and then search for and select the collection-pipeline-policy you just created.
  • Choose Next and name the role PipelineRole.
  • Choose Create role.
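If you prefer to automate this step, the policy and role can also be created with boto3. The following is a minimal sketch; the policy documents mirror the JSON above, and the account ID, collection ID, and collection name passed in at the bottom are placeholder values to replace with your own:

```python
import json

# Trust policy allowing OpenSearch Ingestion pipelines to assume the role
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "osis-pipelines.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def pipeline_policy(account_id, collection_id, collection_name, region="us-east-1"):
    """Build the collection-pipeline-policy document shown above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": ["aoss:BatchGetCollection", "aoss:APIAccessAll"],
                "Effect": "Allow",
                "Resource": f"arn:aws:aoss:{region}:{account_id}:collection/{collection_id}",
            },
            {
                "Action": [
                    "aoss:CreateSecurityPolicy",
                    "aoss:GetSecurityPolicy",
                    "aoss:UpdateSecurityPolicy",
                ],
                "Effect": "Allow",
                "Resource": "*",
                "Condition": {"StringEquals": {"aoss:collection": collection_name}},
            },
        ],
    }


if __name__ == "__main__":
    import boto3  # requires AWS credentials with IAM permissions

    iam = boto3.client("iam")
    policy = iam.create_policy(
        PolicyName="collection-pipeline-policy",
        PolicyDocument=json.dumps(pipeline_policy("111122223333", "abc123", "my-collection")),
    )
    iam.create_role(RoleName="PipelineRole", AssumeRolePolicyDocument=json.dumps(TRUST_POLICY))
    iam.attach_role_policy(RoleName="PipelineRole", PolicyArn=policy["Policy"]["Arn"])
```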

Step 2: Configure the network and data policy for the OpenSearch collection

  • In the OpenSearch Service console, navigate to the Serverless menu.
  • Create a VPC endpoint by following the instructions in Create an interface endpoint for OpenSearch Serverless.
  • Go to Security and choose Network policies.
  • Choose Create network policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "collection"
      }
    ],
    "AllowFromPublic": false,
    "SourceVPCEs": [
      "{VPC Endpoint Id}"
    ]
  },
  {
    "Rules": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "dashboard"
      }
    ],
    "AllowFromPublic": true
  }
]

  • Go to Security and choose Data access policies.
  • Choose Create access policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "index/{collection name}/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ],
        "ResourceType": "index"
      }
    ],
    "Principal": [
      "arn:aws:iam::{accountId}:role/PipelineRole",
      "arn:aws:iam::{accountId}:role/Admin"
    ],
    "Description": "Rule 1"
  }
]
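As with the IAM pieces, the data access policy can be applied programmatically. The sketch below builds the same policy document and applies it with the boto3 opensearchserverless client; the policy name, collection name, and account ID shown are illustrative:

```python
import json


def data_access_policy(collection_name, principals):
    """Build the data access policy document shown above."""
    return [
        {
            "Rules": [
                {
                    "Resource": [f"index/{collection_name}/*"],
                    "Permission": [
                        "aoss:CreateIndex",
                        "aoss:UpdateIndex",
                        "aoss:DescribeIndex",
                        "aoss:ReadDocument",
                        "aoss:WriteDocument",
                    ],
                    "ResourceType": "index",
                }
            ],
            "Principal": principals,
            "Description": "Rule 1",
        }
    ]


if __name__ == "__main__":
    import boto3  # requires AWS credentials with aoss permissions

    aoss = boto3.client("opensearchserverless")
    aoss.create_access_policy(
        name="cwlogs-data-access",  # illustrative policy name
        type="data",
        policy=json.dumps(
            data_access_policy(
                "my-collection",
                ["arn:aws:iam::111122223333:role/PipelineRole"],
            )
        ),
    )
```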

Step 3: Create an OpenSearch Ingestion pipeline

  • Navigate to OpenSearch Service.
  • Go to the Ingestion pipelines section.
  • Choose Create pipeline.
  • Define the pipeline configuration:
version: "2"
cwlogs-ingestion-pipeline:
  source:
    http:
      path: /logs/ingest
  sink:
    - opensearch:
        # Provide the OpenSearch Serverless collection endpoint
        hosts: ["https://{collectionId}.{region}.aoss.amazonaws.com"]
        index: "cwl-%{yyyy-MM-dd}"
        aws:
          # Provide a role ARN with access to the collection. This role must have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "arn:aws:iam::{accountId}:role/PipelineRole"
          # Provide the region of the collection
          region: "{region}"
          serverless: true
          serverless_options:
            network_policy_name: "{network policy name}"
# To get the values for the placeholders:
# 1. {collectionId}: You can find the collection ID by navigating to your Amazon OpenSearch Serverless collection in the AWS Management Console and choosing the collection. The collection ID is listed in the "Overview" section.
# 2. {region}: This is the AWS Region where your collection is located. You can find this information in the AWS Management Console when you navigate to the collection.
# 3. {accountId}: This is your AWS account ID. You can find your account ID by choosing your username in the top-right corner of the AWS Management Console and selecting "My Account" from the dropdown menu.
# 4. {network policy name}: This is the name of the network policy you configured for your Amazon OpenSearch Serverless collection. If you haven't configured a network policy, you can leave this placeholder as is or remove it from the configuration.
# After obtaining the required values, replace the placeholders in the configuration with the actual values.
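Instead of the console, the pipeline can also be created with the boto3 osis client. The following sketch renders the configuration above with concrete values; the IDs and capacity limits shown are illustrative:

```python
def pipeline_body(collection_id, region, account_id, network_policy):
    """Render the pipeline configuration above with concrete values."""
    return f'''version: "2"
cwlogs-ingestion-pipeline:
  source:
    http:
      path: /logs/ingest
  sink:
    - opensearch:
        hosts: ["https://{collection_id}.{region}.aoss.amazonaws.com"]
        index: "cwl-%{{yyyy-MM-dd}}"
        aws:
          sts_role_arn: "arn:aws:iam::{account_id}:role/PipelineRole"
          region: "{region}"
          serverless: true
          serverless_options:
            network_policy_name: "{network_policy}"
'''


if __name__ == "__main__":
    import boto3  # requires AWS credentials with osis permissions

    osis = boto3.client("osis")
    osis.create_pipeline(
        PipelineName="cwlogs-ingestion-pipeline",
        MinUnits=1,   # illustrative capacity limits
        MaxUnits=4,
        PipelineConfigurationBody=pipeline_body(
            "abc123", "us-east-1", "111122223333", "my-network-policy"
        ),
    )
```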

Step 4: Create a Lambda perform

  • Create a Lambda layer for the requests and sigv4 packages. Run the following commands in AWS CloudShell.
mkdir lambda_layers
cd lambda_layers
mkdir python
cd python
pip install requests -t ./
pip install requests_auth_aws_sigv4 -t ./
cd ..
zip -r python_modules.zip .

aws lambda publish-layer-version --layer-name Data-requests --description "My Python layer" --zip-file fileb://python_modules.zip --compatible-runtimes python3.x

  • Create a Lambda function that uses a Python runtime, attach the layer you created, and use the following code:

import base64
import gzip
import json
import logging
from datetime import datetime

import jmespath
import requests
from requests_auth_aws_sigv4 import AWSSigV4

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Extract the data from the event"""
    data = jmespath.search("awslogs.data", event)

    """Decompress the logs"""
    cwLogs = decompress_json_data(data)

    """Construct the payload to send to OpenSearch Ingestion"""
    payload = prepare_payload(cwLogs)
    print(payload)

    """Ingest the set of events to the pipeline"""
    response = ingestData(payload)

    return {
        'statusCode': 200
    }


def decompress_json_data(data):
    compressed_data = base64.b64decode(data)
    uncompressed_data = gzip.decompress(compressed_data)
    return json.loads(uncompressed_data)


def prepare_payload(cwLogs):
    payload = []
    logEvents = cwLogs['logEvents']
    for logEvent in logEvents:
        request = {}
        request['id'] = logEvent['id']
        dt = datetime.fromtimestamp(logEvent['timestamp'] / 1000)
        request['timestamp'] = dt.isoformat()
        request['message'] = logEvent['message']
        request['owner'] = cwLogs['owner']
        request['log_group'] = cwLogs['logGroup']
        request['log_stream'] = cwLogs['logStream']
        payload.append(request)
    return payload


def ingestData(payload):
    ingestionEndpoint = "{OpenSearch Pipeline Endpoint}"
    endpoint = "https://" + ingestionEndpoint
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    r = requests.request('POST', f'{endpoint}/logs/ingest', json=payload, auth=AWSSigV4('osis'), headers=headers)
    LOGGER.info('Response received: ' + r.text)
    return r

  • Replace {OpenSearch Pipeline Endpoint} with the endpoint of your OpenSearch Ingestion pipeline.
  • Attach the following inline policy to the function's execution role.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PermitsWriteAccessToPipeline",
            "Effect": "Allow",
            "Action": "osis:Ingest",
            "Resource": "arn:aws:osis:{region}:{accountId}:pipeline/{OpenSearch Pipeline Name}"
        }
    ]
}
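Before deploying, you can exercise the decompression and payload-preparation logic from the function above locally. The sketch below fabricates a CloudWatch Logs event in the shape the subscription filter delivers (JSON, gzip-compressed, base64-encoded under awslogs.data) and applies the same transformations using only the standard library; the log group, stream, and account values are illustrative:

```python
import base64
import gzip
import json
from datetime import datetime

# A fabricated CloudWatch Logs payload, shaped like what the subscription filter delivers
cw_logs = {
    "owner": "111122223333",
    "logGroup": "/aws/lambda/demo",
    "logStream": "2024/01/01/[$LATEST]abcd",
    "logEvents": [
        {"id": "0", "timestamp": 1700000000000, "message": "Simple Lambda Test"}
    ],
}

# Encode it the way CloudWatch does: JSON -> gzip -> base64
event = {
    "awslogs": {
        "data": base64.b64encode(gzip.compress(json.dumps(cw_logs).encode())).decode()
    }
}

# Decode it the way the Lambda function does
decoded = json.loads(gzip.decompress(base64.b64decode(event["awslogs"]["data"])))

# Flatten each log event into the document sent to OpenSearch Ingestion
payload = []
for log_event in decoded["logEvents"]:
    payload.append({
        "id": log_event["id"],
        "timestamp": datetime.fromtimestamp(log_event["timestamp"] / 1000).isoformat(),
        "message": log_event["message"],
        "owner": decoded["owner"],
        "log_group": decoded["logGroup"],
        "log_stream": decoded["logStream"],
    })

print(payload[0]["message"])  # Simple Lambda Test
```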

Step 5: Set up a CloudWatch Logs subscription

  • Grant permission to a specific AWS service or AWS account to invoke the specified Lambda function. The following command grants the CloudWatch Logs service permission to invoke the Lambda function for the specified log group. This is necessary because CloudWatch Logs cannot invoke a Lambda function directly without being granted permission. Run the following command in CloudShell to add the permission.
aws lambda add-permission \
 --function-name "{function name}" \
 --statement-id "{function name}" \
 --principal "logs.amazonaws.com" \
 --action "lambda:InvokeFunction" \
 --source-arn "arn:aws:logs:{region}:{accountId}:log-group:{log_group}:*" \
 --source-account "{accountId}"

  • Create a subscription filter for a log group. The following command creates a subscription filter on the log group that forwards all log events (because the filter pattern is an empty string) to the Lambda function. Run the following command in CloudShell to create the subscription filter.
aws logs put-subscription-filter \
 --log-group-name {log_group} \
 --filter-name {filter name} \
 --filter-pattern "" \
 --destination-arn arn:aws:lambda:{region}:{accountId}:function:{function name}
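The two CLI commands in this step have direct boto3 equivalents, which can be handy when scripting the setup. In this sketch the function name, log group, and filter name are illustrative:

```python
def log_group_arn(region, account_id, log_group):
    """ARN pattern used as --source-arn above (the trailing :* covers all streams)."""
    return f"arn:aws:logs:{region}:{account_id}:log-group:{log_group}:*"


if __name__ == "__main__":
    import boto3  # requires AWS credentials with lambda and logs permissions

    region, account_id = "us-east-1", "111122223333"       # illustrative values
    function_name, log_group = "cwlogs-exporter", "/aws/lambda/demo"

    # Allow CloudWatch Logs to invoke the function for this log group
    boto3.client("lambda").add_permission(
        FunctionName=function_name,
        StatementId=function_name,
        Principal="logs.amazonaws.com",
        Action="lambda:InvokeFunction",
        SourceArn=log_group_arn(region, account_id, log_group),
        SourceAccount=account_id,
    )

    # Forward every log event (empty filter pattern) to the function
    boto3.client("logs").put_subscription_filter(
        logGroupName=log_group,
        filterName="cwlogs-to-lambda",
        filterPattern="",
        destinationArn=f"arn:aws:lambda:{region}:{account_id}:function:{function_name}",
    )
```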

Step 6: Testing and verification

  • Generate some logs in your CloudWatch log group. Run the following command in CloudShell to create sample log events in the log group.
aws logs put-log-events --log-group-name {log_group} --log-stream-name {stream_name} --log-events '[{"timestamp": {timestamp in millis}, "message": "Simple Lambda Test"}]'

  • Check the OpenSearch collection to confirm that logs are indexed correctly.
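Note that the {timestamp in millis} placeholder must be the current time in milliseconds since the epoch; CloudWatch rejects events whose timestamps are too far in the past or future. A small sketch of the equivalent call with boto3, where the log group and stream names are illustrative:

```python
import time


def current_millis():
    """CloudWatch expects event timestamps in milliseconds since the epoch."""
    return int(time.time() * 1000)


if __name__ == "__main__":
    import boto3  # requires AWS credentials with logs permissions

    boto3.client("logs").put_log_events(
        logGroupName="/aws/lambda/demo",   # illustrative names
        logStreamName="test-stream",
        logEvents=[{"timestamp": current_millis(), "message": "Simple Lambda Test"}],
    )
```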

Clean up

Remove the infrastructure for this solution when it is not in use to avoid incurring unnecessary costs.

Conclusion

You saw how to set up a pipeline to deliver CloudWatch logs to an OpenSearch Serverless collection within a VPC. This integration uses CloudWatch for log aggregation, Lambda for log processing, and OpenSearch Serverless for querying and visualization. You can use this solution to take advantage of the pay-as-you-go pricing model of OpenSearch Serverless and optimize the operational costs of log analysis.

To explore further, you can:


About the Authors

Balaji Mohan is a senior modernization architect specializing in application and data modernization to the cloud. His business-first approach ensures seamless transitions, aligning technology with organizational goals. Using cloud-native architectures, he delivers scalable, agile, and cost-effective solutions, driving innovation and growth.

Souvik Bose is a Software Development Engineer working on Amazon OpenSearch Service.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.
