Remove explicit relay handling
GitHub-Pull-Request: https://github.com/syncthing/discosrv/pull/40
This commit is contained in:
committed by
Jakob Borg
parent
6d3aae32bc
commit
94a392144b
@@ -52,14 +52,6 @@ func (s *cleansrv) cleanOldEntries() (err error) {
|
|||||||
log.Printf("Clean: %d old addresses", rows)
|
log.Printf("Clean: %d old addresses", rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err = tx.Stmt(s.prep["cleanRelay"]).Exec()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if rows, _ := res.RowsAffected(); rows > 0 {
|
|
||||||
log.Printf("Clean: %d old relays", rows)
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err = tx.Stmt(s.prep["cleanDevice"]).Exec()
|
res, err = tx.Stmt(s.prep["cleanDevice"]).Exec()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -68,7 +60,7 @@ func (s *cleansrv) cleanOldEntries() (err error) {
|
|||||||
log.Printf("Clean: %d old devices", rows)
|
log.Printf("Clean: %d old devices", rows)
|
||||||
}
|
}
|
||||||
|
|
||||||
var devs, addrs, relays int
|
var devs, addrs int
|
||||||
row := tx.Stmt(s.prep["countDevice"]).QueryRow()
|
row := tx.Stmt(s.prep["countDevice"]).QueryRow()
|
||||||
if err = row.Scan(&devs); err != nil {
|
if err = row.Scan(&devs); err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -77,11 +69,7 @@ func (s *cleansrv) cleanOldEntries() (err error) {
|
|||||||
if err = row.Scan(&addrs); err != nil {
|
if err = row.Scan(&addrs); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
row = tx.Stmt(s.prep["countRelay"]).QueryRow()
|
|
||||||
if err = row.Scan(&relays); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Database: %d devices, %d addresses, %d relays", devs, addrs, relays)
|
log.Printf("Database: %d devices, %d addresses", devs, addrs)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,52 +68,21 @@ func postgresSetup(db *sql.DB) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS Relays (
|
|
||||||
DeviceID CHAR(63) NOT NULL,
|
|
||||||
Seen TIMESTAMP NOT NULL,
|
|
||||||
Address VARCHAR(256) NOT NULL,
|
|
||||||
Latency INTEGER NOT NULL
|
|
||||||
)`)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
row = db.QueryRow(`SELECT 'RelaysDeviceIDSeenIndex'::regclass`)
|
|
||||||
if err := row.Scan(nil); err != nil {
|
|
||||||
_, err = db.Exec(`CREATE INDEX RelaysDeviceIDSeenIndex ON Relays (DeviceID, Seen)`)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
row = db.QueryRow(`SELECT 'RelaysDeviceIDAddressIndex'::regclass`)
|
|
||||||
if err := row.Scan(nil); err != nil {
|
|
||||||
_, err = db.Exec(`CREATE INDEX RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func postgresCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
|
func postgresCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
|
||||||
stmts := map[string]string{
|
stmts := map[string]string{
|
||||||
"cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL",
|
"cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL",
|
||||||
"cleanRelay": "DELETE FROM Relays WHERE Seen < now() - '2 hour'::INTERVAL",
|
|
||||||
"cleanDevice": fmt.Sprintf("DELETE FROM Devices WHERE Seen < now() - '%d hour'::INTERVAL", maxDeviceAge/3600),
|
"cleanDevice": fmt.Sprintf("DELETE FROM Devices WHERE Seen < now() - '%d hour'::INTERVAL", maxDeviceAge/3600),
|
||||||
"countAddress": "SELECT count(*) FROM Addresses",
|
"countAddress": "SELECT count(*) FROM Addresses",
|
||||||
"countDevice": "SELECT count(*) FROM Devices",
|
"countDevice": "SELECT count(*) FROM Devices",
|
||||||
"countRelay": "SELECT count(*) FROM Relays",
|
|
||||||
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
|
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
|
||||||
"insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)",
|
|
||||||
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
|
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
|
||||||
"selectAddress": "SELECT Address FROM Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
|
"selectAddress": "SELECT Address FROM Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
|
||||||
"selectRelay": "SELECT Address, Latency FROM Relays WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16",
|
|
||||||
"selectDevice": "SELECT Seen FROM Devices WHERE DeviceID=$1",
|
"selectDevice": "SELECT Seen FROM Devices WHERE DeviceID=$1",
|
||||||
"updateAddress": "UPDATE Addresses SET Seen=now() WHERE DeviceID=$1 AND Address=$2",
|
"updateAddress": "UPDATE Addresses SET Seen=now() WHERE DeviceID=$1 AND Address=$2",
|
||||||
"updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1",
|
"updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1",
|
||||||
"deleteRelay": "DELETE FROM Relays WHERE DeviceID=$1",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make(map[string]*sql.Stmt, len(stmts))
|
res := make(map[string]*sql.Stmt, len(stmts))
|
||||||
|
|||||||
@@ -50,41 +50,22 @@ func qlSetup(db *sql.DB) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`); err != nil {
|
_, err = tx.Exec(`CREATE INDEX IF NOT EXISTS AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Relays (
|
|
||||||
DeviceID STRING NOT NULL,
|
|
||||||
Seen TIME NOT NULL,
|
|
||||||
Address STRING NOT NULL,
|
|
||||||
Latency INT NOT NULL,
|
|
||||||
)`)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = tx.Exec(`CREATE INDEX IF NOT EXISTS RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func qlCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
|
func qlCompile(db *sql.DB) (map[string]*sql.Stmt, error) {
|
||||||
stmts := map[string]string{
|
stmts := map[string]string{
|
||||||
"cleanAddress": `DELETE FROM Addresses WHERE Seen < now() - duration("2h")`,
|
"cleanAddress": `DELETE FROM Addresses WHERE Seen < now() - duration("2h")`,
|
||||||
"cleanRelay": `DELETE FROM Relays WHERE Seen < now() - duration("2h")`,
|
|
||||||
"cleanDevice": fmt.Sprintf(`DELETE FROM Devices WHERE Seen < now() - duration("%dh")`, maxDeviceAge/3600),
|
"cleanDevice": fmt.Sprintf(`DELETE FROM Devices WHERE Seen < now() - duration("%dh")`, maxDeviceAge/3600),
|
||||||
"countAddress": "SELECT count(*) FROM Addresses",
|
"countAddress": "SELECT count(*) FROM Addresses",
|
||||||
"countDevice": "SELECT count(*) FROM Devices",
|
"countDevice": "SELECT count(*) FROM Devices",
|
||||||
"countRelay": "SELECT count(*) FROM Relays",
|
|
||||||
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
|
"insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)",
|
||||||
"insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)",
|
|
||||||
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
|
"insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())",
|
||||||
"selectAddress": `SELECT Address from Addresses WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`,
|
"selectAddress": `SELECT Address from Addresses WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`,
|
||||||
"selectRelay": `SELECT Address, Latency from Relays WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`,
|
|
||||||
"selectDevice": "SELECT Seen FROM Devices WHERE DeviceID==$1",
|
"selectDevice": "SELECT Seen FROM Devices WHERE DeviceID==$1",
|
||||||
"updateAddress": "UPDATE Addresses Seen=now() WHERE DeviceID==$1 AND Address==$2",
|
"updateAddress": "UPDATE Addresses Seen=now() WHERE DeviceID==$1 AND Address==$2",
|
||||||
"updateDevice": "UPDATE Devices Seen=now() WHERE DeviceID==$1",
|
"updateDevice": "UPDATE Devices Seen=now() WHERE DeviceID==$1",
|
||||||
"deleteRelay": "DELETE FROM Relays WHERE DeviceID==$1",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res := make(map[string]*sql.Stmt, len(stmts))
|
res := make(map[string]*sql.Stmt, len(stmts))
|
||||||
|
|||||||
@@ -35,13 +35,7 @@ type querysrv struct {
|
|||||||
|
|
||||||
type announcement struct {
|
type announcement struct {
|
||||||
Seen time.Time
|
Seen time.Time
|
||||||
Direct []string `json:"direct"`
|
Addresses []string `json:"addresses"`
|
||||||
Relays []annRelay `json:"relays"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type annRelay struct {
|
|
||||||
URL string `json:"url"`
|
|
||||||
Latency int `json:"latency"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type safeCache struct {
|
type safeCache struct {
|
||||||
@@ -120,7 +114,7 @@ func (s *querysrv) Serve() {
|
|||||||
s.listener = tlsListener
|
s.listener = tlsListener
|
||||||
}
|
}
|
||||||
|
|
||||||
http.HandleFunc("/", s.handler)
|
http.HandleFunc("/v13/", s.handler)
|
||||||
http.HandleFunc("/ping", handlePing)
|
http.HandleFunc("/ping", handlePing)
|
||||||
|
|
||||||
srv := &http.Server{
|
srv := &http.Server{
|
||||||
@@ -219,7 +213,7 @@ func (s *querysrv) handleGET(ctx context.Context, w http.ResponseWriter, req *ht
|
|||||||
}
|
}
|
||||||
|
|
||||||
t0 := time.Now()
|
t0 := time.Now()
|
||||||
ann.Direct, err = s.getAddresses(ctx, deviceID)
|
ann.Addresses, err = s.getAddresses(ctx, deviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println(reqID, "getAddresses:", err)
|
log.Println(reqID, "getAddresses:", err)
|
||||||
globalStats.Error()
|
globalStats.Error()
|
||||||
@@ -230,21 +224,9 @@ func (s *querysrv) handleGET(ctx context.Context, w http.ResponseWriter, req *ht
|
|||||||
log.Println(reqID, "getAddresses in", time.Since(t0))
|
log.Println(reqID, "getAddresses in", time.Since(t0))
|
||||||
}
|
}
|
||||||
|
|
||||||
t0 = time.Now()
|
|
||||||
ann.Relays, err = s.getRelays(deviceID)
|
|
||||||
if err != nil {
|
|
||||||
log.Println(reqID, "getRelays:", err)
|
|
||||||
globalStats.Error()
|
|
||||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if debug {
|
|
||||||
log.Println(reqID, "getRelays in", time.Since(t0))
|
|
||||||
}
|
|
||||||
|
|
||||||
globalStats.Query()
|
globalStats.Query()
|
||||||
|
|
||||||
if len(ann.Direct)+len(ann.Relays) == 0 {
|
if len(ann.Addresses) == 0 {
|
||||||
http.Error(w, "Not Found", http.StatusNotFound)
|
http.Error(w, "Not Found", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -283,9 +265,9 @@ func (s *querysrv) handlePOST(ctx context.Context, remoteIP net.IP, w http.Respo
|
|||||||
// handleAnnounce returns *two* errors. The first indicates a problem with
|
// handleAnnounce returns *two* errors. The first indicates a problem with
|
||||||
// something the client posted to us. We should return a 400 Bad Request
|
// something the client posted to us. We should return a 400 Bad Request
|
||||||
// and not worry about it. The second indicates that the request was fine,
|
// and not worry about it. The second indicates that the request was fine,
|
||||||
// but something internal fucked up. We should log it and respond with a
|
// but something internal messed up. We should log it and respond with a
|
||||||
// more apologetic 500 Internal Server Error.
|
// more apologetic 500 Internal Server Error.
|
||||||
userErr, internalErr := s.handleAnnounce(ctx, remoteIP, deviceID, ann.Direct, ann.Relays)
|
userErr, internalErr := s.handleAnnounce(ctx, remoteIP, deviceID, ann.Addresses)
|
||||||
if userErr != nil {
|
if userErr != nil {
|
||||||
if debug {
|
if debug {
|
||||||
log.Println(reqID, "handleAnnounce:", userErr)
|
log.Println(reqID, "handleAnnounce:", userErr)
|
||||||
@@ -316,7 +298,7 @@ func (s *querysrv) Stop() {
|
|||||||
s.listener.Close()
|
s.listener.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *querysrv) handleAnnounce(ctx context.Context, remote net.IP, deviceID protocol.DeviceID, direct []string, relays []annRelay) (userErr, internalErr error) {
|
func (s *querysrv) handleAnnounce(ctx context.Context, remote net.IP, deviceID protocol.DeviceID, addresses []string) (userErr, internalErr error) {
|
||||||
reqID := ctx.Value("id").(requestID)
|
reqID := ctx.Value("id").(requestID)
|
||||||
|
|
||||||
tx, err := s.db.Begin()
|
tx, err := s.db.Begin()
|
||||||
@@ -333,7 +315,7 @@ func (s *querysrv) handleAnnounce(ctx context.Context, remote net.IP, deviceID p
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for _, annAddr := range direct {
|
for _, annAddr := range addresses {
|
||||||
uri, err := url.Parse(annAddr)
|
uri, err := url.Parse(annAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
userErr = err
|
userErr = err
|
||||||
@@ -357,40 +339,12 @@ func (s *querysrv) handleAnnounce(ctx context.Context, remote net.IP, deviceID p
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t0 := time.Now()
|
|
||||||
_, err = tx.Stmt(s.prep["deleteRelay"]).Exec(deviceID.String())
|
|
||||||
if err != nil {
|
|
||||||
internalErr = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if debug {
|
|
||||||
log.Println(reqID, "deleteRelay in", time.Since(t0))
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, relay := range relays {
|
|
||||||
uri, err := url.Parse(relay.URL)
|
|
||||||
if err != nil {
|
|
||||||
userErr = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t0 = time.Now()
|
|
||||||
_, err = tx.Stmt(s.prep["insertRelay"]).Exec(deviceID.String(), uri.String(), relay.Latency)
|
|
||||||
if err != nil {
|
|
||||||
internalErr = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if debug {
|
|
||||||
log.Println(reqID, "insertRelay in", time.Since(t0))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.updateDevice(ctx, tx, deviceID); err != nil {
|
if err := s.updateDevice(ctx, tx, deviceID); err != nil {
|
||||||
internalErr = err
|
internalErr = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
t0 = time.Now()
|
t0 := time.Now()
|
||||||
internalErr = tx.Commit()
|
internalErr = tx.Commit()
|
||||||
if debug {
|
if debug {
|
||||||
log.Println(reqID, "commit in", time.Since(t0))
|
log.Println(reqID, "commit in", time.Since(t0))
|
||||||
@@ -488,27 +442,6 @@ func (s *querysrv) getDeviceSeen(device protocol.DeviceID) (time.Time, error) {
|
|||||||
return seen, nil
|
return seen, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *querysrv) getRelays(device protocol.DeviceID) ([]annRelay, error) {
|
|
||||||
rows, err := s.prep["selectRelay"].Query(device.String())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer rows.Close()
|
|
||||||
|
|
||||||
var res []annRelay
|
|
||||||
for rows.Next() {
|
|
||||||
var rel annRelay
|
|
||||||
|
|
||||||
err := rows.Scan(&rel.URL, &rel.Latency)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
res = append(res, rel)
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func handlePing(w http.ResponseWriter, r *http.Request) {
|
func handlePing(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(204)
|
w.WriteHeader(204)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user