
In this tutorial, we explore the power and flexibility of the beeai-framework by building a fully functional multi-agent system from the ground up. We walk through the essential components (custom agents, tools, memory management, and event monitoring) to show how BeeAI simplifies the development of intelligent, cooperative agents. Along the way, we demonstrate how these agents can perform complex tasks such as market research, code analysis, and strategic planning, using a modular, production-ready pattern.
import sys
import subprocess
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
import os

def install_packages():
    packages = [
        "beeai-framework",
        "requests",
        "beautifulsoup4",
        "numpy",
        "pandas",
        "pydantic"
    ]
    print("Installing required packages…")
    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"✅ {package} installed successfully")
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install {package}: {e}")
    print("Installation complete!")

install_packages()
try:
    from beeai_framework import ChatModel
    from beeai_framework.agents import Agent
    from beeai_framework.tools import Tool
    from beeai_framework.workflows import Workflow
    BEEAI_AVAILABLE = True
    print("✅ BeeAI Framework imported successfully")
except ImportError as e:
    print(f"⚠️ BeeAI Framework import failed: {e}")
    print("Falling back to custom implementation…")
    BEEAI_AVAILABLE = False
We begin by installing all the required packages, including the beeai-framework, to ensure our environment is ready for multi-agent development. Once installed, we attempt to import BeeAI’s core modules. If the import fails, we gracefully fall back to a custom implementation to maintain workflow functionality.
class MockChatModel:
    """Mock LLM for demonstration purposes"""
    def __init__(self, model_name: str = "mock-llm"):
        self.model_name = model_name

    async def generate(self, messages: List[Dict[str, str]]) -> str:
        """Generate a mock response"""
        last_message = messages[-1]['content'] if messages else ""
        if "market" in last_message.lower():
            return "Market analysis shows strong growth in AI frameworks with 42% YoY increase. Key competitors include LangChain, CrewAI, and AutoGen."
        elif "code" in last_message.lower():
            return "Code analysis reveals good structure with async patterns. Consider adding more error handling and documentation."
        elif "strategy" in last_message.lower():
            return "Strategic recommendation: Focus on ease of use, strong documentation, and enterprise features to compete effectively."
        else:
            return f"Analyzed: {last_message[:100]}… Recommendation: Implement best practices for scalability and maintainability."
class CustomTool:
    """Base class for custom tools"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description

    async def run(self, input_data: str) -> str:
        """Override this method in subclasses"""
        raise NotImplementedError
We define a MockChatModel to simulate LLM behavior when BeeAI is unavailable, allowing us to test and prototype workflows without relying on external APIs. Alongside it, we create a CustomTool base class, which serves as a blueprint for task-specific tools that our agents can use, laying the foundation for modular, tool-augmented agent capabilities.
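To see the base-class contract in action, here is a minimal sketch showing how a CustomTool subclass and the MockChatModel are driven with asyncio. The EchoTool and the _quick_check helper are illustrative additions, not part of the tutorial code itself.

class EchoTool(CustomTool):
    """Hypothetical example tool that simply echoes its input"""
    def __init__(self):
        super().__init__(name="echo", description="Echoes the input back")

    async def run(self, input_data: str) -> str:
        return f"Echo: {input_data}"

async def _quick_check():
    # Exercise the tool and the mock LLM outside of any agent or workflow
    tool_output = await EchoTool().run("hello agents")
    llm_output = await MockChatModel().generate([{"role": "user", "content": "What is the market outlook?"}])
    print(tool_output)
    print(llm_output)

# asyncio.run(_quick_check())  # uncomment to try it standalone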
class MarketResearchTool(CustomTool):
    """Custom tool for market research and competitor analysis"""
    def __init__(self):
        super().__init__(
            name="market_research",
            description="Analyzes market trends and competitor information"
        )
        self.market_data = {
            "AI_frameworks": {
                "competitors": ["LangChain", "CrewAI", "AutoGen", "Haystack", "Semantic Kernel"],
                "market_size": "$2.8B",
                "growth_rate": "42% YoY",
                "key_trends": ["Multi-agent systems", "Production deployment", "Tool integration", "Enterprise adoption"]
            },
            "enterprise_adoption": {
                "rate": "78%",
                "top_use_cases": ["Customer support", "Data analysis", "Code generation", "Document processing"],
                "challenges": ["Reliability", "Cost control", "Integration complexity", "Governance"]
            }
        }

    async def run(self, query: str) -> str:
        """Simulate market research based on query"""
        query_lower = query.lower()
        if "competitor" in query_lower or "competition" in query_lower:
            data = self.market_data["AI_frameworks"]
            return f"""Market Analysis Results:
Key Competitors: {', '.join(data['competitors'])}
Market Size: {data['market_size']}
Growth Rate: {data['growth_rate']}
Key Trends: {', '.join(data['key_trends'])}
Recommendation: Focus on differentiating features like simplified deployment, better debugging tools, and enterprise-grade security."""
        elif "adoption" in query_lower or "enterprise" in query_lower:
            data = self.market_data["enterprise_adoption"]
            return f"""Enterprise Adoption Analysis:
Adoption Rate: {data['rate']}
Top Use Cases: {', '.join(data['top_use_cases'])}
Main Challenges: {', '.join(data['challenges'])}
Recommendation: Address reliability and cost control concerns through better monitoring and resource management features."""
        else:
            return "Market research available for: competitor analysis, enterprise adoption, or specific trend analysis. Please specify your focus area."
We implement the MarketResearchTool as a specialized extension of our CustomTool base class. This tool simulates real-world market intelligence by returning pre-defined insights on AI framework trends, key competitors, adoption rates, and industry challenges. With this, we equip our agents to make informed, data-driven recommendations during workflow execution.
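As a quick usage example, the enterprise-adoption branch can be queried directly; the demos later in the tutorial only exercise the competitor branch. The helper name below is illustrative.

async def _adoption_check():
    # Query the simulated enterprise-adoption data
    report = await MarketResearchTool().run("enterprise adoption challenges")
    print(report)

# asyncio.run(_adoption_check())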
class CodeAnalysisTool(CustomTool):
    """Custom tool for analyzing code patterns and suggesting improvements"""
    def __init__(self):
        super().__init__(
            name="code_analysis",
            description="Analyzes code structure and suggests improvements"
        )

    async def run(self, code_snippet: str) -> str:
        """Analyze code and provide insights"""
        analysis = {
            "lines": len(code_snippet.split('\n')),
            "complexity": "High" if len(code_snippet) > 500 else "Medium" if len(code_snippet) > 200 else "Low",
            "async_usage": "Yes" if "async" in code_snippet or "await" in code_snippet else "No",
            "error_handling": "Present" if "try:" in code_snippet or "except:" in code_snippet else "Missing",
            "documentation": "Good" if '"""' in code_snippet or "'''" in code_snippet else "Needs improvement",
            "imports": "Present" if "import " in code_snippet else "None detected",
            "classes": len([line for line in code_snippet.split('\n') if line.strip().startswith('class ')]),
            "functions": len([line for line in code_snippet.split('\n') if line.strip().startswith('def ') or line.strip().startswith('async def ')])
        }
        suggestions = []
        if analysis["error_handling"] == "Missing":
            suggestions.append("Add try-except blocks for error handling")
        if analysis["documentation"] == "Needs improvement":
            suggestions.append("Add docstrings and comments")
        if "print(" in code_snippet:
            suggestions.append("Consider using proper logging instead of print statements")
        if analysis["async_usage"] == "Yes" and "await" not in code_snippet:
            suggestions.append("Ensure proper await usage with async functions")
        if analysis["complexity"] == "High":
            suggestions.append("Consider breaking down into smaller functions")
        return f"""Code Analysis Report:
Structure:
- Lines of code: {analysis['lines']}
- Complexity: {analysis['complexity']}
- Classes: {analysis['classes']}
- Functions: {analysis['functions']}
Quality Metrics:
- Async usage: {analysis['async_usage']}
- Error handling: {analysis['error_handling']}
- Documentation: {analysis['documentation']}
Suggestions:
{chr(10).join(f"• {suggestion}" for suggestion in suggestions) if suggestions else "• Code looks good! Following best practices."}
Overall Score: {10 - len(suggestions) * 2}/10"""
class CustomAgent:
    """Custom agent implementation"""
    def __init__(self, name: str, role: str, instructions: str, tools: List[CustomTool], llm=None):
        self.name = name
        self.role = role
        self.instructions = instructions
        self.tools = tools
        self.llm = llm or MockChatModel()
        self.memory = []

    async def run(self, task: str) -> Dict[str, Any]:
        """Execute agent task"""
        print(f"🤖 {self.name} ({self.role}) processing task…")
        self.memory.append({"type": "task", "content": task, "timestamp": datetime.now()})
        task_lower = task.lower()
        tool_used = None
        tool_result = None
        for tool in self.tools:
            if tool.name == "market_research" and ("market" in task_lower or "competitor" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
            elif tool.name == "code_analysis" and ("code" in task_lower or "analyze" in task_lower):
                tool_result = await tool.run(task)
                tool_used = tool.name
                break
        messages = [
            {"role": "system", "content": f"You are {self.role}. {self.instructions}"},
            {"role": "user", "content": task}
        ]
        if tool_result:
            messages.append({"role": "system", "content": f"Tool {tool_used} provided: {tool_result}"})
        response = await self.llm.generate(messages)
        self.memory.append({"type": "response", "content": response, "timestamp": datetime.now()})
        return {
            "agent": self.name,
            "task": task,
            "tool_used": tool_used,
            "tool_result": tool_result,
            "response": response,
            "success": True
        }
We now implement the CodeAnalysisTool, which enables our agents to assess code snippets based on structure, complexity, documentation, and error handling, and to generate concrete suggestions for improving code quality. We also define the CustomAgent class, equipping each agent with its own role, instructions, memory, tools, and access to an LLM. This design allows each agent to intelligently decide whether a tool is needed and then synthesize a response from both the tool's output and LLM reasoning, ensuring adaptable, context-aware behavior.
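Before wiring agents into a workflow, a single agent can be exercised on its own. The following sketch (the agent name, task string, and inspection code are illustrative, not part of the tutorial) shows one CustomAgent picking up the CodeAnalysisTool and returning the structured result dictionary described above:

async def _single_agent_check():
    reviewer = CustomAgent(
        name="Reviewer",
        role="Code Reviewer",
        instructions="Review code and point out quality issues.",
        tools=[CodeAnalysisTool()]
    )
    result = await reviewer.run("Please analyze this code: async def f():\n    print('hi')")
    print(result["tool_used"])    # "code_analysis", because the task mentions "analyze"
    print(result["response"])     # MockChatModel reply, informed by the tool output
    print(len(reviewer.memory))   # 2: one task entry and one response entry

# asyncio.run(_single_agent_check())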
class WorkflowMonitor:
    """Monitor and log workflow events"""
    def __init__(self):
        self.events = []
        self.start_time = datetime.now()

    def log_event(self, event_type: str, data: Dict[str, Any]):
        """Log workflow events"""
        timestamp = datetime.now()
        self.events.append({
            "timestamp": timestamp,
            "duration": (timestamp - self.start_time).total_seconds(),
            "event_type": event_type,
            "data": data
        })
        print(f"[{timestamp.strftime('%H:%M:%S')}] {event_type}: {data.get('agent', 'System')}")

    def get_summary(self):
        """Get monitoring summary"""
        return {
            "total_events": len(self.events),
            "total_duration": (datetime.now() - self.start_time).total_seconds(),
            "event_types": list(set([e["event_type"] for e in self.events])),
            "events": self.events
        }
class CustomWorkflow:
    """Custom workflow implementation"""
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
        self.agents = []
        self.monitor = WorkflowMonitor()

    def add_agent(self, agent: CustomAgent):
        """Add agent to workflow"""
        self.agents.append(agent)
        self.monitor.log_event("agent_added", {"agent": agent.name, "role": agent.role})

    async def run(self, tasks: List[str]) -> Dict[str, Any]:
        """Execute workflow with tasks"""
        self.monitor.log_event("workflow_started", {"tasks": len(tasks)})
        results = []
        context = {"shared_insights": []}
        for i, task in enumerate(tasks):
            agent = self.agents[i % len(self.agents)]
            if context["shared_insights"]:
                enhanced_task = f"{task}\n\nContext from previous analysis:\n" + "\n".join(context["shared_insights"][-2:])
            else:
                enhanced_task = task
            result = await agent.run(enhanced_task)
            results.append(result)
            context["shared_insights"].append(f"{agent.name}: {result['response'][:200]}…")
            self.monitor.log_event("task_completed", {
                "agent": agent.name,
                "task_index": i,
                "success": result["success"]
            })
        self.monitor.log_event("workflow_completed", {"total_tasks": len(tasks)})
        return {
            "workflow": self.name,
            "results": results,
            "context": context,
            "summary": self._generate_summary(results)
        }

    def _generate_summary(self, results: List[Dict[str, Any]]) -> str:
        """Generate workflow summary"""
        summary_parts = []
        for result in results:
            summary_parts.append(f"• {result['agent']}: {result['response'][:150]}…")
        return f"""Workflow Summary for {self.name}:
{chr(10).join(summary_parts)}
Key Insights:
• Market opportunities identified in AI framework space
• Technical architecture recommendations provided
• Strategic implementation plan outlined
• Multi-agent collaboration demonstrated successfully"""
We implement the WorkflowMonitor to log and track events throughout the execution, giving us real-time visibility into the actions taken by each agent. With the CustomWorkflow class, we orchestrate the entire multi-agent process, assigning tasks, preserving shared context across agents, and capturing all relevant insights. This structure ensures that we not only execute tasks in a coordinated and transparent way but also generate a comprehensive summary that highlights collaboration and key outcomes.
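Because the monitor timestamps every event with its offset from the workflow start, its summary can be inspected after a run to see where time went. Here is a small illustrative helper (the function name is ours, not part of the tutorial code):

def print_monitor_report(workflow: CustomWorkflow):
    # Summarize what the WorkflowMonitor captured during a run
    summary = workflow.monitor.get_summary()
    print(f"{summary['total_events']} events over {summary['total_duration']:.2f}s")
    for event in summary["events"]:
        # Each event carries its type, payload, and seconds elapsed since start
        print(f"  +{event['duration']:.2f}s {event['event_type']}: {event['data']}")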
async def advanced_workflow_demo():
    """Demonstrate advanced multi-agent workflow"""
    print("🚀 Advanced Multi-Agent Workflow Demo")
    print("=" * 50)
    workflow = CustomWorkflow(
        name="Advanced Business Intelligence System",
        description="Multi-agent system for comprehensive business analysis"
    )
    market_agent = CustomAgent(
        name="MarketAnalyst",
        role="Senior Market Research Analyst",
        instructions="Analyze market trends, competitor landscape, and business opportunities. Provide data-driven insights with actionable recommendations.",
        tools=[MarketResearchTool()],
        llm=MockChatModel()
    )
    tech_agent = CustomAgent(
        name="TechArchitect",
        role="Technical Architecture Specialist",
        instructions="Evaluate technical solutions, code quality, and architectural decisions. Focus on scalability, maintainability, and best practices.",
        tools=[CodeAnalysisTool()],
        llm=MockChatModel()
    )
    strategy_agent = CustomAgent(
        name="StrategicPlanner",
        role="Strategic Business Planner",
        instructions="Synthesize market and technical insights into comprehensive strategic recommendations. Focus on ROI, risk assessment, and implementation roadmaps.",
        tools=[],
        llm=MockChatModel()
    )
    workflow.add_agent(market_agent)
    workflow.add_agent(tech_agent)
    workflow.add_agent(strategy_agent)
    tasks = [
        "Analyze the current AI framework market landscape and identify key opportunities for a new multi-agent framework targeting enterprise users.",
        """Analyze this code architecture pattern and provide technical assessment:
async def multi_agent_workflow():
    agents = [ResearchAgent(), AnalysisAgent(), SynthesisAgent()]
    context = SharedContext()
    for agent in agents:
        try:
            result = await agent.run(context.get_task())
            if result.success:
                context.add_insight(result.data)
            else:
                context.add_error(result.error)
        except Exception as e:
            logger.error(f"Agent {agent.name} failed: {e}")
    return context.synthesize_recommendations()""",
        "Based on the market analysis and technical assessment, create a comprehensive strategic plan for launching a competitive AI framework with focus on multi-agent capabilities and enterprise adoption."
    ]
    print("\n🔄 Executing Advanced Workflow…")
    result = await workflow.run(tasks)
    print("\n✅ Workflow Completed Successfully!")
    print("=" * 50)
    print("📊 COMPREHENSIVE ANALYSIS RESULTS")
    print("=" * 50)
    print(result["summary"])
    print("\n📈 WORKFLOW MONITORING SUMMARY")
    print("=" * 30)
    summary = workflow.monitor.get_summary()
    print(f"Total Events: {summary['total_events']}")
    print(f"Total Duration: {summary['total_duration']:.2f} seconds")
    print(f"Event Types: {', '.join(summary['event_types'])}")
    return workflow, result
async def simple_tool_demo():
    """Demonstrate individual tool functionality"""
    print("\n🛠️ Individual Tool Demo")
    print("=" * 30)
    market_tool = MarketResearchTool()
    code_tool = CodeAnalysisTool()
    print("Available Tools:")
    print(f"• {market_tool.name}: {market_tool.description}")
    print(f"• {code_tool.name}: {code_tool.description}")
    print("\n🔍 Market Research Analysis:")
    market_result = await market_tool.run("competitor analysis in AI frameworks")
    print(market_result)
    print("\n🔍 Code Analysis:")
    sample_code = '''
import asyncio
from typing import List, Dict

class AgentManager:
    """Manages multiple AI agents"""
    def __init__(self):
        self.agents = []
        self.results = []

    async def add_agent(self, agent):
        """Add agent to manager"""
        self.agents.append(agent)

    async def run_all(self, task: str) -> List[Dict]:
        """Run task on all agents"""
        results = []
        for agent in self.agents:
            try:
                result = await agent.execute(task)
                results.append(result)
            except Exception as e:
                print(f"Agent failed: {e}")
                results.append({"error": str(e)})
        return results
'''
    code_result = await code_tool.run(sample_code)
    print(code_result)
We then exercise everything in two demonstrations. First, in the individual tool demo, we directly test the capabilities of our MarketResearchTool and CodeAnalysisTool, confirming that they generate relevant insights independently. Then, in the advanced workflow demo, we bring everything together by deploying three specialized agents (MarketAnalyst, TechArchitect, and StrategicPlanner) to tackle business analysis tasks collaboratively.
async def main():
    """Main demo function"""
    print("🐝 Advanced BeeAI Framework Tutorial")
    print("=" * 40)
    print("This tutorial demonstrates:")
    print("• Multi-agent workflows")
    print("• Custom tool development")
    print("• Memory management")
    print("• Event monitoring")
    print("• Production-ready patterns")
    if BEEAI_AVAILABLE:
        print("• Using real BeeAI Framework")
    else:
        print("• Using custom implementation (BeeAI not available)")
    print("=" * 40)
    await simple_tool_demo()
    print("\n" + "=" * 50)
    await advanced_workflow_demo()
    print("\n🎉 Tutorial Complete!")
    print("\nNext Steps:")
    print("1. Install BeeAI Framework properly: pip install beeai-framework")
    print("2. Configure your preferred LLM (OpenAI, Anthropic, local models)")
    print("3. Explore the official BeeAI documentation")
    print("4. Build custom agents for your specific use case")
    print("5. Deploy to production with proper monitoring")

if __name__ == "__main__":
    try:
        import nest_asyncio
        nest_asyncio.apply()
        print("✅ Applied nest_asyncio for Colab compatibility")
    except ImportError:
        print("⚠️ nest_asyncio not available - may not work in some environments")
    asyncio.run(main())
We wrap up our tutorial with the main() function, which ties together everything we’ve built, demonstrating both tool-level capabilities and a full multi-agent business intelligence workflow. Whether we’re running BeeAI natively or using a fallback setup, we ensure compatibility with environments like Google Colab using nest_asyncio. With this structure in place, we’re ready to scale our agent systems, explore deeper use cases, and confidently deploy production-ready AI workflows.
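When you move beyond the MockChatModel, any client that can answer a list of role/content messages can be dropped into the llm slot of CustomAgent. As a hedged illustration (this is not BeeAI's own API; it assumes the openai package is installed, OPENAI_API_KEY is set in the environment, and the model name is a placeholder), an adapter might look like this:

from typing import Dict, List
from openai import AsyncOpenAI  # assumes: pip install openai

class OpenAIChatModel:
    """Drop-in replacement for MockChatModel backed by a real LLM"""
    def __init__(self, model_name: str = "gpt-4o-mini"):  # placeholder model name
        self.model_name = model_name
        self.client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

    async def generate(self, messages: List[Dict[str, str]]) -> str:
        # Forward the same message list our agents already build
        response = await self.client.chat.completions.create(
            model=self.model_name,
            messages=messages
        )
        return response.choices[0].message.content

# analyst = CustomAgent(name="MarketAnalyst", role="Analyst",
#                       instructions="...", tools=[MarketResearchTool()],
#                       llm=OpenAIChatModel())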
In conclusion, we’ve built and executed a robust multi-agent workflow using the BeeAI framework (or a custom equivalent), showcasing its potential in real-world business intelligence applications. We’ve seen how easy it is to create agents with specific roles, attach tools for task augmentation, and monitor execution in a transparent way.