<?php
// This is called by cron, and is persistent.
// It takes care of fast caching, like memcache.
// It will receive signals using a UNIX socket.
//
// Protocol (one command per line, terminated by "\n"):
//	SET <ns_var>=<escaped serialized value>
//	MERGE <ns_var>=<escaped serialized value>
//	INC <ns_var>
//	GET <ns_var>
//	UNSET <ns_var>
//	ADUMP <ns_var>
//	APUSH <ns_var>=<value>
//	APOP <ns_var>
//	ASHIFT <ns_var>
// Every answer is a single line starting with "OK", "NOT_FOUND" or "ER".
error_reporting(E_ALL);
ini_set("track_errors", "On");
set_time_limit(0);

// Increment this if we need to restart this daemon (protocol changes etc.)
$rg_cache_version = 35;

$now = time();
$_s = microtime(TRUE);

require_once("/etc/rocketgit/config.php");

$INC = dirname(__FILE__) . "/../inc";
require_once($INC . "/init.inc.php");
require_once($INC . "/log.inc.php");
require_once($INC . "/sql.inc.php");
require_once($INC . "/struct.inc.php");
require_once($INC . "/cache.inc.php");
require_once($INC . "/repo.inc.php");
require_once($INC . "/prof.inc.php");
require_once($INC . "/mr.inc.php");
require_once($INC . "/keys.inc.php");
require_once($INC . "/user.inc.php");
require_once($INC . "/bug.inc.php");
require_once($INC . "/fixes.inc.php");
require_once($INC . "/ver.php");

/*
 * Removes connection @k from every list of @conn_table (read set, write set,
 * connection info) and closes its socket.
 * Safe to call for a key which was already destroyed.
 */
function rg_destroy($k, &$conn_table)
{
	if (isset($conn_table['r'][$k]))
		unset($conn_table['r'][$k]);
	if (isset($conn_table['w'][$k]))
		unset($conn_table['w'][$k]);

	if (isset($conn_table['conns'][$k]['socket'])) {
		$sock = $conn_table['conns'][$k]['socket'];
		// Sockets are resources before PHP 8 and Socket objects
		// starting with PHP 8; is_resource() alone would never
		// close the latter.
		if (is_resource($sock) || is_object($sock))
			socket_close($sock);
	}

	unset($conn_table['conns'][$k]);
}

/*
 * Executes one protocol line (@cmd, without the trailing "\n") received on
 * connection @k, appends the answer to the connection's send buffer and
 * registers the socket in the write set so the answer gets flushed.
 * Unknown commands and commands without parameter are answered with
 * "ER Invalid command".
 */
function rg_handle_command($k, &$conn_table, $cmd)
{
	rg_log("rg_handle_command: k=$k, cmd=$cmd");

	$s = &$conn_table['conns'][$k];

	$a = explode(" ", $cmd, 2);
	$buf = "ER Invalid command\n";
	while (1) {
		if (!isset($a[0]))
			break;
		$cmd = trim($a[0]);

		/* here, no parameters */

		/* From here, at least 1 para */
		if (!isset($a[1]))
			break;
		$para1 = trim($a[1]);

		if (($cmd === "SET") || ($cmd === "MERGE")) {
			$ns_var_value = explode("=", $para1, 2);
			if (!isset($ns_var_value[1]))
				break;
			$ns_var = trim($ns_var_value[0]);
			$value = trim($ns_var_value[1]);

			// NOTE(review): unserialize() runs on data coming
			// from a socket any local user may connect to (the
			// socket is chmod'ed 0666 below). This can be abused
			// for object injection; consider restricting the
			// allowed classes or switching to JSON.
			$value = unserialize(stripcslashes($value));
			if ($value === FALSE) {
				$buf = "ER cannot unserialize data\n";
				break;
			}

			if ($cmd === "SET")
				rg_cache_core_set("normal::" . $ns_var, $value);
			else
				rg_cache_core_merge("normal::" . $ns_var, $value);
			$buf = "OK\n";
			break;
		}

		if ($cmd === "INC") {
			$v = rg_cache_core_inc("normal::" . $para1);
			$buf = "OK v=$v\n";
			break;
		}

		if ($cmd === "GET") {
			$ret = rg_cache_core_get("normal::" . $para1);
			if ($ret === FALSE)
				$buf = "NOT_FOUND\n";
			else
				$buf = "OK " . rg_cache_prepare($ret) . "\n";
			break;
		}

		if ($cmd === "UNSET") {
			$ret = rg_cache_core_unset("normal::" . $para1);
			if ($ret === FALSE)
				$buf = "NOT_FOUND\n";
			else
				$buf = "OK\n";
			break;
		}

		if ($cmd === "ADUMP") {
			$ret = rg_cache_core_adump("normal::" . $para1);
			if ($ret === FALSE)
				$buf = "NOT_FOUND\n";
			else
				$buf = "OK " . rg_cache_prepare($ret) . "\n";
			break;
		}

		if ($cmd === "APUSH") {
			$ns_var_value = explode("=", $para1, 2);
			if (!isset($ns_var_value[1]))
				break;
			$ns_var = trim($ns_var_value[0]);
			// The value is stored as received (not unserialized).
			$value = trim($ns_var_value[1]);
			rg_cache_core_apush("normal::" . $ns_var, $value);
			$buf = "OK\n";
			break;
		}

		if ($cmd === "APOP") {
			$ret = rg_cache_core_apop("normal::" . $para1);
			if ($ret === FALSE)
				$buf = "NOT_FOUND\n";
			else
				$buf = "OK " . $ret . "\n";
			break;
		}

		if ($cmd === "ASHIFT") {
			$ret = rg_cache_core_ashift("normal::" . $para1);
			if ($ret === FALSE)
				$buf = "NOT_FOUND\n";
			else
				$buf = "OK " . $ret . "\n";
			break;
		}

		break;
	}

	$s['send'] .= $buf;
	$conn_table['w'][$k] = $s['socket'];
}

/*
 * Reads what is available on connection @k, appends it to the receive
 * buffer and executes every complete line found there.
 * The connection is destroyed on receive error or when the peer closed it.
 */
function rg_handle_recv($k, &$conn_table)
{
	// The caller iterates over a snapshot of the read set, so the
	// connection may have been destroyed meanwhile. Taking a reference
	// to a missing entry would silently re-create it.
	if (!isset($conn_table['conns'][$k]))
		return;

	$s = &$conn_table['conns'][$k];

	$buf = "";
	$ret = @socket_recv($s['socket'], $buf, 32 * 4096, 0);
	if ($ret === FALSE) {
		rg_log("Error in recv ("
			. socket_strerror(socket_last_error()) . ")");
		rg_destroy($k, $conn_table);
		return;
	}

	if ($ret === 0) {
		//rg_log("Remote closed the connection.");
		rg_destroy($k, $conn_table);
		return;
	}

	rg_log("Received data on key $k [$buf]...");
	$s['recv'] .= $buf;
	// Any traffic postpones the idle timeout
	$s['close_at'] = time() + 300;

	// An incomplete trailing line stays in the buffer for the next call
	while (($pos = strpos($s['recv'], "\n")) !== FALSE) {
		$cmd = substr($s['recv'], 0, $pos);
		$s['recv'] = substr($s['recv'], $pos + 1);
		rg_handle_command($k, $conn_table, $cmd);
	}
}

/*
 * Tries to flush the send buffer of connection @k. What could not be sent
 * stays in the buffer; when the buffer is empty, the socket is removed from
 * the write set. The connection is destroyed on send error.
 */
function rg_handle_send($k, &$conn_table)
{
	// The caller iterates over a snapshot of the write set taken before
	// the receive handlers ran; the connection may be gone by now.
	if (!isset($conn_table['conns'][$k]))
		return;

	$s = &$conn_table['conns'][$k];

	rg_log("Sending on key $k [" . $s['send'] . "]...");
	$ret = @socket_send($s['socket'], $s['send'], strlen($s['send']), 0);
	if ($ret === FALSE) {
		rg_log("Cannot send ("
			. socket_strerror(socket_last_error()) . ")");
		rg_destroy($k, $conn_table);
		return;
	}

	// substr() returns FALSE (PHP 5) or "" (PHP 7+) when everything was
	// sent; normalize to "" so the buffer stays a string.
	$rest = substr($s['send'], $ret);
	$s['send'] = ($rest === FALSE) ? "" : $rest;

	// Do not use empty() here: it would also drop a pending "0".
	if ($s['send'] === "")
		unset($conn_table['w'][$k]);
}

/*
 * Registers the freshly accepted socket @client: makes it non-blocking,
 * creates its connection info and adds it to the read set.
 */
function rg_handle_new($client, &$conn_table)
{
	socket_set_nonblock($client);

	// Before PHP 8 a socket is a resource and intval() gives its unique
	// id. Starting with PHP 8 it is a Socket object, for which intval()
	// warns and always returns 1, so all clients would share one key.
	if (is_object($client))
		$key = spl_object_id($client);
	else
		$key = intval($client);

	$conn_table['conns'][$key] = array(
		"socket" => $client,
		"recv" => "",
		"send" => "",
		"close_at" => time() + 300
	);
	$conn_table['r'][$key] = $client;

	rg_log("Added client with key $key.");
}

/*
 * Destroys every connection whose idle deadline ('close_at') has passed.
 */
function rg_handle_idle(&$conn_table)
{
	$now = time();

	foreach ($conn_table['conns'] as $key => $info) {
		if ($key === "master")
			continue;

		if ($info['close_at'] >= $now)
			continue;

		rg_log("Destroy $key because was too much time idle.");
		rg_destroy($key, $conn_table);
	}
}

rg_prof_start("MAIN");

rg_log_set_file($rg_log_dir . "/cache.log");
rg_log_set_sid("000000"); // to spread the logs

rg_log("Start (ver=$rocketgit_version)...");

// Remove the socket, else we will get error
if (file_exists($rg_cache_socket))
	unlink($rg_cache_socket);

$master = socket_create(AF_UNIX, SOCK_STREAM, 0);
if ($master === FALSE) {
	rg_internal_error("Cannot create events socket!");
	exit(1);
}

$r = socket_bind($master, $rg_cache_socket);
if ($r === FALSE) {
	rg_internal_error("Cannot bind socket!");
	exit(1);
}

$r = socket_listen($master, 128);
if ($r === FALSE) {
	rg_internal_error("Cannot set queue length on socket!");
	exit(1);
}

socket_set_nonblock($master);

// Allow apache (at least) to connect to socket
// TODO: this is a security hole. Take care. Hm. How?!
$r = chmod($rg_cache_socket, 0666);
if ($r === FALSE) {
	rg_internal_error("Cannot set rights on cache socket!");
	exit(1);
}

// 'r' and 'w' are the sets passed to select, 'conns' keeps per client state
$conn_table = array("r" => array(), "w" => array(), "conns" => array());
$conn_table['r']['master'] = $master;

do {
	// Compare our version with the one found in the file on disk;
	// when they differ we exit (cron will start the new code).
	$src = file_get_contents(__FILE__);
	$matches = array();
	if (($src === FALSE)
		|| (preg_match("/rg_cache_version = (.*);/", $src, $matches) !== 1)) {
		// Same outcome as a version mismatch, but without touching
		// an undefined $matches[1].
		rg_log("Cannot find out the version on disk. Exiting...");
		break;
	}
	$new_ver = $matches[1];
	unset($src);
	if ($rg_cache_version != $new_ver) {
		rg_log("Version upgraded. Exiting...");
		break;
	}

	rg_log_buffer_clear();

	// socket_select alters the arrays, so work on copies
	$r2 = $conn_table['r'];
	$w2 = $conn_table['w'];
	$ex = array();
	$r = socket_select($r2, $w2, $ex, 5);
	if ($r === FALSE)
		rg_fatal("Cannot select ("
			. socket_strerror(socket_last_error()) . ")!");

	//rg_log_ml("conn_table: " . print_r($conn_table, TRUE));

	foreach ($r2 as $key => $socket) {
		if ($key === "master") {
			$client = socket_accept($socket);
			if ($client === FALSE) {
				rg_log("Connection seems broken!");
				continue;
			}

			rg_handle_new($client, $conn_table);
			continue;
		}

		rg_handle_recv($key, $conn_table);
	}

	// Some of these keys may have been destroyed by the receive step
	// above; rg_handle_send ignores them.
	foreach ($w2 as $key => $sock)
		rg_handle_send($key, $conn_table);

	rg_handle_idle($conn_table);
} while (1);

socket_close($master);

rg_log("Exiting...");

rg_prof_end("MAIN");
rg_prof_log();
?>