<?php // // This functions are used for background tasks: the tasks that the user should // not wait to happen in the browser: e-mails, keyring regeneration etc. // prio - lower = more important // require_once($INC . "/util.inc.php"); require_once($INC . "/log.inc.php"); require_once($INC . "/sql.inc.php"); require_once($INC . "/state.inc.php"); require_once($INC . "/prof.inc.php"); if (!isset($rg_event_socket)) $rg_event_socket = "/var/lib/rocketgit/sockets/event.sock"; $rg_event_error = ""; function rg_event_set_error($str) { global $rg_event_error; $rg_event_error = $str; rg_log($str); } function rg_event_error() { global $rg_event_error; return $rg_event_error; } $rg_event_split_functions = array(); /* * Register event functions. */ function rg_event_register_functions($functions) { global $rg_event_split_functions; if (empty($functions)) return FALSE; foreach ($functions as $type => $function) $rg_event_split_functions[$type] = $function; return TRUE; } /* * Signals the daemon that there is some work to do * @timeout == NULL=forever, 0=no_wait, else, wait @timeout ms */ function rg_event_signal_daemon($ev_id, $timeout) { global $rg_event_socket; if (empty($rg_event_socket)) { rg_log('DEBUG: rg_event_socket is not defined!'); return TRUE; } if ($timeout === NULL) $s_timeout = "forever"; else if ($timeout === 0) $s_timeout = "no_wait"; else $s_timeout = $timeout . "ms"; rg_prof_start("event_signal_daemon"); rg_log_enter("event_signal_daemon: event_id=[$ev_id] timeout=$s_timeout"); if (empty($ev_id)) $buf = "W\n"; else $buf = "NOTIFY " . $ev_id . "\n"; $flags = 0; if ($timeout === 0) $flags |= RG_SOCKET_NO_WAIT; $r = rg_socket($rg_event_socket, $buf, $timeout, 1, $flags); rg_log_exit(); return $r; } /* * Inserts an event * This function is called from all over the place to generate events */ function rg_event_add($db, $event) { rg_prof_start("event_add"); rg_log_enter("event_add: event=" . rg_array2string($event)); if (!isset($event['ip'])) $event['ip'] = rg_var_str('REMOTE_ADDR'); $event['debug'] = rg_state_get($db, 'debug'); $ret = FALSE; while (1) { $now = time(); $prio = $event['prio']; unset($event['prio']); $params = array("now" => $now, "prio" => $prio, "data" => rg_serialize($event)); $sql = "INSERT INTO events (itime, prio, data)" . " VALUES (@@now@@, @@prio@@, @@data@@)"; $res = rg_sql_query_params($db, $sql, $params); if ($res === FALSE) { rg_event_set_error("Could not add event (" . rg_sql_error() . ")"); break; } rg_sql_free_result($res); $ret = TRUE; break; } rg_log_exit(); rg_prof_end("event_add"); return $ret; } /* * Process an event */ function rg_event_process($db, $ev_id, $event) { global $rg_event_split_functions; rg_prof_start("event_process"); rg_log_enter("event_process: ev_id=$ev_id" . " event=" . rg_array2string($event)); $ret = FALSE; $rollback = FALSE; while (1) { $category = $event['category']; unset($event['category']); if (!isset($rg_event_split_functions[$category])) { rg_event_set_error("Cannot find event function [cat=$category]!"); break; } $f = $rg_event_split_functions[$category]; rg_log("Calling $f..."); $evs = $f($db, $event); if ($evs === FALSE) { rg_event_set_error('Error in function [' . $f . '] (category [' . $category . '])!'); break; } if (!is_array($evs)) { rg_internal_error("evs is not array!"); break; } $r = rg_sql_begin($db); if ($r !== TRUE) break; $rollback = TRUE; $r = TRUE; foreach ($evs as $index => $ev) { $r = rg_event_add($db, $ev); if ($r !== TRUE) break; } if ($r !== TRUE) break; $r = rg_sql_commit($db); if ($r !== TRUE) break; $rollback = FALSE; $ret = TRUE; break; } if ($rollback) rg_sql_rollback($db); rg_log_exit(); rg_prof_end("event_process"); return $ret; } /* * Notifies all listeners when @ev_id is happening. */ function rg_event_notify(&$notify_list, $ev_id, $misc) { if (!isset($notify_list[$ev_id])) return; $buf = "DONE $ev_id $misc\n"; $buf_len = strlen($buf); foreach ($notify_list[$ev_id] as $index => $info) { if (isset($info['func'])) { $info['func']($info['priv'], $buf); continue; } $fd = $info['fd']; rg_log("Notify [$ev_id] [fd=$fd]..."); $r = @socket_send($fd, $buf, $buf_len, 0); if ($r === FALSE) rg_log("Error in sending."); } return; } /* * Process events queue * reset id to 1 if queue is empty? * Returns FALSE on error, else, the number of events processed * @notify_list: Will be used to signal the finish of an event */ function rg_event_process_queue($db, &$notify_list) { rg_prof_start("event_process_queue"); rg_log_enter('event_process_queue: notify_list: ' . rg_array2string($notify_list) . '.'); $ret = FALSE; while (1) { $now = time(); // We limit to be able to deal with high prio tasks $sql = "SELECT * FROM events" . " WHERE fail = 0" . " AND next_try < " . $now . " ORDER BY prio, id" . " FOR UPDATE" . " LIMIT 100"; $res = rg_sql_query($db, $sql); if ($res === FALSE) { rg_event_set_error("Cannot load job list" . " (" . rg_sql_error() . ")"); break; } $no_of_events = rg_sql_num_rows($res); while (($row = rg_sql_fetch_array($res))) { $params = array('id' => $row['id']); $sql = "UPDATE events SET fail = 1 WHERE id = @@id@@"; while (1) { $ev = rg_unserialize($row['data']); if ($ev === FALSE) break; $ev['prio'] = $row['prio']; $ev['itime'] = $row['itime']; $r = rg_event_process($db, $row['id'], $ev); if ($r !== TRUE) { if ($ev['debug'] == 1) break; $sql = "UPDATE events" . " SET tries = tries + 1" . ", next_try = $now + tries * 600" . " WHERE id = @@id@@"; break; } if (isset($ev['notification'])) rg_event_notify($notify_list, $ev['notification'], ""); $sql = "DELETE FROM events WHERE id = @@id@@"; break; } $res2 = rg_sql_query_params($db, $sql, $params); if ($res2 === FALSE) { rg_event_set_error('internal error'); break; } rg_sql_free_result($res2); } rg_sql_free_result($res); $ret = $no_of_events; break; } rg_log_exit(); rg_prof_end("event_process_queue"); return $ret; }