xaizek / rocketgit (License: AGPLv3+) (since 2018-12-09)
Light and fast Git hosting solution suitable to serve both as a hub or as a personal code storage with its tickets, pull requests, API and much more.
<root> / inc / events.inc.php (9a17fefa80879356e50134486fdf39293017a589) (7,014B) (mode 100644) [raw]
<?php
//
// This functions are used for background tasks: the tasks that the user should
// not wait to happen in the browser: e-mails, keyring regeneration etc.
//
require_once($INC . "/util.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/prof.inc.php");

if (!isset($rg_event_socket))
	$rg_event_socket = "/var/lib/rocketgit/sockets/event.sock";

$rg_event_error = "";

function rg_event_set_error($str)
{
	global $rg_event_error;
	$rg_event_error = $str;
	rg_log($str);
}

function rg_event_error()
{
	global $rg_event_error;
	return $rg_event_error;
}

$rg_event_split_functions = array();

/*
 * Register event functions.
 */
function rg_event_register_functions($functions)
{
	global $rg_event_split_functions;

	if (empty($functions)) {
		rg_event_set_error("Cannot register empty array");
		return FALSE;
	}

	foreach ($functions as $type => $function)
		$rg_event_split_functions[$type] = $function;

	return TRUE;
}

/*
 * Signals the daemon that there is some work to do
 * if @event_id > 0, we will wait for the finish of the event
 */
function rg_event_signal_daemon($ev_id, $timeout)
{
	global $rg_event_socket;

	rg_prof_start("event_signal_daemon");
	rg_log("event_signal_daemon: event_id=[$ev_id] timeout=$timeout");

	$ret = FALSE;
	do {
		$socket = socket_create(AF_UNIX, SOCK_STREAM, 0);
		if ($socket === FALSE) {
			rg_log("Could not create socket!");
			break;
		}

		// try 3 times
		$tries = 3;
		while ($tries > 0) {
			$r = socket_connect($socket, $rg_event_socket);
			if ($r === FALSE) {
				$tries--;
				usleep(200000);
				continue;
			}

			break;
		}
		if ($r === FALSE) {
			rg_log("Could not connect the socket!");
			break;
		}

		if (empty($ev_id))
			$buf = "W";
		else
			$buf = "NOTIFY " . $ev_id;
		$len = strlen($buf);
		$r = socket_send($socket, $buf, $len, 0);
		if ($r !== $len) {
			rg_log("Could not send!");
			break;
		}

		if (empty($ev_id)) {
			rg_log("We do not have to wait. Exit.");
			$ret = TRUE;
			socket_close($socket);
			break;
		}

		$reads = array($socket); $writes = array(); $ex = array();
		$r = socket_select($reads, $writes, $ex, $timeout, 0);
		if ($r === FALSE) {
			rg_log("Cannot select!");
			socket_close($socket);
			break;
		}

		if ($r === 0) { // timeout
			rg_log("Timeout!");
			socket_close($socket);
			break;
		}

		if (!in_array($socket, $reads)) {
			rg_log("Select returned > 0 and my socket is not in reads");
			socket_close($socket);
			break;
		}

		$r = socket_recv($socket, $buf, 1024, 0);
		if ($r === FALSE) {
			rg_log("Cannot receive!");
			break;
		}
		rg_log("Received [$buf]");

		socket_close($socket);
		$ret = TRUE;
	} while (0);

	return $ret;
}

/*
 * Inserts an event
 * This function is called from all over the place to generate events
 */
function rg_event_add($db, $event)
{
	rg_prof_start("event_add");
	rg_log("event_add: event=" . rg_array2string($event));

	$ret = FALSE;
	do {
		$now = time();
		$prio = $event['prio'];
		$e_data = rg_sql_escape($db, serialize($event));
		$sql = "INSERT INTO events (itime, prio, data)"
			. " VALUES ($now, $prio, '$e_data')";
		$res = rg_sql_query($db, $sql);
		if ($res === FALSE) {
			rg_event_set_error("Could not add event (" . rg_sql_error() . ")");
			break;
		}
		rg_sql_free_result($res);

		rg_event_signal_daemon("", 0);

		$ret = TRUE;
	} while (0);

	rg_prof_end("event_add");
	return $ret;
}

/*
 * Process an event
 */
function rg_event_process($db, $event)
{
	global $rg_event_split_functions;

	rg_prof_start("event_process");
	rg_log("event_process: event=" . rg_array2string($event));

	$ret = FALSE;
	do {
		$category = $event['category'];
		unset($event['category']);

		if (!isset($rg_event_split_functions[$category])) {
			rg_event_set_error("Cannot find event function [$category]!");
			rg_log("DEBUG: rg_event_split_functions=" . rg_array2string($rg_event_split_functions));
			break;
		}

		$f = $rg_event_split_functions[$category];
		rg_log("Calling $f...");
		$evs = $f($db, $event);
		if ($evs === FALSE) {
			rg_event_set_error("Failed to fill event for category [$category]!");
			break;
		}

		if (empty($evs)) {
			$ret = TRUE;
			break;
		}

		$r = TRUE;
		if (!is_array($evs))
			rg_internal_error("evs is not array!");
		foreach ($evs as $index => $ev) {
			$r = rg_event_add($db, $ev);
			if ($r !== TRUE)
				break;
		}
		if ($r !== TRUE)
			break;

		$ret = TRUE;
	} while (0);

	rg_prof_end("event_process");
	return $ret;
}

/*
 * Cleans the notification list
 */
function rg_event_notify_clean(&$notify_list)
{
	if (empty($notify_list))
		return;

	$limit = time() - 5 * 60;

	foreach ($notify_list as $ev_id => $ei) {
		if (empty($ei)) {
			unset($notify_list[$ev_id]);
			continue;
		}

		foreach ($ei as $index => $info) {
			if ($info['itime'] < $limit) {
				unset($notify_list[$ev_id][$index]);
				socket_close($info['fd']);
			}
		}
	}
}

/*
 * Tries to notify a client if requested
 */
function rg_event_notify(&$notify_list, $ev_id)
{
	if (!isset($notify_list[$ev_id]))
		return;

	$buf = "DONE $ev_id\n";
	$buf_len = strlen($buf);
	foreach ($notify_list[$ev_id] as $index => $info) {
		$fd = $info['fd'];
		rg_log("\tNotify [$ev_id] [fd=$fd]...");
		socket_send($fd, $buf, $buf_len, 0);
		socket_shutdown($fd, 1);
	}

	return;
}

/*
 * Process events queue
 * reset id to 1 if queue is empty?
 * Returns FALSE on error, else, the number of events processed
 * @notify_list: Will be used to signal the finish of an event
 */
function rg_event_process_queue($db, &$notify_list)
{
	rg_prof_start("event_process_queue");
	rg_log("event_process_queue: notify_list: "
		. rg_array2string($notify_list));

	$ret = FALSE;
	$do_rollback = 0;
	do {
		$r = rg_sql_begin($db);
		if ($r !== TRUE) {
			rg_event_set_error("Cannot begin transaction"
				. " (" . rg_sql_error() . ")");
			break;
		}
		$do_rollback = 1;

		// We limit to be able to deal with high prio tasks
		$sql = "SELECT * FROM events"
			. " ORDER BY prio, id"
			. " FOR UPDATE"
			. " LIMIT 100";
		$res = rg_sql_query($db, $sql);
		$no_of_events = rg_sql_num_rows($res);
		if ($res === FALSE) {
			rg_event_set_error("Cannot load job list"
				. " (" . rg_sql_error() . ")");
			break;
		}

		$all_good = TRUE;
		while (($row = rg_sql_fetch_array($res))) {
			$ev = unserialize($row['data']);
			if ($ev === FALSE) {
				rg_internal_error("Cannot unserialize data");
				break;
			}
			$r = rg_event_process($db, $ev);
			if ($r !== TRUE) {
				$all_good = FALSE;
				break;
			}

			if (isset($ev['notification']))
				rg_event_notify($notify_list, $ev['notification']);

			$sql = "DELETE FROM events WHERE id = " . $row['id'];
			$res2 = rg_sql_query($db, $sql);
			rg_sql_free_result($res2);
		}
		rg_sql_free_result($res);

		if ($all_good !== TRUE)
			break;

		$r = rg_sql_commit($db);
		if ($r !== TRUE)
			break;

		$ret = $no_of_events;
		$do_rollback = 0;
	} while (0);

	if ($do_rollback == 1)
		rg_sql_rollback($db);

	rg_prof_end("event_process_queue");
	return $ret;
}

?>
Hints

Before first commit, do not forget to setup your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://code.reversed.top/user/xaizek/rocketgit

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@code.reversed.top/user/xaizek/rocketgit

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a pull request:
... clone the repository ...
... make some changes and some commits ...
git push origin master