feat: 部署初版测试
This commit is contained in:
@@ -1,22 +1,35 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/zs/InsightReply/internal/service"
|
||||
)
|
||||
|
||||
type AIHandler struct {
|
||||
svc *service.AIService
|
||||
svc *service.AIService
|
||||
profileSvc *service.ProductProfileService
|
||||
strategySvc *service.CustomStrategyService
|
||||
}
|
||||
|
||||
func NewAIHandler(svc *service.AIService) *AIHandler {
|
||||
return &AIHandler{svc: svc}
|
||||
func NewAIHandler(svc *service.AIService, profileSvc *service.ProductProfileService, strategySvc *service.CustomStrategyService) *AIHandler {
|
||||
return &AIHandler{
|
||||
svc: svc,
|
||||
profileSvc: profileSvc,
|
||||
strategySvc: strategySvc,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *AIHandler) Test(w http.ResponseWriter, r *http.Request) {
|
||||
// ...
|
||||
ctx := r.Context()
|
||||
msg, err := h.svc.TestConnection(ctx)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5000, err.Error())
|
||||
return
|
||||
}
|
||||
SendSuccess(w, map[string]string{"status": msg})
|
||||
}
|
||||
|
||||
func (h *AIHandler) Generate(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -24,6 +37,8 @@ func (h *AIHandler) Generate(w http.ResponseWriter, r *http.Request) {
|
||||
TweetContent string `json:"tweet_content"`
|
||||
Strategy string `json:"strategy"`
|
||||
Identity string `json:"identity"`
|
||||
Provider string `json:"provider,omitempty"`
|
||||
Model string `json:"model,omitempty"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
@@ -37,13 +52,56 @@ func (h *AIHandler) Generate(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
reply, err := h.svc.GenerateReply(ctx, body.TweetContent, body.Strategy, body.Identity)
|
||||
userID := ctx.Value("userID").(string)
|
||||
|
||||
// Fetch Product Profile Context
|
||||
var productContext string
|
||||
if profile, err := h.profileSvc.GetProfile(userID); err == nil && profile.IsActive {
|
||||
productContext = "Product Context: " + profile.ProductName
|
||||
if profile.Tagline != "" {
|
||||
productContext += " - " + profile.Tagline
|
||||
}
|
||||
if profile.KeyFeatures != "" && profile.KeyFeatures != "[]" {
|
||||
productContext += ". Key Features: " + profile.KeyFeatures
|
||||
}
|
||||
if profile.CustomContext != "" {
|
||||
productContext += ". Context: " + profile.CustomContext
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch Custom Strategies Context
|
||||
if strategies, err := h.strategySvc.ListStrategies(userID); err == nil && len(strategies) > 0 {
|
||||
productContext += "\n\nAvailable User Custom Strategies:\n"
|
||||
for _, s := range strategies {
|
||||
productContext += "- " + s.StrategyKey + " (" + s.Label + "): " + s.Description + "\n"
|
||||
}
|
||||
}
|
||||
|
||||
replyString, err := h.svc.GenerateReply(ctx, body.TweetContent, productContext, body.Identity, body.Provider, body.Model)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusBadGateway, 5002, "Failed to generate AI reply: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, map[string]string{
|
||||
"reply": reply,
|
||||
// Clean up potential markdown wrappers from LLM output
|
||||
cleanReply := strings.TrimSpace(replyString)
|
||||
cleanReply = strings.TrimPrefix(cleanReply, "```json")
|
||||
cleanReply = strings.TrimPrefix(cleanReply, "```")
|
||||
cleanReply = strings.TrimSuffix(cleanReply, "```")
|
||||
cleanReply = strings.TrimSpace(cleanReply)
|
||||
|
||||
var replies []map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(cleanReply), &replies); err != nil {
|
||||
// Fallback: return as single string object if parsing totally fails
|
||||
replies = []map[string]interface{}{
|
||||
{
|
||||
"strategy": "Fallback",
|
||||
"content": replyString,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
SendSuccess(w, map[string]interface{}{
|
||||
"replies": replies,
|
||||
})
|
||||
}
|
||||
|
||||
67
server/internal/handler/competitor_monitor_handler.go
Normal file
67
server/internal/handler/competitor_monitor_handler.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/service"
|
||||
)
|
||||
|
||||
// CompetitorMonitorHandler serves CRUD endpoints for a user's
// competitor-monitoring entries.
type CompetitorMonitorHandler struct {
	svc *service.CompetitorMonitorService // business/persistence layer
}
|
||||
|
||||
// NewCompetitorMonitorHandler constructs the handler around its service.
func NewCompetitorMonitorHandler(svc *service.CompetitorMonitorService) *CompetitorMonitorHandler {
	return &CompetitorMonitorHandler{svc: svc}
}
|
||||
|
||||
func (h *CompetitorMonitorHandler) ListMonitors(w http.ResponseWriter, r *http.Request) {
|
||||
userID := r.Context().Value("userID").(string)
|
||||
|
||||
monitors, err := h.svc.ListMonitors(userID)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to list monitors")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, monitors)
|
||||
}
|
||||
|
||||
func (h *CompetitorMonitorHandler) CreateMonitor(w http.ResponseWriter, r *http.Request) {
|
||||
userIDStr := r.Context().Value("userID").(string)
|
||||
userID, err := uuid.Parse(userIDStr)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusUnauthorized, 4010, "Invalid user ID")
|
||||
return
|
||||
}
|
||||
|
||||
var monitor model.CompetitorMonitor
|
||||
if err := json.NewDecoder(r.Body).Decode(&monitor); err != nil {
|
||||
SendError(w, http.StatusBadRequest, 4001, "Invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
monitor.UserID = userID
|
||||
|
||||
if err := h.svc.CreateMonitor(&monitor); err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to create monitor")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, monitor)
|
||||
}
|
||||
|
||||
func (h *CompetitorMonitorHandler) DeleteMonitor(w http.ResponseWriter, r *http.Request) {
|
||||
userID := r.Context().Value("userID").(string)
|
||||
monitorID := chi.URLParam(r, "id")
|
||||
|
||||
if err := h.svc.DeleteMonitor(monitorID, userID); err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to delete monitor")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, map[string]string{"status": "deleted"})
|
||||
}
|
||||
67
server/internal/handler/custom_strategy_handler.go
Normal file
67
server/internal/handler/custom_strategy_handler.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/service"
|
||||
)
|
||||
|
||||
// CustomStrategyHandler serves CRUD endpoints for user-defined reply
// strategies.
type CustomStrategyHandler struct {
	svc *service.CustomStrategyService // business/persistence layer
}
|
||||
|
||||
// NewCustomStrategyHandler constructs the handler around its service.
func NewCustomStrategyHandler(svc *service.CustomStrategyService) *CustomStrategyHandler {
	return &CustomStrategyHandler{svc: svc}
}
|
||||
|
||||
func (h *CustomStrategyHandler) ListStrategies(w http.ResponseWriter, r *http.Request) {
|
||||
userID := r.Context().Value("userID").(string)
|
||||
|
||||
strategies, err := h.svc.ListStrategies(userID)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to list strategies")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, strategies)
|
||||
}
|
||||
|
||||
func (h *CustomStrategyHandler) CreateStrategy(w http.ResponseWriter, r *http.Request) {
|
||||
userIDStr := r.Context().Value("userID").(string)
|
||||
userID, err := uuid.Parse(userIDStr)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusUnauthorized, 4010, "Invalid user ID")
|
||||
return
|
||||
}
|
||||
|
||||
var strategy model.UserCustomStrategy
|
||||
if err := json.NewDecoder(r.Body).Decode(&strategy); err != nil {
|
||||
SendError(w, http.StatusBadRequest, 4001, "Invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
strategy.UserID = userID
|
||||
|
||||
if err := h.svc.CreateStrategy(&strategy); err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to create strategy")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, strategy)
|
||||
}
|
||||
|
||||
func (h *CustomStrategyHandler) DeleteStrategy(w http.ResponseWriter, r *http.Request) {
|
||||
userID := r.Context().Value("userID").(string)
|
||||
strategyID := chi.URLParam(r, "id")
|
||||
|
||||
if err := h.svc.DeleteStrategy(strategyID, userID); err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to delete strategy")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, map[string]string{"status": "deleted"})
|
||||
}
|
||||
54
server/internal/handler/product_profile_handler.go
Normal file
54
server/internal/handler/product_profile_handler.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/service"
|
||||
)
|
||||
|
||||
// ProductProfileHandler serves read/write endpoints for a user's single
// product profile (one profile per user).
type ProductProfileHandler struct {
	svc *service.ProductProfileService // business/persistence layer
}
|
||||
|
||||
// NewProductProfileHandler constructs the handler around its service.
func NewProductProfileHandler(svc *service.ProductProfileService) *ProductProfileHandler {
	return &ProductProfileHandler{svc: svc}
}
|
||||
|
||||
func (h *ProductProfileHandler) GetProfile(w http.ResponseWriter, r *http.Request) {
|
||||
userID := r.Context().Value("userID").(string)
|
||||
|
||||
profile, err := h.svc.GetProfile(userID)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusNotFound, 4004, "Product profile not found")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, profile)
|
||||
}
|
||||
|
||||
func (h *ProductProfileHandler) SaveProfile(w http.ResponseWriter, r *http.Request) {
|
||||
userIDStr := r.Context().Value("userID").(string)
|
||||
userID, err := uuid.Parse(userIDStr)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusUnauthorized, 4010, "Invalid user ID")
|
||||
return
|
||||
}
|
||||
|
||||
var profile model.UserProductProfile
|
||||
if err := json.NewDecoder(r.Body).Decode(&profile); err != nil {
|
||||
SendError(w, http.StatusBadRequest, 4001, "Invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
profile.UserID = userID // Ensure user cannot overwrite other's profile
|
||||
|
||||
if err := h.svc.SaveProfile(&profile); err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to save product profile")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, profile)
|
||||
}
|
||||
97
server/internal/handler/reply_handler.go
Normal file
97
server/internal/handler/reply_handler.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/repository"
|
||||
)
|
||||
|
||||
// ReplyHandler records and lists AI-generated replies. Unlike the other
// handlers it talks to the repository directly rather than via a service.
type ReplyHandler struct {
	repo *repository.ReplyRepository
}
|
||||
|
||||
// NewReplyHandler constructs the handler around its repository.
func NewReplyHandler(repo *repository.ReplyRepository) *ReplyHandler {
	return &ReplyHandler{repo: repo}
}
|
||||
|
||||
func (h *ReplyHandler) RecordReply(w http.ResponseWriter, r *http.Request) {
|
||||
var body struct {
|
||||
TweetID string `json:"tweet_id"`
|
||||
StrategyType string `json:"strategy_type"`
|
||||
Content string `json:"content"`
|
||||
Language string `json:"language"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
SendError(w, http.StatusBadRequest, 4001, "Invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
userID, ok := ctx.Value("userID").(string)
|
||||
if !ok || userID == "" {
|
||||
SendError(w, http.StatusUnauthorized, 4002, "Unauthorized")
|
||||
return
|
||||
}
|
||||
|
||||
userUUID, err := uuid.Parse(userID)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusBadRequest, 4003, "Invalid user ID format")
|
||||
return
|
||||
}
|
||||
|
||||
// Resolve the raw string X_Tweet_ID into our internal UUID
|
||||
// Create a dummy tweet entry via Upsert if it doesn't exist yet so foreign keys don't panic
|
||||
tweet := &model.Tweet{
|
||||
XTweetID: body.TweetID,
|
||||
Content: body.Content, // Temporarily store AI content as a placeholder if original is missing
|
||||
IsProcessed: false,
|
||||
}
|
||||
|
||||
err = h.repo.UpsertDummyTweet(tweet)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to resolve tweet reference")
|
||||
return
|
||||
}
|
||||
|
||||
reply := &model.GeneratedReply{
|
||||
UserID: userUUID,
|
||||
TweetID: tweet.ID,
|
||||
StrategyType: body.StrategyType,
|
||||
Content: body.Content,
|
||||
Status: "copied",
|
||||
Language: body.Language,
|
||||
}
|
||||
|
||||
if err := h.repo.CreateGeneratedReply(reply); err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5002, "Failed to log generated reply")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, map[string]string{ "message": "Reply recorded successfully",
|
||||
})
|
||||
}
|
||||
|
||||
// GetReplies handles GET /api/v1/replies
|
||||
func (h *ReplyHandler) GetReplies(w http.ResponseWriter, r *http.Request) {
|
||||
userIDStr := r.Header.Get("X-User-ID")
|
||||
userID, err := uuid.Parse(userIDStr)
|
||||
if err != nil {
|
||||
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
replies, err := h.repo.GetGeneratedRepliesByUser(userID)
|
||||
if err != nil {
|
||||
log.Printf("Failed to get replies: %v", err)
|
||||
http.Error(w, "Failed to get replies", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(replies)
|
||||
}
|
||||
48
server/internal/handler/tweet_handler.go
Normal file
48
server/internal/handler/tweet_handler.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
|
||||
"github.com/zs/InsightReply/internal/repository"
|
||||
)
|
||||
|
||||
// TweetHandler serves read-only endpoints over crawled tweets.
type TweetHandler struct {
	repo *repository.TweetRepository
}
|
||||
|
||||
// NewTweetHandler constructs the handler around its repository.
func NewTweetHandler(repo *repository.TweetRepository) *TweetHandler {
	return &TweetHandler{repo: repo}
}
|
||||
|
||||
// GetHotTweets returns the top heating tweets spanning across all tracking targets
|
||||
func (h *TweetHandler) GetHotTweets(w http.ResponseWriter, r *http.Request) {
|
||||
// Standardize to take the top 50 hottest tweets that haven't been manually marked as processed
|
||||
tweets, err := h.repo.GetTopHeatingTweets(50)
|
||||
if err != nil {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{"error": "failed to retrieve hot tweets"})
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(tweets)
|
||||
}
|
||||
|
||||
// SearchTweets provides the multi-rule filtering API for Epic 5
|
||||
func (h *TweetHandler) GetSearchTweets(w http.ResponseWriter, r *http.Request) {
|
||||
keyword := r.URL.Query().Get("keyword")
|
||||
handle := r.URL.Query().Get("handle")
|
||||
|
||||
tweets, err := h.repo.SearchTweets(keyword, handle, 50)
|
||||
if err != nil {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{"error": "failed to search tweets"})
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(tweets)
|
||||
}
|
||||
@@ -34,3 +34,38 @@ func (h *UserHandler) Register(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
SendSuccess(w, user)
|
||||
}
|
||||
|
||||
func (h *UserHandler) GetProfile(w http.ResponseWriter, r *http.Request) {
|
||||
// Assumes JWTAuth middleware has placed userID in context
|
||||
userID := r.Context().Value("userID").(string)
|
||||
|
||||
user, err := h.svc.GetUserByID(userID)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusNotFound, 4004, "User not found")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, user)
|
||||
}
|
||||
|
||||
func (h *UserHandler) UpdatePreferences(w http.ResponseWriter, r *http.Request) {
|
||||
userID := r.Context().Value("userID").(string)
|
||||
|
||||
var body struct {
|
||||
IdentityLabel string `json:"identity_label"`
|
||||
LanguagePreference string `json:"language_preference"`
|
||||
}
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
|
||||
SendError(w, http.StatusBadRequest, 4001, "Invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
user, err := h.svc.UpdatePreferences(userID, body.IdentityLabel, body.LanguagePreference)
|
||||
if err != nil {
|
||||
SendError(w, http.StatusInternalServerError, 5001, "Failed to update preferences")
|
||||
return
|
||||
}
|
||||
|
||||
SendSuccess(w, user)
|
||||
}
|
||||
|
||||
65
server/internal/middleware/jwt.go
Normal file
65
server/internal/middleware/jwt.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/golang-jwt/jwt/v5"
|
||||
)
|
||||
|
||||
// contextKey is a private type for context values set by this package,
// preventing collisions with string keys set elsewhere.
type contextKey string

// UserIDKey is the context key under which JWTAuth stores the
// authenticated user's ID.
const UserIDKey contextKey = "userID"
|
||||
|
||||
func JWTAuth(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
authHeader := r.Header.Get("Authorization")
|
||||
if authHeader == "" {
|
||||
http.Error(w, `{"code":401, "message":"Unauthorized: missing token"}`, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
parts := strings.Split(authHeader, " ")
|
||||
if len(parts) != 2 || parts[0] != "Bearer" {
|
||||
http.Error(w, `{"code":401, "message":"Unauthorized: invalid token format"}`, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
tokenString := parts[1]
|
||||
secret := os.Getenv("JWT_SECRET")
|
||||
|
||||
token, err := jwt.Parse(tokenString, func(t *jwt.Token) (interface{}, error) {
|
||||
if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
|
||||
return nil, jwt.ErrSignatureInvalid
|
||||
}
|
||||
return []byte(secret), nil
|
||||
})
|
||||
|
||||
if err != nil || !token.Valid {
|
||||
http.Error(w, `{"code":401, "message":"Unauthorized: invalid or expired token"}`, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
claims, ok := token.Claims.(jwt.MapClaims)
|
||||
if !ok {
|
||||
http.Error(w, `{"code":401, "message":"Unauthorized: invalid token claims"}`, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
userID, ok := claims["sub"].(string)
|
||||
if !ok || userID == "" {
|
||||
// fallback check inside user_id or id if sub doesn't exist
|
||||
userID, _ = claims["user_id"].(string)
|
||||
}
|
||||
|
||||
if userID == "" {
|
||||
http.Error(w, `{"code":401, "message":"Unauthorized: user ID not found in token"}`, http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
ctx := context.WithValue(r.Context(), UserIDKey, userID)
|
||||
next.ServeHTTP(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
82
server/internal/middleware/ratelimit.go
Normal file
82
server/internal/middleware/ratelimit.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// limiters caches one token-bucket limiter per key (user ID or remote
// address for anonymous callers); limiterMux guards concurrent map access.
var (
	limiters   = make(map[string]*rate.Limiter)
	limiterMux sync.RWMutex
)
|
||||
|
||||
// getLimiter retrieves or creates a rate limiter for a specific user.
|
||||
// Uses a simple token bucket. For strict "10 per day" with distributed persistence,
|
||||
// this should be refactored to use Redis or DB API usage counters.
|
||||
func getLimiter(userID string, tier string) *rate.Limiter {
|
||||
limiterMux.Lock()
|
||||
defer limiterMux.Unlock()
|
||||
|
||||
if limiter, exists := limiters[userID]; exists {
|
||||
return limiter
|
||||
}
|
||||
|
||||
var limiter *rate.Limiter
|
||||
if tier == "Pro" || tier == "Premium" {
|
||||
// Unlimited (e.g., 20 requests per second burst)
|
||||
limiter = rate.NewLimiter(rate.Limit(20), 100)
|
||||
} else {
|
||||
// Free: 10 per day -> replenishes 1 token every 2.4 hours, bucket size 10
|
||||
limiter = rate.NewLimiter(rate.Every(24*time.Hour/10), 10)
|
||||
}
|
||||
|
||||
limiters[userID] = limiter
|
||||
return limiter
|
||||
}
|
||||
|
||||
// RateLimit middleware enforces rate limits based on user tier.
|
||||
// It expects JWTAuth to have already populated UserIDKey in the context.
|
||||
func RateLimit(db *gorm.DB) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
userIDVal := r.Context().Value(UserIDKey)
|
||||
if userIDVal == nil {
|
||||
// Allow if not authenticated strictly, or rate limit by IP
|
||||
// For now, fallback to generic tight limit for anonymous usage
|
||||
ipLimiter := getLimiter(r.RemoteAddr, "Free")
|
||||
if !ipLimiter.Allow() {
|
||||
http.Error(w, `{"code":429, "message":"Too Many Requests: Rate limit exceeded"}`, http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
userID := userIDVal.(string)
|
||||
|
||||
// Fast DB query to get user tier (ideally cached in Redis in prod)
|
||||
var tier string
|
||||
// Look up active subscription for this user
|
||||
err := db.Table("subscriptions").
|
||||
Select("tier").
|
||||
Where("user_id = ? AND status = 'active'", userID).
|
||||
Scan(&tier).Error
|
||||
|
||||
if err != nil || tier == "" {
|
||||
tier = "Free" // defaults to Free if no active sub
|
||||
}
|
||||
|
||||
limiter := getLimiter(userID, tier)
|
||||
if !limiter.Allow() {
|
||||
http.Error(w, `{"code":429, "message":"Too Many Requests: Daily quota or rate limit exceeded"}`, http.StatusTooManyRequests)
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
16
server/internal/model/competitor_monitor.go
Normal file
16
server/internal/model/competitor_monitor.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// CompetitorMonitor is a per-user record of a competitor brand to track.
// The (UserID, BrandName) pair is unique, so a user cannot add the same
// brand twice.
type CompetitorMonitor struct {
	ID        uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	UserID    uuid.UUID `gorm:"type:uuid;not null;uniqueIndex:idx_user_competitor" json:"user_id"`
	BrandName string    `gorm:"type:varchar(255);not null;uniqueIndex:idx_user_competitor" json:"brand_name"`
	XHandle   string    `gorm:"type:varchar(255)" json:"x_handle"` // optional X/Twitter handle
	IsActive  bool      `gorm:"default:true" json:"is_active"`     // soft enable/disable flag
	CreatedAt time.Time `json:"created_at"`
}
|
||||
18
server/internal/model/generated_reply.go
Normal file
18
server/internal/model/generated_reply.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// GeneratedReply is a persisted AI-generated reply, linked to the user who
// generated it and the tweet it responds to.
type GeneratedReply struct {
	ID           uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	UserID       uuid.UUID `gorm:"type:uuid;not null;index:idx_generated_replies_user_id" json:"user_id"`
	TweetID      uuid.UUID `gorm:"type:uuid;not null;index:idx_generated_replies_tweet_id" json:"tweet_id"` // internal tweet UUID, not the raw X ID
	StrategyType string    `gorm:"type:varchar(100);not null" json:"strategy_type"`
	Content      string    `gorm:"type:text;not null" json:"content"`
	Status       string    `gorm:"type:varchar(50);default:'draft'" json:"status"` // draft, copied, posted
	Language     string    `gorm:"type:varchar(10);default:'en'" json:"language"`
	CreatedAt    time.Time `json:"created_at"`
}
|
||||
17
server/internal/model/reply_performance.go
Normal file
17
server/internal/model/reply_performance.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// ReplyPerformance is a point-in-time engagement snapshot for a generated
// reply, recorded at CheckTime.
type ReplyPerformance struct {
	ID                 uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	ReplyID            uuid.UUID `gorm:"type:uuid;not null;index:idx_reply_performance_reply_id" json:"reply_id"`
	UserID             uuid.UUID `gorm:"type:uuid;not null;index:idx_reply_performance_user_id" json:"user_id"`
	LikeCountIncrease  int       `gorm:"default:0" json:"like_count_increase"`
	ReplyCountIncrease int       `gorm:"default:0" json:"reply_count_increase"`
	InteractionRate    float64   `gorm:"default:0.0" json:"interaction_rate"`
	CheckTime          time.Time `json:"check_time"`
}
|
||||
24
server/internal/model/tweet.go
Normal file
24
server/internal/model/tweet.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Tweet is a crawled tweet. XTweetID is the external X/Twitter ID; ID is
// our internal UUID used by foreign keys.
type Tweet struct {
	ID            uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	XTweetID      string    `gorm:"type:varchar(255);uniqueIndex:idx_tweets_x_tweet_id;not null" json:"x_tweet_id"`
	AuthorID      string    `gorm:"type:varchar(255)" json:"author_id"`
	AuthorHandle  string    `gorm:"type:varchar(255)" json:"author_handle"`
	Content       string    `gorm:"type:text;not null" json:"content"`
	PostedAt      time.Time `json:"posted_at"`
	LikeCount     int       `gorm:"default:0" json:"like_count"`
	RetweetCount  int       `gorm:"default:0" json:"retweet_count"`
	ReplyCount    int       `gorm:"default:0" json:"reply_count"`
	HeatScore     float64   `gorm:"default:0.0;index:idx_tweets_heat_score" json:"heat_score"` // ranking score for "hot" queries
	CrawlQueue    string    `gorm:"type:varchar(20);default:'normal';index:idx_tweets_crawl_queue" json:"crawl_queue"`
	IsProcessed   bool      `gorm:"default:false" json:"is_processed"` // manually marked as handled
	LastCrawledAt time.Time `gorm:"index:idx_tweets_crawl_queue" json:"last_crawled_at"`
	CreatedAt     time.Time `json:"created_at"`
}
|
||||
@@ -7,11 +7,12 @@ import (
|
||||
)
|
||||
|
||||
type User struct {
|
||||
ID uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
|
||||
Email string `gorm:"unique;not null" json:"email"`
|
||||
PasswordHash string `json:"-"`
|
||||
SubscriptionTier string `gorm:"default:'Free'" json:"subscription_tier"`
|
||||
IdentityLabel string `json:"identity_label"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
ID uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
|
||||
Email string `gorm:"unique;not null" json:"email"`
|
||||
PasswordHash string `json:"-"`
|
||||
SubscriptionTier string `gorm:"default:'Free'" json:"subscription_tier"`
|
||||
IdentityLabel string `json:"identity_label"`
|
||||
LanguagePreference string `gorm:"default:'auto'" json:"language_preference"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
21
server/internal/model/user_custom_strategy.go
Normal file
21
server/internal/model/user_custom_strategy.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// UserCustomStrategy is a user-defined reply strategy. The
// (UserID, StrategyKey) pair is unique per user.
type UserCustomStrategy struct {
	ID              uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	UserID          uuid.UUID `gorm:"type:uuid;not null;uniqueIndex:idx_user_strategy_key" json:"user_id"`
	StrategyKey     string    `gorm:"type:varchar(100);not null;uniqueIndex:idx_user_strategy_key" json:"strategy_key"`
	Label           string    `gorm:"type:varchar(255);not null" json:"label"`
	Icon            string    `gorm:"type:varchar(10)" json:"icon"`
	Description     string    `gorm:"type:text" json:"description"`
	PromptTemplate  string    `gorm:"type:text" json:"prompt_template"`
	FewShotExamples string    `gorm:"type:jsonb;default:'[]'" json:"few_shot_examples"` // Stored as JSON string
	IsActive        bool      `gorm:"default:true" json:"is_active"`
	SortOrder       int       `gorm:"default:0" json:"sort_order"` // display ordering, ascending
	CreatedAt       time.Time `json:"created_at"`
}
|
||||
26
server/internal/model/user_product_profile.go
Normal file
26
server/internal/model/user_product_profile.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// UserProductProfile holds a user's product description used to enrich AI
// reply prompts. One profile per user (UserID is unique).
type UserProductProfile struct {
	ID                 uuid.UUID `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	UserID             uuid.UUID `gorm:"type:uuid;unique;not null" json:"user_id"`
	ProductName        string    `gorm:"type:varchar(255)" json:"product_name"`
	Tagline            string    `gorm:"type:text" json:"tagline"`
	Domain             string    `gorm:"type:varchar(255)" json:"domain"`
	KeyFeatures        string    `gorm:"type:jsonb;default:'[]'" json:"key_features"` // Stored as JSON string
	TargetUsers        string    `gorm:"type:text" json:"target_users"`
	ProductUrl         string    `gorm:"type:varchar(500)" json:"product_url"`
	Competitors        string    `gorm:"type:jsonb;default:'[]'" json:"competitors"`        // Stored as JSON string
	RelevanceKeywords  string    `gorm:"type:jsonb;default:'[]'" json:"relevance_keywords"` // Stored as JSON string
	CustomContext      string    `gorm:"type:text" json:"custom_context"`
	DefaultLLMProvider string    `gorm:"type:varchar(50)" json:"default_llm_provider"` // User preferred LLM provider
	DefaultLLMModel    string    `gorm:"type:varchar(100)" json:"default_llm_model"`   // User preferred LLM model
	IsActive           bool      `gorm:"default:true" json:"is_active"` // inactive profiles are ignored when building prompts
	CreatedAt          time.Time `json:"created_at"`
	UpdatedAt          time.Time `json:"updated_at"`
}
|
||||
34
server/internal/repository/competitor_monitor_repository.go
Normal file
34
server/internal/repository/competitor_monitor_repository.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// CompetitorMonitorRepository provides DB access for competitor monitors.
type CompetitorMonitorRepository struct {
	db *gorm.DB
}
|
||||
|
||||
// NewCompetitorMonitorRepository constructs the repository around a DB handle.
func NewCompetitorMonitorRepository(db *gorm.DB) *CompetitorMonitorRepository {
	return &CompetitorMonitorRepository{db: db}
}
|
||||
|
||||
// ListByUserID returns a user's active monitors, newest first.
func (r *CompetitorMonitorRepository) ListByUserID(userID string) ([]model.CompetitorMonitor, error) {
	var monitors []model.CompetitorMonitor
	err := r.db.Where("user_id = ? AND is_active = ?", userID, true).Order("created_at desc").Find(&monitors).Error
	return monitors, err
}
|
||||
|
||||
// ListAllActive returns every active monitor across all users (used by the
// crawler side, not per-user endpoints).
func (r *CompetitorMonitorRepository) ListAllActive() ([]model.CompetitorMonitor, error) {
	var monitors []model.CompetitorMonitor
	err := r.db.Where("is_active = ?", true).Find(&monitors).Error
	return monitors, err
}
|
||||
|
||||
// Create inserts a new monitor row; the unique (user_id, brand_name) index
// surfaces duplicates as a DB error.
func (r *CompetitorMonitorRepository) Create(monitor *model.CompetitorMonitor) error {
	return r.db.Create(monitor).Error
}
|
||||
|
||||
// Delete removes a monitor scoped to its owner. Deleting a non-existent or
// foreign row matches zero rows and returns nil (no error) — callers cannot
// distinguish "deleted" from "not found".
func (r *CompetitorMonitorRepository) Delete(id string, userID string) error {
	return r.db.Where("id = ? AND user_id = ?", id, userID).Delete(&model.CompetitorMonitor{}).Error
}
|
||||
38
server/internal/repository/custom_strategy_repository.go
Normal file
38
server/internal/repository/custom_strategy_repository.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// CustomStrategyRepository provides DB access for user custom strategies.
type CustomStrategyRepository struct {
	db *gorm.DB
}
|
||||
|
||||
// NewCustomStrategyRepository constructs the repository around a DB handle.
func NewCustomStrategyRepository(db *gorm.DB) *CustomStrategyRepository {
	return &CustomStrategyRepository{db: db}
}
|
||||
|
||||
func (r *CustomStrategyRepository) ListByUserID(userID string) ([]model.UserCustomStrategy, error) {
|
||||
var strategies []model.UserCustomStrategy
|
||||
err := r.db.Where("user_id = ? AND is_active = ?", userID, true).Order("sort_order asc, created_at desc").Find(&strategies).Error
|
||||
return strategies, err
|
||||
}
|
||||
|
||||
func (r *CustomStrategyRepository) Create(strategy *model.UserCustomStrategy) error {
|
||||
return r.db.Create(strategy).Error
|
||||
}
|
||||
|
||||
func (r *CustomStrategyRepository) Update(strategy *model.UserCustomStrategy) error {
|
||||
return r.db.Save(strategy).Error
|
||||
}
|
||||
|
||||
func (r *CustomStrategyRepository) Delete(id string, userID string) error {
|
||||
return r.db.Where("id = ? AND user_id = ?", id, userID).Delete(&model.UserCustomStrategy{}).Error
|
||||
}
|
||||
|
||||
func (r *CustomStrategyRepository) GetByIDAndUser(id string, userID string) (*model.UserCustomStrategy, error) {
|
||||
var strategy model.UserCustomStrategy
|
||||
err := r.db.Where("id = ? AND user_id = ?", id, userID).First(&strategy).Error
|
||||
return &strategy, err
|
||||
}
|
||||
25
server/internal/repository/product_profile_repository.go
Normal file
25
server/internal/repository/product_profile_repository.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type ProductProfileRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
func NewProductProfileRepository(db *gorm.DB) *ProductProfileRepository {
|
||||
return &ProductProfileRepository{db: db}
|
||||
}
|
||||
|
||||
func (r *ProductProfileRepository) GetByUserID(userID string) (*model.UserProductProfile, error) {
|
||||
var profile model.UserProductProfile
|
||||
err := r.db.Where("user_id = ?", userID).First(&profile).Error
|
||||
return &profile, err
|
||||
}
|
||||
|
||||
func (r *ProductProfileRepository) Save(profile *model.UserProductProfile) error {
|
||||
// Use Save to either create or update based on primary key
|
||||
return r.db.Save(profile).Error
|
||||
}
|
||||
74
server/internal/repository/reply_repository.go
Normal file
74
server/internal/repository/reply_repository.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type ReplyRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
func NewReplyRepository(db *gorm.DB) *ReplyRepository {
|
||||
return &ReplyRepository{db: db}
|
||||
}
|
||||
|
||||
// CreateGeneratedReply logs an AI generated response when it is copied/used by the user
|
||||
func (r *ReplyRepository) CreateGeneratedReply(reply *model.GeneratedReply) error {
|
||||
return r.db.Create(reply).Error
|
||||
}
|
||||
|
||||
// GetPendingPerformanceChecks returns copied replies that need their performance checked (e.g. older than 24h)
|
||||
func (r *ReplyRepository) GetPendingPerformanceChecks() ([]model.GeneratedReply, error) {
|
||||
var replies []model.GeneratedReply
|
||||
|
||||
// Complex: Fetch replies that are "copied", created more than 24 hours ago,
|
||||
// and DO NOT already have a corresponding entry in reply_performance.
|
||||
err := r.db.Table("generated_replies").
|
||||
Select("generated_replies.*").
|
||||
Joins("LEFT JOIN reply_performance rp ON rp.reply_id = generated_replies.id").
|
||||
Where("generated_replies.status = ?", "copied").
|
||||
Where("generated_replies.created_at < NOW() - INTERVAL '1 day'").
|
||||
Where("rp.id IS NULL").
|
||||
Find(&replies).Error
|
||||
|
||||
return replies, err
|
||||
}
|
||||
|
||||
// SaveReplyPerformance persists the checked engagement scores of a generated reply
|
||||
func (r *ReplyRepository) SaveReplyPerformance(perf *model.ReplyPerformance) error {
|
||||
return r.db.Create(perf).Error
|
||||
}
|
||||
|
||||
// UpsertDummyTweet acts as a safety hook to guarantee foreign key integrity exists before recording a reply onto an un-crawled Tweet.
|
||||
func (r *ReplyRepository) UpsertDummyTweet(tweet *model.Tweet) error {
|
||||
return r.db.Where("x_tweet_id = ?", tweet.XTweetID).FirstOrCreate(tweet).Error
|
||||
}
|
||||
|
||||
// GetTweetXTweetID returns the string identifier string X uses, converting backwards from the postgres UUID
|
||||
func (r *ReplyRepository) GetTweetXTweetID(tweetID uuid.UUID) (string, error) {
|
||||
var tweet model.Tweet
|
||||
err := r.db.Where("id = ?", tweetID).First(&tweet).Error
|
||||
return tweet.XTweetID, err
|
||||
}
|
||||
|
||||
// SaveStyleExtraction commits an AI-learned writing style profile against the user for future inference injection
|
||||
func (r *ReplyRepository) SaveStyleExtraction(userID uuid.UUID, styleDesc string) error {
|
||||
// user_style_profiles might not exist yet; use raw SQL or Gorm Upsert
|
||||
return r.db.Exec(`
|
||||
INSERT INTO user_style_profiles (user_id, tone_preference)
|
||||
VALUES (?, ?)
|
||||
ON CONFLICT (user_id)
|
||||
DO UPDATE SET tone_preference = EXCLUDED.tone_preference, updated_at = NOW()
|
||||
`, userID, styleDesc).Error
|
||||
}
|
||||
|
||||
// GetGeneratedRepliesByUser retrieves all AI replies for a user to display in the History dashboard
|
||||
func (r *ReplyRepository) GetGeneratedRepliesByUser(userID uuid.UUID) ([]model.GeneratedReply, error) {
|
||||
var replies []model.GeneratedReply
|
||||
// Preload the performance data if it exists. Preloading "Performance" requires GORM association.
|
||||
// We'll just fetch replies and order by newest first.
|
||||
err := r.db.Where("user_id = ?", userID).Order("created_at desc").Limit(100).Find(&replies).Error
|
||||
return replies, err
|
||||
}
|
||||
82
server/internal/repository/tweet_repository.go
Normal file
82
server/internal/repository/tweet_repository.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
type TweetRepository struct {
|
||||
db *gorm.DB
|
||||
}
|
||||
|
||||
func NewTweetRepository(db *gorm.DB) *TweetRepository {
|
||||
return &TweetRepository{db: db}
|
||||
}
|
||||
|
||||
// Upsert intelligently inserts a new tweet or updates an existing one.
|
||||
// Crucially, on conflict, it dynamically calculates the 'heat_score' by
|
||||
// comparing the new metrics against the old metrics currently in the database.
|
||||
func (r *TweetRepository) Upsert(tweet *model.Tweet) error {
|
||||
// For new tweets being inserted, their base heat score evaluates to their current absolute metrics.
|
||||
// For existing tweets, we calculate the delta and add it to their existing heat score.
|
||||
tweet.HeatScore = float64(tweet.LikeCount*1 + tweet.RetweetCount*2 + tweet.ReplyCount*3)
|
||||
|
||||
err := r.db.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "x_tweet_id"}},
|
||||
DoUpdates: clause.Assignments(map[string]interface{}{
|
||||
"author_id": clause.Column{Table: "excluded", Name: "author_id"},
|
||||
"author_handle": clause.Column{Table: "excluded", Name: "author_handle"},
|
||||
"content": clause.Column{Table: "excluded", Name: "content"},
|
||||
"posted_at": clause.Column{Table: "excluded", Name: "posted_at"},
|
||||
"last_crawled_at": clause.Column{Table: "excluded", Name: "last_crawled_at"},
|
||||
"like_count": clause.Column{Table: "excluded", Name: "like_count"},
|
||||
"retweet_count": clause.Column{Table: "excluded", Name: "retweet_count"},
|
||||
"reply_count": clause.Column{Table: "excluded", Name: "reply_count"},
|
||||
// Calculate delta only if the old values exist and are lower than the new values (to prevent negative spikes from X UI glitches).
|
||||
// heatTracker = old.heat_score + MAX(0, new.like - old.like)*1 + MAX(0, new.rt - old.rt)*2 + MAX(0, new.reply - old.reply)*3
|
||||
"heat_score": gorm.Expr("tweets.heat_score + GREATEST(0, EXCLUDED.like_count - tweets.like_count) * 1.0 + GREATEST(0, EXCLUDED.retweet_count - tweets.retweet_count) * 2.0 + GREATEST(0, EXCLUDED.reply_count - tweets.reply_count) * 3.0"),
|
||||
|
||||
// Smart Crawling logic: If heat score breaches threshold (e.g. 50), promote to high. If old & cold, demote.
|
||||
"crawl_queue": gorm.Expr(`
|
||||
CASE
|
||||
WHEN tweets.heat_score + GREATEST(0, EXCLUDED.like_count - tweets.like_count) * 1.0 + GREATEST(0, EXCLUDED.retweet_count - tweets.retweet_count) * 2.0 + GREATEST(0, EXCLUDED.reply_count - tweets.reply_count) * 3.0 > 50 THEN 'high'
|
||||
WHEN EXCLUDED.last_crawled_at - tweets.posted_at > INTERVAL '7 days' THEN 'low'
|
||||
ELSE 'normal'
|
||||
END
|
||||
`),
|
||||
}),
|
||||
}).Create(tweet).Error
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetTopHeatingTweets returns unprocessed tweets ordered by their generated heat score
|
||||
func (r *TweetRepository) GetTopHeatingTweets(limit int) ([]model.Tweet, error) {
|
||||
var tweets []model.Tweet
|
||||
err := r.db.Where("is_processed = ?", false).Order("heat_score desc").Limit(limit).Find(&tweets).Error
|
||||
return tweets, err
|
||||
}
|
||||
|
||||
// MarkAsProcessed tags a tweet so we don't present it to the user repeatedly
|
||||
func (r *TweetRepository) MarkAsProcessed(id string) error {
|
||||
return r.db.Model(&model.Tweet{}).Where("id = ?", id).Update("is_processed", true).Error
|
||||
}
|
||||
|
||||
// SearchTweets allows dynamic multi-rule filtering
|
||||
func (r *TweetRepository) SearchTweets(keyword, handle string, limit int) ([]model.Tweet, error) {
|
||||
var tweets []model.Tweet
|
||||
query := r.db.Model(&model.Tweet{})
|
||||
|
||||
if keyword != "" {
|
||||
// PostgreSQL ILIKE for case-insensitive keyword searching
|
||||
query = query.Where("content ILIKE ?", "%"+keyword+"%")
|
||||
}
|
||||
|
||||
if handle != "" {
|
||||
query = query.Where("author_handle = ?", handle)
|
||||
}
|
||||
|
||||
err := query.Order("heat_score desc, posted_at desc").Limit(limit).Find(&tweets).Error
|
||||
return tweets, err
|
||||
}
|
||||
@@ -22,3 +22,13 @@ func (r *UserRepository) GetByEmail(email string) (*model.User, error) {
|
||||
err := r.db.Where("email = ?", email).First(&user).Error
|
||||
return &user, err
|
||||
}
|
||||
|
||||
func (r *UserRepository) GetByID(id string) (*model.User, error) {
|
||||
var user model.User
|
||||
err := r.db.Where("id = ?", id).First(&user).Error
|
||||
return &user, err
|
||||
}
|
||||
|
||||
func (r *UserRepository) Update(user *model.User) error {
|
||||
return r.db.Save(user).Error
|
||||
}
|
||||
|
||||
133
server/internal/scraper/client.go
Normal file
133
server/internal/scraper/client.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package scraper
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/sony/gobreaker/v2"
|
||||
"golang.org/x/exp/rand"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrCircuitOpen = errors.New("scraper circuit breaker is open")
|
||||
ErrRateLimited = errors.New("scraper hit rate limit (429)")
|
||||
ErrUnavailable = errors.New("scraper target unavailable (503)")
|
||||
)
|
||||
|
||||
type ScraperClient struct {
|
||||
http *http.Client
|
||||
breaker *gobreaker.CircuitBreaker[[]byte]
|
||||
mu sync.Mutex
|
||||
rng *rand.Rand
|
||||
}
|
||||
|
||||
var userAgents = []string{
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
|
||||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/112.0",
|
||||
}
|
||||
|
||||
func NewScraperClient() *ScraperClient {
|
||||
// Custom transport to mask TLS fingerprints somewhat and set timeouts
|
||||
tr := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
|
||||
ForceAttemptHTTP2: true,
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: tr,
|
||||
Timeout: 15 * time.Second,
|
||||
}
|
||||
|
||||
// Circuit Breaker: Trip on 5 consecutive failures, wait 60 seconds (Exponential behavior is often custom, but standard half-open helps)
|
||||
st := gobreaker.Settings{
|
||||
Name: "NitterScraperCB",
|
||||
MaxRequests: 1,
|
||||
Interval: 0,
|
||||
Timeout: 60 * time.Second, // Wait 60s before allowing retry if Open
|
||||
ReadyToTrip: func(counts gobreaker.Counts) bool {
|
||||
return counts.ConsecutiveFailures >= 3
|
||||
},
|
||||
}
|
||||
|
||||
return &ScraperClient{
|
||||
http: client,
|
||||
breaker: gobreaker.NewCircuitBreaker[[]byte](st),
|
||||
rng: rand.New(rand.NewSource(uint64(time.Now().UnixNano()))),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ScraperClient) getRandomUserAgent() string {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return userAgents[c.rng.Intn(len(userAgents))]
|
||||
}
|
||||
|
||||
func (c *ScraperClient) JitterDelay(minMs, maxMs int) {
|
||||
c.mu.Lock()
|
||||
delay := minMs + c.rng.Intn(maxMs-minMs)
|
||||
c.mu.Unlock()
|
||||
time.Sleep(time.Duration(delay) * time.Millisecond)
|
||||
}
|
||||
|
||||
// Fetch returns the raw body byte stream while handling Circuit Breaking and Status checking.
|
||||
func (c *ScraperClient) Fetch(url string) ([]byte, error) {
|
||||
respBody, err := c.breaker.Execute(func() ([]byte, error) {
|
||||
req, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.Header.Set("User-Agent", c.getRandomUserAgent())
|
||||
req.Header.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8")
|
||||
req.Header.Set("Accept-Language", "en-US,en;q=0.5")
|
||||
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusTooManyRequests {
|
||||
return nil, ErrRateLimited
|
||||
}
|
||||
if resp.StatusCode == http.StatusServiceUnavailable {
|
||||
return nil, ErrUnavailable
|
||||
}
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
// Read to memory in Execute block so if it fails, circuit tracks it. ReadAll is fine for HTML scrapes.
|
||||
var data []byte
|
||||
buf := make([]byte, 1024)
|
||||
for {
|
||||
n, err := resp.Body.Read(buf)
|
||||
if n > 0 {
|
||||
data = append(data, buf[:n]...)
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return data, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
if err == gobreaker.ErrOpenState {
|
||||
return nil, ErrCircuitOpen
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return respBody, nil
|
||||
}
|
||||
146
server/internal/scraper/parser.go
Normal file
146
server/internal/scraper/parser.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package scraper
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/PuerkitoBio/goquery"
|
||||
)
|
||||
|
||||
type ParsedTweet struct {
|
||||
ID string
|
||||
Author string
|
||||
Handle string
|
||||
Content string
|
||||
Likes int
|
||||
Retweets int
|
||||
Replies int
|
||||
CreatedAt time.Time
|
||||
}
|
||||
|
||||
// ParseTimeline extracts all tweets from a Nitter timeline HTML page.
|
||||
func ParseTimeline(htmlData []byte) ([]ParsedTweet, error) {
|
||||
doc, err := goquery.NewDocumentFromReader(bytes.NewReader(htmlData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load HTML document: %w", err)
|
||||
}
|
||||
|
||||
var tweets []ParsedTweet
|
||||
|
||||
doc.Find(".timeline-item").Each(func(i int, s *goquery.Selection) {
|
||||
// Only parse actual tweets (not "Show thread" links or "Load more")
|
||||
if s.HasClass("show-more") || s.HasClass("more-replies") {
|
||||
return
|
||||
}
|
||||
|
||||
tweet := ParsedTweet{}
|
||||
|
||||
// Author and Handle
|
||||
authorBlock := s.Find(".fullname")
|
||||
if authorBlock.Length() > 0 {
|
||||
tweet.Author = strings.TrimSpace(authorBlock.Text())
|
||||
}
|
||||
|
||||
handleBlock := s.Find(".username")
|
||||
if handleBlock.Length() > 0 {
|
||||
tweet.Handle = strings.TrimSpace(handleBlock.Text())
|
||||
}
|
||||
|
||||
// Content
|
||||
contentBlock := s.Find(".tweet-content")
|
||||
if contentBlock.Length() > 0 {
|
||||
tweet.Content = strings.TrimSpace(contentBlock.Text())
|
||||
}
|
||||
|
||||
// Link (to get ID)
|
||||
linkBlock := s.Find("a.tweet-link")
|
||||
if linkBlock.Length() > 0 {
|
||||
href, _ := linkBlock.Attr("href")
|
||||
parts := strings.Split(href, "/")
|
||||
if len(parts) > 0 {
|
||||
tweet.ID = parts[len(parts)-1]
|
||||
// Nitter sometimes adds #m at the end of links
|
||||
tweet.ID = strings.TrimSuffix(tweet.ID, "#m")
|
||||
}
|
||||
}
|
||||
|
||||
// Date
|
||||
dateBlock := s.Find(".tweet-date a[title]")
|
||||
if dateBlock.Length() > 0 {
|
||||
titleAttr, _ := dateBlock.Attr("title")
|
||||
// Nitter format: "Feb 28, 2026 · 1:23 PM UTC"
|
||||
// A rough parsing could be done here, or we just rely on standard formats.
|
||||
// For simplicity, we just leave it default Time if we can't parse it quickly.
|
||||
if titleAttr != "" {
|
||||
parsedTime, err := time.Parse("Jan 2, 2006 · 3:04 PM MST", titleAttr)
|
||||
if err == nil {
|
||||
tweet.CreatedAt = parsedTime
|
||||
} else {
|
||||
tweet.CreatedAt = time.Now() // Fallback
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stats
|
||||
statBlock := s.Find(".tweet-stat")
|
||||
statBlock.Each(func(j int, statSel *goquery.Selection) {
|
||||
iconContainer := statSel.Find("span.icon-container > span")
|
||||
class, exists := iconContainer.Attr("class")
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
|
||||
// Find the text value beside the icon
|
||||
valStr := strings.TrimSpace(statSel.Text())
|
||||
val := parseStatString(valStr)
|
||||
|
||||
if strings.Contains(class, "icon-comment") {
|
||||
tweet.Replies = val
|
||||
} else if strings.Contains(class, "icon-retweet") {
|
||||
tweet.Retweets = val
|
||||
} else if strings.Contains(class, "icon-heart") {
|
||||
tweet.Likes = val
|
||||
}
|
||||
})
|
||||
|
||||
// Only append if it's a valid parsed tweet
|
||||
if tweet.ID != "" && tweet.Content != "" {
|
||||
tweets = append(tweets, tweet)
|
||||
}
|
||||
})
|
||||
|
||||
return tweets, nil
|
||||
}
|
||||
|
||||
// parseStatString converts compact count strings like "15.4K" or "1,234"
// into plain integers (e.g. 15400, 1234). Unparseable input yields 0.
func parseStatString(s string) int {
	if s == "" {
		return 0
	}

	// Strip thousands separators and stray spaces before numeric parsing.
	cleaned := strings.NewReplacer(",", "", " ", "").Replace(s)

	scale := 1.0
	if n := len(cleaned); n > 0 {
		// Magnitude suffixes are accepted in either case.
		switch cleaned[n-1] {
		case 'k', 'K':
			scale, cleaned = 1000.0, cleaned[:n-1]
		case 'm', 'M':
			scale, cleaned = 1000000.0, cleaned[:n-1]
		case 'b', 'B':
			scale, cleaned = 1000000000.0, cleaned[:n-1]
		}
	}

	num, err := strconv.ParseFloat(cleaned, 64)
	if err != nil {
		return 0
	}

	return int(num * scale)
}
|
||||
@@ -3,57 +3,210 @@ package service
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/sashabaranov/go-openai"
|
||||
"github.com/sony/gobreaker/v2"
|
||||
"github.com/zs/InsightReply/internal/service/llm"
|
||||
)
|
||||
|
||||
type AIService struct {
|
||||
client *openai.Client
|
||||
providers map[string]llm.Provider
|
||||
breakers map[string]*gobreaker.CircuitBreaker[string]
|
||||
|
||||
defaultProvider string
|
||||
defaultModel string
|
||||
}
|
||||
|
||||
func NewAIService(apiKey string) *AIService {
|
||||
return &AIService{
|
||||
client: openai.NewClient(apiKey),
|
||||
func NewAIService() *AIService {
|
||||
s := &AIService{
|
||||
providers: make(map[string]llm.Provider),
|
||||
breakers: make(map[string]*gobreaker.CircuitBreaker[string]),
|
||||
}
|
||||
|
||||
// 1. Initialize Providers based on ENV
|
||||
if key := os.Getenv("OPENAI_API_KEY"); key != "" {
|
||||
s.providers["openai"] = llm.NewOpenAIProvider(key, os.Getenv("OPENAI_BASE_URL"), "openai")
|
||||
}
|
||||
if key := os.Getenv("ANTHROPIC_API_KEY"); key != "" {
|
||||
s.providers["anthropic"] = llm.NewAnthropicProvider(key, os.Getenv("ANTHROPIC_BASE_URL"))
|
||||
}
|
||||
if key := os.Getenv("DEEPSEEK_API_KEY"); key != "" {
|
||||
baseURL := os.Getenv("DEEPSEEK_BASE_URL")
|
||||
if baseURL == "" {
|
||||
baseURL = "https://api.deepseek.com/v1" // Add v1 as expected by OpenAI SDK compatibility
|
||||
}
|
||||
s.providers["deepseek"] = llm.NewOpenAIProvider(key, baseURL, "deepseek")
|
||||
}
|
||||
if key := os.Getenv("GEMINI_API_KEY"); key != "" {
|
||||
s.providers["gemini"] = llm.NewGeminiProvider(key, os.Getenv("GEMINI_BASE_URL"))
|
||||
}
|
||||
|
||||
// 2. Initialize Circuit Breakers for each provider
|
||||
for name := range s.providers {
|
||||
st := gobreaker.Settings{
|
||||
Name: name + "_cb",
|
||||
MaxRequests: 3, // Requests allowed in half-open state
|
||||
Interval: 10 * time.Minute, // Cyclic period for closed state counters
|
||||
Timeout: 60 * time.Second, // Open state duration
|
||||
ReadyToTrip: func(counts gobreaker.Counts) bool {
|
||||
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
|
||||
return counts.Requests >= 5 && failureRatio >= 0.6 // Trip if 60% fail after 5 reqs
|
||||
},
|
||||
}
|
||||
s.breakers[name] = gobreaker.NewCircuitBreaker[string](st)
|
||||
}
|
||||
|
||||
s.defaultProvider = os.Getenv("LLM_PROVIDER")
|
||||
if s.defaultProvider == "" {
|
||||
s.defaultProvider = "openai"
|
||||
}
|
||||
s.defaultModel = os.Getenv("LLM_MODEL")
|
||||
if s.defaultModel == "" {
|
||||
s.defaultModel = "gpt-4o-mini"
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *AIService) TestConnection(ctx context.Context) (string, error) {
|
||||
// ... (same as before)
|
||||
return "Ready", nil // Simplified for brevity in this edit, but I'll keep the logic if needed
|
||||
if len(s.providers) == 0 {
|
||||
return "", fmt.Errorf("no LLM providers configured")
|
||||
}
|
||||
return "Ready (Multi-LLM configured)", nil
|
||||
}
|
||||
|
||||
func (s *AIService) GenerateReply(ctx context.Context, tweetContent string, strategy string, userIdentity string) (string, error) {
|
||||
prompt := fmt.Sprintf(`
|
||||
// GenerateReply dynamically routes to the preferred LLM and uses a fallback chain if it fails.
|
||||
func (s *AIService) GenerateReply(ctx context.Context, tweetContent, productContext, userIdentity string, preferredProvider, preferredModel string) (string, error) {
|
||||
systemPrompt := "You are a professional X (Twitter) ghostwriter. You MUST respond with valid JSON."
|
||||
userPrompt := fmt.Sprintf(`
|
||||
You are a social media expert.
|
||||
User Identity: %s
|
||||
%s
|
||||
Target Tweet: "%s"
|
||||
Strategy: %s
|
||||
|
||||
Generate a high-quality reply for X (Twitter).
|
||||
Keep it natural, engaging, and under 280 characters.
|
||||
Do not use quotes around the reply.
|
||||
`, userIdentity, tweetContent, strategy)
|
||||
Generate 3 high-quality, distinct replies for X (Twitter) using different strategic angles.
|
||||
Suggested angles depending on context: Contrarian, Analytical, Supportive, Data-driven, Founder's Experience, Quote Tweet.
|
||||
IMPORTANT: If "Available User Custom Strategies" are provided above, you MUST prioritize using those custom strategy angles for your replies.
|
||||
IMPORTANT: If a specific "IMMITATE STYLE" instruction is provided in the Identity or Context, you MUST perfectly clone that linguistic tone.
|
||||
|
||||
Keep each reply natural, engaging, and under 280 characters. No hashtags unless highly relevant.
|
||||
|
||||
Respond ONLY with a JSON array in the exact following format, without any markdown formatting wrappers (like markdown code blocks):
|
||||
[
|
||||
{"strategy": "Name of Strategy 1", "content": "Reply content 1"},
|
||||
{"strategy": "Name of Strategy 2", "content": "Reply content 2"},
|
||||
{"strategy": "Name of Strategy 3", "content": "Reply content 3"}
|
||||
]
|
||||
`, userIdentity, productContext, tweetContent)
|
||||
|
||||
resp, err := s.client.CreateChatCompletion(
|
||||
ctx,
|
||||
openai.ChatCompletionRequest{
|
||||
Model: openai.GPT4oMini,
|
||||
Messages: []openai.ChatCompletionMessage{
|
||||
{
|
||||
Role: openai.ChatMessageRoleSystem,
|
||||
Content: "You are a professional X (Twitter) ghostwriter.",
|
||||
},
|
||||
{
|
||||
Role: openai.ChatMessageRoleUser,
|
||||
Content: prompt,
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to generate reply: %w", err)
|
||||
targetProvider := preferredProvider
|
||||
if targetProvider == "" {
|
||||
targetProvider = s.defaultProvider
|
||||
}
|
||||
targetModel := preferredModel
|
||||
if targetModel == "" {
|
||||
targetModel = s.defaultModel
|
||||
}
|
||||
|
||||
return resp.Choices[0].Message.Content, nil
|
||||
// Fallback chain (as designed in IMPLEMENTATION_PLAN: current -> Anthropic -> OpenAI -> Gemini -> DeepSeek)
|
||||
fallbackChain := []string{targetProvider, "anthropic", "openai", "gemini", "deepseek"}
|
||||
|
||||
for _, pName := range fallbackChain {
|
||||
provider, ok := s.providers[pName]
|
||||
if !ok {
|
||||
log.Printf("Provider %s bypassed (not configured)", pName)
|
||||
continue
|
||||
}
|
||||
breaker, ok := s.breakers[pName]
|
||||
if !ok {
|
||||
continue // Should never happen
|
||||
}
|
||||
|
||||
// Use the target model only on the initially requested provider. For fallbacks, use a safe default model.
|
||||
modelToUse := targetModel
|
||||
if pName != targetProvider {
|
||||
modelToUse = getDefaultModelFor(pName)
|
||||
}
|
||||
|
||||
log.Printf("Routing request to LLM Provider: %s (Model: %s)", pName, modelToUse)
|
||||
|
||||
// Execute through circuit breaker
|
||||
reply, err := breaker.Execute(func() (string, error) {
|
||||
// Add a simple 30s timeout per call
|
||||
callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
return provider.GenerateReply(callCtx, modelToUse, systemPrompt, userPrompt)
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
return reply, nil // Success
|
||||
}
|
||||
|
||||
log.Printf("Provider %s failed: %v. Attempting next in fallback chain...", pName, err)
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("all providers failed to generate reply")
|
||||
}
|
||||
|
||||
// ExtractStyle consumes a viral AI reply and uses the LLM to reverse-engineer its linguistic fingerprint
|
||||
func (s *AIService) ExtractStyle(ctx context.Context, viralReplyContent string) (string, error) {
|
||||
systemPrompt := "You are a master linguistic analyst and copywriter."
|
||||
userPrompt := fmt.Sprintf(`
|
||||
Analyze the following highly successful social media reply:
|
||||
"%s"
|
||||
|
||||
Extract the core stylistic elements that made it successful. Focus on:
|
||||
1. Tone (e.g., witty, provocative, deadpan, empathetic)
|
||||
2. Sentence structure (e.g., short punchy sentences, questions, bullet points)
|
||||
3. Key jargon or vocabulary patterns
|
||||
|
||||
Provide ONLY a concise, 2-3 sentence description of the style profile that another AI should imitate in the future.
|
||||
No conversational filler, just the exact instruction string to append to future system prompts.
|
||||
`, viralReplyContent)
|
||||
|
||||
// Route through our Multi-LLM fallback logic
|
||||
// Try OpenAI first, fallback to Anthropic
|
||||
providers := []string{"openai", "anthropic", "gemini", "deepseek"}
|
||||
|
||||
for _, pName := range providers {
|
||||
pConf, exists := s.providers[pName]
|
||||
cb, cbExists := s.breakers[pName]
|
||||
|
||||
if !exists || !cbExists {
|
||||
continue
|
||||
}
|
||||
|
||||
styleDesc, err := cb.Execute(func() (string, error) {
|
||||
// Use a default model for style extraction, as it's not user-facing and can be optimized for cost/speed
|
||||
modelToUse := getDefaultModelFor(pName)
|
||||
if modelToUse == "" { // Fallback if getDefaultModelFor doesn't have an entry
|
||||
modelToUse = "gpt-4o-mini" // A safe default
|
||||
}
|
||||
return pConf.GenerateReply(ctx, modelToUse, systemPrompt, userPrompt)
|
||||
})
|
||||
|
||||
if err == nil && styleDesc != "" {
|
||||
return styleDesc, nil
|
||||
}
|
||||
log.Printf("Provider %s failed to extract style: %v. Attempting next...", pName, err)
|
||||
}
|
||||
|
||||
return "", fmt.Errorf("failed to extract style from any provider")
|
||||
}
|
||||
|
||||
func getDefaultModelFor(provider string) string {
|
||||
switch provider {
|
||||
case "openai":
|
||||
return "gpt-4o-mini"
|
||||
case "anthropic":
|
||||
return "claude-3-5-haiku-latest"
|
||||
case "deepseek":
|
||||
return "deepseek-chat"
|
||||
case "gemini":
|
||||
return "gemini-2.5-flash"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
41
server/internal/service/ai_service_test.go
Normal file
41
server/internal/service/ai_service_test.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestAIService_Initialization verifies that the AIService parses environment variables
|
||||
// correctly and initializes the required fallback strategies and default settings.
|
||||
func TestAIService_Initialization(t *testing.T) {
|
||||
// Temporarily set testing ENVs to avoid depending on local .env
|
||||
os.Setenv("LLM_PROVIDER", "anthropic")
|
||||
os.Setenv("LLM_MODEL", "claude-3-5-haiku-latest")
|
||||
os.Setenv("OPENAI_API_KEY", "test-key-openai")
|
||||
defer os.Clearenv() // Clean up after test
|
||||
|
||||
svc := NewAIService()
|
||||
if svc == nil {
|
||||
t.Fatal("Expected AIService to be initialized, got nil")
|
||||
}
|
||||
|
||||
if svc.defaultProvider != "anthropic" {
|
||||
t.Errorf("Expected default provider 'anthropic', got '%s'", svc.defaultProvider)
|
||||
}
|
||||
|
||||
if svc.defaultModel != "claude-3-5-haiku-latest" {
|
||||
t.Errorf("Expected default model 'claude-3-5-haiku-latest', got '%s'", svc.defaultModel)
|
||||
}
|
||||
|
||||
// Verify that OpenAI provider was initialized because OPENAI_API_KEY was present
|
||||
_, hasOpenAI := svc.providers["openai"]
|
||||
if !hasOpenAI {
|
||||
t.Error("Expected OpenAI provider to be initialized, but it was not found")
|
||||
}
|
||||
|
||||
// Verify that circuit breakers were initialized
|
||||
_, hasBreaker := svc.breakers["openai"]
|
||||
if !hasBreaker {
|
||||
t.Error("Expected circuit breaker for setup provider, but it was not found")
|
||||
}
|
||||
}
|
||||
26
server/internal/service/competitor_monitor_service.go
Normal file
26
server/internal/service/competitor_monitor_service.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/repository"
|
||||
)
|
||||
|
||||
type CompetitorMonitorService struct {
|
||||
repo *repository.CompetitorMonitorRepository
|
||||
}
|
||||
|
||||
func NewCompetitorMonitorService(repo *repository.CompetitorMonitorRepository) *CompetitorMonitorService {
|
||||
return &CompetitorMonitorService{repo: repo}
|
||||
}
|
||||
|
||||
func (s *CompetitorMonitorService) ListMonitors(userID string) ([]model.CompetitorMonitor, error) {
|
||||
return s.repo.ListByUserID(userID)
|
||||
}
|
||||
|
||||
func (s *CompetitorMonitorService) CreateMonitor(monitor *model.CompetitorMonitor) error {
|
||||
return s.repo.Create(monitor)
|
||||
}
|
||||
|
||||
// DeleteMonitor removes the monitor with the given id, scoped to userID so
// users can only delete their own monitors.
func (s *CompetitorMonitorService) DeleteMonitor(id string, userID string) error {
	return s.repo.Delete(id, userID)
}
|
||||
30
server/internal/service/custom_strategy_service.go
Normal file
30
server/internal/service/custom_strategy_service.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/repository"
|
||||
)
|
||||
|
||||
// CustomStrategyService provides business logic for user-defined reply
// strategies, delegating persistence to the repository layer.
type CustomStrategyService struct {
	repo *repository.CustomStrategyRepository
}
|
||||
|
||||
// NewCustomStrategyService constructs a CustomStrategyService backed by the
// given repository.
func NewCustomStrategyService(repo *repository.CustomStrategyRepository) *CustomStrategyService {
	return &CustomStrategyService{repo: repo}
}
|
||||
|
||||
// ListStrategies returns all custom strategies owned by the given user.
func (s *CustomStrategyService) ListStrategies(userID string) ([]model.UserCustomStrategy, error) {
	return s.repo.ListByUserID(userID)
}
|
||||
|
||||
// CreateStrategy persists a new custom strategy record.
func (s *CustomStrategyService) CreateStrategy(strategy *model.UserCustomStrategy) error {
	return s.repo.Create(strategy)
}
|
||||
|
||||
// UpdateStrategy saves modifications to an existing custom strategy.
func (s *CustomStrategyService) UpdateStrategy(strategy *model.UserCustomStrategy) error {
	return s.repo.Update(strategy)
}
|
||||
|
||||
// DeleteStrategy removes the strategy with the given id, scoped to userID so
// users can only delete their own strategies.
func (s *CustomStrategyService) DeleteStrategy(id string, userID string) error {
	return s.repo.Delete(id, userID)
}
|
||||
75
server/internal/service/llm/anthropic.go
Normal file
75
server/internal/service/llm/anthropic.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package llm
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// AnthropicProvider implements the Provider interface against the Anthropic
// Messages API over plain HTTP.
type AnthropicProvider struct {
	apiKey  string       // Anthropic API key sent via the x-api-key header
	baseURL string       // API root, e.g. "https://api.anthropic.com/v1"
	client  *http.Client // shared HTTP client for all requests
}
|
||||
|
||||
func NewAnthropicProvider(apiKey, baseURL string) *AnthropicProvider {
|
||||
if baseURL == "" {
|
||||
baseURL = "https://api.anthropic.com/v1"
|
||||
}
|
||||
return &AnthropicProvider{
|
||||
apiKey: apiKey,
|
||||
baseURL: baseURL,
|
||||
client: &http.Client{},
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the stable identifier for this provider.
func (p *AnthropicProvider) Name() string {
	return "anthropic"
}
|
||||
|
||||
func (p *AnthropicProvider) GenerateReply(ctx context.Context, model string, systemPrompt, userPrompt string) (string, error) {
|
||||
reqBody := map[string]interface{}{
|
||||
"model": model,
|
||||
"max_tokens": 1024,
|
||||
"system": systemPrompt,
|
||||
"messages": []map[string]string{
|
||||
{"role": "user", "content": userPrompt},
|
||||
},
|
||||
}
|
||||
bs, _ := json.Marshal(reqBody)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", p.baseURL+"/messages", bytes.NewReader(bs))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("x-api-key", p.apiKey)
|
||||
req.Header.Set("anthropic-version", "2023-06-01")
|
||||
req.Header.Set("content-type", "application/json")
|
||||
|
||||
resp, err := p.client.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return "", fmt.Errorf("anthropic error %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Content []struct {
|
||||
Text string `json:"text"`
|
||||
} `json:"content"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(result.Content) == 0 {
|
||||
return "", fmt.Errorf("anthropic returned empty content")
|
||||
}
|
||||
return result.Content[0].Text, nil
|
||||
}
|
||||
86
server/internal/service/llm/gemini.go
Normal file
86
server/internal/service/llm/gemini.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package llm
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// GeminiProvider implements the Provider interface against the Google
// Gemini generateContent REST API over plain HTTP.
type GeminiProvider struct {
	apiKey  string       // Google AI API key
	baseURL string       // models root, e.g. ".../v1beta/models"
	client  *http.Client // shared HTTP client for all requests
}
|
||||
|
||||
func NewGeminiProvider(apiKey, baseURL string) *GeminiProvider {
|
||||
if baseURL == "" {
|
||||
baseURL = "https://generativelanguage.googleapis.com/v1beta/models"
|
||||
}
|
||||
return &GeminiProvider{
|
||||
apiKey: apiKey,
|
||||
baseURL: baseURL,
|
||||
client: &http.Client{},
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the stable identifier for this provider.
func (p *GeminiProvider) Name() string {
	return "gemini"
}
|
||||
|
||||
func (p *GeminiProvider) GenerateReply(ctx context.Context, model string, systemPrompt, userPrompt string) (string, error) {
|
||||
url := fmt.Sprintf("%s/%s:generateContent?key=%s", p.baseURL, model, p.apiKey)
|
||||
|
||||
reqBody := map[string]interface{}{
|
||||
"systemInstruction": map[string]interface{}{
|
||||
"parts": []map[string]interface{}{
|
||||
{"text": systemPrompt},
|
||||
},
|
||||
},
|
||||
"contents": []map[string]interface{}{
|
||||
{
|
||||
"role": "user",
|
||||
"parts": []map[string]interface{}{
|
||||
{"text": userPrompt},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
bs, _ := json.Marshal(reqBody)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(bs))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := p.client.Do(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return "", fmt.Errorf("gemini error %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Candidates []struct {
|
||||
Content struct {
|
||||
Parts []struct {
|
||||
Text string `json:"text"`
|
||||
} `json:"parts"`
|
||||
} `json:"content"`
|
||||
} `json:"candidates"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(result.Candidates) == 0 || len(result.Candidates[0].Content.Parts) == 0 {
|
||||
return "", fmt.Errorf("gemini returned empty content")
|
||||
}
|
||||
return result.Candidates[0].Content.Parts[0].Text, nil
|
||||
}
|
||||
50
server/internal/service/llm/openai.go
Normal file
50
server/internal/service/llm/openai.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package llm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/sashabaranov/go-openai"
|
||||
)
|
||||
|
||||
// OpenAIProvider implements the Provider interface using the go-openai
// client, which also serves OpenAI-compatible APIs (e.g. DeepSeek) when a
// custom BaseURL is configured.
type OpenAIProvider struct {
	client *openai.Client
	name   string // provider identifier returned by Name(), e.g. "openai" or "deepseek"
}
|
||||
|
||||
// NewOpenAIProvider creates a new provider that uses the official or compatible OpenAI API.
// It can also handle DeepSeek via a custom BaseURL.
//
// An empty baseURL keeps the go-openai default endpoint; name becomes the
// provider identifier returned by Name().
func NewOpenAIProvider(apiKey, baseURL, name string) *OpenAIProvider {
	config := openai.DefaultConfig(apiKey)
	if baseURL != "" {
		config.BaseURL = baseURL
	}
	return &OpenAIProvider{
		client: openai.NewClientWithConfig(config),
		name:   name,
	}
}
|
||||
|
||||
// Name returns the configured identifier for this provider instance.
func (p *OpenAIProvider) Name() string {
	return p.name
}
|
||||
|
||||
func (p *OpenAIProvider) GenerateReply(ctx context.Context, model string, systemPrompt, userPrompt string) (string, error) {
|
||||
resp, err := p.client.CreateChatCompletion(
|
||||
ctx,
|
||||
openai.ChatCompletionRequest{
|
||||
Model: model,
|
||||
Messages: []openai.ChatCompletionMessage{
|
||||
{Role: openai.ChatMessageRoleSystem, Content: systemPrompt},
|
||||
{Role: openai.ChatMessageRoleUser, Content: userPrompt},
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("%s api error: %w", p.name, err)
|
||||
}
|
||||
if len(resp.Choices) == 0 {
|
||||
return "", fmt.Errorf("%s returned no choices", p.name)
|
||||
}
|
||||
return resp.Choices[0].Message.Content, nil
|
||||
}
|
||||
8
server/internal/service/llm/provider.go
Normal file
8
server/internal/service/llm/provider.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package llm
|
||||
|
||||
import "context"
|
||||
|
||||
// Provider abstracts a single LLM backend (OpenAI-compatible, Anthropic,
// Gemini, ...) behind a uniform reply-generation interface.
type Provider interface {
	// Name returns the provider's stable identifier (e.g. "openai").
	Name() string
	// GenerateReply produces a reply to userPrompt using the given model,
	// with systemPrompt steering the assistant's behavior.
	GenerateReply(ctx context.Context, model string, systemPrompt, userPrompt string) (string, error)
}
|
||||
22
server/internal/service/product_profile_service.go
Normal file
22
server/internal/service/product_profile_service.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/repository"
|
||||
)
|
||||
|
||||
// ProductProfileService provides business logic for user product profiles,
// delegating persistence to the repository layer.
type ProductProfileService struct {
	repo *repository.ProductProfileRepository
}
|
||||
|
||||
// NewProductProfileService constructs a ProductProfileService backed by the
// given repository.
func NewProductProfileService(repo *repository.ProductProfileRepository) *ProductProfileService {
	return &ProductProfileService{repo: repo}
}
|
||||
|
||||
// GetProfile returns the product profile for the given user.
func (s *ProductProfileService) GetProfile(userID string) (*model.UserProductProfile, error) {
	return s.repo.GetByUserID(userID)
}
|
||||
|
||||
// SaveProfile persists the given product profile (create or update is
// decided by the repository's Save semantics).
func (s *ProductProfileService) SaveProfile(profile *model.UserProductProfile) error {
	return s.repo.Save(profile)
}
|
||||
@@ -25,3 +25,24 @@ func (s *UserService) Register(email string, identity string) (*model.User, erro
|
||||
// GetUser looks up a user by email address.
func (s *UserService) GetUser(email string) (*model.User, error) {
	return s.repo.GetByEmail(email)
}
|
||||
|
||||
// GetUserByID looks up a user by their primary identifier.
func (s *UserService) GetUserByID(id string) (*model.User, error) {
	return s.repo.GetByID(id)
}
|
||||
|
||||
// UpdatePreferences updates the identity label and/or language preference
// of the user identified by id. An empty string leaves the corresponding
// field unchanged. The (possibly updated) user is returned together with
// any persistence error.
func (s *UserService) UpdatePreferences(id string, identity string, language string) (*model.User, error) {
	user, err := s.repo.GetByID(id)
	if err != nil {
		return nil, err
	}

	// Empty values mean "no change" for each preference independently.
	if identity != "" {
		user.IdentityLabel = identity
	}
	if language != "" {
		user.LanguagePreference = language
	}

	err = s.repo.Update(user)
	return user, err
}
|
||||
|
||||
147
server/internal/worker/monitor_worker.go
Normal file
147
server/internal/worker/monitor_worker.go
Normal file
@@ -0,0 +1,147 @@
|
||||
package worker
|
||||
|
||||
import (
	"context"
	"fmt"
	"log"
	"net/url"
	"time"

	"github.com/zs/InsightReply/internal/model"
	"github.com/zs/InsightReply/internal/repository"
	"github.com/zs/InsightReply/internal/scraper"
)
|
||||
|
||||
// MonitorWorker periodically scrapes a Nitter instance for tweets matching
// the configured competitor monitors and upserts them into the database.
type MonitorWorker struct {
	repo      *repository.CompetitorMonitorRepository
	tweetRepo *repository.TweetRepository
	client    *scraper.ScraperClient
	baseUrl   string // Nitter instance root used for all scraping requests
}
|
||||
|
||||
// NewMonitorWorker constructs a MonitorWorker wired to the given
// repositories with a fresh scraper client.
func NewMonitorWorker(repo *repository.CompetitorMonitorRepository, tweetRepo *repository.TweetRepository) *MonitorWorker {
	return &MonitorWorker{
		repo:      repo,
		tweetRepo: tweetRepo,
		client:    scraper.NewScraperClient(),
		// NOTE(review): hard-coded instance URL — consider making this
		// configurable via env/config.
		baseUrl: "https://x.beenglish.eu.org", // Self-hosted Nitter instance
	}
}
|
||||
|
||||
// Start begins the background job loop. This should be run in a goroutine.
// It runs one cycle immediately, then one per tick, and returns when ctx
// is cancelled.
func (w *MonitorWorker) Start(ctx context.Context, interval time.Duration) {
	log.Printf("[MonitorWorker] Starting background scraping loop every %v", interval)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Initial run so the first scrape doesn't wait a full interval.
	w.runCycle(ctx)

	for {
		select {
		case <-ctx.Done():
			log.Println("[MonitorWorker] Stopping background scraping loop")
			return
		case <-ticker.C:
			w.runCycle(ctx)
		}
	}
}
|
||||
|
||||
func (w *MonitorWorker) runCycle(ctx context.Context) {
|
||||
log.Println("[MonitorWorker] Starting scrape cycle...")
|
||||
|
||||
monitors, err := w.repo.ListAllActive()
|
||||
if err != nil {
|
||||
log.Printf("[MonitorWorker] Error fetching active monitors: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(monitors) == 0 {
|
||||
log.Println("[MonitorWorker] No active monitors found. Skipping cycle.")
|
||||
return
|
||||
}
|
||||
|
||||
for _, monitor := range monitors {
|
||||
// Stop processing if context cancelled (e.g., app shutdown)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Determine Scraping Strategy
|
||||
var url string
|
||||
|
||||
// URL encode the brand name which acts as our keyword
|
||||
keyword := monitor.BrandName
|
||||
|
||||
if monitor.XHandle != "" {
|
||||
if monitor.XHandle == keyword || keyword == "" {
|
||||
// Standard profile timeline scraping
|
||||
log.Printf("[MonitorWorker] Scraping timeline for account @%s", monitor.XHandle)
|
||||
url = fmt.Sprintf("%s/%s", w.baseUrl, monitor.XHandle)
|
||||
} else {
|
||||
// Combo scraping: Keyword + Specific Account
|
||||
log.Printf("[MonitorWorker] Scraping combo: '%s' from @%s", keyword, monitor.XHandle)
|
||||
url = fmt.Sprintf("%s/search?f=tweets&q=%s+from%%3A%s", w.baseUrl, keyword, monitor.XHandle)
|
||||
}
|
||||
} else if keyword != "" {
|
||||
// Global search for Keyword across X
|
||||
log.Printf("[MonitorWorker] Scraping global search for keyword: '%s'", keyword)
|
||||
url = fmt.Sprintf("%s/search?f=tweets&q=%s", w.baseUrl, keyword)
|
||||
} else {
|
||||
continue // Invalid monitor config
|
||||
}
|
||||
|
||||
w.scrapeAndLog(url)
|
||||
|
||||
// Anti-Ban: Jitter delay between requests (3s to 8s)
|
||||
w.client.JitterDelay(3000, 8000)
|
||||
}
|
||||
|
||||
log.Println("[MonitorWorker] Scrape cycle completed.")
|
||||
}
|
||||
|
||||
// scrapeAndLog fetches a single Nitter page, parses the timeline, and
// upserts every extracted tweet into the tracking database. All failures
// are logged rather than propagated: one bad page must not abort the
// surrounding scrape cycle.
func (w *MonitorWorker) scrapeAndLog(url string) {
	htmlData, err := w.client.Fetch(url)
	if err != nil {
		log.Printf("[MonitorWorker] Error scraping %s: %v", url, err)
		return
	}

	tweets, err := scraper.ParseTimeline(htmlData)
	if err != nil {
		log.Printf("[MonitorWorker] Error parsing HTML for %s: %v", url, err)
		return
	}

	log.Printf("[MonitorWorker] Extracted %d tweets from %s", len(tweets), url)

	// Epic 6: Upsert into tracking database
	upsertCount := 0
	for _, rawTweet := range tweets {
		// Map the scraper's raw representation onto the persistence model;
		// newly seen tweets start unprocessed in the "normal" crawl queue.
		tweet := &model.Tweet{
			XTweetID:      rawTweet.ID,
			AuthorHandle:  rawTweet.Handle,
			Content:       rawTweet.Content,
			PostedAt:      rawTweet.CreatedAt,
			LikeCount:     rawTweet.Likes,
			RetweetCount:  rawTweet.Retweets,
			ReplyCount:    rawTweet.Replies,
			CrawlQueue:    "normal",
			IsProcessed:   false,
			LastCrawledAt: time.Now(),
		}

		// Save/Update in DB
		err := w.tweetRepo.Upsert(tweet)
		if err != nil {
			log.Printf("[MonitorWorker] Error UPSERTing tweet %s: %v", tweet.XTweetID, err)
		} else {
			upsertCount++
		}
	}

	log.Printf("[MonitorWorker] Successfully Upserted %d/%d tweets to the database.", upsertCount, len(tweets))
}
|
||||
142
server/internal/worker/performance_worker.go
Normal file
142
server/internal/worker/performance_worker.go
Normal file
@@ -0,0 +1,142 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/zs/InsightReply/internal/model"
|
||||
"github.com/zs/InsightReply/internal/repository"
|
||||
"github.com/zs/InsightReply/internal/scraper"
|
||||
"github.com/zs/InsightReply/internal/service"
|
||||
)
|
||||
|
||||
// PerformanceWorker retroactively checks how AI-generated replies performed
// in the wild by scraping the original tweet threads and recording
// engagement metrics.
type PerformanceWorker struct {
	repo    *repository.ReplyRepository
	client  *scraper.ScraperClient
	aiSvc   *service.AIService // used to reverse-engineer the style of viral replies
	baseUrl string             // Nitter instance root used for all scraping requests
}
|
||||
|
||||
// NewPerformanceWorker constructs a PerformanceWorker wired to the given
// reply repository and AI service, with a fresh scraper client.
func NewPerformanceWorker(repo *repository.ReplyRepository, aiSvc *service.AIService) *PerformanceWorker {
	return &PerformanceWorker{
		repo:   repo,
		client: scraper.NewScraperClient(),
		aiSvc:  aiSvc,
		// NOTE(review): hard-coded instance URL — consider making this
		// configurable via env/config.
		baseUrl: "https://x.beenglish.eu.org",
	}
}
|
||||
|
||||
// Start begins the 24h retroactive performance checking loop. It should be
// run in a goroutine; it runs one cycle per tick and returns when ctx is
// cancelled. Unlike MonitorWorker.Start, there is no initial immediate run.
func (w *PerformanceWorker) Start(ctx context.Context, interval time.Duration) {
	log.Printf("[PerformanceWorker] Starting retroactive engagement tracking every %v", interval)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Println("[PerformanceWorker] Stopping background performance loop")
			return
		case <-ticker.C:
			w.runCycle(ctx)
		}
	}
}
|
||||
|
||||
func (w *PerformanceWorker) runCycle(ctx context.Context) {
|
||||
pending, err := w.repo.GetPendingPerformanceChecks()
|
||||
if err != nil {
|
||||
log.Printf("[PerformanceWorker] Error fetching pending checks: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, reply := range pending {
|
||||
// Stop processing if context cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
xTweetID, err := w.repo.GetTweetXTweetID(reply.TweetID)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Scrape the specific thread
|
||||
// Nitter handles /i/status/12345 generic routes
|
||||
url := fmt.Sprintf("%s/i/status/%s", w.baseUrl, xTweetID)
|
||||
log.Printf("[PerformanceWorker] Checking thread %s for user's AI reply performance", url)
|
||||
|
||||
htmlData, err := w.client.Fetch(url)
|
||||
if err != nil {
|
||||
w.client.JitterDelay(2000, 5000)
|
||||
continue
|
||||
}
|
||||
|
||||
threadReplies, err := scraper.ParseTimeline(htmlData)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Search for the user's generated text within the thread replies
|
||||
found := false
|
||||
for _, threadReply := range threadReplies {
|
||||
// Basic similarity check: if 50% of the AI sentence is present
|
||||
// Real implementation might use Levenshtein distance, but strings.Contains on chunks works for MVP
|
||||
snippet := reply.Content
|
||||
if len(snippet) > 20 {
|
||||
snippet = snippet[:20]
|
||||
}
|
||||
|
||||
if strings.Contains(threadReply.Content, snippet) {
|
||||
found = true
|
||||
|
||||
// WE FOUND OUR REPLY! Record its metrics
|
||||
perf := &model.ReplyPerformance{
|
||||
ReplyID: reply.ID,
|
||||
UserID: reply.UserID,
|
||||
LikeCountIncrease: threadReply.Likes,
|
||||
ReplyCountIncrease: threadReply.Replies,
|
||||
CheckTime: time.Now(),
|
||||
}
|
||||
|
||||
w.repo.SaveReplyPerformance(perf)
|
||||
log.Printf("[PerformanceWorker] 🎯 Verified AI reply in wild! Likes: %d, Replies: %d", perf.LikeCountIncrease, perf.ReplyCountIncrease)
|
||||
|
||||
// Epic 13 AI Tone Engine: Autonomous Style Cloning for proven viral comments
|
||||
if perf.LikeCountIncrease >= 10 {
|
||||
log.Printf("[PerformanceWorker] Reply went viral! Asking AI to reverse-engineer linguistic styling.")
|
||||
|
||||
styleProfile, err := w.aiSvc.ExtractStyle(ctx, reply.Content)
|
||||
if err == nil && styleProfile != "" {
|
||||
err = w.repo.SaveStyleExtraction(reply.UserID, styleProfile)
|
||||
if err != nil {
|
||||
log.Printf("[PerformanceWorker] Error saving style database mapping: %v", err)
|
||||
} else {
|
||||
log.Printf("[PerformanceWorker] Successfully built user style clone: %s", styleProfile)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Even if not found (maybe they edited heavily or didn't actually post it), we mark it as checked to prevent infinite re-checking
|
||||
if !found {
|
||||
perf := &model.ReplyPerformance{
|
||||
ReplyID: reply.ID,
|
||||
UserID: reply.UserID,
|
||||
CheckTime: time.Now(),
|
||||
}
|
||||
w.repo.SaveReplyPerformance(perf)
|
||||
}
|
||||
|
||||
w.client.JitterDelay(3000, 8000)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user