MDL-66327 pgsql DML: Update get_records_sql to consume less memory
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
1 <?php
2 // This file is part of Moodle - http://moodle.org/
3 //
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
17 /**
18  * Native pgsql class representing moodle database interface.
19  *
20  * @package    core_dml
21  * @copyright  2008 Petr Skoda (http://skodak.org)
22  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
23  */
25 defined('MOODLE_INTERNAL') || die();
27 require_once(__DIR__.'/moodle_database.php');
28 require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
29 require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
31 /**
32  * Native pgsql class representing moodle database interface.
33  *
34  * @package    core_dml
35  * @copyright  2008 Petr Skoda (http://skodak.org)
36  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
37  */
38 class pgsql_native_moodle_database extends moodle_database {
40     /** @var resource $pgsql database resource */
41     protected $pgsql     = null;
43     protected $last_error_reporting; // To handle pgsql driver default verbosity
45     /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
46     protected $savepointpresent = false;
48     /** @var int Number of cursors used (for constructing a unique ID) */
49     protected $cursorcount = 0;
51     /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
52     const DEFAULT_FETCH_BUFFER_SIZE = 100000;
54     /**
55      * Detects if all needed PHP stuff installed.
56      * Note: can be used before connect()
57      * @return mixed true if ok, string if something
58      */
59     public function driver_installed() {
60         if (!extension_loaded('pgsql')) {
61             return get_string('pgsqlextensionisnotpresentinphp', 'install');
62         }
63         return true;
64     }
66     /**
67      * Returns database family type - describes SQL dialect
68      * Note: can be used before connect()
69      * @return string db family name (mysql, postgres, mssql, oracle, etc.)
70      */
71     public function get_dbfamily() {
72         return 'postgres';
73     }
75     /**
76      * Returns more specific database driver type
77      * Note: can be used before connect()
78      * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
79      */
80     protected function get_dbtype() {
81         return 'pgsql';
82     }
84     /**
85      * Returns general database library name
86      * Note: can be used before connect()
87      * @return string db type pdo, native
88      */
89     protected function get_dblibrary() {
90         return 'native';
91     }
93     /**
94      * Returns localised database type name
95      * Note: can be used before connect()
96      * @return string
97      */
98     public function get_name() {
99         return get_string('nativepgsql', 'install');
100     }
102     /**
103      * Returns localised database configuration help.
104      * Note: can be used before connect()
105      * @return string
106      */
107     public function get_configuration_help() {
108         return get_string('nativepgsqlhelp', 'install');
109     }
111     /**
112      * Connect to db
113      * Must be called before other methods.
114      * @param string $dbhost The database host.
115      * @param string $dbuser The database username.
116      * @param string $dbpass The database username's password.
117      * @param string $dbname The name of the database being connected to.
118      * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
119      * @param array $dboptions driver specific options
120      * @return bool true
121      * @throws dml_connection_exception if error
122      */
123     public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
124         if ($prefix == '' and !$this->external) {
125             //Enforce prefixes for everybody but mysql
126             throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
127         }
129         $driverstatus = $this->driver_installed();
131         if ($driverstatus !== true) {
132             throw new dml_exception('dbdriverproblem', $driverstatus);
133         }
135         $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
137         $pass = addcslashes($this->dbpass, "'\\");
139         // Unix socket connections should have lower overhead
140         if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
141             $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
142             if (strpos($this->dboptions['dbsocket'], '/') !== false) {
143                 // A directory was specified as the socket location.
144                 $connection .= " host='".$this->dboptions['dbsocket']."'";
145             }
146             if (!empty($this->dboptions['dbport'])) {
147                 // A port as specified, add it to the connection as it's used as part of the socket path.
148                 $connection .= " port ='".$this->dboptions['dbport']."'";
149             }
150         } else {
151             $this->dboptions['dbsocket'] = '';
152             if (empty($this->dbname)) {
153                 // probably old style socket connection - do not add port
154                 $port = "";
155             } else if (empty($this->dboptions['dbport'])) {
156                 $port = "port ='5432'";
157             } else {
158                 $port = "port ='".$this->dboptions['dbport']."'";
159             }
160             $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
161         }
163         if (empty($this->dboptions['dbhandlesoptions'])) {
164             // ALTER USER and ALTER DATABASE are overridden by these settings.
165             $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
166             // Select schema if specified, otherwise the first one wins.
167             if (!empty($this->dboptions['dbschema'])) {
168                 $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
169             }
171             $connection .= " options='" . implode(' ', $options) . "'";
172         }
174         ob_start();
175         if (empty($this->dboptions['dbpersist'])) {
176             $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
177         } else {
178             $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
179         }
180         $dberr = ob_get_contents();
181         ob_end_clean();
183         $status = pg_connection_status($this->pgsql);
185         if ($status === false or $status === PGSQL_CONNECTION_BAD) {
186             $this->pgsql = null;
187             throw new dml_connection_exception($dberr);
188         }
190         if (!empty($this->dboptions['dbhandlesoptions'])) {
191             /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
192              * These functions do not talk to the server, they use the client library knowledge to determine state.
193              */
194             if (!empty($this->dboptions['dbschema'])) {
195                 throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
196             }
197             if (pg_client_encoding($this->pgsql) != 'UTF8') {
198                 throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
199             }
200             if (pg_escape_string($this->pgsql, '\\') != '\\') {
201                 throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
202             }
203         }
205         // Connection stabilised and configured, going to instantiate the temptables controller
206         $this->temptables = new pgsql_native_moodle_temptables($this);
208         return true;
209     }
211     /**
212      * Close database connection and release all resources
213      * and memory (especially circular memory references).
214      * Do NOT use connect() again, create a new instance if needed.
215      */
216     public function dispose() {
217         parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
218         if ($this->pgsql) {
219             pg_close($this->pgsql);
220             $this->pgsql = null;
221         }
222     }
225     /**
226      * Called before each db query.
227      * @param string $sql
228      * @param array array of parameters
229      * @param int $type type of query
230      * @param mixed $extrainfo driver specific extra information
231      * @return void
232      */
233     protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
234         parent::query_start($sql, $params, $type, $extrainfo);
235         // pgsql driver tents to send debug to output, we do not need that ;-)
236         $this->last_error_reporting = error_reporting(0);
237     }
239     /**
240      * Called immediately after each db query.
241      * @param mixed db specific result
242      * @return void
243      */
244     protected function query_end($result) {
245         // reset original debug level
246         error_reporting($this->last_error_reporting);
247         try {
248             parent::query_end($result);
249             if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) {
250                 $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
251                 if ($res) {
252                     pg_free_result($res);
253                 }
254             }
255         } catch (Exception $e) {
256             if ($this->savepointpresent) {
257                 $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
258                 if ($res) {
259                     pg_free_result($res);
260                 }
261             }
262             throw $e;
263         }
264     }
266     /**
267      * Returns database server info array
268      * @return array Array containing 'description' and 'version' info
269      */
270     public function get_server_info() {
271         static $info;
272         if (!$info) {
273             $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
274             $info = pg_version($this->pgsql);
275             $this->query_end(true);
276         }
277         return array('description'=>$info['server'], 'version'=>$info['server']);
278     }
280     /**
281      * Returns supported query parameter types
282      * @return int bitmask of accepted SQL_PARAMS_*
283      */
284     protected function allowed_param_types() {
285         return SQL_PARAMS_DOLLAR;
286     }
288     /**
289      * Returns last error reported by database engine.
290      * @return string error message
291      */
292     public function get_last_error() {
293         return pg_last_error($this->pgsql);
294     }
296     /**
297      * Return tables in database WITHOUT current prefix.
298      * @param bool $usecache if true, returns list of cached tables.
299      * @return array of table names in lowercase and without prefix
300      */
301     public function get_tables($usecache=true) {
302         if ($usecache and $this->tables !== null) {
303             return $this->tables;
304         }
305         $this->tables = array();
306         $prefix = str_replace('_', '|_', $this->prefix);
307         $sql = "SELECT c.relname
308                   FROM pg_catalog.pg_class c
309                   JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
310                  WHERE c.relname LIKE '$prefix%' ESCAPE '|'
311                        AND c.relkind = 'r'
312                        AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
313         $this->query_start($sql, null, SQL_QUERY_AUX);
314         $result = pg_query($this->pgsql, $sql);
315         $this->query_end($result);
317         if ($result) {
318             while ($row = pg_fetch_row($result)) {
319                 $tablename = reset($row);
320                 if ($this->prefix !== false && $this->prefix !== '') {
321                     if (strpos($tablename, $this->prefix) !== 0) {
322                         continue;
323                     }
324                     $tablename = substr($tablename, strlen($this->prefix));
325                 }
326                 $this->tables[$tablename] = $tablename;
327             }
328             pg_free_result($result);
329         }
330         return $this->tables;
331     }
333     /**
334      * Return table indexes - everything lowercased.
335      * @param string $table The table we want to get indexes from.
336      * @return array of arrays
337      */
338     public function get_indexes($table) {
339         $indexes = array();
340         $tablename = $this->prefix.$table;
342         $sql = "SELECT i.*
343                   FROM pg_catalog.pg_indexes i
344                   JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
345                  WHERE i.tablename = '$tablename'
346                        AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
348         $this->query_start($sql, null, SQL_QUERY_AUX);
349         $result = pg_query($this->pgsql, $sql);
350         $this->query_end($result);
352         if ($result) {
353             while ($row = pg_fetch_assoc($result)) {
354                 // The index definition could be generated schema-qualifying the target table name
355                 // for safety, depending on the pgsql version (CVE-2018-1058).
356                 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
357                     continue;
358                 }
359                 if ($matches[5] === 'id') {
360                     continue;
361                 }
362                 $columns = explode(',', $matches[5]);
363                 foreach ($columns as $k=>$column) {
364                     $column = trim($column);
365                     if ($pos = strpos($column, ' ')) {
366                         // index type is separated by space
367                         $column = substr($column, 0, $pos);
368                     }
369                     $columns[$k] = $this->trim_quotes($column);
370                 }
371                 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
372                                               'columns'=>$columns);
373             }
374             pg_free_result($result);
375         }
376         return $indexes;
377     }
379     /**
380      * Returns detailed information about columns in table. This information is cached internally.
381      * @param string $table name
382      * @param bool $usecache
383      * @return database_column_info[] array of database_column_info objects indexed with column names
384      */
385     public function get_columns($table, $usecache=true) {
386         if ($usecache) {
387             if ($this->temptables->is_temptable($table)) {
388                 if ($data = $this->get_temp_tables_cache()->get($table)) {
389                     return $data;
390                 }
391             } else {
392                 if ($data = $this->get_metacache()->get($table)) {
393                     return $data;
394                 }
395             }
396         }
398         $structure = array();
400         $tablename = $this->prefix.$table;
402         $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
403                   FROM pg_catalog.pg_class c
404                   JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
405                   JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
406                   JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
407              LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
408                  WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
409                        AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
410               ORDER BY a.attnum";
412         $this->query_start($sql, null, SQL_QUERY_AUX);
413         $result = pg_query($this->pgsql, $sql);
414         $this->query_end($result);
416         if (!$result) {
417             return array();
418         }
419         while ($rawcolumn = pg_fetch_object($result)) {
421             $info = new stdClass();
422             $info->name = $rawcolumn->field;
423             $matches = null;
425             if ($rawcolumn->type === 'varchar') {
426                 $info->type          = 'varchar';
427                 $info->meta_type     = 'C';
428                 $info->max_length    = $rawcolumn->atttypmod - 4;
429                 $info->scale         = null;
430                 $info->not_null      = ($rawcolumn->attnotnull === 't');
431                 $info->has_default   = ($rawcolumn->atthasdef === 't');
432                 if ($info->has_default) {
433                     $parts = explode('::', $rawcolumn->adsrc);
434                     if (count($parts) > 1) {
435                         $info->default_value = reset($parts);
436                         $info->default_value = trim($info->default_value, "'");
437                     } else {
438                         $info->default_value = $rawcolumn->adsrc;
439                     }
440                 } else {
441                     $info->default_value = null;
442                 }
443                 $info->primary_key   = false;
444                 $info->binary        = false;
445                 $info->unsigned      = null;
446                 $info->auto_increment= false;
447                 $info->unique        = null;
449             } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
450                 $info->type = 'int';
451                 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
452                     $info->primary_key   = true;
453                     $info->meta_type     = 'R';
454                     $info->unique        = true;
455                     $info->auto_increment= true;
456                     $info->has_default   = false;
457                 } else {
458                     $info->primary_key   = false;
459                     $info->meta_type     = 'I';
460                     $info->unique        = null;
461                     $info->auto_increment= false;
462                     $info->has_default   = ($rawcolumn->atthasdef === 't');
463                 }
464                 // Return number of decimals, not bytes here.
465                 if ($matches[1] >= 8) {
466                     $info->max_length = 18;
467                 } else if ($matches[1] >= 4) {
468                     $info->max_length = 9;
469                 } else if ($matches[1] >= 2) {
470                     $info->max_length = 4;
471                 } else if ($matches[1] >= 1) {
472                     $info->max_length = 2;
473                 } else {
474                     $info->max_length = 0;
475                 }
476                 $info->scale         = null;
477                 $info->not_null      = ($rawcolumn->attnotnull === 't');
478                 if ($info->has_default) {
479                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
480                     $parts = explode('::', $rawcolumn->adsrc);
481                     if (count($parts) > 1) {
482                         $info->default_value = reset($parts);
483                     } else {
484                         $info->default_value = $rawcolumn->adsrc;
485                     }
486                     $info->default_value = trim($info->default_value, "()'");
487                 } else {
488                     $info->default_value = null;
489                 }
490                 $info->binary        = false;
491                 $info->unsigned      = false;
493             } else if ($rawcolumn->type === 'numeric') {
494                 $info->type = $rawcolumn->type;
495                 $info->meta_type     = 'N';
496                 $info->primary_key   = false;
497                 $info->binary        = false;
498                 $info->unsigned      = null;
499                 $info->auto_increment= false;
500                 $info->unique        = null;
501                 $info->not_null      = ($rawcolumn->attnotnull === 't');
502                 $info->has_default   = ($rawcolumn->atthasdef === 't');
503                 if ($info->has_default) {
504                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
505                     $parts = explode('::', $rawcolumn->adsrc);
506                     if (count($parts) > 1) {
507                         $info->default_value = reset($parts);
508                     } else {
509                         $info->default_value = $rawcolumn->adsrc;
510                     }
511                     $info->default_value = trim($info->default_value, "()'");
512                 } else {
513                     $info->default_value = null;
514                 }
515                 $info->max_length    = $rawcolumn->atttypmod >> 16;
516                 $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
518             } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
519                 $info->type = 'float';
520                 $info->meta_type     = 'N';
521                 $info->primary_key   = false;
522                 $info->binary        = false;
523                 $info->unsigned      = null;
524                 $info->auto_increment= false;
525                 $info->unique        = null;
526                 $info->not_null      = ($rawcolumn->attnotnull === 't');
527                 $info->has_default   = ($rawcolumn->atthasdef === 't');
528                 if ($info->has_default) {
529                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
530                     $parts = explode('::', $rawcolumn->adsrc);
531                     if (count($parts) > 1) {
532                         $info->default_value = reset($parts);
533                     } else {
534                         $info->default_value = $rawcolumn->adsrc;
535                     }
536                     $info->default_value = trim($info->default_value, "()'");
537                 } else {
538                     $info->default_value = null;
539                 }
540                 // just guess expected number of deciaml places :-(
541                 if ($matches[1] == 8) {
542                     // total 15 digits
543                     $info->max_length = 8;
544                     $info->scale      = 7;
545                 } else {
546                     // total 6 digits
547                     $info->max_length = 4;
548                     $info->scale      = 2;
549                 }
551             } else if ($rawcolumn->type === 'text') {
552                 $info->type          = $rawcolumn->type;
553                 $info->meta_type     = 'X';
554                 $info->max_length    = -1;
555                 $info->scale         = null;
556                 $info->not_null      = ($rawcolumn->attnotnull === 't');
557                 $info->has_default   = ($rawcolumn->atthasdef === 't');
558                 if ($info->has_default) {
559                     $parts = explode('::', $rawcolumn->adsrc);
560                     if (count($parts) > 1) {
561                         $info->default_value = reset($parts);
562                         $info->default_value = trim($info->default_value, "'");
563                     } else {
564                         $info->default_value = $rawcolumn->adsrc;
565                     }
566                 } else {
567                     $info->default_value = null;
568                 }
569                 $info->primary_key   = false;
570                 $info->binary        = false;
571                 $info->unsigned      = null;
572                 $info->auto_increment= false;
573                 $info->unique        = null;
575             } else if ($rawcolumn->type === 'bytea') {
576                 $info->type          = $rawcolumn->type;
577                 $info->meta_type     = 'B';
578                 $info->max_length    = -1;
579                 $info->scale         = null;
580                 $info->not_null      = ($rawcolumn->attnotnull === 't');
581                 $info->has_default   = false;
582                 $info->default_value = null;
583                 $info->primary_key   = false;
584                 $info->binary        = true;
585                 $info->unsigned      = null;
586                 $info->auto_increment= false;
587                 $info->unique        = null;
589             }
591             $structure[$info->name] = new database_column_info($info);
592         }
594         pg_free_result($result);
596         if ($usecache) {
597             if ($this->temptables->is_temptable($table)) {
598                 $this->get_temp_tables_cache()->set($table, $structure);
599             } else {
600                 $this->get_metacache()->set($table, $structure);
601             }
602         }
604         return $structure;
605     }
607     /**
608      * Normalise values based in RDBMS dependencies (booleans, LOBs...)
609      *
610      * @param database_column_info $column column metadata corresponding with the value we are going to normalise
611      * @param mixed $value value we are going to normalise
612      * @return mixed the normalised value
613      */
614     protected function normalise_value($column, $value) {
615         $this->detect_objects($value);
617         if (is_bool($value)) { // Always, convert boolean to int
618             $value = (int)$value;
620         } else if ($column->meta_type === 'B') {
621             if (!is_null($value)) {
622                 // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
623                 // \ and produce data errors.  This is set on the connection.
624                 $value = pg_escape_bytea($this->pgsql, $value);
625             }
627         } else if ($value === '') {
628             if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
629                 $value = 0; // prevent '' problems in numeric fields
630             }
631         }
632         return $value;
633     }
635     /**
636      * Is db in unicode mode?
637      * @return bool
638      */
639     public function setup_is_unicodedb() {
640         // Get PostgreSQL server_encoding value
641         $sql = "SHOW server_encoding";
642         $this->query_start($sql, null, SQL_QUERY_AUX);
643         $result = pg_query($this->pgsql, $sql);
644         $this->query_end($result);
646         if (!$result) {
647             return false;
648         }
649         $rawcolumn = pg_fetch_object($result);
650         $encoding = $rawcolumn->server_encoding;
651         pg_free_result($result);
653         return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
654     }
656     /**
657      * Do NOT use in code, to be used by database_manager only!
658      * @param string|array $sql query
659      * @param array|null $tablenames an array of xmldb table names affected by this request.
660      * @return bool true
661      * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
662      */
663     public function change_database_structure($sql, $tablenames = null) {
664         $this->get_manager(); // Includes DDL exceptions classes ;-)
665         if (is_array($sql)) {
666             $sql = implode("\n;\n", $sql);
667         }
668         if (!$this->is_transaction_started()) {
669             // It is better to do all or nothing, this helps with recovery...
670             $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
671         }
673         try {
674             $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
675             $result = pg_query($this->pgsql, $sql);
676             $this->query_end($result);
677             pg_free_result($result);
678         } catch (ddl_change_structure_exception $e) {
679             if (!$this->is_transaction_started()) {
680                 $result = @pg_query($this->pgsql, "ROLLBACK");
681                 @pg_free_result($result);
682             }
683             $this->reset_caches($tablenames);
684             throw $e;
685         }
687         $this->reset_caches($tablenames);
688         return true;
689     }
691     /**
692      * Execute general sql query. Should be used only when no other method suitable.
693      * Do NOT use this to make changes in db structure, use database_manager methods instead!
694      * @param string $sql query
695      * @param array $params query parameters
696      * @return bool true
697      * @throws dml_exception A DML specific exception is thrown for any errors.
698      */
699     public function execute($sql, array $params=null) {
700         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
702         if (strpos($sql, ';') !== false) {
703             throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
704         }
706         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
707         $result = pg_query_params($this->pgsql, $sql, $params);
708         $this->query_end($result);
710         pg_free_result($result);
711         return true;
712     }
714     /**
715      * Get a number of records as a moodle_recordset using a SQL statement.
716      *
717      * Since this method is a little less readable, use of it should be restricted to
718      * code where it's possible there might be large datasets being returned.  For known
719      * small datasets use get_records_sql - it leads to simpler code.
720      *
721      * The return type is like:
722      * @see function get_recordset.
723      *
724      * @param string $sql the SQL select query to execute.
725      * @param array $params array of sql parameters
726      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
727      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
728      * @return moodle_recordset instance
729      * @throws dml_exception A DML specific exception is thrown for any errors.
730      */
731     public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
733         list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
735         if ($limitnum) {
736             $sql .= " LIMIT $limitnum";
737         }
738         if ($limitfrom) {
739             $sql .= " OFFSET $limitfrom";
740         }
742         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
744         $this->query_start($sql, $params, SQL_QUERY_SELECT);
746         // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
747         // loading the entire thing (unless the config setting is turned off).
748         $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
749         if ($usecursors) {
750             // Work out the cursor unique identifer. This is based on a simple count used which
751             // should be OK because the identifiers only need to be unique within the current
752             // transaction.
753             $this->cursorcount++;
754             $cursorname = 'crs' . $this->cursorcount;
756             // Do the query to a cursor.
757             $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
758             $result = pg_query_params($this->pgsql, $sql, $params);
759         } else {
760             $result = pg_query_params($this->pgsql, $sql, $params);
761             $cursorname = '';
762         }
764         $this->query_end($result);
765         if ($usecursors) {
766             pg_free_result($result);
767             $result = null;
768         }
770         return new pgsql_native_moodle_recordset($result, $this, $cursorname);
771     }
773     /**
774      * Gets size of fetch buffer used for recordset queries.
775      *
776      * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
777      * memory as needed for the Postgres library to hold the entire query results in memory.
778      *
779      * @return int Fetch buffer size or 0 indicating not to use cursors
780      */
781     protected function get_fetch_buffer_size() {
782         if (array_key_exists('fetchbuffersize', $this->dboptions)) {
783             return (int)$this->dboptions['fetchbuffersize'];
784         } else {
785             return self::DEFAULT_FETCH_BUFFER_SIZE;
786         }
787     }
789     /**
790      * Retrieves data from cursor. For use by recordset only; do not call directly.
791      *
792      * Return value contains the next batch of Postgres data, and a boolean indicating if this is
793      * definitely the last batch (if false, there may be more)
794      *
795      * @param string $cursorname Name of cursor to read from
796      * @return array Array with 2 elements (next data batch and boolean indicating last batch)
797      */
798     public function fetch_from_cursor($cursorname) {
799         $count = $this->get_fetch_buffer_size();
801         $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
803         $this->query_start($sql, [], SQL_QUERY_AUX);
804         $result = pg_query($this->pgsql, $sql);
805         $last = pg_num_rows($result) !== $count;
807         $this->query_end($result);
809         return [$result, $last];
810     }
812     /**
813      * Closes a cursor. For use by recordset only; do not call directly.
814      *
815      * @param string $cursorname Name of cursor to close
816      * @return bool True if we actually closed one, false if the transaction was cancelled
817      */
818     public function close_cursor($cursorname) {
819         // If the transaction got cancelled, then ignore this request.
820         $sql = 'CLOSE ' . $cursorname;
821         $this->query_start($sql, [], SQL_QUERY_AUX);
822         $result = pg_query($this->pgsql, $sql);
823         $this->query_end($result);
824         if ($result) {
825             pg_free_result($result);
826         }
827         return true;
828     }
830     /**
831      * Get a number of records as an array of objects using a SQL statement.
832      *
833      * Return value is like:
834      * @see function get_records.
835      *
836      * @param string $sql the SQL select query to execute. The first column of this SELECT statement
837      *   must be a unique value (usually the 'id' field), as it will be used as the key of the
838      *   returned array.
839      * @param array $params array of sql parameters
840      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
841      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
842      * @return array of objects, or empty array if no records were found
843      * @throws dml_exception A DML specific exception is thrown for any errors.
844      */
845     public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) {
846         list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
848         if ($limitnum) {
849             $sql .= " LIMIT $limitnum";
850         }
851         if ($limitfrom) {
852             $sql .= " OFFSET $limitfrom";
853         }
855         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
856         $this->query_start($sql, $params, SQL_QUERY_SELECT);
857         $result = pg_query_params($this->pgsql, $sql, $params);
858         $this->query_end($result);
860         // find out if there are any blobs
861         $numfields = pg_num_fields($result);
862         $blobs = array();
863         for ($i = 0; $i < $numfields; $i++) {
864             $type = pg_field_type($result, $i);
865             if ($type == 'bytea') {
866                 $blobs[] = pg_field_name($result, $i);
867             }
868         }
870         $return = [];
871         while ($row = pg_fetch_assoc($result)) {
872             $id = reset($row);
873             if ($blobs) {
874                 foreach ($blobs as $blob) {
875                     $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
876                 }
877             }
878             if (isset($return[$id])) {
879                 $colname = key($row);
880                 debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
881             }
882             $return[$id] = (object) $row;
883         }
885         return $return;
886     }
888     /**
889      * Selects records and return values (first field) as an array using a SQL statement.
890      *
891      * @param string $sql The SQL query
892      * @param array $params array of sql parameters
893      * @return array of values
894      * @throws dml_exception A DML specific exception is thrown for any errors.
895      */
896     public function get_fieldset_sql($sql, array $params=null) {
897         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
899         $this->query_start($sql, $params, SQL_QUERY_SELECT);
900         $result = pg_query_params($this->pgsql, $sql, $params);
901         $this->query_end($result);
903         $return = pg_fetch_all_columns($result, 0);
905         if (pg_field_type($result, 0) == 'bytea') {
906             foreach ($return as $key => $value) {
907                 $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
908             }
909         }
911         pg_free_result($result);
913         return $return;
914     }
916     /**
917      * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
918      * @param string $table name
919      * @param mixed $params data record as object or array
920      * @param bool $returnit return it of inserted record
921      * @param bool $bulk true means repeated inserts expected
922      * @param bool $customsequence true if 'id' included in $params, disables $returnid
923      * @return bool|int true or new id
924      * @throws dml_exception A DML specific exception is thrown for any errors.
925      */
926     public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
927         if (!is_array($params)) {
928             $params = (array)$params;
929         }
931         $returning = "";
933         if ($customsequence) {
934             if (!isset($params['id'])) {
935                 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
936             }
937             $returnid = false;
938         } else {
939             if ($returnid) {
940                 $returning = "RETURNING id";
941                 unset($params['id']);
942             } else {
943                 unset($params['id']);
944             }
945         }
947         if (empty($params)) {
948             throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
949         }
951         $fields = implode(',', array_keys($params));
952         $values = array();
953         $i = 1;
954         foreach ($params as $value) {
955             $this->detect_objects($value);
956             $values[] = "\$".$i++;
957         }
958         $values = implode(',', $values);
960         $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
961         $this->query_start($sql, $params, SQL_QUERY_INSERT);
962         $result = pg_query_params($this->pgsql, $sql, $params);
963         $this->query_end($result);
965         if ($returning !== "") {
966             $row = pg_fetch_assoc($result);
967             $params['id'] = reset($row);
968         }
969         pg_free_result($result);
971         if (!$returnid) {
972             return true;
973         }
975         return (int)$params['id'];
976     }
978     /**
979      * Insert a record into a table and return the "id" field if required.
980      *
981      * Some conversions and safety checks are carried out. Lobs are supported.
982      * If the return ID isn't required, then this just reports success as true/false.
983      * $data is an object containing needed data
984      * @param string $table The database table to be inserted into
985      * @param object $data A data object with values for one or more fields in the record
986      * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
987      * @return bool|int true or new id
988      * @throws dml_exception A DML specific exception is thrown for any errors.
989      */
990     public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
991         $dataobject = (array)$dataobject;
993         $columns = $this->get_columns($table);
994         if (empty($columns)) {
995             throw new dml_exception('ddltablenotexist', $table);
996         }
998         $cleaned = array();
1000         foreach ($dataobject as $field=>$value) {
1001             if ($field === 'id') {
1002                 continue;
1003             }
1004             if (!isset($columns[$field])) {
1005                 continue;
1006             }
1007             $column = $columns[$field];
1008             $cleaned[$field] = $this->normalise_value($column, $value);
1009         }
1011         return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1013     }
1015     /**
1016      * Insert multiple records into database as fast as possible.
1017      *
1018      * Order of inserts is maintained, but the operation is not atomic,
1019      * use transactions if necessary.
1020      *
1021      * This method is intended for inserting of large number of small objects,
1022      * do not use for huge objects with text or binary fields.
1023      *
1024      * @since Moodle 2.7
1025      *
1026      * @param string $table  The database table to be inserted into
1027      * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1028      * @return void does not return new record ids
1029      *
1030      * @throws coding_exception if data objects have different structure
1031      * @throws dml_exception A DML specific exception is thrown for any errors.
1032      */
1033     public function insert_records($table, $dataobjects) {
1034         if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1035             throw new coding_exception('insert_records() passed non-traversable object');
1036         }
1038         // PostgreSQL does not seem to have problems with huge queries.
1039         $chunksize = 500;
1040         if (!empty($this->dboptions['bulkinsertsize'])) {
1041             $chunksize = (int)$this->dboptions['bulkinsertsize'];
1042         }
1044         $columns = $this->get_columns($table, true);
1046         $fields = null;
1047         $count = 0;
1048         $chunk = array();
1049         foreach ($dataobjects as $dataobject) {
1050             if (!is_array($dataobject) and !is_object($dataobject)) {
1051                 throw new coding_exception('insert_records() passed invalid record object');
1052             }
1053             $dataobject = (array)$dataobject;
1054             if ($fields === null) {
1055                 $fields = array_keys($dataobject);
1056                 $columns = array_intersect_key($columns, $dataobject);
1057                 unset($columns['id']);
1058             } else if ($fields !== array_keys($dataobject)) {
1059                 throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1060             }
1062             $count++;
1063             $chunk[] = $dataobject;
1065             if ($count === $chunksize) {
1066                 $this->insert_chunk($table, $chunk, $columns);
1067                 $chunk = array();
1068                 $count = 0;
1069             }
1070         }
1072         if ($count) {
1073             $this->insert_chunk($table, $chunk, $columns);
1074         }
1075     }
1077     /**
1078      * Insert records in chunks, strict param types...
1079      *
1080      * Note: can be used only from insert_records().
1081      *
1082      * @param string $table
1083      * @param array $chunk
1084      * @param database_column_info[] $columns
1085      */
1086     protected function insert_chunk($table, array $chunk, array $columns) {
1087         $i = 1;
1088         $params = array();
1089         $values = array();
1090         foreach ($chunk as $dataobject) {
1091             $vals = array();
1092             foreach ($columns as $field => $column) {
1093                 $params[] = $this->normalise_value($column, $dataobject[$field]);
1094                 $vals[] = "\$".$i++;
1095             }
1096             $values[] = '('.implode(',', $vals).')';
1097         }
1099         $fieldssql = '('.implode(',', array_keys($columns)).')';
1100         $valuessql = implode(',', $values);
1102         $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1103         $this->query_start($sql, $params, SQL_QUERY_INSERT);
1104         $result = pg_query_params($this->pgsql, $sql, $params);
1105         $this->query_end($result);
1106         pg_free_result($result);
1107     }
1109     /**
1110      * Import a record into a table, id field is required.
1111      * Safety checks are NOT carried out. Lobs are supported.
1112      *
1113      * @param string $table name of database table to be inserted into
1114      * @param object $dataobject A data object with values for one or more fields in the record
1115      * @return bool true
1116      * @throws dml_exception A DML specific exception is thrown for any errors.
1117      */
1118     public function import_record($table, $dataobject) {
1119         $dataobject = (array)$dataobject;
1121         $columns = $this->get_columns($table);
1122         $cleaned = array();
1124         foreach ($dataobject as $field=>$value) {
1125             $this->detect_objects($value);
1126             if (!isset($columns[$field])) {
1127                 continue;
1128             }
1129             $column = $columns[$field];
1130             $cleaned[$field] = $this->normalise_value($column, $value);
1131         }
1133         return $this->insert_record_raw($table, $cleaned, false, true, true);
1134     }
1136     /**
1137      * Update record in database, as fast as possible, no safety checks, lobs not supported.
1138      * @param string $table name
1139      * @param mixed $params data record as object or array
1140      * @param bool true means repeated updates expected
1141      * @return bool true
1142      * @throws dml_exception A DML specific exception is thrown for any errors.
1143      */
1144     public function update_record_raw($table, $params, $bulk=false) {
1145         $params = (array)$params;
1147         if (!isset($params['id'])) {
1148             throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1149         }
1150         $id = $params['id'];
1151         unset($params['id']);
1153         if (empty($params)) {
1154             throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1155         }
1157         $i = 1;
1159         $sets = array();
1160         foreach ($params as $field=>$value) {
1161             $this->detect_objects($value);
1162             $sets[] = "$field = \$".$i++;
1163         }
1165         $params[] = $id; // last ? in WHERE condition
1167         $sets = implode(',', $sets);
1168         $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1170         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1171         $result = pg_query_params($this->pgsql, $sql, $params);
1172         $this->query_end($result);
1174         pg_free_result($result);
1175         return true;
1176     }
1178     /**
1179      * Update a record in a table
1180      *
1181      * $dataobject is an object containing needed data
1182      * Relies on $dataobject having a variable "id" to
1183      * specify the record to update
1184      *
1185      * @param string $table The database table to be checked against.
1186      * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
1187      * @param bool true means repeated updates expected
1188      * @return bool true
1189      * @throws dml_exception A DML specific exception is thrown for any errors.
1190      */
1191     public function update_record($table, $dataobject, $bulk=false) {
1192         $dataobject = (array)$dataobject;
1194         $columns = $this->get_columns($table);
1195         $cleaned = array();
1197         foreach ($dataobject as $field=>$value) {
1198             if (!isset($columns[$field])) {
1199                 continue;
1200             }
1201             $column = $columns[$field];
1202             $cleaned[$field] = $this->normalise_value($column, $value);
1203         }
1205         $this->update_record_raw($table, $cleaned, $bulk);
1207         return true;
1208     }
1210     /**
1211      * Set a single field in every table record which match a particular WHERE clause.
1212      *
1213      * @param string $table The database table to be checked against.
1214      * @param string $newfield the field to set.
1215      * @param string $newvalue the value to set the field to.
1216      * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1217      * @param array $params array of sql parameters
1218      * @return bool true
1219      * @throws dml_exception A DML specific exception is thrown for any errors.
1220      */
1221     public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1223         if ($select) {
1224             $select = "WHERE $select";
1225         }
1226         if (is_null($params)) {
1227             $params = array();
1228         }
1229         list($select, $params, $type) = $this->fix_sql_params($select, $params);
1230         $i = count($params)+1;
1232         // Get column metadata
1233         $columns = $this->get_columns($table);
1234         $column = $columns[$newfield];
1236         $normalisedvalue = $this->normalise_value($column, $newvalue);
1238         $newfield = "$newfield = \$" . $i;
1239         $params[] = $normalisedvalue;
1240         $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1242         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1243         $result = pg_query_params($this->pgsql, $sql, $params);
1244         $this->query_end($result);
1246         pg_free_result($result);
1248         return true;
1249     }
1251     /**
1252      * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1253      *
1254      * @param string $table The database table to be checked against.
1255      * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1256      * @param array $params array of sql parameters
1257      * @return bool true
1258      * @throws dml_exception A DML specific exception is thrown for any errors.
1259      */
1260     public function delete_records_select($table, $select, array $params=null) {
1261         if ($select) {
1262             $select = "WHERE $select";
1263         }
1264         $sql = "DELETE FROM {$this->prefix}$table $select";
1266         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1268         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1269         $result = pg_query_params($this->pgsql, $sql, $params);
1270         $this->query_end($result);
1272         pg_free_result($result);
1274         return true;
1275     }
1277     /**
1278      * Returns 'LIKE' part of a query.
1279      *
1280      * @param string $fieldname usually name of the table column
1281      * @param string $param usually bound query parameter (?, :named)
1282      * @param bool $casesensitive use case sensitive search
1283      * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1284      * @param bool $notlike true means "NOT LIKE"
1285      * @param string $escapechar escape char for '%' and '_'
1286      * @return string SQL code fragment
1287      */
1288     public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1289         if (strpos($param, '%') !== false) {
1290             debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1291         }
1293         // postgresql does not support accent insensitive text comparisons, sorry
1294         if ($casesensitive) {
1295             $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1296         } else {
1297             $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1298         }
1299         return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1300     }
1302     public function sql_bitxor($int1, $int2) {
1303         return '((' . $int1 . ') # (' . $int2 . '))';
1304     }
1306     public function sql_cast_char2int($fieldname, $text=false) {
1307         return ' CAST(' . $fieldname . ' AS INT) ';
1308     }
1310     public function sql_cast_char2real($fieldname, $text=false) {
1311         return " $fieldname::real ";
1312     }
1314     public function sql_concat() {
1315         $arr = func_get_args();
1316         $s = implode(' || ', $arr);
1317         if ($s === '') {
1318             return " '' ";
1319         }
1320         // Add always empty string element so integer-exclusive concats
1321         // will work without needing to cast each element explicitly
1322         return " '' || $s ";
1323     }
1325     public function sql_concat_join($separator="' '", $elements=array()) {
1326         for ($n=count($elements)-1; $n > 0 ; $n--) {
1327             array_splice($elements, $n, 0, $separator);
1328         }
1329         $s = implode(' || ', $elements);
1330         if ($s === '') {
1331             return " '' ";
1332         }
1333         return " $s ";
1334     }
1336     public function sql_regex_supported() {
1337         return true;
1338     }
1340     public function sql_regex($positivematch = true, $casesensitive = false) {
1341         if ($casesensitive) {
1342             return $positivematch ? '~' : '!~';
1343         } else {
1344             return $positivematch ? '~*' : '!~*';
1345         }
1346     }
1348     /**
1349      * Does this driver support tool_replace?
1350      *
1351      * @since Moodle 2.6.1
1352      * @return bool
1353      */
1354     public function replace_all_text_supported() {
1355         return true;
1356     }
1358     public function session_lock_supported() {
1359         return true;
1360     }
1362     /**
1363      * Obtain session lock
1364      * @param int $rowid id of the row with session record
1365      * @param int $timeout max allowed time to wait for the lock in seconds
1366      * @return bool success
1367      */
1368     public function get_session_lock($rowid, $timeout) {
1369         // NOTE: there is a potential locking problem for database running
1370         //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1371         //       luckily there is not a big chance that they would collide
1372         if (!$this->session_lock_supported()) {
1373             return;
1374         }
1376         parent::get_session_lock($rowid, $timeout);
1378         $timeoutmilli = $timeout * 1000;
1380         $sql = "SET statement_timeout TO $timeoutmilli";
1381         $this->query_start($sql, null, SQL_QUERY_AUX);
1382         $result = pg_query($this->pgsql, $sql);
1383         $this->query_end($result);
1385         if ($result) {
1386             pg_free_result($result);
1387         }
1389         $sql = "SELECT pg_advisory_lock($rowid)";
1390         $this->query_start($sql, null, SQL_QUERY_AUX);
1391         $start = time();
1392         $result = pg_query($this->pgsql, $sql);
1393         $end = time();
1394         try {
1395             $this->query_end($result);
1396         } catch (dml_exception $ex) {
1397             if ($end - $start >= $timeout) {
1398                 throw new dml_sessionwait_exception();
1399             } else {
1400                 throw $ex;
1401             }
1402         }
1404         if ($result) {
1405             pg_free_result($result);
1406         }
1408         $sql = "SET statement_timeout TO DEFAULT";
1409         $this->query_start($sql, null, SQL_QUERY_AUX);
1410         $result = pg_query($this->pgsql, $sql);
1411         $this->query_end($result);
1413         if ($result) {
1414             pg_free_result($result);
1415         }
1416     }
1418     public function release_session_lock($rowid) {
1419         if (!$this->session_lock_supported()) {
1420             return;
1421         }
1422         if (!$this->used_for_db_sessions) {
1423             return;
1424         }
1426         parent::release_session_lock($rowid);
1428         $sql = "SELECT pg_advisory_unlock($rowid)";
1429         $this->query_start($sql, null, SQL_QUERY_AUX);
1430         $result = pg_query($this->pgsql, $sql);
1431         $this->query_end($result);
1433         if ($result) {
1434             pg_free_result($result);
1435         }
1436     }
1438     /**
1439      * Driver specific start of real database transaction,
1440      * this can not be used directly in code.
1441      * @return void
1442      */
1443     protected function begin_transaction() {
1444         $this->savepointpresent = true;
1445         $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1446         $this->query_start($sql, null, SQL_QUERY_AUX);
1447         $result = pg_query($this->pgsql, $sql);
1448         $this->query_end($result);
1450         pg_free_result($result);
1451     }
1453     /**
1454      * Driver specific commit of real database transaction,
1455      * this can not be used directly in code.
1456      * @return void
1457      */
1458     protected function commit_transaction() {
1459         $this->savepointpresent = false;
1460         $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1461         $this->query_start($sql, null, SQL_QUERY_AUX);
1462         $result = pg_query($this->pgsql, $sql);
1463         $this->query_end($result);
1465         pg_free_result($result);
1466     }
1468     /**
1469      * Driver specific abort of real database transaction,
1470      * this can not be used directly in code.
1471      * @return void
1472      */
1473     protected function rollback_transaction() {
1474         $this->savepointpresent = false;
1475         $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1476         $this->query_start($sql, null, SQL_QUERY_AUX);
1477         $result = pg_query($this->pgsql, $sql);
1478         $this->query_end($result);
1480         pg_free_result($result);
1481     }
1483     /**
1484      * Helper function trimming (whitespace + quotes) any string
1485      * needed because PG uses to enclose with double quotes some
1486      * fields in indexes definition and others
1487      *
1488      * @param string $str string to apply whitespace + quotes trim
1489      * @return string trimmed string
1490      */
1491     private function trim_quotes($str) {
1492         return trim(trim($str), "'\"");
1493     }
1495     /**
1496      * Postgresql supports full-text search indexes.
1497      *
1498      * @return bool
1499      */
1500     public function is_fulltext_search_supported() {
1501         return true;
1502     }