xaizek / rocketgit (License: AGPLv3+) (since 2018-12-09)
Light and fast Git hosting solution suitable to serve both as a hub or as a personal code storage with its tickets, pull requests, API and much more.
<root> / scripts / builder.php (250ef0823e7972d0d4287c4677870f925c220127) (6,641B) (mode 100644) [raw]
<?php
// This is called by cron, and is persistent.
// It takes care of build jobs.
error_reporting(E_ALL);
ini_set("track_errors", "On");
set_time_limit(0);

$_s = microtime(TRUE);

require_once("/etc/rocketgit/config.php");

$INC = dirname(__FILE__) . "/../inc";
require_once($INC . "/init.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/struct.inc.php");
require_once($INC . "/events.inc.php");
require_once($INC . "/repo.inc.php");
require_once($INC . "/prof.inc.php");
require_once($INC . "/mr.inc.php");
require_once($INC . "/keys.inc.php");
require_once($INC . "/user.inc.php");
require_once($INC . "/bug.inc.php");
require_once($INC . "/fixes.inc.php");
require_once($INC . "/plan.inc.php");
require_once($INC . "/admin.inc.php");
require_once($INC . "/ver.php");
require_once($INC . "/builder.inc.php");
require_once($INC . "/conn.inc.php");

/*
 * Called when a new client connects
 */
function xnew($key, $arg)
{
	global $rg_conns;

	$s = &$rg_conns[$key];
	$s['func_cmd'] = 'xdispatch';
	$s['db'] = $arg;
	$s['auth'] = 0;
}

function xdispatch($key, $line)
{
	global $rg_conns;
	global $jobs;
	global $rg_builder_key;

	rg_log('Dispatch[' . $key . ']');

	$s = &$rg_conns[$key];

	$cmd = substr($line, 0, 4);
	$d = trim(substr($line, 4));
	$x = stripcslashes($d);
	$u = @unserialize($x);
	if ($u === FALSE) {
		rg_conn_destroy($key);
		return;
	}

	rg_log_ml('cmd=' . $cmd . ' u: ' . print_r($u, TRUE));

	if (strcmp($cmd, 'ANN ') == 0) {
		$now = time();
		if ($u['boot_time'] < $now - 10) {
			rg_log('boot_time is too old; abort');
			rg_conn_destroy($key);
			return;
		}
		$sign = hash_hmac('sha512', $u['boot_time'], $rg_builder_key);
		if (strcmp($sign, $u['sign']) != 0) {
			rg_log('signature is not ok [' . $sign . ']'
				. ' != [' . $u['sign'] . ']');
			rg_conn_destroy($key);
			return;
		}
		$s['ann'] = $u;
		$s['auth'] = 1;
		rg_log('Peer announced itself!');
	} else if (strcmp($cmd, 'STA ') == 0) {
		if ($s['auth'] != 1) {
			rg_log($key . ':Client not authenticated!');
			$a = array('error' => 'client not authenticated');
			$cmd = 'ERR ' . rg_conn_prepare($a) . "\n";
			rg_conn_enq($key, $cmd);
			return;
		}
		$jid = $u['id'];
		$jobs[$jid]['worker'] = $key;
		$jobs[$jid]['worker_started'] = time();
		rg_log('Job started: ' . $jid);
	} else if (strcmp($cmd, 'DON ') == 0) {
		if ($s['auth'] != 1) {
			rg_log($key . ':Client not authenticated!');
			$a = array('error' => 'client not authenticated');
			$cmd = 'ERR ' . rg_conn_prepare($a) . "\n";
			rg_conn_enq($key, $cmd);
			return;
		}
		$jid = $u['id'];
		$r = rg_builder_done($s['db'], $jobs[$jid], $u['status']);
		if ($r === TRUE) {
			unset($jobs[$jid]);
			// Send DoneREceived - so client will delete the jobs
			$a = array('id' => $job['id']);
			$cmd = 'DRE ' . rg_conn_prepare($a) . "\n";
			rg_conn_enq($key, $cmd);
		}
	} else {
		rg_log('Unknown command [' . $cmd . ']!');
	}
}

function rg_process_queue($db, &$job)
{
	global $rg_conns;

	rg_log('process_queue: worker=' . $job['worker']);
	if (!empty($job['worker']))
		return;

	rg_log_ml('Processing job: ' . print_r($job, TRUE));

	// Find a target for this job
	$found = FALSE;
	foreach ($rg_conns as $key => $i) {
		// Is it a worker?
		if (!isset($i['ann'])) {
			rg_log('Conn ' . $key . ' has no announce.');
			// TODO: close after some time?
			continue;
		}

		if (empty($i['ann']['env'])) {
			rg_log('Conn ' . $key . ' has no environments.');
			continue;
		}

		foreach ($i['ann']['env'] as $env => $junk) {
			if (strcmp($job['env'], $env) != 0) {
				rg_log('job env [' . $job['env'] . ']'
					. ' != worker [' . $env . ']');
				continue;
			}

			// Remove some stuff
			$job2 = $job;
			unset($job2['worker']);
			unset($job2['events']);
			unset($job2['flags']);

			$cmd = 'BLD ' . rg_conn_prepare($job2) . "\n";
			rg_conn_enq($key, $cmd);
			// TODO: get a confirmation?
			$job['worker'] = $key;
			$job['worker_started'] = 0;
			$job['worker_sent'] = time();
			rg_log_ml('After sending BLD: ' . print_r($job, TRUE));
			// TODO: after some time, if worker_started is still 0,
			// mark the 'worker' as '' to be able to go in other place
			// TODO: maybe the client must resync with server to
			// abort jobs already done on another host, to not
			// duplicate work
			$found = TRUE;
			return;
		}
	}

	if (!$found)
		rg_log('No worker found!');
}


rg_prof_start("MAIN");

rg_log_set_file($rg_log_dir . "/builder.log");
rg_log_set_sid("000000"); // to spread the logs
// We must disable cache, else, we will not receive the updates because
// of the core cache. Do not forget that we are a long live process.
$rg_cache_core_enable = FALSE;

rg_log("Start (ver=$rocketgit_version)...");

rg_sql_app("rg-builder");
$db = rg_sql_open($rg_sql);
if ($db === FALSE) {
	rg_internal_error("Cannot connect to database!");
	exit(1);
}

if (rg_sql_struct_update_needed($db) !== 0) {
	rg_log('Update needed. Exit.');
	exit(0);
}

if (rg_fixes_needed($db) !== 0) {
	rg_log('Fixes needed. Exit.');
	exit(0);
}

// Prepare socket
$socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($socket === FALSE) {
	rg_internal_error('Cannot create socket!');
	exit(1);
}

socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);

$r = @socket_bind($socket, $rg_builder_bind, $rg_builder_port);
if ($r === FALSE) {
	rg_internal_error("Cannot bind socket!");
	exit(1);
}

$r = @socket_listen($socket, 128);
if ($r === FALSE) {
	rg_internal_error("Cannot set queue length on socket!");
	exit(1);
}

rg_conn_new('master', $socket);
$rg_conns['master']['exit_on_close'] = 1;
$rg_conns['master']['func_new'] = 'xnew';
$rg_conns['master']['func_new_arg'] = $db;

$jobs = array();

$original_mtime = @filemtime(__FILE__);
do {
	rg_log_buffer_clear();

	// Check our mtime so we can upgrade the software and this script
	// will restart.
	clearstatcache();
	$mtime = @filemtime(__FILE__);
	if ($mtime != $original_mtime) {
		rg_log("mtime=$mtime, original_mtime=$original_mtime");
		rg_log('File changed. Exiting...');
		break;
	}

	$r = rg_builder_load_jobs($db);
	if ($r['ok'] != 1) {
		rg_log('Cannot load jobs from database!');
		sleep(30);
		continue;
	}

	foreach ($r['list'] as $jid => $job) {
		if (!isset($jobs[$jid])) {
			$job['worker'] = '';
			$job['url'] = 'git://rg.embedromix.ro/user/catab/delme10'; // TODO
			$jobs[$jid] = $job;
		}

		$r = rg_process_queue($db, $jobs[$jid]);
		if ($r === FALSE)
			break;
	}
	if ($r === FALSE)
		break;

	rg_log("Waiting for connections...");
	rg_conn_wait(10);
} while (1);

@socket_close($socket);

rg_log("Exiting...");

rg_prof_end("MAIN");
rg_prof_log();
?>
Hints

Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://code.reversed.top/user/xaizek/rocketgit

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@code.reversed.top/user/xaizek/rocketgit

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a pull request:
... clone the repository ...
... make some changes and some commits ...
git push origin master