convert deliverater queue to per rcpt fifo

This commit is contained in:
Ted Unangst 2023-06-12 14:40:28 -04:00
parent 6e43340381
commit d75e1671f7
2 changed files with 67 additions and 22 deletions

View File

@ -1106,6 +1106,7 @@ var stmtSaveMeta, stmtDeleteAllMeta, stmtDeleteOneMeta, stmtDeleteSomeMeta, stmt
var stmtHonksISaved, stmtGetFilters, stmtSaveFilter, stmtDeleteFilter *sql.Stmt var stmtHonksISaved, stmtGetFilters, stmtSaveFilter, stmtDeleteFilter *sql.Stmt
var stmtGetTracks *sql.Stmt var stmtGetTracks *sql.Stmt
var stmtSaveChonk, stmtLoadChonks, stmtGetChatters *sql.Stmt var stmtSaveChonk, stmtLoadChonks, stmtGetChatters *sql.Stmt
var stmtDeliquentCheck, stmtDeliquentUpdate *sql.Stmt
func preparetodie(db *sql.DB, s string) *sql.Stmt { func preparetodie(db *sql.DB, s string) *sql.Stmt {
stmt, err := db.Prepare(s) stmt, err := db.Prepare(s)
@ -1192,4 +1193,6 @@ func prepareStatements(db *sql.DB) {
stmtSaveChonk = preparetodie(db, "insert into chonks (userid, xid, who, target, dt, noise, format) values (?, ?, ?, ?, ?, ?, ?)") stmtSaveChonk = preparetodie(db, "insert into chonks (userid, xid, who, target, dt, noise, format) values (?, ?, ?, ?, ?, ?, ?)")
stmtLoadChonks = preparetodie(db, "select chonkid, userid, xid, who, target, dt, noise, format from chonks where userid = ? and dt > ? order by chonkid asc") stmtLoadChonks = preparetodie(db, "select chonkid, userid, xid, who, target, dt, noise, format from chonks where userid = ? and dt > ? order by chonkid asc")
stmtGetChatters = preparetodie(db, "select distinct(target) from chonks where userid = ?") stmtGetChatters = preparetodie(db, "select distinct(target) from chonks where userid = ?")
stmtDeliquentCheck = preparetodie(db, "select dooverid, msg from doovers where rcpt = ?")
stmtDeliquentUpdate = preparetodie(db, "update doovers set data = ? where dooverid = ?")
} }

View File

@ -16,8 +16,10 @@
package main package main
import ( import (
"fmt" "bytes"
"database/sql"
notrand "math/rand" notrand "math/rand"
"sync"
"time" "time"
"humungus.tedunangst.com/r/webs/gate" "humungus.tedunangst.com/r/webs/gate"
@ -26,9 +28,12 @@ import (
type Doover struct { type Doover struct {
ID int64 ID int64
When time.Time When time.Time
Rcpt string
Msgs [][]byte
} }
func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) { func sayitagain(goarounds int64, userid int64, doover Doover) {
rcpt := doover.Rcpt
var drift time.Duration var drift time.Duration
switch goarounds { switch goarounds {
case 1: case 1:
@ -43,12 +48,12 @@ func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
drift = 24 * time.Hour drift = 24 * time.Hour
default: default:
ilog.Printf("he's dead jim: %s", rcpt) ilog.Printf("he's dead jim: %s", rcpt)
clearoutbound(rcpt)
return return
} }
drift += time.Duration(notrand.Int63n(int64(drift / 10))) drift += time.Duration(notrand.Int63n(int64(drift / 10)))
when := time.Now().Add(drift) when := time.Now().Add(drift)
_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, msg) data := bytes.Join(doover.Msgs, []byte{0})
_, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, data)
if err != nil { if err != nil {
elog.Printf("error saving doover: %s", err) elog.Printf("error saving doover: %s", err)
} }
@ -58,20 +63,45 @@ func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) {
} }
} }
func clearoutbound(rcpt string) { var dqmtx sync.Mutex
hostname := originate(rcpt)
if hostname == "" { func delinquent(rcpt string, msg []byte) bool {
dqmtx.Lock()
defer dqmtx.Unlock()
row := stmtDeliquentCheck.QueryRow(rcpt)
var dooverid int64
var data []byte
err := row.Scan(&dooverid, data)
if err == sql.ErrNoRows {
return false
}
if err != nil {
elog.Printf("error scanning deliquent check: %s", err)
return true
}
data = append(data, 0)
data = append(data, msg...)
_, err = stmtDeliquentUpdate.Exec(data, dooverid)
if err != nil {
elog.Printf("error updating deliquent: %s", err)
return true
}
return true
}
func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
if delinquent(rcpt, msg) {
return return
} }
xid := fmt.Sprintf("%%https://%s/%%", hostname) var d Doover
ilog.Printf("clearing outbound for %s", xid) d.Rcpt = rcpt
db := opendatabase() d.Msgs = append(d.Msgs, msg)
db.Exec("delete from doovers where rcpt like ?", xid) deliveration(goarounds, userid, d)
} }
var garage = gate.NewLimiter(40) var garage = gate.NewLimiter(40)
func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { func deliveration(goarounds int64, userid int64, doover Doover) {
garage.Start() garage.Start()
defer garage.Finish() defer garage.Finish()
@ -82,6 +112,7 @@ func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
return return
} }
var inbox string var inbox string
rcpt := doover.Rcpt
// already did the box indirection // already did the box indirection
if rcpt[0] == '%' { if rcpt[0] == '%' {
inbox = rcpt[1:] inbox = rcpt[1:]
@ -90,17 +121,23 @@ func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) {
ok := boxofboxes.Get(rcpt, &box) ok := boxofboxes.Get(rcpt, &box)
if !ok { if !ok {
ilog.Printf("failed getting inbox for %s", rcpt) ilog.Printf("failed getting inbox for %s", rcpt)
sayitagain(goarounds+1, userid, rcpt, msg) sayitagain(goarounds+1, userid, doover)
return return
} }
inbox = box.In inbox = box.In
} }
for i, msg := range doover.Msgs {
if i > 0 {
time.Sleep(2 * time.Second)
}
err := PostMsg(ki.keyname, ki.seckey, inbox, msg) err := PostMsg(ki.keyname, ki.seckey, inbox, msg)
if err != nil { if err != nil {
ilog.Printf("failed to post json to %s: %s", inbox, err) ilog.Printf("failed to post json to %s: %s", inbox, err)
sayitagain(goarounds+1, userid, rcpt, msg) doover.Msgs = doover.Msgs[i:]
sayitagain(goarounds+1, userid, doover)
return return
} }
}
} }
var pokechan = make(chan int, 1) var pokechan = make(chan int, 1)
@ -147,21 +184,26 @@ func redeliverator() {
for _, d := range doovers { for _, d := range doovers {
if d.When.Before(now) { if d.When.Before(now) {
var goarounds, userid int64 var goarounds, userid int64
var rcpt string var data []byte
var msg []byte dqmtx.Lock()
row := stmtLoadDoover.QueryRow(d.ID) row := stmtLoadDoover.QueryRow(d.ID)
err := row.Scan(&goarounds, &userid, &rcpt, &msg) err := row.Scan(&goarounds, &userid, &d.Rcpt, &data)
if err != nil { if err != nil {
elog.Printf("error scanning doover: %s", err) elog.Printf("error scanning doover: %s", err)
dqmtx.Unlock() // defer
continue continue
} }
_, err = stmtZapDoover.Exec(d.ID) _, err = stmtZapDoover.Exec(d.ID)
if err != nil { if err != nil {
elog.Printf("error deleting doover: %s", err) elog.Printf("error deleting doover: %s", err)
dqmtx.Unlock() // defer
continue continue
} }
dqmtx.Unlock() // defer
d.Msgs = bytes.Split(data, []byte{0})
rcpt := d.Rcpt
ilog.Printf("redeliverating %s try %d", rcpt, goarounds) ilog.Printf("redeliverating %s try %d", rcpt, goarounds)
deliverate(goarounds, userid, rcpt, msg) deliveration(goarounds, userid, d)
} else if d.When.Before(nexttime) { } else if d.When.Before(nexttime) {
nexttime = d.When nexttime = d.When
} }