use asynq as job handler (#182)
feditools/relay/pipeline/head This commit looks good Details

Reviewed-on: #182
Co-authored-by: Tyr Mactire <tyr@pettingzoo.co>
Co-committed-by: Tyr Mactire <tyr@pettingzoo.co>
pull/183/head v0.10.0
Tyr Mactire 2 weeks ago committed by PettingZoo Gitea
parent 52e7a747fe
commit cd97960f32
Signed by: PettingZoo Gitea
GPG Key ID: 39788A4390A1372F

@ -13,6 +13,7 @@ clean:
@find . -name ".DS_Store" -exec rm -v {} \;
docker-pull:
docker-compose --project-name ${PROJECT_NAME} -f deployments/docker-compose-test.yaml build --pull
docker-compose --project-name ${PROJECT_NAME} -f deployments/docker-compose-test.yaml pull
docker-restart: docker-stop docker-start

@ -15,7 +15,7 @@ import (
"git.ptzo.gdn/feditools/relay/internal/language"
"git.ptzo.gdn/feditools/relay/internal/logic/logic1"
"git.ptzo.gdn/feditools/relay/internal/metrics"
"git.ptzo.gdn/feditools/relay/internal/runner/faktory"
"git.ptzo.gdn/feditools/relay/internal/runner/asynq"
"git.ptzo.gdn/feditools/relay/internal/scheduler"
"git.ptzo.gdn/feditools/relay/internal/token"
"github.com/spf13/viper"
@ -160,15 +160,31 @@ var Start action.Action = func(topCtx context.Context) error {
logicMod.SetNotifier(notifier)
// create runner
runnerMod, err := faktory.New(logicMod)
l.Debug("creating runner module")
runnerAddr := viper.GetString(config.Keys.RedisAddress)
runnerPassword := viper.GetString(config.Keys.RedisPassword)
runnerDB := viper.GetInt(config.Keys.RedisDB)
if viper.GetString(config.Keys.RunnerAddress) != "" {
runnerAddr = viper.GetString(config.Keys.RunnerAddress)
runnerPassword = viper.GetString(config.Keys.RunnerPassword)
runnerDB = viper.GetInt(config.Keys.RunnerDB)
}
runnerMod, err := asynq.New(&asynq.Config{
Logic: logicMod,
Address: runnerAddr,
Password: runnerPassword,
DB: runnerDB,
})
if err != nil {
l.Errorf("runner: %s", err.Error())
l.Errorf("runner server: %s", err.Error())
cancel()
return err
}
logicMod.SetRunner(runnerMod)
runnerMod.Start(ctx)
// create scheduler
schedulerMod, err := scheduler.New(logicMod, runnerMod)

@ -0,0 +1,30 @@
package worker
import (
"git.ptzo.gdn/feditools/relay/internal/logic"
"git.ptzo.gdn/feditools/relay/internal/notification/manager"
"git.ptzo.gdn/feditools/relay/internal/notification/telegram"
)
func newNotifier(
logicMod logic.Logic,
) (*manager.Manager, error) {
l := logger.WithField("func", "newNotifier")
newManager, err := manager.New()
if err != nil {
l.Errorf("notification manager: %s", err.Error())
return nil, err
}
telegramMod, err := telegram.New(logicMod)
if err != nil {
l.Errorf("notification telegram: %s", err.Error())
return nil, err
}
newManager.AddService(telegramMod)
return newManager, nil
}

@ -12,7 +12,7 @@ import (
"git.ptzo.gdn/feditools/relay/internal/kv/redis"
"git.ptzo.gdn/feditools/relay/internal/logic/logic1"
"git.ptzo.gdn/feditools/relay/internal/metrics"
"git.ptzo.gdn/feditools/relay/internal/runner/faktory"
"git.ptzo.gdn/feditools/relay/internal/runner/asynq"
"git.ptzo.gdn/feditools/relay/internal/token"
"github.com/spf13/viper"
"github.com/uptrace/uptrace-go/uptrace"
@ -127,15 +127,50 @@ var Start action.Action = func(ctx context.Context) error {
}
// create runner
runnerMod, err := faktory.New(logicMod)
// create runner
l.Debug("creating runner module")
runnerAddr := viper.GetString(config.Keys.RedisAddress)
runnerPassword := viper.GetString(config.Keys.RedisPassword)
runnerDB := viper.GetInt(config.Keys.RedisDB)
if viper.GetString(config.Keys.RunnerAddress) != "" {
runnerAddr = viper.GetString(config.Keys.RunnerAddress)
runnerPassword = viper.GetString(config.Keys.RunnerPassword)
runnerDB = viper.GetInt(config.Keys.RunnerDB)
}
runnerMod, err := asynq.New(&asynq.Config{
Logic: logicMod,
Concurrency: viper.GetInt(config.Keys.RunnerConcurrency),
Address: runnerAddr,
Password: runnerPassword,
DB: runnerDB,
})
if err != nil {
l.Errorf("runner: %s", err.Error())
l.Errorf("runner server: %s", err.Error())
cancel()
return err
}
defer func() {
l.Debug("closing runner")
err := runnerMod.Stop()
if err != nil {
l.Errorf("closing runner: %s", err.Error())
}
}()
logicMod.SetRunner(runnerMod)
runnerMod.Start(ctx)
// create language module
notifier, err := newNotifier(logicMod)
if err != nil {
l.Errorf("notifier: %s", err.Error())
cancel()
return err
}
logicMod.SetNotifier(notifier)
// ** start application **
errChan := make(chan error)
@ -153,6 +188,15 @@ var Start action.Action = func(ctx context.Context) error {
}
}(metricsServer, errChan)
// start runner server
go func(m *asynq.Runner, errChan chan error) {
l.Debug("starting runner server")
err := m.Start(ctx)
if err != nil {
errChan <- fmt.Errorf("runner server: %s", err.Error())
}
}(runnerMod, errChan)
// wait for event
select {
case sig := <-stopSigChan:

@ -9,4 +9,7 @@ import (
func Runner(cmd *cobra.Command, values config.Values) {
cmd.PersistentFlags().Int(config.Keys.RunnerConcurrency, values.RunnerConcurrency, usage.RunnerConcurrency)
cmd.PersistentFlags().Int(config.Keys.RunnerPoolSize, values.RunnerPoolSize, usage.RunnerPoolSize)
cmd.PersistentFlags().String(config.Keys.RunnerAddress, values.RunnerAddress, usage.RunnerAddress)
cmd.PersistentFlags().Int(config.Keys.RunnerDB, values.RunnerDB, usage.RunnerDB)
cmd.PersistentFlags().String(config.Keys.RunnerPassword, values.RunnerPassword, usage.RunnerPassword)
}

@ -2,26 +2,52 @@
version: '3'
services:
postgres:
image: postgres:14
image: postgres:15
ports:
- 127.0.0.1:5432:5432/tcp
- "127.0.0.1:5432:5432/tcp"
environment:
- POSTGRES_PASSWORD=test
- POSTGRES_USER=test
- POSTGRES_DB=test
restart: always
faktory:
image: contribsys/faktory:1.6.1
command: /faktory -b :7419 -w :7420 -e production
ports:
- 127.0.0.1:7419:7419/tcp
- 127.0.0.1:7420:7420/tcp
environment:
- FAKTORY_PASSWORD=test
restart: always
# faktory:
# image: contribsys/faktory:1.6.1
# command: /faktory -b :7419 -w :7420 -e production
# ports:
# - 127.0.0.1:7419:7419/tcp
# - 127.0.0.1:7420:7420/tcp
# environment:
# - FAKTORY_PASSWORD=test
# restart: always
redis:
image: redis:6
command: redis-server --requirepass test
ports:
- "127.0.0.1:6379:6379/tcp"
restart: always
redis-runner:
image: redis:6
command: redis-server --requirepass test2
ports:
- "127.0.0.1:6380:6379/tcp"
restart: always
asynqmon:
image: hibiken/asynqmon:latest
ports:
- "127.0.0.1:8080:8080/tcp"
environment:
- "REDIS_ADDR=redis-runner:6379"
- "REDIS_PASSWORD=test2"
- "ENABLE_METRICS_EXPORTER=true"
- "PROMETHEUS_ADDR=http://prometheus:9090"
prometheus:
build:
context: ./prometheus
ports:
- "127.0.0.1:9090:9090/tcp"
grafana:
build:
context: ./grafana
ports:
- "127.0.0.1:3000:3000/tcp"

@ -0,0 +1,4 @@
FROM grafana/grafana:latest
ADD grafana.ini /etc/grafana/
ADD datasource.yaml /etc/grafana/provisioning/datasources/

@ -0,0 +1,9 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: true

File diff suppressed because it is too large Load Diff

@ -0,0 +1,3 @@
FROM prom/prometheus
ADD prometheus.yml /etc/prometheus/

@ -0,0 +1,11 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
- job_name: "asynq"
static_configs:
- targets: ["asynqmon:8080"]

@ -18,6 +18,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/sessions v1.2.1
github.com/hashicorp/golang-lru v0.5.4
github.com/hibiken/asynq v0.24.0
github.com/jackc/pgconn v1.13.0
github.com/jackc/pgx/v4 v4.17.2
github.com/jellydator/ttlcache/v3 v3.0.0
@ -121,6 +122,7 @@ require (
golang.org/x/crypto v0.3.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
golang.org/x/tools v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20221118155620-16455021b5e6 // indirect
google.golang.org/grpc v1.51.0 // indirect

@ -155,6 +155,7 @@ github.com/go-redis/redis/extra/rediscmd/v8 v8.11.5/go.mod h1:s9f/6bSbS5r/jC2ozp
github.com/go-redis/redis/extra/redisotel/v8 v8.11.5 h1:BqyYJgvdSr2S/6O2l7zmCj26ocUTxDLgagsGIRfkS+Q=
github.com/go-redis/redis/extra/redisotel/v8 v8.11.5/go.mod h1:LlDT9RRdBgOrMGvFjT/m1+GrZAmRlBaMcM3UXHPWf8g=
github.com/go-redis/redis/v8 v8.3.3/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeYhKWrBejTU=
github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@ -228,6 +229,7 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@ -254,6 +256,8 @@ github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+l
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hibiken/asynq v0.24.0 h1:r1CiSVYCy1vGq9REKGI/wdB2D5n/QmtzihYHHXOuBUs=
github.com/hibiken/asynq v0.24.0/go.mod h1:FVnRfUTm6gcoDkM/EjF4OIh5/06ergCPUO6pS2B2y+w=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
@ -388,6 +392,7 @@ github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
@ -395,6 +400,7 @@ github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.3/go.mod h1:V9xEwhxec5O8UDM77eCW8vLymOMltsqPVYWrpDsH8xc=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
@ -469,6 +475,7 @@ github.com/speps/go-hashids/v2 v2.0.1 h1:ViWOEqWES/pdOSq+C1SLVa8/Tnsd52XC34RY7lt
github.com/speps/go-hashids/v2 v2.0.1/go.mod h1:47LKunwvDZki/uRVD6NImtyk712yFzIs3UF3KlHohGw=
github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA=
@ -536,6 +543,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
@ -582,6 +590,7 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
@ -642,6 +651,7 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -678,6 +688,7 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201006153459-a7d1128ccaa0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
@ -712,6 +723,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180525142821-c11f84a56e43/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -800,6 +812,8 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@ -853,6 +867,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.3.0 h1:SrNbZl6ECOS1qFzgTdQfWXZM9XBkiA6tkFrH9YSTPHM=
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

@ -37,6 +37,9 @@ type KeyNames struct {
// runner
RunnerConcurrency string
RunnerPoolSize string
RunnerAddress string
RunnerDB string
RunnerPassword string
// server
ServerExternalHostname string
@ -101,6 +104,9 @@ var Keys = KeyNames{
// runner
RunnerConcurrency: "runner-concurrency",
RunnerPoolSize: "runner-pool-size",
RunnerAddress: "runner-address",
RunnerDB: "runner-db",
RunnerPassword: "runner-password",
// server
ServerExternalHostname: "external-hostname",

@ -37,6 +37,9 @@ type Values struct {
// runner
RunnerConcurrency int
RunnerPoolSize int
RunnerAddress string
RunnerDB int
RunnerPassword string
// server
ServerExternalHostname string
@ -95,7 +98,7 @@ var Defaults = Values{
RedisDB: 0,
// runner
RunnerConcurrency: 4,
RunnerConcurrency: 10,
RunnerPoolSize: 4,
// server

@ -0,0 +1,59 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"git.ptzo.gdn/feditools/go-lib/fedihelper"
"git.ptzo.gdn/feditools/relay/internal/runner"
"github.com/hibiken/asynq"
)
const TypeDeliverActivity = "activity:deliver"
type inboxDeliverPayload struct {
InstanceID int64
Activity fedihelper.Activity
}
func (r *Runner) EnqueueDeliverActivity(ctx context.Context, instanceID int64, activity fedihelper.Activity) error {
_, span := r.tracer.Start(ctx, "EnqueueDeliverActivity")
defer span.End()
l := logger.WithField("func", "EnqueueDeliverActivity")
payload, err := json.Marshal(inboxDeliverPayload{
InstanceID: instanceID,
Activity: activity,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeDeliverActivity, payload, asynq.Queue(runner.QueueDelivery))
info, err := r.client.Enqueue(task, asynq.MaxRetry(5))
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleDeliverActivity(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleDeliverActivity")
defer span.End()
//l := logger.WithField("func", "handleDeliverActivity")
var p inboxDeliverPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
return r.logic.DeliverActivity(ctx, "", p.InstanceID, p.Activity, false)
}

@ -0,0 +1,68 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"git.ptzo.gdn/feditools/go-lib/fedihelper"
"github.com/hibiken/asynq"
"net/url"
)
const TypeInboxActivity = "activity:inbox"
type inboxActivityPayload struct {
InstanceID int64
ActorIRI *url.URL
Activity fedihelper.Activity
}
func (r *Runner) EnqueueInboxActivity(ctx context.Context, instanceID int64, actorIRI string, activity fedihelper.Activity) error {
_, span := r.tracer.Start(ctx, "EnqueueInboxAction")
defer span.End()
l := logger.WithField("func", "EnqueueInboxActivity")
actorURL, err := url.Parse(actorIRI)
if err != nil {
l.Errorf("cant parse url from argument 1: %s", err.Error())
return fmt.Errorf("cant parse url from argument 1: %s", err.Error())
}
payload, err := json.Marshal(inboxActivityPayload{
InstanceID: instanceID,
ActorIRI: actorURL,
Activity: activity,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeInboxActivity, payload)
info, err := r.client.Enqueue(task, asynq.MaxRetry(5))
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleInboxActivity(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleInboxActivity")
defer span.End()
//l := logger.WithField("func", "handleInboxAction")
var p inboxActivityPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
return r.logic.ProcessActivity(ctx, "", p.InstanceID, p.ActorIRI, p.Activity)
}

@ -0,0 +1,55 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
)
const TypeProcessBlockAdd = "block:add"
type processBlockAddPayload struct {
BlockID int64
}
func (r *Runner) EnqueueProcessBlockAdd(ctx context.Context, blockID int64) error {
_, span := r.tracer.Start(ctx, "EnqueueProcessBlockAdd")
defer span.End()
l := logger.WithField("func", "EnqueueProcessBlockAdd")
payload, err := json.Marshal(processBlockAddPayload{
BlockID: blockID,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeProcessBlockAdd, payload)
info, err := r.client.Enqueue(task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleProcessBlockAdd(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleProcessBlockAdd")
defer span.End()
//l := logger.WithField("func", "handleProcessBlockAdd")
var p processBlockAddPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
return r.logic.ProcessBlockAdd(ctx, p.BlockID)
}

@ -0,0 +1,55 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
)
const TypeProcessBlockDelete = "block:delete"
type processBlockDeletePayload struct {
BlockID int64
}
func (r *Runner) EnqueueProcessBlockDelete(ctx context.Context, blockID int64) error {
_, span := r.tracer.Start(ctx, "EnqueueProcessBlockDelete")
defer span.End()
l := logger.WithField("func", "EnqueueProcessBlockDelete")
payload, err := json.Marshal(processBlockDeletePayload{
BlockID: blockID,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeProcessBlockDelete, payload)
info, err := r.client.Enqueue(task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleProcessBlockDelete(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleProcessBlockDelete")
defer span.End()
//l := logger.WithField("func", "handleProcessBlockDelete")
var p processBlockDeletePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
return r.logic.ProcessBlockDelete(ctx, p.BlockID)
}

@ -0,0 +1,55 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
)
const TypeProcessBlockUpdate = "block:update"
type processBlockUpdatePayload struct {
BlockID int64
}
func (r *Runner) EnqueueProcessBlockUpdate(ctx context.Context, blockID int64) error {
_, span := r.tracer.Start(ctx, "EnqueueInboxAction")
defer span.End()
l := logger.WithField("func", "EnqueueInboxActivity")
payload, err := json.Marshal(processBlockUpdatePayload{
BlockID: blockID,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeProcessBlockUpdate, payload)
info, err := r.client.Enqueue(task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleProcessBlockUpdate(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleProcessBlockUpdate")
defer span.End()
//l := logger.WithField("func", "handleProcessBlockUpdate")
var p processBlockUpdatePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
return r.logic.ProcessBlockUpdate(ctx, p.BlockID)
}

@ -0,0 +1,12 @@
package asynq
import "git.ptzo.gdn/feditools/relay/internal/logic"
type Config struct {
Logic logic.Logic
Concurrency int
Address string
Password string
DB int
}

@ -0,0 +1,9 @@
package asynq
import (
"git.ptzo.gdn/feditools/relay/internal/log"
)
type empty struct{}
var logger = log.WithPackageField(empty{})

@ -0,0 +1,36 @@
package asynq
import (
"context"
"github.com/hibiken/asynq"
)
const TypeMaintDeliveryErrorTimeout = "maint:delivery-timeout"
func (r *Runner) EnqueueMaintDeliveryErrorTimeout(ctx context.Context) error {
_, span := r.tracer.Start(ctx, "EnqueueMaintDeliveryErrorTimeout")
defer span.End()
l := logger.WithField("func", "EnqueueMaintDeliveryErrorTimeout")
task := asynq.NewTask(TypeMaintDeliveryErrorTimeout, nil)
info, err := r.client.Enqueue(task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleMaintDeliveryErrorTimeout(ctx context.Context, _ *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleMaintDeliveryErrorTimeout")
defer span.End()
//l := logger.WithField("func", "handleMaintDeliveryErrorTimeout")
return r.logic.MaintDeliveryErrorTimeout(ctx, "")
}

@ -0,0 +1,80 @@
package asynq
import (
"context"
"git.ptzo.gdn/feditools/relay/internal/logic"
"git.ptzo.gdn/feditools/relay/internal/runner"
"github.com/hibiken/asynq"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
func New(c *Config) (*Runner, error) {
client := asynq.NewClient(asynq.RedisClientOpt{
Addr: c.Address,
Password: c.Password,
DB: c.DB,
})
return &Runner{
client: client,
logic: c.Logic,
tracer: otel.Tracer("internal/runner/asynq"),
concurrency: c.Concurrency,
address: c.Address,
password: c.Password,
db: c.DB,
}, nil
}
type Runner struct {
client *asynq.Client
logic logic.Logic
server *asynq.Server
tracer trace.Tracer
concurrency int
address string
password string
db int
}
var _ runner.Runner = (*Runner)(nil)
func (r *Runner) Start(_ context.Context) error {
r.server = asynq.NewServer(
asynq.RedisClientOpt{
Addr: r.address,
Password: r.password,
DB: r.db,
},
asynq.Config{
Concurrency: r.concurrency,
Queues: map[string]int{
runner.QueuePriority: 6,
runner.QueueDefault: 3,
runner.QueueDelivery: 1,
},
},
)
mux := asynq.NewServeMux()
mux.HandleFunc(TypeDeliverActivity, r.handleDeliverActivity)
mux.HandleFunc(TypeInboxActivity, r.handleInboxActivity)
mux.HandleFunc(TypeProcessBlockAdd, r.handleProcessBlockAdd)
mux.HandleFunc(TypeProcessBlockDelete, r.handleProcessBlockDelete)
mux.HandleFunc(TypeProcessBlockUpdate, r.handleProcessBlockUpdate)
mux.HandleFunc(TypeMaintDeliveryErrorTimeout, r.handleMaintDeliveryErrorTimeout)
mux.HandleFunc(TypeSendNotification, r.handleSendNotification)
mux.HandleFunc(TypeUpdateAccountInfo, r.handleUpdateAccountInfo)
mux.HandleFunc(TypeUpdateInstanceInfo, r.handleUpdateInstanceInfo)
return r.server.Run(mux)
}
func (r *Runner) Stop() error {
r.server.Stop()
return nil
}

@ -0,0 +1,63 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"git.ptzo.gdn/feditools/relay/internal/models"
"git.ptzo.gdn/feditools/relay/internal/runner"
"github.com/hibiken/asynq"
)
const TypeSendNotification = "notification:send"
type sendNotificationPayload struct {
Event models.EventType
Metadata map[string]interface{}
}
func (r *Runner) EnqueueSendNotification(ctx context.Context, event models.EventType, metadata map[string]interface{}) error {
_, span := r.tracer.Start(ctx, "EnqueueSendNotification")
defer span.End()
l := logger.WithField("func", "EnqueueSendNotification")
payload, err := json.Marshal(sendNotificationPayload{
Event: event,
Metadata: metadata,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeSendNotification, payload, asynq.Queue(runner.QueuePriority))
info, err := r.client.Enqueue(task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleSendNotification(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleSendNotification")
defer span.End()
//l := logger.WithField("func", "handleSendNotification")
var p sendNotificationPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
if err := r.logic.SendNotification(ctx, "", p.Event, p.Metadata); err != nil {
return fmt.Errorf("send notification failed: %v: %w", err, asynq.SkipRetry)
}
return nil
}

@ -0,0 +1,59 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
)
const TypeUpdateAccountInfo = "update:account"
type updateAccountInfoPayload struct {
AccountID int64
}
func (r *Runner) EnqueueUpdateAccountInfo(ctx context.Context, accountID int64) error {
_, span := r.tracer.Start(ctx, "EnqueueUpdateAccountInfo")
defer span.End()
l := logger.WithField("func", "EnqueueUpdateAccountInfo")
payload, err := json.Marshal(updateAccountInfoPayload{
AccountID: accountID,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeUpdateAccountInfo, payload)
info, err := r.client.Enqueue(task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleUpdateAccountInfo(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleUpdateAccountInfo")
defer span.End()
//l := logger.WithField("func", "handleUpdateAccountInfo")
var p updateAccountInfoPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
if err := r.logic.UpdateAccountInfo(ctx, "", p.AccountID); err != nil {
return fmt.Errorf("account update failed: %v: %w", err, asynq.SkipRetry)
}
return nil
}

@ -0,0 +1,59 @@
package asynq
import (
"context"
"encoding/json"
"fmt"
"github.com/hibiken/asynq"
)
const TypeUpdateInstanceInfo = "update:instance"
type updateInstanceInfoPayload struct {
InstanceID int64
}
func (r *Runner) EnqueueUpdateInstanceInfo(ctx context.Context, instanceID int64) error {
_, span := r.tracer.Start(ctx, "EnqueueUpdateInstanceInfo")
defer span.End()
l := logger.WithField("func", "EnqueueUpdateInstanceInfo")
payload, err := json.Marshal(updateInstanceInfoPayload{
InstanceID: instanceID,
})
if err != nil {
return err
}
task := asynq.NewTask(TypeUpdateInstanceInfo, payload)
info, err := r.client.Enqueue(task)
if err != nil {
l.Debugf("can't enqueue: %s", err.Error())
return err
}
l.Debugf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
return nil
}
func (r *Runner) handleUpdateInstanceInfo(ctx context.Context, t *asynq.Task) error {
_, span := r.tracer.Start(ctx, "handleUpdateInstanceInfo")
defer span.End()
//l := logger.WithField("func", "handleUpdateInstanceInfo")
var p updateInstanceInfoPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
}
// process activity
if err := r.logic.UpdateInstanceInfo(ctx, "", p.InstanceID); err != nil {
return fmt.Errorf("account update failed: %v: %w", err, asynq.SkipRetry)
}
return nil
}

@ -3,5 +3,9 @@ package runner
import "go.opentelemetry.io/otel/attribute"
const (
RunnerJobIDKey = attribute.Key("runner.job.id")
JobIDKey = attribute.Key("runner.job.id")
QueuePriority = "priority" // high
QueueDefault = "default" // medium
QueueDelivery = "delivery" // low
)

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"git.ptzo.gdn/feditools/go-lib/fedihelper"
"git.ptzo.gdn/feditools/relay/internal/runner"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
@ -15,7 +16,7 @@ func (r *Runner) EnqueueDeliverActivity(_ context.Context, instanceID int64, act
retry := 8
job := faktory.NewJob(JobDeliverActivity, strconv.FormatInt(instanceID, 10), activity)
job.Queue = QueueDelivery
job.Queue = runner.QueueDelivery
job.Retry = &retry
return r.manager.Pool.With(func(conn *faktory.Client) error {
@ -61,7 +62,7 @@ func (r *Runner) deliverActivity(ctx context.Context, args ...interface{}) error
func (r *Runner) EnqueueInboxActivity(_ context.Context, instanceID int64, actorIRI string, activity fedihelper.Activity) error {
job := faktory.NewJob(JobInboxActivity, strconv.FormatInt(instanceID, 10), actorIRI, activity)
job.Queue = QueueDefault
job.Queue = runner.QueueDefault
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)

@ -12,8 +12,4 @@ const (
JobSendNotification = "SendNotification"
JobUpdateAccountInfo = "UpdateAccountInfo"
JobUpdateInstanceInfo = "UpdateInstanceInfo"
QueueDefault = "default" // medium
QueueDelivery = "delivery" // low
QueuePriority = "priority" // high
)

@ -9,7 +9,7 @@ import (
func (r *Runner) Middleware(ctx context.Context, job *faktory.Job, next func(ctx context.Context) error) error {
opts := []trace.SpanStartOption{
trace.WithAttributes(runner.RunnerJobIDKey.String(job.Jid)),
trace.WithAttributes(runner.JobIDKey.String(job.Jid)),
trace.WithSpanKind(trace.SpanKindConsumer),
}
tctx, main := r.tracer.Start(ctx, job.Type, opts...)

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"git.ptzo.gdn/feditools/relay/internal/models"
"git.ptzo.gdn/feditools/relay/internal/runner"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/sirupsen/logrus"
@ -11,7 +12,7 @@ import (
func (r *Runner) EnqueueSendNotification(_ context.Context, event models.EventType, metadata map[string]interface{}) error {
job := faktory.NewJob(JobSendNotification, event, metadata)
job.Queue = QueuePriority
job.Queue = runner.QueuePriority
return r.manager.Pool.With(func(conn *faktory.Client) error {
return conn.Push(job)

@ -4,6 +4,7 @@ import (
"context"
"git.ptzo.gdn/feditools/relay/internal/config"
"git.ptzo.gdn/feditools/relay/internal/logic"
"git.ptzo.gdn/feditools/relay/internal/runner"
faktory "github.com/contribsys/faktory/client"
worker "github.com/contribsys/faktory_worker_go"
"github.com/spf13/viper"
@ -33,7 +34,7 @@ func New(l logic.Logic) (*Runner, error) {
mgr := worker.NewManager()
mgr.Pool = faktoryPool
mgr.Concurrency = viper.GetInt(config.Keys.RunnerConcurrency)
mgr.ProcessWeightedPriorityQueues(map[string]int{QueuePriority: 3, QueueDefault: 2, QueueDelivery: 1})
mgr.ProcessWeightedPriorityQueues(map[string]int{runner.QueuePriority: 3, runner.QueueDefault: 2, runner.QueueDelivery: 1})
// add handlers
mgr.Use(newRunner.Middleware)

@ -88,6 +88,24 @@ db-crypto-key: "test1234test5678test9123test4567"
# Default: ""
redis-password: "test"
############
## RUNNER ##
############
# String. Address and port of the redis instance
# Default: "localhost:6379"
runner-address: "localhost:6380"
# Int. Redis database to use
# Examples: [0, 1, 15]
# Default: 0
#runner-db: 0
# String. Password to use for the redis connection
# Examples: ["","test","password"]
# Default: ""
runner-password: "test2"
#######
# WEB #
#######

@ -0,0 +1,27 @@
vendor
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Ignore examples for now
/examples
# Ignore tool binaries
/tools/asynq/asynq
/tools/metrics_exporter/metrics_exporter
# Ignore asynq config file
.asynq.*
# Ignore editor config files
.vscode
.idea

@ -0,0 +1,526 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on ["Keep a Changelog"](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.24.0] - 2023-01-02
### Added
- `PreEnqueueFunc`, `PostEnqueueFunc` is added in `Scheduler` and deprecated `EnqueueErrorHandler` (PR: https://github.com/hibiken/asynq/pull/476)
### Changed
- Removed error log when `Scheduler` failed to enqueue a task. Use `PostEnqueueFunc` to check for errors and task actions if needed.
- Changed log level from ERROR to WARNINING when `Scheduler` failed to record `SchedulerEnqueueEvent`.
## [0.23.0] - 2022-04-11
### Added
- `Group` option is introduced to enqueue task in a group.
- `GroupAggregator` and related types are introduced for task aggregation feature.
- `GroupGracePeriod`, `GroupMaxSize`, `GroupMaxDelay`, and `GroupAggregator` fields are added to `Config`.
- `Inspector` has new methods related to "aggregating tasks".
- `Group` field is added to `TaskInfo`.
- (CLI): `group ls` command is added
- (CLI): `task ls` supports listing aggregating tasks via `--state=aggregating --group=<GROUP>` flags
- Enable rediss url parsing support
### Fixed
- Fixed overflow issue with 32-bit systems (For details, see https://github.com/hibiken/asynq/pull/426)
## [0.22.1] - 2022-02-20
### Fixed
- Fixed Redis version compatibility: Keep support for redis v4.0+
## [0.22.0] - 2022-02-19
### Added
- `BaseContext` is introduced in `Config` to specify callback hook to provide a base `context` from which `Handler` `context` is derived
- `IsOrphaned` field is added to `TaskInfo` to describe a task left in active state with no worker processing it.
### Changed
- `Server` now recovers tasks with an expired lease. Recovered tasks are retried/archived with `ErrLeaseExpired` error.
## [0.21.0] - 2022-01-22
### Added
- `PeriodicTaskManager` is added. Prefer using this over `Scheduler` as it has better support for dynamic periodic tasks.
- The `asynq stats` command now supports a `--json` option, making its output a JSON object
- Introduced new configuration for `DelayedTaskCheckInterval`. See [godoc](https://godoc.org/github.com/hibiken/asynq) for more details.
## [0.20.0] - 2021-12-19
### Added
- Package `x/metrics` is added.
- Tool `tools/metrics_exporter` binary is added.
- `ProcessedTotal` and `FailedTotal` fields were added to `QueueInfo` struct.
## [0.19.1] - 2021-12-12
### Added
- `Latency` field is added to `QueueInfo`.
- `EnqueueContext` method is added to `Client`.
### Fixed
- Fixed an error when user pass a duration less than 1s to `Unique` option
## [0.19.0] - 2021-11-06
### Changed
- `NewTask` takes `Option` as variadic argument
- Bumped minimum supported go version to 1.14 (i.e. go1.14 or higher is required).
### Added
- `Retention` option is added to allow user to specify task retention duration after completion.
- `TaskID` option is added to allow user to specify task ID.
- `ErrTaskIDConflict` sentinel error value is added.
- `ResultWriter` type is added and provided through `Task.ResultWriter` method.
- `TaskInfo` has new fields `CompletedAt`, `Result` and `Retention`.
### Removed
- `Client.SetDefaultOptions` is removed. Use `NewTask` instead to pass default options for tasks.
## [0.18.6] - 2021-10-03
### Changed
- Updated `github.com/go-redis/redis` package to v8
## [0.18.5] - 2021-09-01
### Added
- `IsFailure` config option is added to determine whether error returned from Handler counts as a failure.
## [0.18.4] - 2021-08-17
### Fixed
- Scheduler methods are now thread-safe. It's now safe to call `Register` and `Unregister` concurrently.
## [0.18.3] - 2021-08-09
### Changed
- `Client.Enqueue` no longer enqueues tasks with empty typename; Error message is returned.
## [0.18.2] - 2021-07-15
### Changed
- Changed `Queue` function to not to convert the provided queue name to lowercase. Queue names are now case-sensitive.
- `QueueInfo.MemoryUsage` is now an approximate usage value.
### Fixed
- Fixed latency issue around memory usage (see https://github.com/hibiken/asynq/issues/309).
## [0.18.1] - 2021-07-04
### Changed
- Changed to execute task recovering logic when server starts up; Previously it needed to wait for a minute for task recovering logic to exeucte.
### Fixed
- Fixed task recovering logic to execute every minute
## [0.18.0] - 2021-06-29
### Changed
- NewTask function now takes array of bytes as payload.
- Task `Type` and `Payload` should be accessed by a method call.
- `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the exising code to call `Shutdown` where it used to call `Stop`.
- `Scheduler` API has changed. Renamed `Stop` to `Shutdown`.
- Requires redis v4.0+ for multiple field/value pair support
- `Client.Enqueue` now returns `TaskInfo`
- `Inspector.RunTaskByKey` is replaced with `Inspector.RunTask`
- `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask`
- `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask`
- `inspeq` package is removed. All types and functions from the package is moved to `asynq` package.