<?php // This is called by cron, and is persistent. // It takes care of build jobs. // It can be run on the same machine as the webserver. error_reporting(E_ALL); ini_set('track_errors', 'On'); set_time_limit(0); $_s = microtime(TRUE); require_once('/etc/rocketgit/config.php'); $INC = dirname(__FILE__) . '/../inc'; require_once($INC . '/init.inc.php'); require_once($INC . '/log.inc.php'); require_once($INC . '/sql.inc.php'); require_once($INC . '/struct.inc.php'); require_once($INC . '/events.inc.php'); require_once($INC . '/repo.inc.php'); require_once($INC . '/prof.inc.php'); require_once($INC . '/mr.inc.php'); require_once($INC . '/keys.inc.php'); require_once($INC . '/user.inc.php'); require_once($INC . '/bug.inc.php'); require_once($INC . '/fixes.inc.php'); require_once($INC . '/plan.inc.php'); require_once($INC . '/admin.inc.php'); require_once($INC . '/ver.php'); require_once($INC . '/builder.inc.php'); require_once($INC . '/conn.inc.php'); require_once($INC . '/workers.inc.php'); if ($rg_builder_port == 0) exit(0); /* * Called when a connection closes */ function xdestroy($key) { global $workers; $workers--; } /* * Called when a new worker connects */ function xnew($key, $arg) { global $rg_conns; global $workers; global $features; $s = &$rg_conns[$key]; $s['func_data'] = 'xdispatch'; $s['func_destroy'] = 'xdestroy'; $s['db'] = $arg; $s['auth'] = 0; $s['is_master'] = 0; $workers++; $f = array( 'op' => 'FEATURES', 'features' => $features ); rg_conn_enq($key, json_encode($f) . "\n"); } /* * Dispatch a command from a worker (one json) */ function xdispatch_one($key, $data) { global $rg_conns; global $jobs; global $features; rg_log($key . ': dispatching'); $s = &$rg_conns[$key]; $u = @json_decode($data, TRUE); if ($u === NULL) { $err = array( 'errstr' => 'cannot decode JSON:' . json_last_error_msg() ); rg_log_ml('Cannot decode JSON: ' . json_last_error_msg()); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } //rg_log_ml('DEBUG: u: ' . print_r($u, TRUE)); if (strcmp($u['op'], 'ANN') == 0) { $now = time(); if (($u['boot_time'] < $now - 30) || ($u['boot_time'] > $now + 30)) { $err = array( 'errstr' => 'boot_time is too old; time desync or replay attack?' ); rg_log('boot_time is too old; abort'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } // Lookup user first if (!isset($u['type'])) $u['type'] = 'global'; if (strcasecmp($u['type'], 'global') == 0) { $worker_uid = 0; } else if (!isset($u['user'])) { rg_log('user field is not present; abort'); $err = array('errstr' => 'user not defined in conf file'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } else { $w_ui = rg_user_info($s['db'], 0, $u['user'], ''); if ($w_ui['exists'] !== 1) { rg_log('invalid user; abort'); $err = array('errstr' => 'invalid user'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } $worker_uid = $w_ui['uid']; } // Check if worker is registered $wi = rg_worker_find_by_name($s['db'], $worker_uid, $u['name']); if ($wi === -1) { rg_log('cannot load worker info: ' . rg_worker_error() . '; abort'); $err = array('errstr' => 'internal error'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } if ($wi === 0) { rg_log('name [' . $u['name'] . '] not found; abort'); $err = array('errstr' => 'builder name not found, add it' . ' in the web interface'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } $sign = hash_hmac('sha512', $u['boot_time'], $wi['key']); if (strcmp($sign, $u['sign']) != 0) { rg_log('signature is not ok [' . $sign . ']' . ' != [' . $u['sign'] . ']'); $err = array('errstr' => 'wrong signature'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } $s['worker_id'] = $wi['id']; $s['worker_name'] = $wi['name']; $s['worker_uid'] = $worker_uid; $s['ann'] = $u; $s['auth'] = 1; $s['active_jobs'] = array(); $a = array(); $a['name'] = $u['name']; $a['uname'] = $u['uname']; $a['host'] = $u['host']; $a['arch'] = $u['arch']; $a['env'] = empty($u['env']) ? array() : $u['env']; $a['ssh_key'] = $u['ssh_key']; $a['ip'] = rg_fix_ip($s['ip']); $r = rg_worker_update($s['db'], $worker_uid, $wi['id'], $a); if ($r !== TRUE) { rg_log('cannot update worker: ' . rg_worker_error()); $err = array('errstr' => rg_worker_info()); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); return; } rg_log($key . ': peer [' . $u['name'] . '] announce processed.'); return; } if ($s['auth'] != 1) { rg_log($key . ':Client not authenticated!'); $err = array('errstr' => 'client not authenticated'); rg_conn_enq($key, json_encode($err) . "\n"); return; } if (strcmp($u['op'], 'STA') == 0) { $jid = $u['id']; $jobs[$jid]['worker'] = $key; $jobs[$jid]['worker_name'] = $s['ann']['name']; $jobs[$jid]['worker_started'] = time(); if (!isset($s['active_jobs'][$jid])) $s['active_jobs'][$jid] = 1; rg_log('Job started: ' . $jid); return; } if (strcmp($u['op'], 'DON') == 0) { $jid = $u['id']; if (isset($s['active_jobs'][$jid])) unset($s['active_jobs'][$jid]); if (isset($u['error'])) { rg_log('job failed with error: ' . $u['error']); // Delay job and retry (on another worker) $jobs[$jid]['next_try'] = time() + 60; $k = $s['worker_id']; $jobs[$jid]['avoid'][$k] = 1; return; } $r = rg_builder_done($s['db'], $jobs[$jid], $u['status']); if ($r === TRUE) { unset($jobs[$jid]); // Send DoneREceived - so client will delete the job $a = array('op' => 'DRE', 'id' => $job['id']); rg_conn_enq($key, json_encode($a) . "\n"); } return; } if (strcmp($u['op'], 'WORKER_STATS') == 0) { $_x = $u; unset($_x['op']); $_ts = $_x['ts']; unset($_x['ts']); rg_worker_stats_insert($s['db'], $s['worker_id'], $_ts, $_x); return; } rg_log('Unknown command [' . $u['op'] . ']!'); $err = array('errstr' => 'unknown op'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); } /* * Dispatch a command from a worker */ function xdispatch($key, $data) { $ret = 0; while (1) { $pos = strpos($data, "\n"); if ($pos === FALSE) return $ret; $one = substr($data, 0, $pos); xdispatch_one($key, $one); $data = substr($data, $pos + 1); $ret += $pos + 1; } return $ret; } function rg_process_job($db, &$job) { global $rg_conns; // Job is already in progress? if (!empty($job['worker'])) return; // Should we delay because of a previous fail? if (isset($job['next_try']) && ($job['next_try'] < time())) return; rg_log_ml('Processing job: ' . print_r($job, TRUE)); if (!isset($job['request'])) $req = $job; else $req = $job['request']; // Get the worker list, so we can sort it $workers_list = rg_worker_list_all($db, $req['uid']); if ($workers_list === FALSE) { rg_log('cannot load workers list: ' . rg_worker_error()); $job['next_try'] = time() + 60; return; } //rg_log_ml('DEBUG: workers list: ' . print_r($workers_list, TRUE)); // Trying to find a worker in the list of connections foreach ($rg_conns as $key => $i) { if (strcmp($key, 'master') == 0) continue; if (!isset($i['ann'])) { rg_log('Conn ' . $key . ' has no announce.'); // TODO: close after some time? continue; } if (empty($i['ann']['env'])) { rg_log('Conn ' . $key . ' has no environments.'); continue; } if (($i['worker_uid'] > 0) && ($i['worker_uid'] != $req['uid'])) { rg_log('uids do not match, try next'); continue; } $k = $i['worker_id']; $name = $i['worker_name']; if (!isset($workers_list[$k])) { rg_internal_error('Worker ' . $name . ' not found' . ' in workers_list! Strange!'); continue; } $wi = $workers_list[$k]; if (isset($job['avoid'][$k])) { rg_log('We must avoid worker ' . $name); continue; } // If number of active jobs is == max workers, skip it $aj = count($i['active_jobs']); if ($wi['workers'] && ($aj >= $wi['workers'])) { rg_log('DEBUG: skip worker ' . $name . ' because' . ' active_jobs(' . $aj . ')' . ' >= workers(' . $wi['workers'] . ')'); continue; } foreach ($i['ann']['env'] as $env => $junk) { if (strcasecmp($req['env'], $env) != 0) { rg_log('DEBUG: worker ' . $name . ': job env [' . $req['env'] . ']' . ' != worker [' . $env . ']'); continue; } // Send only what is really needed $job2 = array(); $job2['op'] = 'BLD'; $job2['cmds'] = $req['cmds']; $job2['packages'] = $req['packages']; $job2['hook_id'] = $req['hook_id']; $job2['url'] = $req['url']; $job2['head'] = $req['head']; $job2['env'] = $req['env']; $job2['id'] = $job['id']; rg_conn_enq($key, json_encode($job2) . "\n"); // TODO: get a confirmation? We get 'STA'. $job['worker'] = $key; $job['worker_started'] = 0; $job['worker_sent'] = time(); rg_log_ml('DEBUG: After sending BLD: job: ' . print_r($job, TRUE)); // TODO: after some time, if worker_started is still 0, // mark the 'worker' as '' to be able to go in other place // TODO: maybe the client must resync with server to // abort jobs already done on another host, to not // duplicate work return; } } rg_log('No workers found!'); } rg_prof_start('MAIN'); rg_log_set_file($rg_log_dir . '/builder.log'); rg_log_set_sid('000000'); // to spread the logs rg_lock_or_exit('builder.lock'); rg_log('Start (ver=' . $rocketgit_version . ')...'); rg_sql_app('rg-builder'); $db = rg_sql_open($rg_sql); if ($db === FALSE) { rg_internal_error('Cannot connect to database!'); exit(1); } if (rg_struct_ok($db) === FALSE) exit(0); // Prepare socket $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket === FALSE) { rg_internal_error('Cannot create socket!'); exit(1); } socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1); $r = @socket_bind($socket, $rg_builder_bind, $rg_builder_port); if ($r === FALSE) { rg_internal_error('Cannot bind socket!'); exit(1); } $r = @socket_listen($socket, 128); if ($r === FALSE) { rg_internal_error('Cannot set queue length on socket!'); exit(1); } rg_conn_new('master', $socket); $rg_conns['master']['exit_on_close'] = 1; $rg_conns['master']['func_new'] = 'xnew'; $rg_conns['master']['func_new_arg'] = $db; $rg_conns['master']['is_master'] = 1; $jobs = array(); // What features the builder supports $features = array('allow_stats' => 1); $workers = 0; $original_mtime = @filemtime(__FILE__); do { // We do not want stale entries! rg_cache_core_destroy(); // Check our mtime so we can upgrade the software and this script // will restart. clearstatcache(); $mtime = @filemtime(__FILE__); if ($mtime != $original_mtime) { rg_log('mtime=' . $mtime . ', original_mtime=' . $original_mtime); rg_log('File changed. Exiting...'); break; } if ($workers > 0) { $r = rg_builder_load_jobs($db, 'done = 0'); if ($r['ok'] != 1) { rg_log('Cannot load jobs from database! Sleeping 30s...'); sleep(30); continue; } $_r = TRUE; foreach ($r['list'] as $jid => $job) { if (!isset($jobs[$jid])) { $job['worker'] = ''; $job['avoid'] = array(); // to avoid some workers $jobs[$jid] = $job; } $_r = rg_process_job($db, $jobs[$jid]); if ($_r === FALSE) break; } if ($_r === FALSE) break; } rg_conn_wait(10); } while (1); @socket_close($socket); rg_log('Exiting...'); rg_prof_end('MAIN'); rg_prof_log(); ?>