Real-time data anomaly detection and alerting

Real-time data anomaly detection and alerting

A practical example of creating a pipeline for real-time logs data anomaly detection and alerting using GlassFlow, OpenAI, and Slack.

Would you like to read it later?
The tutorial will be delivered straight to your inbox.

This tutorial explores how you can leverage GlassFlow and Large Language Models (LLMs) from Open AI to create a real-time log data anomaly detection pipeline and send immediate notifications to Slack.

Skip the intro and scroll to Step 1 of the tutorial

What is anomaly detection?

Anomaly detection is a technique to identify unexpected patterns or behaviors in data. Anomalies in log data indicate security threats, audit issues, application failures, or other unusual and suspicious activities. Detecting these anomalies in real-time allows for quick action, minimizing downtime and preventing potential damage. Our pipeline processes server logs that record various user activities, such as logging in, accessing files, executing commands, and accessing directories. These logs include the timestamp, IP address, username, and the specific action performed.

Here is an example of the kind of logs with anomaly:

[01/Feb/2024 14:46:21] 220.129.215.153 veronica08 accessed file 'concern.html'

At first glance, this might seem like a regular log entry. However, upon closer inspection, you realize that "veronica08" has been accessing several sensitive files. It would help if you had a real-time way to detect and alert you about such activities.

What is GlassFlow?

GlassFlow enables Python developers to create data streaming pipelines for real-time use cases within minutes.

Why GlassFlow is useful?

GlassFlow excels in the real-time transformation of events so that applications can immediately react to new information. GlassFlow offers a zero infrastructure environment where you can develop pipelines without a complex initial setup. You can integrate with various data sources and sinks using managed connectors or implementing custom connectors using GlassFlow SDK for Python. You write code to implement data transformation logic in Python and deploy it in your pipeline with simple clicks using GlassFlow WebApp. With auto-scalable serverless infrastructure, you can easily deal with billions of log records in real-time.

Pipeline components

Our real-time log data anomaly detection pipeline consists of the following key components:

  • Data generator: We'll use the data generator to simulate server logs data in Python. This allows us to create realistic log entries for demonstration purposes without relying on actual server logs.

  • Data Source: Custom connector to ingest server log data from data generator. You can also update the Data Source code to collect logs from application servers, database servers, network devices, and more.

  • Data Transformation: AI-powered transformation using GlassFlow to detect anomalies.

  • Data Sink: Custom connector to send notifications to Slack.

Tools we use

  • GlassFlow WebApp: To create a pipeline in a low-code environment.

  • OpenAI: To use OpenAI API models such as gpt-3.5-turbo or GPT-4o. We request Chat Completion API for each log entry event in the transformation function and classify logs for data anomalies.

  • Slack: To send real-time alerts and notifications.

  • GlassFlow Python SDK: To build custom connectors for the data source and sink.

Setting up the Pipeline with GlassFlow WebApp in 3 minutes

Prerequisites

To start with the tutorial you need a free GlassFlow account.

Sign up for a free

Step 1. Log in to GlassFlow WebApp

Navigate to the GlassFlow WebApp and log in with your credentials.

Step 2. Create a New Pipeline

Click on "Create New Pipeline" and provide a name. You can name it "Log Anomaly Detection".

Create a new pipeline on GlassFlow

A pipeline will be created in the default main Space.

Step 3. Configure a Data Source

Select "SDK" to configure the pipeline to use Python SDK for ingesting logs. You will learn how to send data to the pipeline in the upcoming section.

GlassFlow pipeline data source selection

Step 4. Define the transformer

Choose the "AI anomaly detection" transformer template from the Template dropdown menu.

GlassFlow pipeline implement transformer function

We use OpenAI's GPT-3.5-turbo model to provide insights into the log data event and flag any unusual or suspicious activities.

💡
By default, the transformer function uses a free OpenAI API key provided by GlassFlow.

You can replace it with your API key too. To do so:

  1. Have an OpenAI API account.

  2. Create an API key.

  3. Set the API key in the transformation code: Add the following code by editing the transformation function code just write after the import statements:

     import openai 
     import json
    
     openai.api_key="{REPLACE_WITH_YOUR_OPENAI_API_KEY}"
    
     ....
    
💡
You can also import other Python dependencies (packages) in the transformation function. Read more about Python dependencies.

Step 5. Configure a Data Sink

Select "SDK" to configure the pipeline to use Python SDK for sending notifications to Slack.

GlassFlow pipeline data sink selection

Step 6. Confirm the pipeline

Confirm the pipeline settings in the final step and click "Create Pipeline".

Step 7. Copy the pipeline credentials

Once the pipeline is created, copy its credentials such as Pipeline ID and Access Token.

GlassFlow pipeline copy credentials

Important: After creating the pipeline, the transformation function is deployed and running on GlassFlow’s Serverless Execution Enginein the cloud. You do not need to configure or maintain any infrastructure on your side.

Sending data to the Pipeline

Prerequisites

To continue with the rest tutorial, make sure that you have the following:

  • Python is installed on your machine.

  • Pip is installed to manage project packages.

  • Slack account: If don't have a Slack account, sign up for a new free one here and go to the Slack Get Started page.

  • Slack workspace: You need access to a Slack workspace where you're an admin. If you are creating just a new workspace, follow this guide.

  • You created an incoming webhook for your Slack workspace.

Create an environment configuration file

Add a .env file in your project directory and add the following configuration variables:

SLACK_WEBHOOK_URL=your_slack_workspace_webhook_url
PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token

Replace your_pipeline_id ,your_pipeline_access_token and your_slack_workspace_webhook_url with appropriate values obtained from your GlassFlow pipeline and Slack workspace.

Install GlassFlow SDK

Install the GlassFlow Python SDK and other required libraries using pip.

Optional: Create a virtual environment before installing Python dependencies. Run the following command: python -m venv .venv && source .venv/bin/activate

pip install glassflow python-dotenv faker schedule

Create a data source connector

We create a custom data source connector to publish every log event to the GlassFlow pipeline. Here is the source_connector.py Python script.

import os
import schedule
import time
from dotenv import load_dotenv
from data_generator import DataGenerator
import glassflow


class SourceConnectorLogs:
    def __init__(self):
        load_dotenv()
        self.pipeline_id = os.getenv("PIPELINE_ID")
        self.pipeline_access_token = os.getenv("PIPELINE_ACCESS_TOKEN")

        self.data_generator = DataGenerator()
        # Initiate GlassFlow pipeline client
        self.glassflow_client = (
            glassflow.GlassFlowClient().pipeline_client(
                pipeline_id=self.pipeline_id,
                pipeline_access_token=self.pipeline_access_token,
            )
        )

    def send_log_to_glassflow(self):
        log_data = self.data_generator.generate_log()
        # Send log data to the pipeline continously
        response = self.glassflow_client.publish(request_body=log_data)

        if response.status_code == 200:
            print("Log sent to GlassFlow:", log_data)
        else:
            print(f"Failed to send log to GlassFlow: {response.text}")

    def run(self):
        schedule.every(1 / 5).seconds.do(self.send_log_to_glassflow)
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            print("Exiting...")


if __name__ == "__main__":
    SourceConnectorLogs().run()

This sample data_generator.py script generates fake server logs:

from faker import Faker
import random


class DataGenerator:
    def __init__(self):
        # Define a list of actions/events
        self.actions = [
            "logged in",
            "accessed file",
            "executed command",
            "accessed directory",
        ]
        self.fake = Faker()

    def generate_log(self):
        log = ""
        timestamp = self.fake.date_time_this_year().strftime(
            "[%d/%b/%Y %H:%M:%S]"
        )
        ip_address = self.fake.ipv4()
        user = self.fake.user_name()
        action = random.choice(self.actions)
        if action == "logged in":
            log = f"{timestamp} {ip_address} {user} {action} successfully"
        elif action == "accessed file":
            file_name = self.fake.file_name()
            log = f"{timestamp} {ip_address} {user} {action} '{file_name}'"
        elif action == "executed command":
            command = self.fake.word()
            log = f"{timestamp} {ip_address} {user} executed command '{command}'"
        elif action == "accessed directory":
            directory = self.fake.file_path(depth=random.randint(1, 3))
            log = f"{timestamp} {ip_address} {user} accessed directory '{directory}'"
        return log


if __name__ == "__main__":
    data_gen = DataGenerator()
    print(data_gen.generate_log())

Consuming data from the Pipeline

After detecting anomalies, we notify the relevant stakeholders promptly. In our pipeline, we use Slack to send these notifications.

Next, we need to set up the data sink to send notifications to Slack. Below is the Python code sink_connector.py to create a custom data sink connector:

import os
import requests
import json
import time
import random
import glassflow
from dotenv import load_dotenv


class SinkConnectorSlack:
    def __init__(self):
        load_dotenv()
        self.slack_webhook_url = os.getenv(
            "SLACK_WEBHOOK_URL"
        )
        self.pipeline_id = os.getenv("PIPELINE_ID")
        self.pipeline_access_token = os.getenv(
            "PIPELINE_ACCESS_TOKEN"
        )
        self.pipeline_client = glassflow.GlassFlowClient().pipeline_client(
            pipeline_id=self.pipeline_id,
            pipeline_access_token=self.pipeline_access_token,
        )

    def send_to_slack(self, data):
        headers = {"Content-Type": "application/json"}
        payload = {"text": json.dumps(data)}
        response = requests.post(
            self.slack_webhook_url,
            headers=headers,
            json=payload,
        )
        if response.status_code != 200:
            print(
                "Failed to send message to Slack:",
                response.text,
            )

    def run(self):
        retry_delay = 10
        try:
            while True:
                # Consume transformed event from the pipeline
                res = self.pipeline_client.consume()

                if res.status_code == 204:
                    time.sleep(retry_delay)
                    retry_delay = min(
                        retry_delay * 2
                        + random.uniform(0, 1),
                        60,
                    )  # Cap delay to 60 seconds
                    continue
                if res.status_code == 200:
                    record = res.json()
                    print(record)
                    self.send_to_slack(record)

                    print(
                        "Consumed transformed event from Glassflow"
                        "and sent to Slack"
                    )
                    # Reset the delay after successful processing
                    retry_delay = 10
        except KeyboardInterrupt:
            print("Exiting...")


if __name__ == "__main__":
    SinkConnectorSlack().run()

Test the pipeline

You'll run the source and sink connector scripts to test the pipeline.

Run the Source Connector

Run first source_connector.py Python script in a terminal to publish server log data to the GlassFlow pipeline:

python source_connector.py

Run the Sink Connector

Run the sink_connector.py Python script in a separate terminal window to see the output side-by-side:

python sink_connector.py

This script will continuously consume new events from the GlassFlow pipeline. Upon receiving transformed events, it will send notifications to Slack. You should see an output indicating that messages are being received on Slack.

Slack output with detected anomalies

Additional Information

GlassFlow CLI also can be used for pipeline creation. You can run the entire setup using Docker for easy deployment. For more details on setting up the pipeline using the CLI, visit our GitHub repository.

Conclusion

Following this tutorial, you’ve set up a real-time log data anomaly detection pipeline using GlassFlow, Open AI, and Slack. Enriched logs, containing identified anomalies, can also be sent to Amazon S3 or OpenSearch Service for further analysis and long-term storage. Additionally, alert notifications can be integrated with communication platforms such as Microsoft Teams or SMS services like Twilio.

This pipeline can be easily adapted for other real-time alerting use cases. That includes monitoring financial transactions for fraud, detecting security breaches, tracking performance metrics, and ensuring compliance with regulatory requirements.

Start leveraging the power of GlassFlow and AI today to build robust and scalable pipelines! Explore other use cases