Real-time data anomaly detection and alerting
A practical example of creating a pipeline for real-time logs data anomaly detection and alerting using GlassFlow, OpenAI, and Slack.
This tutorial explores how you can leverage GlassFlow and Large Language Models (LLMs) from OpenAI to create a real-time log data anomaly detection pipeline and send immediate notifications to Slack.
What is anomaly detection?
Anomaly detection is a technique to identify unexpected patterns or behaviors in data. Anomalies in log data indicate security threats, audit issues, application failures, or other unusual and suspicious activities. Detecting these anomalies in real-time allows for quick action, minimizing downtime and preventing potential damage. Our pipeline processes server logs that record various user activities, such as logging in, accessing files, executing commands, and accessing directories. These logs include the timestamp, IP address, username, and the specific action performed.
Here is an example of a log entry that contains an anomaly:
[01/Feb/2024 14:46:21] 220.129.215.153 veronica08 accessed file 'concern.html'
At first glance, this might seem like a regular log entry. However, upon closer inspection, you notice that "veronica08" has been accessing several sensitive files. You need a real-time way to detect such activity and be alerted about it.
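For context, each entry in these logs follows the same shape: timestamp, IP address, username, and action. The following minimal parsing sketch is purely illustrative and not part of the pipeline; the regex and field names are our own, chosen to match the format described above:
import re

# Regex and field names are our own illustration of the log format described above
LOG_PATTERN = re.compile(
    r"\[(?P<timestamp>[^\]]+)\]\s+(?P<ip>\S+)\s+(?P<user>\S+)\s+(?P<action>.+)"
)

line = "[01/Feb/2024 14:46:21] 220.129.215.153 veronica08 accessed file 'concern.html'"
match = LOG_PATTERN.match(line)
if match:
    print(match.groupdict())
    # {'timestamp': '01/Feb/2024 14:46:21', 'ip': '220.129.215.153',
    #  'user': 'veronica08', 'action': "accessed file 'concern.html'"}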
What is GlassFlow?
GlassFlow enables Python developers to create data streaming pipelines for real-time use cases within minutes.
Why is GlassFlow useful?
GlassFlow excels at real-time transformation of events so that applications can immediately react to new information. GlassFlow offers a zero-infrastructure environment where you can develop pipelines without a complex initial setup. You can integrate with various data sources and sinks using managed connectors, or by implementing custom connectors with the GlassFlow Python SDK. You write your data transformation logic in Python and deploy it to your pipeline with a few clicks in the GlassFlow WebApp. With auto-scalable serverless infrastructure, you can easily handle billions of log records in real time.
Pipeline components
Our real-time log data anomaly detection pipeline consists of the following key components:
Data generator: We'll use a data generator to simulate server log data in Python. This allows us to create realistic log entries for demonstration purposes without relying on actual server logs.
Data Source: A custom connector to ingest server log data from the data generator. You can also update the Data Source code to collect logs from application servers, database servers, network devices, and more.
Data Transformation: AI-powered transformation using GlassFlow to detect anomalies.
Data Sink: Custom connector to send notifications to Slack.
Tools we use
GlassFlow WebApp: To create a pipeline in a low-code environment.
OpenAI: To use OpenAI API models such as gpt-3.5-turbo or GPT-4o. We call the Chat Completions API for each log entry event in the transformation function to classify logs for anomalies.
Slack: To send real-time alerts and notifications.
GlassFlow Python SDK: To build custom connectors for the data source and sink.
Setting up the Pipeline with GlassFlow WebApp in 3 minutes
Prerequisites
To start with the tutorial, you need a free GlassFlow account.
Step 1. Log in to GlassFlow WebApp
Navigate to the GlassFlow WebApp and log in with your credentials.
Step 2. Create a New Pipeline
Click on "Create New Pipeline" and provide a name. You can name it "Log Anomaly Detection".
The pipeline will be created in the main Space.
Step 3. Configure a Data Source
Select "SDK" to configure the pipeline to use Python SDK for ingesting logs. You will learn how to send data to the pipeline in the upcoming section.
Step 4. Define the transformer
Choose the "AI anomaly detection" transformer template from the Template dropdown menu.
We use OpenAI's GPT-3.5-turbo model to provide insights into the log data event and flag any unusual or suspicious activities.
You can replace it with your own API key. To do so:
Have an OpenAI API account.
Create an API key.
Set the API key in the transformation code: edit the transformation function and add the following lines right after the import statements:
import openai
import json

openai.api_key = "{REPLACE_WITH_YOUR_OPENAI_API_KEY}"
...
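The exact transformer code is provided by the template in the WebApp. As a rough sketch of what such a transformation function could look like, the handler signature, prompt, and returned fields below are illustrative assumptions rather than the template's actual code:
import json
import openai

openai.api_key = "{REPLACE_WITH_YOUR_OPENAI_API_KEY}"

def handler(data, log):
    # Ask the model whether this log event looks anomalous
    # (prompt and output format are illustrative assumptions)
    prompt = (
        "You are a security analyst. Classify the following server log entry "
        "as 'normal' or 'anomaly' and briefly explain why:\n"
        f"{json.dumps(data)}"
    )
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
    )
    analysis = response.choices[0].message["content"]
    # Return the original event enriched with the model's assessment
    return {"log": data, "analysis": analysis}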
You can import other Python dependencies (packages) in the transformation function. Read more about Python dependencies.
Step 5. Configure a Data Sink
Select "SDK" to configure the pipeline to use Python SDK for sending notifications to Slack.
Step 6. Confirm the pipeline
Confirm the pipeline settings in the final step and click "Create Pipeline".
Step 7. Copy the pipeline credentials
Once the pipeline is created, copy its credentials such as Pipeline ID and Access Token.
Important: After creating the pipeline, the transformation function is deployed and running on GlassFlow's Serverless Execution Engine in the cloud. You do not need to configure or maintain any infrastructure on your side.
Sending data to the Pipeline
Prerequisites
To continue with the rest of the tutorial, make sure that you have the following:
Python is installed on your machine.
Pip is installed to manage project packages.
Slack account: If you don't have a Slack account, sign up for a free one on the Slack Get Started page.
Slack workspace: You need access to a Slack workspace where you're an admin. If you are creating a new workspace, follow this guide.
Slack incoming webhook: You have created an incoming webhook for your Slack workspace.
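Before wiring the webhook into the pipeline, you can optionally verify it with a small standalone script. This check is not part of the tutorial's connectors, and the URL below is a placeholder for your own webhook URL:
import requests

# Replace with your own incoming webhook URL
webhook_url = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

response = requests.post(webhook_url, json={"text": "Webhook test from the anomaly detection tutorial"})
# Slack replies with HTTP 200 and the body "ok" when the webhook works
print(response.status_code, response.text)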
Create an environment configuration file
Create a .env file in your project directory and add the following configuration variables:
SLACK_WEBHOOK_URL=your_slack_workspace_webhook_url
PIPELINE_ID=your_pipeline_id
PIPELINE_ACCESS_TOKEN=your_pipeline_access_token
Replace your_pipeline_id, your_pipeline_access_token, and your_slack_workspace_webhook_url with the values obtained from your GlassFlow pipeline and Slack workspace.
Install GlassFlow SDK
Install the GlassFlow Python SDK and other required libraries using pip.
Optional: Create a virtual environment before installing Python dependencies. Run the following command:
python -m venv .venv && source .venv/bin/activate
pip install glassflow python-dotenv faker schedule
Create a data source connector
We create a custom data source connector to publish every log event to the GlassFlow pipeline. Here is the source_connector.py Python script:
import os
import schedule
import time
from dotenv import load_dotenv
from data_generator import DataGenerator
import glassflow


class SourceConnectorLogs:
    def __init__(self):
        load_dotenv()
        self.pipeline_id = os.getenv("PIPELINE_ID")
        self.pipeline_access_token = os.getenv("PIPELINE_ACCESS_TOKEN")
        self.data_generator = DataGenerator()
        # Initiate GlassFlow pipeline client
        self.glassflow_client = glassflow.GlassFlowClient().pipeline_client(
            pipeline_id=self.pipeline_id,
            pipeline_access_token=self.pipeline_access_token,
        )

    def send_log_to_glassflow(self):
        log_data = self.data_generator.generate_log()
        # Publish the log event to the GlassFlow pipeline
        response = self.glassflow_client.publish(request_body=log_data)
        if response.status_code == 200:
            print("Log sent to GlassFlow:", log_data)
        else:
            print(f"Failed to send log to GlassFlow: {response.text}")

    def run(self):
        # Schedule the job that continuously publishes log events
        schedule.every(1 / 5).seconds.do(self.send_log_to_glassflow)
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)
        except KeyboardInterrupt:
            print("Exiting...")


if __name__ == "__main__":
    SourceConnectorLogs().run()
This sample data_generator.py script generates fake server logs:
from faker import Faker
import random


class DataGenerator:
    def __init__(self):
        # Define a list of actions/events
        self.actions = [
            "logged in",
            "accessed file",
            "executed command",
            "accessed directory",
        ]
        self.fake = Faker()

    def generate_log(self):
        log = ""
        timestamp = self.fake.date_time_this_year().strftime("[%d/%b/%Y %H:%M:%S]")
        ip_address = self.fake.ipv4()
        user = self.fake.user_name()
        action = random.choice(self.actions)
        if action == "logged in":
            log = f"{timestamp} {ip_address} {user} {action} successfully"
        elif action == "accessed file":
            file_name = self.fake.file_name()
            log = f"{timestamp} {ip_address} {user} {action} '{file_name}'"
        elif action == "executed command":
            command = self.fake.word()
            log = f"{timestamp} {ip_address} {user} executed command '{command}'"
        elif action == "accessed directory":
            directory = self.fake.file_path(depth=random.randint(1, 3))
            log = f"{timestamp} {ip_address} {user} accessed directory '{directory}'"
        return log


if __name__ == "__main__":
    data_gen = DataGenerator()
    print(data_gen.generate_log())
Consuming data from the Pipeline
After detecting anomalies, we notify the relevant stakeholders promptly. In our pipeline, we use Slack to send these notifications.
Next, we need to set up the data sink to send notifications to Slack. Below is the sink_connector.py Python script that creates a custom data sink connector:
import os
import requests
import json
import time
import random
import glassflow
from dotenv import load_dotenv


class SinkConnectorSlack:
    def __init__(self):
        load_dotenv()
        self.slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL")
        self.pipeline_id = os.getenv("PIPELINE_ID")
        self.pipeline_access_token = os.getenv("PIPELINE_ACCESS_TOKEN")
        self.pipeline_client = glassflow.GlassFlowClient().pipeline_client(
            pipeline_id=self.pipeline_id,
            pipeline_access_token=self.pipeline_access_token,
        )

    def send_to_slack(self, data):
        headers = {"Content-Type": "application/json"}
        payload = {"text": json.dumps(data)}
        response = requests.post(
            self.slack_webhook_url,
            headers=headers,
            json=payload,
        )
        if response.status_code != 200:
            print("Failed to send message to Slack:", response.text)

    def run(self):
        retry_delay = 10
        try:
            while True:
                # Consume transformed event from the pipeline
                res = self.pipeline_client.consume()
                if res.status_code == 204:
                    # No new events yet; back off exponentially, capped at 60 seconds
                    time.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2 + random.uniform(0, 1), 60)
                    continue
                if res.status_code == 200:
                    record = res.json()
                    print(record)
                    self.send_to_slack(record)
                    print("Consumed transformed event from GlassFlow and sent to Slack")
                    # Reset the delay after successful processing
                    retry_delay = 10
        except KeyboardInterrupt:
            print("Exiting...")


if __name__ == "__main__":
    SinkConnectorSlack().run()
Test the pipeline
You'll run the source and sink connector scripts to test the pipeline.
Run the Source Connector
First, run the source_connector.py Python script in a terminal to publish server log data to the GlassFlow pipeline:
python source_connector.py
Run the Sink Connector
Run the sink_connector.py Python script in a separate terminal window to see the output side-by-side:
python sink_connector.py
This script will continuously consume new events from the GlassFlow pipeline. Upon receiving transformed events, it will send notifications to Slack. You should see an output indicating that messages are being received on Slack.
Additional Information
The GlassFlow CLI can also be used to create the pipeline, and you can run the entire setup with Docker for easy deployment. For more details on setting up the pipeline using the CLI, visit our GitHub repository.
Conclusion
Following this tutorial, you’ve set up a real-time log data anomaly detection pipeline using GlassFlow, OpenAI, and Slack. Enriched logs, containing identified anomalies, can also be sent to Amazon S3 or OpenSearch Service for further analysis and long-term storage. Additionally, alert notifications can be integrated with communication platforms such as Microsoft Teams or SMS services like Twilio.
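For instance, the send_to_slack helper in the sink connector could be swapped for a small function that writes each enriched event to S3. The following is only a sketch, assuming you have AWS credentials configured and boto3 installed; the bucket name and key scheme are placeholders:
import json
import uuid
import boto3  # assumed extra dependency, not used elsewhere in this tutorial

s3 = boto3.client("s3")

def send_to_s3(record, bucket="my-anomaly-logs"):
    # Bucket name and key scheme are placeholders for your own setup
    key = f"enriched-logs/{uuid.uuid4()}.json"
    s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(record))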
This pipeline can be easily adapted for other real-time alerting use cases. That includes monitoring financial transactions for fraud, detecting security breaches, tracking performance metrics, and ensuring compliance with regulatory requirements.
Start leveraging the power of GlassFlow and AI today to build robust and scalable pipelines! Explore other use cases.