diff --git a/activity.go b/activity.go index d86769f..ba00bd9 100644 --- a/activity.go +++ b/activity.go @@ -1149,7 +1149,7 @@ func rubadubdub(user *WhatAbout, req junk.Junk) { j["published"] = time.Now().UTC().Format(time.RFC3339) j["object"] = req - deliverate(0, user.ID, actor, j.ToBytes()) + deliverate(user.ID, actor, j.ToBytes()) } func itakeitallback(user *WhatAbout, xid string, owner string, folxid string) { @@ -1168,7 +1168,7 @@ func itakeitallback(user *WhatAbout, xid string, owner string, folxid string) { j["object"] = f j["published"] = time.Now().UTC().Format(time.RFC3339) - deliverate(0, user.ID, owner, j.ToBytes()) + deliverate(user.ID, owner, j.ToBytes()) } func subsub(user *WhatAbout, xid string, owner string, folxid string) { @@ -1185,7 +1185,7 @@ func subsub(user *WhatAbout, xid string, owner string, folxid string) { j["object"] = xid j["published"] = time.Now().UTC().Format(time.RFC3339) - deliverate(0, user.ID, owner, j.ToBytes()) + deliverate(user.ID, owner, j.ToBytes()) } func activatedonks(donks []*Donk) []junk.Junk { @@ -1495,7 +1495,7 @@ func sendchonk(user *WhatAbout, ch *Chonk) { rcpts := make(map[string]bool) rcpts[ch.Target] = true for a := range rcpts { - go deliverate(0, user.ID, a, msg) + go deliverate(user.ID, a, msg) } } @@ -1534,7 +1534,7 @@ func honkworldwide(user *WhatAbout, honk *Honk) { } } for a := range rcpts { - go deliverate(0, user.ID, a, msg) + go deliverate(user.ID, a, msg) } if honk.Public && len(honk.Onts) > 0 { collectiveaction(honk) @@ -1567,7 +1567,7 @@ func collectiveaction(honk *Honk) { } msg := j.ToBytes() for a := range rcpts { - go deliverate(0, user.ID, a, msg) + go deliverate(user.ID, a, msg) } } } @@ -1881,7 +1881,7 @@ func updateMe(username string) { } } for a := range rcpts { - go deliverate(0, user.ID, a, msg) + go deliverate(user.ID, a, msg) } } diff --git a/deliverator.go b/deliverator.go index fca5290..67ec737 100644 --- a/deliverator.go +++ b/deliverator.go @@ -26,16 +26,19 @@ import ( ) type Doover struct { - ID int64 - When time.Time - Rcpt string - Msgs [][]byte + ID int64 + When time.Time + Userid int64 + Tries int64 + Rcpt string + Msgs [][]byte } -func sayitagain(goarounds int64, userid int64, doover Doover) { +func sayitagain(doover Doover) { rcpt := doover.Rcpt var drift time.Duration - switch goarounds { + doover.Tries += 1 + switch doover.Tries { case 1: drift = 5 * time.Minute case 2: @@ -53,7 +56,7 @@ func sayitagain(goarounds int64, userid int64, doover Doover) { drift += time.Duration(notrand.Int63n(int64(drift / 10))) when := time.Now().Add(drift) data := bytes.Join(doover.Msgs, []byte{0}) - _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, data) + _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), doover.Tries, doover.Userid, rcpt, data) if err != nil { elog.Printf("error saving doover: %s", err) } @@ -89,24 +92,26 @@ func delinquent(rcpt string, msg []byte) bool { return true } -func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { +func deliverate(userid int64, rcpt string, msg []byte) { if delinquent(rcpt, msg) { return } var d Doover + d.Userid = userid + d.Tries = 0 d.Rcpt = rcpt d.Msgs = append(d.Msgs, msg) - deliveration(goarounds, userid, d) + deliveration(d) } var garage = gate.NewLimiter(40) -func deliveration(goarounds int64, userid int64, doover Doover) { +func deliveration(doover Doover) { garage.Start() defer garage.Finish() var ki *KeyInfo - ok := ziggies.Get(userid, &ki) + ok := ziggies.Get(doover.Userid, &ki) if !ok { elog.Printf("lost key for delivery") return @@ -121,7 +126,7 @@ func deliveration(goarounds int64, userid int64, doover Doover) { ok := boxofboxes.Get(rcpt, &box) if !ok { ilog.Printf("failed getting inbox for %s", rcpt) - sayitagain(goarounds+1, userid, doover) + sayitagain(doover) return } inbox = box.In @@ -134,7 +139,7 @@ func deliveration(goarounds int64, userid int64, doover Doover) { if err != nil { ilog.Printf("failed to post json to %s: %s", inbox, err) doover.Msgs = doover.Msgs[i:] - sayitagain(goarounds+1, userid, doover) + sayitagain(doover) return } } @@ -165,6 +170,23 @@ func getdoovers() []Doover { return doovers } +func extractdoover(d *Doover) error { + dqmtx.Lock() + defer dqmtx.Unlock() + row := stmtLoadDoover.QueryRow(d.ID) + var data []byte + err := row.Scan(&d.Tries, &d.Userid, &d.Rcpt, &data) + if err != nil { + return err + } + _, err = stmtZapDoover.Exec(d.ID) + if err != nil { + return err + } + d.Msgs = bytes.Split(data, []byte{0}) + return nil +} + func redeliverator() { sleeper := time.NewTimer(5 * time.Second) for { @@ -183,27 +205,13 @@ func redeliverator() { nexttime := now.Add(24 * time.Hour) for _, d := range doovers { if d.When.Before(now) { - var goarounds, userid int64 - var data []byte - dqmtx.Lock() - row := stmtLoadDoover.QueryRow(d.ID) - err := row.Scan(&goarounds, &userid, &d.Rcpt, &data) + err := extractdoover(&d) if err != nil { - elog.Printf("error scanning doover: %s", err) - dqmtx.Unlock() // defer + elog.Printf("error extracting doover: %s", err) continue } - _, err = stmtZapDoover.Exec(d.ID) - if err != nil { - elog.Printf("error deleting doover: %s", err) - dqmtx.Unlock() // defer - continue - } - dqmtx.Unlock() // defer - d.Msgs = bytes.Split(data, []byte{0}) - rcpt := d.Rcpt - ilog.Printf("redeliverating %s try %d", rcpt, goarounds) - deliveration(goarounds, userid, d) + ilog.Printf("redeliverating %s try %d", d.Rcpt, d.Tries) + deliveration(d) } else if d.When.Before(nexttime) { nexttime = d.When } diff --git a/web.go b/web.go index df6e0be..9cb0e75 100644 --- a/web.go +++ b/web.go @@ -2422,7 +2422,7 @@ func apihandler(w http.ResponseWriter, r *http.Request) { rcpts := boxuprcpts(user, r.Form["rcpt"], public) msg := []byte(r.FormValue("msg")) for rcpt := range rcpts { - go deliverate(0, userid, rcpt, msg) + go deliverate(userid, rcpt, msg) } case "gethonkers": j := junk.New()