随着人工智能技术的快速发展,大语言模型(LLM)已经成为推动各个领域创新的重要力量。在数据分析领域,LLM 的应用正在彻底改变人们与数据交互的方式。本文将探讨如何通过Model Context Protocol (MCP) 协议将 Datalayers 多模数据库与大语言模型结合,实现自然语言驱动的数据分析能力,帮助用户更加直观、高效地挖掘数据价值。
Model Context Protocol(MCP) 是一种在大模型(Large Models)和人工智能(AI)领域中用于管理和传递模型上下文信息的协议或机制。它的核心目标是确保模型在处理任务时能够高效地访问和利用相关的上下文信息,从而提升模型的性能与推理能力。
MCP 架构遵循客户端-服务器模式,其中一个宿主应用程序可以连接到多个服务器:
MCP 为大语言模型提供了工具使用能力: 使 LLM 能够调用预定义的工具函数,如 SQL 查询、数据转换和图表生成等。通过 MCP,LLM 不再局限于基于训练数据的静态知识,而是能够动态调用工具获取最新信息并执行实际操作,从而实现真正的智能助手。
Datalayers MCP Server 是连接 Datalayers 多模数据库与大语言模型的关键组件。它提供了一系列 API,允许 LLM 通过自然语言指令查询和分析 Datalayers 中存储的时序数据。
Datalayers MCP Server提供以下主要功能:
以下是Datalayers MCP Server的 mcp_server.py 代码片段:
import asyncio
import logging
import os
import http.client
import json
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("datalayers_mcp_server")
def get_db_config():
"""Get Datalayers database configuration from environment variables."""
config = {
"host": os.getenv("DATALAYERS_HOST", "127.0.0.1"),
"port": int(os.getenv("DATALAYERS_PORT", "8361")),
"username": os.getenv("DATALAYERS_USER", "admin"),
"password": os.getenv("DATALAYERS_PASSWORD", "public"),
"database": os.getenv("DATALAYERS_DATABASE", "neuronex")
}
if not all([config["username"], config["password"]]):
logger.warning("Database credentials not fully configured. Check environment variables.")
return config
def create_connection():
"""Create an HTTP connection to the Datalayers server."""
config = get_db_config()
conn = http.client.HTTPConnection(host=config["host"], port=config["port"])
return conn, config
def execute_query(query, conn=None, config=None):
"""Execute a SQL query against Datalayers database."""
if conn is None or config is None:
conn, config = create_connection()
# Encode credentials for Basic Auth
import base64
auth = base64.b64encode(f"{config['username']}:{config['password']}".encode()).decode()
url = f"/api/v1/sql"
headers = {
"Content-Type": "application/binary",
"Authorization": f"Basic {auth}"
}
try:
conn.request(method="POST", url=url, headers=headers, body=query)
response = conn.getresponse()
status = response.status
if status != 200:
error_msg = response.read().decode('utf-8')
logger.error(f"Query failed with status {status}: {error_msg}")
return {"success": False, "error": error_msg}
# Parse the response
result = response.read().decode('utf-8')
try:
# Try to parse as JSON
parsed_result = json.loads(result)
return {"success": True, "data": parsed_result}
except json.JSONDecodeError:
# If not JSON, return as text
return {"success": True, "data": result}
except Exception as e:
logger.error(f"Error executing query: {str(e)}")
return {"success": False, "error": str(e)}
# Initialize the MCP server
app = Server("datalayers_mcp_server")
@app.list_resources()
async def list_resources() -> list[Resource]:
"""List available resources in the Datalayers database."""
logger.info("Listing resources")
conn, config = create_connection()
# Query to list databases
databases_query = "SHOW DATABASES;"
databases_result = execute_query(databases_query, conn, config)
resources = []
if databases_result["success"]:
try:
# Add each database as a resource
if isinstance(databases_result["data"], list):
for db in databases_result["data"]:
if isinstance(db, dict) and "Database" in db:
db_name = db["Database"]
resources.append(Resource(
uri=f"datalayers://{db_name}",
name=db_name,
description=f"Datalayers database: {db_name}"
))
except Exception as e:
logger.error(f"Error parsing database list: {str(e)}")
return resources
@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:
"""Read schema information for a Datalayers database resource."""
logger.info(f"Reading resource: {uri}")
# Parse the URI to extract database name
# Format: datalayers://database_name
parts = str(uri).split("://")
if len(parts) != 2 or parts[0] != "datalayers":
return "Invalid Datalayers URI format. Expected: datalayers://database_name"
database = parts[1]
conn, config = create_connection()
# Query to list tables in the database
tables_query = f"SHOW TABLES FROM {database};"
tables_result = execute_query(tables_query, conn, config)
if not tables_result["success"]:
return f"Error reading database {database}: {tables_result.get('error', 'Unknown error')}"
# Format the result as a readable string
result = f"Database: {database}\n\nTables:\n"
try:
if isinstance(tables_result["data"], list):
for table_info in tables_result["data"]:
table_name = list(table_info.values())[0] if isinstance(table_info, dict) else str(table_info)
result += f"- {table_name}\n"
# Get schema for each table
schema_query = f"DESCRIBE {database}.{table_name};"
schema_result = execute_query(schema_query, conn, config)
if schema_result["success"] and isinstance(schema_result["data"], list):
result += " Columns:\n"
for column in schema_result["data"]:
if isinstance(column, dict):
col_name = column.get("Field", "Unknown")
col_type = column.get("Type", "Unknown")
result += f" - {col_name} ({col_type})\n"
except Exception as e:
logger.error(f"Error formatting database info: {str(e)}")
result += f"\nError retrieving complete information: {str(e)}"
return result
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools for interacting with Datalayers."""
logger.info("Listing tools...")
return [
Tool(
name="execute_sql",
description="Execute an SQL query on the TimeSeries database Datalayers",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The SQL query to execute"
}
},
"required": ["query"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Call a tool to interact with the Datalayers database."""
logger.info(f"Calling tool: {name} with arguments: {arguments}")
if name != "execute_sql":
raise ValueError(f"Unknown tool: {name}")
query = arguments.get("query")
if not query:
raise ValueError("Query is required")
# Execute the query
result = execute_query(query)
if not result["success"]:
logger.error(f"Error executing SQL '{query}': {result.get('error', 'Unknown error')}")
return [TextContent(type="text", text=f"Error executing query: {result.get('error', 'Unknown error')}")]
# Format the result
try:
if isinstance(result["data"], dict):
return [TextContent(type="text", text=json.dumps(result["data"], indent=2))]
elif isinstance(result["data"], list):
formatted_result = ""
# Check if the list contains dictionaries (rows with column names)
if result["data"] and isinstance(result["data"][0], dict):
# Get column names from the first row
columns = list(result["data"][0].keys())
formatted_result += " | ".join(columns) + "\n"
formatted_result += "-" * (sum(len(col) for col in columns) + 3 * (len(columns) - 1)) + "\n"
# Add each row
for row in result["data"]:
formatted_result += " | ".join(str(row.get(col, "")) for col in columns) + "\n"
else:
# Simple list output
for item in result["data"]:
formatted_result += str(item) + "\n"
return [TextContent(type="text", text=formatted_result)]
else:
# String or other type
return [TextContent(type="text", text=str(result["data"]))]
except Exception as e:
logger.error(f"Error formatting query result: {str(e)}")
return [TextContent(type="text", text=f"Error formatting result: {str(e)}")]
async def main():
"""Main entry point to run the MCP server."""
from mcp.server.stdio import stdio_server
logger.info("Starting Datalayers MCP server...")
config = get_db_config()
logger.info(f"Database config: {config['host']}:{config['port']} as {config['username']}")
# Test database connection
conn, _ = create_connection()
test_result = execute_query("SELECT 1;", conn)
if test_result["success"]:
logger.info("Successfully connected to Datalayers database")
else:
logger.warning(f"Failed to connect to Datalayers database: {test_result.get('error', 'Unknown error')}")
async with stdio_server() as (read_stream, write_stream):
try:
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
except Exception as e:
logger.error(f"Server error: {str(e)}", exc_info=True)
raise
if __name__ == "__main__":
asyncio.run(main())
Cursor 是一款强大的AI辅助编码 IDE,集成了大语言模型能力,可以帮助开发者更高效地编写和调试代码。利用 Cursor 的 MCP 插件系统,我们可以轻松部署 Datalayers MCP Server,为 AI 编码助手提供访问 Datalayers 数据库的能力,快速进行数据分析与探索。
首先,确保 Cursor 调用的 Python 编译器已安装必要的 Python 依赖,如有 Python 其他基础库缺失,请手动安装。
pip install mcp>=1.3.0
pip install pydantic>=2.8.2
在 Cursor IDE → Cursor Settings → Features → MCP Servers 中,通过 MCP 扩展面板添加本地 MCP 服务器:
部署完成后,可以通过 Cursor 的界面查看到 MCP 服务器的状态,包括当前 Datalayers MCP Server 内含的Tools:
在开始使用 MCP 与 Datalayers 进行交互之前,我们需要确保 Datalayers 数据库正确部署并准备好测试数据。
部署步骤如下:
docker run --name datalayers -d \
-v ~/dl_data:/var/lib/datalayers \
-p 8360:8360 -p 8361:8361 \
datalayers/datalayers:nightly
为了演示 LLM 结合 MCP 后的数据分析能力,我们需要准备一些测试数据。以下通过进入 Datalayers docker 命令行,进行创建库、建表以及写入数据
### 创建数据库 neuronex
create database neuronex
### 创建数据表 neuron_float
CREATE TABLE `neuron_float` (
`time` TIMESTAMP(3) NOT NULL,
`node_name` STRING NOT NULL,
`group_name` STRING NOT NULL,
`tag` STRING NOT NULL,
`value` DOUBLE,
timestamp key(time)
)
PARTITION BY HASH (node_name, group_name, tag) PARTITIONS 1
ENGINE=TimeSeries
with (ttl='7d')
### 插入数据
INSERT INTO neuron_float (time,node_name,group_name,tag,value) VALUES (now(),'m2','group2','tag88',555.8)
INSERT INTO neuron_float (time,node_name,group_name,tag,value) VALUES (now() - interval 0.5 hour,'m2','group2','tag88',123.8)
INSERT INTO neuron_float (time,node_name,group_name,tag,value) VALUES (now() - interval 3.5 hour,'m2','group2','tag88',456.8)
### 插入数据省略..
有了 Datalayers MCP Server 和测试数据,我们现在可以展示如何通过自然语言交互挖掘数据价值。
我们在 Cursor 中选择 COMPOSER 模型,并选择 claude-3.7-sonnet-thinking
模型,输入如下内容进行查询:
你来帮我探索分析时序数据库 Datalayers 中的数据。
Datalayers 使用 Datafusion 作为查询引擎,具有标准化的 SQL 语言,支持 MySQL 方言。
查询 Datalayers 数据库中,`neuronex.neuron_float` 表的表结构,以及数据点 tag88 过去一天中按1小时聚合的平均值数据。
以下是输出结果
用户可以根据自身需求,对包括异常点检测、数据转换、数据统计分析等场景进行分析。
通过 LLM、MCP 与 Datalayers 的结合,可快速实现以下收益:
降低分析门槛: 无需编写复杂 SQL 或掌握数据分析技能,通过自然语言即可进行复杂查询
加速洞察发现: AI 可以自动分析数据模式、趋势和异常,帮助快速发现问题和机会
持续交互式探索: LLM 利用 MCP 的能力,可以自主多轮调用 MCP,即使出现错误也会自动修复
知识累积: 系统可以记忆用户的分析历史,累积领域知识,提供更个性化的建议
Datalayers 与 MCP 的结合,通过大语言模型的智能增强,为时序数据分析带来了革命性的变化。用户可以摆脱编程和查询语言的限制,直接用自然语言与数据交流,极大地提高了数据价值挖掘的效率和可达性。