import feedparser
import pymysql
import json
import os


class Rss:
    """Fetch an RSS feed, pass unseen and previously-failed entries to a handler, and log results.

    Entries already recorded in the ``rss_log`` table for this feed type
    (successful or not) are skipped when read from the feed. Entries recorded
    as failed (``is_success=0``) are re-read from the table on every run and
    handed to the handler again.
    """

    def __init__(self, id, type, url, result_handler) -> None:
        """
        :param id: identifier stored on the instance (not otherwise used here).
        :param type: feed type; used to partition rows in ``rss_log``.
        :param url: feed URL passed to ``feedparser.parse``.
        :param result_handler: callable taking a list of entries and returning
            a ``(success_entries, failed_entries)`` pair.
        """
        self.rss_url = url
        self.id = id
        self.type = type
        self.result_handler = result_handler
        self.rss_db = RssDB()

    def run(self):
        """Process the feed once: parse, filter, dispatch to the handler, save results."""
        print('Info: Rss handler start!')
        feed = feedparser.parse(self.rss_url)
        new_entries = self.compare_entries(feed['entries'])
        # Failed entries from earlier runs are stored as JSON in entry_content;
        # decode them so they are retried together with the new feed entries.
        failed_rows = self.rss_db.fetchData(self.type, 0)
        result_list = new_entries + [json.loads(row[1]) for row in failed_rows]
        print('need handle rss entry number: ' + str(len(result_list)))
        # Call the handler function for this feed type.
        (success_entries, failed_entries) = self.result_handler(result_list)
        # Save the processing results.
        self.save_result(success_entries, failed_entries)
        print('Info: Rss handler completed!')
        print('Info: success entries has ({success_num})!! failed entries has ({failed_num})'.format(
            success_num=str(len(success_entries)), failed_num=str(len(failed_entries))))

    def compare_entries(self, entries):
        """Return the entries whose ``id`` is not yet recorded in ``rss_log`` for this type.

        :param entries: feed entries, each indexable by ``'id'``.
        :returns: a new list, preserving the input order.
        """
        # Set lookup keeps the filter linear instead of O(len(entries) * len(rows)).
        known_ids = {row[0] for row in self.rss_db.fetchData(self.type, None)}
        return [item for item in entries if item['id'] not in known_ids]

    def save_result(self, success_entries, failed_entries):
        """Insert or update one ``rss_log`` row per processed entry.

        Successful entries are stored with empty content. Failed entries keep
        their full JSON so a later run can retry them.

        :param success_entries: entries the handler processed successfully.
        :param failed_entries: entries the handler failed to process.
        """
        # Build every record first so a malformed entry raises before any DB write.
        records = [
            (entry['id'], '', 1, entry['updated']) for entry in success_entries
        ] + [
            (entry['id'], json.dumps(entry), 0, entry['updated']) for entry in failed_entries
        ]
        for entry_id, content, is_success, gmt_create in records:
            self.rss_db.updateData(entry_id, content, self.type, is_success, gmt_create)


class RssDB:
    """Thin access layer over the ``rss_log`` MySQL table.

    Connection settings are read from the environment variables ``db_host``,
    ``db_user``, ``db_passwd``, ``db_port`` and ``db_db``. Every method opens
    its own connection and closes it before returning.
    """

    def __init__(self):
        pass

    def connect(self):
        """Open a new pymysql connection using the environment settings."""
        port = os.getenv('db_port')
        return pymysql.connect(host=os.getenv('db_host'),
                               user=os.getenv('db_user'),
                               password=os.getenv('db_passwd'),
                               # os.getenv returns a string; pymysql expects an int
                               # port (3306 is pymysql's own default when unset).
                               port=int(port) if port else 3306,
                               database=os.getenv('db_db'))

    def fetchData(self, type, is_success):
        """Return ``(entry_id, entry_content)`` rows for the given feed type.

        :param type: feed type to filter on.
        :param is_success: ``None`` to return all rows for ``type``; otherwise
            only rows whose ``is_success`` column equals this value.
        :returns: the rows as returned by ``cursor.fetchall()``.
        """
        db = self.connect()
        try:
            with db.cursor() as cursor:
                if is_success is None:
                    cursor.execute('SELECT entry_id, entry_content FROM rss_log WHERE type=%s', (type,))
                else:
                    cursor.execute('SELECT entry_id, entry_content FROM rss_log WHERE type=%s AND is_success=%s',
                                   (type, is_success))
                return cursor.fetchall()
        finally:
            db.close()

    def updateData(self, entry_id, entry_content, type, is_success, gmt_create):
        """Insert a ``rss_log`` row for ``entry_id``, or update the existing row.

        A database error during the write is rolled back and reported on
        stdout instead of being raised, so one bad entry does not abort a batch.
        Errors from the initial lookup query still propagate.
        """
        db = self.connect()
        try:
            with db.cursor() as cursor:
                # NOTE(review): the lookup ignores `type`; if entry ids can repeat
                # across feed types, rows of another type get overwritten — confirm.
                cursor.execute('SELECT id FROM rss_log WHERE entry_id=%s', (entry_id,))
                key = cursor.fetchone()
                if key is None:
                    action = 'INSERT'
                    sql = ('INSERT INTO rss_log ( entry_id, entry_content, type, is_success, gmt_create )'
                           ' VALUES (%s, %s, %s, %s, %s)')
                    params = (entry_id, entry_content, type, is_success, gmt_create)
                else:
                    action = 'Update'
                    sql = ('UPDATE rss_log SET entry_id=%s, entry_content=%s, type=%s, is_success=%s,'
                           ' gmt_create=%s WHERE id=%s')
                    # fetchone() returns a row tuple; bind only its id column.
                    params = (entry_id, entry_content, type, is_success, gmt_create, key[0])
                try:
                    cursor.execute(sql, params)
                    db.commit()
                except pymysql.MySQLError:
                    db.rollback()
                    print('Error: ' + action + ' failed' + str(entry_id))
        finally:
            db.close()