some cleanup for the deliverator functions

This commit is contained in:
Ted Unangst 2023-06-12 15:01:03 -04:00
parent d75e1671f7
commit 4fe838f5ed
3 changed files with 47 additions and 39 deletions

View File

@ -1149,7 +1149,7 @@ func rubadubdub(user *WhatAbout, req junk.Junk) {
j["published"] = time.Now().UTC().Format(time.RFC3339) j["published"] = time.Now().UTC().Format(time.RFC3339)
j["object"] = req j["object"] = req
deliverate(0, user.ID, actor, j.ToBytes()) deliverate(user.ID, actor, j.ToBytes())
} }
func itakeitallback(user *WhatAbout, xid string, owner string, folxid string) { func itakeitallback(user *WhatAbout, xid string, owner string, folxid string) {
@ -1168,7 +1168,7 @@ func itakeitallback(user *WhatAbout, xid string, owner string, folxid string) {
j["object"] = f j["object"] = f
j["published"] = time.Now().UTC().Format(time.RFC3339) j["published"] = time.Now().UTC().Format(time.RFC3339)
deliverate(0, user.ID, owner, j.ToBytes()) deliverate(user.ID, owner, j.ToBytes())
} }
func subsub(user *WhatAbout, xid string, owner string, folxid string) { func subsub(user *WhatAbout, xid string, owner string, folxid string) {
@ -1185,7 +1185,7 @@ func subsub(user *WhatAbout, xid string, owner string, folxid string) {
j["object"] = xid j["object"] = xid
j["published"] = time.Now().UTC().Format(time.RFC3339) j["published"] = time.Now().UTC().Format(time.RFC3339)
deliverate(0, user.ID, owner, j.ToBytes()) deliverate(user.ID, owner, j.ToBytes())
} }
func activatedonks(donks []*Donk) []junk.Junk { func activatedonks(donks []*Donk) []junk.Junk {
@ -1495,7 +1495,7 @@ func sendchonk(user *WhatAbout, ch *Chonk) {
rcpts := make(map[string]bool) rcpts := make(map[string]bool)
rcpts[ch.Target] = true rcpts[ch.Target] = true
for a := range rcpts { for a := range rcpts {
go deliverate(0, user.ID, a, msg) go deliverate(user.ID, a, msg)
} }
} }
@ -1534,7 +1534,7 @@ func honkworldwide(user *WhatAbout, honk *Honk) {
} }
} }
for a := range rcpts { for a := range rcpts {
go deliverate(0, user.ID, a, msg) go deliverate(user.ID, a, msg)
} }
if honk.Public && len(honk.Onts) > 0 { if honk.Public && len(honk.Onts) > 0 {
collectiveaction(honk) collectiveaction(honk)
@ -1567,7 +1567,7 @@ func collectiveaction(honk *Honk) {
} }
msg := j.ToBytes() msg := j.ToBytes()
for a := range rcpts { for a := range rcpts {
go deliverate(0, user.ID, a, msg) go deliverate(user.ID, a, msg)
} }
} }
} }
@ -1881,7 +1881,7 @@ func updateMe(username string) {
} }
} }
for a := range rcpts { for a := range rcpts {
go deliverate(0, user.ID, a, msg) go deliverate(user.ID, a, msg)
} }
} }

View File

@ -28,14 +28,17 @@ import (
type Doover struct { type Doover struct {
ID int64 ID int64
When time.Time When time.Time
Userid int64
Tries int64
Rcpt string Rcpt string
Msgs [][]byte Msgs [][]byte
} }
func sayitagain(goarounds int64, userid int64, doover Doover) { func sayitagain(doover Doover) {
rcpt := doover.Rcpt rcpt := doover.Rcpt
var drift time.Duration var drift time.Duration
switch goarounds { doover.Tries += 1
switch doover.Tries {
case 1: case 1:
drift = 5 * time.Minute drift = 5 * time.Minute
case 2: case 2:
@ -53,7 +56,7 @@ func sayitagain(goarounds int64, userid int64, doover Doover) {
drift += time.Duration(notrand.Int63n(int64(drift / 10))) drift += time.Duration(notrand.Int63n(int64(drift / 10)))
when := time.Now().Add(drift) when := time.Now().Add(drift)
data := bytes.Join(doover.Msgs, []byte{0}) data := bytes.Join(doover.Msgs, []byte{0})
_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, data) _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, rcpt, data)
if err != nil { if err != nil {
elog.Printf("error saving doover: %s", err) elog.Printf("error saving doover: %s", err)
} }
@ -89,24 +92,26 @@ func delinquent(rcpt string, msg []byte) bool {
return true return true
} }
func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { func deliverate(userid int64, rcpt string, msg []byte) {
if delinquent(rcpt, msg) { if delinquent(rcpt, msg) {
return return
} }
var d Doover var d Doover
d.Userid = userid
d.Tries = 0
d.Rcpt = rcpt d.Rcpt = rcpt
d.Msgs = append(d.Msgs, msg) d.Msgs = append(d.Msgs, msg)
deliveration(goarounds, userid, d) deliveration(d)
} }
var garage = gate.NewLimiter(40) var garage = gate.NewLimiter(40)
func deliveration(goarounds int64, userid int64, doover Doover) { func deliveration(doover Doover) {
garage.Start() garage.Start()
defer garage.Finish() defer garage.Finish()
var ki *KeyInfo var ki *KeyInfo
ok := ziggies.Get(userid, &ki) ok := ziggies.Get(doover.Userid, &ki)
if !ok { if !ok {
elog.Printf("lost key for delivery") elog.Printf("lost key for delivery")
return return
@ -121,7 +126,7 @@ func deliveration(goarounds int64, userid int64, doover Doover) {
ok := boxofboxes.Get(rcpt, &box) ok := boxofboxes.Get(rcpt, &box)
if !ok { if !ok {
ilog.Printf("failed getting inbox for %s", rcpt) ilog.Printf("failed getting inbox for %s", rcpt)
sayitagain(goarounds+1, userid, doover) sayitagain(doover)
return return
} }
inbox = box.In inbox = box.In
@ -134,7 +139,7 @@ func deliveration(goarounds int64, userid int64, doover Doover) {
if err != nil { if err != nil {
ilog.Printf("failed to post json to %s: %s", inbox, err) ilog.Printf("failed to post json to %s: %s", inbox, err)
doover.Msgs = doover.Msgs[i:] doover.Msgs = doover.Msgs[i:]
sayitagain(goarounds+1, userid, doover) sayitagain(doover)
return return
} }
} }
@ -165,6 +170,23 @@ func getdoovers() []Doover {
return doovers return doovers
} }
func extractdoover(d *Doover) error {
dqmtx.Lock()
defer dqmtx.Unlock()
row := stmtLoadDoover.QueryRow(d.ID)
var data []byte
err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data)
if err != nil {
return err
}
_, err = stmtZapDoover.Exec(d.ID)
if err != nil {
return err
}
d.Msgs = bytes.Split(data, []byte{0})
return nil
}
func redeliverator() { func redeliverator() {
sleeper := time.NewTimer(5 * time.Second) sleeper := time.NewTimer(5 * time.Second)
for { for {
@ -183,27 +205,13 @@ func redeliverator() {
nexttime := now.Add(24 * time.Hour) nexttime := now.Add(24 * time.Hour)
for _, d := range doovers { for _, d := range doovers {
if d.When.Before(now) { if d.When.Before(now) {
var goarounds, userid int64 err := extractdoover(&d)
var data []byte
dqmtx.Lock()
row := stmtLoadDoover.QueryRow(d.ID)
err := row.Scan(&goarounds, &userid, &d.Rcpt, &data)
if err != nil { if err != nil {
elog.Printf("error scanning doover: %s", err) elog.Printf("error extracting doover: %s", err)
dqmtx.Unlock() // defer
continue continue
} }
_, err = stmtZapDoover.Exec(d.ID) ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries)
if err != nil { deliveration(d)
elog.Printf("error deleting doover: %s", err)
dqmtx.Unlock() // defer
continue
}
dqmtx.Unlock() // defer
d.Msgs = bytes.Split(data, []byte{0})
rcpt := d.Rcpt
ilog.Printf("redeliverating %s try %d", rcpt, goarounds)
deliveration(goarounds, userid, d)
} else if d.When.Before(nexttime) { } else if d.When.Before(nexttime) {
nexttime = d.When nexttime = d.When
} }

2
web.go
View File

@ -2422,7 +2422,7 @@ func apihandler(w http.ResponseWriter, r *http.Request) {
rcpts := boxuprcpts(user, r.Form["rcpt"], public) rcpts := boxuprcpts(user, r.Form["rcpt"], public)
msg := []byte(r.FormValue("msg")) msg := []byte(r.FormValue("msg"))
for rcpt := range rcpts { for rcpt := range rcpts {
go deliverate(0, userid, rcpt, msg) go deliverate(userid, rcpt, msg)
} }
case "gethonkers": case "gethonkers":
j := junk.New() j := junk.New()