🚀【神器推荐】别再手动刷资讯了!让重要信息自动找你——FeedGrep智能监控工具
💡 你是不是也这样?
- 每天刷十几个网站,就为了等某个产品的降价消息
- 关注了50个技术博客,却总是错过重要的更新
- 想监控竞品动态,但手动搜索效率太低
- 订阅了一堆RSS,结果99%都是无关信息
如果你中枪了,那今天这个工具就是为你量身打造的!
FeedGrep - 让重要信息主动找到你 💫
FeedGrep 是一款专注于 RSS 信息捕获、精准筛选和多渠道推送 的轻量级阅读器。
FeedGrep 支持多源 RSS 抓取、自定义筛选规则、多渠道推送和轻量级 Web UI,让你以最低成本构建自己的信息获取系统。
截图展示
功能特点
📡 多源订阅
- RSS 源订阅 - 支持主流 RSS 格式
- 灵活定时策略 - 自定义抓取间隔频率
🔍 自定义监控筛选
FeedGrep 提供三种关键词规则类型,可组合使用以实现精确过滤:
| 类型 |
描述 |
示例 |
| 普通词 |
任意词命中即匹配 |
苹果 手机 |
| 必须词 |
必须全部出现,使用 +前缀(注意中间无空格) |
+苹果 +发布会 |
| 排除词 |
出现即过滤,使用 -前缀(注意中间无空格) |
苹果 -果汁 |
综合示例:
苹果 华为 +手机 -水果 -价格
意为:
命中“苹果”或“华为”,并必须包含“手机”,排除包含“水果”和“价格”的内容。
🚀典型应用场景
| 用户角色 |
监控示例 |
| 开发者 |
开源库更新、技术博客、漏洞通告 |
| 购物达人 |
商品降价、限时优惠、新品上市 |
| 市场人员 |
品牌舆情、竞品动态、行业趋势 |
| 普通用户 |
热点新闻、新闻动态、新闻快讯 |
| 投资人员 |
财经新闻、股票行情、行业动态 |
🤖 多渠道推送
- 飞书 - 群机器人推送
- 企业微信 - 群机器人
- 微信 - 利用企微-微信插件通道
- Telegram - Bot 消息推送
- 邮件 - SMTP 邮件通知
⚙️ 灵活配置
- rss源管理 - 不限制个数、自由配置
- 关键词匹配规则 - 支持普通词、必须词、排除词
- 推送路由 - 单个rss源或者关键词可配置多个推送渠道
- 消息合并 - 同源同关键词信息合并推送
- 总开关控制 - 全局推送启用/禁用
快速开始
安装依赖
git clone https://github.com/zdx0122/feedgrep.git
pip install -r requirements.txt
配置
修改 feedgrep.yaml 文件来设置RSS源、检查间隔、关键词匹配规则、推送渠道。
运行
运行以下命令启动FeedGrep处理器:
python feedgrep.py
程序会立即检查所有RSS源,然后按照设定的时间间隔定期检查。
启动后可以通过浏览器访问 http://localhost:8000 查看Web界面。
Bash 脚本方式 (推荐)
项目提供了一个统一的 Bash 脚本来管理服务:
# 启动服务
./feedgrep.sh start
# 停止服务
./feedgrep.sh stop
# 重启服务
./feedgrep.sh restart
本地数据存储
RSS条目被存储在本地的SQLite数据库 feedgrep.db 中,每条记录都会标记其所属的分类和来源名称。
高级关键词搜索语法
FeedGrep支持三种关键词类型,可以通过组合使用实现精确的内容筛选:
-
普通词:包含其中任意一个词就会被捕获,多个关键词使用空格分隔
- 示例:
苹果 华为 表示包含"苹果"或"华为"的内容
-
必须词:必须同时包含普通词和必须词才会被捕获,使用+前缀标识
- 示例:
苹果 +手机 表示包含"苹果"且必须包含"手机"的内容
-
排除词:包含过滤词的新闻会被直接排除,即使包含其他关键词,使用-前缀标识
- 示例:
苹果 -水果 表示包含"苹果"但排除包含"水果"的内容
完整示例:苹果 华为 +手机 -水果 -价格 表示搜索包含"苹果"或"华为",同时必须包含"手机",但排除包含"水果"或"价格"的内容。
自定义关键词快捷搜索
在Web界面左侧边栏中提供了默认关键词的快捷按钮,点击即可快速进行相关搜索。为了保持界面美观,按钮上仅显示关键词组的第一个词,鼠标悬停时会显示完整的关键词配置。
推送功能
FeedGrep支持将新的RSS条目推送到多种渠道:
支持的推送渠道
- 飞书群机器人
- 企业微信群机器人
- 个人微信(基于企业微信应用,在企微后台-微信插件,微信扫码关注,推送到个人微信)
- 邮件
- Telegram
项目结构
├── feedgrep.py # 主程序入口
├── feedgrep.yaml # 配置文件
├── feedgrep.db # SQLite 本地数据
├── feedgrep.sh # bash启动脚本
├── index.html # Web UI
├── api.py # API模块
├── push.py # 推送模块
└── utils/ # 日志模块
├── requirements.txt # 依赖包
配置详解
在配置文件中添加 push 部分来启用推送功能:
push:
# 推送总开关
enabled: true
# 推送时间范围控制开关
time_restriction_enabled: true
# 推送时间范围(24小时制,北京时间)
time_start: "08:00" # 早上8点
time_end: "22:00" # 晚上10点
# 推送渠道配置
webhooks:
# 飞书资讯群
webhook_feishu:
type: feishu
url: https://open.feishu.cn/open-apis/bot/v2/hook/XXXXXXXXXXXXXXXXXX
# 企业微信群机器人
webhook_qyweixin:
type: wework
wework_msg_type: text # 可选:text, markdown
url: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=XXXXXXXXXXXXXXXXXX
# 个人微信
webhook_weixin:
type: wework
wework_msg_type: text # 只可选:text,需要在企微后台扫码关注“微信插件”,其他配置和上述企微机器人一样
url: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=XXXXXXXXXXXXXXXXXX
# 邮件推送
email_notices:
type: email
smtp_server: smtp.example.com
smtp_port: 587
username: your_username
password: your_password
sender: sender@example.com
receivers:
- receiver1@example.com
- receiver2@example.com
# Telegram推送
telegram_channel:
type: telegram
bot_token: YOUR_BOT_TOKEN
chat_id: YOUR_CHAT_ID
推送时间范围控制功能允许您设置推送消息的有效时间窗口。时间范围始终按照北京时间进行计算,无论服务部署在哪个时区。默认情况下,只在早上8点到晚上10点之间推送消息。您可以通过以下配置项控制此功能:
time_restriction_enabled: 是否启用时间范围控制,默认为false
time_start: 推送开始时间(24小时制,北京时间),默认为"08:00"
time_end: 推送结束时间(24小时制,北京时间),默认为"22:00"
在每个RSS源配置中添加 push_channels 列表来指定该源使用哪些推送渠道:
categories:
news:
- name: 阮一峰的网络日志
url: https://www.ruanyifeng.com/blog/atom.xml
push_channels:
- webhook_feishu
- webhook_weixin
当该RSS源有新内容时,将会推送到指定的渠道。
为关键词配置推送渠道
在关键词配置中添加 push_channels 列表来指定匹配该关键词的内容推送到哪些渠道:
default_keywords:
- keywords: AI 人工智能 +模型 -air -gai -mail
push_channels:
- webhook_feishu
- 纳斯达克 标普 道琼斯
当有新内容匹配关键词时,将会推送到指定的渠道。关键词推送只会推送最近一次RSS获取到的新内容。
核心代码
class FeedGrepProcessor:
def __init__(self, config_path: str, db_path: str = "feedgrep.db"):
"""
初始化FeedGrep处理器
Args:
config_path: 配置文件路径
db_path: SQLite数据库路径
"""
# 加载配置文件
with open(config_path, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
# 初始化数据库
self.db_path = db_path
self.init_database()
# 初始化批处理ID
self.current_batch_id = self.get_next_batch_id()
# 初始化推送管理器
from push import PushManager
self.push_manager = PushManager(self.config)
# 存储每个源的新条目用于推送
self.feed_new_items = {}
def init_database(self):
"""初始化数据库表"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 创建表来存储RSS条目
cursor.execute('''
CREATE TABLE IF NOT EXISTS feedgrep_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
link TEXT,
description TEXT,
pub_date TEXT,
guid TEXT,
category TEXT,
source_name TEXT,
batch_id INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 创建索引来提高查询速度
cursor.execute('CREATE INDEX IF NOT EXISTS idx_title ON feedgrep_items(title)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_guid ON feedgrep_items(guid)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_link ON feedgrep_items(link)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON feedgrep_items(category)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_name ON feedgrep_items(source_name)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_batch_id ON feedgrep_items(batch_id)')
# 为 is_item_exists 方法添加复合索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_title_link_guid ON feedgrep_items(source_name, title, link, guid)')
# 为关键词搜索添加复合索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON feedgrep_items(created_at DESC)')
# 为分类和时间组合查询添加索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_category_created_at ON feedgrep_items(category, created_at DESC)')
# 为来源和时间组合查询添加索引
cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_name_created_at ON feedgrep_items(source_name, created_at DESC)')
# 不再创建新的batch_counter表,改用配置文件方式存储batch_id
conn.commit()
conn.close()
def get_next_batch_id(self) -> int:
"""
获取下一个批处理ID,并将其加1
Returns:
下一个批处理ID
"""
# 使用feedgrep_items表中的最大batch_id作为当前batch_id,然后加1
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 获取当前最大的batch_id
cursor.execute('SELECT COALESCE(MAX(batch_id), 0) FROM feedgrep_items')
max_batch_id = cursor.fetchone()[0]
conn.close()
# 下一个batch_id应该是最大值加1,最小为1
return max(1, max_batch_id + 1)
except Exception as e:
log.error(f"Error getting next batch ID: {e}")
return 1
def fetch_rss_feed(self, url: str) -> List[Dict]:
"""
获取并解析RSS源
Args:
url: RSS源地址
Returns:
解析后的RSS条目列表
"""
try:
# 设置feedparser的超时和代理(如果需要)
import socket
socket.setdefaulttimeout(30)
feed = feedparser.parse(url)
items = []
for entry in feed.entries:
# 提取关键字段
item = {
'title': getattr(entry, 'title', ''),
'link': getattr(entry, 'link', ''),
'description': getattr(entry, 'summary', ''),
'pub_date': getattr(entry, 'published', ''),
'guid': getattr(entry, 'id', getattr(entry, 'link', ''))
}
items.append(item)
return items
except Exception as e:
log.error(f"Error fetching RSS feed from {url}: {e}")
return []
def is_item_exists(self, guid: str, link: str, title: str, source_name: str) -> bool:
"""
检查条目是否已存在
Args:
guid: 条目的GUID
link: 条目的链接
title: 条目的标题
source_name: 条目来源名称
Returns:
如果条目已存在返回True,否则返回False
"""
max_retries = 3
for attempt in range(max_retries):
try:
conn = sqlite3.connect(self.db_path, timeout=20.0)
cursor = conn.cursor()
cursor.execute(
'SELECT COUNT(*) FROM feedgrep_items WHERE source_name = ? AND title = ? AND link = ?',
(source_name, title, link)
)
count = cursor.fetchone()[0]
conn.close()
return count > 0
except sqlite3.OperationalError as e:
if "database is locked" in str(e) and attempt < max_retries - 1:
time.sleep(1)
continue
else:
log.error(f"Error checking item existence after {attempt+1} attempts: {e}")
if 'conn' in locals():
conn.close()
return False
except Exception as e:
log.error(f"Unexpected error checking item existence: {e}")
if 'conn' in locals():
conn.close()
return False
return False
def save_item(self, item: Dict, category: str, source_name: str) -> bool:
"""
保存单个RSS条目到数据库
Args:
item: RSS条目字典
category: 条目所属类别
source_name: RSS源名称
Returns:
保存成功返回True,否则返回False
"""
# 检查条目是否已存在
if self.is_item_exists(item['guid'], item['link'], item['title'], source_name):
return False # 条目已存在,不需要保存
max_retries = 3
for attempt in range(max_retries):
try:
conn = sqlite3.connect(self.db_path, timeout=20.0)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO feedgrep_items (title, link, description, pub_date, guid, category, source_name, batch_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
item['title'],
item['link'],
item['description'],
item['pub_date'],
item['guid'],
category,
source_name,
self.current_batch_id
))
conn.commit()
conn.close()
log.info(f"[{category} - {source_name}] Saved new item: {item['title']}")
# 记录新条目用于推送
if source_name not in self.feed_new_items:
self.feed_new_items[source_name] = []
self.feed_new_items[source_name].append(item)
return True
except sqlite3.OperationalError as e:
if "database is locked" in str(e) and attempt < max_retries - 1:
time.sleep(1)
continue
else:
log.error(f"Error saving item after {attempt+1} attempts: {e}")
if 'conn' in locals():
conn.close()
return False
except sqlite3.IntegrityError:
# 可能是唯一约束冲突(并发情况下)
if 'conn' in locals():
conn.close()
return False
except Exception as e:
log.error(f"Unexpected error saving item: {e}")
if 'conn' in locals():
conn.close()
return False
return False
def process_feed(self, url: str, category: str, source_name: str):
"""
处理单个RSS源
Args:
url: RSS源地址
category: RSS源所属类别
source_name: RSS源名称
"""
log.info(f"Processing feed: {source_name} ({url}) - Category: {category}")
items = self.fetch_rss_feed(url)
new_items_count = 0
for item in items:
if self.save_item(item, category, source_name):
new_items_count += 1
log.info(f"Feed {source_name} processed. {new_items_count} new items saved.")
# 推送RSS源的新内容
feed_config_list = self.config.get('categories', {}).get(category, [])
feed_config = None
for fc in feed_config_list:
if fc.get('name') == source_name:
feed_config = fc
break
if feed_config and new_items_count > 0:
push_channels = feed_config.get('push_channels', [])
if push_channels:
title = f"[FeedGrep] {source_name} 有 {new_items_count} 条新内容\n"
content = ""
for i, item in enumerate(self.feed_new_items.get(source_name, []), 1):
# 添加序号和超链接到内容
content += f"\n{i}. [{item['title']}]({item['link']})\n"
# 限制总内容长度
if len(content) > 20000:
content += f"\n... 还有更多内容(共{new_items_count}条)"
break
self.push_manager.send_bulk_push(push_channels, title, content)
def process_all_feeds(self):
"""处理所有配置的RSS源"""
log.info("Starting to process all feeds...")
# 生成新的批处理ID
self.current_batch_id = self.get_next_batch_id()
log.info(f"Starting batch processing with batch_id: {self.current_batch_id}")
# 清空之前的新条目记录
self.feed_new_items = {}
# 处理分类的RSS源
categories = self.config.get('categories', {})
for category, feeds in categories.items():
for feed in feeds:
source_name = feed.get('name', 'Unknown')
url = feed.get('url', '')
if url:
try:
self.process_feed(url, category, source_name)
except Exception as e:
log.error(f"Failed to process feed {source_name} ({url}): {e}")
# 处理关键词推送
self.process_keyword_pushes()
log.info("All feeds processed.")
def process_keyword_pushes(self):
"""处理基于关键词的推送"""
if not self.push_manager.push_enabled:
return
# 获取默认关键词配置
default_keywords = self.config.get('default_keywords', [])
# 遍历每个关键词配置
for i, keyword_config in enumerate(default_keywords):
# 检查是否有针对此关键词的推送配置
keyword_push_config = None
if isinstance(keyword_config, dict):
keyword_push_config = keyword_config
keyword_expr = keyword_config.get('keywords', '')
else:
keyword_expr = keyword_config
# 如果没有推送配置,跳过
if isinstance(keyword_push_config, dict) and 'push_channels' not in keyword_push_config:
continue
push_channels = keyword_push_config.get('push_channels', []) if isinstance(keyword_push_config, dict) else []
if not push_channels:
continue
# 搜索匹配该关键词的内容
matched_items = self.search_items_by_keyword(keyword_expr)
# 如果有匹配的内容,则发送推送
if matched_items:
# 构造推送标题和内容
first_keyword = keyword_expr.split()[0] # 取第一个关键词作为标题的一部分
title = f"[FeedGrep关键词] {first_keyword} 有 {len(matched_items)} 条新内容"
content = ""
for i, item in enumerate(matched_items[:20], 1): # 限制最多20条
# 添加序号、来源和超链接到内容
content += f"\n{i}. [{item['source_name']}] [{item['title']}]({item['link']})\n"
if len(matched_items) > 20:
content += f"\n... 还有 {len(matched_items) - 20} 条内容"
# 发送推送
self.push_manager.send_bulk_push(push_channels, title, content)
def search_items_by_keyword(self, keyword):
"""
根据关键词搜索新条目
Args:
keyword: 关键词表达式
Returns:
匹配的条目列表
"""
try:
# 解析关键词语法
required_keywords = [] # 必须包含的关键词 (+)
excluded_keywords = [] # 必须排除的关键词 (-)
normal_keywords = [] # 普通关键词 (空格分隔)
# 解析关键词
parts = keyword.split()
for part in parts:
if part.startswith('+'):
required_keywords.append(part[1:]) # 去掉+号
elif part.startswith('-'):
excluded_keywords.append(part[1:]) # 去掉-号
else:
normal_keywords.append(part)
# 构建查询语句
query_conditions = ["batch_id = ?"] # 只查找当前批次的新内容
params = [self.current_batch_id]
# 处理普通关键词 (OR关系)
if normal_keywords:
or_conditions = []
for kw in normal_keywords:
or_conditions.append("(title LIKE ? OR description LIKE ?)")
params.extend([f"%{kw}%", f"%{kw}%"])
query_conditions.append("(" + " OR ".join(or_conditions) + ")")
# 处理必须关键词 (AND关系)
for kw in required_keywords:
query_conditions.append("(title LIKE ? OR description LIKE ?)")
params.extend([f"%{kw}%", f"%{kw}%"])
# 处理排除关键词
for kw in excluded_keywords:
query_conditions.append("(title NOT LIKE ? AND description NOT LIKE ?)")
params.extend([f"%{kw}%", f"%{kw}%"])
# 基础查询
query = "SELECT * FROM feedgrep_items WHERE " + " AND ".join(query_conditions)
query += " ORDER BY created_at DESC"
# 执行查询
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row # 使结果可以通过列名访问
cursor = conn.cursor()
cursor.execute(query, params)
# 获取结果
rows = cursor.fetchall()
items = [dict(row) for row in rows]
conn.close()
return items
except Exception as e:
log.error(f"搜索关键词 '{keyword}' 时出错: {e}")
return []
def start_scheduler(self):
"""启动定时调度器"""
interval = self.config.get('interval_minutes', 30)
# 安排定时任务
schedule.every(interval).minutes.do(self.process_all_feeds)
# 立即执行一次
self.process_all_feeds()
log.info(f"Scheduler started. Checking RSS feeds every {interval} minutes.")
# 持续运行调度器
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次是否有需要运行的任务
def start_scheduler_async(self):
"""异步启动定时调度器"""
scheduler_thread = threading.Thread(target=self.start_scheduler, daemon=True)
scheduler_thread.start()
return scheduler_thread