Merge branch 'MDL-53019' of git://github.com/stronk7/moodle
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
1 <?php
2 // This file is part of Moodle - http://moodle.org/
3 //
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
17 /**
18  * Native pgsql class representing moodle database interface.
19  *
20  * @package    core_dml
21  * @copyright  2008 Petr Skoda (http://skodak.org)
22  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
23  */
25 defined('MOODLE_INTERNAL') || die();
27 require_once(__DIR__.'/moodle_database.php');
28 require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
29 require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
31 /**
32  * Native pgsql class representing moodle database interface.
33  *
34  * @package    core_dml
35  * @copyright  2008 Petr Skoda (http://skodak.org)
36  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
37  */
38 class pgsql_native_moodle_database extends moodle_database {
40     /** @var resource $pgsql database resource */
41     protected $pgsql     = null;
43     protected $last_error_reporting; // To handle pgsql driver default verbosity
45     /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
46     protected $savepointpresent = false;
48     /**
49      * Detects if all needed PHP stuff installed.
50      * Note: can be used before connect()
51      * @return mixed true if ok, string if something
52      */
53     public function driver_installed() {
54         if (!extension_loaded('pgsql')) {
55             return get_string('pgsqlextensionisnotpresentinphp', 'install');
56         }
57         return true;
58     }
60     /**
61      * Returns database family type - describes SQL dialect
62      * Note: can be used before connect()
63      * @return string db family name (mysql, postgres, mssql, oracle, etc.)
64      */
65     public function get_dbfamily() {
66         return 'postgres';
67     }
69     /**
70      * Returns more specific database driver type
71      * Note: can be used before connect()
72      * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
73      */
74     protected function get_dbtype() {
75         return 'pgsql';
76     }
78     /**
79      * Returns general database library name
80      * Note: can be used before connect()
81      * @return string db type pdo, native
82      */
83     protected function get_dblibrary() {
84         return 'native';
85     }
87     /**
88      * Returns localised database type name
89      * Note: can be used before connect()
90      * @return string
91      */
92     public function get_name() {
93         return get_string('nativepgsql', 'install');
94     }
96     /**
97      * Returns localised database configuration help.
98      * Note: can be used before connect()
99      * @return string
100      */
101     public function get_configuration_help() {
102         return get_string('nativepgsqlhelp', 'install');
103     }
105     /**
106      * Connect to db
107      * Must be called before other methods.
108      * @param string $dbhost The database host.
109      * @param string $dbuser The database username.
110      * @param string $dbpass The database username's password.
111      * @param string $dbname The name of the database being connected to.
112      * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
113      * @param array $dboptions driver specific options
114      * @return bool true
115      * @throws dml_connection_exception if error
116      */
117     public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
118         if ($prefix == '' and !$this->external) {
119             //Enforce prefixes for everybody but mysql
120             throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
121         }
123         $driverstatus = $this->driver_installed();
125         if ($driverstatus !== true) {
126             throw new dml_exception('dbdriverproblem', $driverstatus);
127         }
129         $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
131         $pass = addcslashes($this->dbpass, "'\\");
133         // Unix socket connections should have lower overhead
134         if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
135             $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
136             if (strpos($this->dboptions['dbsocket'], '/') !== false) {
137                 $connection = $connection." host='".$this->dboptions['dbsocket']."'";
138                 if (!empty($this->dboptions['dbport'])) {
139                     // Somehow non-standard port is important for sockets - see MDL-44862.
140                     $connection = $connection." port ='".$this->dboptions['dbport']."'";
141                 }
142             }
143         } else {
144             $this->dboptions['dbsocket'] = '';
145             if (empty($this->dbname)) {
146                 // probably old style socket connection - do not add port
147                 $port = "";
148             } else if (empty($this->dboptions['dbport'])) {
149                 $port = "port ='5432'";
150             } else {
151                 $port = "port ='".$this->dboptions['dbport']."'";
152             }
153             $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
154         }
156         // ALTER USER and ALTER DATABASE are overridden by these settings.
157         $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
158         // Select schema if specified, otherwise the first one wins.
159         if (!empty($this->dboptions['dbschema'])) {
160             $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
161         }
163         $connection .= " options='".implode(' ', $options)."'";
165         ob_start();
166         if (empty($this->dboptions['dbpersist'])) {
167             $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
168         } else {
169             $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
170         }
171         $dberr = ob_get_contents();
172         ob_end_clean();
174         $status = pg_connection_status($this->pgsql);
176         if ($status === false or $status === PGSQL_CONNECTION_BAD) {
177             $this->pgsql = null;
178             throw new dml_connection_exception($dberr);
179         }
181         // Connection stabilised and configured, going to instantiate the temptables controller
182         $this->temptables = new pgsql_native_moodle_temptables($this);
184         return true;
185     }
187     /**
188      * Close database connection and release all resources
189      * and memory (especially circular memory references).
190      * Do NOT use connect() again, create a new instance if needed.
191      */
192     public function dispose() {
193         parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
194         if ($this->pgsql) {
195             pg_close($this->pgsql);
196             $this->pgsql = null;
197         }
198     }
201     /**
202      * Called before each db query.
203      * @param string $sql
204      * @param array array of parameters
205      * @param int $type type of query
206      * @param mixed $extrainfo driver specific extra information
207      * @return void
208      */
209     protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
210         parent::query_start($sql, $params, $type, $extrainfo);
211         // pgsql driver tents to send debug to output, we do not need that ;-)
212         $this->last_error_reporting = error_reporting(0);
213     }
215     /**
216      * Called immediately after each db query.
217      * @param mixed db specific result
218      * @return void
219      */
220     protected function query_end($result) {
221         // reset original debug level
222         error_reporting($this->last_error_reporting);
223         try {
224             parent::query_end($result);
225             if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) {
226                 $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
227                 if ($res) {
228                     pg_free_result($res);
229                 }
230             }
231         } catch (Exception $e) {
232             if ($this->savepointpresent) {
233                 $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
234                 if ($res) {
235                     pg_free_result($res);
236                 }
237             }
238             throw $e;
239         }
240     }
242     /**
243      * Returns database server info array
244      * @return array Array containing 'description' and 'version' info
245      */
246     public function get_server_info() {
247         static $info;
248         if (!$info) {
249             $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
250             $info = pg_version($this->pgsql);
251             $this->query_end(true);
252         }
253         return array('description'=>$info['server'], 'version'=>$info['server']);
254     }
256     /**
257      * Returns supported query parameter types
258      * @return int bitmask of accepted SQL_PARAMS_*
259      */
260     protected function allowed_param_types() {
261         return SQL_PARAMS_DOLLAR;
262     }
264     /**
265      * Returns last error reported by database engine.
266      * @return string error message
267      */
268     public function get_last_error() {
269         return pg_last_error($this->pgsql);
270     }
272     /**
273      * Return tables in database WITHOUT current prefix.
274      * @param bool $usecache if true, returns list of cached tables.
275      * @return array of table names in lowercase and without prefix
276      */
277     public function get_tables($usecache=true) {
278         if ($usecache and $this->tables !== null) {
279             return $this->tables;
280         }
281         $this->tables = array();
282         $prefix = str_replace('_', '|_', $this->prefix);
283         $sql = "SELECT c.relname
284                   FROM pg_catalog.pg_class c
285                   JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
286                  WHERE c.relname LIKE '$prefix%' ESCAPE '|'
287                        AND c.relkind = 'r'
288                        AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
289         $this->query_start($sql, null, SQL_QUERY_AUX);
290         $result = pg_query($this->pgsql, $sql);
291         $this->query_end($result);
293         if ($result) {
294             while ($row = pg_fetch_row($result)) {
295                 $tablename = reset($row);
296                 if ($this->prefix !== false && $this->prefix !== '') {
297                     if (strpos($tablename, $this->prefix) !== 0) {
298                         continue;
299                     }
300                     $tablename = substr($tablename, strlen($this->prefix));
301                 }
302                 $this->tables[$tablename] = $tablename;
303             }
304             pg_free_result($result);
305         }
306         return $this->tables;
307     }
309     /**
310      * Return table indexes - everything lowercased.
311      * @param string $table The table we want to get indexes from.
312      * @return array of arrays
313      */
314     public function get_indexes($table) {
315         $indexes = array();
316         $tablename = $this->prefix.$table;
318         $sql = "SELECT i.*
319                   FROM pg_catalog.pg_indexes i
320                   JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
321                  WHERE i.tablename = '$tablename'
322                        AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
324         $this->query_start($sql, null, SQL_QUERY_AUX);
325         $result = pg_query($this->pgsql, $sql);
326         $this->query_end($result);
328         if ($result) {
329             while ($row = pg_fetch_assoc($result)) {
330                 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON '.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
331                     continue;
332                 }
333                 if ($matches[4] === 'id') {
334                     continue;
335                 }
336                 $columns = explode(',', $matches[4]);
337                 foreach ($columns as $k=>$column) {
338                     $column = trim($column);
339                     if ($pos = strpos($column, ' ')) {
340                         // index type is separated by space
341                         $column = substr($column, 0, $pos);
342                     }
343                     $columns[$k] = $this->trim_quotes($column);
344                 }
345                 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
346                                               'columns'=>$columns);
347             }
348             pg_free_result($result);
349         }
350         return $indexes;
351     }
353     /**
354      * Returns detailed information about columns in table. This information is cached internally.
355      * @param string $table name
356      * @param bool $usecache
357      * @return database_column_info[] array of database_column_info objects indexed with column names
358      */
359     public function get_columns($table, $usecache=true) {
360         if ($usecache) {
361             if ($this->temptables->is_temptable($table)) {
362                 if ($data = $this->get_temp_tables_cache()->get($table)) {
363                     return $data;
364                 }
365             } else {
366                 if ($data = $this->get_metacache()->get($table)) {
367                     return $data;
368                 }
369             }
370         }
372         $structure = array();
374         $tablename = $this->prefix.$table;
376         $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
377                   FROM pg_catalog.pg_class c
378                   JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
379                   JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
380                   JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
381              LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
382                  WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
383                        AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
384               ORDER BY a.attnum";
386         $this->query_start($sql, null, SQL_QUERY_AUX);
387         $result = pg_query($this->pgsql, $sql);
388         $this->query_end($result);
390         if (!$result) {
391             return array();
392         }
393         while ($rawcolumn = pg_fetch_object($result)) {
395             $info = new stdClass();
396             $info->name = $rawcolumn->field;
397             $matches = null;
399             if ($rawcolumn->type === 'varchar') {
400                 $info->type          = 'varchar';
401                 $info->meta_type     = 'C';
402                 $info->max_length    = $rawcolumn->atttypmod - 4;
403                 $info->scale         = null;
404                 $info->not_null      = ($rawcolumn->attnotnull === 't');
405                 $info->has_default   = ($rawcolumn->atthasdef === 't');
406                 if ($info->has_default) {
407                     $parts = explode('::', $rawcolumn->adsrc);
408                     if (count($parts) > 1) {
409                         $info->default_value = reset($parts);
410                         $info->default_value = trim($info->default_value, "'");
411                     } else {
412                         $info->default_value = $rawcolumn->adsrc;
413                     }
414                 } else {
415                     $info->default_value = null;
416                 }
417                 $info->primary_key   = false;
418                 $info->binary        = false;
419                 $info->unsigned      = null;
420                 $info->auto_increment= false;
421                 $info->unique        = null;
423             } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
424                 $info->type = 'int';
425                 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
426                     $info->primary_key   = true;
427                     $info->meta_type     = 'R';
428                     $info->unique        = true;
429                     $info->auto_increment= true;
430                     $info->has_default   = false;
431                 } else {
432                     $info->primary_key   = false;
433                     $info->meta_type     = 'I';
434                     $info->unique        = null;
435                     $info->auto_increment= false;
436                     $info->has_default   = ($rawcolumn->atthasdef === 't');
437                 }
438                 // Return number of decimals, not bytes here.
439                 if ($matches[1] >= 8) {
440                     $info->max_length = 18;
441                 } else if ($matches[1] >= 4) {
442                     $info->max_length = 9;
443                 } else if ($matches[1] >= 2) {
444                     $info->max_length = 4;
445                 } else if ($matches[1] >= 1) {
446                     $info->max_length = 2;
447                 } else {
448                     $info->max_length = 0;
449                 }
450                 $info->scale         = null;
451                 $info->not_null      = ($rawcolumn->attnotnull === 't');
452                 if ($info->has_default) {
453                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
454                     $parts = explode('::', $rawcolumn->adsrc);
455                     if (count($parts) > 1) {
456                         $info->default_value = reset($parts);
457                     } else {
458                         $info->default_value = $rawcolumn->adsrc;
459                     }
460                     $info->default_value = trim($info->default_value, "()'");
461                 } else {
462                     $info->default_value = null;
463                 }
464                 $info->binary        = false;
465                 $info->unsigned      = false;
467             } else if ($rawcolumn->type === 'numeric') {
468                 $info->type = $rawcolumn->type;
469                 $info->meta_type     = 'N';
470                 $info->primary_key   = false;
471                 $info->binary        = false;
472                 $info->unsigned      = null;
473                 $info->auto_increment= false;
474                 $info->unique        = null;
475                 $info->not_null      = ($rawcolumn->attnotnull === 't');
476                 $info->has_default   = ($rawcolumn->atthasdef === 't');
477                 if ($info->has_default) {
478                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
479                     $parts = explode('::', $rawcolumn->adsrc);
480                     if (count($parts) > 1) {
481                         $info->default_value = reset($parts);
482                     } else {
483                         $info->default_value = $rawcolumn->adsrc;
484                     }
485                     $info->default_value = trim($info->default_value, "()'");
486                 } else {
487                     $info->default_value = null;
488                 }
489                 $info->max_length    = $rawcolumn->atttypmod >> 16;
490                 $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
492             } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
493                 $info->type = 'float';
494                 $info->meta_type     = 'N';
495                 $info->primary_key   = false;
496                 $info->binary        = false;
497                 $info->unsigned      = null;
498                 $info->auto_increment= false;
499                 $info->unique        = null;
500                 $info->not_null      = ($rawcolumn->attnotnull === 't');
501                 $info->has_default   = ($rawcolumn->atthasdef === 't');
502                 if ($info->has_default) {
503                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
504                     $parts = explode('::', $rawcolumn->adsrc);
505                     if (count($parts) > 1) {
506                         $info->default_value = reset($parts);
507                     } else {
508                         $info->default_value = $rawcolumn->adsrc;
509                     }
510                     $info->default_value = trim($info->default_value, "()'");
511                 } else {
512                     $info->default_value = null;
513                 }
514                 // just guess expected number of deciaml places :-(
515                 if ($matches[1] == 8) {
516                     // total 15 digits
517                     $info->max_length = 8;
518                     $info->scale      = 7;
519                 } else {
520                     // total 6 digits
521                     $info->max_length = 4;
522                     $info->scale      = 2;
523                 }
525             } else if ($rawcolumn->type === 'text') {
526                 $info->type          = $rawcolumn->type;
527                 $info->meta_type     = 'X';
528                 $info->max_length    = -1;
529                 $info->scale         = null;
530                 $info->not_null      = ($rawcolumn->attnotnull === 't');
531                 $info->has_default   = ($rawcolumn->atthasdef === 't');
532                 if ($info->has_default) {
533                     $parts = explode('::', $rawcolumn->adsrc);
534                     if (count($parts) > 1) {
535                         $info->default_value = reset($parts);
536                         $info->default_value = trim($info->default_value, "'");
537                     } else {
538                         $info->default_value = $rawcolumn->adsrc;
539                     }
540                 } else {
541                     $info->default_value = null;
542                 }
543                 $info->primary_key   = false;
544                 $info->binary        = false;
545                 $info->unsigned      = null;
546                 $info->auto_increment= false;
547                 $info->unique        = null;
549             } else if ($rawcolumn->type === 'bytea') {
550                 $info->type          = $rawcolumn->type;
551                 $info->meta_type     = 'B';
552                 $info->max_length    = -1;
553                 $info->scale         = null;
554                 $info->not_null      = ($rawcolumn->attnotnull === 't');
555                 $info->has_default   = false;
556                 $info->default_value = null;
557                 $info->primary_key   = false;
558                 $info->binary        = true;
559                 $info->unsigned      = null;
560                 $info->auto_increment= false;
561                 $info->unique        = null;
563             }
565             $structure[$info->name] = new database_column_info($info);
566         }
568         pg_free_result($result);
570         if ($usecache) {
571             if ($this->temptables->is_temptable($table)) {
572                 $this->get_temp_tables_cache()->set($table, $structure);
573             } else {
574                 $this->get_metacache()->set($table, $structure);
575             }
576         }
578         return $structure;
579     }
581     /**
582      * Normalise values based in RDBMS dependencies (booleans, LOBs...)
583      *
584      * @param database_column_info $column column metadata corresponding with the value we are going to normalise
585      * @param mixed $value value we are going to normalise
586      * @return mixed the normalised value
587      */
588     protected function normalise_value($column, $value) {
589         $this->detect_objects($value);
591         if (is_bool($value)) { // Always, convert boolean to int
592             $value = (int)$value;
594         } else if ($column->meta_type === 'B') {
595             if (!is_null($value)) {
596                 // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
597                 // \ and produce data errors.  This is set on the connection.
598                 $value = pg_escape_bytea($this->pgsql, $value);
599             }
601         } else if ($value === '') {
602             if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
603                 $value = 0; // prevent '' problems in numeric fields
604             }
605         }
606         return $value;
607     }
609     /**
610      * Is db in unicode mode?
611      * @return bool
612      */
613     public function setup_is_unicodedb() {
614         // Get PostgreSQL server_encoding value
615         $sql = "SHOW server_encoding";
616         $this->query_start($sql, null, SQL_QUERY_AUX);
617         $result = pg_query($this->pgsql, $sql);
618         $this->query_end($result);
620         if (!$result) {
621             return false;
622         }
623         $rawcolumn = pg_fetch_object($result);
624         $encoding = $rawcolumn->server_encoding;
625         pg_free_result($result);
627         return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
628     }
630     /**
631      * Do NOT use in code, to be used by database_manager only!
632      * @param string|array $sql query
633      * @param array|null $tablenames an array of xmldb table names affected by this request.
634      * @return bool true
635      * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
636      */
637     public function change_database_structure($sql, $tablenames = null) {
638         $this->get_manager(); // Includes DDL exceptions classes ;-)
639         if (is_array($sql)) {
640             $sql = implode("\n;\n", $sql);
641         }
642         if (!$this->is_transaction_started()) {
643             // It is better to do all or nothing, this helps with recovery...
644             $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
645         }
647         try {
648             $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
649             $result = pg_query($this->pgsql, $sql);
650             $this->query_end($result);
651             pg_free_result($result);
652         } catch (ddl_change_structure_exception $e) {
653             if (!$this->is_transaction_started()) {
654                 $result = @pg_query($this->pgsql, "ROLLBACK");
655                 @pg_free_result($result);
656             }
657             $this->reset_caches($tablenames);
658             throw $e;
659         }
661         $this->reset_caches($tablenames);
662         return true;
663     }
665     /**
666      * Execute general sql query. Should be used only when no other method suitable.
667      * Do NOT use this to make changes in db structure, use database_manager methods instead!
668      * @param string $sql query
669      * @param array $params query parameters
670      * @return bool true
671      * @throws dml_exception A DML specific exception is thrown for any errors.
672      */
673     public function execute($sql, array $params=null) {
674         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
676         if (strpos($sql, ';') !== false) {
677             throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
678         }
680         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
681         $result = pg_query_params($this->pgsql, $sql, $params);
682         $this->query_end($result);
684         pg_free_result($result);
685         return true;
686     }
688     /**
689      * Get a number of records as a moodle_recordset using a SQL statement.
690      *
691      * Since this method is a little less readable, use of it should be restricted to
692      * code where it's possible there might be large datasets being returned.  For known
693      * small datasets use get_records_sql - it leads to simpler code.
694      *
695      * The return type is like:
696      * @see function get_recordset.
697      *
698      * @param string $sql the SQL select query to execute.
699      * @param array $params array of sql parameters
700      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
701      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
702      * @return moodle_recordset instance
703      * @throws dml_exception A DML specific exception is thrown for any errors.
704      */
705     public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
707         list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
709         if ($limitfrom or $limitnum) {
710             if ($limitnum < 1) {
711                 $limitnum = "ALL";
712             } else if (PHP_INT_MAX - $limitnum < $limitfrom) {
713                 // this is a workaround for weird max int problem
714                 $limitnum = "ALL";
715             }
716             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
717         }
719         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
721         $this->query_start($sql, $params, SQL_QUERY_SELECT);
722         $result = pg_query_params($this->pgsql, $sql, $params);
723         $this->query_end($result);
725         return $this->create_recordset($result);
726     }
728     protected function create_recordset($result) {
729         return new pgsql_native_moodle_recordset($result);
730     }
732     /**
733      * Get a number of records as an array of objects using a SQL statement.
734      *
735      * Return value is like:
736      * @see function get_records.
737      *
738      * @param string $sql the SQL select query to execute. The first column of this SELECT statement
739      *   must be a unique value (usually the 'id' field), as it will be used as the key of the
740      *   returned array.
741      * @param array $params array of sql parameters
742      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
743      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
744      * @return array of objects, or empty array if no records were found
745      * @throws dml_exception A DML specific exception is thrown for any errors.
746      */
747     public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
749         list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
751         if ($limitfrom or $limitnum) {
752             if ($limitnum < 1) {
753                 $limitnum = "ALL";
754             } else if (PHP_INT_MAX - $limitnum < $limitfrom) {
755                 // this is a workaround for weird max int problem
756                 $limitnum = "ALL";
757             }
758             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
759         }
761         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
762         $this->query_start($sql, $params, SQL_QUERY_SELECT);
763         $result = pg_query_params($this->pgsql, $sql, $params);
764         $this->query_end($result);
766         // find out if there are any blobs
767         $numfields = pg_num_fields($result);
768         $blobs = array();
769         for ($i = 0; $i < $numfields; $i++) {
770             $type = pg_field_type($result, $i);
771             if ($type == 'bytea') {
772                 $blobs[] = pg_field_name($result, $i);
773             }
774         }
776         $rows = pg_fetch_all($result);
777         pg_free_result($result);
779         $return = array();
780         if ($rows) {
781             foreach ($rows as $row) {
782                 $id = reset($row);
783                 if ($blobs) {
784                     foreach ($blobs as $blob) {
785                         $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
786                     }
787                 }
788                 if (isset($return[$id])) {
789                     $colname = key($row);
790                     debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
791                 }
792                 $return[$id] = (object)$row;
793             }
794         }
796         return $return;
797     }
799     /**
800      * Selects records and return values (first field) as an array using a SQL statement.
801      *
802      * @param string $sql The SQL query
803      * @param array $params array of sql parameters
804      * @return array of values
805      * @throws dml_exception A DML specific exception is thrown for any errors.
806      */
807     public function get_fieldset_sql($sql, array $params=null) {
808         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
810         $this->query_start($sql, $params, SQL_QUERY_SELECT);
811         $result = pg_query_params($this->pgsql, $sql, $params);
812         $this->query_end($result);
814         $return = pg_fetch_all_columns($result, 0);
816         if (pg_field_type($result, 0) == 'bytea') {
817             foreach ($return as $key => $value) {
818                 $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
819             }
820         }
822         pg_free_result($result);
824         return $return;
825     }
827     /**
828      * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
829      * @param string $table name
830      * @param mixed $params data record as object or array
831      * @param bool $returnit return it of inserted record
832      * @param bool $bulk true means repeated inserts expected
833      * @param bool $customsequence true if 'id' included in $params, disables $returnid
834      * @return bool|int true or new id
835      * @throws dml_exception A DML specific exception is thrown for any errors.
836      */
837     public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
838         if (!is_array($params)) {
839             $params = (array)$params;
840         }
842         $returning = "";
844         if ($customsequence) {
845             if (!isset($params['id'])) {
846                 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
847             }
848             $returnid = false;
849         } else {
850             if ($returnid) {
851                 $returning = "RETURNING id";
852                 unset($params['id']);
853             } else {
854                 unset($params['id']);
855             }
856         }
858         if (empty($params)) {
859             throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
860         }
862         $fields = implode(',', array_keys($params));
863         $values = array();
864         $i = 1;
865         foreach ($params as $value) {
866             $this->detect_objects($value);
867             $values[] = "\$".$i++;
868         }
869         $values = implode(',', $values);
871         $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
872         $this->query_start($sql, $params, SQL_QUERY_INSERT);
873         $result = pg_query_params($this->pgsql, $sql, $params);
874         $this->query_end($result);
876         if ($returning !== "") {
877             $row = pg_fetch_assoc($result);
878             $params['id'] = reset($row);
879         }
880         pg_free_result($result);
882         if (!$returnid) {
883             return true;
884         }
886         return (int)$params['id'];
887     }
889     /**
890      * Insert a record into a table and return the "id" field if required.
891      *
892      * Some conversions and safety checks are carried out. Lobs are supported.
893      * If the return ID isn't required, then this just reports success as true/false.
894      * $data is an object containing needed data
895      * @param string $table The database table to be inserted into
896      * @param object $data A data object with values for one or more fields in the record
897      * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
898      * @return bool|int true or new id
899      * @throws dml_exception A DML specific exception is thrown for any errors.
900      */
901     public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
902         $dataobject = (array)$dataobject;
904         $columns = $this->get_columns($table);
905         if (empty($columns)) {
906             throw new dml_exception('ddltablenotexist', $table);
907         }
909         $cleaned = array();
911         foreach ($dataobject as $field=>$value) {
912             if ($field === 'id') {
913                 continue;
914             }
915             if (!isset($columns[$field])) {
916                 continue;
917             }
918             $column = $columns[$field];
919             $cleaned[$field] = $this->normalise_value($column, $value);
920         }
922         return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
924     }
926     /**
927      * Insert multiple records into database as fast as possible.
928      *
929      * Order of inserts is maintained, but the operation is not atomic,
930      * use transactions if necessary.
931      *
932      * This method is intended for inserting of large number of small objects,
933      * do not use for huge objects with text or binary fields.
934      *
935      * @since Moodle 2.7
936      *
937      * @param string $table  The database table to be inserted into
938      * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
939      * @return void does not return new record ids
940      *
941      * @throws coding_exception if data objects have different structure
942      * @throws dml_exception A DML specific exception is thrown for any errors.
943      */
944     public function insert_records($table, $dataobjects) {
945         if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
946             throw new coding_exception('insert_records() passed non-traversable object');
947         }
949         // PostgreSQL does not seem to have problems with huge queries.
950         $chunksize = 500;
951         if (!empty($this->dboptions['bulkinsertsize'])) {
952             $chunksize = (int)$this->dboptions['bulkinsertsize'];
953         }
955         $columns = $this->get_columns($table, true);
957         $fields = null;
958         $count = 0;
959         $chunk = array();
960         foreach ($dataobjects as $dataobject) {
961             if (!is_array($dataobject) and !is_object($dataobject)) {
962                 throw new coding_exception('insert_records() passed invalid record object');
963             }
964             $dataobject = (array)$dataobject;
965             if ($fields === null) {
966                 $fields = array_keys($dataobject);
967                 $columns = array_intersect_key($columns, $dataobject);
968                 unset($columns['id']);
969             } else if ($fields !== array_keys($dataobject)) {
970                 throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
971             }
973             $count++;
974             $chunk[] = $dataobject;
976             if ($count === $chunksize) {
977                 $this->insert_chunk($table, $chunk, $columns);
978                 $chunk = array();
979                 $count = 0;
980             }
981         }
983         if ($count) {
984             $this->insert_chunk($table, $chunk, $columns);
985         }
986     }
988     /**
989      * Insert records in chunks, strict param types...
990      *
991      * Note: can be used only from insert_records().
992      *
993      * @param string $table
994      * @param array $chunk
995      * @param database_column_info[] $columns
996      */
997     protected function insert_chunk($table, array $chunk, array $columns) {
998         $i = 1;
999         $params = array();
1000         $values = array();
1001         foreach ($chunk as $dataobject) {
1002             $vals = array();
1003             foreach ($columns as $field => $column) {
1004                 $params[] = $this->normalise_value($column, $dataobject[$field]);
1005                 $vals[] = "\$".$i++;
1006             }
1007             $values[] = '('.implode(',', $vals).')';
1008         }
1010         $fieldssql = '('.implode(',', array_keys($columns)).')';
1011         $valuessql = implode(',', $values);
1013         $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1014         $this->query_start($sql, $params, SQL_QUERY_INSERT);
1015         $result = pg_query_params($this->pgsql, $sql, $params);
1016         $this->query_end($result);
1017         pg_free_result($result);
1018     }
1020     /**
1021      * Import a record into a table, id field is required.
1022      * Safety checks are NOT carried out. Lobs are supported.
1023      *
1024      * @param string $table name of database table to be inserted into
1025      * @param object $dataobject A data object with values for one or more fields in the record
1026      * @return bool true
1027      * @throws dml_exception A DML specific exception is thrown for any errors.
1028      */
1029     public function import_record($table, $dataobject) {
1030         $dataobject = (array)$dataobject;
1032         $columns = $this->get_columns($table);
1033         $cleaned = array();
1035         foreach ($dataobject as $field=>$value) {
1036             $this->detect_objects($value);
1037             if (!isset($columns[$field])) {
1038                 continue;
1039             }
1040             $column = $columns[$field];
1041             $cleaned[$field] = $this->normalise_value($column, $value);
1042         }
1044         return $this->insert_record_raw($table, $cleaned, false, true, true);
1045     }
1047     /**
1048      * Update record in database, as fast as possible, no safety checks, lobs not supported.
1049      * @param string $table name
1050      * @param mixed $params data record as object or array
1051      * @param bool true means repeated updates expected
1052      * @return bool true
1053      * @throws dml_exception A DML specific exception is thrown for any errors.
1054      */
1055     public function update_record_raw($table, $params, $bulk=false) {
1056         $params = (array)$params;
1058         if (!isset($params['id'])) {
1059             throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1060         }
1061         $id = $params['id'];
1062         unset($params['id']);
1064         if (empty($params)) {
1065             throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1066         }
1068         $i = 1;
1070         $sets = array();
1071         foreach ($params as $field=>$value) {
1072             $this->detect_objects($value);
1073             $sets[] = "$field = \$".$i++;
1074         }
1076         $params[] = $id; // last ? in WHERE condition
1078         $sets = implode(',', $sets);
1079         $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1081         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1082         $result = pg_query_params($this->pgsql, $sql, $params);
1083         $this->query_end($result);
1085         pg_free_result($result);
1086         return true;
1087     }
1089     /**
1090      * Update a record in a table
1091      *
1092      * $dataobject is an object containing needed data
1093      * Relies on $dataobject having a variable "id" to
1094      * specify the record to update
1095      *
1096      * @param string $table The database table to be checked against.
1097      * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
1098      * @param bool true means repeated updates expected
1099      * @return bool true
1100      * @throws dml_exception A DML specific exception is thrown for any errors.
1101      */
1102     public function update_record($table, $dataobject, $bulk=false) {
1103         $dataobject = (array)$dataobject;
1105         $columns = $this->get_columns($table);
1106         $cleaned = array();
1108         foreach ($dataobject as $field=>$value) {
1109             if (!isset($columns[$field])) {
1110                 continue;
1111             }
1112             $column = $columns[$field];
1113             $cleaned[$field] = $this->normalise_value($column, $value);
1114         }
1116         $this->update_record_raw($table, $cleaned, $bulk);
1118         return true;
1119     }
1121     /**
1122      * Set a single field in every table record which match a particular WHERE clause.
1123      *
1124      * @param string $table The database table to be checked against.
1125      * @param string $newfield the field to set.
1126      * @param string $newvalue the value to set the field to.
1127      * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1128      * @param array $params array of sql parameters
1129      * @return bool true
1130      * @throws dml_exception A DML specific exception is thrown for any errors.
1131      */
1132     public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1134         if ($select) {
1135             $select = "WHERE $select";
1136         }
1137         if (is_null($params)) {
1138             $params = array();
1139         }
1140         list($select, $params, $type) = $this->fix_sql_params($select, $params);
1141         $i = count($params)+1;
1143         // Get column metadata
1144         $columns = $this->get_columns($table);
1145         $column = $columns[$newfield];
1147         $normalisedvalue = $this->normalise_value($column, $newvalue);
1149         $newfield = "$newfield = \$" . $i;
1150         $params[] = $normalisedvalue;
1151         $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1153         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1154         $result = pg_query_params($this->pgsql, $sql, $params);
1155         $this->query_end($result);
1157         pg_free_result($result);
1159         return true;
1160     }
1162     /**
1163      * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1164      *
1165      * @param string $table The database table to be checked against.
1166      * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1167      * @param array $params array of sql parameters
1168      * @return bool true
1169      * @throws dml_exception A DML specific exception is thrown for any errors.
1170      */
1171     public function delete_records_select($table, $select, array $params=null) {
1172         if ($select) {
1173             $select = "WHERE $select";
1174         }
1175         $sql = "DELETE FROM {$this->prefix}$table $select";
1177         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1179         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1180         $result = pg_query_params($this->pgsql, $sql, $params);
1181         $this->query_end($result);
1183         pg_free_result($result);
1185         return true;
1186     }
1188     /**
1189      * Returns 'LIKE' part of a query.
1190      *
1191      * @param string $fieldname usually name of the table column
1192      * @param string $param usually bound query parameter (?, :named)
1193      * @param bool $casesensitive use case sensitive search
1194      * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1195      * @param bool $notlike true means "NOT LIKE"
1196      * @param string $escapechar escape char for '%' and '_'
1197      * @return string SQL code fragment
1198      */
1199     public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1200         if (strpos($param, '%') !== false) {
1201             debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1202         }
1204         // postgresql does not support accent insensitive text comparisons, sorry
1205         if ($casesensitive) {
1206             $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1207         } else {
1208             $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1209         }
1210         return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1211     }
1213     public function sql_bitxor($int1, $int2) {
1214         return '((' . $int1 . ') # (' . $int2 . '))';
1215     }
1217     public function sql_cast_char2int($fieldname, $text=false) {
1218         return ' CAST(' . $fieldname . ' AS INT) ';
1219     }
1221     public function sql_cast_char2real($fieldname, $text=false) {
1222         return " $fieldname::real ";
1223     }
1225     public function sql_concat() {
1226         $arr = func_get_args();
1227         $s = implode(' || ', $arr);
1228         if ($s === '') {
1229             return " '' ";
1230         }
1231         // Add always empty string element so integer-exclusive concats
1232         // will work without needing to cast each element explicitly
1233         return " '' || $s ";
1234     }
1236     public function sql_concat_join($separator="' '", $elements=array()) {
1237         for ($n=count($elements)-1; $n > 0 ; $n--) {
1238             array_splice($elements, $n, 0, $separator);
1239         }
1240         $s = implode(' || ', $elements);
1241         if ($s === '') {
1242             return " '' ";
1243         }
1244         return " $s ";
1245     }
1247     public function sql_regex_supported() {
1248         return true;
1249     }
1251     public function sql_regex($positivematch=true) {
1252         return $positivematch ? '~*' : '!~*';
1253     }
1255     /**
1256      * Does this driver support tool_replace?
1257      *
1258      * @since Moodle 2.6.1
1259      * @return bool
1260      */
1261     public function replace_all_text_supported() {
1262         return true;
1263     }
1265     public function session_lock_supported() {
1266         return true;
1267     }
1269     /**
1270      * Obtain session lock
1271      * @param int $rowid id of the row with session record
1272      * @param int $timeout max allowed time to wait for the lock in seconds
1273      * @return bool success
1274      */
1275     public function get_session_lock($rowid, $timeout) {
1276         // NOTE: there is a potential locking problem for database running
1277         //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1278         //       luckily there is not a big chance that they would collide
1279         if (!$this->session_lock_supported()) {
1280             return;
1281         }
1283         parent::get_session_lock($rowid, $timeout);
1285         $timeoutmilli = $timeout * 1000;
1287         $sql = "SET statement_timeout TO $timeoutmilli";
1288         $this->query_start($sql, null, SQL_QUERY_AUX);
1289         $result = pg_query($this->pgsql, $sql);
1290         $this->query_end($result);
1292         if ($result) {
1293             pg_free_result($result);
1294         }
1296         $sql = "SELECT pg_advisory_lock($rowid)";
1297         $this->query_start($sql, null, SQL_QUERY_AUX);
1298         $start = time();
1299         $result = pg_query($this->pgsql, $sql);
1300         $end = time();
1301         try {
1302             $this->query_end($result);
1303         } catch (dml_exception $ex) {
1304             if ($end - $start >= $timeout) {
1305                 throw new dml_sessionwait_exception();
1306             } else {
1307                 throw $ex;
1308             }
1309         }
1311         if ($result) {
1312             pg_free_result($result);
1313         }
1315         $sql = "SET statement_timeout TO DEFAULT";
1316         $this->query_start($sql, null, SQL_QUERY_AUX);
1317         $result = pg_query($this->pgsql, $sql);
1318         $this->query_end($result);
1320         if ($result) {
1321             pg_free_result($result);
1322         }
1323     }
1325     public function release_session_lock($rowid) {
1326         if (!$this->session_lock_supported()) {
1327             return;
1328         }
1329         if (!$this->used_for_db_sessions) {
1330             return;
1331         }
1333         parent::release_session_lock($rowid);
1335         $sql = "SELECT pg_advisory_unlock($rowid)";
1336         $this->query_start($sql, null, SQL_QUERY_AUX);
1337         $result = pg_query($this->pgsql, $sql);
1338         $this->query_end($result);
1340         if ($result) {
1341             pg_free_result($result);
1342         }
1343     }
1345     /**
1346      * Driver specific start of real database transaction,
1347      * this can not be used directly in code.
1348      * @return void
1349      */
1350     protected function begin_transaction() {
1351         $this->savepointpresent = true;
1352         $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1353         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1354         $result = pg_query($this->pgsql, $sql);
1355         $this->query_end($result);
1357         pg_free_result($result);
1358     }
1360     /**
1361      * Driver specific commit of real database transaction,
1362      * this can not be used directly in code.
1363      * @return void
1364      */
1365     protected function commit_transaction() {
1366         $this->savepointpresent = false;
1367         $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1368         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1369         $result = pg_query($this->pgsql, $sql);
1370         $this->query_end($result);
1372         pg_free_result($result);
1373     }
1375     /**
1376      * Driver specific abort of real database transaction,
1377      * this can not be used directly in code.
1378      * @return void
1379      */
1380     protected function rollback_transaction() {
1381         $this->savepointpresent = false;
1382         $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1383         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1384         $result = pg_query($this->pgsql, $sql);
1385         $this->query_end($result);
1387         pg_free_result($result);
1388     }
1390     /**
1391      * Helper function trimming (whitespace + quotes) any string
1392      * needed because PG uses to enclose with double quotes some
1393      * fields in indexes definition and others
1394      *
1395      * @param string $str string to apply whitespace + quotes trim
1396      * @return string trimmed string
1397      */
1398     private function trim_quotes($str) {
1399         return trim(trim($str), "'\"");
1400     }