asynq scheduler (#185)
feditools/relay/pipeline/head This commit looks good Details

Reviewed-on: #185
Co-authored-by: Tyr Mactire <tyr@pettingzoo.co>
Co-committed-by: Tyr Mactire <tyr@pettingzoo.co>
pull/186/head
Tyr Mactire 1 week ago committed by PettingZoo Gitea
parent 68627d7848
commit cdaec9b3e0
Signed by: PettingZoo Gitea
GPG Key ID: 39788A4390A1372F

@ -1,8 +1,6 @@
package scheduler
import (
"git.ptzo.gdn/feditools/relay/internal/log"
)
import "git.ptzo.gdn/feditools/relay/internal/log"
type empty struct{}

@ -0,0 +1,210 @@
package scheduler
import (
"context"
"fmt"
"git.ptzo.gdn/feditools/relay/cmd/relay/action"
"git.ptzo.gdn/feditools/relay/internal/clock"
"git.ptzo.gdn/feditools/relay/internal/config"
"git.ptzo.gdn/feditools/relay/internal/db/bun"
"git.ptzo.gdn/feditools/relay/internal/db/cachemem"
"git.ptzo.gdn/feditools/relay/internal/http"
"git.ptzo.gdn/feditools/relay/internal/kv/redis"
"git.ptzo.gdn/feditools/relay/internal/logic/logic1"
"git.ptzo.gdn/feditools/relay/internal/metrics"
"git.ptzo.gdn/feditools/relay/internal/runner/asynq"
"git.ptzo.gdn/feditools/relay/internal/token"
"github.com/spf13/viper"
"github.com/uptrace/uptrace-go/uptrace"
"os"
"os/signal"
"syscall"
)
// Start runs a worker migrations.
var Start action.Action = func(ctx context.Context) error {
l := logger.WithField("func", "Start")
l.Info("starting")
ctx, cancel := context.WithCancel(ctx)
// Configure OpenTelemetry with sensible defaults.
uptrace.ConfigureOpentelemetry(
// copy your project DSN here or use UPTRACE_DSN env var
//uptrace.WithDSN("https://<token>@uptrace.dev/<project_id>"),
uptrace.WithServiceName(viper.GetString(config.Keys.ApplicationName)),
uptrace.WithServiceVersion(viper.GetString(config.Keys.SoftwareVersion)),
)
// Send buffered spans and free resources.
defer func() {
l.Info("closing uptrace")
err := uptrace.Shutdown(context.Background())
if err != nil {
l.Errorf("closing uptrace: %s", err.Error())
}
}()
// create metrics server
metricsServer := metrics.New(viper.GetString(config.Keys.MetricsHTTPBind))
// create clock module
l.Debug("creating clock")
clockMod := clock.NewClock()
// create database client
l.Debug("creating database client")
dbClient, err := bun.New(ctx)
if err != nil {
l.Errorf("db: %s", err.Error())
cancel()
return err
}
dbCacheClient, err := cachemem.New(ctx, dbClient)
if err != nil {
l.Errorf("db: %s", err.Error())
cancel()
return err
}
defer func() {
err := dbCacheClient.Close(ctx)
if err != nil {
l.Errorf("closing db: %s", err.Error())
}
}()
// create http client
httpClient, err := http.NewClient(
ctx,
fmt.Sprintf("Go-http-client/2.0 (%s/%s; +https://%s/)",
viper.GetString(config.Keys.ApplicationName),
viper.GetString(config.Keys.SoftwareVersion),
viper.GetString(config.Keys.ServerExternalHostname),
),
viper.GetInt(config.Keys.HTTPClientTimeout),
)
if err != nil {
l.Errorf("http client: %s", err.Error())
cancel()
return err
}
// create kv client
kvClient, err := redis.New(ctx)
if err != nil {
l.Errorf("redis: %s", err.Error())
cancel()
return err
}
defer func() {
err := kvClient.Close(ctx)
if err != nil {
l.Errorf("closing redis: %s", err.Error())
}
}()
// create tokenizer
tokz, err := token.New()
if err != nil {
l.Errorf("create tokenizer: %s", err.Error())
cancel()
return err
}
// create logic module
l.Debug("creating logic module")
logicMod, err := logic1.New(ctx, clockMod, dbCacheClient, httpClient, kvClient, tokz)
if err != nil {
l.Errorf("logic: %s", err.Error())
cancel()
return err
}
// create runner
// create runner
l.Debug("creating runner module")
runnerAddr := viper.GetString(config.Keys.RedisAddress)
runnerPassword := viper.GetString(config.Keys.RedisPassword)
runnerDB := viper.GetInt(config.Keys.RedisDB)
if viper.GetString(config.Keys.RunnerAddress) != "" {
runnerAddr = viper.GetString(config.Keys.RunnerAddress)
runnerPassword = viper.GetString(config.Keys.RunnerPassword)
runnerDB = viper.GetInt(config.Keys.RunnerDB)
}
runnerMod, err := asynq.New(&asynq.Config{
Logic: logicMod,
Concurrency: viper.GetInt(config.Keys.RunnerConcurrency),
Address: runnerAddr,
Password: runnerPassword,
DB: runnerDB,
})
if err != nil {
l.Errorf("runner server: %s", err.Error())
cancel()
return err
}
logicMod.SetRunner(runnerMod)
// scheduler
scheduler, err := runnerMod.NewScheduler()
if err != nil {
l.Errorf("getting scheduler: %s", err.Error())
cancel()
return err
}
err = scheduler.Start()
if err != nil {
l.Errorf("starting scheduler: %s", err.Error())
cancel()
return err
}
defer func() {
l.Debug("closing scheduler")
err := scheduler.Stop()
if err != nil {
l.Errorf("closing scheduler: %s", err.Error())
}
}()
// ** start application **
errChan := make(chan error)
// Wait for SIGINT and SIGTERM (HIT CTRL-C)
stopSigChan := make(chan os.Signal)
signal.Notify(stopSigChan, syscall.SIGINT, syscall.SIGTERM)
// start metrics server
go func(m *metrics.Module, errChan chan error) {
l.Debug("starting metrics server")
err := m.Start()
if err != nil {
errChan <- fmt.Errorf("metrics server: %s", err.Error())
}
}(metricsServer, errChan)
// wait for event
select {
case sig := <-stopSigChan:
l.Infof("got sig: %s", sig)
cancel()
case err := <-errChan:
l.Fatal(err.Error())
cancel()
}
<-ctx.Done()
l.Infof("done")
return nil
}

@ -2,7 +2,6 @@ package server
import (
"context"
"errors"
"fmt"
"git.ptzo.gdn/feditools/relay/cmd/relay/action"
"git.ptzo.gdn/feditools/relay/internal/clock"
@ -16,7 +15,6 @@ import (
"git.ptzo.gdn/feditools/relay/internal/logic/logic1"
"git.ptzo.gdn/feditools/relay/internal/metrics"
"git.ptzo.gdn/feditools/relay/internal/runner/asynq"
"git.ptzo.gdn/feditools/relay/internal/scheduler"
"git.ptzo.gdn/feditools/relay/internal/token"
"github.com/spf13/viper"
"github.com/uptrace/uptrace-go/uptrace"
@ -186,16 +184,6 @@ var Start action.Action = func(topCtx context.Context) error {
}
logicMod.SetRunner(runnerMod)
// create scheduler
schedulerMod, err := scheduler.New(logicMod, runnerMod)
if err != nil {
l.Errorf("scheduler: %s", err.Error())
cancel()
return err
}
logicMod.SetScheduler(schedulerMod)
// create http server
l.Debug("creating http server")
server, err := newHttpServer(
@ -231,13 +219,6 @@ var Start action.Action = func(topCtx context.Context) error {
}
}(metricsServer, errChan)
// start scheduler
go func(s *scheduler.Module, errChan chan error) {
l.Debug("starting scheduler server")
s.Start()
errChan <- errors.New("scheduler stopped")
}(schedulerMod, errChan)
// start webserver
go func(s *http.Server, errChan chan error) {
l.Debug("starting http server")

@ -0,0 +1,15 @@
package flag
import (
"git.ptzo.gdn/feditools/relay/internal/config"
"github.com/spf13/cobra"
)
// Scheduler adds all flags for running the scheduler.
func Scheduler(cmd *cobra.Command, values config.Values) {
Redis(cmd, values)
Runner(cmd, values)
// metrics
cmd.PersistentFlags().String(config.Keys.MetricsHTTPBind, values.MetricsHTTPBind, usage.MetricsHTTPBind)
}

@ -51,6 +51,7 @@ func main() {
rootCmd.AddCommand(accountCommands())
rootCmd.AddCommand(databaseCommands())
rootCmd.AddCommand(serverCommands())
rootCmd.AddCommand(schedulerCommands())
rootCmd.AddCommand(workerCommands())
err = rootCmd.Execute()

@ -0,0 +1,33 @@
package main
import (
"git.ptzo.gdn/feditools/relay/cmd/relay/action/scheduler"
"git.ptzo.gdn/feditools/relay/cmd/relay/flag"
"git.ptzo.gdn/feditools/relay/internal/config"
"github.com/spf13/cobra"
)
// schedulerCommands returns the 'scheduler' subcommand
func schedulerCommands() *cobra.Command {
schedulerCmd := &cobra.Command{
Use: "scheduler",
Short: "controls a scheduler",
}
schedulerStartCmd := &cobra.Command{
Use: "start",
Short: "start the relay scheduler",
PreRunE: func(cmd *cobra.Command, args []string) error {
return preRun(cmd)
},
RunE: func(cmd *cobra.Command, args []string) error {
return run(cmd.Context(), scheduler.Start)
},
}
flag.Scheduler(schedulerStartCmd, config.Defaults)
schedulerCmd.AddCommand(schedulerStartCmd)
return schedulerCmd
}

@ -5,9 +5,6 @@ go 1.19
require (
git.ptzo.gdn/feditools/go-lib v0.20.2-0.20230110023750-5bd60af86f11
github.com/allegro/bigcache/v3 v3.1.0
github.com/contribsys/faktory v1.6.2
github.com/contribsys/faktory_worker_go v1.6.0
github.com/go-co-op/gocron v1.18.0
github.com/go-fed/activity v1.0.0
github.com/go-fed/httpsig v1.1.0
github.com/go-playground/validator/v10 v10.11.1

@ -81,10 +81,6 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/contribsys/faktory v1.6.2 h1:HuqJI9ZEeInN2nJg10WRy8zPpxNwVIZgACbex7wQG1A=
github.com/contribsys/faktory v1.6.2/go.mod h1:R8+inlM1rq3GzyG8iZUL7qhfNfXGIgcbQRvTHmSyuUI=
github.com/contribsys/faktory_worker_go v1.6.0 h1:ov69BLHL62i/wRLJwvuj5UphwgjMOINRCGW3KzrKOjk=
github.com/contribsys/faktory_worker_go v1.6.0/go.mod h1:XMNGn3sBJdqFGfTH4SkmYkMovhdkq5cDJj36wowfbNY=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
@ -119,8 +115,6 @@ github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmV
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-co-op/gocron v1.18.0 h1:SxTyJ5xnSN4byCq7b10LmmszFdxQlSQJod8s3gbnXxA=
github.com/go-co-op/gocron v1.18.0/go.mod h1:sD/a0Aadtw5CpflUJ/lpP9Vfdk979Wl1Sg33HPHg0FY=
github.com/go-fed/activity v1.0.0 h1:j7w3auHZnVCjUcgA1mE+UqSOjFBhvW2Z2res3vNol+o=
github.com/go-fed/activity v1.0.0/go.mod h1:v4QoPaAzjWZ8zN2VFVGL5ep9C02mst0hQYHUpQwso4Q=
github.com/go-fed/httpsig v0.1.1-0.20190914113940-c2de3672e5b5/go.mod h1:T56HUNYZUQ1AGUzhAYPugZfp36sKApVnGBgKlIY+aIE=

@ -4,7 +4,6 @@ import (
libtemplate "git.ptzo.gdn/feditools/go-lib/template"
"git.ptzo.gdn/feditools/relay/internal/http/template"
"git.ptzo.gdn/feditools/relay/internal/language"
"git.ptzo.gdn/feditools/relay/internal/scheduler"
"net/http"
)
@ -75,18 +74,6 @@ func (m *Module) displayAdminJobs(w http.ResponseWriter, r *http.Request, config
Required: true,
Options: []template.FormSelectOption{
{},
{
Text: string(scheduler.JobMaintDeliveryErrorTimeout),
Value: string(scheduler.JobMaintDeliveryErrorTimeout),
},
{
Text: string(scheduler.JobUpdateAccountInfo),
Value: string(scheduler.JobUpdateAccountInfo),
},
{
Text: string(scheduler.JobUpdateInstanceInfo),
Value: string(scheduler.JobUpdateInstanceInfo),
},
},
Validation: config.FormRunJobJobValidation,
},

@ -91,3 +91,21 @@ func (l *Localizer) TextStatistics() *LocalizedString {
string: text,
}
}
// TextStatus returns a translated phrase.
func (l *Localizer) TextStatus() *LocalizedString {
text, tag, err := l.localizer.LocalizeWithTag(&i18n.LocalizeConfig{
DefaultMessage: &i18n.Message{
ID: "Status",
Other: "Status",
},
})
if err != nil {
logger.WithField("func", "TextStatus").Warningf(missingTranslationWarning, err.Error())
}
return &LocalizedString{
language: tag,
string: text,
}
}

@ -5,7 +5,6 @@ import (
"errors"
"git.ptzo.gdn/feditools/relay/internal/db"
"git.ptzo.gdn/feditools/relay/internal/models"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
)
@ -137,11 +136,8 @@ func (l *Logic) AddBlocks(ctx context.Context, account *models.Account, blocks .
}
func (l *Logic) ProcessBlockAdd(ctx context.Context, blockID int64) error {
help := worker.HelperFor(ctx)
log := logger.WithFields(logrus.Fields{
"func": "ProcessBlockAdd",
"jid": help.Jid(),
})
// read block
@ -160,11 +156,8 @@ func (l *Logic) ProcessBlockAdd(ctx context.Context, blockID int64) error {
}
func (l *Logic) doBlockAdd(ctx context.Context, block *models.Block) error {
help := worker.HelperFor(ctx)
log := logger.WithFields(logrus.Fields{
"func": "doBlockAdd",
"jid": help.Jid(),
})
instance, err := l.db.ReadInstanceByDomain(ctx, block.Domain)
@ -181,11 +174,8 @@ func (l *Logic) doBlockAdd(ctx context.Context, block *models.Block) error {
}
func (l *Logic) doBlockAddSubdomains(ctx context.Context, block *models.Block) error {
help := worker.HelperFor(ctx)
log := logger.WithFields(logrus.Fields{
"func": "doBlockAddSubdomains",
"jid": help.Jid(),
})
instances, err := l.db.ReadInstancesWithDomainSuffix(ctx, block.Domain)

@ -5,7 +5,6 @@ import (
"errors"
"git.ptzo.gdn/feditools/relay/internal/db"
"git.ptzo.gdn/feditools/relay/internal/models"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
"time"
)
@ -75,11 +74,8 @@ func (l *Logic) DeleteBlock(ctx context.Context, account *models.Account, block
}
func (l *Logic) ProcessBlockDelete(ctx context.Context, blockID int64) error {
help := worker.HelperFor(ctx)
log := logger.WithFields(logrus.Fields{
"func": "ProcessBlockDelete",
"jid": help.Jid(),
})
// read block

@ -5,7 +5,6 @@ import (
"errors"
"git.ptzo.gdn/feditools/relay/internal/db"
"git.ptzo.gdn/feditools/relay/internal/models"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
)
@ -82,11 +81,8 @@ func (l *Logic) UpdateBlock(
}
func (l *Logic) ProcessBlockUpdate(ctx context.Context, blockID int64) error {
help := worker.HelperFor(ctx)
log := logger.WithFields(logrus.Fields{
"func": "ProcessBlockUpdate",
"jid": help.Jid(),
})
// read block
@ -105,11 +101,8 @@ func (l *Logic) ProcessBlockUpdate(ctx context.Context, blockID int64) error {
}
func (l *Logic) doBlockUpdate(ctx context.Context, block *models.Block) error {
help := worker.HelperFor(ctx)
log := logger.WithFields(logrus.Fields{
"func": "doBlockUpdate",
"jid": help.Jid(),
})
instances, err := l.db.ReadInstancesWithBlockID(ctx, block.ID)
@ -141,11 +134,8 @@ func (l *Logic) doBlockUpdate(ctx context.Context, block *models.Block) error {
}
func (l *Logic) doBlockUpdateSubdomains(ctx context.Context, block *models.Block) error {
help := worker.HelperFor(ctx)
log := logger.WithFields(logrus.Fields{
"func": "doBlockUpdateSubdomains",
"jid": help.Jid(),
})
instances, err := l.db.ReadInstancesWithDomainSuffix(ctx, block.Domain)

@ -12,7 +12,6 @@ import (
"git.ptzo.gdn/feditools/relay/internal/notification"
"git.ptzo.gdn/feditools/relay/internal/path"
"git.ptzo.gdn/feditools/relay/internal/runner"
"git.ptzo.gdn/feditools/relay/internal/scheduler"
"git.ptzo.gdn/feditools/relay/internal/token"
"github.com/go-fed/activity/pub"
"github.com/go-fed/httpsig"
@ -33,7 +32,6 @@ type Logic struct {
kv kv.KV
notifier notification.Notifier
runner runner.Runner
scheduler *scheduler.Module
tokz *token.Tokenizer
tracer trace.Tracer
transport *fedihelper.Transport
@ -137,7 +135,3 @@ func (l *Logic) SetNotifier(n notification.Notifier) {
func (l *Logic) SetRunner(r runner.Runner) {
l.runner = r
}
func (l *Logic) SetScheduler(s *scheduler.Module) {
l.scheduler = s
}

@ -2,25 +2,12 @@ package logic1
import (
"git.ptzo.gdn/feditools/relay/internal/logic"
"git.ptzo.gdn/feditools/relay/internal/scheduler"
)
func (l *Logic) GetSchedulerJobs() ([]logic.SchedulerJob, error) {
jobs := l.scheduler.Jobs()
logicJobs := make([]logic.SchedulerJob, len(jobs))
for i, j := range jobs {
logicJobs[i].Name = j.Tags()[0]
logicJobs[i].Running = j.IsRunning()
logicJobs[i].RunCount = j.RunCount()
logicJobs[i].LastRun = j.LastRun()
logicJobs[i].NextRun = j.NextRun()
logicJobs[i].LastError = j.Error()
}
return logicJobs, nil
return l.runner.Jobs()
}
func (l *Logic) RunSchedulerJob(job string) error {
return l.scheduler.Run(scheduler.Job(job))
return nil
}

@ -11,9 +11,9 @@ type Scheduler interface {
type SchedulerJob struct {
Name string
Running bool
State string
RunCount int
LastRun time.Time
NextRun time.Time
LastError error
LastError string
}

@ -32,7 +32,7 @@ func (r *Runner) EnqueueDeliverActivity(ctx context.Context, instanceID int64, a
task := asynq.NewTask(TypeDeliverActivity, payload, asynq.Queue(runner.QueueDelivery))
info, err := r.client.Enqueue(task, asynq.MaxRetry(5))
info, err := r.client.EnqueueContext(ctx, task, asynq.MaxRetry(5))
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -41,7 +41,7 @@ func (r *Runner) EnqueueInboxActivity(ctx context.Context, instanceID int64, act
task := asynq.NewTask(TypeInboxActivity, payload)
info, err := r.client.Enqueue(task, asynq.MaxRetry(5))
info, err := r.client.EnqueueContext(ctx, task, asynq.MaxRetry(5))
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -28,7 +28,7 @@ func (r *Runner) EnqueueProcessBlockAdd(ctx context.Context, blockID int64) erro
task := asynq.NewTask(TypeProcessBlockAdd, payload)
info, err := r.client.Enqueue(task)
info, err := r.client.EnqueueContext(ctx, task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -28,7 +28,7 @@ func (r *Runner) EnqueueProcessBlockDelete(ctx context.Context, blockID int64) e
task := asynq.NewTask(TypeProcessBlockDelete, payload)
info, err := r.client.Enqueue(task)
info, err := r.client.EnqueueContext(ctx, task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -28,7 +28,7 @@ func (r *Runner) EnqueueProcessBlockUpdate(ctx context.Context, blockID int64) e
task := asynq.NewTask(TypeProcessBlockUpdate, payload)
info, err := r.client.Enqueue(task)
info, err := r.client.EnqueueContext(ctx, task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -2,6 +2,7 @@ package asynq
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
@ -15,7 +16,7 @@ func (r *Runner) EnqueueMaintDeliveryErrorTimeout(ctx context.Context) error {
task := asynq.NewTask(TypeMaintDeliveryErrorTimeout, nil)
info, err := r.client.Enqueue(task)
info, err := r.client.EnqueueContext(ctx, task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
@ -32,5 +33,11 @@ func (r *Runner) handleMaintDeliveryErrorTimeout(ctx context.Context, _ *asynq.T
//l := logger.WithField("func", "handleMaintDeliveryErrorTimeout")
return r.logic.MaintDeliveryErrorTimeout(ctx, "")
err := r.logic.MaintDeliveryErrorTimeout(ctx, "")
if err != nil {
span.RecordError(err)
return fmt.Errorf("%s: %w", err.Error(), asynq.SkipRetry)
}
return nil
}

@ -10,16 +10,17 @@ import (
)
func New(c *Config) (*Runner, error) {
client := asynq.NewClient(asynq.RedisClientOpt{
redisConf := asynq.RedisClientOpt{
Addr: c.Address,
Password: c.Password,
DB: c.DB,
})
}
return &Runner{
client: client,
logic: c.Logic,
tracer: otel.Tracer("internal/runner/asynq"),
client: asynq.NewClient(redisConf),
inspector: asynq.NewInspector(redisConf),
logic: c.Logic,
tracer: otel.Tracer("internal/runner/asynq"),
concurrency: c.Concurrency,
address: c.Address,
@ -29,10 +30,11 @@ func New(c *Config) (*Runner, error) {
}
type Runner struct {
client *asynq.Client
logic logic.Logic
server *asynq.Server
tracer trace.Tracer
client *asynq.Client
inspector *asynq.Inspector
logic logic.Logic
server *asynq.Server
tracer trace.Tracer
concurrency int
address string
@ -68,7 +70,9 @@ func (r *Runner) Start(_ context.Context) error {
mux.HandleFunc(TypeMaintDeliveryErrorTimeout, r.handleMaintDeliveryErrorTimeout)
mux.HandleFunc(TypeSendNotification, r.handleSendNotification)
mux.HandleFunc(TypeUpdateAccountInfo, r.handleUpdateAccountInfo)
mux.HandleFunc(TypeUpdateAccountsInfo, r.handleUpdateAccountsInfo)
mux.HandleFunc(TypeUpdateInstanceInfo, r.handleUpdateInstanceInfo)
mux.HandleFunc(TypeUpdateInstancesInfo, r.handleUpdateInstancesInfo)
return r.server.Run(mux)
}
@ -78,3 +82,22 @@ func (r *Runner) Stop() error {
return nil
}
func (r *Runner) Jobs() ([]logic.SchedulerJob, error) {
jobs, err := r.inspector.ListScheduledTasks(runner.QueuePriority)
if err != nil {
return nil, err
}
logicJobs := make([]logic.SchedulerJob, len(jobs))
for i, j := range jobs {
logicJobs[i].Name = j.Type
logicJobs[i].State = j.State.String()
logicJobs[i].RunCount = 0
logicJobs[i].LastRun = j.CompletedAt
logicJobs[i].NextRun = j.NextProcessAt
logicJobs[i].LastError = j.LastErr
}
return logicJobs, nil
}

@ -0,0 +1,61 @@
package asynq
import (
"git.ptzo.gdn/feditools/relay/internal/runner"
"github.com/hibiken/asynq"
)
func (r *Runner) NewScheduler() (*Scheduler, error) {
l := logger.WithField("func", "NewScheduler")
s := asynq.NewScheduler(
asynq.RedisClientOpt{
Addr: r.address,
Password: r.password,
DB: r.db,
},
nil,
)
// delivery error
taskMaintDeliveryErrorTimeout := asynq.NewTask(TypeMaintDeliveryErrorTimeout, nil)
entryID, err := s.Register("0 1 * * *", taskMaintDeliveryErrorTimeout, asynq.Queue(runner.QueuePriority))
if err != nil {
return nil, err
}
l.Debugf("registered task %s: %q\n", TypeMaintDeliveryErrorTimeout, entryID)
// update accounts
taskUpdateAccountsInfo := asynq.NewTask(TypeUpdateAccountsInfo, nil)
entryID, err = s.Register("0 2 * * *", taskUpdateAccountsInfo, asynq.Queue(runner.QueuePriority))
if err != nil {
return nil, err
}
l.Debugf("registered task %s: %q\n", TypeUpdateAccountsInfo, entryID)
// update instances
taskUpdateInstancesInfo := asynq.NewTask(TypeUpdateInstancesInfo, nil)
entryID, err = s.Register("0 3 * * *", taskUpdateInstancesInfo, asynq.Queue(runner.QueuePriority))
if err != nil {
return nil, err
}
l.Debugf("registered task %s: %q\n", TypeUpdateInstancesInfo, entryID)
return &Scheduler{
scheduler: s,
}, nil
}
type Scheduler struct {
scheduler *asynq.Scheduler
}
func (s *Scheduler) Start() error {
return s.scheduler.Start()
}
func (s *Scheduler) Stop() error {
s.scheduler.Shutdown()
return nil
}

@ -32,7 +32,7 @@ func (r *Runner) EnqueueSendNotification(ctx context.Context, event models.Event
task := asynq.NewTask(TypeSendNotification, payload, asynq.Queue(runner.QueuePriority))
info, err := r.client.Enqueue(task)
info, err := r.client.EnqueueContext(ctx, task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -28,7 +28,7 @@ func (r *Runner) EnqueueUpdateAccountInfo(ctx context.Context, accountID int64)
task := asynq.NewTask(TypeUpdateAccountInfo, payload)
info, err := r.client.Enqueue(task)
info, err := r.client.EnqueueContext(ctx, task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -0,0 +1,23 @@
package asynq
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
const TypeUpdateAccountsInfo = "update:accounts"
func (r *Runner) handleUpdateAccountsInfo(ctx context.Context, _ *asynq.Task) error {
ctx, span := r.tracer.Start(ctx, "handleUpdateAccountsInfo")
defer span.End()
//l := logger.WithField("func", "handleUpdateAccountsInfo")
// process activity
if err := r.logic.EnqueueAccountInfoUpdates(ctx); err != nil {
return fmt.Errorf("%s: %w", err.Error(), asynq.SkipRetry)
}
return nil
}

@ -28,7 +28,7 @@ func (r *Runner) EnqueueUpdateInstanceInfo(ctx context.Context, instanceID int64
task := asynq.NewTask(TypeUpdateInstanceInfo, payload)
info, err := r.client.Enqueue(task)
info, err := r.client.EnqueueContext(ctx, task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())

@ -0,0 +1,23 @@
package asynq
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
const TypeUpdateInstancesInfo = "update:instances"
func (r *Runner) handleUpdateInstancesInfo(ctx context.Context, _ *asynq.Task) error {
ctx, span := r.tracer.Start(ctx, "handleUpdateInstancesInfo")
defer span.End()
//l := logger.WithField("func", "handleUpdateInstancesInfo")
// process activity
if err := r.logic.EnqueueInstanceInfoUpdates(ctx); err != nil {
return fmt.Errorf("%s: %w", err.Error(), asynq.SkipRetry)
}
return nil
}

@ -1,43 +0,0 @@
package faktory
import (
"context"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
"strconv"
)
func (r *Runner) EnqueueUpdateAccountInfo(_ context.Context, accountID int64) error {
retry := 0
job := faktory.NewJob(JobUpdateAccountInfo, strconv.FormatInt(accountID, 10))
job.Retry = &retry
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) updateAccountInfo(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "updateAccountInfo",
"jid": help.Jid(),
})
if len(args) != 1 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 2)
}
// cast arguments
accountID, err := getInt64(args, 0)
if err != nil {
l.Error(err.Error())
return err
}
return r.logic.UpdateAccountInfo(ctx, help.Jid(), accountID)
}

@ -1,120 +0,0 @@
package faktory
import (
"context"
"fmt"
"git.ptzo.gdn/feditools/go-lib/fedihelper"
"git.ptzo.gdn/feditools/relay/internal/runner"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
"net/url"
"strconv"
)
func (r *Runner) EnqueueDeliverActivity(_ context.Context, instanceID int64, activity fedihelper.Activity) error {
retry := 8
job := faktory.NewJob(JobDeliverActivity, strconv.FormatInt(instanceID, 10), activity)
job.Queue = runner.QueueDelivery
job.Retry = &retry
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) deliverActivity(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "deliverActivity",
"jid": help.Jid(),
})
if len(args) != 2 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 2)
}
// cast arguments
instanceIDStr, ok := args[0].(string)
if !ok {
l.Errorf("argument 0 is not an string")
return fmt.Errorf("argument 0 is not an int")
}
instanceID, err := strconv.ParseInt(instanceIDStr, 10, 64)
if err != nil {
l.Errorf("cant parse int from argument 0: %s", err.Error())
return fmt.Errorf("cant parse int from argument 0: %s", err.Error())
}
activity, ok := args[1].(map[string]interface{})
if !ok {
l.Errorf("argument 1 is not an activity")
return fmt.Errorf("argument 1 is not an activity")
}
return r.logic.DeliverActivity(ctx, help.Jid(), instanceID, activity, false)
}
func (r *Runner) EnqueueInboxActivity(_ context.Context, instanceID int64, actorIRI string, activity fedihelper.Activity) error {
job := faktory.NewJob(JobInboxActivity, strconv.FormatInt(instanceID, 10), actorIRI, activity)
job.Queue = runner.QueueDefault
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) inboxActivity(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "inboxActivity",
"jid": help.Jid(),
})
if len(args) != 3 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 2)
}
// cast arguments
instanceIDStr, ok := args[0].(string)
if !ok {
l.Errorf("argument 0 is not an string, got %T", args[0])
return fmt.Errorf("argument 0 is not an int")
}
instanceID, err := strconv.ParseInt(instanceIDStr, 10, 64)
if err != nil {
l.Errorf("cant parse int from argument 0: %s", err.Error())
return fmt.Errorf("cant parse int from argument 0: %s", err.Error())
}
actorID, ok := args[1].(string)
if !ok {
l.Errorf("argument 1 is not an string, got %T", args[1])
return fmt.Errorf("argument 1 is not an actor")
}
actorIRI, err := url.Parse(actorID)
if err != nil {
l.Errorf("cant parse url from argument 1: %s", err.Error())
return fmt.Errorf("cant parse url from argument 1: %s", err.Error())
}
activity, ok := args[2].(map[string]interface{})
if !ok {
l.Errorf("argument 2 is not an activity, got %T", args[2])
return fmt.Errorf("argument 2 is not an activity")
}
// process activity
return r.logic.ProcessActivity(ctx, help.Jid(), instanceID, actorIRI, activity)
}

@ -1,102 +0,0 @@
package faktory
import (
"context"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
"strconv"
)
func (r *Runner) EnqueueProcessBlockAdd(_ context.Context, blockID int64) error {
job := faktory.NewJob(JobProcessBlockAdd, strconv.FormatInt(blockID, 10))
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) EnqueueProcessBlockDelete(_ context.Context, blockID int64) error {
job := faktory.NewJob(JobProcessBlockDelete, strconv.FormatInt(blockID, 10))
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) EnqueueProcessBlockUpdate(_ context.Context, blockID int64) error {
job := faktory.NewJob(JobProcessBlockUpdate, strconv.FormatInt(blockID, 10))
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) processBlockAdd(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "processBlockAdd",
"jid": help.Jid(),
})
if len(args) != 1 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 2)
}
// cast arguments
blockID, err := getInt64(args, 0)
if err != nil {
l.Error(err.Error())
return err
}
return r.logic.ProcessBlockAdd(ctx, blockID)
}
func (r *Runner) processBlockDelete(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "processBlockDelete",
"jid": help.Jid(),
})
if len(args) != 1 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 2)
}
// cast arguments
blockID, err := getInt64(args, 0)
if err != nil {
l.Error(err.Error())
return err
}
return r.logic.ProcessBlockDelete(ctx, blockID)
}
func (r *Runner) processBlockUpdate(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "processBlockDelete",
"jid": help.Jid(),
})
if len(args) != 1 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 2)
}
// cast arguments
blockID, err := getInt64(args, 0)
if err != nil {
l.Error(err.Error())
return err
}
return r.logic.ProcessBlockUpdate(ctx, blockID)
}

@ -1,15 +0,0 @@
package faktory
const (
twice = 2
JobDeliverActivity = "DeliverActivity"
JobInboxActivity = "InboxActivity"
JobMaintDeliveryErrorTimeout = "MaintDeliveryErrorTimeout"
JobProcessBlockAdd = "ProcessBlockAdd"
JobProcessBlockDelete = "ProcessBlockDelete"
JobProcessBlockUpdate = "ProcessBlockUpdate"
JobSendNotification = "SendNotification"
JobUpdateAccountInfo = "UpdateAccountInfo"
JobUpdateInstanceInfo = "UpdateInstanceInfo"
)

@ -1,43 +0,0 @@
package faktory
import (
"context"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
"strconv"
)
func (r *Runner) EnqueueUpdateInstanceInfo(_ context.Context, instanceID int64) error {
retry := 0
job := faktory.NewJob(JobUpdateInstanceInfo, strconv.FormatInt(instanceID, 10))
job.Retry = &retry
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) updateInstanceInfo(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "updateInstanceInfo",
"jid": help.Jid(),
})
if len(args) != 1 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 2)
}
// cast arguments
instanceID, err := getInt64(args, 0)
if err != nil {
l.Error(err.Error())
return err
}
return r.logic.UpdateInstanceInfo(ctx, help.Jid(), instanceID)
}

@ -1,9 +0,0 @@
package faktory
import (
"git.ptzo.gdn/feditools/relay/internal/log"
)
type empty struct{}
var logger = log.WithPackageField(empty{})

@ -1,32 +0,0 @@
package faktory
import (
"context"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
)
func (r *Runner) EnqueueMaintDeliveryErrorTimeout(_ context.Context) error {
job := faktory.NewJob(JobMaintDeliveryErrorTimeout)
job.Args = []interface{}{}
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) maintDeliveryErrorTimeout(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
l := logger.WithFields(logrus.Fields{
"func": "maintDeliveryErrorTimeout",
"jid": help.Jid(),
})
if len(args) != 0 {
l.Errorf("wrong number of arguments, got: %d, want: %d", len(args), 0)
}
return r.logic.MaintDeliveryErrorTimeout(ctx, help.Jid())
}

@ -1,19 +0,0 @@
package faktory
import (
"context"
"git.ptzo.gdn/feditools/relay/internal/runner"
faktory "github.com/contribsys/faktory/client"
"go.opentelemetry.io/otel/trace"
)
func (r *Runner) Middleware(ctx context.Context, job *faktory.Job, next func(ctx context.Context) error) error {
opts := []trace.SpanStartOption{
trace.WithAttributes(runner.JobIDKey.String(job.Jid)),
trace.WithSpanKind(trace.SpanKindConsumer),
}
tctx, main := r.tracer.Start(ctx, job.Type, opts...)
defer main.End()
return next(tctx)
}

@ -1,49 +0,0 @@
package faktory
import (
"context"
"fmt"
"git.ptzo.gdn/feditools/relay/internal/models"
"git.ptzo.gdn/feditools/relay/internal/runner"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
)
func (r *Runner) EnqueueSendNotification(_ context.Context, event models.EventType, metadata map[string]interface{}) error {
job := faktory.NewJob(JobSendNotification, event, metadata)
job.Queue = runner.QueuePriority
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)
})
}
func (r *Runner) sendNotification(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)