Index: install.php =================================================================== --- install.php (revision 14467) +++ install.php (working copy) @@ -151,6 +151,7 @@ function Init() { + include_once(FULL_PATH . REL_PATH . '/kernel/kbase.php'); // required by kDBConnection class include_once(FULL_PATH . REL_PATH . '/kernel/utility/multibyte.php'); // emulating multi-byte php extension require_once(FULL_PATH . REL_PATH . '/install/install_toolkit.php'); // toolkit required for module installations to installator $this->toolkit = new kInstallToolkit(); @@ -1245,12 +1246,7 @@ } $this->Conn = new kDBConnection($this->toolkit->getSystemConfig('Database', 'DBType'), Array(&$this, 'DBErrorHandler')); - $this->Conn->Connect( - $this->toolkit->getSystemConfig('Database', 'DBHost'), - $this->toolkit->getSystemConfig('Database', 'DBUser'), - $this->toolkit->getSystemConfig('Database', 'DBUserPassword'), - $this->toolkit->getSystemConfig('Database', 'DBName') - ); + $this->Conn->setup( $this->toolkit->systemConfig ); // setup toolkit too $this->toolkit->Conn =& $this->Conn; Index: kernel/application.php =================================================================== --- kernel/application.php (revision 14467) +++ kernel/application.php (working copy) @@ -307,12 +307,14 @@ } } - $this->Conn = new kDBConnection(SQL_TYPE, Array(&$this, 'handleSQLError') ); - $this->Conn->debugMode = $this->isDebugMode(); - $this->Conn->Connect(SQL_SERVER, SQL_USER, SQL_PASS, SQL_DB); - $this->Factory = new kFactory(); $this->registerDefaultClasses(); + + // TODO: in v 5.2.0+ makeClass has 2 arguments ($pseudo, $argumuments) + $this->Conn =& $this->Application->makeClass( 'kDBLoadBalancer', SQL_TYPE, Array (&$this->Application, 'handleSQLError')); +// $this->Conn =& $this->Application->makeClass( 'kDBConnection', SQL_TYPE, Array (&$this->Application, 'handleSQLError')); + $this->Conn->setup( parse_portal_ini(true) ); + $this->Phrases = new PhrasesCache(); $this->memoryCache =& $this->Factory->makeClass('Cache'); $this->EventManager =& $this->Factory->makeClass('EventManager'); @@ -708,6 +710,9 @@ $this->registerClass('kTagProcessor', KERNEL_PATH . '/processors/tag_processor.php'); $this->registerClass('kMainTagProcessor', KERNEL_PATH . '/processors/main_processor.php','m_TagProcessor', 'kTagProcessor'); + $this->registerClass('kDBConnection', KERNEL_PATH . '/db/db_connection.php'); + $this->registerClass('kDBLoadBalancer', KERNEL_PATH . '/db/db_load_balancer.php'); + $this->registerClass('kDBList', KERNEL_PATH . '/db/dblist.php'); $this->registerClass('kDBItem', KERNEL_PATH . '/db/dbitem.php'); $this->registerClass('kDBEventHandler', KERNEL_PATH . '/db/db_event_handler.php'); Index: kernel/db/db_connection.php =================================================================== --- kernel/db/db_connection.php (revision 14467) +++ kernel/db/db_connection.php (working copy) @@ -18,16 +18,9 @@ * Multi database connection class * */ - class kDBConnection { + class kDBConnection extends kBase { /** - * Holds reference to global KernelApplication instance - * @access public - * @var kApplication - */ - var $Application; - - /** * Current database type * * @var string @@ -44,6 +37,13 @@ var $connectionID = null; /** + * Remembers, that database connection was opened successfully + * + * @var unknown_type + */ + var $connectionOpened = false; + + /** * Connection parameters, that were used * * @var Array @@ -51,6 +51,13 @@ var $connectionParams = Array ('host' => '', 'user' => '', 'pass' => '', 'db' => ''); /** + * Index of database server + * + * @var int + */ + var $serverIndex = 0; + + /** * Handle of currenty processed recordset * * @var resource @@ -135,6 +142,13 @@ var $nextQueryCachable = false; /** + * For backwards compatibility with kDBLoadBalancer class + * + * @var bool + */ + public $nextQueryFromMaster = false; + + /** * Initializes connection class with * db type to used in future * @@ -142,9 +156,16 @@ * @return DBConnection * @access public */ - function kDBConnection($dbType, $errorHandler = '') + function kDBConnection($dbType, $errorHandler = '', $server_index = 0) { + if ( class_exists('kApplication') ) { + // prevents "Fatal Error" on 2nd installation step (when database is empty) + parent::kBase(); + } + $this->dbType = $dbType; + $this->serverIndex = $server_index; + // $this->initMetaFunctions(); if (!$errorHandler) { $this->errorHandler = Array(&$this, 'handleError'); @@ -154,11 +175,6 @@ } $this->_captureStatistics = defined('DBG_CAPTURE_STATISTICS') && DBG_CAPTURE_STATISTICS && !(defined('ADMIN') && ADMIN); - - if (class_exists('kApplication')) { - // prevents "Fatal Error" on 2nd installation step (when database is empty) - $this->Application =& kApplication::Instance(); - } } /** @@ -260,7 +276,9 @@ $func = $this->getMetaFunction('errno'); $this->errorCode = $this->connectionID ? $func($this->connectionID) : $func(); - if ( !$this->hasError() ) { + if ( is_resource($this->connectionID) && !$this->hasError() ) { + $this->connectionOpened = true; + return true; } @@ -270,9 +288,31 @@ trigger_error($error_msg, (defined('IS_INSTALL') && IS_INSTALL) || $retry ? E_USER_WARNING : E_USER_ERROR); + $this->connectionOpened = false; + return false; } + /** + * Setups the connection according given configuration + * + * @param Array $config + * @return bool + */ + function setup($config) + { + if ( !defined('IS_INSTALL') ) { + $this->debugMode = $this->Application->isDebugMode(); + } + + return $this->Connect( + $config['Database']['DBHost'], + $config['Database']['DBUser'], + $config['Database']['DBUserPassword'], + $config['Database']['DBName'] + ); + } + function ReConnect($force_new = false) { $retry_count = 0; @@ -310,7 +350,7 @@ function showError($sql = '', $key_field = null, $no_debug = false) { static $retry_count = 0; - + $func = $this->getMetaFunction('errno'); if (!$this->connectionID) { @@ -595,7 +635,7 @@ $first_cell = substr($first_cell, 0, 50) . ' ...'; } - $debugger->profileFinish('sql_'.$queryID, null, null, $this->getAffectedRows(), $first_cell, $this->_queryCount, $this->nextQueryCachable); + $debugger->profileFinish('sql_'.$queryID, null, null, $this->getAffectedRows(), $first_cell, $this->_queryCount, $this->nextQueryCachable, $this->serverIndex); $debugger->profilerAddTotal('sql', 'sql_'.$queryID); $this->nextQueryCachable = false; } @@ -607,7 +647,7 @@ else { // set 2nd checkpoint: begin if ($profileSQLs) { - $debugger->profileFinish('sql_'.$queryID, null, null, $this->getAffectedRows(), null, $this->_queryCount, $this->nextQueryCachable); + $debugger->profileFinish('sql_'.$queryID, null, null, $this->getAffectedRows(), null, $this->_queryCount, $this->nextQueryCachable, $this->serverIndex); $debugger->profilerAddTotal('sql', 'sql_'.$queryID); $this->nextQueryCachable = false; } @@ -828,4 +868,63 @@ { return Array ('time' => $this->_queryTime, 'count' => $this->_queryCount); } + + /** + * Get status information from SHOW STATUS in an associative array + * + * @param string $which + * @return Array + */ + function getStatus($which = '%') + { + $status = Array (); + $records = $this->Query('SHOW STATUS LIKE "' . $which . '"'); + + foreach ($records as $record) { + $status[ $record['Variable_name'] ] = $record['Value']; + } + + return $status; + } + + /** + * Get slave replication lag. It will only work if the DB user has the PROCESS privilege. + * + * @return int + */ + function getSlaveLag() + { + // don't use kDBConnection::Query method, since it will create an array of all server processes + $rs = mysql_query('SHOW PROCESSLIST', $this->connectionID); + + $skip_states = Array ( + 'Waiting for master to send event', + 'Connecting to master', + 'Queueing master event to the relay log', + 'Waiting for master update', + 'Requesting binlog dump', + ); + + // find slave SQL thread + while ( $row = mysql_fetch_array($rs) ) { + if ( $row['User'] == 'system user' && !in_array($row['State'], $skip_states) ) { + // this is it, return the time (except -ve) + return $row['Time'] > 0x7fffffff ? false : $row['Time']; + } + } + + return false; + } + + /** + * Create new instance of object + * + * @return kDBConnection + */ + function &makeClass($dbType, $errorHandler = '', $server_index = 0) + { + $object = new kDBConnection($dbType, $errorHandler, $server_index); + + return $object; + } } \ No newline at end of file Index: kernel/db/db_load_balancer.php =================================================================== --- kernel/db/db_load_balancer.php (revision 0) +++ kernel/db/db_load_balancer.php (revision 0) @@ -0,0 +1,569 @@ +dbType = $dbType; + $this->errorHandler = $errorHandler; + + $this->DBClusterTimeout *= 1e6; // convert to miliseconds + } + + /** + * Setups load balancer according given configuration + * + * @param Array $config + * @return bool + */ + function setup($config) + { + $this->servers = Array (); + + $this->servers[0] = Array ( + 'DBHost' => $config['Database']['DBHost'], + 'DBUser' => $config['Database']['DBUser'], + 'DBUserPassword' => $config['Database']['DBUserPassword'], + 'DBName' => $config['Database']['DBName'], + 'DBLoad' => 0, + ); + + if ( isset($config['Databases']) ) { + $this->servers = array_merge($this->servers, $config['Databases']); + } + + foreach ($this->servers as $server_index => $server_setting) { + $this->serverLoads[$server_index] = $server_setting['DBLoad']; + } + } + + /** + * Returns connection index to master database + * + * @return int + */ + function getMasterIndex() + { + return 0; + } + + /** + * Returns connection index to slave database. This takes into account load ratios and lag times. + * Side effect: opens connections to databases + * + * @return int + */ + function getSlaveIndex() + { + if ( count($this->servers) == 1 || $this->Application->isAdmin ) { + // skip the load balancing if there's only one server OR in admin console + return 0; + } + elseif ( $this->slaveIndex !== false ) { + // shortcut if generic reader exists already + return $this->slaveIndex; + } + + $total_elapsed = 0; + $non_error_loads = $this->serverLoads; + $i = $found = $lagged_slave_mode = false; + + // first try quickly looking through the available servers for a server that meets our criteria + do { + $current_loads = $non_error_loads; + $overloaded_servers = $total_threads_connected = 0; + + while ($current_loads) { + if ( $lagged_slave_mode ) { + // when all slave servers are too lagged, then ignore lag and pick random server + $i = $this->pickRandom($current_loads); + } + else { + $i = $this->getRandomNonLagged($current_loads); + + if ( $i === false && $current_loads ) { + // all slaves lagged -> pick random lagged slave then + $lagged_slave_mode = true; + $i = $this->pickRandom( $current_loads ); + } + } + + if ( $i === false ) { + // all slaves are down -> use master as a slave + $this->slaveIndex = $this->getMasterIndex(); + + return $this->slaveIndex; + } + + $conn =& $this->openConnection($i); + + if ( !$conn ) { + unset($non_error_loads[$i], $current_loads[$i]); + continue; + } + + // Perform post-connection backoff + $threshold = isset($this->servers[$i]['DBMaxThreads']) ? $this->servers[$i]['DBMaxThreads'] : false; + $backoff = $this->postConnectionBackoff($conn, $threshold); + + if ( $backoff ) { + // post-connection overload, don't use this server for now + $total_threads_connected += $backoff; + $overloaded_servers++; + + unset( $current_loads[$i] ); + } + else { + // return this server + break 2; + } + } + + // no server found yet + $i = false; + + // if all servers were down, quit now + if ( !$non_error_loads ) { + break; + } + + // back off for a while + // scale the sleep time by the number of connected threads, to produce a roughly constant global poll rate + $avg_threads = $total_threads_connected / $overloaded_servers; + + usleep($this->DBAvgStatusPoll * $avg_threads); + $total_elapsed += $this->DBAvgStatusPoll * $avg_threads; + } while ( $total_elapsed < $this->DBClusterTimeout ); + + if ( $i !== false ) { + // slave connection successful + if ( $this->slaveIndex <= 0 && $this->serverLoads[$i] > 0 && $i !== false ) { + $this->slaveIndex = $i; + } + } + + return $i; + } + + /** + * Returns random non-lagged server + * + * @param Array $loads + * @return int + */ + function getRandomNonLagged($loads) + { + // unset excessively lagged servers + $lags = $this->getLagTimes(); + + foreach ($lags as $i => $lag) { + if ( $i != 0 && isset($this->servers[$i]['DBMaxLag']) ) { + if ( $lag === false ) { + unset( $loads[$i] ); // server is not replicating + } + elseif ( $lag > $this->servers[$i]['DBMaxLag'] ) { + unset( $loads[$i] ); // server is excessively lagged + } + } + } + + // find out if all the slaves with non-zero load are lagged + if ( !$loads || array_sum($loads) == 0 ) { + return false; + } + + // return a random representative of the remainder + return $this->pickRandom($loads); + } + + /** + * Select an element from an array of non-normalised probabilities + * + * @param Array $weights + * @return int + */ + function pickRandom($weights) + { + if ( !is_array($weights) || !$weights ) { + return false; + } + + $sum = array_sum($weights); + + if ( $sum == 0 ) { + return false; + } + + $max = mt_getrandmax(); + $rand = mt_rand(0, $max) / $max * $sum; + + $sum = 0; + + foreach ($weights as $index => $weight) { + $sum += $weight; + + if ( $sum >= $rand ) { + break; + } + } + + return $index; + } + + /** + * Get lag time for each server + * Results are cached for a short time in memcached, and indefinitely in the process cache + */ + function getLagTimes( $wiki = false ) + { + if ( $this->serverLagTimes ) { + return $this->serverLagTimes; + } + + $expiry = 5; + $request_rate = 10; + + $cache_key = 'lag_times:' . $this->servers[0]['DBHost']; + $times = $this->Application->getCache($cache_key); + + if ( $times ) { + // randomly recache with probability rising over $expiry + $elapsed = adodb_mktime() - $times['timestamp']; + $chance = max(0, ($expiry - $elapsed) * $request_rate); + + if ( mt_rand(0, $chance) != 0 ) { + unset( $times['timestamp'] ); + $this->serverLagTimes = $times; + + return $times; + } + } + + // cache key missing or expired + $times = Array(); + + foreach ($this->servers as $index => $server) { + if ($index == 0) { + $times[$index] = 0; // master + } + else { + $conn =& $this->openConnection($index); + + if ($conn !== false) { + $times[$index] = $conn->getSlaveLag(); + } + } + } + + // add a timestamp key so we know when it was cached + $times['timestamp'] = adodb_mktime(); + $this->Application->setCache($cache_key, $times, $expiry); + + // but don't give the timestamp to the caller + unset($times['timestamp']); + $this->serverLagTimes = $times; + + return $this->serverLagTimes; + } + + /** + * Determines whatever server should not be used, even, when connection was made + * + * @param kDBConnection $conn + * @param int $threshold + * @return int + */ + function postConnectionBackoff(&$conn, $threshold) + { + if ( !$threshold ) { + return 0; + } + + $status = $conn->getStatus('Thread%'); + + return $status['Threads_running'] > $threshold ? $status['Threads_connected'] : 0; + } + + /** + * Open a connection to the server given by the specified index + * Index must be an actual index into the array. + * If the server is already open, returns it. + * + * On error, returns false. + * + * @param integer $i Server index + * @return kDBConnection + */ + function &openConnection($i) + { + if ( isset($this->connections[$i]) ) { + $conn =& $this->connections[$i]; + } + else { + $server = $this->servers[$i]; + $server['serverIndex'] = $i; + $conn =& $this->reallyOpenConnection($server); + + if ( $conn->connectionOpened ) { + $this->connections[$i] =&$conn; + $this->lastUsedIndex = $i; + } + else { + $conn = false; + } + } + + return $conn; + } + + /** + * Really opens a connection. + * Returns a database object whether or not the connection was successful. + * + * @return kDBConnection + */ + function &reallyOpenConnection($server) + { + // TODO: in v 5.2.0+ makeClass has 2 arguments ($pseudo, $argumuments) + $db =& $this->Application->makeClass( 'kDBConnection', $this->dbType, $this->errorHandler, $server['serverIndex'] ); + + $db->debugMode = $this->Application->isDebugMode(); + $db->Connect($server['DBHost'], $server['DBUser'], $server['DBUserPassword'], $this->servers[0]['DBName'], true, true); + + return $db; + } + + /** + * Returns first field of first line of recordset if query ok or false otherwise + * + * @param string $sql + * @param int $offset + * @return string + */ + function GetOne($sql, $offset = 0) + { + $conn =& $this->chooseConnection($sql); + + return $conn->GetOne($sql, $offset); + } + + /** + * Returns first row of recordset if query ok, false otherwise + * + * @param string $sql + * @param int $offset + * @return Array + */ + function GetRow($sql, $offset = 0) + { + $conn =& $this->chooseConnection($sql); + + return $conn->GetRow($sql, $offset); + } + + /** + * Returns 1st column of recordset as one-dimensional array or false otherwise + * Optional parameter $key_field can be used to set field name to be used as resulting array key + * + * @param string $sql + * @param string $key_field + * @return Array + */ + function GetCol($sql, $key_field = null) + { + $conn =& $this->chooseConnection($sql); + + return $conn->GetCol($sql, $key_field); + } + + /** + * Queries db with $sql query supplied and returns rows selected if any, false + * otherwise. Optional parameter $key_field allows to set one of the query fields + * value as key in string array. + * + * @param string $sql + * @param string $key_field + * @return Array + */ + function Query($sql, $key_field = null, $no_debug = false) + { + $conn =& $this->chooseConnection($sql); + + return $conn->Query($sql, $key_field, $no_debug); + } + + function ChangeQuery($sql) + { + $conn =& $this->chooseConnection($sql); + + return $conn->ChangeQuery($sql); + } + + /** + * If it's a string, adds quotes and backslashes (only work since PHP 4.3.0) + * Otherwise returns as-is + * + * @param mixed $string + * + * @return string + */ + function qstr($string) + { + $conn =& $this->openConnection($this->lastUsedIndex); + + return $conn->qstr($string); + } + + /** + * Performs insert of given data (useful with small number of queries) + * or stores it to perform multiple insert later (useful with large number of queries) + * + * @param Array $fields_hash + * @param string $table + * @param string $type + * @param bool $insert_now + * @return bool + */ + function doInsert($fields_hash, $table, $type = 'INSERT', $insert_now = true) + { + $conn =& $this->openConnection( $this->getMasterIndex() ); + + return $conn->doInsert($fields_hash, $table, $type, $insert_now); + } + + function doUpdate($fields_hash, $table, $key_clause) + { + $conn =& $this->openConnection( $this->getMasterIndex() ); + + return $conn->doUpdate($fields_hash, $table, $key_clause); + } + + public function __call($name, $arguments) + { + $conn =& $this->openConnection($this->lastUsedIndex); + + return call_user_func_array( Array (&$conn, $name), $arguments ); + } + + /** + * Returns appropriate connection based on sql + * + * @param string $sql + * @return kDBConnection + */ + function &chooseConnection($sql) + { + if ( $this->nextQueryFromMaster ) { + $this->nextQueryFromMaster = false; + $index = $this->getMasterIndex(); + } + else { + $sid = isset($this->Application->Session) ? $this->Application->GetSID() : '9999999999999999999999'; + + if ( preg_match('/(^[ \t\r\n]*(ALTER|CREATE|DROP|RENAME|DELETE|DO|INSERT|LOAD|REPLACE|TRUNCATE|UPDATE))|ses_' . $sid . '/', $sql) ) { + $index = $this->getMasterIndex(); + } + else { + $index = $this->getSlaveIndex(); + } + } + + $this->lastUsedIndex = $index; + $conn =& $this->openConnection($index); + + return $conn; + } + + + /** + * Create new instance of object + * + * @return kDBLoadBalancer + */ + function &makeClass($dbType, $errorHandler = '') + { + $object = new kDBLoadBalancer($dbType, $errorHandler); + + return $object; + } +} Property changes on: kernel\db\db_load_balancer.php ___________________________________________________________________ Added: svn:keywords + Id Added: svn:eol-style + LF Index: kernel/globals.php =================================================================== --- kernel/globals.php (revision 14467) +++ kernel/globals.php (working copy) @@ -156,6 +156,10 @@ require($file); if ($parse_section) { + if ( isset($_CONFIG['Database']['LoadBalancing']) && $_CONFIG['Database']['LoadBalancing'] ) { + require FULL_PATH . DIRECTORY_SEPARATOR . 'system' . DIRECTORY_SEPARATOR . 'db_servers.php'; + } + return $_CONFIG; } Index: kernel/startup.php =================================================================== --- kernel/startup.php (revision 14467) +++ kernel/startup.php (working copy) @@ -165,8 +165,9 @@ $includes = Array( KERNEL_PATH . '/application.php', FULL_PATH . APPLICATION_PATH, + KERNEL_PATH . "/kbase.php", KERNEL_PATH . '/db/db_connection.php', - KERNEL_PATH . "/kbase.php", + KERNEL_PATH . '/db/db_load_balancer.php', KERNEL_PATH . '/utility/event.php', KERNEL_PATH . "/utility/factory.php", KERNEL_PATH . "/languages/phrases_cache.php", Index: kernel/utility/cache.php =================================================================== --- kernel/utility/cache.php (revision 14467) +++ kernel/utility/cache.php (working copy) @@ -90,6 +90,7 @@ $handler_class = $GLOBALS['vars']['CacheHandler'] . 'CacheHandler'; } else { + $this->Application->Conn->nextQueryFromMaster = true; $handler_class = $this->Application->ConfigValue('CacheHandler') . 'CacheHandler'; } Index: kernel/utility/debugger.php =================================================================== --- kernel/utility/debugger.php (revision 14467) +++ kernel/utility/debugger.php (working copy) @@ -896,8 +896,14 @@ $trace_count = count($trace_results); $i = 0; while ($i < $trace_count) { + if ( !isset($trace_results[$i]['file']) ) { + $i++; + continue; + } + $trace_file = basename($trace_results[$i]['file']); - if ($trace_file != 'db_connection.php' && $trace_file != 'adodb.inc.php') { + + if ($trace_file != 'db_connection.php' && $trace_file != 'db_load_balancer.php' && $trace_file != 'adodb.inc.php') { break; } $i++; @@ -954,7 +960,11 @@ $this->ProfilerData[$key]['subtitle'] = 'cachable'; } - if (array_key_exists('prefix_special', $this->ProfilerData[$key])) { + if ($func_arguments[7]) { + $additional[] = Array ('name' => 'Server #', 'value' => $func_arguments[7]); + } + + if ( isset($this->ProfilerData[$key]['prefix_special']) && $this->ProfilerData[$key]['prefix_special'] ) { $additional[] = Array ('name' => 'PrefixSpecial', 'value' => $this->ProfilerData[$key]['prefix_special']); }