吾爱破解 - 52pojie.cn

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 1935|回复: 59
收起左侧

[Python 原创] 【神器推荐】别再手动刷资讯了!让重要信息自动找你FeedGrep

  [复制链接]
zdx0122 发表于 2025-12-3 10:55
本帖最后由 zdx0122 于 2025-12-4 11:10 编辑

🚀【神器推荐】别再手动刷资讯了!让重要信息自动找你——FeedGrep智能监控工具

💡 你是不是也这样?

  • 每天刷十几个网站,就为了等某个产品的降价消息
  • 关注了50个技术博客,却总是错过重要的更新
  • 想监控竞品动态,但手动搜索效率太低
  • 订阅了一堆RSS,结果99%都是无关信息

如果你中枪了,那今天这个工具就是为你量身打造的!

FeedGrep - 让重要信息主动找到你 💫

FeedGrep 是一款专注于 RSS 信息捕获、精准筛选和多渠道推送 的轻量级阅读器。
FeedGrep 支持多源 RSS 抓取、自定义筛选规则、多渠道推送和轻量级 Web UI,让你以最低成本构建自己的信息获取系统。

截图展示

PC端 移动端 推送-微信渠道
image.png image.png bc3f07b0c50f7b493c84d8d298b61528.jpg

功能特点

📡 多源订阅

  • RSS 源订阅 - 支持主流 RSS 格式
  • 灵活定时策略 - 自定义抓取间隔频率

🔍 自定义监控筛选

FeedGrep 提供三种关键词规则类型,可组合使用以实现精确过滤:

类型 描述 示例
普通词 任意词命中即匹配 苹果 手机
必须词 必须全部出现,使用 +前缀(注意中间无空格) +苹果 +发布会
排除词 出现即过滤,使用 -前缀(注意中间无空格) 苹果 -果汁

综合示例:

苹果 华为 +手机 -水果 -价格

意为:
命中“苹果”或“华为”,并必须包含“手机”,排除包含“水果”和“价格”的内容。

🚀典型应用场景

用户角色 监控示例
开发者 开源库更新、技术博客、漏洞通告
购物达人 商品降价、限时优惠、新品上市
市场人员 品牌舆情、竞品动态、行业趋势
普通用户 热点新闻、新闻动态、新闻快讯
投资人员 财经新闻、股票行情、行业动态

🤖 多渠道推送

  • 飞书 - 群机器人推送
  • 企业微信 - 群机器人
  • 微信 - 利用企微-微信插件通道
  • Telegram - Bot 消息推送
  • 邮件 - SMTP 邮件通知

⚙️ 灵活配置

  • rss源管理 - 不限制个数、自由配置
  • 关键词匹配规则 - 支持普通词、必须词、排除词
  • 推送路由 - 单个rss源或者关键词可配置多个推送渠道
  • 消息合并 - 同源同关键词信息合并推送
  • 总开关控制 - 全局推送启用/禁用

快速开始

安装依赖

git clone https://github.com/zdx0122/feedgrep.git
pip install -r requirements.txt

配置

修改 feedgrep.yaml 文件来设置RSS源、检查间隔、关键词匹配规则、推送渠道。

运行

运行以下命令启动FeedGrep处理器:

python feedgrep.py

程序会立即检查所有RSS源,然后按照设定的时间间隔定期检查。

启动后可以通过浏览器访问 http://localhost:8000 查看Web界面。

Bash 脚本方式 (推荐)

项目提供了一个统一的 Bash 脚本来管理服务:

# 启动服务
./feedgrep.sh start

# 停止服务
./feedgrep.sh stop

# 重启服务
./feedgrep.sh restart

本地数据存储

RSS条目被存储在本地的SQLite数据库 feedgrep.db 中,每条记录都会标记其所属的分类和来源名称。

高级关键词搜索语法

FeedGrep支持三种关键词类型,可以通过组合使用实现精确的内容筛选:

  1. 普通词:包含其中任意一个词就会被捕获,多个关键词使用空格分隔

    • 示例:苹果 华为 表示包含"苹果"或"华为"的内容
  2. 必须词:必须同时包含普通词和必须词才会被捕获,使用+前缀标识

    • 示例:苹果 +手机 表示包含"苹果"且必须包含"手机"的内容
  3. 排除词:包含过滤词的新闻会被直接排除,即使包含其他关键词,使用-前缀标识

    • 示例:苹果 -水果 表示包含"苹果"但排除包含"水果"的内容

完整示例:苹果 华为 +手机 -水果 -价格 表示搜索包含"苹果"或"华为",同时必须包含"手机",但排除包含"水果"或"价格"的内容。

自定义关键词快捷搜索

在Web界面左侧边栏中提供了默认关键词的快捷按钮,点击即可快速进行相关搜索。为了保持界面美观,按钮上仅显示关键词组的第一个词,鼠标悬停时会显示完整的关键词配置。

推送功能

FeedGrep支持将新的RSS条目推送到多种渠道:

支持的推送渠道

  1. 飞书群机器人
  2. 企业微信群机器人
  3. 个人微信(基于企业微信应用,在企微后台-微信插件,微信扫码关注,推送到个人微信)
  4. 邮件
  5. Telegram

项目结构

├── feedgrep.py           # 主程序入口
├── feedgrep.yaml         # 配置文件
├── feedgrep.db           # SQLite 本地数据
├── feedgrep.sh           # bash启动脚本
├── index.html            # Web UI
├── api.py                # API模块
├── push.py               # 推送模块
└── utils/                # 日志模块
├── requirements.txt      # 依赖包

配置详解

在配置文件中添加 push 部分来启用推送功能:

push:
  # 推送总开关
  enabled: true

  # 推送时间范围控制开关
  time_restriction_enabled: true

  # 推送时间范围(24小时制,北京时间)
  time_start: "08:00"  # 早上8点
  time_end: "22:00"    # 晚上10点

  # 推送渠道配置
  webhooks:
    # 飞书资讯群
    webhook_feishu:
      type: feishu
      url: https://open.feishu.cn/open-apis/bot/v2/hook/XXXXXXXXXXXXXXXXXX

    # 企业微信群机器人
    webhook_qyweixin:
      type: wework
      wework_msg_type: text  # 可选:text, markdown
      url: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=XXXXXXXXXXXXXXXXXX

    # 个人微信
    webhook_weixin:
      type: wework
      wework_msg_type: text  # 只可选:text,需要在企微后台扫码关注“微信插件”,其他配置和上述企微机器人一样
      url: https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=XXXXXXXXXXXXXXXXXX

    # 邮件推送
    email_notices:
      type: email
      smtp_server: smtp.example.com
      smtp_port: 587
      username: your_username
      password: your_password
      sender: sender@example.com
      receivers:
        - receiver1@example.com
        - receiver2@example.com

    # Telegram推送
    telegram_channel:
      type: telegram
      bot_token: YOUR_BOT_TOKEN
      chat_id: YOUR_CHAT_ID

推送时间范围控制功能允许您设置推送消息的有效时间窗口。时间范围始终按照北京时间进行计算,无论服务部署在哪个时区。默认情况下,只在早上8点到晚上10点之间推送消息。您可以通过以下配置项控制此功能:

  • time_restriction_enabled: 是否启用时间范围控制,默认为false
  • time_start: 推送开始时间(24小时制,北京时间),默认为"08:00"
  • time_end: 推送结束时间(24小时制,北京时间),默认为"22:00"

为RSS源配置推送渠道

在每个RSS源配置中添加 push_channels 列表来指定该源使用哪些推送渠道:

categories:
  news:
    - name: 阮一峰的网络日志
      url: https://www.ruanyifeng.com/blog/atom.xml
      push_channels:
        - webhook_feishu
        - webhook_weixin

当该RSS源有新内容时,将会推送到指定的渠道。

为关键词配置推送渠道

在关键词配置中添加 push_channels 列表来指定匹配该关键词的内容推送到哪些渠道:

default_keywords:
  - keywords: AI 人工智能 +模型 -air -gai -mail
    push_channels:
      - webhook_feishu
  - 纳斯达克 标普 道琼斯

当有新内容匹配关键词时,将会推送到指定的渠道。关键词推送只会推送最近一次RSS获取到的新内容。

核心代码

class FeedGrepProcessor:
    def __init__(self, config_path: str, db_path: str = "feedgrep.db"):
        """
        初始化FeedGrep处理器

        Args:
            config_path: 配置文件路径
            db_path: SQLite数据库路径
        """
        # 加载配置文件
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)

        # 初始化数据库
        self.db_path = db_path
        self.init_database()

        # 初始化批处理ID
        self.current_batch_id = self.get_next_batch_id()

        # 初始化推送管理器
        from push import PushManager
        self.push_manager = PushManager(self.config)

        # 存储每个源的新条目用于推送
        self.feed_new_items = {}

    def init_database(self):
        """初始化数据库表"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # 创建表来存储RSS条目
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS feedgrep_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT,
                link TEXT,
                description TEXT,
                pub_date TEXT,
                guid TEXT,
                category TEXT,
                source_name TEXT,
                batch_id INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # 创建索引来提高查询速度
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_title ON feedgrep_items(title)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_guid ON feedgrep_items(guid)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_link ON feedgrep_items(link)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON feedgrep_items(category)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_name ON feedgrep_items(source_name)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_batch_id ON feedgrep_items(batch_id)')

        # 为 is_item_exists 方法添加复合索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_title_link_guid ON feedgrep_items(source_name, title, link, guid)')

        # 为关键词搜索添加复合索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON feedgrep_items(created_at DESC)')

        # 为分类和时间组合查询添加索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_category_created_at ON feedgrep_items(category, created_at DESC)')

        # 为来源和时间组合查询添加索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_name_created_at ON feedgrep_items(source_name, created_at DESC)')

        # 不再创建新的batch_counter表,改用配置文件方式存储batch_id

        conn.commit()
        conn.close()

    def get_next_batch_id(self) -> int:
        """
        获取下一个批处理ID,并将其加1

        Returns:
            下一个批处理ID
        """
        # 使用feedgrep_items表中的最大batch_id作为当前batch_id,然后加1
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()

            # 获取当前最大的batch_id
            cursor.execute('SELECT COALESCE(MAX(batch_id), 0) FROM feedgrep_items')
            max_batch_id = cursor.fetchone()[0]

            conn.close()

            # 下一个batch_id应该是最大值加1,最小为1
            return max(1, max_batch_id + 1)

        except Exception as e:
            log.error(f"Error getting next batch ID: {e}")
            return 1

    def fetch_rss_feed(self, url: str) -> List[Dict]:
        """
        获取并解析RSS源

        Args:
            url: RSS源地址

        Returns:
            解析后的RSS条目列表
        """
        try:
            # 设置feedparser的超时和代理(如果需要)
            import socket
            socket.setdefaulttimeout(30)
            feed = feedparser.parse(url)
            items = []

            for entry in feed.entries:
                # 提取关键字段
                item = {
                    'title': getattr(entry, 'title', ''),
                    'link': getattr(entry, 'link', ''),
                    'description': getattr(entry, 'summary', ''),
                    'pub_date': getattr(entry, 'published', ''),
                    'guid': getattr(entry, 'id', getattr(entry, 'link', ''))
                }

                items.append(item)

            return items
        except Exception as e:
            log.error(f"Error fetching RSS feed from {url}: {e}")
            return []

    def is_item_exists(self, guid: str, link: str, title: str, source_name: str) -> bool:
        """
        检查条目是否已存在

        Args:
            guid: 条目的GUID
            link: 条目的链接
            title: 条目的标题
            source_name: 条目来源名称

        Returns:
            如果条目已存在返回True,否则返回False
        """
        max_retries = 3
        for attempt in range(max_retries):
            try:
                conn = sqlite3.connect(self.db_path, timeout=20.0)
                cursor = conn.cursor()

                cursor.execute(
                    'SELECT COUNT(*) FROM feedgrep_items WHERE source_name = ? AND title = ? AND link = ?',
                    (source_name, title, link)
                )
                count = cursor.fetchone()[0]

                conn.close()
                return count > 0
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e) and attempt < max_retries - 1:
                    time.sleep(1)
                    continue
                else:
                    log.error(f"Error checking item existence after {attempt+1} attempts: {e}")
                    if 'conn' in locals():
                        conn.close()
                    return False
            except Exception as e:
                log.error(f"Unexpected error checking item existence: {e}")
                if 'conn' in locals():
                    conn.close()
                return False
        return False

    def save_item(self, item: Dict, category: str, source_name: str) -> bool:
        """
        保存单个RSS条目到数据库

        Args:
            item: RSS条目字典
            category: 条目所属类别
            source_name: RSS源名称

        Returns:
            保存成功返回True,否则返回False
        """
        # 检查条目是否已存在
        if self.is_item_exists(item['guid'], item['link'], item['title'], source_name):
            return False  # 条目已存在,不需要保存

        max_retries = 3
        for attempt in range(max_retries):
            try:
                conn = sqlite3.connect(self.db_path, timeout=20.0)
                cursor = conn.cursor()

                cursor.execute('''
                    INSERT INTO feedgrep_items (title, link, description, pub_date, guid, category, source_name, batch_id)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    item['title'],
                    item['link'],
                    item['description'],
                    item['pub_date'],
                    item['guid'],
                    category,
                    source_name,
                    self.current_batch_id
                ))

                conn.commit()
                conn.close()

                log.info(f"[{category} - {source_name}] Saved new item: {item['title']}")

                # 记录新条目用于推送
                if source_name not in self.feed_new_items:
                    self.feed_new_items[source_name] = []
                self.feed_new_items[source_name].append(item)

                return True
            except sqlite3.OperationalError as e:
                if "database is locked" in str(e) and attempt < max_retries - 1:
                    time.sleep(1)
                    continue
                else:
                    log.error(f"Error saving item after {attempt+1} attempts: {e}")
                    if 'conn' in locals():
                        conn.close()
                    return False
            except sqlite3.IntegrityError:
                # 可能是唯一约束冲突(并发情况下)
                if 'conn' in locals():
                    conn.close()
                return False
            except Exception as e:
                log.error(f"Unexpected error saving item: {e}")
                if 'conn' in locals():
                    conn.close()
                return False
        return False

    def process_feed(self, url: str, category: str, source_name: str):
        """
        处理单个RSS源

        Args:
            url: RSS源地址
            category: RSS源所属类别
            source_name: RSS源名称
        """
        log.info(f"Processing feed: {source_name} ({url}) - Category: {category}")
        items = self.fetch_rss_feed(url)

        new_items_count = 0
        for item in items:
            if self.save_item(item, category, source_name):
                new_items_count += 1

        log.info(f"Feed {source_name} processed. {new_items_count} new items saved.")

        # 推送RSS源的新内容
        feed_config_list = self.config.get('categories', {}).get(category, [])
        feed_config = None
        for fc in feed_config_list:
            if fc.get('name') == source_name:
                feed_config = fc
                break

        if feed_config and new_items_count > 0:
            push_channels = feed_config.get('push_channels', [])
            if push_channels:
                title = f"[FeedGrep] {source_name} 有 {new_items_count} 条新内容\n"
                content = ""

                for i, item in enumerate(self.feed_new_items.get(source_name, []), 1):
                    # 添加序号和超链接到内容
                    content += f"\n{i}. [{item['title']}]({item['link']})\n"

                    # 限制总内容长度
                    if len(content) > 20000:
                        content += f"\n... 还有更多内容(共{new_items_count}条)"
                        break

                self.push_manager.send_bulk_push(push_channels, title, content)

    def process_all_feeds(self):
        """处理所有配置的RSS源"""
        log.info("Starting to process all feeds...")

        # 生成新的批处理ID
        self.current_batch_id = self.get_next_batch_id()
        log.info(f"Starting batch processing with batch_id: {self.current_batch_id}")

        # 清空之前的新条目记录
        self.feed_new_items = {}

        # 处理分类的RSS源
        categories = self.config.get('categories', {})
        for category, feeds in categories.items():
            for feed in feeds:
                source_name = feed.get('name', 'Unknown')
                url = feed.get('url', '')
                if url:
                    try:
                        self.process_feed(url, category, source_name)
                    except Exception as e:
                        log.error(f"Failed to process feed {source_name} ({url}): {e}")

        # 处理关键词推送
        self.process_keyword_pushes()

        log.info("All feeds processed.")

    def process_keyword_pushes(self):
        """处理基于关键词的推送"""
        if not self.push_manager.push_enabled:
            return

        # 获取默认关键词配置
        default_keywords = self.config.get('default_keywords', [])

        # 遍历每个关键词配置
        for i, keyword_config in enumerate(default_keywords):
            # 检查是否有针对此关键词的推送配置
            keyword_push_config = None
            if isinstance(keyword_config, dict):
                keyword_push_config = keyword_config
                keyword_expr = keyword_config.get('keywords', '')
            else:
                keyword_expr = keyword_config

            # 如果没有推送配置,跳过
            if isinstance(keyword_push_config, dict) and 'push_channels' not in keyword_push_config:
                continue

            push_channels = keyword_push_config.get('push_channels', []) if isinstance(keyword_push_config, dict) else []
            if not push_channels:
                continue

            # 搜索匹配该关键词的内容
            matched_items = self.search_items_by_keyword(keyword_expr)

            # 如果有匹配的内容,则发送推送
            if matched_items:
                # 构造推送标题和内容
                first_keyword = keyword_expr.split()[0]  # 取第一个关键词作为标题的一部分
                title = f"[FeedGrep关键词] {first_keyword} 有 {len(matched_items)} 条新内容"

                content = ""

                for i, item in enumerate(matched_items[:20], 1):  # 限制最多20条
                    # 添加序号、来源和超链接到内容
                    content += f"\n{i}. [{item['source_name']}] [{item['title']}]({item['link']})\n"

                if len(matched_items) > 20:
                    content += f"\n... 还有 {len(matched_items) - 20} 条内容"

                # 发送推送
                self.push_manager.send_bulk_push(push_channels, title, content)

    def search_items_by_keyword(self, keyword):
        """
        根据关键词搜索新条目

        Args:
            keyword: 关键词表达式

        Returns:
            匹配的条目列表
        """
        try:
            # 解析关键词语法
            required_keywords = []  # 必须包含的关键词 (+)
            excluded_keywords = []  # 必须排除的关键词 (-)
            normal_keywords = []    # 普通关键词 (空格分隔)

            # 解析关键词
            parts = keyword.split()
            for part in parts:
                if part.startswith('+'):
                    required_keywords.append(part[1:])  # 去掉+号
                elif part.startswith('-'):
                    excluded_keywords.append(part[1:])  # 去掉-号
                else:
                    normal_keywords.append(part)

            # 构建查询语句
            query_conditions = ["batch_id = ?"]  # 只查找当前批次的新内容
            params = [self.current_batch_id]

            # 处理普通关键词 (OR关系)
            if normal_keywords:
                or_conditions = []
                for kw in normal_keywords:
                    or_conditions.append("(title LIKE ? OR description LIKE ?)")
                    params.extend([f"%{kw}%", f"%{kw}%"])
                query_conditions.append("(" + " OR ".join(or_conditions) + ")")

            # 处理必须关键词 (AND关系)
            for kw in required_keywords:
                query_conditions.append("(title LIKE ? OR description LIKE ?)")
                params.extend([f"%{kw}%", f"%{kw}%"])

            # 处理排除关键词
            for kw in excluded_keywords:
                query_conditions.append("(title NOT LIKE ? AND description NOT LIKE ?)")
                params.extend([f"%{kw}%", f"%{kw}%"])

            # 基础查询
            query = "SELECT * FROM feedgrep_items WHERE " + " AND ".join(query_conditions)
            query += " ORDER BY created_at DESC"

            # 执行查询
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row  # 使结果可以通过列名访问
            cursor = conn.cursor()
            cursor.execute(query, params)

            # 获取结果
            rows = cursor.fetchall()
            items = [dict(row) for row in rows]

            conn.close()

            return items
        except Exception as e:
            log.error(f"搜索关键词 '{keyword}' 时出错: {e}")
            return []

    def start_scheduler(self):
        """启动定时调度器"""
        interval = self.config.get('interval_minutes', 30)

        # 安排定时任务
        schedule.every(interval).minutes.do(self.process_all_feeds)

        # 立即执行一次
        self.process_all_feeds()

        log.info(f"Scheduler started. Checking RSS feeds every {interval} minutes.")

        # 持续运行调度器
        while True:
            schedule.run_pending()
            time.sleep(60)  # 每分钟检查一次是否有需要运行的任务

    def start_scheduler_async(self):
        """异步启动定时调度器"""
        scheduler_thread = threading.Thread(target=self.start_scheduler, daemon=True)
        scheduler_thread.start()
        return scheduler_thread

feedgrep-main.zip

40.73 KB, 下载次数: 123, 下载积分: 吾爱币 -1 CB

免费评分

参与人数 6吾爱币 +12 热心值 +6 收起 理由
xiaoxinbai + 1 + 1 谢谢@Thanks!
szlutom + 1 + 1 我很赞同!
shaunkelly + 1 + 1 我很赞同!
yimin + 1 + 1 谢谢@Thanks!
hrh123 + 7 + 1 欢迎分析讨论交流,吾爱破解论坛有你更精彩!
EIK + 1 + 1 热心回复!

查看全部评分

本帖被以下淘专辑推荐:

发帖前要善用论坛搜索功能,那里可能会有你要找的答案或者已经有人发布过相同内容了,请勿重复发帖。

 楼主| zdx0122 发表于 2025-12-3 14:11
我自己搭建了一个demo,供参照:https://rc.cx
开源地址在这里,求个star:https://github.com/zdx0122/feedgrep
shaunkelly 发表于 2025-12-3 12:45
蛋蛋的小忧伤 发表于 2025-12-3 12:57
w360 发表于 2025-12-3 13:01
试一下看看好用吗
w360 发表于 2025-12-3 13:12
没会用啊
cick 发表于 2025-12-3 13:21
我第一想到的是查看招投标指定关键词信息
q3125418 发表于 2025-12-3 13:53
这不重要,能不能搞八卦小道国际新闻。要最新的 ,比如懂王今早没拉使,对鲍威尔进行批斗
 楼主| zdx0122 发表于 2025-12-3 14:10
q3125418 发表于 2025-12-3 13:53
这不重要,能不能搞八卦小道国际新闻。要最新的 ,比如懂王今早没拉使,对鲍威尔进行批斗

需要有信息源才行
zwxiii 发表于 2025-12-3 14:13
py真好用啊,好多脚本之类的都是用它
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

返回列表

RSS订阅|小黑屋|处罚记录|联系我们|吾爱破解 - 52pojie.cn ( 京ICP备16042023号 | 京公网安备 11010502030087号 )

GMT+8, 2025-12-12 11:42

Powered by Discuz!

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表