@@ -91,11 +91,11 @@ (define (tasks:hostinfo-get-transport vec) (vector-ref vec 4)) (define (tasks:hostinfo-get-pid vec) (vector-ref vec 5)) (define (tasks:hostinfo-get-hostname vec) (vector-ref vec 6)) (define (tasks:server-lock-slot mdb run-id) - (tasks:server-clean-out-old-records-for-run-id mdb run-id) + (tasks:server-clean-out-old-records-for-run-id mdb run-id " tasks:server-lock-slot") (if (< (tasks:num-in-available-state mdb run-id) 4) (begin (tasks:server-set-available mdb run-id) (thread-sleep! 0.2) ;; Try removing this. It may not be needed. (tasks:server-am-i-the-server? mdb run-id)) @@ -123,58 +123,79 @@ (let ((res 0)) (sqlite3:for-each-row (lambda (num-in-queue) (set! res num-in-queue)) mdb - "SELECT count(id) FROM servers WHERE run_id=?;" + "SELECT count(id) FROM servers WHERE run_id=? AND state = 'available';" run-id) res)) -(define (tasks:server-clean-out-old-records-for-run-id mdb run-id) - (sqlite3:execute mdb "DELETE FROM servers WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 100 AND run_id=?;" run-id)) - -(define (tasks:server-force-clean-running-records-for-run-id mdb run-id) - (sqlite3:execute mdb "DELETE FROM servers WHERE state = 'running' AND run_id=?;" run-id)) - -(define (tasks:server-force-clean-run-record mdb run-id iface port) - (sqlite3:execute mdb "DELETE FROM servers WHERE state = 'running' AND run_id=? AND interface=? AND port=?;" - run-id iface port)) +(define (tasks:server-clean-out-old-records-for-run-id mdb run-id tag) + (sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state in ('available','shutting-down') AND (strftime('%s','now') - start_time) > 50 AND run_id=?;" + (conc "defunct" tag) run-id)) + +(define (tasks:server-force-clean-running-records-for-run-id mdb run-id tag) + (sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=?;" + (conc "defunct" tag) run-id)) + +(define (tasks:server-force-clean-run-record mdb run-id iface port tag) + (sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE state = 'running' AND run_id=? AND interface=? AND port=?;" + (conc "defunct" tag) run-id iface port)) + +(define (tasks:server-delete-records-for-this-pid mdb tag) + (sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE hostname=? AND pid=?;" + (conc "defunct" tag) (get-host-name) (current-process-id))) + +(define (tasks:server-delete-record mdb server-id tag) + (sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;" + (conc "defunct" tag) server-id) + ;; use this opportuntity to clean out records over one month old or over 10 minutes old with port = -1 (i.e. a never used placeholder) + (sqlite3:execute mdb "DELETE FROM servers WHERE state not in ('running','shutting-down') AND (strftime('%s','now') - start_time) > 2628000;") + (sqlite3:execute mdb "DELETE FROM servers WHERE state like 'defunct%' AND port=-1 AND (strftime('%s','now') - start_time) > 600;") + ) (define (tasks:server-set-state! mdb server-id state) - (sqlite3:execute mdb "UPDATE servers SET state=? WHERE id=?;" state server-id)) - -(define (tasks:server-delete-record! mdb server-id) - (sqlite3:execute mdb "DELETE FROM servers WHERE id=?;" server-id)) - -(define (tasks:server-delete-records-for-this-pid mdb) - (sqlite3:execute mdb "DELETE FROM servers WHERE hostname=? AND pid=?;" (get-host-name) (current-process-id))) + (sqlite3:execute mdb "UPDATE servers SET state=?,heartbeat=strftime('%s','now') WHERE id=?;" state server-id)) (define (tasks:server-set-interface-port mdb server-id interface port) - (sqlite3:execute mdb "UPDATE servers SET interface=?,port=? WHERE id=?;" interface port server-id)) + (sqlite3:execute mdb "UPDATE servers SET interface=?,port=?,heartbeat=strftime('%s','now') WHERE id=?;" interface port server-id)) +;; Get random port not used in long time +;; (define (tasks:server-get-next-port mdb) - (let ((res #f) - (port-param (if (and (args:get-arg "-port") - (string->number (args:get-arg "-port"))) - (string->number (args:get-arg "-port")) - #f)) - (config-port (if (and (config-lookup *configdat* "server" "port") - (string->number (config-lookup *configdat* "server" "port"))) - (string->number (config-lookup *configdat* "server" "port")) - #f))) + (let* ((lownum 30000) + (highnum 64000) + (used-ports '()) + (get-rand-port (lambda () + (+ lownum (random (- highnum lownum))))) + (port-param (if (and (args:get-arg "-port") + (string->number (args:get-arg "-port"))) + (string->number (args:get-arg "-port")) + #f)) + ;; (config-port (if (and (config-lookup *configdat* "server" "port") + ;; (string->number (config-lookup *configdat* "server" "port"))) + ;; (string->number (config-lookup *configdat* "server" "port")) + ;; #f)) + ) (sqlite3:for-each-row (lambda (port) - (set! res (+ port 1))) ;; set to next + (set! used-ports (cons port used-ports))) mdb - "SELECT max(port) FROM servers;") + "SELECT port FROM servers;") (cond ((and port-param res) (if (> res port-param) res port-param)) (port-param port-param) - ((and config-port res) (if (> res config-port) res config-port)) - (config-port config-port) - ((and res (> res 8080)) res) - (else (+ 5000 (random 1001)))))) + ;; ((and config-port res) (if (> res config-port) res config-port)) + ;; (config-port config-port) + (else + (let loop ((port (get-rand-port)) + (remtries 100)) + (if (member port used-ports) + (if (> remtries 0) + (loop (get-rand-port)(- remtries 1)) + (get-rand-port)) + port)))))) (define (tasks:server-am-i-the-server? mdb run-id) (let* ((all (tasks:server-get-servers-vying-for-run-id mdb run-id)) (first (if (null? all) (begin (debug:print 0 "ERROR: no servers listed, should be at least one by now.") @@ -205,11 +226,11 @@ (res '())) (sqlite3:for-each-row (lambda (a . b) (set! res (cons (apply vector a b) res))) mdb - (conc "SELECT " selstr " FROM servers WHERE run_id=? ORDER BY start_time DESC;") + (conc "SELECT " selstr " FROM servers WHERE run_id=? AND state in ('available','running') ORDER BY start_time DESC;") run-id) (vector header res))) (define (tasks:get-server mdb run-id) (let ((res #f) @@ -226,14 +247,15 @@ res)) (define (tasks:get-all-servers mdb) (let ((res '())) (sqlite3:for-each-row - (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport) - (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport) res))) + (lambda (id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) + ;; 0 1 2 3 4 5 6 7 8 9 10 11 12 + (set! res (cons (vector id pid hostname interface port pubport start-time priority state mt-version last-update transport run-id) res))) mdb - "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport FROM servers ORDER BY start_time DESC;") + "SELECT id,pid,hostname,interface,port,pubport,start_time,priority,state,mt_version,strftime('%s','now')-heartbeat AS last_update,transport,run_id FROM servers WHERE state NOT LIKE 'defunct%' ORDER BY start_time DESC;") res)) (define (tasks:kill-server status hostname port pid) (debug:print-info 1 "Removing defunct server record for " hostname ":" port) (if port @@ -253,11 +275,11 @@ ) ;; local machine, send sig term (begin ;;(debug:print-info 1 "Stopping remote servers not yet supported.")))) (debug:print-info 1 "Telling alive server on " hostname ":" port " to commit servercide") (let ((serverdat (list hostname port))) - (http-transport:client-connect hostname port) + (hash-table-set! *runremote* run-id (http-transport:client-connect hostname port)) (cdb:kill-server serverdat pid))))) ;; remote machine, try telling server to commit suicide (begin (if status (if (equal? hostname (get-host-name)) (begin