2025保姆级微信AI群聊机器人教程:教你如何本地打造私人和群聊机器人

2025保姆级微信AI群聊机器人教程:教你如何本地打造私人和群聊机器人

微信AI机器人-人工智能技术,为用户提供服务的自动化系统:具备自然语言处理技术、理解用户的文本输入,并给出响应的回复或执行特定的任务能力。

AI机器人能24小时提供实时服务,无论何时何地,用户都能获得及时的信息反馈和帮助。节省了用户搜索时间,提高了效率,并且还可以帮助管理社群,提高社群活跃度。

一、去千帆大模型官网申请API

1、在模型广场搜索ERNIE-Lite(免费),点击体验

2、点击应用接入,填入必填项,点击创建(需要实名认证)

3、获取API KEYI和Secret Key

二、打开pycharm,运行以下程序

一、完整代码展示

注:运行代码前请登录PC端微信,否则会运行失败

import requests

import json

import csv

import os

import schedule

import time

from wxauto import WeChat

class WeChatBot:

def __init__(self):

self.wx = WeChat()

self.list_name = [

'徐长卿',#机器人账号添加管理员的备注,徐长卿为管理员,ros为机器人,ros给管理员的备注就应为徐长卿,可进行修改

'嘻嘻嘻',#被监控群聊名称

'AAA刘哥百货超市',

'小超市'

]

# 为每个群聊添加监听

for group in self.list_name:

self.wx.AddListenChat(who=group, savepic=True)

def get_access_token(self):

"""使用 API Key,Secret Key 获取access_token"""

url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=[应用API Key]&client_secret=[应用Secret Key]"

payload = json.dumps("")

headers = {

'Content-Type': 'application/json',

'Accept': 'application/json'

}

response = requests.post(url, headers=headers, data=payload)

if response.status_code == 200:

return response.json().get("access_token")

else:

print("Failed to get access token:", response.text)

return None

def call_ai_model(self, content):

"""调用大模型接口并返回结果"""

access_token = self.get_access_token()

if access_token: # 确保access_token不为空

url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-lite-8k?access_token={access_token}"

payload = json.dumps({

"messages": [

{

"role": "user",

"content": content

}

]

})

headers = {

'Content-Type': 'application/json'

}

response = requests.post(url, headers=headers, data=payload)

if response.status_code == 200:

return response.json().get('result')

else:

print("Failed to get response from chat API:", response.text)

return None

else:

print("Access token is None, cannot proceed with chat API request.")

return None

def send_morning_wishes(self):

"""发送早安祝福消息"""

good_morning_message = "老板生成一段祝福群友的早安祝福"

ai_reply = self.call_ai_model(good_morning_message)

if ai_reply:

for group in self.list_name:

self.wx.SendMsg(msg=ai_reply, who=group)

def send_evening_greetings(self):

"""发送晚上好的消息"""

evening_greetings = "老板生成一段祝福群友的晚上好祝福"

for group in self.list_name:

self.wx.SendMsg(msg=evening_greetings, who=group)

def listen_messages(self):

# 持续监听消息

wait = 1 # 设置1秒查看一次是否有新消息

while True:

msgs = self.wx.GetListenMessage()

for chat, messages in msgs.items():

# 获取聊天窗口名(群名)

who = chat.who

# 检查是否是指定的群聊

if who in self.list_name:

for msg in messages:

msg_type = msg.type # 获取消息类型

content = msg.content # 获取消息内容

print(f'【{who}】:{content}')

# 获取发送者信息

sender = msg.sender

# 根据群聊名称和消息内容回复不同的消息

if who != self.list_name[0]:

if content.startswith('购买'):

try:

parts = content.split('购买')[1].strip()

commodity_name, quantity = parts.split(' ', 1)

if self.is_commodity_exists(commodity_name):

reply = f"{sender},已为您将{commodity_name}{quantity}添加至订单中,感谢您的支持"

self.save_to_csv_order(sender, commodity_name, quantity)

self.wx.SendMsg(msg=reply, who=who)

else:

reply = f"{sender},该商品小店尚未出售"

self.wx.SendMsg(msg=reply, who=who)

except ValueError:

reply = f"{sender},输入格式错误,请按照'购买 商品名称 购买数量'的格式输入。"

self.wx.SendMsg(msg=reply, who=who)

elif content.startswith('查询'):

parts = content.split('查询')[1].strip()

if parts: # 确保有商品信息

commodity_name = parts

if self.is_commodity_exists(commodity_name):

price_info = self.get_price_info(commodity_name)

reply = f"{sender},{commodity_name} {price_info}"

else:

reply = f"{sender},该商品小店尚未出售"

self.wx.SendMsg(msg=reply, who=who)

if content.startswith('老板'):

ai_reply = self.call_ai_model(content[2:])

if ai_reply:

self.wx.SendMsg(msg=ai_reply, who=who)

elif who == self.list_name[0]:

if content.startswith('转发'):

# 转发消息到其他所有监控群聊,删除“转发”二字

new_content = content[2:].strip()

self.forward_message(new_content)

if content.startswith('增加'):

try:

parts = content.split('增加')[1].strip()

if parts: # 确保有商品信息

commodity_name, price_info = parts.split(' ', 1)

if self.is_commodity_exists(commodity_name):

reply = f"{sender},{commodity_name}已经存在。"

else:

reply = f"{sender},{commodity_name}增加成功。"

self.save_to_csv(commodity_name, price_info)

self.wx.SendMsg(msg=reply, who=who)

except ValueError:

reply = f"{sender},输入格式错误,请按照'增加 商品名称 价格信息'的格式输入。"

self.wx.SendMsg(msg=reply, who=who)

elif content.startswith('删除'):

commodity_name = content.split('删除')[1].strip()

if self.delete_commodity_from_csv(commodity_name):

reply = f"{sender},{commodity_name}删除成功。"

else:

reply = f"{sender},{commodity_name}不存在。"

self.wx.SendMsg(msg=reply, who=who)

schedule.run_pending()

time.sleep(wait)

def forward_message(self, content):

"""将消息转发到所有监控的群聊"""

for group in self.list_name:

if group != self.list_name[0]: # 排除发送“转发”命令的群聊

self.wx.SendMsg(msg=content, who=group)

def save_to_csv_order(self, username, commodity_name, quantity):

# 将订单信息保存到order.csv中

with open('order.csv', 'a', newline='', encoding='utf-8') as file:

writer = csv.writer(file)

# 写入用户名、商品名称和数量

writer.writerow([username, commodity_name, quantity])

print(f"订单 '{commodity_name}' 已添加到order.csv")

def get_price_info(self, commodity_name):

# 从commodity.csv中获取商品的价格信息

with open('commodity.csv', 'r', newline='', encoding='utf-8') as file:

reader = csv.reader(file)

for row in reader:

if row and row[0] == commodity_name:

return row[1]

return "未知价格"

def save_to_csv(self, commodity_name, price_info):

# 将信息保存到commodity.csv中

with open('commodity.csv', 'a', newline='', encoding='utf-8') as file:

writer = csv.writer(file)

# 写入商品名称和价格信息

writer.writerow([commodity_name, price_info])

print(f"商品 '{commodity_name}' 已添加到commodity.csv")

def is_commodity_exists(self, commodity_name):

# 检查商品是否存在

if not os.path.exists('commodity.csv'):

return False

with open('commodity.csv', 'r', newline='', encoding='utf-8') as file:

reader = csv.reader(file)

for row in reader:

if row and row[0] == commodity_name:

return True

return False

def delete_commodity_from_csv(self, commodity_name):

# 删除CSV文件中的商品

lines = []

found = False

with open('commodity.csv', 'r', newline='', encoding='utf-8') as file:

reader = csv.reader(file)

for row in reader:

if row and row[0] != commodity_name:

lines.append(row)

elif row:

found = True

with open('commodity.csv', 'w', newline='', encoding='utf-8') as file:

writer = csv.writer(file)

writer.writerows(lines)

return found

def start(self):

schedule.every().day.at("08:30").do(self.send_morning_wishes)

schedule.every().day.at("19:30").do(self.send_evening_greetings)

# 开始监听消息

self.listen_messages()

if __name__ == "__main__":

bot = WeChatBot()

bot.start()

2、代码修改

__init__函数中第一个为管理员,其他的可以为群聊和个人用户

get_access_token函数中修改url为自己的获取API KEYI和Secret Key,注意需要删除“[ ]”且不要有空格

3、程序解释

上述代码通过wxauto库对微信进行实时监控,可以实现管理员和需要管理的群聊互不干扰,代码可并行检查多个群聊,并不会因为同时多个群聊收到多个信息而出现遗漏,会根据先后顺序回答,同时可以自行设置时间发送早安、晚安祝福,wxauto库开源,并且可以进行云端部署,建议配合官方文档查看修改。

上述功能不会导致微信封号、放心使用。

本文部分代码参考以下链接