Back to Browse

How to stream response from HTTP Trigger Azure Function | #azure #azurefunctions #microsoft #python

1.4K views
Aug 24, 2025
15:57

In this video, I discussed about streaming response back from HTTP trigger type Azure Function using FastAPI extrension for azure functions. function_app.py file import azure.functions as func import logging import time from azurefunctions.extensions.http.fastapi import Request, StreamingResponse, JSONResponse app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) def generate_sensor_data(): """Generate real-time sensor data.""" for i in range(10): # Simulate temperature and humidity readings temperature = 20 + i humidity = 50 + i yield f"data: {{'temperature': {temperature}, 'humidity': {humidity}}}\n\n" time.sleep(1) @app.route(route="docStationAzFunCall", methods=[func.HttpMethod.POST, func.HttpMethod.GET]) async def stream_sensor_data(req: Request): """Endpoint to stream real-time sensor data. Validates that the request contains 'input_text' (JSON body or query param). If missing, returns failure with message 'send input_text'. """ input_text = None # Try JSON body first try: body = await req.json() except Exception: body = None if isinstance(body, dict): input_text = body.get("input_text") # Fallback to query params (GET) if not input_text: try: input_text = req.query_params.get("input_text") if hasattr(req, "query_params") else None except Exception: input_text = None # If still missing or empty, return failure response if not input_text: return JSONResponse({"success": False, "message": "send input_text"}, status_code=400) logging.info("Received input_text: %s", input_text) # Optionally include the received input_text as the first event in the stream def gen(): yield f"data: {{'received_input': '{input_text}'}}\n\n" for i in range(10): temperature = 20 + i humidity = 50 + i yield f"data: {{'temperature': {temperature}, 'humidity': {humidity}}}\n\n" time.sleep(1) return StreamingResponse(gen(), media_type="text/event-stream") Client side call for same: import httpx import asyncio async def consume_stream(): url = "http://localhost:7071/api/docStationAzFunCall?input_text=product=123;start=2025-01-01;end=2025-01-31" # Update if hosted elsewhere async with httpx.AsyncClient(timeout=None) as client: async with client.stream("GET", url) as response: async for line in response.aiter_lines(): if line: print(f"Received: {line}") if __name__ == "__main__": asyncio.run(consume_stream()) #azure #azurefunctions #microsoft #programming #azfun #learning #development

Download

1 formats

Video Formats

360pmp426.0 MB

Right-click 'Download' and select 'Save Link As' if the file opens in a new tab.

How to stream response from HTTP Trigger Azure Function | #azure #azurefunctions #microsoft #python | NatokHD