<?php // // This functions are used for background tasks: the tasks that the user should // not wait to happen in the browser: e-mails, keyring regeneration etc. // require_once($INC . "/util.inc.php"); require_once($INC . "/log.inc.php"); require_once($INC . "/sql.inc.php"); require_once($INC . "/prof.inc.php"); if (!isset($rg_event_socket)) $rg_event_socket = "/var/lib/rocketgit/sockets/event.sock"; $rg_event_error = ""; function rg_event_set_error($str) { global $rg_event_error; $rg_event_error = $str; rg_log($str); } function rg_event_error() { global $rg_event_error; return $rg_event_error; } $rg_event_split_functions = array(); /* * Register event functions. */ function rg_event_register_functions($functions) { global $rg_event_split_functions; if (empty($functions)) { rg_event_set_error("Cannot register empty array"); return FALSE; } foreach ($functions as $type => $function) $rg_event_split_functions[$type] = $function; return TRUE; } /* * Signals the daemon that there is some work to do * if @event_id > 0, we will wait for the finish of the event */ function rg_event_signal_daemon($ev_id, $timeout) { global $rg_event_socket; rg_prof_start("event_signal_daemon"); rg_log("event_signal_daemon: event_id=[$ev_id] timeout=$timeout"); $ret = FALSE; do { $socket = socket_create(AF_UNIX, SOCK_STREAM, 0); if ($socket === FALSE) { rg_log("Could not create socket!"); break; } // try 3 times $tries = 3; while ($tries > 0) { $r = socket_connect($socket, $rg_event_socket); if ($r === FALSE) { $tries--; usleep(200000); continue; } break; } if ($r === FALSE) { rg_log("Could not connect the socket!"); break; } if (empty($ev_id)) $buf = "W"; else $buf = "NOTIFY " . $ev_id; $len = strlen($buf); $r = socket_send($socket, $buf, $len, 0); if ($r !== $len) { rg_log("Could not send!"); break; } if (empty($ev_id)) { rg_log("We do not have to wait. Exit."); $ret = TRUE; socket_close($socket); break; } $reads = array($socket); $writes = array(); $ex = array(); $r = socket_select($reads, $writes, $ex, $timeout, 0); if ($r === FALSE) { rg_log("Cannot select!"); socket_close($socket); break; } if ($r === 0) { // timeout rg_log("Timeout!"); socket_close($socket); break; } if (!in_array($socket, $reads)) { rg_log("Select returned > 0 and my socket is not in reads"); socket_close($socket); break; } $r = socket_recv($socket, $buf, 1024, 0); if ($r === FALSE) { rg_log("Cannot receive!"); break; } rg_log("Received [$buf]"); socket_close($socket); $ret = TRUE; } while (0); return $ret; } /* * Inserts an event * This function is called from all over the place to generate events */ function rg_event_add($db, $event) { rg_prof_start("event_add"); rg_log("event_add: event=" . rg_array2string($event)); $ret = FALSE; do { $now = time(); $prio = $event['prio']; $e_data = rg_sql_escape($db, serialize($event)); $sql = "INSERT INTO events (itime, prio, data)" . " VALUES ($now, $prio, '$e_data')"; $res = rg_sql_query($db, $sql); if ($res === FALSE) { rg_event_set_error("Could not add event (" . rg_sql_error() . ")"); break; } rg_sql_free_result($res); rg_event_signal_daemon("", 0); $ret = TRUE; } while (0); rg_prof_end("event_add"); return $ret; } /* * Process an event */ function rg_event_process($db, $event) { global $rg_event_split_functions; rg_prof_start("event_process"); rg_log("event_process: event=" . rg_array2string($event)); $ret = FALSE; do { $category = $event['category']; unset($event['category']); if (!isset($rg_event_split_functions[$category])) { rg_event_set_error("Cannot find event function [$category]!"); rg_log("DEBUG: rg_event_split_functions=" . rg_array2string($rg_event_split_functions)); break; } $f = $rg_event_split_functions[$category]; rg_log("Calling $f..."); $evs = $f($db, $event); if ($evs === FALSE) { rg_event_set_error("Failed to fill event for category [$category]!"); break; } if (empty($evs)) { $ret = TRUE; break; } $r = TRUE; if (!is_array($evs)) rg_internal_error("evs is not array!"); foreach ($evs as $index => $ev) { $r = rg_event_add($db, $ev); if ($r !== TRUE) break; } if ($r !== TRUE) break; $ret = TRUE; } while (0); rg_prof_end("event_process"); return $ret; } /* * Cleans the notification list */ function rg_event_notify_clean(&$notify_list) { if (empty($notify_list)) return; $limit = time() - 5 * 60; foreach ($notify_list as $ev_id => $ei) { if (empty($ei)) { unset($notify_list[$ev_id]); continue; } foreach ($ei as $index => $info) { if ($info['itime'] < $limit) { unset($notify_list[$ev_id][$index]); socket_close($info['fd']); } } } } /* * Tries to notify a client if requested */ function rg_event_notify(&$notify_list, $ev_id) { if (!isset($notify_list[$ev_id])) return; $buf = "DONE $ev_id\n"; $buf_len = strlen($buf); foreach ($notify_list[$ev_id] as $index => $info) { $fd = $info['fd']; rg_log("\tNotify [$ev_id] [fd=$fd]..."); socket_send($fd, $buf, $buf_len, 0); socket_shutdown($fd, 1); } return; } /* * Process events queue * reset id to 1 if queue is empty? * Returns FALSE on error, else, the number of events processed * @notify_list: Will be used to signal the finish of an event */ function rg_event_process_queue($db, &$notify_list) { rg_prof_start("event_process_queue"); rg_log("event_process_queue: notify_list: " . rg_array2string($notify_list)); $ret = FALSE; $do_rollback = 0; do { $r = rg_sql_begin($db); if ($r !== TRUE) { rg_event_set_error("Cannot begin transaction" . " (" . rg_sql_error() . ")"); break; } $do_rollback = 1; // We limit to be able to deal with high prio tasks $sql = "SELECT * FROM events" . " ORDER BY prio, id" . " FOR UPDATE" . " LIMIT 100"; $res = rg_sql_query($db, $sql); $no_of_events = rg_sql_num_rows($res); if ($res === FALSE) { rg_event_set_error("Cannot load job list" . " (" . rg_sql_error() . ")"); break; } $all_good = TRUE; while (($row = rg_sql_fetch_array($res))) { $ev = unserialize($row['data']); if ($ev === FALSE) { rg_internal_error("Cannot unserialize data"); break; } $r = rg_event_process($db, $ev); if ($r !== TRUE) { $all_good = FALSE; break; } if (isset($ev['notification'])) rg_event_notify($notify_list, $ev['notification']); $sql = "DELETE FROM events WHERE id = " . $row['id']; $res2 = rg_sql_query($db, $sql); rg_sql_free_result($res2); } rg_sql_free_result($res); if ($all_good !== TRUE) break; $r = rg_sql_commit($db); if ($r !== TRUE) break; $ret = $no_of_events; $do_rollback = 0; } while (0); if ($do_rollback == 1) rg_sql_rollback($db); rg_prof_end("event_process_queue"); return $ret; } ?>