<?php
require_once($INC . "/log.inc.php");
require_once($INC . "/prof.inc.php");

// Some constants for sql error codes
define('RG_SQL_UNIQUE_VIOLATION', '23505');

if (!function_exists("pg_connect"))
    die("FATAL: php PostgreSQL is not installed!");

if (!isset($rg_sql_debug))
    $rg_sql_debug = 0;

// Connection table, indexed by the handler returned by rg_sql_open().
// Each entry holds 'str' and 'app', plus 'pid' and 'db' once connected.
$rg_sql_conn = array();

// Last error message recorded by rg_sql_set_error().
$rg_sql_error = "";

/*
 * Set error string
 * The message is stored for rg_sql_error() and also written to the log.
 */
function rg_sql_set_error($str)
{
    global $rg_sql_error;

    $rg_sql_error = $str;
    rg_log('sql_set_error: ' . $str);
}

/*
 * Returns the last error string stored by rg_sql_set_error()
 */
function rg_sql_error()
{
    global $rg_sql_error;

    return $rg_sql_error;
}

/*
 * Set application name to be able to identify the scripts
 * The name is copied into every handler created afterwards by
 * rg_sql_open() and exported as PGAPPNAME when connecting.
 */
$rg_sql_app = "rg-unk";
function rg_sql_app($name)
{
    global $rg_sql_app;

    $rg_sql_app = $name;
}

/*
 * Connect to database
 * @h - handler obtained from rg_sql_open()
 * Returns the connection, or FALSE on error.
 * An existing connection is reused only if it was opened by the current
 * process (same pid); otherwise a new one is made. A failed connect is
 * retried once per second; we give up after 31 failed attempts.
 */
function rg_sql_open_nodelay($h)
{
    global $rg_sql_debug;
    global $rg_sql_conn;

    if ($rg_sql_debug > 20)
        rg_log_enter('sql_open_nodelay');

    $ret = FALSE;
    while (1) {
        if (!isset($rg_sql_conn[$h])) {
            rg_internal_error('Handler [' . $h . '] not present!');
            break;
        }

        if ($rg_sql_debug > 40) {
            rg_log('My pid: ' . getmypid());
            rg_log_ml('DEBUG: rg_sql_conn: '
                . print_r($rg_sql_conn, TRUE));
        }

        if (isset($rg_sql_conn[$h]['db'])) {
            if (getmypid() == $rg_sql_conn[$h]['pid']) {
                if ($rg_sql_debug > 30)
                    rg_log('DB: Same pid, reuse connection');
                $ret = $rg_sql_conn[$h]['db'];
                break;
            }

            if ($rg_sql_debug > 25)
                rg_log('DB: pid is different, reconnecting...');
            unset($rg_sql_conn[$h]['db']);
        }

        putenv('PGAPPNAME=' . $rg_sql_conn[$h]['app']);

        $str = $rg_sql_conn[$h]['str'];
        if ($rg_sql_debug > 0)
            rg_log("DB: openning [$str]...");

        rg_prof_set(array('db_conn' => 1));

        // This is used to test if we forked
        $rg_sql_conn[$h]['pid'] = getmypid();

        $_s = microtime(TRUE);
        $tries = 0;
        while (1) {
            $db = @pg_pconnect($str);
            if ($db !== FALSE) {
                // reconnect if needed
                $x = @pg_ping($db);
                if ($x === TRUE)
                    break;
            }

            // Log only the first failure, to not flood the log
            if ($tries == 0)
                rg_log('Cannot connect to db. Keep trying...');

            $tries++;
            if ($tries > 30) {
                $db = FALSE;
                break;
            }

            sleep(1);
        }
        $diff = intval((microtime(TRUE) - $_s) * 1000);
        rg_prof_set(array('db_c_ms' => $diff));

        if ($db === FALSE) {
            $err = 'cannot connect to database';
            rg_sql_set_error($err);
            rg_internal_error($err);
            rg_prof_set(array('db_conn_errors' => 1));
            break;
        }

        $rg_sql_conn[$h]['db'] = $db;
        $ret = $db;
        break;
    }

    if ($rg_sql_debug > 20)
        rg_log_exit();
    return $ret;
}

/*
 * Prepare to connect to database (delayed connection).
 * Returns a special handler.
 * Nothing is opened here; the connection string and the current
 * application name are only recorded for rg_sql_open_nodelay().
 */
function rg_sql_open($str)
{
    global $rg_sql_conn;
    global $rg_sql_app;

    $free_index = count($rg_sql_conn);
    $rg_sql_conn[$free_index] = array(
        'str' => $str,
        'app' => $rg_sql_app
    );
    //rg_log("Delay connection to [$str], index $free_index.");

    return $free_index;
}

/*
 * Escaping
 * Returns the escaped string, or FALSE if the connection cannot be opened.
 */
function rg_sql_escape($h, $str)
{
    $db = rg_sql_open_nodelay($h);
    if ($db === FALSE)
        return FALSE;

    return pg_escape_string($db, $str);
}

/*
 * Tests if @res can be used as a query result.
 * Older PHP versions return a resource from the pgsql functions;
 * PHP 8.1 and later return a PgSql\Result object, for which
 * is_resource() is FALSE, so both forms are accepted.
 */
function rg_sql_is_result($res)
{
    if (is_resource($res))
        return TRUE;

    // 'instanceof' is safe even if the class does not exist (old PHP)
    return ($res instanceof \PgSql\Result);
}

/*
 * Helper for sql_query and sql_query_params
 * @db - the connection
 * @sql - the query text (used only for error messages)
 * @r - return value of the pg_send_query* call
 * @start_ts - timestamp taken before the query was sent
 * @ignore - array of SQLSTATE codes which must not be reported as
 *	internal errors
 * @ignore_kicked - set to TRUE if the error code was found in @ignore
 * Returns the result, or FALSE on error (the message is available
 * through rg_sql_error()).
 */
function rg_sql_query0($db, $sql, $r, $start_ts, $ignore, &$ignore_kicked)
{
    global $rg_sql_debug;

    $ignore_kicked = FALSE;
    while (1) {
        if ($r !== TRUE) {
            $err = "$sql: send: " . @pg_last_error($db);
            $res = FALSE;
            break;
        }

        $res = @pg_get_result($db);
        if ($res === FALSE) {
            $err = $sql . ': get: no pending query';
            break;
        }

        $state = rg_sql_last_error_code($res);
        if ($state === FALSE) {
            $err = $sql . ': get: pg_result_error_field error';
            break;
        }

        // NULL or '00000' means success; anything else is an error
        if (($state !== NULL) && (strcmp($state, '00000') !== 0)) {
            foreach ($ignore as $code) {
                if (strcmp($code, $state) == 0) {
                    $ignore_kicked = TRUE;
                    break;
                }
            }

            if ($ignore_kicked)
                if ($rg_sql_debug > 50)
                    rg_log('DB: We should ignore the error!');

            $err = $sql . ': ' . @pg_last_error($db)
                . ' (' . $state . ')';
            @pg_free_result($res);
            $res = FALSE;
            break;
        }

        $diff = sprintf("%u", (microtime(TRUE) - $start_ts) * 1000);
        $rows = rg_sql_num_rows($res);
        // Affected rows are looked up only if no rows were returned
        if ($rows == 0)
            $arows = rg_sql_affected_rows($res);
        else
            $arows = 0;
        if ($rg_sql_debug > 0)
            rg_log("DB: Took " . $diff . "ms, $rows row(s), $arows affected");
        rg_prof_set(array("q" => 1,
            "rows" => $rows,
            "arows" => $arows,
            "q_ms" => $diff));
        break;
    }

    if ($res === FALSE) {
        rg_sql_set_error($err);
        if (!$ignore_kicked) {
            rg_internal_error($err);
            rg_prof_set(array('qerrors' => 1));
        }

        // reconnect if needed
        @pg_ping($db);
    }

    return $res;
}

/*
 * Do a query
 * Returns the result, or FALSE on error.
 */
function rg_sql_query($h, $sql)
{
    global $rg_sql_debug;

    if ($rg_sql_debug > 0)
        rg_log_enter("sql_query: sql=$sql");

    $ret = FALSE;
    while (1) {
        $db = rg_sql_open_nodelay($h);
        if ($db === FALSE)
            break;

        $ignore = array();
        $start_ts = microtime(TRUE);
        $r = @pg_send_query($db, $sql);
        $ret = rg_sql_query0($db, $sql, $r, $start_ts, $ignore,
            $ignore_kicked);
        break;
    }

    if ($rg_sql_debug > 0)
        rg_log_exit();
    return $ret;
}

/*
 * Queries using params
 * @params - array of fields -> values
 * @ignore - array of strings with errors we should not log as internal errors
 *	See https://www.postgresql.org/docs/current/static/errcodes-appendix.html
 * @ignore_kicked will be set to true if the error is in @ignore array
 * Placeholders are numbered in the order the keys appear in @params;
 * keys which are not present in @sql are skipped and do not consume
 * a number.
 * Examples: $params = array("id" => "1", "name" => "bau")
 * $sql = "UPDATE x SET name = @@name@@ WHERE id = @@id@@ AND @@name@@ = @@name@@"
 * => $sql2 = "UPDATE x SET name = $2 WHERE id = $1 AND $2 = $2"
 * Returns the result, or FALSE on error.
 */
function rg_sql_query_params_ignore($h, $sql, $params, $ignore,
    &$ignore_kicked)
{
    global $rg_sql_debug;

    if ($rg_sql_debug > 0)
        rg_log_enter('sql_query_params: sql=[' . $sql . ']'
            . ' params=[' . rg_array2string($params) . ']'
            . ' ignore=' . implode(',', $ignore));

    $ret = FALSE;
    while (1) {
        $db = rg_sql_open_nodelay($h);
        if ($db === FALSE)
            break;

        // Transforms @params into $x system
        $params2 = array();
        $i = 1;
        foreach ($params as $k => $v) {
            $what = '@@' . $k . '@@';
            $value = '$' . $i;
            $sql = str_replace($what, $value, $sql, $count);
            //rg_log("rg_sql_query_params: k=[$k] value=$value count=$count");
            if ($count > 0) {
                $params2[] = $v;
                $i++;
            }
        }
        //rg_log("new sql: $sql");
        //rg_log("params2: " . rg_array2string($params2));

        $start_ts = microtime(TRUE);
        $r = @pg_send_query_params($db, $sql, $params2);
        $ret = rg_sql_query0($db, $sql, $r, $start_ts, $ignore,
            $ignore_kicked);
        break;
    }

    if ($rg_sql_debug > 0)
        rg_log_exit();
    return $ret;
}

/*
 * Same as rg_sql_query_params_ignore, but no error code is ignored
 */
function rg_sql_query_params($h, $sql, $params)
{
    $ignore = array();
    return rg_sql_query_params_ignore($h, $sql, $params, $ignore,
        $ignore_kicked);
}

/*
 * Close database
 * Returns TRUE on success (or if the connection was never opened),
 * FALSE on error.
 */
function rg_sql_close($h)
{
    global $rg_sql_conn;

    if (!isset($rg_sql_conn[$h])) {
        rg_internal_error('Handler ' . $h . ' was not allocated!');
        return FALSE;
    }

    if (!isset($rg_sql_conn[$h]['db'])) {
        // was not opened
        return TRUE;
    }

    $r = pg_close($rg_sql_conn[$h]['db']);
    if ($r === FALSE)
        return FALSE;

    unset($rg_sql_conn[$h]['db']);

    return TRUE;
}

/*
 * Free results
 */
function rg_sql_free_result($res)
{
    if (!rg_sql_is_result($res)) {
        rg_internal_error("res is not resource!");
        return;
    }

    pg_free_result($res);
}

/*
 * Returns a row as an associated array
 * Returns FALSE if @res is not a result; otherwise whatever
 * pg_fetch_assoc returns.
 */
function rg_sql_fetch_array($res)
{
    if (!rg_sql_is_result($res)) {
        rg_internal_error("res is not resource!");
        return FALSE;
    }

    return pg_fetch_assoc($res);
}

/*
 * Returns the value of lastval() for this connection, or FALSE on error
 */
function rg_sql_last_id($h)
{
    $sql = "SELECT lastval() AS id";
    $res = rg_sql_query($h, $sql);
    if ($res === FALSE)
        return FALSE;

    $row = rg_sql_fetch_array($res);
    rg_sql_free_result($res);

    // The fetch may fail; do not index a non-array
    if (!is_array($row) || !isset($row['id']))
        return FALSE;

    return $row['id'];
}

/*
 * Returns the number of rows of a result, or FALSE if @res is not a result
 */
function rg_sql_num_rows($res)
{
    if (!rg_sql_is_result($res)) {
        rg_internal_error("res is not resource!");
        return FALSE;
    }

    return pg_num_rows($res);
}

/*
 * Returns the number of rows affected by a query, or FALSE if @res is
 * not a result
 */
function rg_sql_affected_rows($res)
{
    if (!rg_sql_is_result($res)) {
        rg_internal_error("res is not resource!");
        return FALSE;
    }

    return pg_affected_rows($res);
}

/*
 * Starts a transaction. Returns TRUE on success, FALSE on error.
 */
function rg_sql_begin($h)
{
    $res = rg_sql_query($h, "BEGIN");
    if ($res === FALSE)
        return FALSE;

    rg_sql_free_result($res);
    return TRUE;
}

/*
 * Commits a transaction. Returns TRUE on success, FALSE on error.
 */
function rg_sql_commit($h)
{
    $res = rg_sql_query($h, "COMMIT");
    if ($res === FALSE)
        return FALSE;

    rg_sql_free_result($res);
    return TRUE;
}

/*
 * Rolls back a transaction. Returns TRUE on success, FALSE on error.
 */
function rg_sql_rollback($h)
{
    $res = rg_sql_query($h, "ROLLBACK");
    if ($res === FALSE)
        return FALSE;

    rg_sql_free_result($res);
    return TRUE;
}

/*
 * Test if a table exists
 * Returns FALSE on error, 0 if does not exists, 1 if exists
 * (more exactly, the number of pg_class rows with this relname).
 * The name is passed as a query parameter, never concatenated into
 * the SQL text, so it cannot alter the statement.
 */
function rg_sql_rel_exists($h, $rel)
{
    $params = array('rel' => $rel);
    $sql = 'SELECT 1 FROM pg_class'
        . ' WHERE relname = @@rel@@';
    $res = rg_sql_query_params($h, $sql, $params);
    if ($res === FALSE)
        return FALSE;

    $rows = rg_sql_num_rows($res);
    rg_sql_free_result($res);

    return $rows;
}

/*
 * Returns the field names of a table
 * The result is an array indexed by column name (every value is 1),
 * or FALSE on error.
 */
function rg_sql_fields($h, $table)
{
    $params = array('table' => $table);
    $sql = 'SELECT column_name FROM information_schema.columns'
        . ' WHERE table_name = @@table@@';
    $res = rg_sql_query_params($h, $sql, $params);
    if ($res === FALSE)
        return FALSE;

    $ret = array();
    while (($row = rg_sql_fetch_array($res))) {
        $f = $row['column_name'];
        $ret[$f] = 1;
    }
    rg_sql_free_result($res);

    return $ret;
}

/*
 * Returns the last error codes
 * (the SQLSTATE field of the result, as given by pg_result_error_field)
 */
function rg_sql_last_error_code($res)
{
    return @pg_result_error_field($res, PGSQL_DIAG_SQLSTATE);
}

?>