<?php
// Client for continuous integration and deployment
// It can run on the same machine as the web server.

error_reporting(E_ALL);
ini_set('track_errors', 'On');
set_time_limit(0);

// States a job goes through (stored in $job['state'])
define('RG_JOB_INIT', 1);
define('RG_JOB_HELPER_STARTED', 2);
define('RG_JOB_STARTED', 3);
define('RG_JOB_ERROR', 4);
define('RG_JOB_DONE', 10);
define('RG_JOB_ARTIFACTS', 20);
define('RG_JOB_FINISH', 30);
// NOTE(review): same numeric value as RG_JOB_FINISH - confirm it is intended
define('RG_JOB_COLLECT_INFO', 30);

$_s = microtime(TRUE);

$INC = dirname(__FILE__) . "/../inc";
require_once($INC . "/init.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/prof.inc.php");
require_once($INC . "/builder.inc.php");
require_once($INC . "/conn.inc.php");

rg_prof_start('MAIN');

// argv[1]: worker id (used in the log file name); argv[2]: conf file
if (!isset($_SERVER['argv'][1])) {
	$id = 'main';
} else {
	$id = $_SERVER['argv'][1];
}

if (!isset($_SERVER['argv'][2])) {
	$conf_file = '/etc/rocketgit/worker.conf';
} else {
	$conf_file = $_SERVER['argv'][2];
}

rg_log_set_file($rg_log_dir . '/worker-' . $id . '.log');
rg_log_set_sid("000000"); // to spread the logs

/*
 * Returns a short text for a job state constant ("unknown" if not mapped).
 * NOTE(review): RG_JOB_FINISH/RG_JOB_COLLECT_INFO (30) have no label here.
 */
function job_state($state)
{
	switch ($state) {
	case RG_JOB_INIT: return 'init';
	case RG_JOB_HELPER_STARTED: return 'helper_started';
	case RG_JOB_STARTED: return 'started';
	case RG_JOB_ERROR: return 'error';
	case RG_JOB_DONE: return 'done';
	case RG_JOB_ARTIFACTS: return 'artifacts';
	default: return "unknown";
	}
}

/*
 * Load configuration file
 * Parses 'key = value' lines from @file into the global $conf.
 * - 'env = name' opens a section; the following indented lines are stored
 *   into $conf['env'][name].
 * - 'include = path' loads another file (relative paths are resolved
 *   against the directory of @file).
 * - empty lines and lines starting with '#' are ignored.
 * Exits with code 1 if the file cannot be read or is accessible by others.
 */
function load_config_file($file)
{
	global $conf;

	$s = @stat($file);
	if ($s === FALSE) {
		rg_log('Cannot stat conf file ' . $file . ': ' . rg_php_err());
		exit(1);
	}

	// The file contains a key, so 'others' must have no rights on it
	if (($s['mode'] & 07) != 0) {
		rg_log('Error! Others can access the conf file [' . $file . '] and read the key'
			. ' (mode=' . base_convert($s['mode'], 10, 8) . ')!');
		exit(1);
	}

	rg_log('Loading configuration from ' . $file);
	$_conf = @file($file);
	if ($_conf === FALSE) {
		rg_log('Error! Cannot open the conf file ' . $file . ': ' . rg_php_err());
		exit(1);
	}

	// Reference to the currently open 'env' section, or FALSE if none
	$last_parent = FALSE;
	foreach ($_conf as $line) {
		$tline = trim($line);
		if (empty($tline))
			continue;

		if (strncmp($tline, '#', 1) == 0)
			continue;

		$t = explode('=', $line, 2);
		if (count($t) != 2) {
			rg_log('Invalid line [' . $line . ']!');
			continue;
		}
		$var = trim($t[0]);
		$value = trim($t[1]);

		if (strcmp($var, 'env') == 0) {
			$conf['env'][$value] = array('paras' => '');
			// '=&' only rebinds the name; nothing is written
			// through a previous reference
			$last_parent = &$conf['env'][$value];
		} else if ((strncmp($line, " ", 1) == 0)
			|| (strncmp($line, "\t", 1) == 0)) {
			// Indented line: belongs to the open section
			if ($last_parent === FALSE) {
				rg_log('Invalid line [' . $line . ']!');
				continue;
			}

			$last_parent[$var] = $value;
		} else if (strcmp($var, 'include') == 0) {
			if (strncmp($value, '/', 1) != 0)
				$value = dirname($file) . '/' . $value;
			load_config_file($value);
		} else {
			$conf[$var] = $value;
			// We must break the reference first, else the FALSE
			// would be stored over the last 'env' section.
			unset($last_parent);
			$last_parent = FALSE;
		}
	}
}

/*
 * Load configuration file
 * Loads @file into the global $conf, splits the 'master' line into
 * master_proto/master_host/master_port/master_url, prepares the state dir
 * and the SSH key and validates the environments (the invalid ones are
 * disabled). On fatal errors it logs, sleeps and exits.
 */
function load_config($file)
{
	global $conf;

	$conf = array('env' => array());

	load_config_file($file);

	if (!isset($conf['master'])) {
		rg_log('master line not present in the conf file!');
		sleep(60);
		exit(1);
	}

	if (!strstr($conf['master'], '://')) {
		$conf['master_proto'] = 'tcp';
		$conf['master_host'] = $conf['master'];
		$conf['master_port'] = isset($conf['port']) ? $conf['port'] : 65000;
		$conf['master_url'] = '';
	} else {
		$_t = explode('://', $conf['master'], 2);
		$conf['master_proto'] = trim($_t[0]);
		// _t[1]: host[:port][/url]; limit 2 keeps the whole url part
		$_t = explode('/', $_t[1], 2);
		$_x = explode(':', $_t[0]); // _t[0]: host[:port]
		$conf['master_host'] = $_x[0];
		$conf['master_port'] = isset($_x[1]) ? $_x[1] : 443;
		$conf['master_url'] = isset($_t[1]) ? $_t[1] : '';
	}
	unset($conf['master']);
	unset($conf['port']);

	if (!isset($conf['state'])) {
		rg_log('state line not present in the conf file!');
		sleep(60);
		exit(1);
	}

	// Create state dir
	if (!is_dir($conf['state'])) {
		$r = @mkdir($conf['state'], 0771);
		if ($r !== TRUE) {
			rg_log('Cannot create state dir: ' . rg_php_err());
			sleep(60);
			exit(1);
		}
	}

	// allow libvirt user to access some stuff
	$r = @chmod($conf['state'], 0771);
	if ($r !== TRUE) {
		rg_log('Cannot chmod state dir [' . $conf['state'] . ']: '
			. rg_php_err());
		sleep(60);
		exit(1);
	}

	if (!file_exists($conf['state'] . '/key.pub')) {
		rg_log('Creating SSH key...');
		$cmd = 'ssh-keygen -t rsa -b 4096 -N \'\''
			. ' -C \'Key to connect to builder\''
			. ' -f ' . escapeshellarg($conf['state'] . '/key');
		$r = rg_exec($cmd, '', FALSE, FALSE, FALSE);
		if ($r['ok'] != 1) {
			rg_log('Cannot create key: ' . $r['errmsg'] . '!');
			sleep(60);
			exit(1);
		}
	}

	$conf['ssh_key'] = @file_get_contents($conf['state'] . '/key.pub');
	if ($conf['ssh_key'] === FALSE) {
		rg_log('Cannot load key!');
		sleep(1);
		exit(0);
	}
	$conf['ssh_key'] = trim($conf['ssh_key']);

	if (!isset($conf['templates'])) {
		$conf['templates'] = '/var/lib/libvirt/images/rgw/templates';
		rg_log('Warn: \'templates\' configuration line is missing'
			. '; I will assume to be ' . $conf['templates']);
	}

	if (!isset($conf['images'])) {
		$conf['images'] = '/var/lib/libvirt/images/rgw/images';
		rg_log('Warn: \'images\' configuration line is missing'
			. '; I will assume the individual section specify full path');
	}

	// Validate the path to the templates and images
	foreach ($conf['env'] as $name => &$i) {
		if (!isset($i['image'])) {
			// TODO: send this error to rocketgit to be sent by e-mail?
			rg_log('Warning! Environment ' . $name
				. ' is missing \'image\' declaration'
				. '; I will disable it.');
			unset($conf['env'][$name]);
			continue;
		}

		if (strncmp($i['image'], '/', 1) != 0)
			$i['image'] = $conf['images'] . '/' . $i['image'];

		if (!file_exists($i['image'])) {
			rg_log('Warning! Environment ' . $name
				. ' is missing ' . $i['image']
				. ' file; I will disable it.');
			unset($conf['env'][$name]);
			continue;
		}

		if (!isset($i['arch'])) {
			rg_log('Warning! Environment ' . $name
				. ' is missing \'arch\' declaration'
				. '; I will assume to be x86_64.');
			$i['arch'] = 'x86_64';
		}

		$f = $conf['templates'] . '/' . $i['arch'] . '.xml';
		if (!file_exists($f)) {
			rg_log('Warning! Environment ' . $name
				. ' is missing ' . $i['arch'] . ' file ('
				. $f . '); I will disable it.');
			unset($conf['env'][$name]);
			continue;
		}

		if (!isset($i['pkg_cmd'])) {
			// Environment name: distro[-version[-rest]]
			$x = explode('-', $name, 3);
			$distro = $x[0];
			$ver = isset($x[1]) ? $x[1] : '';
			if ((strcasecmp($distro, 'debian') == 0)
				|| (strcasecmp($distro, 'ubuntu') == 0)) {
				$i['pkg_cmd'] = 'apt --assume-yes --ignore-hold'
					. ' --allow-downgrades'
					. ' --allow-remove-essential'
					. ' --allow-change-held-packages'
					. ' -o Dpkg::Options::=--force-confnew'
					. ' install';
			} else if (strcasecmp($distro, 'fedora') == 0) {
				$i['pkg_cmd'] = 'dnf -y install';
			} else if (strcasecmp($distro, 'centos') == 0) {
				if (intval($ver) > 7)
					$i['pkg_cmd'] = 'dnf -y install';
				else
					$i['pkg_cmd'] = 'yum -y install';
			} else {
				rg_log('Error! I do not know how to install packages on ' . $name);
				unset($conf['env'][$name]);
				continue;
			}
		}
	}
	unset($i);

	if (empty($conf['env'])) {
		rg_log('Fatal! No environments found!');
		sleep(60);
		exit(1);
	}

	if (!isset($conf['net'])) {
		rg_log('\'net\' was not specified, so I will disable network'
			. ' access for the build user');
		$conf['net'] = 0;
	}

	if (!isset($conf['libvirtd_user']))
		$conf['libvirtd_user'] = 'qemu';

	if (!isset($conf['libvirtd_group']))
		$conf['libvirtd_group'] = 'qemu';

	rg_log_ml('conf: ' . print_r($conf, TRUE));
}

/*
 * Save a job
 * Stores @job in <state>/job-<id>.ser, but only if it is marked dirty.
 * Returns array('ok' => 1) on success (or nothing to do), else
 * array('ok' => 0, 'errstr' => ...).
 */
function save_job(&$job)
{
	global $conf;

	$ret = array('ok' => 0);

	if (!isset($job['dirty']) || $job['dirty'] == 0) {
		$ret['ok'] = 1;
		return $ret;
	}

	rg_log($job['id'] . ': DEBUG: saving job...');

	$f = $conf['state'] . '/job-' . $job['id'] . '.ser';
	$r = rg_save($f, $job);
	if ($r === FALSE) {
		$ret['errstr'] = 'cannot store job: ' . rg_util_error();
		return $ret;
	}

	$job['dirty'] = 0;
	$ret['ok'] = 1;

	return $ret;
}

/*
 * Set correct libvirt rights on a file
 * Applies chown/chgrp/chmod on @f and then runs restorecon (a restorecon
 * failure is only logged). Returns TRUE on success; on error returns FALSE
 * and fills @reason/@reason2.
 */
function rg_worker_libvirt_rights($f, $mode, $user, $group, &$reason, &$reason2)
{
	// We need to allow libvirt access to the image
	$r = @chown($f, $user);
	if ($r !== TRUE) {
		$reason = 'cannot chown image to qemu user';
		$reason2 = rg_php_err();
		return FALSE;
	}

	$r = @chgrp($f, $group);
	if ($r !== TRUE) {
		$reason = 'cannot chgrp image to qemu user';
		$reason2 = rg_php_err();
		return FALSE;
	}

	$r = @chmod($f, $mode);
	if ($r !== TRUE) {
		$reason = 'cannot chmod image';
		$reason2 = rg_php_err();
		return FALSE;
	}

	$r = rg_exec('/usr/sbin/restorecon ' . escapeshellarg($f),
		'', FALSE, FALSE, FALSE);
	if ($r['ok'] !== 1) {
		rg_log('restorecon failed, trying to continue ('
			. $r['errmsg'] . ': ' . $r['stderr'] . ')');
	}

	return TRUE;
}

/*
 * Starts an worker
 * Prepares the job dir ($job['main']): creates the two disk images, clones
 * the repository on the second one, generates build.sh and rg.sh there and
 * creates the VM from the per-arch template.
 * The outcome is signaled through files in the job dir: error.log and
 * error2.log on failure, helper-ok.log on success.
 */
function start_worker($job)
{
	global $rg_log_dir;
	global $conf;

	$jid = $job['id'];

	rg_log_set_file($rg_log_dir . '/worker-' . $conf['id'] . '-' . $jid . '.log');

	rg_log('DEBUG: start_worker: job: ' . rg_array2string($job));

	$env = $conf['env'][$job['env']];
	rg_log_ml('DEBUG: start_worker: env: ' . print_r($env, TRUE));

	$emain = escapeshellarg($job['main']);
	$name = 'rg-worker-' . $conf['id'] . '-' . $jid;
	$ename = escapeshellarg($name);
	$master = escapeshellarg($env['image']);
	$img = $job['main'] . '/image.qcow2';

	// TODO: let admin control the image type
	$eimg = escapeshellarg($img);
	$img2 = $job['main'] . '/image2.raw';
	$eimg2 = escapeshellarg($img2);

	$do_umount = FALSE;
	$err = TRUE;
	$reason = '';
	$reason2 = '';
	while (1) {
		// Clean up whatever a previous run may have left behind
		rg_exec('umount ' . $emain . '/root', '', FALSE, FALSE, FALSE);

		$r = rg_del_tree($job['main']);
		if ($r === FALSE) {
			$reason = 'cannot delete main dir';
			$reason2 = rg_util_error();
			break;
		}

		$r = @mkdir($job['main'], 0770);
		if ($r === FALSE) {
			$reason = 'cannot create main dir';
			$reason2 = 'cannot create main dir (' . $job['main'] . '):'
				. ' (' . rg_php_err() . ')';
			break;
		}

		// We need to allow libvirt access to the image stored inside
		$r = rg_worker_libvirt_rights($job['main'], 0770,
			$conf['libvirtd_user'], $conf['libvirtd_group'],
			$reason, $reason2);
		if ($r !== TRUE)
			break;

		rg_exec('virsh destroy ' . $ename, '', FALSE, FALSE, FALSE);
		rg_exec('virsh undefine --nvram ' . $ename, '', FALSE, FALSE, FALSE);

		$r = rg_exec('qemu-img create -o lazy_refcounts=on,cluster_size=256K'
			. ' -b ' . $master . ' -f qcow2 ' . $eimg,
			'', FALSE, FALSE, FALSE);
		if ($r['ok'] !== 1) {
			$reason = 'cannot create VM image';
			$reason2 = $r['errmsg'] . ': ' . $r['stderr'];
			break;
		}

		$r = rg_worker_libvirt_rights($img, 0660,
			$conf['libvirtd_user'], $conf['libvirtd_group'],
			$reason, $reason2);
		if ($r !== TRUE)
			break;

		$r = rg_exec('qemu-img create -f raw ' . $eimg2 . ' '
			. escapeshellarg($job['disk_size_gib'] . 'G'),
			'', FALSE, FALSE, FALSE);
		if ($r['ok'] !== 1) {
			$reason = 'cannot create VM image2';
			$reason2 = $r['errmsg'] . ': ' . $r['stderr'];
			break;
		}

		$r = rg_worker_libvirt_rights($img2, 0660,
			$conf['libvirtd_user'], $conf['libvirtd_group'],
			$reason, $reason2);
		if ($r !== TRUE)
			break;

		// Seems that mkfs is not in PATH when we are running from cron.
		$path = getenv('PATH');
		putenv('PATH=' . $path . ':/usr/sbin');

		// TODO: let user choose fs type?
		$r = rg_exec('mkfs.ext4 -L RG ' . $eimg2, '', FALSE, FALSE, FALSE);
		if ($r['ok'] !== 1) {
			$reason = 'cannot create fs';
			$reason2 = $r['errmsg'] . ': ' . $r['stderr'];
			break;
		}

		$r = @mkdir($job['main'] . '/root', 0700);
		if ($r === FALSE) {
			$reason = 'cannot create root dir';
			$reason2 = 'cannot create root dir (' . rg_php_err() . ')';
			break;
		}

		$r = rg_exec('mount ' . $eimg2 . ' ' . $emain . '/root',
			'', FALSE, FALSE, FALSE);
		if ($r['ok'] !== 1) {
			$reason = 'cannot mount fs';
			$reason2 = $r['errmsg'] . ': ' . $r['stderr'];
			break;
		}
		$do_umount = TRUE;

		// 'junk' is used later to detect if the image is mounted
		$r = @file_put_contents($job['main'] . '/root/junk', '');
		if ($r === FALSE) {
			$reason = 'internal error';
			$reason2 = 'cannot create junk file: ' . rg_php_err();
			break;
		}

		// Clone repo
		$_env = 'GIT_SSH_COMMAND=ssh'
			. ' -o PasswordAuthentication=no'
			. ' -o ControlMaster=no'
			. ' -o IdentitiesOnly=yes'
			. ' -o IdentityFile=' . escapeshellarg($conf['state'] . '/key');
		putenv($_env);
		$_s = time();
		$cmd = 'git clone'
			. ' --recurse-submodules'
			//TODO . ' --shallow-submodules'
			. ' --no-checkout'
			. ' ' . escapeshellarg($job['url'])
			. ' ' . $emain . '/root/git';
		$r = rg_exec($cmd, '', FALSE, FALSE, FALSE);
		@file_put_contents($job['main'] . '/root/T_clone', time() - $_s);
		if ($r['ok'] !== 1) {
			$reason = 'git clone error; contact admin';
			$reason2 = $r['errmsg'] . ': ' . $r['stderr'];
			break;
		}

		// Creating build.sh file
		// TODO: document how a user can add labels in configure or make
		$s = '#!/bin/bash' . "\n";
		$s .= 'date +%s > /mnt/status/build.sh.start' . "\n\n";
		$s .= 'export RG_LABELS=/mnt/status/RG_LABELS' . "\n\n";
		$s .= "\n";

		// build.sh: secrets
		$s .= '# Secrets' . "\n";
		$secrets = isset($job['secrets']) ? $job['secrets'] : array();
		foreach ($secrets as $info) {
			if (empty($info['name']))
				continue;

			$s .= 'export ' . escapeshellarg($info['name'])
				. '=' . escapeshellarg($info['value']) . "\n";
		}
		$s .= "\n";

		// build.sh: checkout
		$s .= 'cd /mnt/git' . "\n\n";
		$s .= 'git branch -f rgw ' . escapeshellarg($job['head'])
			. " &>/mnt/status/git.branch.log\n\n";
		$s .= 'git checkout rgw' . " &>/mnt/status/git.checkout.log\n\n";

		// build.sh: one section per build command
		foreach ($job['cmds'] as $_name => $i) {
			if (empty($i['cmd']))
				continue;

			$prefix = '/mnt/status/' . escapeshellarg($_name);

			if (empty($i['label_ok']))
				$lok = 'echo -n' . "\n";
			else
				$lok = ' echo ' . escapeshellarg($i['label_ok'])
					. ' >>/mnt/status/RG_LABELS' . "\n";

			if (empty($i['label_nok']))
				$lnok = 'echo -n' . "\n";
			else
				$lnok = ' echo ' . escapeshellarg($i['label_nok'])
					. ' >>/mnt/status/RG_LABELS' . "\n";

			// The command is quoted for the 'echo' line, else the
			// shell would expand (and execute) parts of it twice.
			$s .= 'date +%s > ' . $prefix . '.start' . "\n"
				. 'echo ' . escapeshellarg('Executing [' . $i['cmd'] . ']...') . "\n"
				. '(' . $i['cmd'] . ') &>' . $prefix . '.log' . "\n"
				. 'E=${?}' . "\n"
				. 'echo ${E} > ' . $prefix . ".status\n"
				. 'date +%s > ' . $prefix . '.done' . "\n"
				. 'if [ "${E}" != "0" ]; then' . "\n"
				. $lnok
				. (!empty($i['abort']) ? ' exit 0' . "\n" : '')
				. 'else' . "\n"
				. $lok
				. 'fi' . "\n\n";
		}
		//rg_log_ml('DEBUG: build.sh: ' . $s);

		$r = @file_put_contents($job['main'] . '/root/build.sh', $s);
		if ($r === FALSE) {
			$reason = 'cannot store build commands';
			$reason2 = 'cannot store build commands (' . rg_php_err() . ')';
			break;
		}

		$r = @chmod($job['main'] . '/root/build.sh', 0755);
		if ($r === FALSE) {
			$reason = 'cannot chmod build.sh';
			$reason2 = 'cannot chmod build.sh (' . rg_php_err() . ')';
			break;
		}

		// Prepare packages - for now, we must list every package
		// on a single line to avoid error if one is not available
		$p_i_cmd = '';
		if (!empty($job['packages'])) {
			rg_log('DEBUG: packages: ' . $job['packages'] . '.');
			$pkgs = explode(' ', $job['packages']);
			$p_i_cmd .= '> /mnt/packages.log' . "\n";

			// Debian/Ubuntu stuff
			$p_i_cmd .= 'export DEBIAN_FRONTEND=noninteractive' . "\n";
			$p_i_cmd .= 'export APT_LISTCHANGES_FRONTEND=none' . "\n";

			foreach ($pkgs as $p) {
				$p_i_cmd .= $env['pkg_cmd'] . ' ' . escapeshellarg($p)
					. ' >> /mnt/packages.log 2>&1' . "\n";
			}
		}

		// Store commands in rg.sh (executed at boot)
		// TODO: we should not wait for net forever
		$r = @file_put_contents($job['main'] . '/root/rg.sh',
			'#!/bin/bash' . "\n"
			. 'mkdir /mnt/tmp && chmod 1777 /mnt/tmp' . "\n"
			. 'mount --bind /mnt/tmp /tmp' . "\n"
			. 'mkdir -p /mnt/var/tmp && chmod 1777 /mnt/var/tmp' . "\n"
			. 'mount --bind /mnt/var/tmp /var/tmp' . "\n"
			. "\n"
			. 'mkdir /mnt/status' . "\n"
			. 'chown -R build:build /mnt/git /mnt/status' . "\n"
			. 'echo "PATH=${PATH}"' . "\n"
			. 'ERR=""' . "\n"
			. 'id' . "\n"
			. 'date +%s > /mnt/T_START' . "\n"
			. '# Waiting for net...' . "\n"
			. 'while [ "`ip ro li | grep ^default`" = "" ]; do' . "\n"
			. ' sleep 1' . "\n"
			. 'done' . "\n"
			. 'date +%s > /mnt/T_NET_OK' . "\n\n"
			. "\n"
			. $p_i_cmd
			. 'date +%s > /mnt/T_PKGS_OK' . "\n\n"
			. "\n"
			. '# Disabling further module loading.' . "\n"
			. 'echo 1 > /proc/sys/kernel/modules_disabled' . "\n"
			. "\n"
			. '# Restricting dmesg' . "\n"
			. 'sysctl kernel.dmesg_restrict=1' . "\n"
			. "\n"
			. '# Disabling root login' . "\n"
			. 'chage -E 0 root' . "\n"
			. 'if [ "${?}" != "0" ]; then' . "\n"
			. ' ERR="cannot disable root account"' . "\n"
			. 'fi' . "\n"
			. "\n"
			. '# Disable network access if needed' . "\n"
			. 'if [ "' . $conf['net'] . '" = "1" ]; then' . "\n"
			. ' echo -n # we allow network access' . "\n"
			. 'else' . "\n"
			. ' iptables -I OUTPUT -m owner --uid-owner build -j REJECT' . "\n"
			. ' if [ "${?}" != "0" ]; then' . "\n"
			. ' ERR="Cannot disable network access"' . "\n"
			. ' fi' . "\n"
			. 'fi' . "\n"
			. "\n"
			. 'if [ "${ERR}" = "" ]; then' . "\n"
			. ' su - build -c "bash /mnt/build.sh" &>/mnt/status/build.log' . "\n"
			. ' sync' . "\n"
			. 'else' . "\n"
			. ' echo "${ERR}" > /mnt/status/err' . "\n"
			. 'fi' . "\n"
			. 'date +%s > /mnt/T_DONE' . "\n\n"
			. 'shutdown -h now'
			);
		if ($r === FALSE) {
			$reason = 'cannot store commands';
			$reason2 = 'cannot store commands (' . rg_php_err() . ')';
			break;
		}

		$r = @chmod($job['main'] . '/root/rg.sh', 0700);
		if ($r === FALSE) {
			$reason = 'cannot to chmod on rg.sh';
			$reason2 = 'cannot to chmod on rg.sh (' . rg_php_err() . ')';
			break;
		}

		$r = rg_exec('umount ' . $emain . '/root', '', FALSE, FALSE, FALSE);
		if ($r['ok'] !== 1) {
			$reason = 'cannot umount fs';
			$reason2 = $r['errmsg'] . ': ' . $r['stderr'];
			break;
		}
		$do_umount = FALSE;

		// Build the VM definition from the per-arch template
		$_f = $conf['templates'] . '/' . $env['arch'] . '.xml';
		$template = @file_get_contents($_f);
		if ($template === FALSE) {
			$reason = 'cannot load template';
			$reason2 = 'cannot load template from ' . $_f . ': ' . rg_php_err();
			break;
		}
		$template = str_replace('@@name@@', $name, $template);
		$template = str_replace('@@mem@@', $job['mem_mib'], $template);
		$template = str_replace('@@cpus@@', $job['cpus'], $template);
		$template = str_replace('@@disk0@@', $img, $template);
		$template = str_replace('@@disk0_type@@', 'qcow2', $template);
		$template = str_replace('@@disk1@@', $img2, $template);
		$template = str_replace('@@disk1_type@@', 'raw', $template);
		// TODO: allow firewall specification
		$template = str_replace('@@net0@@',
			'<interface type=\'network\'><source network=\'default\'/><model type=\'virtio\'/></interface>',
			$template);
		// TODO: take care of XML injection?
		$template = str_replace('@@chan1_path@@',
			$job['main'] . '/x.chan', $template);

		$_xml = $job['main'] . '/machine.xml';
		$r = @file_put_contents($_xml, $template);
		if ($r === FALSE) {
			$reason = 'cannot store template file';
			$reason2 = 'cannot store template in ' . $_xml . ': ' . rg_php_err();
			break;
		}

		$r = rg_exec('virsh create ' . escapeshellarg($_xml),
			'', FALSE, FALSE, FALSE);
		if ($r['ok'] !== 1) {
			$reason = 'cannot define VM';
			$reason2 = $r['errmsg'] . ': ' . $r['stderr'];
			break;
		}

		@file_put_contents($job['main'] . '/vm.start', time());

		$err = FALSE;
		break;
	}

	if ($do_umount)
		rg_exec('umount ' . $emain . '/root', '', FALSE, FALSE, FALSE);

	// Any error above must retrigger the build on other worker
	if ($err) {
		rg_log('error: ' . $reason);
		rg_log('error2: ' . $reason2);
		@file_put_contents($job['main'] . '/error.log', $reason);
		@file_put_contents($job['main'] . '/error2.log', $reason2);
	} else {
		@file_put_contents($job['main'] . '/helper-ok.log', '');
	}

	rg_log('Done');
}

/*
 * Handle received commands (one JSON)
 * @key is the connection the data arrived on; @data is one JSON document.
 * Handled ops: FEATURES, BLD (new build job), DRE (done received) and the
 * artifact_upload_* confirmations. The state is kept in the global $jobs.
 */
function xhandle_one($key, $data)
{
	global $rg_log_dir;
	global $jobs;
	global $conf;
	global $pid_to_jid;
	global $features;

	$u = @json_decode($data, TRUE);
	if ($u === NULL) {
		rg_log($key . ': JSON: ' . $data);
		rg_log_ml($key . ': cannot decode JSON: ' . json_last_error_msg());
		$err = array('errstr' => 'cannot decode json');
		rg_conn_enq('master', json_encode($err) . "\n");
		rg_conn_destroy($key);
		return;
	}

	$jid = isset($u['id']) ? $u['id'] : 0;

	if (isset($u['op'])) {
		$op = $u['op'];
		unset($u['op']);
	} else {
		$op = 'unknown';
	}

	if (strcmp($op, 'FEATURES') == 0) { // what master suports
		rg_log_ml($key . ': FEATURES command: ' . print_r($u, TRUE));
		$features = $u['features'];
		return;
	}

	// From here, we require a jid
	if ($jid == 0) {
		$err = array('errstr' => 'job id not specified');
		rg_conn_enq('master', json_encode($err) . "\n");
		rg_conn_destroy($key);
		return;
	}

	if (strcmp($op, 'BLD') == 0) {
		rg_log($key . ': ' . $jid . ': build job: ' . rg_array2string($u));

		if (isset($jobs[$jid])) {
			// TODO: this should not happen, right?
			rg_log($key . ': ' . $jid . ': job already in queue!');
			return;
		}

		$jobs[$jid] = $u;
		$jobs[$jid]['received'] = time();
		$jobs[$jid]['done_last_sent'] = 0;
		$jobs[$jid]['main'] = $conf['state'] . '/rocketgit-j-' . $jid;

		// TODO: add bellow configuration to the web form
		if (!isset($jobs[$jid]['disk_size_gib']))
			$jobs[$jid]['disk_size_gib'] = '20';
		if (!isset($jobs[$jid]['mem_mib']))
			$jobs[$jid]['mem_mib'] = '500';
		if (!isset($jobs[$jid]['cpus']))
			$jobs[$jid]['cpus'] = '1';

		$jobs[$jid]['state'] = RG_JOB_INIT;
		$jobs[$jid]['dirty'] = 1;
		return;
	}

	if (strcmp($op, 'DRE') == 0) { // DRE = done received
		if (!isset($jobs[$jid])) {
			// We already received a DRE message
			return;
		}

		rg_log_ml($key . ': ' . $jid . ': DEBUG: DRE u: ' . print_r($u, TRUE));

		// So, we can clean up everything related to this job
		// TODO: do we clear the state file?
		// TODO: clear artifacts stuff?
		$_job = $jobs[$jid];
		if (isset($_job['pid']))
			unset($pid_to_jid[$_job['pid']]);
		unset($jobs[$jid]);
		@unlink($conf['state'] . '/job-' . $jid . '.ser');
		return;
	}

	// From now on we requre jobs[$jid] to be present
	if (!isset($jobs[$jid])) {
		rg_log($key . ': ' . $jid . ': got command for an unknown job!');
		return;
	}
	$job = &$jobs[$jid];

	$aid = isset($u['aid']) ? $u['aid'] : 0;

	if (strcmp($op, 'artifact_upload_info') == 0) {
		rg_log_ml($key . ': ' . $jid . ': UPLOAD INFO u: ' . print_r($u, TRUE));
		$job['info_confirmed'] = 1;
	} else if (strcmp($op, 'artifact_upload_chunk') == 0) {
		rg_log_ml($key . ': ' . $jid . ': UPLOAD CHUNK u: ' . print_r($u, TRUE));
		$job['artifacts_state'][$aid]['last_confirmed_pos'] = $u['next_pos'];
		$job['dirty'] = 1;
	} else if (strcmp($op, 'artifact_upload_done') == 0) {
		rg_log_ml($key . ': ' . $jid . ': UPLOAD DONE u: ' . print_r($u, TRUE));
		$job['artifacts_state'][$aid]['done'] = time();
		$job['dirty'] = 1;
	} else if (strcmp($op, 'artifact_upload_status') == 0) {
		rg_log_ml($key . ': ' . $jid . ': ' . $aid
			. ': UPLOAD STATUS u: ' . print_r($u, TRUE));
		if ($u['status'] == 1) {
			rg_log_ml($key . ': ' . $jid . ': ' . $aid
				. ': server has the aid');
			$job['artifacts_state'][$aid]['done'] = time();
		} else {
			rg_log_ml($key . ': ' . $jid . ': ' . $aid
				. ': restarting aid upload');
			unset($job['artifacts_state'][$aid]['local_done']);
			if (isset($job['artifacts_state'][$aid]['last_confirmed_pos'])) {
				rg_log_ml($key . ': ' . $jid . ': ' . $aid
					. ': we have a confimed position: '
					. $job['artifacts_state'][$aid]['last_confirmed_pos']);
				$job['artifacts_state'][$aid]['pos'] =
					$job['artifacts_state'][$aid]['last_confirmed_pos'];
			} else {
				$job['artifacts_state'][$aid]['pos'] = 0;
			}
		}
		$job['artifacts_state'][$aid]['wait_status'] = 0;
	} else {
		// 'op' was already removed from $u above, so log the saved copy
		rg_log_ml($key . ': cannot handle op: ' . $op
			. ': ' . print_r($u, TRUE));
	}

	unset($job);
}

/*
 * Handle received commands
 * @data may contain several newline terminated JSON documents; every
 * complete one is passed to xhandle_one.
 * Returns the number of bytes consumed (a partial last line is not consumed).
 */
function xhandle($key, $data)
{
	$ret = 0;

	while (($pos = strpos($data, "\n")) !== FALSE) {
		xhandle_one($key, substr($data, 0, $pos));

		$data = substr($data, $pos + 1);
		$ret += $pos + 1;
	}

	return $ret;
}

/*
 * Artifacts parser
 * @root - the directory the declared paths are relative to
 * @a - the content of the artifacts file: a non-indented line declares a
 *	path (a trailing '/' means directory), the indented 'key = value'
 *	lines that follow are options for it ('map' and 'map_into_source'
 *	may be repeated). A file declared again starts a new set of options.
 * Returns array('files' => ..., 'errors' => ..., 'total_size' => ...).
 * Entries without options are dropped.
 */
function rg_job_artifacts_parse($root, $a)
{
	rg_log('DEBUG: artifacts text: ' . $a);

	$ret = array('files' => array(), 'errors' => array(), 'total_size' => 0);

	$e = explode("\n", $a);
	$line_no = 0;
	$aid = 0;
	$next_aid = 0;
	$list = array();
	foreach ($e as $line) {
		$line_no++;

		$line = rtrim($line);
		$line = str_replace("\r", '', $line);
		// Do not use empty() here: '0' is a valid file name
		if ($line === '')
			continue;

		if (strncmp($line, '#', 1) == 0)
			continue;

		rg_log('DEBUG: line=' . $line);

		if ((strncmp($line, ' ', 1) == 0)
			|| (strncmp($line, "\t", 1) == 0)) {
			rg_log('DEBUG: we have a sub path option');

			// we have an option for the previous declared file
			if ($next_aid == 0) {
				$ret['errors'][] = 'line ' . $line_no
					. ': we have an option without a file'
					. ' defined first; ignore it';
				continue;
			}

			$line = trim($line);
			$x = explode('=', $line, 2);
			if (!isset($x[1])) {
				$ret['errors'][] = 'line ' . $line_no . ': no = found';
				continue;
			}
			$k = trim($x[0]);

			$para_i = $list[$aid]['para_i'];
			if (!isset($list[$aid]['para'][$para_i]))
				$list[$aid]['para'][$para_i] = array();

			// These options may appear more than once
			$make_array = (strcmp($k, 'map') == 0)
				|| (strcmp($k, 'map_into_source') == 0);

			if ($make_array) {
				if (!isset($list[$aid]['para'][$para_i][$k]))
					$list[$aid]['para'][$para_i][$k] = array();
				$list[$aid]['para'][$para_i][$k][] = trim($x[1]);
			} else {
				$list[$aid]['para'][$para_i][$k] = trim($x[1]);
			}
			continue;
		}

		rg_log('DEBUG: We have a new path');

		$last = substr($line, -1);
		if (strcmp($last, '/') != 0) {
			// Search, maybe the file is sent again with different parameters
			$found = FALSE;
			foreach ($list as $_aid => $_i) {
				if (strcmp($_i['path'], $line) != 0)
					continue;

				rg_log('DEBUG: file ' . $_aid . ' is sent again');
				$aid = $_aid;
				$list[$_aid]['para_i']++;
				$found = TRUE;
				break;
			}
			if ($found)
				continue;
		}

		// we have a file defined
		$next_aid++;
		$list[$next_aid] = array(
			'path' => $line,
			'para' => array(),
			'para_i' => 0
		);
		$aid = $next_aid;
	}

	// Used to refuse directory matches which resolve outside the root
	$root_real = @realpath($root);

	// Now, it is time to filter the list of files, if needed
	$i = 0;
	foreach ($list as $aid => $per_aid) {
		unset($per_aid['para_i']);

		if (empty($per_aid['para'])) {
			rg_log('DEBUG: empty para, ignore entry!');
			unset($list[$aid]);
			continue;
		}

		$last = substr($per_aid['path'], -1);
		if (strcmp($last, '/') != 0) {
			// It is a plain file
			$r = rg_path_validate($root, $per_aid['path']);
			if ($r === FALSE) {
				rg_log('wrong path: ' . rg_util_error());
				unset($list[$aid]);
				continue;
			}

			$per_aid['size'] = $r['stat']['size'];
			$ret['total_size'] += $per_aid['size'];
			$per_aid['realpath'] = $r['realpath'];
			$ret['files'][$i++] = $per_aid;
			continue;
		}

		rg_log('DEBUG: we have a dir to match! path=' . $per_aid['path']);
		$d = rg_dir_load($root . '/' . $per_aid['path']);
		if ($d === FALSE) {
			rg_log('DEBUG: cannot load dir: ' . rg_util_error());
			continue;
		}

		foreach ($per_aid['para'] as $para_i => $per_para) {
			$maps = isset($per_para['map']) ? $per_para['map'] : array();
			foreach ($maps as $imap => $map) {
				$last = substr($map, -1);
				if (strcmp($last, '/') != 0) {
					rg_log('DEBUG: map does not end in /. Bad! Ignore it!');
					unset($list[$aid]['para'][$para_i]['map'][$imap]);
					continue;
				}
			}

			if (empty($list[$aid]['para'][$para_i]['map'])) {
				rg_log('DEBUG: map is empty! Ignore whole para!');
				unset($list[$aid]['para'][$para_i]);
				continue;
			}

			if (!isset($per_para['regex']))
				$per_para['regex'] = '.*';

			foreach ($d as $f) {
				if (preg_match('/' . $per_para['regex'] . '/uD', $f) !== 1) {
					rg_log('DEBUG: f=' . $f . ' does not match pattern '
						. $per_para['regex']);
					continue;
				}

				$full = $root . '/' . $per_aid['path'] . $f;
				$r = @stat($full);
				if ($r === FALSE) {
					rg_log('cannot stat file! Strange! Ignore it!');
					continue;
				}

				// The artifacts file is controlled by the build,
				// so '..' or symlinks must not be able to point
				// to files outside the root.
				$real = @realpath($full);
				if (($root_real === FALSE) || ($real === FALSE)
					|| (strncmp($real, $root_real . '/',
						strlen($root_real) + 1) != 0)) {
					rg_log('path is outside the root! Ignore it!');
					continue;
				}

				$x = $per_aid;
				unset($x['para'][$para_i]['regex']);
				$x['path'] = $per_aid['path'] . $f;
				$x['realpath'] = $real;
				$x['size'] = $r['size'];
				$ret['total_size'] += $r['size'];
				$ret['files'][$i++] = $x;
				rg_log('DEBUG: f=' . $f . ' matches ' . $per_para['regex']);
			}
		}
	}

	// TODO: remove entries without map lines?
	// TODO: For now, yes. In the future we may want to
	// declare files to be used in the repositories.

	rg_log_ml('artifacts parsed: ' . print_r($ret, TRUE));

	return $ret;
}

/*
 * Upload artifacts
 * Returns TRUE when everything is done.
 * Every call enqueues at most one chunk (of one file) for the master and
 * then returns, to give a chance to the other jobs. The per artifact
 * progress is kept in $job['artifacts_state'].
 */
function rg_job_upload_artifacts(&$job)
{
	global $conf;
	global $rg_log_dir;
	global $last_reconnect;

	rg_log('rg_job_upload_artifacts');

	$f = $job['main'] . '/root/git/rocketgit/artifacts';
	if (!file_exists($f)) {
		rg_log('DEBUG: No artifacts file - no work to do.');
		// TODO: Should we update the builder with the new state? Yes!
		$job['state'] = RG_JOB_FINISH;
		$job['dirty'] = 1;
		return TRUE;
	}

	$jid = $job['id'];
	$root = $job['main'] . '/root/git/';

	if (!isset($job['artifacts'])) {
		$a = @file_get_contents($f);
		if ($a === FALSE) {
			rg_log($jid . ': Cannot load artifacts file: ' . rg_php_err());
			return FALSE;
		}

		$job['artifacts'] = rg_job_artifacts_parse($root, $a);
		$job['dirty'] = 1;
	}

	// Announce the upload once per connection, until it is confirmed
	if (!isset($job['info_confirmed'])
		&& (!isset($job['last_reconnect'])
			|| ($job['last_reconnect'] != $last_reconnect))) {
		$j = array(
			'op' => 'artifact_upload_info',
			'id' => $job['id'],
			'total_size' => $job['artifacts']['total_size']
		);
		rg_conn_enq('master', @json_encode($j) . "\n");
		$job['last_reconnect'] = $last_reconnect;
	}

	$all_done = TRUE;
	foreach ($job['artifacts']['files'] as $aid => $fi) {
		$path = $fi['path'];

		if (!isset($job['artifacts_state']))
			$job['artifacts_state'] = array();
		if (!isset($job['artifacts_state'][$aid]))
			$job['artifacts_state'][$aid] = array(
				'start' => time(),
				'last_reconnect' => $last_reconnect,
				'wait_status' => 0,
				'in_progress' => 0,
				'pos' => 0);

		// It was confirmed by the server?
		if (isset($job['artifacts_state'][$aid]['done'])) {
			rg_log($jid . ':' . $aid . ': DEBUG: server confirmed aid - skip');
			continue;
		}

		rg_log($jid . ':' . $aid
			. ': DEBUG: not confirmed by the server; set all_done to FALSE');
		$all_done = FALSE;

		// After a reconnect we ask the server what it has
		if (!isset($job['artifacts_state'][$aid]['last_reconnect'])
			|| ($job['artifacts_state'][$aid]['last_reconnect'] != $last_reconnect)) {
			rg_log($jid . ':' . $aid
				. ': DEBUG: local_done is set; reconnected; requesting status from server');
			$j = array(
				'op' => 'artifact_upload_status',
				'id' => $job['id'],
				'aid' => $aid
			);
			rg_conn_enq('master', @json_encode($j) . "\n");
			$job['artifacts_state'][$aid]['in_progress'] = 0;
			$job['artifacts_state'][$aid]['last_reconnect'] = $last_reconnect;
			$job['artifacts_state'][$aid]['wait_status'] = 1;
			continue;
		}

		if ($job['artifacts_state'][$aid]['wait_status'] == 1)
			continue;

		// we sent it but we do not have yet the confirmation
		if (isset($job['artifacts_state'][$aid]['local_done']))
			continue;

		rg_log($jid . ':' . $aid . ': DEBUG: Sending path [' . $path . ']...');

		$job['dirty'] = 1; // for sure we will change things

		if ($job['artifacts_state'][$aid]['in_progress'] == 0) {
			rg_log($jid . ':' . $aid
				. ': DEBUG: Preparing the uploading of file [' . $path . ']...');
			$x = array();
			$x['op'] = 'artifact_upload_start';
			$x['id'] = $job['id'];
			$x['aid'] = $aid;
			$x['file'] = $path;
			$x['para'] = $fi['para'];
			$x['size'] = $fi['size'];
			$j = @json_encode($x);
			// json_encode signals an error with FALSE
			if ($j === FALSE) {
				rg_internal_error('cannot encode json: ' . json_last_error_msg());
				continue;
			}
			rg_conn_enq('master', $j . "\n");
			$job['artifacts_state'][$aid]['in_progress'] = 1;
			rg_log($jid . ':' . $aid . ': DEBUG: aid is in progress now');
		}

		// TODO: optimize to not continuously open the file.
		// TODO: But, be aware that the fd cannot be saved in the state file!
		$fd = @fopen($fi['realpath'], 'rb');
		if ($fd === FALSE) {
			rg_log($jid . ':' . $aid . ': DEBUG: cannot open file: ' . rg_php_err());
			continue;
		}

		//rg_log($jid . ':' . $aid . ': DEBUG: seeking to pos'
		//	. $job['artifacts_state'][$aid]['pos'] . '...');
		$r = @fseek($fd, $job['artifacts_state'][$aid]['pos'], SEEK_SET);
		if ($r === -1) {
			rg_log($jid . ':' . $aid . ': DEBUG: Cannot seek: ' . rg_php_err());
			@fclose($fd);
			continue;
		}

		$buf = @fread($fd, 65536);
		if ($buf === FALSE) {
			rg_log($jid . ':' . $aid . ': DEBUG: cannot fread: '
				. rg_php_err() . '; ignore the file');
			@fclose($fd);
			continue;
		}

		// Do not use empty() here: a chunk containing only '0' is data
		if ($buf === '') {
			rg_log($jid . ':' . $aid . ': DEBUG: EOF! Will set local_done to 1');
			@fclose($fd);

			$j = array(
				'op' => 'artifact_upload_done',
				'id' => $job['id'],
				'aid' => $aid
			);
			rg_conn_enq('master', json_encode($j) . "\n");
			$job['artifacts_state'][$aid]['local_done'] = 1;
			continue;
		}

		// The JSON header is followed by 'buf_len' bytes of raw data
		$buf_len = strlen($buf);
		$j = array(
			'op' => 'artifact_upload_chunk',
			'id' => $job['id'],
			'aid' => $aid,
			'pos' => $job['artifacts_state'][$aid]['pos'],
			'buf_len' => $buf_len
		);
		//rg_log($jid . ':' . $aid . ': DEBUG: Sending: ' . $j['buf']);
		rg_conn_enq('master', @json_encode($j) . "\n" . $buf);
		$job['artifacts_state'][$aid]['pos'] += $buf_len;
		@fclose($fd);

		break; // Give a chance to other jobs
	}

	rg_log($jid . ': DEBUG: all_done=' . ($all_done ? 'TRUE' : 'FALSE'));
	if (!$all_done)
		return FALSE;

	// TODO: we have a variable in $job and a file! We should not have both! TODO
	rg_log('Done uploading artifacts! Switching to state FINISH.');
	$job['state'] = RG_JOB_FINISH;
	$job['dirty'] = 1;

	return TRUE;
}

/*
 * Extracts info from the virtual disk
 * TODO: if something fails, we may keep the file mounted!
 * TODO: we need to limit the time spent in an error state.
 * Nothing is returned; the result is stored in @job: 'status' is filled and
 * 'state' becomes RG_JOB_ARTIFACTS, or RG_JOB_ERROR if $job['error'] ends up
 * not empty.
 */
function rg_job_extract_info(&$job)
{
	global $conf;

	rg_log_ml('DEBUG: extract_info: job: ' . print_r($job, TRUE));

	$jid = $job['id'];
	$emain = escapeshellarg($job['main']);

	while (1) {
		$job['error'] = '';
		$job['error2'] = '';
		$job['dirty'] = 1;

		if (!is_dir($job['main'])) {
			$job['error'] = 'internal error (main dir not found)';
			$m = 'Main dir [' . $job['main'] . '] not present;'
				. ' probably disk space problems';
			rg_log($m);
			$job['error2'] = $m;
			break;
		}

		// Errors stored by start_worker; load both files before stopping
		$e1 = @file_get_contents($job['main'] . '/error.log');
		if ($e1 !== FALSE) {
			if (!empty($e1))
				rg_log('error set from file to [' . $e1 . ']');
			$job['error'] = $e1;
		}

		$e2 = @file_get_contents($job['main'] . '/error2.log');
		if ($e2 !== FALSE) {
			if (!empty($e2))
				rg_log('error2 set from file to [' . $e2 . ']');
			$job['error2'] = $e2;
		}

		if (($e1 !== FALSE) || ($e2 !== FALSE))
			break;

		// Extract how much disk space was used
		// TODO: Warn the user when the disk space is close to the limit?
		// TODO: or alloc a lot of space by default?
		$r = @stat($job['main'] . '/image2.raw');
		if ($r === FALSE) {
			$m = 'missing image2 file';
			rg_log($m);
			$job['error'] = $m;
			break;
		}
		// Kept in a local because $job['status'] is rebuilt below
		$disk_used_mib = intval($r['blocks'] / 2 / 1024);

		// TODO - remove this
		$cmd = 'ln -f ' . $emain . '/image2.raw ' . $emain . '/..';
		$r = rg_exec($cmd, '', FALSE, FALSE, FALSE);

		// TODO - remove this
		$cmd = 'ln -f ' . $emain . '/machine.xml ' . $emain . '/..';
		$r = rg_exec($cmd, '', FALSE, FALSE, FALSE);

		// 'junk' is created by start_worker on the image; if it is
		// not visible, the image is not mounted
		if (!file_exists($job['main'] . '/root/junk')) {
			$cmd = 'mount ' . $emain . '/image2.raw ' . $emain . '/root';
			$r = rg_exec($cmd, '', FALSE, FALSE, FALSE);
			if ($r['ok'] != 1) {
				$m = 'could not mount image: ' . $r['errmsg'];
				rg_log($m);
				$job['error'] = $m;
				break;
			}
		}

		$r = @file_get_contents($job['main'] . '/root/status/err');
		if ($r !== FALSE)
			$job['error'] = $r;

		$labels = @file($job['main'] . '/root/status/RG_LABELS');
		if ($labels === FALSE)
			$labels = array();
		foreach ($labels as $index => $l)
			$labels[$index] = trim($l);

		// Add worker name as label
		$labels[] = 'worker/' . $conf['name'] . '/color=fff';

		$clone_elap = @file_get_contents($job['main'] . '/root/T_clone');
		if ($clone_elap === FALSE)
			$clone_elap = 'n/a';

		$build_sh_start = @file_get_contents($job['main'] . '/root/status/build.sh.start');
		if ($build_sh_start === FALSE)
			$build_sh_start = 'n/a';
		else
			$build_sh_start = intval($build_sh_start);

		$vm_start = @file_get_contents($job['main'] . '/vm.start');
		if ($vm_start === FALSE)
			$vm_start = 0;
		else
			$vm_start = intval($vm_start);

		$job['status'] = array(
			'disk_used_mib' => $disk_used_mib,
			'vm_start' => $vm_start,
			'build_sh_start' => $build_sh_start,
			'packages' => @trim(file_get_contents($job['main'] . '/root/packages.log')),
			'clone_elap' => $clone_elap,
			'start' => @trim(file_get_contents($job['main'] . '/root/T_START')),
			'net_ok' => @trim(file_get_contents($job['main'] . '/root/T_NET_OK')),
			'pkgs_ok' => @trim(file_get_contents($job['main'] . '/root/T_PKGS_OK')),
			'done' => @trim(file_get_contents($job['main'] . '/root/T_DONE')),
			'labels' => $labels
		);

		$job['status']['cmds'] = array();
		foreach ($job['cmds'] as $cmd => $i) {
			if (empty($i['cmd']))
				continue;

			$sd = $job['main'] . '/root/status/' . $cmd;
			$job['status']['cmds'][$cmd] = array(
				'cmd' => $i['cmd'],
				'start' => trim(@file_get_contents($sd . '.start')),
				'done' => trim(@file_get_contents($sd . '.done')),
				'status' => trim(@file_get_contents($sd . '.status')),
				'log' => trim(rg_file_get_tail($sd . '.log', 4 * 4096))
			);
		}

		// Not needed anymore
		unset($job['cmds']);
		unset($job['url']);
		unset($job['head']);
		unset($job['env']);

		rg_log('DEBUG: state set to JOB_ARTIFACTS');
		$job['state'] = RG_JOB_ARTIFACTS;
		break;
	}

	if (!empty($job['error'])) {
		rg_log('DEBUG: state set to JOB_ERROR (' . $job['error'] . ')');
		$job['state'] = RG_JOB_ERROR;
		return;
	}
}

/* * Extract blk/net/cpu/mem info from a VM */ function vm_extract_info($name) { $ret = FALSE; while (1) { $cmd = 'virsh domstats --raw ' . escapeshellarg($name); $r = rg_exec($cmd, '', FALSE, FALSE, FALSE); if ($r['ok'] != 1) { rg_log('Could not get dom stats: ' . $r['errmsg']); break; } //rg_log_ml('DEBUG: domstats: ' . 
print_r($r['data'], TRUE)); $data = array(); $t = explode("\n", $r['data']); foreach ($t as $line) { $line = trim($line); $x = explode('=', $line, 2); if (!isset($x[1])) continue; $data[$x[0]] = $x[1]; } $ret = array(); $ret['rx_bytes'] = $data['net.0.rx.bytes']; $ret['rx_pkts'] = $data['net.0.rx.pkts']; $ret['tx_bytes'] = $data['net.0.tx.bytes']; $ret['tx_pkts'] = $data['net.0.tx.pkts']; $ret['block_read_ops'] = $data['block.1.rd.reqs']; $ret['block_read_bytes'] = $data['block.1.rd.bytes']; $ret['block_write_ops'] = $data['block.1.wr.reqs']; $ret['block_write_bytes'] = $data['block.1.wr.bytes']; $ret['block_physical_bytes'] = $data['block.1.physical']; $ret['block_allocation_bytes'] = $data['block.1.allocation']; $ret['cpu_time_ns'] = $data['cpu.time']; $ret['ballon_current_mib'] = intval($data['balloon.current'] / 1024); $ret['ballon_rss_mib'] = intval($data['balloon.rss'] / 1024); break; } return $ret; } /* * Send stats about the worker */ function send_worker_stats() { static $last_time = 0; global $features; global $stats; if (!isset($features['allow_stats'])) return; $load = rg_load(); $now = time(); if ($last_time > $now - 30) return; $a = array( 'op' => 'WORKER_STATS', 'ts' => $now, 'load' => $load, 'jobs' => $stats['jobs'] ); rg_conn_enq('master', json_encode($a) . "\n"); $last_time = $now; } function rg_worker_connect() { global $conf; global $rg_conns; global $jobs; global $last_reconnect; rg_log('Connecting to ' . $conf['master_host'] . '/' . $conf['master_port'] . ' with proto ' . $conf['master_proto'] . ' with url [' . $conf['master_url'] . ']' . '...'); if (strcmp($conf['master_proto'], 'tcp') == 0) { $socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if ($socket === FALSE) { rg_log('Cannot create socket: ' . rg_php_err()); exit(1); } $r = @socket_connect($socket, $conf['master_host'], $conf['master_port']); if ($r === FALSE) { rg_log('Cannot connect: ' . 
rg_php_err()); exit(1); } } else if (strcmp($conf['master_proto'], 'proxy_tls') == 0) { $context = stream_context_create(); // TODO: make timeout configurable $socket = @stream_socket_client('tls://' . $conf['master_host'] . ':' . $conf['master_port'], $errno, $errstr, 30, STREAM_CLIENT_CONNECT, $context); if ($socket === FALSE) { rg_log('Cannot connect: ' . $errstr); exit(1); } $s = "GET " . $conf['maser_url'] . " HTTP/1.1\r\n" . "Host: " . $conf['master_host'] . ':' . $conf['master_port'] . "\r\n" . "Connection: keep-alive, Upgrade\r\n" . "Pragma: no-cache\r\n" . "Cache-Control: no-cache\r\n" . "Upgrade: websocket\r\n" . "\r\n"; $r = @fwrite($socket, $s); if ($r === FALSE) { rg_log('Cannot write HTTP request: ' . rg_php_err()); exit(1); } $buf = ''; while (1) { $r = @fread($socket, 4096); if ($r === FALSE) { rg_log('Cannot read HTTP answer: ' . rg_php_err()); exit(1); } $buf .= $r; rg_log_ml('Answer: ' . $buf); if (strstr($buf, "\r\n\r\n") || strstr($buf, "\n\n")) break; } } else { rg_log('Invalid master protocol: ' . $conf['master_proto'] . '!'); exit(1); } rg_log('Connected.'); rg_conn_new('master', $socket); $rg_conns['master']['exit_on_close'] = 1; $rg_conns['master']['func_data'] = 'xhandle'; // announce ourselves $ann = $conf; unset($ann['key']); $ann['op'] = 'ANN'; $ann['uname'] = php_uname('a'); $ann['host'] = php_uname('n'); $ann['arch'] = php_uname('m'); $ann['boot_time'] = time(); $ann['sign'] = hash_hmac('sha512', $ann['boot_time'], $conf['key']); $j_ann = @json_encode($ann); if ($j_ann === FALSE) { rg_log('Cannot encode json: ' . json_last_error_msg()); exit(1); } rg_conn_enq('master', $j_ann . "\n"); $last_reconnect = microtime(TRUE); } umask(0007); load_config($conf_file); rg_log('id is [' . $id . ']'); $conf['id'] = $id; // What master supports $features = array(); // stats $stats = array('jobs' => 0); $jobs = rg_load_files($conf['state'], 'job-[0-9]*.ser', 'id'); if (!empty($jobs)) rg_log_ml('Jobs loaded from dir: ' . 
print_r($jobs, TRUE)); rg_worker_connect(); $pid_to_jid = array(); while(1) { rg_conn_wait(1); send_worker_stats(); // Verify if the jobs are really started while (1) { $pid = pcntl_waitpid(-1, $status, WNOHANG); if ($pid === 0) break; if ($pid == -1) break; if (!isset($pid_to_jid[$pid])) { rg_internal_error('pid not in list; strange'); break; } $jid = $pid_to_jid[$pid]; //rg_log('Pid ' . $pid . ' exited (job ' . $jid . ')' // . ' with status ' . $status . '!'); unset($pid_to_jid[$pid]); rg_log($jid . ': Switch to state STARTED'); $jobs[$jid]['state'] = RG_JOB_STARTED; $jobs[$jid]['dirty'] = 1; $stats['jobs']++; } // Verify if VMs finished //if (!empty($jobs)) // rg_log_ml('DEBUG: jobs: ' . print_r($jobs, TRUE)); $vms_loaded = FALSE; foreach ($jobs as $jid => &$job) { //rg_log($jid . ': DEBUG: state \'' . job_state($job['state']) . '\''); $vms = rg_builder_vm_list(); if ($vms === FALSE) break; //rg_log_ml('vms: ' . print_r($vms, TRUE)); $name = 'rg-worker-' . $conf['id'] . '-' . $jid; while ($job['state'] == RG_JOB_INIT) { $k = array_search($name, $vms); if ($k !== FALSE) { rg_log($jid . ': Switch to state STARTED'); $job['state'] = RG_JOB_STARTED; $job['dirty'] = 1; save_job($job); break; } $pid = pcntl_fork(); if ($pid == -1) { rg_log($jid . ': Cannot fork!'); break; } if ($pid == 0) { // child start_worker($job); exit(0); } rg_log($jid . ': Started worker with pid ' . $pid); $job['state'] = RG_JOB_HELPER_STARTED; $job['start'] = time(); $job['dirty'] = 1; save_job($job); $pid_to_jid[$pid] = $jid; $a = array('id' => $jid); $a['op'] = 'STA'; rg_conn_enq('master', json_encode($a) . "\n"); break; } // TODO: it may be possible that the VM did not start! if ($job['state'] == RG_JOB_HELPER_STARTED) { // We need to wait for the VM to start $k = array_search($name, $vms); if ($k !== FALSE) { rg_log($jid . ': VM found, switch to state STARTED'); $job['state'] = RG_JOB_STARTED; } // TODO: how much to wait in this state? 
} if ($job['state'] == RG_JOB_STARTED) { $k = array_search($name, $vms); if ($k !== FALSE) { //rg_log('VM in progress'); // TODO: if too much time, abort (kill // worker and destroy virtual machine) //TODO: $job['error'] = 'too much time'; // TODO: Signal from inside VM that we finished and extracts stats at that time // TODO: what if we cannot extract info?! Wouldn't we lost stats?!! $r = vm_extract_info($name); if ($r !== FALSE) $job['stats'] = $r; // TODO: timeout must be controlled by user if ($job['start'] + 6 * 3600 < time()) { $cmd = 'virsh destroy rocketgit-j-' . $jid; $r = rg_exec($cmd, '', FALSE, FALSE, FALSE); if ($r['ok'] != 1) { // If error, probably the machine was not running, so, this is just a warning // TODO: But we set an error! $job['error'] = 'Could not destroy: ' . $r['errmsg']; rg_log('Error: ' . $job['error']); } } continue; } rg_log($jid . ': VM finished; switch to state COLLECT_INFO'); $job['state'] = RG_JOB_COLLECT_INFO; $job['dirty'] = 1; save_job($job); } if ($job['state'] == RG_JOB_COLLECT_INFO) { rg_job_extract_info($job); save_job($job); } if ($job['state'] == RG_JOB_ARTIFACTS) { rg_job_upload_artifacts($job); save_job($job); } if ($job['state'] == RG_JOB_FINISH) { $emain = escapeshellarg($job['main']); $cmd = 'umount ' . $emain . '/root'; $r = rg_exec($cmd, '', FALSE, FALSE, FALSE); if ($r['ok'] != 1) rg_internal_error('Cannot unmount [' . $job['main'] . '/root]: ' . $r['errmsg'] . '!'); $r = rg_del_tree($job['main']); if ($r === FALSE) rg_log($jid . ': cannot delete tree: ' . rg_util_error()); rg_log($jid . ': DEBUG: state set to JOB_DONE'); $job['state'] = RG_JOB_DONE; $job['dirty'] = 1; save_job($job); //rg_log_ml($jid . ': DEBUG: job_extract_info: job: ' . print_r($job, TRUE)); } //rg_log($jid . ': DEBUG: state \'' . job_state($job['state']) . 
'\''); if (($job['state'] != RG_JOB_ERROR) && ($job['state'] != RG_JOB_DONE)) continue; // Avoid flooding server if (!isset($job['done_last_sent'])) $job['done_last_sent'] = 0; if ($job['done_last_sent'] + 30 > time()) continue; rg_log($jid . ': DEBUG: Sending DON command...'); $xjob = $job; $xjob['op'] = 'DON'; unset($xjob['debug']); unset($xjob['packages']); unset($xjob['main']); unset($xjob['secrets']); unset($xjob['artifacts']); unset($xjob['last_reconnect']); unset($xjob['info_confirmed']); unset($xjob['dirty']); $j_xjob = @json_encode($xjob); if ($j_xjob !== FALSE) { // TODO: this is fragile. Better to iterate all jobs to compute stats. if (!isset($job['stats_done'])) { $stats['jobs']--; $job['stats_done'] = 1; } rg_conn_enq('master', $j_xjob . "\n"); $job['done_last_sent'] = time(); continue; } rg_log($jid . ': Warn: Cannot encode json: ' . json_last_error_msg()); } unset($job); } rg_prof_end('MAIN'); rg_prof_log();