xaizek / rocketgit (License: AGPLv3+) (since 2018-12-09)
Light and fast Git hosting solution suitable to serve either as a hub or as personal code storage, with tickets, pull requests, an API and much more.
<root> / inc / events.inc.php (ef94bf51704babadae9f0cc68ca62d58df2bbcb9) (6,256B) (mode 100644) [raw]
<?php
//
// These functions are used for background tasks: tasks the user should not
// have to wait for in the browser (e-mails, keyring regeneration etc.).
//
require_once($INC . "/util.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/prof.inc.php");

// Path of the socket used to reach the event daemon. A value set before this
// file is included (e.g. by the configuration) is kept as is.
if (!isset($rg_event_socket)) {
	$rg_event_socket = "/var/lib/rocketgit/sockets/event.sock";
}

// Last error message recorded by rg_event_set_error().
$rg_event_error = "";

/*
 * Records @str as the last event error and sends it to the log.
 */
function rg_event_set_error($str)
{
	$GLOBALS['rg_event_error'] = $str;

	rg_log($str);
}

/*
 * Returns the last error message recorded by rg_event_set_error().
 */
function rg_event_error()
{
	if (!isset($GLOBALS['rg_event_error']))
		return NULL;

	return $GLOBALS['rg_event_error'];
}

// Maps an event category to the name of the function that handles it.
// Filled by rg_event_register_functions(), read by rg_event_process().
$rg_event_split_functions = array();

/*
 * Registers event functions.
 * @functions: array of 'category' => 'function'; an entry for an already
 *	registered category replaces the previous one.
 * Returns FALSE if @functions is empty (nothing is registered), else TRUE.
 */
function rg_event_register_functions($functions)
{
	global $rg_event_split_functions;

	if (!empty($functions)) {
		foreach ($functions as $category => $handler) {
			$rg_event_split_functions[$category] = $handler;
		}

		return TRUE;
	}

	return FALSE;
}

/*
 * Signals the daemon that there is some work to do.
 * @ev_id: id of the event to announce; if empty, a bare "W\n" message is
 *	sent instead of "NOTIFY <ev_id>\n".
 * @timeout == NULL=forever, 0=no_wait, else, wait @timeout ms
 * Returns TRUE if no socket path is configured (nothing is sent), else the
 * value returned by rg_socket().
 */
function rg_event_signal_daemon($ev_id, $timeout)
{
	global $rg_event_socket;

	// Checked before profiling/logging is opened, so this early return
	// leaves nothing unbalanced.
	if (empty($rg_event_socket)) {
		rg_log('DEBUG: rg_event_socket is not defined!');
		return TRUE;
	}

	// Human readable form of the timeout, used only for the log line
	if ($timeout === NULL)
		$s_timeout = "forever";
	else if ($timeout === 0)
		$s_timeout = "no_wait";
	else
		$s_timeout = $timeout . "ms";

	rg_prof_start("event_signal_daemon");
	rg_log_enter("event_signal_daemon: event_id=[$ev_id] timeout=$s_timeout");

	if (empty($ev_id))
		$buf = "W\n";
	else
		$buf = "NOTIFY " . $ev_id . "\n";

	$flags = 0;
	if ($timeout === 0)
		$flags |= RG_SOCKET_NO_WAIT;

	// NOTE(review): the 4th argument (1) is presumably the number of
	// tries - confirm against rg_socket().
	$r = rg_socket($rg_event_socket, $buf, $timeout, 1, $flags);

	rg_log_exit();
	// Close the profiling section opened above; it was left open before.
	rg_prof_end("event_signal_daemon");
	return $r;
}

/*
 * Inserts an event into the 'events' table.
 * This function is called from all over the place to generate events.
 * @event: array describing the event; 'prio' is stored in its own column,
 *	everything else is serialized into the 'data' column. 'ip' and
 *	'debug' are filled in here if the caller did not provide them.
 * Returns TRUE on success; on failure returns FALSE and the message is
 * available through rg_event_error().
 */
function rg_event_add($db, $event)
{
	global $rg_debug;

	rg_prof_start("event_add");
	rg_log_enter("event_add: event=" . rg_array2string($event));

	if (!isset($event['ip']))
		$event['ip'] = rg_var_str('REMOTE_ADDR');

	// A missing or zero debug flag is taken from the 'rg_debug' variable;
	// the global debug switch always wins.
	if (!isset($event['debug']) || ($event['debug'] == 0))
		$event['debug'] = rg_var_uint('rg_debug');
	if ($rg_debug)
		$event['debug'] = 1;

	// The priority must not end up in the serialized payload
	$prio = $event['prio'];
	unset($event['prio']);

	$sql = "INSERT INTO events (itime, prio, data)"
		. " VALUES (@@now@@, @@prio@@, @@data@@)";
	$params = array(
		"now" => time(),
		"prio" => $prio,
		"data" => serialize($event));

	$res = rg_sql_query_params($db, $sql, $params);
	$ok = ($res !== FALSE);
	if ($ok)
		rg_sql_free_result($res);
	else
		rg_event_set_error("Could not add event (" . rg_sql_error() . ")");

	rg_log_exit();
	rg_prof_end("event_add");
	return $ok;
}

/*
 * Processes an event: calls the function registered for the event's
 * category (see rg_event_register_functions()) and queues every event it
 * returns through rg_event_add().
 * @event: the event; 'category' selects the function and is removed before
 *	the event is passed on to it.
 * The category function must return FALSE on error or an array of events
 * (which may be empty). Any other value is treated as a failure.
 * Returns TRUE on success, FALSE on error (see rg_event_error()).
 */
function rg_event_process($db, $event)
{
	global $rg_event_split_functions;

	rg_prof_start("event_process");
	rg_log_enter("event_process: event=" . rg_array2string($event));

	$ret = FALSE;
	while (1) {
		$category = $event['category'];
		unset($event['category']);

		if (!isset($rg_event_split_functions[$category])) {
			rg_event_set_error("Cannot find event function [cat=$category]!");
			rg_log("DEBUG: rg_event_split_functions=" . rg_array2string($rg_event_split_functions));
			break;
		}

		$f = $rg_event_split_functions[$category];
		rg_log("Calling $f...");
		$evs = $f($db, $event);
		if ($evs === FALSE) {
			rg_event_set_error("Failed to fill event for category [$category]!");
			break;
		}

		// A non-array result is a bug in the category function. Stop
		// here instead of iterating over it and reporting success.
		if (!is_array($evs)) {
			rg_internal_error("evs is not array!");
			rg_event_set_error("Invalid event list for category [$category]!");
			break;
		}

		// TODO: here we must do a transaction?

		// An empty list means there is nothing to queue, which is a
		// success. Stop at the first event that cannot be queued.
		$r = TRUE;
		foreach ($evs as $ev) {
			$r = rg_event_add($db, $ev);
			if ($r !== TRUE)
				break;
		}
		if ($r !== TRUE)
			break;

		$ret = TRUE;
		break;
	}

	rg_log_exit();
	rg_prof_end("event_process");
	return $ret;
}

/*
 * Notifies all listeners registered for @ev_id that the event is done.
 * @notify_list: listeners indexed by event id; each listener has either
 *	'func' + 'priv' (callback, called with 'priv' and the message) or
 *	'fd' (socket the message is sent to).
 * @misc: extra text appended to the "DONE <ev_id>" message.
 * Nothing happens if there is no listener for @ev_id. Send errors are only
 * logged.
 */
function rg_event_notify(&$notify_list, $ev_id, $misc)
{
	if (isset($notify_list[$ev_id])) {
		$msg = "DONE $ev_id $misc\n";
		$msg_len = strlen($msg);

		foreach ($notify_list[$ev_id] as $listener) {
			if (!isset($listener['func'])) {
				// Socket listener
				$fd = $listener['fd'];
				rg_log("Notify [$ev_id] [fd=$fd]...");
				$sent = @socket_send($fd, $msg, $msg_len, 0);
				if ($sent === FALSE)
					rg_log("Error in sending.");
			} else {
				// Callback listener
				$listener['func']($listener['priv'], $msg);
			}
		}
	}
}

/*
 * Processes the events queue: loads up to 100 pending events (fail = 0 and
 * next_try in the past), ordered by prio then id, and passes each of them
 * to rg_event_process().
 * For every loaded event exactly one follow-up query is executed:
 * - success: the event is deleted (after the listeners were notified, if
 *   the event carries a 'notification' id);
 * - failure: 'tries' is incremented and 'next_try' is pushed forward;
 * - failure of an event with debug = 1, or data that cannot be
 *   unserialized: the event is marked with fail = 1, so the select above
 *   will not return it again.
 * TODO (original author): reset id to 1 if queue is empty?
 * @notify_list: Will be used to signal the finish of an event
 * Returns FALSE if the queue could not be loaded, else the number of rows
 * loaded from the queue (failed events are counted too).
 */
function rg_event_process_queue($db, &$notify_list)
{
	rg_prof_start("event_process_queue");
	rg_log_enter("event_process_queue: notify_list: "
		. rg_array2string($notify_list));

	$ret = FALSE;
	// Single pass "loop": used only to be able to 'break' to the cleanup
	while (1) {
		$now = time();

		// We limit the batch size so that high priority events added
		// in the meantime are picked up by the next call.
		$sql = "SELECT * FROM events"
			. " WHERE fail = 0"
			. " AND next_try < $now"
			. " ORDER BY prio, id"
			. " FOR UPDATE"
			. " LIMIT 100";
		$res = rg_sql_query($db, $sql);
		if ($res === FALSE) {
			rg_event_set_error("Cannot load job list"
				. " (" . rg_sql_error() . ")");
			break;
		}

		$no_of_events = rg_sql_num_rows($res);
		while (($row = rg_sql_fetch_array($res))) {
			$params = array('id' => $row['id']);

			// Default outcome: mark the event as failed. The inner
			// block below replaces $sql for the other outcomes.
			$sql = "UPDATE events SET fail = 1 WHERE id = @@id@@";
			while (1) {
				$ev = unserialize($row['data']);
				if ($ev === FALSE) {
					rg_internal_error("Cannot unserialize data");
					break;
				}

				// prio/itime are stored as columns, not in the
				// serialized data (see rg_event_add), so put
				// them back into the event.
				$ev['prio'] = $row['prio'];
				$ev['itime'] = $row['itime'];

				$r = rg_event_process($db, $ev);
				if ($r !== TRUE) {
					// Debug events are not retried: keep the
					// default "fail = 1" query.
					if ($ev['debug'] == 1)
						break;

					// Schedule a retry.
					// NOTE(review): whether 'tries' in the
					// next_try expression is the old or the
					// incremented value depends on the SQL
					// engine - confirm.
					$sql = "UPDATE events"
						. " SET tries = tries + 1"
						. ", next_try = $now + tries * 600"
						. " WHERE id = @@id@@";
					break;
				}

				if (isset($ev['notification']))
					rg_event_notify($notify_list, $ev['notification'], "");

				// Processed with success: remove it from the queue
				$sql = "DELETE FROM events WHERE id = @@id@@";
				break;
			}

			// NOTE(review): the result of this query is not checked
			// for errors.
			$res2 = rg_sql_query_params($db, $sql, $params);
			rg_sql_free_result($res2);
		}
		rg_sql_free_result($res);

		$ret = $no_of_events;
		break;
	}

	rg_log_exit();
	rg_prof_end("event_process_queue");
	return $ret;
}

?>
Hints

Before your first commit, do not forget to set up your git environment:
git config --global user.name "your_name_here"
git config --global user.email "your@email_here"

Clone this repository using HTTP(S):
git clone https://code.reversed.top/user/xaizek/rocketgit

Clone this repository using ssh (do not forget to upload a key first):
git clone ssh://rocketgit@code.reversed.top/user/xaizek/rocketgit

You are allowed to anonymously push to this repository.
This means that your pushed commits will automatically be transformed into a pull request:
... clone the repository ...
... make some changes and some commits ...
git push origin master