main.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package main
  2. import (
  3. "context"
  4. "flag"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "syscall"
  9. "time"
  10. "github.com/labring/aiproxy/core/common"
  11. "github.com/labring/aiproxy/core/common/config"
  12. "github.com/labring/aiproxy/core/common/consume"
  13. "github.com/labring/aiproxy/core/controller"
  14. "github.com/labring/aiproxy/core/model"
  15. "github.com/labring/aiproxy/core/task"
  16. log "github.com/sirupsen/logrus"
  17. )
  18. var (
  19. listen string
  20. pprofPort int
  21. )
  22. func init() {
  23. flag.StringVar(&listen, "listen", "0.0.0.0:3000", "http server listen")
  24. flag.IntVar(&pprofPort, "pprof-port", 15000, "pport http server port")
  25. }
  26. // Swagger godoc
  27. //
  28. // @title AI Proxy Swagger API
  29. // @version 1.0
  30. // @securityDefinitions.apikey ApiKeyAuth
  31. // @in header
  32. // @name Authorization
  33. func main() {
  34. flag.Parse()
  35. loadEnv()
  36. config.ReloadEnv()
  37. if err := ensureAdminKey(); err != nil {
  38. log.Warn("failed to ensure AdminKey: " + err.Error())
  39. }
  40. common.InitLog(log.StandardLogger(), config.DebugEnabled)
  41. printLoadedEnvFiles()
  42. if err := initializeServices(pprofPort); err != nil {
  43. log.Fatal("failed to initialize services: " + err.Error())
  44. }
  45. defer func() {
  46. if err := model.CloseDB(); err != nil {
  47. log.Fatal("failed to close database: " + err.Error())
  48. }
  49. }()
  50. ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
  51. defer stop()
  52. var wg sync.WaitGroup
  53. startSyncServices(ctx, &wg)
  54. srv, _ := setupHTTPServer(listen)
  55. log.Info("auto test banned models task started")
  56. go task.AutoTestBannedModelsTask(ctx)
  57. log.Info("clean log task started")
  58. go task.CleanLogTask(ctx)
  59. log.Info("detect ip groups task started")
  60. go task.DetectIPGroupsTask(ctx)
  61. log.Info("usage alert task started")
  62. go task.UsageAlertTask(ctx)
  63. log.Info("update channels balance task started")
  64. go controller.UpdateChannelsBalance(time.Minute * 10)
  65. batchProcessorCtx, batchProcessorCancel := context.WithCancel(context.Background())
  66. wg.Add(1)
  67. go model.StartBatchProcessorSummary(batchProcessorCtx, &wg)
  68. log.Infof("server started on http://%s", srv.Addr)
  69. log.Infof("swagger started on http://%s/swagger/index.html", srv.Addr)
  70. go listenAndServe(srv)
  71. <-ctx.Done()
  72. shutdownSrvCtx, shutdownSrvCancel := context.WithTimeout(context.Background(), 600*time.Second)
  73. defer shutdownSrvCancel()
  74. log.Info("shutting down http server...")
  75. log.Info("max wait time: 600s")
  76. if err := srv.Shutdown(shutdownSrvCtx); err != nil {
  77. log.Error("server forced to shutdown: " + err.Error())
  78. } else {
  79. log.Info("server shutdown successfully")
  80. }
  81. log.Info("shutting down consumer...")
  82. consume.Wait()
  83. batchProcessorCancel()
  84. log.Info("shutting down sync services...")
  85. wg.Wait()
  86. log.Info("shutting down batch summary...")
  87. log.Info("max wait time: 600s")
  88. cleanCtx, cleanCancel := context.WithTimeout(context.Background(), 600*time.Second)
  89. defer cleanCancel()
  90. model.CleanBatchUpdatesSummary(cleanCtx)
  91. log.Info("server exiting")
  92. }