diff --git a/database.go b/database.go index a8a80a1..1ef5cb1 100644 --- a/database.go +++ b/database.go @@ -1106,6 +1106,7 @@ var stmtSaveMeta, stmtDeleteAllMeta, stmtDeleteOneMeta, stmtDeleteSomeMeta, stmt var stmtHonksISaved, stmtGetFilters, stmtSaveFilter, stmtDeleteFilter *sql.Stmt var stmtGetTracks *sql.Stmt var stmtSaveChonk, stmtLoadChonks, stmtGetChatters *sql.Stmt +var stmtDeliquentCheck, stmtDeliquentUpdate *sql.Stmt func preparetodie(db *sql.DB, s string) *sql.Stmt { stmt, err := db.Prepare(s) @@ -1192,4 +1193,6 @@ func prepareStatements(db *sql.DB) { stmtSaveChonk = preparetodie(db, "insert into chonks (userid, xid, who, target, dt, noise, format) values (?, ?, ?, ?, ?, ?, ?)") stmtLoadChonks = preparetodie(db, "select chonkid, userid, xid, who, target, dt, noise, format from chonks where userid = ? and dt > ? order by chonkid asc") stmtGetChatters = preparetodie(db, "select distinct(target) from chonks where userid = ?") + stmtDeliquentCheck = preparetodie(db, "select dooverid, msg from doovers where rcpt = ?") + stmtDeliquentUpdate = preparetodie(db, "update doovers set data = ? where dooverid = ?") } diff --git a/deliverator.go b/deliverator.go index 3428137..fca5290 100644 --- a/deliverator.go +++ b/deliverator.go @@ -16,8 +16,10 @@ package main import ( - "fmt" + "bytes" + "database/sql" notrand "math/rand" + "sync" "time" "humungus.tedunangst.com/r/webs/gate" @@ -26,9 +28,12 @@ import ( type Doover struct { ID int64 When time.Time + Rcpt string + Msgs [][]byte } -func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) { +func sayitagain(goarounds int64, userid int64, doover Doover) { + rcpt := doover.Rcpt var drift time.Duration switch goarounds { case 1: @@ -43,12 +48,12 @@ func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) { drift = 24 * time.Hour default: ilog.Printf("he's dead jim: %s", rcpt) - clearoutbound(rcpt) return } drift += time.Duration(notrand.Int63n(int64(drift / 10))) when := time.Now().Add(drift) - _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, msg) + data := bytes.Join(doover.Msgs, []byte{0}) + _, err := stmtAddDoover.Exec(when.UTC().Format(dbtimeformat), goarounds, userid, rcpt, data) if err != nil { elog.Printf("error saving doover: %s", err) } @@ -58,20 +63,45 @@ func sayitagain(goarounds int64, userid int64, rcpt string, msg []byte) { } } -func clearoutbound(rcpt string) { - hostname := originate(rcpt) - if hostname == "" { +var dqmtx sync.Mutex + +func delinquent(rcpt string, msg []byte) bool { + dqmtx.Lock() + defer dqmtx.Unlock() + row := stmtDeliquentCheck.QueryRow(rcpt) + var dooverid int64 + var data []byte + err := row.Scan(&dooverid, data) + if err == sql.ErrNoRows { + return false + } + if err != nil { + elog.Printf("error scanning deliquent check: %s", err) + return true + } + data = append(data, 0) + data = append(data, msg...) + _, err = stmtDeliquentUpdate.Exec(data, dooverid) + if err != nil { + elog.Printf("error updating deliquent: %s", err) + return true + } + return true +} + +func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { + if delinquent(rcpt, msg) { return } - xid := fmt.Sprintf("%%https://%s/%%", hostname) - ilog.Printf("clearing outbound for %s", xid) - db := opendatabase() - db.Exec("delete from doovers where rcpt like ?", xid) + var d Doover + d.Rcpt = rcpt + d.Msgs = append(d.Msgs, msg) + deliveration(goarounds, userid, d) } var garage = gate.NewLimiter(40) -func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { +func deliveration(goarounds int64, userid int64, doover Doover) { garage.Start() defer garage.Finish() @@ -82,6 +112,7 @@ func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { return } var inbox string + rcpt := doover.Rcpt // already did the box indirection if rcpt[0] == '%' { inbox = rcpt[1:] @@ -90,16 +121,22 @@ func deliverate(goarounds int64, userid int64, rcpt string, msg []byte) { ok := boxofboxes.Get(rcpt, &box) if !ok { ilog.Printf("failed getting inbox for %s", rcpt) - sayitagain(goarounds+1, userid, rcpt, msg) + sayitagain(goarounds+1, userid, doover) return } inbox = box.In } - err := PostMsg(ki.keyname, ki.seckey, inbox, msg) - if err != nil { - ilog.Printf("failed to post json to %s: %s", inbox, err) - sayitagain(goarounds+1, userid, rcpt, msg) - return + for i, msg := range doover.Msgs { + if i > 0 { + time.Sleep(2 * time.Second) + } + err := PostMsg(ki.keyname, ki.seckey, inbox, msg) + if err != nil { + ilog.Printf("failed to post json to %s: %s", inbox, err) + doover.Msgs = doover.Msgs[i:] + sayitagain(goarounds+1, userid, doover) + return + } } } @@ -147,21 +184,26 @@ func redeliverator() { for _, d := range doovers { if d.When.Before(now) { var goarounds, userid int64 - var rcpt string - var msg []byte + var data []byte + dqmtx.Lock() row := stmtLoadDoover.QueryRow(d.ID) - err := row.Scan(&goarounds, &userid, &rcpt, &msg) + err := row.Scan(&goarounds, &userid, &d.Rcpt, &data) if err != nil { elog.Printf("error scanning doover: %s", err) + dqmtx.Unlock() // defer continue } _, err = stmtZapDoover.Exec(d.ID) if err != nil { elog.Printf("error deleting doover: %s", err) + dqmtx.Unlock() // defer continue } + dqmtx.Unlock() // defer + d.Msgs = bytes.Split(data, []byte{0}) + rcpt := d.Rcpt ilog.Printf("redeliverating %s try %d", rcpt, goarounds) - deliverate(goarounds, userid, rcpt, msg) + deliveration(goarounds, userid, d) } else if d.When.Before(nexttime) { nexttime = d.When }