Skip to content

Commit

Permalink
定时任务插件:优化一次性定时任务的load
Browse files Browse the repository at this point in the history
  • Loading branch information
黄传 committed Jan 4, 2025
1 parent bf7063b commit 0c2b80b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 32 deletions.
5 changes: 3 additions & 2 deletions plugins/scheduler_plugin/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
class ScheduledTask:
id: str
name: str
cron: str # cron 表达式
cron: str # cron 表达式,一次性任务为空字符串
task_content: str # 任务内容/消息内容
chat_id: str # 关联的聊天ID
created_at: datetime
next_run_time: Optional[datetime] = None
last_run_time: Optional[datetime] = None
last_run_time: Optional[datetime] = None
is_one_time: bool = False # 新增字段,标识是否为一次性任务
60 changes: 35 additions & 25 deletions plugins/scheduler_plugin/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,14 @@ async def _execute_task(self, task: ScheduledTask):
logger.debug(f"Created message object with content: {task.task_content}")

# 在执行时获取 OneBotAdapter
onebot_adapter = None
for adapter_name, adapter in self.im_manager.adapters.items():
if isinstance(adapter, OneBotAdapter):
onebot_adapter = adapter
break

if not onebot_adapter:
raise RuntimeError("No OneBotAdapter instance found in IMManager")
try:
await adapter.handle_message(task.chat_id, message)
except Exception as e:
logger.warning(f"{adapter_name} handle_message fail")

# 发送消息
await onebot_adapter.handle_message(task.chat_id, message)

logger.info(f"Task {task.id} executed successfully and message sent")

# 更新任务状态
Expand All @@ -176,22 +173,34 @@ async def load_tasks(self):

for task in tasks:
try:
# 创建 CronTrigger
trigger = CronTrigger.from_crontab(task.cron)

# 添加到调度器
job = self.scheduler.add_job(
self._execute_task,
trigger,
args=[task],
id=task.id
)

# 更新下次运行时间
task.next_run_time = job.next_run_time
self.storage.save_task(task)

logger.info(f"Successfully loaded task: {task.id} ({task.name})")
if task.is_one_time:
# 对于一次性任务,如果下次运行时间已过,则跳过
if task.next_run_time and task.next_run_time > datetime.now():
self.scheduler.add_job(
self._execute_one_time_task,
'date',
run_date=task.next_run_time,
args=[task],
id=task.id
)
logger.info(f"Successfully loaded one-time task: {task.id} ({task.name})")
else:
# 删除过期的一次性任务
self.delete_task(task.id)
logger.info(f"Removed expired one-time task: {task.id}")
else:
# 周期性任务的处理保持不变
trigger = CronTrigger.from_crontab(task.cron)
job = self.scheduler.add_job(
self._execute_task,
trigger,
args=[task],
id=task.id
)
task.next_run_time = job.next_run_time
self.storage.save_task(task)
logger.info(f"Successfully loaded periodic task: {task.id} ({task.name})")

except Exception as e:
logger.error(f"Failed to load task {task.id}: {str(e)}")
continue
Expand All @@ -212,7 +221,8 @@ async def create_one_time_task(self, name: str, minutes: int, task_content: str,
cron="", # 一次性任务不需要cron表达式
task_content=task_content,
chat_id=chat_id,
created_at=datetime.now()
created_at=datetime.now(),
is_one_time=True # 标记为一次性任务
)

try:
Expand Down
13 changes: 8 additions & 5 deletions plugins/scheduler_plugin/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def _init_db(self):
chat_id TEXT NOT NULL,
created_at TIMESTAMP NOT NULL,
next_run_time TIMESTAMP,
last_run_time TIMESTAMP
last_run_time TIMESTAMP,
is_one_time BOOLEAN DEFAULT 0
)
''')

Expand All @@ -31,8 +32,8 @@ def save_task(self, task: ScheduledTask):
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
INSERT OR REPLACE INTO scheduled_tasks
(id, name, cron, task_content, chat_id, created_at, next_run_time, last_run_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
(id, name, cron, task_content, chat_id, created_at, next_run_time, last_run_time, is_one_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
task.id,
task.name,
Expand All @@ -41,7 +42,8 @@ def save_task(self, task: ScheduledTask):
task.chat_id,
task.created_at.isoformat(),
task.next_run_time.isoformat() if task.next_run_time else None,
task.last_run_time.isoformat() if task.last_run_time else None
task.last_run_time.isoformat() if task.last_run_time else None,
task.is_one_time
))

def get_task(self, task_id: str) -> Optional[ScheduledTask]:
Expand Down Expand Up @@ -84,5 +86,6 @@ def _row_to_task(self, row) -> ScheduledTask:
chat_id=row[4],
created_at=datetime.fromisoformat(row[5]),
next_run_time=datetime.fromisoformat(row[6]) if row[6] else None,
last_run_time=datetime.fromisoformat(row[7]) if row[7] else None
last_run_time=datetime.fromisoformat(row[7]) if row[7] else None,
is_one_time=bool(row[8]) if len(row) > 8 else False
)

0 comments on commit 0c2b80b

Please sign in to comment.