Megatest

Diff
Login

Differences From Artifact [7404179285]:

To Artifact [d79db433da]:


16
17
18
19
20
21
22


23
24
25
26
27
28
29

(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))

(declare (unit common))

(include "common_records.scm")



;; (require-library margs)
;; (include "margs.scm")

;; (define old-exit exit)
;; 
;; (define (exit . code)







>
>







16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

(import (prefix sqlite3 sqlite3:))
(import (prefix base64 base64:))

(declare (unit common))

(include "common_records.scm")
(include "thunk-utils.scm")


;; (require-library margs)
;; (include "margs.scm")

;; (define old-exit exit)
;; 
;; (define (exit . code)
58
59
60
61
62
63
64






65


66


67

68
69
70
71
72
73
74
  (mutex-lock! *context-mutex*)
  (let ((cxt (hash-table-ref/default *contexts* toppath #f)))
    (if (not cxt)
        (set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x)))
    (let ((cxt-mutex (cxt-mutex cxt)))
      (mutex-unlock! *context-mutex*)
      (mutex-lock! cxt-mutex)






      (let ((res (proc cxt)))


        (mutex-unlock! cxt-mutex)


        res))))

        
(define *db-keys* #f)

(define *configinfo*   #f)   ;; raw results from setup, includes toppath and table from megatest.config
(define *runconfigdat* #f)   ;; run configs data
(define *configdat*    #f)   ;; megatest.config data
(define *configstatus* #f)   ;; status of data; 'fulldata : all processing done, #f : no data yet, 'partialdata : partial read done







>
>
>
>
>
>
|
>
>
|
>
>
|
>







60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
  ;; Look up (or lazily create) the context record for toppath while holding
  ;; the global *context-mutex*, then switch to that context's own mutex and
  ;; call proc with the context as its single argument.
  (mutex-lock! *context-mutex*)
  (let ((cxt (hash-table-ref/default *contexts* toppath #f)))
    (if (not cxt)
        (set! cxt (let ((x (make-cxt)))(hash-table-set! *contexts* toppath x) x)))
    (let ((cxt-mutex (cxt-mutex cxt)))
      (mutex-unlock! *context-mutex*)
      (mutex-lock! cxt-mutex)
      ;; here we guard proc with exception handler so
      ;; no matter how proc succeeds or fails,
      ;; the cxt-mutex will be unlocked afterward.
      (let* ((EXCEPTION-SYMBOL (gensym)) ;; use a generated symbol
             (guarded-proc               ;; to avoid collision
              (lambda args
                ;; a caught condition is tagged with EXCEPTION-SYMBOL so it
                ;; can be told apart from an ordinary return value of proc
                (let* ((res (condition-case
                             (apply proc args)
                             [x () (cons EXCEPTION-SYMBOL x)])))
                 (mutex-unlock! cxt-mutex)
                 ;; compare against the same generated tag that was consed
                 ;; on above, then re-raise the original condition
                 (if (and (pair? res) (eq? (car res) EXCEPTION-SYMBOL))
                     (abort (cdr res))
                     res)))))
        (guarded-proc cxt)))))
        
(define *db-keys* #f)

(define *configinfo*   #f)   ;; raw results from setup, includes toppath and table from megatest.config
(define *runconfigdat* #f)   ;; run configs data
(define *configdat*    #f)   ;; megatest.config data
(define *configstatus* #f)   ;; status of data; 'fulldata : all processing done, #f : no data yet, 'partialdata : partial read done
100
101
102
103
104
105
106
107











108
109
110
111
112
113
114
(define *task-db*             #f) ;; (vector db path-to-db)
(define *db-access-allowed*   #t) ;; flag to allow access
(define *db-access-mutex*     (make-mutex))
(define *db-cache-path*       #f)

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*    'http)             ;; override with [server] transport http|rpc|nmsg











(define *runremote*         #f)                ;; if set up for server communication this will hold <host port>
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *server-id*         #f)
(define *server-info*       #f)
(define *time-to-exit*      #f)
(define *server-run*        #t)







|
>
>
>
>
>
>
>
>
>
>
>







113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
(define *task-db*             #f) ;; (vector db path-to-db)
(define *db-access-allowed*   #t) ;; flag to allow access
(define *db-access-mutex*     (make-mutex))
(define *db-cache-path*       #f)

;; SERVER
(define *my-client-signature* #f)
(define *transport-type*  #f)             ;; #f until common:set-transport-type sets it from -transport, [server] transport (http|rpc|nmsg) or *DEFAULT-TRANSPORT*

;; transport name used when neither the command line nor the config give one
(define *DEFAULT-TRANSPORT* "http")

;; Set *transport-type* to a symbol and return it. The name is taken from
;; the first of these that yields a value:
;;   1. the -transport command line argument
;;   2. the "transport" entry of the [server] section in *configdat*
;;   3. *DEFAULT-TRANSPORT*
(define (common:set-transport-type)
  (let ((transport-name
         (cond
          ((args:get-arg "-transport") => (lambda (from-args) from-args))
          ((configf:lookup *configdat* "server" "transport") => (lambda (from-config) from-config))
          (else *DEFAULT-TRANSPORT*))))
    (set! *transport-type* (string->symbol transport-name))
    *transport-type*))
  
(define *runremote*         #f)                ;; if set up for server communication this will hold <host port>
(define *max-cache-size*    0)
(define *logged-in-clients* (make-hash-table))
(define *server-id*         #f)
(define *server-info*       #f)
(define *time-to-exit*      #f)
(define *server-run*        #t)
607
608
609
610
611
612
613

614
615
616
617
618
619
620
			(thread-sleep! 1)
			(delay-loop (+ count 1))))
		  (loop)))
	    (if (common:low-noise-print 30)
		(debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*)))))))

(define (std-exit-procedure)

  (let ((no-hurry  (if *time-to-exit* ;; hurry up
		       #f
		       (begin
			 (set! *time-to-exit* #t)
			 #t))))
    (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
    (if (and no-hurry (debug:debug-mode 18))







>







631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
			(thread-sleep! 1)
			(delay-loop (+ count 1))))
		  (loop)))
	    (if (common:low-noise-print 30)
		(debug:print-info 0 *default-log-port* "Exiting watchdog timer, *time-to-exit* = " *time-to-exit*)))))))

(define (std-exit-procedure)
  
  (let ((no-hurry  (if *time-to-exit* ;; hurry up
		       #f
		       (begin
			 (set! *time-to-exit* #t)
			 #t))))
    (debug:print-info 4 *default-log-port* "starting exit process, finalizing databases.")
    (if (and no-hurry (debug:debug-mode 18))
635
636
637
638
639
640
641









642
643
644
645
646
647
648
			      (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")
			      (if no-hurry
				  (thread-sleep! 5) ;; give the clean up few seconds to do it's stuff
				  (thread-sleep! 2))
			      (debug:print 4 *default-log-port* " ... done")
			      )
			    "clean exit")))









      (thread-start! th1)
      (thread-start! th2)
      (thread-join! th1))))

(define (std-signal-handler signum)
  ;; (signal-mask! signum)
  (set! *time-to-exit* #t)







>
>
>
>
>
>
>
>
>







660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
			      (debug:print 4 *default-log-port* "Attempting clean exit. Please be patient and wait a few seconds...")
			      (if no-hurry
				  (thread-sleep! 5) ;; give the clean up few seconds to do it's stuff
				  (thread-sleep! 2))
			      (debug:print 4 *default-log-port* " ... done")
			      )
			    "clean exit")))

      ;; let's try to clean up open sockets
      (if *runremote*
          (case (remote-transport *runremote*)
            ((http) #t)
            ((rpc)  (rpc:close-all-connections!))
            (else
             (debug:print-info 0 *default-log-port* "Transport "(remote-transport *runremote*)" not supported"))))

      (thread-start! th1)
      (thread-start! th2)
      (thread-join! th1))))

(define (std-signal-handler signum)
  ;; (signal-mask! signum)
  (set! *time-to-exit* #t)
1082
1083
1084
1085
1086
1087
1088
1089

1090
1091
1092
1093
1094
1095
1096
	   (with-input-from-pipe 
	    (conc "ssh " remote-host " cat /proc/loadavg")
	    (lambda ()(list (read)(read)(read)))))
      (with-input-from-file "/proc/loadavg" 
	(lambda ()(list (read)(read)(read))))))

;; get normalized cpu load by reading from /proc/loadavg and /proc/cpuinfo return all three values and the number of real cpus and the number of threads
;; returns list (normalized-proc-load normalized-core-load 1m 5m 15m ncores nthreads)

;;
(define (common:get-normalized-cpu-load remote-host)
  (let ((data (if remote-host
                  (with-input-from-pipe 
                   (conc "ssh " remote-host " cat /proc/loadavg;cat /proc/cpuinfo;echo end")
                   read-lines)
                  (append 







|
>







1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
	   (with-input-from-pipe 
	    (conc "ssh " remote-host " cat /proc/loadavg")
	    (lambda ()(list (read)(read)(read)))))
      (with-input-from-file "/proc/loadavg" 
	(lambda ()(list (read)(read)(read))))))

;; Get normalized cpu load by reading from /proc/loadavg and /proc/cpuinfo; return all three load values and the number of real cpus and the number of threads.
;; Returns an alist '((adj-proc-load . normalized-proc-load) ... etc.
;;  keys: adj-proc-load, adj-core-load, 1m-load, 5m-load, 15m-load
;;
(define (common:get-normalized-cpu-load remote-host)
  (let ((data (if remote-host
                  (with-input-from-pipe 
                   (conc "ssh " remote-host " cat /proc/loadavg;cat /proc/cpuinfo;echo end")
                   read-lines)
                  (append 
1141
1142
1143
1144
1145
1146
1147


1148





























1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174

1175
1176
1177



















1178

1179
1180


1181
1182
1183
1184
1185
1186
1187
1188



1189
1190
1191
1192
1193
1194
1195

;; Send one ping to hostname through the shell, discarding its output.
;; Returns #t when the ping command exits with status 0, #f otherwise.
(define (common:unix-ping hostname)
  (let* ((ping-cmd    (conc "ping -c 1 " hostname " > /dev/null"))
         (exit-status (system ping-cmd)))
    (zero? exit-status)))

;; ideally put all this info into the db, no need to preserve it across moving homehost
;;


;; Pick the reachable host with the lowest adj-core-load from hosts-raw.
;;
;; hosts-raw : list of hostname strings; entries that are empty or contain
;;             whitespace are ignored
;; returns   : a hostname string, or #f when no usable host is reachable
;;
;; Per-host state is cached in *host-loads*; a record older than 15 seconds
;; is refreshed (ping + cpu load) before it is considered.
(define (common:get-least-loaded-host hosts-raw)
  (let* ((hosts (filter (lambda (x)
                          (string-match (regexp "^\\S+$") x))
                        hosts-raw)))
    (if (null? hosts)
        #f
        ;;
        ;; strategy:
        ;;    if last-updated >= 15 seconds ago then re-update
        ;;    skip hosts that do not answer a ping
        ;;    take the reachable host with the lowest adj-core-load
        ;;
        (let ((best-host #f)
              (best-load #f)
              (curr-time (current-seconds)))
          (for-each
           (lambda (hostname)
             (let* ((rec       (or (hash-table-ref/default *host-loads* hostname #f)
                                   (let ((h (make-host)))
                                     (hash-table-set! *host-loads* hostname h)
                                     h)))
                    ;; if host hasn't been pinged in 15 sec update its data;
                    ;; the refresh happens whether or not the host was
                    ;; previously marked reachable, so load data cannot go
                    ;; permanently stale
                    (ping-good (if (< (- curr-time (host-last-update rec)) 15)
                                   (host-reachable rec)
                                   (let ((alive (common:unix-ping hostname)))
                                     (host-reachable-set! rec alive)
                                     (host-last-update-set! rec curr-time)
                                     ;; only query the load of hosts that answered the ping
                                     (if alive
                                         (host-last-cpuload-set! rec (common:get-normalized-cpu-load hostname)))
                                     alive)))
                    (cpuload   (host-last-cpuload rec))
                    ;; #f when no load data has been collected for this host
                    (load-val  (and ping-good
                                    (list? cpuload)
                                    (alist-ref 'adj-core-load cpuload))))
               (cond
                ;; never hand out a host that did not answer
                ((not ping-good) #f)
                ;; first reachable host is the initial candidate, even
                ;; when its load is not known yet
                ((not best-host)
                 (set! best-host hostname)
                 (set! best-load load-val))
                ;; a known load beats an unknown or a higher one
                ((and (number? load-val)
                      (or (not (number? best-load))
                          (< load-val best-load)))
                 (set! best-host hostname)
                 (set! best-load load-val)))))
           hosts)
          best-host))))




(define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f))
  (let* ((loadavg (common:get-cpu-load remote-host))
	 (first   (car loadavg))
	 (next    (cadr loadavg))
	 (adjload (* maxload numcpus))
	 (loadjmp (- first next)))







>
>
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>



<
<
<
<
<
<
<
<
<
<
|
|
|
|
|
|
|
|
<
|
|
|
<
>
|
|
|
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
|
>
|
|
>
>
|
<
<
<
<
|
|
|
>
>
>







1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217










1218
1219
1220
1221
1222
1223
1224
1225

1226
1227
1228

1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258




1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271

;; Send one ping to hostname through the shell, discarding its output.
;; Returns #t when the ping command exits with status 0, #f otherwise.
(define (common:unix-ping hostname)
  (let* ((ping-cmd    (conc "ping -c 1 " hostname " > /dev/null"))
         (exit-status (system ping-cmd)))
    (zero? exit-status)))

;; ideally put all this info into the db, no need to preserve it across moving homehost
;;
;; Collect reachability and cpu load for hostname, using the first source
;; that is recent enough.
;;
;; returns a three element list, in this order:
;;  ( reachable? update-time cpuload )
;;
;; sources, tried in order:
;;  1. the sample from rmt:get-latest-host-load, if younger than 20 seconds
;;  2. the record cached in *host-loads*, if updated within the last 10 seconds
;;  3. a live ping followed by common:get-normalized-cpu-load (adj-core-load)
;;  otherwise the host is reported unreachable as (#f 0 -1)
(define (common:get-host-info hostname)
  (let* ((loadinfo-timeout-seconds         20)
         (host-last-update-timeout-seconds 10)
         (loadinfo         (rmt:get-latest-host-load hostname))
         ;; named cpuload rather than load so the builtin load is not shadowed
         (cpuload          (car loadinfo))
         (load-sample-time (cdr loadinfo))
         (load-sample-age  (- (current-seconds) load-sample-time))
         (host-rec         (hash-table-ref/default *host-loads* hostname #f)))
    (cond
     ((< load-sample-age loadinfo-timeout-seconds)
      (list #t
            load-sample-time
            cpuload))
     ((and host-rec
           (< (current-seconds) (+ (host-last-update host-rec) host-last-update-timeout-seconds)))
      (list #t
            (host-last-update host-rec)
            (host-last-cpuload host-rec)))
     ((common:unix-ping hostname)
      ;; timestamp is taken after the ping has returned
      (list #t
            (current-seconds)
            (alist-ref 'adj-core-load (common:get-normalized-cpu-load hostname))))
     (else
      (list #f 0 -1)))))
    
;; Refresh the *host-loads* record of every usable hostname in hosts-raw
;; with the values reported by common:get-host-info. Names that are empty
;; or contain whitespace are skipped; missing records are created.
(define (common:update-host-loads-table hosts-raw)
  ;; a usable hostname is one non-empty run of non-whitespace characters
  (define (usable-hostname? name)
    (string-match (regexp "^\\S+$") name))
  ;; fetch the record for name, registering a fresh one when absent
  (define (host-record-for name)
    (or (hash-table-ref/default *host-loads* name #f)
        (let ((fresh (make-host)))
          (hash-table-set! *host-loads* name fresh)
          fresh)))
  (for-each
   (lambda (hostname)
     (let* ((record     (host-record-for hostname))
            (info       (common:get-host-info hostname))
            (reachable? (car info))
            (updated-at (cadr info))
            (cpuload    (caddr info)))
       (host-reachable-set!    record reachable?)
       (host-last-update-set!  record updated-at)
       (host-last-cpuload-set! record cpuload)))
   (filter usable-hostname? hosts-raw)))

;; Pick a lightly loaded, reachable host from hosts-raw.
;;
;; hosts-raw : list of hostname strings; entries that are empty or contain
;;             whitespace are ignored
;; returns   : a hostname string, or #f when no host is reachable with a
;;             numeric load below the initial threshold of 99999
;;
;; The *host-loads* table is refreshed first via
;; common:update-host-loads-table, which also creates any missing records.
(define (common:get-least-loaded-host hosts-raw)
  (let* ((hosts (filter (lambda (x)
                          (string-match (regexp "^\\S+$") x))
                        hosts-raw))
         (best-host #f)
         (best-load 99999))
    (common:update-host-loads-table hosts)
    (for-each
     (lambda (hostname)
       (let* ((rec       (hash-table-ref/default *host-loads* hostname #f))
              (reachable (and rec (host-reachable rec)))
              (cpuload   (and rec (host-last-cpuload rec))))
         (cond
          ((not reachable) #f)
          ;; a record without a numeric load cannot be compared; skip it
          ;; instead of failing inside +
          ((not (number? cpuload)) #f)
          ((< (+ cpuload (/ (random 250) 1000))       ;; add a random factor to keep from getting in a rut
              (+ best-load (/ (random 250) 1000)))
           (set! best-load cpuload)
           (set! best-host hostname)))))
     hosts)
    best-host))




(define (common:wait-for-cpuload maxload numcpus waitdelay #!key (count 1000) (msg #f)(remote-host #f))
  (let* ((loadavg (common:get-cpu-load remote-host))
	 (first   (car loadavg))
	 (next    (cadr loadavg))
	 (adjload (* maxload numcpus))
	 (loadjmp (- first next)))