Building NewsInsight: AWS Architecture - Part C

Posted by Vineet Kumar Loyer on Thursday, January 1, 2026

Part 3: AWS Automation Layer

Overview

The NewsInsight automation layer handles the nightly ingestion, processing, and cleanup of news articles. This serverless architecture ensures fresh content is always available while keeping storage costs minimal.


3.1 Architecture Overview

[Image: AWS architecture diagram]

Component Responsibilities

Component   | Purpose             | Trigger
------------|---------------------|------------------
EventBridge | Cron scheduler      | Daily at 2 AM UTC
Lambda      | Job orchestrator    | EventBridge rule
Glue ETL    | Data processing     | Lambda invocation
Bedrock     | AI analysis         | Glue job calls
DynamoDB    | Article storage     | Glue job writes
S3          | Full content + logs | Glue job writes

3.2 EventBridge Nightly Schedule

I didn’t want to run a server 24/7 just to trigger a job once a day. That’s a waste of money. AWS EventBridge is perfect for this: it gives you a serverless cron job.

The 2 AM Decision

I set the schedule for 2 AM UTC.

Schedule Configuration

# Cron expression: 2 AM UTC daily
schedule_expression = "cron(0 2 * * ? *)"

Why 2 AM?

  • Traffic: It’s a “quiet” time for global internet traffic.
  • Completeness: The previous day’s news cycle is fully wrapped up.
  • Readiness: By the time my users in Europe and the Americas wake up, the data is processed and cached.

EventBridge Rule Setup

def create_schedule(self, job_name):
    """Create EventBridge schedule for nightly execution"""
    
    rule_name = "NewsInsight-NightlySchedule"
    
    rule_response = self.events.put_rule(
        Name=rule_name,
        ScheduleExpression="cron(0 2 * * ? *)",  # 2 AM UTC daily
        Description="Trigger NewsInsight nightly ETL job",
        State="ENABLED"
    )
    
    print(f"✅ Created EventBridge rule '{rule_name}' - runs daily at 2 AM UTC")
    return rule_name

Cron Expression Breakdown

cron(0 2 * * ? *)
     │ │ │ │ │ │
     │ │ │ │ │ └── Year (any)
     │ │ │ │ └──── Day of week (any - using ?)
     │ │ │ └────── Month (any)
     │ │ └──────── Day of month (any)
     │ └────────── Hour (2 = 2 AM)
     └──────────── Minute (0)
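put_rule only creates the schedule. The rule still needs a target, and EventBridge needs permission to invoke the Lambda. Here is a minimal sketch of that wiring; the function ARN, helper names, and target ID are illustrative, not from the actual setup script:

```python
RULE_NAME = "NewsInsight-NightlySchedule"

def build_target(function_arn: str, target_id: str = "NewsInsightLambda") -> dict:
    """Build the Targets entry that points the rule at the Lambda."""
    return {"Id": target_id, "Arn": function_arn}

def attach_lambda_target(function_arn: str) -> None:
    """Wire the rule to the Lambda and grant EventBridge invoke permission."""
    import boto3  # imported here so the sketch can be read without AWS deps
    events = boto3.client("events")
    lambda_client = boto3.client("lambda")
    # Point the rule at the Lambda function
    events.put_targets(Rule=RULE_NAME, Targets=[build_target(function_arn)])
    # Allow EventBridge to invoke the function
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId="AllowEventBridgeInvoke",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
    )
```

Without the add_permission call, the rule fires but the invocation is silently denied, which is a classic gotcha with EventBridge-to-Lambda setups.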

3.3 Lambda Trigger Function

EventBridge can trigger a Glue job directly, but that’s not the only way. Routing the trigger through a Lambda orchestrator is a much better option.

This layer also gives me a place to add Slack/Email notifications later if I want to be texted when a job fails.

Why Lambda Instead of Direct EventBridge-to-Glue?

  1. Error Handling: Lambda provides custom error handling and logging
  2. Flexibility: Can add pre-checks (e.g., skip if job already running)
  3. Notifications: Can integrate SNS for failure alerts
  4. Metrics: Custom CloudWatch metrics for monitoring
  5. Future Expansion: Easy to add conditional logic
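The “skip if job already running” pre-check from point 2 could look like this. This is a sketch against the Glue get_job_runs API; the helper name is mine:

```python
def job_already_running(glue_client, job_name: str) -> bool:
    """Return True if the most recent run of the Glue job is still active.

    STARTING/RUNNING/STOPPING are the in-flight states in the Glue
    JobRun lifecycle; anything else means the last run has finished.
    """
    runs = glue_client.get_job_runs(JobName=job_name, MaxResults=1)
    if not runs["JobRuns"]:
        return False  # job has never run
    return runs["JobRuns"][0]["JobRunState"] in ("STARTING", "RUNNING", "STOPPING")
```

The Lambda handler would call this first and return early (with a 200 and a “skipped” message) instead of starting a duplicate run.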

Purpose

The Lambda function serves as a lightweight orchestrator that:

  1. Receives EventBridge trigger
  2. Starts the Glue job
  3. Returns job run ID for tracking
  4. Handles errors gracefully

Implementation

import json
import boto3

def lambda_handler(event, context):
    """
    Lambda function to trigger NewsInsight Glue ETL job
    Called by EventBridge on schedule
    """
    
    glue_client = boto3.client('glue')
    job_name = 'NewsInsight-NightlyETL'
    
    try:
        # Start the Glue job
        response = glue_client.start_job_run(JobName=job_name)
        job_run_id = response['JobRunId']
        
        print(f"Successfully started Glue job: {job_name}")
        print(f"Job Run ID: {job_run_id}")
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': f'Successfully triggered Glue job: {job_name}',
                'jobRunId': job_run_id
            })
        }
        
    except Exception as e:
        print(f"Error starting Glue job: {str(e)}")
        
        return {
            'statusCode': 500,
            'body': json.dumps({
                'error': f'Failed to trigger Glue job: {str(e)}'
            })
        }
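The SNS failure alert from point 3 could start as small as this. It is a sketch that assumes a hypothetical SNS_TOPIC_ARN environment variable, which the real function would need configured:

```python
import json
import os

def build_failure_alert(job_name: str, error: str) -> dict:
    """Build the SNS publish payload for a failed trigger.

    SNS_TOPIC_ARN is an assumed env var, not part of the original setup.
    """
    return {
        "TopicArn": os.environ["SNS_TOPIC_ARN"],
        "Subject": f"NewsInsight ETL trigger failed: {job_name}",
        "Message": json.dumps({"job": job_name, "error": error}),
    }

def notify_failure(job_name: str, error: str) -> None:
    """Publish the alert (lazy boto3 import keeps the sketch testable offline)."""
    import boto3
    boto3.client("sns").publish(**build_failure_alert(job_name, error))
```

A call to notify_failure would slot into the except branch of the handler above, before returning the 500 response.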

3.4 Glue Job Configuration

AWS Glue is a serverless data integration service. I chose it over a standard Lambda function for the actual processing because Lambda has a 15-minute timeout.

My ingestion process involves calling external APIs and waiting for Bedrock (AI) to generate summaries. That takes time. Glue can run for hours if needed.

Job Setup Script

import os

class GlueJobSetup:
    def create_glue_job(self, role_arn, script_location):
        """Create the Glue ETL job"""
        
        job_name = "NewsInsight-NightlyETL"
        
        job_config = {
            "Name": job_name,
            "Role": role_arn,
            "Command": {
                "Name": "pythonshell",
                "ScriptLocation": script_location,
                "PythonVersion": "3.9"
            },
            "DefaultArguments": {
                "--job-language": "python",
                "--enable-metrics": "",
                "--enable-continuous-cloudwatch-log": "",
                "--AWS_REGION": self.aws_region,
                "--DDB_TABLE": "news_metadata",
                "--PROC_BUCKET": f"newsinsights-processed-{self.account_id}-{self.aws_region}",
                "--RAW_BUCKET": f"newsinsights-raw-{self.account_id}-{self.aws_region}",
                "--BEDROCK_MODEL_ID": os.getenv("BEDROCK_MODEL_ID", ""),
                "--NEWSAPI_KEY": os.getenv("NEWSAPI_KEY", ""),
                "--GUARDIAN_KEY": os.getenv("GUARDIAN_KEY", "")
            },
            "MaxRetries": 1,
            "Timeout": 60,  # Glue timeouts are in minutes -> 1 hour
            "MaxCapacity": 0.0625,  # Minimum for pythonshell
            "Description": "Nightly ETL job for NewsInsight article ingestion"
        }
        
        # Register the job with Glue
        self.glue.create_job(**job_config)
        print(f"✅ Created Glue job '{job_name}'")
        return job_name

The 0.0625 Decision: I explicitly set MaxCapacity to 0.0625 DPUs (Data Processing Units). This is the smallest/cheapest slice of computing power AWS offers for Glue.

Why? I’m processing text, not crunching petabytes of financial data. I don’t need a Ferrari; I need a bicycle. This simple setting saved me roughly 90% on my potential Glue costs.

IAM Role Configuration

I created a specific IAM Role (NewsInsight-GlueServiceRole) that acts as the ID card for this job. It has permission to do exactly what it needs and nothing else.

  • ✅ Can read/write to my specific S3 buckets.
  • ✅ Can put items into my DynamoDB table.
  • ✅ Can invoke my Bedrock model.
  • ❌ Cannot delete tables, access other buckets, or spin up EC2 instances.

def create_glue_service_role(self):
    """Create IAM role for Glue job execution"""
    
    role_name = "NewsInsight-GlueServiceRole"
    
    # Trust policy - allows Glue to assume this role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }
    
    # Permissions policy
    permissions_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "dynamodb:PutItem",
                    "dynamodb:GetItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:Scan",
                    "dynamodb:Query"
                ],
                "Resource": f"arn:aws:dynamodb:{region}:{account}:table/news_metadata"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::newsinsights-*",
                    "arn:aws:s3:::newsinsights-*/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": ["bedrock:InvokeModel"],
                "Resource": f"arn:aws:bedrock:{region}:{account}:*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": f"arn:aws:logs:{region}:{account}:*"
            }
        ]
    }
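The trust and permissions documents above still have to be registered with IAM. A sketch of that step, with the helper name being mine and a boto3 iam client assumed to be passed in:

```python
import json

def create_role_with_policy(iam_client, role_name: str,
                            trust_policy: dict, permissions_policy: dict) -> str:
    """Register the role and attach the inline permissions policy.

    Returns the role ARN, which is what create_glue_job needs.
    """
    resp = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
        Description="Execution role for the NewsInsight Glue job",
    )
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=f"{role_name}-Permissions",
        PolicyDocument=json.dumps(permissions_policy),
    )
    return resp["Role"]["Arn"]
```

One wrinkle worth knowing: IAM is eventually consistent, so a freshly created role can take a few seconds before Glue will accept it.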

3.5 Glue ETL Job Implementation

Once the Glue job wakes up, it follows a strict routine defined in NewsInsightETL. I didn’t want this to be a complex web of dependencies. I kept it linear and predictable.

The job does three things:

  • Fetch & Analyze: Loop through my list of 15 trending topics (Technology, Politics, AI, etc.).

  • Clean Up: Delete old news (the “Janitor” phase).

  • Report: Save a summary JSON so I know what happened.

I chose these 15 categories because they account for roughly 90% of the article volume across my sources. That works out to roughly 300 articles per night, which is a sweet spot between comprehensive coverage and low cost.

Job Structure

class NewsInsightETL:
    def __init__(self):
        self.processed_count = 0
        self.stored_count = 0
        self.cleaned_count = 0
        self.errors = []
    
    def run_nightly_etl(self):
        """Main ETL process"""
        start_time = datetime.utcnow()
        
        # Step 1: Process trending categories
        for category in TRENDING_CATEGORIES:
            processed, stored = self.process_category(category)
            self.processed_count += processed
            self.stored_count += stored
        
        # Step 2: Cleanup old articles
        self.cleanup_old_articles(max_age_days=2)
        
        # Step 3: Generate and store summary
        self.save_job_summary()

TRENDING_CATEGORIES = [
    "technology", "politics", "business", "markets", "science", 
    "health", "climate", "AI", "economy", "innovation",
    "cryptocurrency", "stocks", "energy", "healthcare", "finance"
]

3.6 AI Processing with Bedrock

This is the most exciting part of the code. In the past, if I wanted to know the “sentiment” of an article, I’d have to use a rigid library like NLTK or train a custom model.

Now, I just ask Claude.

I created a function analyze_with_bedrock that sends the article text to the Claude 3 Haiku model. I chose Haiku because it is blazing fast and a fraction of the cost of Opus or Sonnet, yet perfectly capable of summarization.

Sentiment & Entity Analysis

def analyze_with_bedrock(self, text: str) -> Dict[str, Any]:
    """Analyze article with AWS Bedrock"""
    
    prompt = f"""
    Analyze this article and return JSON with sentiment analysis:
    {{
        "overall_sentiment": "positive|negative|neutral",
        "emotions": {{
            "anger": "high|medium|low|none",
            "anticipation": "high|medium|low|none",
            "joy": "high|medium|low|none",
            "trust": "high|medium|low|none",
            "fear": "high|medium|low|none",
            "sadness": "high|medium|low|none"
        }},
        "entities": [
            {{"type": "person|organization|location|technology", "text": "entity_name"}}
        ],
        "summary": "3-sentence summary of key points"
    }}
    
    Article: {text[:2000]}
    """
    
    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 800,
        "messages": [{
            "role": "user",
            "content": [{"type": "text", "text": prompt}]
        }]
    }
    
    response = bedrock.invoke_model(
        modelId=BEDROCK_MODEL_ID,
        body=json.dumps(body)
    )
    
    # Parse the streaming body and extract the model's text reply
    response_body = json.loads(response['body'].read())
    return json.loads(response_body['content'][0]['text'])

AI Output Schema

The trick wasn’t calling the API; it was getting the API to return clean data. LLMs love to chat. I didn’t want a chat; I wanted JSON.

prompt = f"""
Analyze this article and return JSON. DO NOT output conversational text.
{{
    "overall_sentiment": "positive|negative|neutral",
    "emotions": {{
        "anger": "high|medium|low|none",
        "joy": "high|medium|low|none",
        ...
    }},
    "entities": [
        {{"type": "organization", "text": "OpenAI"}}
    ],
    "summary": "3-sentence summary of key points"
}}

Article: {text[:2000]}
"""

Fallback Handling

Even with a perfect prompt, LLMs sometimes fail or return broken JSON. I wrapped the analysis in a robust error handler.

If Bedrock fails or returns garbage, I don’t crash the pipeline. I return a “safe default” object with neutral sentiment. This ensures the article still gets indexed, just without the fancy AI metadata.

try:
    analysis = json.loads(response_text)
except json.JSONDecodeError:
    # If the AI hallucinates bad JSON, fall back to basics
    return {"overall_sentiment": "neutral", "summary": text[:200] + "..."}
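A fuller “safe default” can mirror the entire output schema, so downstream code never has to special-case a failed analysis. A sketch, with the helper name being mine:

```python
def safe_default(text: str) -> dict:
    """Neutral fallback object matching the AI output schema.

    Returned when Bedrock fails or produces unparseable JSON, so the
    article is still indexed with sensible placeholder metadata.
    """
    emotions = ("anger", "anticipation", "joy", "trust", "fear", "sadness")
    return {
        "overall_sentiment": "neutral",
        "emotions": {e: "none" for e in emotions},
        "entities": [],
        "summary": text[:200] + "..." if len(text) > 200 else text,
    }
```

Because every key is always present, the DynamoDB writer and the frontend can treat AI-analyzed and fallback articles identically.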

3.7 Data Cleanup Strategy

News rots fast. An article about a stock market dip from 3 days ago is worse than useless—it’s misleading.

I implemented a strict 2-day retention policy in cleanup_old_articles.

Cleanup Logic

def cleanup_old_articles(self, max_age_days=2):
    # Calculate the cutoff date (stored dates are ISO-8601 strings,
    # so string comparison works)
    cutoff_date = datetime.utcnow() - timedelta(days=max_age_days)
    cutoff_str = cutoff_date.isoformat()
    
    # Scan and destroy
    for item in table.scan()['Items']:
        if item['date'] < cutoff_str:
            table.delete_item(Key={"id": item["id"]})
            # Also delete the full text from S3 to save space
            s3.delete_object(...)

Why 2-Day Retention?

Factor           | Consideration
-----------------|--------------------------------
News Freshness   | Articles >2 days old are stale
Storage Costs    | Limits DynamoDB and S3 growth
Search Relevance | Users want current news
ETL Efficiency   | Smaller dataset = faster scans
Cache Alignment  | Matches 24-hour cache TTL

3.8 Error Handling & Logging

In a distributed system, things fail. The NewsAPI might rate-limit me on the “Crypto” category, or a specific article might be malformed.

I designed the main loop to be fault-tolerant. If one category fails, the script logs the error and moves to the next one. It doesn’t throw a tantrum and quit.

Structured Logging

def log_info(self, message: str):
    """Log info message with timestamp"""
    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] INFO: {message}")
    
def log_error(self, message: str, error: Exception = None):
    """Log error message"""
    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
    error_msg = f"[{timestamp}] ERROR: {message}"
    if error:
        error_msg += f" - {str(error)}"
    print(error_msg)
    self.errors.append(error_msg)

Error Collection & Reporting

At the end of the run, I save a summary.json file to S3. This is my dashboard. I can look at it and see: “Okay, the job took 45 minutes, processed 287 articles, but failed on ‘Health’ due to a timeout.”

def run_nightly_etl(self):
    """Main ETL process with error tracking"""
    
    try:
        for category in TRENDING_CATEGORIES:
            try:
                processed, stored = self.process_category(category)
            except Exception as e:
                # Log but continue with other categories
                self.log_error(f"Category '{category}' processing failed", e)
        
        # Generate summary including errors
        summary = {
            "job_start": start_time.isoformat(),
            "job_end": end_time.isoformat(),
            "duration_seconds": duration,
            "articles_processed": total_processed,
            "articles_stored": total_stored,
            "articles_cleaned": self.cleaned_count,
            "errors": len(self.errors),
            "error_messages": self.errors[:10]  # First 10 errors
        }
        
        # Store summary for monitoring
        s3.put_object(
            Bucket=PROC_BUCKET,
            Key=f"etl-logs/{start_time.strftime('%Y/%m/%d')}/summary.json",
            Body=json.dumps(summary, indent=2)
        )
        
    except Exception as e:
        self.log_error("ETL job failed", e)
        raise  # Re-raise to mark job as failed

Rate Limiting

import time

def process_category(self, category: str):
    for article in all_articles:
        # Process article...
        processed += 1
        
        # Rate limiting to avoid API throttling
        if processed % 10 == 0:
            time.sleep(1)  # 1 second pause every 10 articles

3.9 Infrastructure Setup

I didn’t want to click around the AWS console creating tables manually. I scripted the infrastructure setup.

DynamoDB:

- news_metadata: The main table.

- content_blacklist: Where I store banned domains.

S3 Buckets:

- newsinsights-raw: A backup of the raw API responses.

- newsinsights-processed: The final JSON files.

I also added a Lifecycle Policy to the S3 buckets to automatically delete files after 30 days. This is a fail-safe to prevent my storage costs from creeping up over years.

S3 Lifecycle Policy

lifecycle_config = {
    'Rules': [{
        'ID': 'DeleteOldObjects',
        'Status': 'Enabled',
        'Filter': {'Prefix': ''},  # apply to every key in the bucket
        # Delete live objects after 30 days...
        'Expiration': {'Days': 30},
        # ...and clean up noncurrent versions if versioning is enabled
        'NoncurrentVersionExpiration': {'NoncurrentDays': 30}
    }]
}
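The config dict does nothing on its own until it is attached to a bucket. A sketch of applying it, with the helper name being mine; in practice it would run once per bucket:

```python
def apply_lifecycle(s3_client, bucket: str, lifecycle_config: dict) -> None:
    """Attach the lifecycle rules to one bucket."""
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=lifecycle_config,
    )

# Applied to both buckets during setup, e.g.:
# for bucket in (raw_bucket_name, processed_bucket_name):
#     apply_lifecycle(boto3.client("s3"), bucket, lifecycle_config)
```

Note that put_bucket_lifecycle_configuration replaces the bucket's entire lifecycle configuration, so all rules for a bucket need to go in one call.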

3.10 Cost Optimization

Glue Job Costs

Resource         | Configuration                  | Est. Cost/Month
-----------------|--------------------------------|----------------
Glue pythonshell | 0.0625 DPU × 45 min × 30 days  | ~$1.50
Lambda           | 128MB × 1 sec × 30 invocations | ~$0.01
EventBridge      | 30 rule invocations            | Free tier
Total Automation |                                | ~$1.51/month

Storage Costs

Resource      | Configuration                         | Est. Cost/Month
--------------|---------------------------------------|----------------
DynamoDB      | ~300 articles × 2KB × PAY_PER_REQUEST | ~$0.50
S3            | ~300 articles × 5KB + logs            | ~$0.10
Total Storage |                                       | ~$0.60/month

Check out the code: github.com/VineetLoyer/NewsInsight.ai