<?php
// This is called by cron, and is persistent.
// It takes care of build jobs.
// It can be run on the same machine as the webserver.
error_reporting(E_ALL);
ini_set("track_errors", "On");
set_time_limit(0);

$_s = microtime(TRUE);

require_once("/etc/rocketgit/config.php");

$INC = dirname(__FILE__) . "/../inc";
require_once($INC . "/init.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/struct.inc.php");
require_once($INC . "/events.inc.php");
require_once($INC . "/repo.inc.php");
require_once($INC . "/prof.inc.php");
require_once($INC . "/mr.inc.php");
require_once($INC . "/keys.inc.php");
require_once($INC . "/user.inc.php");
require_once($INC . "/bug.inc.php");
require_once($INC . "/fixes.inc.php");
require_once($INC . "/plan.inc.php");
require_once($INC . "/admin.inc.php");
require_once($INC . "/ver.php");
require_once($INC . "/builder.inc.php");
require_once($INC . "/conn.inc.php");
require_once($INC . "/workers.inc.php");

// Exit right away (with success) when the builder port is configured as 0.
if ($rg_builder_port == 0)
	exit(0);

/*
 * Called when a connection closes.
 * Only the global count of connected workers is updated here.
 * NOTE(review): jobs whose 'worker' field points to this connection are
 * not released by this function - confirm this is handled elsewhere.
 */
function xdestroy($key)
{
	global $workers;

	$workers--;
}

/*
 * Called when a new client connects.
 * Installs the command/destroy callbacks on the connection, stores the
 * database handle ($arg) and marks the connection as not authenticated.
 */
function xnew($key, $arg)
{
	global $rg_conns;
	global $workers;

	$s = &$rg_conns[$key];
	$s['func_cmd'] = 'xdispatch';
	$s['func_destroy'] = 'xdestroy';
	$s['db'] = $arg;
	$s['auth'] = 0;
	$s['is_master'] = 0;

	$workers++;
}

/*
 * Sends an 'ERR <msg>' line to the peer and shuts the connection down.
 */
function xreject($key, $msg)
{
	rg_conn_enq($key, 'ERR ' . $msg . "\n");
	rg_conn_shutdown($key, 2);
}

/*
 * Handles the 'ANN' (announce) command: validates the clock, the owner
 * and the HMAC signature of the worker, then marks the connection as
 * authenticated and stores the announced properties.
 * @s - reference to the connection state ($rg_conns[$key])
 * @u - decoded payload (array)
 */
function xcmd_ann($key, &$s, $u)
{
	$now = time();
	// A missing boot_time is rejected the same way as an out of range one.
	if (!isset($u['boot_time'])
	    || ($u['boot_time'] < $now - 30)
	    || ($u['boot_time'] > $now + 30)) {
		rg_log('boot_time is too old; abort');
		xreject($key, 'time not in sync between worker and server');
		return;
	}

	// Lookup user first
	if (!isset($u['type']))
		$u['type'] = 'global';
	if (strcasecmp($u['type'], 'global') == 0) {
		$worker_uid = 0;
	} else if (!isset($u['user'])) {
		rg_log('user field is not present; abort');
		xreject($key, 'user not defined in conf file');
		return;
	} else {
		$w_ui = rg_user_info($s['db'], 0, $u['user'], '');
		if ($w_ui['exists'] !== 1) {
			rg_log('invalid user; abort');
			xreject($key, 'invalid user');
			return;
		}
		$worker_uid = $w_ui['uid'];
	}

	// Check if worker is registered
	$wi = rg_worker_find_by_name($s['db'], $worker_uid, $u['name']);
	if ($wi === -1) {
		rg_log('cannot load worker info: ' . rg_worker_error()
			. '; abort');
		xreject($key, 'internal error');
		return;
	}
	if ($wi === 0) {
		rg_log('name [' . $u['name'] . '] not found; abort');
		xreject($key, 'builder name not found, add it in the web'
			. ' interface!');
		return;
	}

	// The signature is compared in constant time; a missing or
	// non-string signature can never match.
	$sign = hash_hmac('sha512', $u['boot_time'], $wi['key']);
	$peer_sign = '';
	if (isset($u['sign']) && is_string($u['sign']))
		$peer_sign = $u['sign'];
	if (!hash_equals($sign, $peer_sign)) {
		rg_log('signature is not ok [' . $sign . ']'
			. ' != [' . $peer_sign . ']');
		xreject($key, 'wrong signature');
		return;
	}

	$s['worker_id'] = $wi['id'];
	$s['worker_uid'] = $worker_uid;
	$s['ann'] = $u;
	$s['auth'] = 1;
	$s['active_jobs'] = 0;

	// Store what the worker announced about itself
	$a = array();
	foreach (array('name', 'uname', 'host', 'arch', 'env', 'ssh_key') as $f)
		$a[$f] = $u[$f];
	$a['ip'] = rg_fix_ip($s['ip']);
	rg_worker_update($s['db'], $worker_uid, $wi['id'], $a);

	rg_log($key . ':Peer [' . $u['name'] . '] announce');
}

/*
 * Handles the 'STA' (job started) command: records which connection
 * runs the job and when it started.
 */
function xcmd_sta($key, &$s, $u)
{
	global $jobs;

	$jid = $u['id'];
	$jobs[$jid]['worker'] = $key;
	$jobs[$jid]['worker_name'] = $s['ann']['name'];
	$jobs[$jid]['worker_started'] = time();
	$s['active_jobs'] += 1;
	rg_log('Job started: ' . $jid);
}

/*
 * Handles the 'DON' (job done) command.
 * On worker error the job is delayed and released so another worker can
 * take it; on success the result is stored and a 'DRE' is sent back.
 */
function xcmd_don($key, &$s, $u)
{
	global $jobs;

	$s['active_jobs'] -= 1;

	$jid = $u['id'];
	if (isset($u['error'])) {
		rg_log('job failed with error: ' . $u['error']);
		// Delay job and retry (on another worker)
		$jobs[$jid]['next_try'] = time() + 60;
		$k = $s['worker_id'];
		$jobs[$jid]['avoid'][$k] = 1;
		// Release the job, else rg_process_job will consider it
		// still in progress and will never dispatch it again.
		$jobs[$jid]['worker'] = '';
		return;
	}

	$r = rg_builder_done($s['db'], $jobs[$jid], $u['status']);
	if ($r === TRUE) {
		unset($jobs[$jid]);

		// Send DoneREceived - so client will delete the jobs
		$a = array('id' => $jid);
		$cmd = 'DRE ' . rg_conn_prepare($a) . "\n";
		rg_conn_enq($key, $cmd);
	}
}

/*
 * Dispatch a command from a worker.
 * @line has a 4 chars command ('ANN ', 'STA ', 'DON ') followed by an
 * escaped serialized array.
 * Anything but 'ANN' is accepted only from authenticated connections.
 */
function xdispatch($key, $line)
{
	global $rg_conns;

	rg_log('Dispatch[' . $key . ']');

	$s = &$rg_conns[$key];

	$cmd = substr($line, 0, 4);
	$d = trim(substr($line, 4));
	$x = stripcslashes($d);
	// NOTE(review): unserialize() runs here on data received from the
	// network, before authentication - consider a safer encoding.
	$u = @unserialize($x);
	// All commands index the payload as an array, so anything else
	// (including a failed unserialize) is malformed.
	if (!is_array($u)) {
		xreject($key, 'malformed command');
		return;
	}
	rg_log_ml('cmd=' . $cmd . ' u: ' . print_r($u, TRUE));

	if (strcmp($cmd, 'ANN ') == 0) {
		xcmd_ann($key, $s, $u);
		return;
	}

	if ($s['auth'] != 1) {
		rg_log($key . ':Client not authenticated!');
		$a = array('error' => 'client not authenticated');
		$cmd = 'ERR ' . rg_conn_prepare($a) . "\n";
		rg_conn_enq($key, $cmd);
		return;
	}

	if (strcmp($cmd, 'STA ') == 0) {
		xcmd_sta($key, $s, $u);
		return;
	}

	if (strcmp($cmd, 'DON ') == 0) {
		xcmd_don($key, $s, $u);
		return;
	}

	rg_log('Unknown command [' . $cmd . ']!');
}

/*
 * Tries to send a job to a connected worker.
 * The job is skipped if it is already assigned or if it is still delayed
 * after a failure. On success, a 'BLD' command is queued on the worker
 * connection and the job is marked with that connection.
 * @job - passed by reference, updated in place
 */
function rg_process_job($db, &$job)
{
	global $rg_conns;

	// Job is already in progress?
	if (!empty($job['worker']))
		return;

	// Should we delay because of a previous fail?
	if (isset($job['next_try']) && ($job['next_try'] > time()))
		return;

	rg_log_ml('Processing job: ' . print_r($job, TRUE));

	// Get the worker list, so we can sort it
	$workers_list = rg_worker_list_all($db, $job['uid']);
	if ($workers_list === FALSE) {
		rg_log('cannot load workers list: ' . rg_worker_error());
		$job['next_try'] = time() + 60;
		return;
	}
	//rg_log_ml('DEBUG: workers list: ' . print_r($workers_list, TRUE));

	// Trying to find a worker in the list of connections
	foreach ($rg_conns as $key => $i) {
		if (strcmp($key, 'master') == 0)
			continue;

		if (!isset($i['ann'])) {
			rg_log('Conn ' . $key . ' has no announce.');
			// TODO: close after some time?
			continue;
		}

		if (empty($i['ann']['env'])) {
			rg_log('Conn ' . $key . ' has no environments.');
			continue;
		}

		// Workers owned by a user serve only that user's jobs
		if (($i['worker_uid'] > 0) && ($i['worker_uid'] != $job['uid'])) {
			//rg_log('uid does not match, try next');
			continue;
		}

		// NOTE(review): the list is indexed here by worker_uid, but
		// the messages talk about a worker - confirm how
		// rg_worker_list_all keys its result (worker id?).
		$k = $i['worker_uid'];
		if (!isset($workers_list[$k])) {
			rg_log('Worker ' . $k . ' not found in workers_list! Strange!');
			continue;
		}
		$wi = $workers_list[$k];

		// 'avoid' is filled using worker_id (see DON command), so
		// the same key must be used for lookup.
		$wid = $i['worker_id'];
		if (isset($job['avoid'][$wid])) {
			rg_log('We must avoid worker ' . $wid);
			continue;
		}
		rg_log('DEBUG: selected worker ' . $k);

		// If number of active jobs is == max workers, skip it
		if ($wi['workers'] <= $i['active_jobs']) {
			rg_log('DEBUG: workers=' . $wi['workers']
				. ' active_jobs=' . $i['active_jobs']);
			continue;
		}

		foreach ($i['ann']['env'] as $env => $junk) {
			if (strcasecmp($job['env'], $env) != 0) {
				//rg_log('DEBUG job env [' . $job['env'] . ']'
				//	. ' != worker [' . $env . ']');
				continue;
			}

			// Send only what is really needed
			$job2 = array();
			$job2['cmds'] = $job['cmds'];
			$job2['packages'] = $job['packages'];
			$job2['hook_id'] = $job['hook_id'];
			$job2['url'] = $job['url'];
			$job2['head'] = $job['head'];
			$job2['env'] = $job['env'];
			$job2['id'] = $job['id'];
			$cmd = 'BLD ' . rg_conn_prepare($job2) . "\n";
			rg_conn_enq($key, $cmd);
			// TODO: get a confirmation?

			$job['worker'] = $key;
			$job['worker_started'] = 0;
			$job['worker_sent'] = time();
			rg_log_ml('DEBUG: After sending BLD: job: '
				. print_r($job, TRUE));
			// TODO: after some time, if worker_started is still 0,
			// mark the 'worker' as '' to be able to go in other place
			// TODO: maybe the client must resync with server to
			// abort jobs already done on another host, to not
			// duplicate work

			return;
		}
	}

	// We get here only if the job could not be sent to any worker
	rg_log('No worker found!');
}

rg_prof_start("MAIN");

rg_log_set_file($rg_log_dir . "/builder.log");
rg_log_set_sid("000000"); // to spread the logs

rg_log("Start (ver=$rocketgit_version)...");

rg_sql_app("rg-builder");
$db = rg_sql_open($rg_sql);
if ($db === FALSE) {
	rg_internal_error("Cannot connect to database!");
	exit(1);
}

if (rg_sql_struct_update_needed($db) !== 0) {
	rg_log('Update needed. Exit.');
	exit(0);
}

if (rg_fixes_needed($db) !== 0) {
	rg_log('Fixes needed. Exit.');
	exit(0);
}

// Prepare socket
$socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === FALSE) {
	rg_internal_error('Cannot create socket!');
	exit(1);
}

socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

$r = @socket_bind($socket, $rg_builder_bind, $rg_builder_port);
if ($r === FALSE) {
	rg_internal_error("Cannot bind socket!");
	exit(1);
}

$r = @socket_listen($socket, 128);
if ($r === FALSE) {
	rg_internal_error("Cannot set queue length on socket!");
	exit(1);
}

// The listening socket is registered as the 'master' connection; new
// clients are initialized by xnew, which receives the database handle.
rg_conn_new('master', $socket);
$rg_conns['master']['exit_on_close'] = 1;
$rg_conns['master']['func_new'] = 'xnew';
$rg_conns['master']['func_new_arg'] = $db;
$rg_conns['master']['is_master'] = 1;

$jobs = array();
$workers = 0;
$original_mtime = @filemtime(__FILE__);
do {
	rg_log_buffer_clear();

	// We do not want stale entries!
	rg_cache_core_destroy();

	// Check our mtime so we can upgrade the software and this script
	// will restart.
	clearstatcache();
	$mtime = @filemtime(__FILE__);
	if ($mtime != $original_mtime) {
		rg_log("mtime=$mtime, original_mtime=$original_mtime");
		rg_log('File changed. Exiting...');
		break;
	}

	// Jobs are loaded only if at least a worker is connected
	if ($workers > 0) {
		$r = rg_builder_load_jobs($db);
		if ($r['ok'] != 1) {
			rg_log('Cannot load jobs from database! Sleeping 30s...');
			sleep(30);
			continue;
		}

		foreach ($r['list'] as $jid => $job) {
			// First time we see this job: add the in-memory state
			if (!isset($jobs[$jid])) {
				$job['worker'] = '';
				$job['avoid'] = array(); // to avoid workers
				$jobs[$jid] = $job;
			}

			rg_process_job($db, $jobs[$jid]);
		}
	}

	rg_log("Waiting for connections...");
	rg_conn_wait(10);
} while (1);

@socket_close($socket);

rg_log("Exiting...");

rg_prof_end("MAIN");
rg_prof_log();
?>