• About
  • Privacy Policy
  • Disclaimer
  • Contact
Soft Bliss Academy
No Result
View All Result
  • Home
  • Artificial Intelligence
  • Software Development
  • Machine Learning
  • Research & Academia
  • Startups
  • Home
  • Artificial Intelligence
  • Software Development
  • Machine Learning
  • Research & Academia
  • Startups
Soft Bliss Academy
No Result
View All Result
Home Artificial Intelligence

A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic

softbliss by softbliss
April 22, 2025
in Artificial Intelligence
0
A Code Implementation of a Real‑Time In‑Memory Sensor Alert Pipeline in Google Colab with FastStream, RabbitMQ, TestRabbitBroker, Pydantic
0
SHARES
0
VIEWS
Share on FacebookShare on Twitter


In this notebook, we demonstrate how to build a fully in-memory “sensor alert” pipeline in Google Colab using FastStream, a high-performance, Python-native stream processing framework, and its integration with RabbitMQ. By leveraging faststream.rabbit’s RabbitBroker and TestRabbitBroker, we simulate a message broker without needing external infrastructure. We orchestrate four distinct stages: ingestion & validation, normalization, monitoring & alert generation, and archiving, each defined as Pydantic models (RawSensorData, NormalizedData, AlertData) to ensure data quality and type safety. Under the hood, Python’s asyncio powers asynchronous message flow, while nest_asyncio enables nested event loops in Colab. We also employ the standard logging module for traceable pipeline execution and pandas for final result inspection, making it easy to visualize archived alerts in a DataFrame.

!pip install -q faststream[rabbit] nest_asyncio

We install FastStream with its RabbitMQ integration, providing the core stream-processing framework and broker connectors, as well as the nest_asyncio package, which enables nested asyncio event loops in environments like Colab. All this is achieved while keeping the output minimal with the -q flag.

import nest_asyncio, asyncio, logging
nest_asyncio.apply()

We import the nest_asyncio, asyncio, and logging modules, then apply nest_asyncio.apply() to patch Python’s event loop so that you can run nested asynchronous tasks inside environments like Colab or Jupyter notebooks without errors. The logging import readies you to instrument your pipeline with detailed runtime logs.

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")

We configure Python’s built‑in logging to emit INFO‑level (and above) messages prefixed with a timestamp and severity, then create a dedicated logger named “sensor_pipeline” for emitting structured logs within your streaming pipeline.

from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List

We bring in FastStream’s core FastStream class alongside its RabbitMQ connectors (RabbitBroker for real brokers and TestRabbitBroker for in‑memory testing), Pydantic’s BaseModel, Field, and validator for declarative data validation, pandas for tabular result inspection, and Python’s List type for annotating our in‑memory archives.

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app    = FastStream(broker)

We instantiate a RabbitBroker pointed at a (local) RabbitMQ server using the AMQP URL, then create a FastStream application bound to that broker, setting up the messaging backbone for your pipeline stages.

class RawSensorData(BaseModel):
    sensor_id: str       = Field(..., examples=["sensor_1"])
    reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])
   
    @validator("sensor_id")
    def must_start_with_sensor(cls, v):
        if not v.startswith("sensor_"):
            raise ValueError("sensor_id must start with 'sensor_'")
        return v


class NormalizedData(BaseModel):
    sensor_id: str
    reading_kelvin: float


class AlertData(BaseModel):
    sensor_id: str
    reading_kelvin: float
    alert: bool

These Pydantic models define the schema for each stage: RawSensorData enforces input validity (e.g., reading range and a sensor_ prefix), NormalizedData converts Celsius to Kelvin, and AlertData encapsulates the final alert payload (including a boolean flag), ensuring a type-safe data flow throughout the pipeline.

archive: List[AlertData] = []


@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
    logger.info(f"Ingested raw data: {raw.json()}")
    return raw.dict()


@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
    norm = NormalizedData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_celsius"] + 273.15
    )
    logger.info(f"Normalized to Kelvin: {norm.json()}")
    return norm.dict()


ALERT_THRESHOLD_K = 323.15  
   
@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
    alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
    alert = AlertData(
        sensor_id=data["sensor_id"],
        reading_kelvin=data["reading_kelvin"],
        alert=alert_flag
    )
    logger.info(f"Monitor result: {alert.json()}")
    return alert.dict()


@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
    rec = AlertData(**payload)
    archive.append(rec)
    logger.info(f"Archived: {rec.json()}")

An in-memory archive list collects all finalized alerts, while four asynchronous functions, wired via @broker.subscriber/@broker.publisher, form the pipeline stages. These functions ingest and validate raw sensor inputs, convert Celsius to Kelvin, check against an alert threshold, and finally archive each AlertData record, emitting logs at every step for full traceability.

async def main():
    readings = [
        {"sensor_id": "sensor_1", "reading_celsius": 45.2},
        {"sensor_id": "sensor_2", "reading_celsius": 75.1},
        {"sensor_id": "sensor_3", "reading_celsius": 50.0},
    ]
    async with TestRabbitBroker(broker) as tb:
        for r in readings:
            await tb.publish(r, "sensor_input")
        await asyncio.sleep(0.1)
       
    df = pd.DataFrame([a.dict() for a in archive])
    print("\nFinal Archived Alerts:")
    display(df)


asyncio.run(main())

Finally, the main coroutine publishes a set of sample sensor readings into the in-memory TestRabbitBroker, pauses briefly to allow each pipeline stage to run, and then collates the resulting AlertData records from the archive into a pandas DataFrame for easy display and verification of the end-to-end alert flow. At the end, asyncio.run(main()) kicks off the entire async demo in Colab.

In conclusion, this tutorial demonstrates how FastStream, combined with RabbitMQ abstractions and in-memory testing via TestRabbitBroker, can accelerate the development of real-time data pipelines without the overhead of deploying external brokers. With Pydantic handling schema validation, asyncio managing concurrency, and pandas enabling quick data analysis, this pattern provides a robust foundation for sensor monitoring, ETL tasks, or event‑driven workflows. You can seamlessly transition from this in‑memory demo to production by swapping in a live broker URL (RabbitMQ, Kafka, NATS, or Redis) and running faststream run under uvicorn or your preferred ASGI server, unlocking scalable, maintainable stream processing in any Python environment.


Here is the Colab Notebook. Also, don’t forget to follow us on Twitter and join our Telegram Channel and LinkedIn Group. Don’t Forget to join our 90k+ ML SubReddit.

🔥 [Register Now] miniCON Virtual Conference on AGENTIC AI: FREE REGISTRATION + Certificate of Attendance + 4 Hour Short Event (May 21, 9 am- 1 pm PST) + Hands on Workshop


Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.

Tags: AlertCodeColabFastStreamGoogleImplementationInMemoryPipelinePydanticRabbitMQRealTimeSensorTestRabbitBroker
Previous Post

Apple Machine Learning Research at ICLR 2025

Next Post

The Importance of Websites for Modern Businesses

softbliss

softbliss

Related Posts

3 Questions: How to help students recognize potential bias in their AI datasets | MIT News
Artificial Intelligence

3 Questions: How to help students recognize potential bias in their AI datasets | MIT News

by softbliss
June 7, 2025
A Comprehensive Coding Tutorial for Advanced SerpAPI Integration with Google Gemini-1.5-Flash for Advanced Analytics
Artificial Intelligence

A Comprehensive Coding Tutorial for Advanced SerpAPI Integration with Google Gemini-1.5-Flash for Advanced Analytics

by softbliss
June 7, 2025
Alibaba’s Qwen3 Model Outperforms OpenAI and DeepSeek
Artificial Intelligence

Alibaba’s Qwen3 Model Outperforms OpenAI and DeepSeek

by softbliss
June 6, 2025
The Evolution of AI Boyfriend Apps in NSFW Mode
Artificial Intelligence

The Evolution of AI Boyfriend Apps in NSFW Mode

by softbliss
June 6, 2025
Soham Mazumdar, Co-Founder & CEO of WisdomAI – Interview Series
Artificial Intelligence

Soham Mazumdar, Co-Founder & CEO of WisdomAI – Interview Series

by softbliss
June 6, 2025
Next Post
The Importance of Websites for Modern Businesses

The Importance of Websites for Modern Businesses

Premium Content

Can We Really Trust AI’s Chain-of-Thought Reasoning?

Can We Really Trust AI’s Chain-of-Thought Reasoning?

May 24, 2025
MobiKwik shelves BNPL play amid margin pressure and lender scarcity

MobiKwik shelves BNPL play amid margin pressure and lender scarcity

May 21, 2025
AI in Customer Service: Customer Support in 2025

AI in Customer Service: Customer Support in 2025

May 31, 2025

Browse by Category

  • Artificial Intelligence
  • Machine Learning
  • Research & Academia
  • Software Development
  • Startups

Browse by Tags

Amazon App Artificial Blog Build Building Business Coding Data Development Digital Framework Future Gemini Generative Google Guide Impact Innovation Intelligence Key Language Large Learning LLM LLMs Machine Microsoft MIT model Models News NVIDIA opinion OReilly Research Science Series Software Startup Startups students Tech Tools Video

Soft Bliss Academy

Welcome to SoftBliss Academy, your go-to source for the latest news, insights, and resources on Artificial Intelligence (AI), Software Development, Machine Learning, Startups, and Research & Academia. We are passionate about exploring the ever-evolving world of technology and providing valuable content for developers, AI enthusiasts, entrepreneurs, and anyone interested in the future of innovation.

Categories

  • Artificial Intelligence
  • Machine Learning
  • Research & Academia
  • Software Development
  • Startups

Recent Posts

  • Emails Shed Light on UNC’s Plans to Create a New Accreditor
  • 3 Questions: How to help students recognize potential bias in their AI datasets | MIT News
  • Introducing Veo and Imagen 3 generative AI tools

© 2025 https://softblissacademy.online/- All Rights Reserved

No Result
View All Result
  • Home
  • Artificial Intelligence
  • Software Development
  • Machine Learning
  • Research & Academia
  • Startups

© 2025 https://softblissacademy.online/- All Rights Reserved

Are you sure want to unlock this post?
Unlock left : 0
Are you sure want to cancel subscription?