package worker

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/url"
	"time"

	"github.com/zs/InsightReply/internal/model"
	"github.com/zs/InsightReply/internal/repository"
	"github.com/zs/InsightReply/internal/scraper"
)

// MonitorWorker periodically scrapes tweets for every active competitor
// monitor and persists them for downstream processing.
type MonitorWorker struct {
	repo      *repository.CompetitorMonitorRepository
	tweetRepo *repository.TweetRepository
	client    *scraper.ScraperClient
	baseURL   string
}

func NewMonitorWorker(repo *repository.CompetitorMonitorRepository, tweetRepo *repository.TweetRepository) *MonitorWorker {
	return &MonitorWorker{
		repo:      repo,
		tweetRepo: tweetRepo,
		client:    scraper.NewScraperClient(),
		baseURL:   "https://x.beenglish.eu.org", // Self-hosted Nitter instance
	}
}

// Start begins the background job loop. This should be run in a goroutine.
func (w *MonitorWorker) Start(ctx context.Context, interval time.Duration) {
	log.Printf("[MonitorWorker] Starting background scraping loop every %v", interval)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Initial run before waiting for the first tick.
	w.runCycle(ctx)

	for {
		select {
		case <-ctx.Done():
			log.Println("[MonitorWorker] Stopping background scraping loop")
			return
		case <-ticker.C:
			w.runCycle(ctx)
		}
	}
}

// runCycle scrapes every active monitor once, choosing a timeline, combo, or
// global-search URL depending on how the monitor is configured.
func (w *MonitorWorker) runCycle(ctx context.Context) {
	log.Println("[MonitorWorker] Starting scrape cycle...")

	monitors, err := w.repo.ListAllActive()
	if err != nil {
		log.Printf("[MonitorWorker] Error fetching active monitors: %v", err)
		return
	}
	if len(monitors) == 0 {
		log.Println("[MonitorWorker] No active monitors found. Skipping cycle.")
		return
	}

	for _, monitor := range monitors {
		// Stop processing if the context is cancelled (e.g., app shutdown).
		select {
		case <-ctx.Done():
			return
		default:
		}

		// Determine the scraping strategy. The brand name acts as our search
		// keyword, so URL-encode it before embedding it in a query string.
		keyword := url.QueryEscape(monitor.BrandName)
		var target string

		if monitor.XHandle != "" {
			if monitor.XHandle == monitor.BrandName || monitor.BrandName == "" {
				// Standard profile timeline scraping.
				log.Printf("[MonitorWorker] Scraping timeline for account @%s", monitor.XHandle)
				target = fmt.Sprintf("%s/%s", w.baseURL, monitor.XHandle)
			} else {
				// Combo scraping: keyword restricted to a specific account.
				log.Printf("[MonitorWorker] Scraping combo: '%s' from @%s", monitor.BrandName, monitor.XHandle)
				target = fmt.Sprintf("%s/search?f=tweets&q=%s+from%%3A%s", w.baseURL, keyword, monitor.XHandle)
			}
		} else if monitor.BrandName != "" {
			// Global search for the keyword across X.
			log.Printf("[MonitorWorker] Scraping global search for keyword: '%s'", monitor.BrandName)
			target = fmt.Sprintf("%s/search?f=tweets&q=%s", w.baseURL, keyword)
		} else {
			continue // Invalid monitor config: neither handle nor keyword.
		}

		w.scrapeAndLog(target)

		// Anti-ban: jitter delay between requests (3s to 8s).
		w.client.JitterDelay(3000, 8000)
	}

	log.Println("[MonitorWorker] Scrape cycle completed.")
}
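// jitterSleep is an illustrative sketch, not called by the worker, of the
// kind of randomized delay scraper.ScraperClient.JitterDelay is assumed to
// provide: sleep a uniformly random duration between minMs and maxMs
// milliseconds (assuming minMs <= maxMs) so requests do not hit the Nitter
// instance on a fixed cadence. The real implementation lives in the scraper
// package and may differ.
func jitterSleep(minMs, maxMs int) {
	offset := rand.Intn(maxMs - minMs + 1) // uniform offset in [0, maxMs-minMs]
	time.Sleep(time.Duration(minMs+offset) * time.Millisecond)
}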
// scrapeAndLog fetches one URL, parses the returned timeline HTML, and
// upserts every extracted tweet into the tracking database.
func (w *MonitorWorker) scrapeAndLog(target string) {
	htmlData, err := w.client.Fetch(target)
	if err != nil {
		log.Printf("[MonitorWorker] Error scraping %s: %v", target, err)
		return
	}

	tweets, err := scraper.ParseTimeline(htmlData)
	if err != nil {
		log.Printf("[MonitorWorker] Error parsing HTML for %s: %v", target, err)
		return
	}
	log.Printf("[MonitorWorker] Extracted %d tweets from %s", len(tweets), target)

	// Epic 6: upsert into the tracking database.
	upsertCount := 0
	for _, rawTweet := range tweets {
		tweet := &model.Tweet{
			XTweetID:      rawTweet.ID,
			AuthorHandle:  rawTweet.Handle,
			Content:       rawTweet.Content,
			PostedAt:      rawTweet.CreatedAt,
			LikeCount:     rawTweet.Likes,
			RetweetCount:  rawTweet.Retweets,
			ReplyCount:    rawTweet.Replies,
			CrawlQueue:    "normal",
			IsProcessed:   false,
			LastCrawledAt: time.Now(),
		}

		// Insert the tweet, or update it if we have crawled it before.
		if err := w.tweetRepo.Upsert(tweet); err != nil {
			log.Printf("[MonitorWorker] Error upserting tweet %s: %v", tweet.XTweetID, err)
		} else {
			upsertCount++
		}
	}

	log.Printf("[MonitorWorker] Upserted %d/%d tweets to the database.", upsertCount, len(tweets))
}
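// Usage sketch (hypothetical wiring; the repositories and their construction
// are assumed to happen elsewhere, e.g. in main):
//
//	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer stop()
//
//	w := worker.NewMonitorWorker(monitorRepo, tweetRepo)
//	go w.Start(ctx, 15*time.Minute) // 15m is an assumed interval, not a project default
//
// Cancelling ctx stops the loop at the next tick, or between monitors if a
// cycle is in flight.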