feat: playbook执行日志实时流式推送(SSE) + 任务详情显示原始日志
Bu işleme şunda yer alıyor:
@@ -2,6 +2,7 @@ package handlers
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ansible-deploy/internal/models"
|
||||
"github.com/ansible-deploy/internal/services"
|
||||
@@ -325,6 +326,57 @@ func (h *AnsibleHandler) GetTask(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// StreamTaskOutput SSE 流式推送任务日志
|
||||
func (h *AnsibleHandler) StreamTaskOutput(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
task := h.service.GetTask(id)
|
||||
if task == nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"code": 404, "msg": "任务不存在"})
|
||||
return
|
||||
}
|
||||
|
||||
c.Header("Content-Type", "text/event-stream")
|
||||
c.Header("Cache-Control", "no-cache")
|
||||
c.Header("Connection", "keep-alive")
|
||||
|
||||
lastLen := 0
|
||||
for {
|
||||
// 检查客户端是否断开
|
||||
if c.Request.Context().Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
task := h.service.GetTask(id)
|
||||
if task == nil {
|
||||
return
|
||||
}
|
||||
|
||||
output := task.Output
|
||||
if len(output) > lastLen {
|
||||
// 只发送增量
|
||||
increment := output[lastLen:]
|
||||
lastLen = len(output)
|
||||
c.SSEvent("log", increment)
|
||||
c.Writer.Flush()
|
||||
}
|
||||
|
||||
if task.Status != "running" {
|
||||
// 任务完成,发送最终状态
|
||||
c.SSEvent("status", task.Status)
|
||||
c.SSEvent("error", task.Error)
|
||||
c.Writer.Flush()
|
||||
return
|
||||
}
|
||||
|
||||
// 等 500ms 再推送
|
||||
select {
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
case <-c.Request.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// CancelTask 取消任务
|
||||
func (h *AnsibleHandler) CancelTask(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
|
||||
@@ -703,12 +703,45 @@ func (s *AnsibleService) runPlaybook(task *models.TaskExecution, playbookPath st
|
||||
cmd = exec.CommandContext(ctx, "ansible-playbook", args...)
|
||||
}
|
||||
|
||||
var output bytes.Buffer
|
||||
cmd.Stdout = &output
|
||||
cmd.Stderr = &output
|
||||
// 实时写入日志的 Writer
|
||||
sw := &syncWriter{buf: bytes.NewBuffer(nil)}
|
||||
cmd.Stdout = sw
|
||||
cmd.Stderr = sw
|
||||
|
||||
// 启动 goroutine 实时搬运日志到 task.Output
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
var lastLen int
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.taskLock.Lock()
|
||||
sw.mu.Lock()
|
||||
currentLen := sw.buf.Len()
|
||||
if currentLen > lastLen {
|
||||
task.Output = sw.buf.String()
|
||||
lastLen = currentLen
|
||||
}
|
||||
sw.mu.Unlock()
|
||||
s.taskLock.Unlock()
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err := cmd.Run()
|
||||
close(done) // 通知 goroutine 退出
|
||||
|
||||
// 最终同步一次完整日志
|
||||
sw.mu.Lock()
|
||||
finalOutput := sw.buf.String()
|
||||
sw.mu.Unlock()
|
||||
|
||||
s.taskLock.Lock()
|
||||
task.Output = finalOutput
|
||||
task.EndTime = time.Now()
|
||||
if err != nil {
|
||||
task.Status = "failed"
|
||||
@@ -716,7 +749,25 @@ func (s *AnsibleService) runPlaybook(task *models.TaskExecution, playbookPath st
|
||||
} else {
|
||||
task.Status = "success"
|
||||
}
|
||||
task.Output = output.String()
|
||||
s.taskLock.Unlock()
|
||||
}
|
||||
|
||||
// syncWriter 线程安全的 Writer
|
||||
type syncWriter struct {
|
||||
buf *bytes.Buffer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (w *syncWriter) Write(p []byte) (n int, err error) {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.buf.Write(p)
|
||||
}
|
||||
|
||||
func (w *syncWriter) String() string {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
return w.buf.String()
|
||||
}
|
||||
|
||||
// ListPlaybooks 列出可用Playbooks
|
||||
|
||||
Yeni konuda referans
Bir kullanıcı engelle