<?php // This is called by cron, and is persistent. // It takes care of build jobs. // It can be run on the same machine as the webserver. error_reporting(E_ALL); set_time_limit(0); $_s = microtime(TRUE); require_once('/etc/rocketgit/config.php'); $INC = dirname(__FILE__) . '/../inc'; require_once($INC . '/init.inc.php'); require_once($INC . '/log.inc.php'); require_once($INC . '/sql.inc.php'); require_once($INC . '/struct.inc.php'); require_once($INC . '/events.inc.php'); require_once($INC . '/repo.inc.php'); require_once($INC . '/prof.inc.php'); require_once($INC . '/mr.inc.php'); require_once($INC . '/keys.inc.php'); require_once($INC . '/user.inc.php'); require_once($INC . '/bug.inc.php'); require_once($INC . '/fixes.inc.php'); require_once($INC . '/plan.inc.php'); require_once($INC . '/admin.inc.php'); require_once($INC . '/ver.php'); require_once($INC . '/builder.inc.php'); require_once($INC . '/conn.inc.php'); require_once($INC . '/workers.inc.php'); require_once($INC . '/mime.inc.php'); if ($rg_builder_port == 0) exit(0); function job_save($job) { global $state_dir; $f = $state_dir . '/job-' . $job['id'] . '.ser'; return rg_save($f, $job); } /* * Called when a connection closes */ function xdestroy($key) { global $workers; $workers--; } /* * Called when a new worker connects */ function xnew($key, $arg) { global $rg_conns; global $features; $s = &$rg_conns[$key]; $s['func_data'] = 'xdispatch'; $s['func_destroy'] = 'xdestroy'; $s['db'] = $arg; $s['auth'] = 0; $s['artifacts'] = array(); unset($s); $f = array( 'op' => 'FEATURES', 'features' => $features ); rg_conn_enq($key, json_encode($f) . "\n"); } /* * Returns the number of active jobs for a worker */ function worker_active_jobs($wid) { global $jobs; $ret = 0; foreach ($jobs as $jid => $info) if ($info['worker_id'] == $wid) $ret++; return $ret; } /* * Called for binary part of the upload artifact */ function xdispatch_artifact_upload_chunk($key, $data) { global $rg_conns; global $jobs; $s = &$rg_conns[$key]; $jid = $s['artifacts']['current_jid']; $aid = $s['artifacts']['current_aid']; $d = &$s['artifacts'][$jid][$aid]; $l = $key . ': ' . $jid . ': ' . $aid; //rg_log_enter($l . ': DEBUG: xdispatch_artifact_upload_chunk: buf_len=' // . $d['buf_len']); //rg_log(' data[' . strlen($data) . ']: ' . $data); // some other data may be present in $data $data = substr($data, 0, $d['buf_len']); $base = array( 'op' => 'artifact_upload_chunk', 'id' => $jid, 'aid' => $aid ); $used = 0; while (!empty($data)) { $r = @fwrite($d['fd'], $data); if ($r === FALSE) { $m = 'cannot write: ' . rg_php_err(); rg_log($l . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); break; } //rg_log($l . ': DEBUG: wrote: ' . $r); $data = substr($data, $r); $d['buf_len'] -= $r; $d['pos'] += $r; $used += $r; } if ($d['buf_len'] == 0) { //rg_log($l . ': DEBUG: buf_len is 0! unset xdispatch_custom'); unset($s['xdispatch_custom']); //rg_log($l . ': DEBUG: Sending confirmation with pos ' . $d['pos']); $base['next_pos'] = $d['pos']; rg_conn_enq($key, json_encode($base) . "\n"); } unset($d); unset($s); //rg_log_exit(); return $used; } /* * Dispatch a command from a worker (one json) */ function xdispatch_one($key, $data) { global $rg_conns; global $jobs; global $features; global $state_dir; global $workers; $now = time(); $s = &$rg_conns[$key]; //rg_log('DEBUG: data=' . $data); //rg_log_ml($key . ': DEBUG: xdispatch_one s: ' . print_r($s, TRUE)); while (1) { $u = @json_decode($data, TRUE); if ($u === NULL) { $m = 'cannot decode JSON: ' . json_last_error_msg(); $err = array('errstr' => $m); rg_log_ml($key . ': data=[' . $data . ']' . $m); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } //rg_log_ml($key . ': DEBUG: u: ' . print_r($u, TRUE)); if (!isset($u['op'])) { $err = array('errstr' => 'op parameter is missing'); rg_log_ml($key . ': DEBUG: op parameter is missing; u: ' . print_r($u, TRUE)); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } $op = $u['op']; unset($u['op']); if (strcmp($op, 'ANN') == 0) { if (($u['boot_time'] < $now - 30) || ($u['boot_time'] > $now + 30)) { $err = array( 'op' => $op, 'errstr' => 'boot_time is too old; time desync or replay attack?' ); rg_log($key . ': boot_time is too old; abort'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } // Lookup user first if (!isset($u['type'])) $u['type'] = 'global'; if (strcasecmp($u['type'], 'global') == 0) { $worker_uid = 0; } else if (!isset($u['user'])) { rg_log($key . ': user field is not present; abort'); $err = array('op' => $op, 'errstr' => 'user not defined in conf file'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } else { $w_ui = rg_user_info($s['db'], 0, $u['user'], ''); if ($w_ui['exists'] !== 1) { rg_log($key . ': invalid user; abort'); $err = array('op' => $op, 'errstr' => 'invalid user'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } $worker_uid = $w_ui['uid']; } // Check if worker is registered $wi = rg_worker_find_by_name($s['db'], $worker_uid, $u['name']); if ($wi === -1) { rg_log($key . ': cannot load worker info: ' . rg_worker_error() . '; abort'); $err = array('op' => $op, 'errstr' => 'internal error'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } if ($wi === 0) { rg_log($key . ': name [' . $u['name'] . '] not found; abort'); $err = array('op' >= $op, 'errstr' => 'builder name not found, add it' . ' in the web interface'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } $sign = hash_hmac('sha512', $u['boot_time'], $wi['key']); if (strcmp($sign, $u['sign']) != 0) { rg_log($key . ': signature is not ok [' . $sign . ']' . ' != [' . $u['sign'] . ']'); $err = array('op' => $op, 'errstr' => 'wrong signature'); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } $s['worker_id'] = $wi['id']; $s['worker_name'] = $wi['name']; $s['worker_uid'] = $worker_uid; $s['ann'] = $u; $s['auth'] = 1; $a = array(); $a['name'] = $u['name']; $a['uname'] = $u['uname']; $a['host'] = $u['host']; $a['arch'] = $u['arch']; $a['env'] = empty($u['env']) ? array() : $u['env']; $a['ssh_key'] = $u['ssh_key']; $a['ip'] = rg_fix_ip($s['ip']); $r = rg_worker_update($s['db'], $worker_uid, $wi['id'], $a); if ($r !== TRUE) { rg_log($key . ': cannot update worker: ' . rg_worker_error()); $err = array('op' => $op, 'errstr' => rg_worker_info()); rg_conn_enq($key, json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } rg_log($key . ': peer [' . $u['name'] . '] announce processed.'); $workers++; break; } if ($s['auth'] != 1) { rg_log($key . ': Client not authenticated!'); $err = array('op' => $op, 'errstr' => 'client not authenticated'); rg_conn_enq($key, json_encode($err) . "\n"); break; } if (strcmp($op, 'WORKER_STATS') == 0) { $_x = $u; $_ts = $_x['ts']; unset($_x['ts']); rg_worker_stats_insert($s['db'], $s['worker_id'], $_ts, $_x); break; } // The rest of the commands need a job id if (!isset($u['id'])) { $err = array('op' => $op, 'errstr' => 'protocol error: job id not found'); rg_conn_enq($key, @json_encode($err) . "\n"); rg_conn_shutdown($key, 2); break; } $jid = $u['id']; $job = &$jobs[$jid]; // security check if ($job['worker_id'] != $s['worker_id']) { $m = 'job not associated with worker'; rg_log($key . ': ' . $jid . ': error: ' . $m); rg_log($key . ': ' . $jid . ': job[worker_id]=' . $job['worker_id']); rg_log($key . ': ' . $jid . ': s[worker_id]=' . $s['worker_id']); $err = array('op' => 'abort_job', 'jid' => $jid, 'errstr' => $m); rg_conn_enq($key, json_encode($err) . "\n"); break; } if (strcmp($op, 'STA') == 0) { $job['worker_started'] = $now; job_save($job); rg_log($key . ': ' . $jid . ': worker started work on job'); break; } if (strcmp($op, 'DON') == 0) { rg_log_ml($key . ': ' . $jid . ': DEBUG: DON u:' . print_r($u, TRUE)); $send_confirmation = TRUE; if (!empty($u['error'])) { // TODO: we need to distinguish between fatal errors and transient errors rg_log($key . ': ' . $jid . ': job failed with error: ' . $u['error']); rg_log($key . ': ' . $jid . ': job failed with error2: ' . $u['error2']); // Delay job and retry (on another worker) $job['next_try'] = $now + 60; $k = $s['worker_id']; $job['avoid'][$k] = 1; $job['worker_id'] = 0; job_save($job); } else { $r = rg_builder_done($s['db'], $job, $u['status']); if ($r === TRUE) { @unlink($state_dir . '/job-' . $jid . '.ser'); unset($jobs[$jid]); unset($s['artifacts'][$jid]); } else { $send_confirmation = FALSE; // we expect the client to send it again } } if ($send_confirmation) { // Send DoneREceived - so client will delete the job $a = array('op' => 'DRE', 'id' => $jid); rg_log_ml($key . ': ' . $jid . ': DEBUG: Sending DRE: ' . print_r($a, TRUE)); rg_conn_enq($key, @json_encode($a) . "\n"); } break; } if (strcmp($op, 'artifact_upload_info') == 0) { rg_log_ml($key . ': ' . $jid . ': ' . $op . ': u: ' . print_r($u, TRUE)); if (!isset($job['artifacts'])) $job['artifacts'] = array(); $job['artifacts']['info'] = $u; $r = job_save($job); if ($r !== FALSE) { $a = array('op' => $op, 'id' => $jid); rg_conn_enq($key, @json_encode($a) . "\n"); } break; } // generic if (strncmp($op, 'artifact_upload_', 16) == 0) { $base = array('op' => $op, 'id' => $jid); if (!isset($u['aid'])) { $base['errstr'] = 'protocol error: aid not found'; rg_conn_enq($key, @json_encode($base) . "\n"); rg_conn_shutdown($key, 2); break; } $aid = $u['aid']; $base['aid'] = $aid; rg_log_ml($key . ': ' . $jid . ': ' . $aid . ': ' . $op . ': u: ' . print_r($u, TRUE)); // TODO: Move this where the job is prepared - where?! if (!isset($job['repo_path'])) $job['repo_path'] = rg_repo_path_by_id($job['request']['uid'], $job['repo_id']); if (!isset($job['artifacts_path'])) $job['artifacts_path'] = rg_repo_artifacts_path_by_id($job['request']['uid'], $job['repo_id']); if (!isset($s['artifacts'][$jid])) $s['artifacts'][$jid] = array(); if (isset($u['aid']) && !isset($s['artifacts'][$jid][$aid])) $s['artifacts'][$jid][$aid] = array('ready_to_write' => 0); } if (strcmp($op, 'artifact_upload_status') == 0) { $base['status'] = isset($job['artifacts'][$aid]['done']) ? $job['artifacts'][$aid]['done'] : 0; rg_log_ml($key . ': ' . $jid . ': ' . ': ' . $aid . ': ' . $op . ': sending status ' . $base['status']); rg_conn_enq($key, @json_encode($base) . "\n"); break; } if (strcmp($op, 'artifact_upload_start') == 0) { if ($s['artifacts'][$jid][$aid]['ready_to_write'] == 1) { @fclose($s['artifacts'][$jid][$aid]['fd']); $s['artifacts'][$jid][$aid]['ready_to_write'] = 0; } $job['artifacts'][$aid] = $u; $job['artifacts'][$aid]['tpath'] = $state_dir . '/ja-' . $jid . '-' . $aid; $s['artifacts'][$jid][$aid]['fd'] = @fopen($job['artifacts'][$aid]['tpath'], 'wb'); if ($s['artifacts'][$jid][$aid]['fd'] === FALSE) { $m = 'cannot open temp artifacts file: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': error: ' . $m); break; } $s['artifacts'][$jid][$aid]['ready_to_write'] = 1; $s['artifacts']['current_jid'] = $jid; $s['artifacts']['current_aid'] = $aid; break; } if (strncmp($op, 'artifact_upload_', 16) == 0) { if (!isset($job['artifacts'][$aid])) { $m = 'aid not found'; rg_log($key . ': ' . $jid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); break; } if ($s['artifacts'][$jid][$aid]['ready_to_write'] != 1) { $m = 'artifact chunk/done op before start'; rg_log($key . ': ' . $jid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); // TODO: we should not close the connection so aggressively (everyehere) break; } } if (strcmp($op, 'artifact_upload_chunk') == 0) { //rg_log($key . ': ' . $jid . ': ' . $aid // . ': DEBUG: seeking to ' . $u['pos'] . '...'); $r = @fseek($s['artifacts'][$jid][$aid]['fd'], $u['pos'], SEEK_SET); if ($r === -1) { $m = 'cannot seek: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); break; } // prepare the conn manager to receive binary data $s['artifacts'][$jid][$aid]['buf_len'] = $u['buf_len']; $s['artifacts'][$jid][$aid]['pos'] = $u['pos']; $s['xdispatch_custom'] = 'xdispatch_artifact_upload_chunk'; break; } if (strcmp($op, 'artifact_upload_done') == 0) { $r = @fclose($s['artifacts'][$jid][$aid]['fd']); if ($r === FALSE) { $m = 'cannot close file: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); break; } $s['artifacts'][$jid][$aid]['ready_to_write'] = 0; $adir = $job['artifacts_path']; if (!is_dir($adir)) { $r = @mkdir($adir, 0770, TRUE); if ($r === FALSE) { $m = 'cannot create artifacts dir: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); } } rg_log_ml($key . ': ' . $jid . ': ' . $aid . ': DEBUG: adir=' . $adir); // Prepare the replace of special strings $a = array(); $b = array(); $list = array('env', 'head', 'refname', 'refname_short', 'hook_id', 'uid'); foreach ($list as $k) { if (!isset($job['request'][$k])) continue; $a[] = '@@' . $k . '@@'; $b[] = $job['request'][$k]; } // We do not want './' $f = $job['artifacts'][$aid]['file']; if (strncmp($f, './', 2) == 0) $f = substr($f, 2); $a[] = '@@original_path@@'; $b[] = $f; $r = strrpos($f, '.'); $no_ext = ($r !== FALSE) ? substr($f, 0, $r) : $f; $a[] = '@@original_path_no_ext@@'; $b[] = $no_ext; $meta0 = array( 'size' => $job['artifacts'][$aid]['size'], 'original_path' => $f, 'map_into_source' => array() ); $all_ok = TRUE; foreach ($job['artifacts'][$aid]['para'] as $para_i => $para) { $meta = $meta0; if (isset($para['content_type'])) $meta = $para['content_type']; if (isset($para['map_into_source'])) { foreach ($para['map_into_source'] as $path) { $path = str_replace($a, $b, $path); $meta['map_into_source'][] = $path; } } else { rg_log_ml('DEBUG: para does not have map_into_source: ' . print_r($para, TRUE)); } foreach ($para['map'] as $path) { rg_log_ml($key . ': ' . $jid . ': ' . $aid . ': DEBUG map ' . $job['artifacts'][$aid]['tpath'] . ' to ' . $path); $last = substr($path, -1); if (strcmp($last, '/') == 0) $path .= basename($f); $path = str_replace($a, $b, $path); if (strstr($path, '..')) { $m = 'path [' . $path . '] is trying to escape the artifacts dir; ignore it'; rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); continue; } $apath = $adir . '/' . $path; rg_log($key . ': ' . $jid . ': ' . $aid . ': DEBUG: apath=' . $apath); $d = dirname($apath); if (!is_dir($d)) { $r = @mkdir($d, 0770, TRUE); if ($r === FALSE) { $m = 'cannot create artifacts dir: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); $all_ok = FALSE; break; } } $r = @link($job['artifacts'][$aid]['tpath'], $apath . '.tmp'); if ($r === FALSE) { $m = 'cannot link file ' . $job['artifacts'][$aid]['tpath'] . ' to ' . $apath . '.tmp: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); $all_ok = FALSE; break; } $r = @rename($apath . '.tmp', $apath); if ($r === FALSE) { $m = 'cannot rename: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, json_encode($base) . "\n"); rg_conn_shutdown($key, 2); $all_ok = FALSE; break; } $meta['upload_ts'] = time(); rg_log_ml($key . ': ' . $jid . ': ' . $aid . ': meta: ' . print_r($meta, TRUE)); $r = @file_put_contents($apath . '.rg.meta', rg_serialize($meta)); if ($r === FALSE) { $m = 'cannot save metadata: ' . rg_php_err(); rg_log($key . ': ' . $jid . ': ' . $aid . ': error: ' . $m); $base['errstr'] = $m; rg_conn_enq($key, @json_encode($base) . "\n"); rg_conn_shutdown($key, 2); $all_ok = FALSE; break; } } } if ($all_ok) { $job['artifacts'][$aid]['done'] = 1; @unlink($job['artifacts'][$aid]['tpath']); $base['ok'] = 1; rg_conn_enq($key, json_encode($base) . "\n"); } job_save($job); break; } rg_log_ml($key . ': Unknown operation [' . $op . ']! u: ' . print_r($u, TRUE)); $err = array('op' => $op, 'errstr' => 'unknown operation'); rg_conn_enq($key, json_encode($err) . "\n"); break; } unset($job); //rg_log_ml('DEBUG: s: ' . print_r($s, TRUE)); unset($s); } /* * Dispatch a command from a worker */ function xdispatch($key, $data) { global $rg_conns; $ret = 0; while (!empty($data)) { if (isset($rg_conns[$key]['xdispatch_custom'])) { $r = $rg_conns[$key]['xdispatch_custom']($key, $data); $data = substr($data, $r); $ret += $r; continue; } //rg_log('DEBUG: searching for \n in ' . substr($data, 0, 100) . '...'); $pos = strpos($data, "\n"); if ($pos === FALSE) return $ret; $one = substr($data, 0, $pos); xdispatch_one($key, $one); $data = substr($data, $pos + 1); $ret += $pos + 1; } return $ret; } function rg_process_job($db, &$job) { global $rg_conns; $jid = $job['id']; // Job is already in progress? if ($job['worker_id'] > 0) { //rg_log($jid . ': job is already assigned to' // . ' worker [' . $job['worker_name'] . ']' // . ' (id ' . $job['worker_id'] . '); skip it'); return; } // Should we delay because of a previous fail? if (isset($job['next_try']) && ($job['next_try'] < time())) { rg_log($jid . ': job is suspended till ' . date('Ym-m-d H:i:s', $job['next_try'])); return; } rg_log_ml($jid . ': processing job...'); if (!isset($job['request'])) $req = $job; else $req = $job['request']; // Get the worker list, so we can sort it $workers_list = rg_worker_list_all($db, $req['uid']); if ($workers_list === FALSE) { rg_log($jid . ': cannot load workers list: ' . rg_worker_error()); $job['next_try'] = time() + 60; return; } //rg_log_ml('DEBUG: workers list: ' . print_r($workers_list, TRUE)); // Trying to find a worker in the list of connections foreach ($rg_conns as $key => &$i) { if (strcmp($key, 'master') == 0) continue; if (!isset($i['ann'])) { //rg_log($key . ': ' . $jid . ': conn has no announce.'); // TODO: close after some time? continue; } if (empty($i['ann']['env'])) { //rg_log($key . ': ' . $jid . ': conn has no environments.'); continue; } if (($i['worker_uid'] > 0) && ($i['worker_uid'] != $req['uid'])) { rg_log($key . ': ' . $jid . ': uids do not match, try next'); continue; } $k = $i['worker_id']; $name = $i['worker_name']; if (!isset($workers_list[$k])) { rg_internal_error('Worker ' . $name . ' not found' . ' in workers_list! Strange!'); continue; } $wi = $workers_list[$k]; if (isset($job['avoid'][$k])) { rg_log($key . ': ' . $jid . ': we must avoid worker ' . $name); continue; } // If number of active jobs is == max workers, skip it if ($wi['workers']) { $aj = worker_active_jobs($i['worker_id']); if ($aj >= $wi['workers']) { rg_log($key . ': ' . $jid . ': DEBUG: skip worker ' . $name . ' because' . ' sent jobs(' . $aj . ')' . ' >= workers(' . $wi['workers'] . ')'); continue; } } foreach ($i['ann']['env'] as $env => $junk) { if (strcasecmp($req['env'], $env) != 0) { rg_log($key . ': ' . $jid . ': DEBUG: worker ' . $name . ': job env [' . $req['env'] . ']' . ' != worker [' . $env . ']'); continue; } // Send only what is really needed $job2 = array(); $job2['op'] = 'BLD'; $job2['id'] = $jid; $job2['cmds'] = $req['cmds']; $job2['secrets'] = $req['secrets']; $job2['packages'] = $req['packages']; $job2['hook_id'] = $req['hook_id']; $job2['url'] = $req['url']; $job2['head'] = $req['head']; $job2['env'] = $req['env']; rg_conn_enq($key, json_encode($job2) . "\n"); $job['worker_id'] = $i['worker_id']; $job['worker_name'] = $name; $job['worker_started'] = 0; $job['worker_sent'] = time(); //rg_log_ml('DEBUG: After sending BLD: job: ' . print_r($job, TRUE)); // TODO: after some time, if worker_started is still 0, // set the 'worker_id' to 0 to be able to go in other place // TODO: maybe the client must resync with server to // abort jobs already done on another host, to not // duplicate work rg_log($key . ': ' . $jid . ': job sent to worker [' . $name . ']'); return; } } unset($i); rg_log('No workers found!'); } rg_prof_start('MAIN'); rg_log_set_file($rg_log_dir . '/builder.log'); rg_log_set_sid('000000'); // to spread the logs rg_lock_or_exit('builder.lock'); rg_log('Start (ver=' . $rocketgit_version . ')...'); if (!is_dir($rg_state_dir . '/builder')) { $r = @mkdir($rg_state_dir . '/builder', 0770); if ($r !== TRUE) { rg_log('Cannot create state dir: ' . rg_php_err()); exit(1); } } $jobs = array(); $state_dir = $rg_state_dir . '/builder'; $saved_jobs = rg_load_files($state_dir, 'job-[0-9]*.ser', 'id'); rg_log_ml('Jobs loaded from dir: ' . print_r($saved_jobs, TRUE)); rg_sql_app('rg-builder'); $db = rg_sql_open($rg_sql); if ($db === FALSE) { rg_internal_error('Cannot connect to database!'); exit(1); } if (rg_struct_ok($db) === FALSE) exit(0); // Prepare socket $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket === FALSE) { rg_internal_error('Cannot create socket!'); exit(1); } socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1); $r = @socket_bind($socket, $rg_builder_bind, $rg_builder_port); if ($r === FALSE) { rg_internal_error('Cannot bind socket!'); exit(1); } $r = @socket_listen($socket, 128); if ($r === FALSE) { rg_internal_error('Cannot set queue length on socket!'); exit(1); } rg_conn_new('master', $socket); $rg_conns['master']['exit_on_close'] = 1; $rg_conns['master']['func_new'] = 'xnew'; $rg_conns['master']['func_new_arg'] = $db; // What features the builder supports $features = array('allow_stats' => 1); $workers = 0; $original_mtime = @filemtime(__FILE__); do { // We do not want stale entries! rg_cache_core_destroy(); // Check our mtime so we can upgrade the software and this script // will restart. clearstatcache(); $mtime = @filemtime(__FILE__); if ($mtime != $original_mtime) { rg_log('mtime=' . $mtime . ', original_mtime=' . $original_mtime); rg_log('File changed. Exiting...'); break; } if ($workers > 0) { $r = rg_builder_load_jobs($db, 'done = 0', 'itime', ''); if ($r['ok'] != 1) { rg_log('Cannot load jobs from database!'); continue; } $_r = TRUE; foreach ($r['list'] as $jid => $job) { unset($job['status']); // not neeeded for builder if (isset($saved_jobs[$jid])) { $jobs[$jid] = $saved_jobs[$jid]; unset($saved_jobs[$jid]); } if (!isset($jobs[$jid])) { rg_log('New job from db: ' . $jid); $job['worker_id'] = 0; $job['avoid'] = array(); // list of workers to avoid $jobs[$jid] = $job; } $_r = rg_process_job($db, $jobs[$jid]); if ($_r === FALSE) break; } if ($_r === FALSE) break; // Clean old jobs foreach ($saved_jobs as $jid => $info) @unlink($state_dir . '/job-' . $jid . '.ser'); $saved_jobs = array(); } rg_conn_wait(1); } while (1); @socket_close($socket); rg_log('Exiting...'); rg_prof_end('MAIN'); rg_prof_log();