Merge branch 'MDL-70422' of https://github.com/paulholden/moodle
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
1 <?php
2 // This file is part of Moodle - http://moodle.org/
3 //
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
17 /**
18  * Native pgsql class representing moodle database interface.
19  *
20  * @package    core_dml
21  * @copyright  2008 Petr Skoda (http://skodak.org)
22  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
23  */
25 defined('MOODLE_INTERNAL') || die();
27 require_once(__DIR__.'/moodle_database.php');
28 require_once(__DIR__.'/moodle_read_slave_trait.php');
29 require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
30 require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
32 /**
33  * Native pgsql class representing moodle database interface.
34  *
35  * @package    core_dml
36  * @copyright  2008 Petr Skoda (http://skodak.org)
37  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
38  */
39 class pgsql_native_moodle_database extends moodle_database {
40     use moodle_read_slave_trait {
41         select_db_handle as read_slave_select_db_handle;
42         can_use_readonly as read_slave_can_use_readonly;
43         query_start as read_slave_query_start;
44     }
46     /** @var array $dbhcursor keep track of open cursors */
47     private $dbhcursor = [];
49     /** @var resource $pgsql database resource */
50     protected $pgsql     = null;
52     protected $last_error_reporting; // To handle pgsql driver default verbosity
54     /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
55     protected $savepointpresent = false;
57     /** @var int Number of cursors used (for constructing a unique ID) */
58     protected $cursorcount = 0;
60     /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
61     const DEFAULT_FETCH_BUFFER_SIZE = 100000;
63     /**
64      * Detects if all needed PHP stuff installed.
65      * Note: can be used before connect()
66      * @return mixed true if ok, string if something
67      */
68     public function driver_installed() {
69         if (!extension_loaded('pgsql')) {
70             return get_string('pgsqlextensionisnotpresentinphp', 'install');
71         }
72         return true;
73     }
75     /**
76      * Returns database family type - describes SQL dialect
77      * Note: can be used before connect()
78      * @return string db family name (mysql, postgres, mssql, oracle, etc.)
79      */
80     public function get_dbfamily() {
81         return 'postgres';
82     }
84     /**
85      * Returns more specific database driver type
86      * Note: can be used before connect()
87      * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
88      */
89     protected function get_dbtype() {
90         return 'pgsql';
91     }
93     /**
94      * Returns general database library name
95      * Note: can be used before connect()
96      * @return string db type pdo, native
97      */
98     protected function get_dblibrary() {
99         return 'native';
100     }
102     /**
103      * Returns localised database type name
104      * Note: can be used before connect()
105      * @return string
106      */
107     public function get_name() {
108         return get_string('nativepgsql', 'install');
109     }
111     /**
112      * Returns localised database configuration help.
113      * Note: can be used before connect()
114      * @return string
115      */
116     public function get_configuration_help() {
117         return get_string('nativepgsqlhelp', 'install');
118     }
120     /**
121      * Connect to db
122      * @param string $dbhost The database host.
123      * @param string $dbuser The database username.
124      * @param string $dbpass The database username's password.
125      * @param string $dbname The name of the database being connected to.
126      * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
127      * @param array $dboptions driver specific options
128      * @return bool true
129      * @throws dml_connection_exception if error
130      */
131     public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, array $dboptions=null): bool {
132         if ($prefix == '' and !$this->external) {
133             //Enforce prefixes for everybody but mysql
134             throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
135         }
137         $driverstatus = $this->driver_installed();
139         if ($driverstatus !== true) {
140             throw new dml_exception('dbdriverproblem', $driverstatus);
141         }
143         $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
145         $pass = addcslashes($this->dbpass, "'\\");
147         // Unix socket connections should have lower overhead
148         if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
149             $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
150             if (strpos($this->dboptions['dbsocket'], '/') !== false) {
151                 // A directory was specified as the socket location.
152                 $connection .= " host='".$this->dboptions['dbsocket']."'";
153             }
154             if (!empty($this->dboptions['dbport'])) {
155                 // A port as specified, add it to the connection as it's used as part of the socket path.
156                 $connection .= " port ='".$this->dboptions['dbport']."'";
157             }
158         } else {
159             $this->dboptions['dbsocket'] = '';
160             if (empty($this->dbname)) {
161                 // probably old style socket connection - do not add port
162                 $port = "";
163             } else if (empty($this->dboptions['dbport'])) {
164                 $port = "port ='5432'";
165             } else {
166                 $port = "port ='".$this->dboptions['dbport']."'";
167             }
168             $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
169         }
171         if (!empty($this->dboptions['connecttimeout'])) {
172             $connection .= " connect_timeout=".$this->dboptions['connecttimeout'];
173         }
175         if (empty($this->dboptions['dbhandlesoptions'])) {
176             // ALTER USER and ALTER DATABASE are overridden by these settings.
177             $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
178             // Select schema if specified, otherwise the first one wins.
179             if (!empty($this->dboptions['dbschema'])) {
180                 $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
181             }
183             $connection .= " options='" . implode(' ', $options) . "'";
184         }
186         ob_start();
187         if (empty($this->dboptions['dbpersist'])) {
188             $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
189         } else {
190             $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
191         }
192         $dberr = ob_get_contents();
193         ob_end_clean();
195         $status = pg_connection_status($this->pgsql);
197         if ($status === false or $status === PGSQL_CONNECTION_BAD) {
198             $this->pgsql = null;
199             throw new dml_connection_exception($dberr);
200         }
202         if (!empty($this->dboptions['dbpersist'])) {
203             // There are rare situations (such as PHP out of memory errors) when open cursors may
204             // not be closed at the end of a connection. When using persistent connections, the
205             // cursors remain open and 'get in the way' of future connections. To avoid this
206             // problem, close all cursors here.
207             $result = pg_query($this->pgsql, 'CLOSE ALL');
208             if ($result) {
209                 pg_free_result($result);
210             }
211         }
213         if (!empty($this->dboptions['dbhandlesoptions'])) {
214             /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
215              * These functions do not talk to the server, they use the client library knowledge to determine state.
216              */
217             if (!empty($this->dboptions['dbschema'])) {
218                 throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
219             }
220             if (pg_client_encoding($this->pgsql) != 'UTF8') {
221                 throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
222             }
223             if (pg_escape_string($this->pgsql, '\\') != '\\') {
224                 throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
225             }
226         }
228         // Connection stabilised and configured, going to instantiate the temptables controller
229         $this->temptables = new pgsql_native_moodle_temptables($this);
231         return true;
232     }
234     /**
235      * Close database connection and release all resources
236      * and memory (especially circular memory references).
237      * Do NOT use connect() again, create a new instance if needed.
238      */
239     public function dispose() {
240         parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
241         if ($this->pgsql) {
242             pg_close($this->pgsql);
243             $this->pgsql = null;
244         }
245     }
247     /**
248      * Gets db handle currently used with queries
249      * @return resource
250      */
251     protected function get_db_handle() {
252         return $this->pgsql;
253     }
255     /**
256      * Sets db handle to be used with subsequent queries
257      * @param resource $dbh
258      * @return void
259      */
260     protected function set_db_handle($dbh): void {
261         $this->pgsql = $dbh;
262     }
264     /**
265      * Select appropriate db handle - readwrite or readonly
266      * @param int $type type of query
267      * @param string $sql
268      * @return void
269      */
270     protected function select_db_handle(int $type, string $sql): void {
271         $this->read_slave_select_db_handle($type, $sql);
273         if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) {
274             $cursor = $match[1];
275             $this->dbhcursor[$cursor] = $this->pgsql;
276         }
277         if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) {
278             $cursor = $match[1];
279             $this->pgsql = $this->dbhcursor[$cursor];
280         }
281     }
283     /**
284      * Check if The query qualifies for readonly connection execution
285      * Logging queries are exempt, those are write operations that circumvent
286      * standard query_start/query_end paths.
287      * @param int $type type of query
288      * @param string $sql
289      * @return bool
290      */
291     protected function can_use_readonly(int $type, string $sql): bool {
292         // ... pg_*lock queries always go to master.
293         if (preg_match('/\bpg_\w*lock/', $sql)) {
294             return false;
295         }
297         // ... a nuisance - temptables use this.
298         if (preg_match('/\bpg_constraint/', $sql) && $this->temptables->get_temptables()) {
299             return false;
300         }
302         return $this->read_slave_can_use_readonly($type, $sql);
304     }
306     /**
307      * Called before each db query.
308      * @param string $sql
309      * @param array array of parameters
310      * @param int $type type of query
311      * @param mixed $extrainfo driver specific extra information
312      * @return void
313      */
314     protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
315         $this->read_slave_query_start($sql, $params, $type, $extrainfo);
316         // pgsql driver tends to send debug to output, we do not need that.
317         $this->last_error_reporting = error_reporting(0);
318     }
320     /**
321      * Called immediately after each db query.
322      * @param mixed db specific result
323      * @return void
324      */
325     protected function query_end($result) {
326         // reset original debug level
327         error_reporting($this->last_error_reporting);
328         try {
329             parent::query_end($result);
330             if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) {
331                 $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
332                 if ($res) {
333                     pg_free_result($res);
334                 }
335             }
336         } catch (Exception $e) {
337             if ($this->savepointpresent) {
338                 $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
339                 if ($res) {
340                     pg_free_result($res);
341                 }
342             }
343             throw $e;
344         }
345     }
347     /**
348      * Returns database server info array
349      * @return array Array containing 'description' and 'version' info
350      */
351     public function get_server_info() {
352         static $info;
353         if (!$info) {
354             $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
355             $info = pg_version($this->pgsql);
356             $this->query_end(true);
357         }
358         return array('description'=>$info['server'], 'version'=>$info['server']);
359     }
361     /**
362      * Returns supported query parameter types
363      * @return int bitmask of accepted SQL_PARAMS_*
364      */
365     protected function allowed_param_types() {
366         return SQL_PARAMS_DOLLAR;
367     }
369     /**
370      * Returns last error reported by database engine.
371      * @return string error message
372      */
373     public function get_last_error() {
374         return pg_last_error($this->pgsql);
375     }
377     /**
378      * Return tables in database WITHOUT current prefix.
379      * @param bool $usecache if true, returns list of cached tables.
380      * @return array of table names in lowercase and without prefix
381      */
382     public function get_tables($usecache=true) {
383         if ($usecache and $this->tables !== null) {
384             return $this->tables;
385         }
386         $this->tables = array();
387         $prefix = str_replace('_', '|_', $this->prefix);
388         $sql = "SELECT c.relname
389                   FROM pg_catalog.pg_class c
390                   JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
391                  WHERE c.relname LIKE '$prefix%' ESCAPE '|'
392                        AND c.relkind = 'r'
393                        AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
394         $this->query_start($sql, null, SQL_QUERY_AUX);
395         $result = pg_query($this->pgsql, $sql);
396         $this->query_end($result);
398         if ($result) {
399             while ($row = pg_fetch_row($result)) {
400                 $tablename = reset($row);
401                 if ($this->prefix !== false && $this->prefix !== '') {
402                     if (strpos($tablename, $this->prefix) !== 0) {
403                         continue;
404                     }
405                     $tablename = substr($tablename, strlen($this->prefix));
406                 }
407                 $this->tables[$tablename] = $tablename;
408             }
409             pg_free_result($result);
410         }
411         return $this->tables;
412     }
414     /**
415      * Return table indexes - everything lowercased.
416      * @param string $table The table we want to get indexes from.
417      * @return array of arrays
418      */
419     public function get_indexes($table) {
420         $indexes = array();
421         $tablename = $this->prefix.$table;
423         $sql = "SELECT i.*
424                   FROM pg_catalog.pg_indexes i
425                   JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
426                  WHERE i.tablename = '$tablename'
427                        AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
429         $this->query_start($sql, null, SQL_QUERY_AUX);
430         $result = pg_query($this->pgsql, $sql);
431         $this->query_end($result);
433         if ($result) {
434             while ($row = pg_fetch_assoc($result)) {
435                 // The index definition could be generated schema-qualifying the target table name
436                 // for safety, depending on the pgsql version (CVE-2018-1058).
437                 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
438                     continue;
439                 }
440                 if ($matches[5] === 'id') {
441                     continue;
442                 }
443                 $columns = explode(',', $matches[5]);
444                 foreach ($columns as $k=>$column) {
445                     $column = trim($column);
446                     if ($pos = strpos($column, ' ')) {
447                         // index type is separated by space
448                         $column = substr($column, 0, $pos);
449                     }
450                     $columns[$k] = $this->trim_quotes($column);
451                 }
452                 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
453                                               'columns'=>$columns);
454             }
455             pg_free_result($result);
456         }
457         return $indexes;
458     }
460     /**
461      * Returns detailed information about columns in table.
462      *
463      * @param string $table name
464      * @return database_column_info[] array of database_column_info objects indexed with column names
465      */
466     protected function fetch_columns(string $table): array {
467         $structure = array();
469         $tablename = $this->prefix.$table;
471         $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
472                        CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) END AS adsrc
473                   FROM pg_catalog.pg_class c
474                   JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
475                   JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
476                   JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
477              LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
478                  WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
479                        AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
480               ORDER BY a.attnum";
482         $this->query_start($sql, null, SQL_QUERY_AUX);
483         $result = pg_query($this->pgsql, $sql);
484         $this->query_end($result);
486         if (!$result) {
487             return array();
488         }
489         while ($rawcolumn = pg_fetch_object($result)) {
491             $info = new stdClass();
492             $info->name = $rawcolumn->field;
493             $matches = null;
495             if ($rawcolumn->type === 'varchar') {
496                 $info->type          = 'varchar';
497                 $info->meta_type     = 'C';
498                 $info->max_length    = $rawcolumn->atttypmod - 4;
499                 $info->scale         = null;
500                 $info->not_null      = ($rawcolumn->attnotnull === 't');
501                 $info->has_default   = ($rawcolumn->atthasdef === 't');
502                 if ($info->has_default) {
503                     $parts = explode('::', $rawcolumn->adsrc);
504                     if (count($parts) > 1) {
505                         $info->default_value = reset($parts);
506                         $info->default_value = trim($info->default_value, "'");
507                     } else {
508                         $info->default_value = $rawcolumn->adsrc;
509                     }
510                 } else {
511                     $info->default_value = null;
512                 }
513                 $info->primary_key   = false;
514                 $info->binary        = false;
515                 $info->unsigned      = null;
516                 $info->auto_increment= false;
517                 $info->unique        = null;
519             } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
520                 $info->type = 'int';
521                 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
522                     $info->primary_key   = true;
523                     $info->meta_type     = 'R';
524                     $info->unique        = true;
525                     $info->auto_increment= true;
526                     $info->has_default   = false;
527                 } else {
528                     $info->primary_key   = false;
529                     $info->meta_type     = 'I';
530                     $info->unique        = null;
531                     $info->auto_increment= false;
532                     $info->has_default   = ($rawcolumn->atthasdef === 't');
533                 }
534                 // Return number of decimals, not bytes here.
535                 if ($matches[1] >= 8) {
536                     $info->max_length = 18;
537                 } else if ($matches[1] >= 4) {
538                     $info->max_length = 9;
539                 } else if ($matches[1] >= 2) {
540                     $info->max_length = 4;
541                 } else if ($matches[1] >= 1) {
542                     $info->max_length = 2;
543                 } else {
544                     $info->max_length = 0;
545                 }
546                 $info->scale         = null;
547                 $info->not_null      = ($rawcolumn->attnotnull === 't');
548                 if ($info->has_default) {
549                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
550                     $parts = explode('::', $rawcolumn->adsrc);
551                     if (count($parts) > 1) {
552                         $info->default_value = reset($parts);
553                     } else {
554                         $info->default_value = $rawcolumn->adsrc;
555                     }
556                     $info->default_value = trim($info->default_value, "()'");
557                 } else {
558                     $info->default_value = null;
559                 }
560                 $info->binary        = false;
561                 $info->unsigned      = false;
563             } else if ($rawcolumn->type === 'numeric') {
564                 $info->type = $rawcolumn->type;
565                 $info->meta_type     = 'N';
566                 $info->primary_key   = false;
567                 $info->binary        = false;
568                 $info->unsigned      = null;
569                 $info->auto_increment= false;
570                 $info->unique        = null;
571                 $info->not_null      = ($rawcolumn->attnotnull === 't');
572                 $info->has_default   = ($rawcolumn->atthasdef === 't');
573                 if ($info->has_default) {
574                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
575                     $parts = explode('::', $rawcolumn->adsrc);
576                     if (count($parts) > 1) {
577                         $info->default_value = reset($parts);
578                     } else {
579                         $info->default_value = $rawcolumn->adsrc;
580                     }
581                     $info->default_value = trim($info->default_value, "()'");
582                 } else {
583                     $info->default_value = null;
584                 }
585                 $info->max_length    = $rawcolumn->atttypmod >> 16;
586                 $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
588             } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
589                 $info->type = 'float';
590                 $info->meta_type     = 'N';
591                 $info->primary_key   = false;
592                 $info->binary        = false;
593                 $info->unsigned      = null;
594                 $info->auto_increment= false;
595                 $info->unique        = null;
596                 $info->not_null      = ($rawcolumn->attnotnull === 't');
597                 $info->has_default   = ($rawcolumn->atthasdef === 't');
598                 if ($info->has_default) {
599                     // PG 9.5+ uses ::<TYPE> syntax for some defaults.
600                     $parts = explode('::', $rawcolumn->adsrc);
601                     if (count($parts) > 1) {
602                         $info->default_value = reset($parts);
603                     } else {
604                         $info->default_value = $rawcolumn->adsrc;
605                     }
606                     $info->default_value = trim($info->default_value, "()'");
607                 } else {
608                     $info->default_value = null;
609                 }
610                 // just guess expected number of deciaml places :-(
611                 if ($matches[1] == 8) {
612                     // total 15 digits
613                     $info->max_length = 8;
614                     $info->scale      = 7;
615                 } else {
616                     // total 6 digits
617                     $info->max_length = 4;
618                     $info->scale      = 2;
619                 }
621             } else if ($rawcolumn->type === 'text') {
622                 $info->type          = $rawcolumn->type;
623                 $info->meta_type     = 'X';
624                 $info->max_length    = -1;
625                 $info->scale         = null;
626                 $info->not_null      = ($rawcolumn->attnotnull === 't');
627                 $info->has_default   = ($rawcolumn->atthasdef === 't');
628                 if ($info->has_default) {
629                     $parts = explode('::', $rawcolumn->adsrc);
630                     if (count($parts) > 1) {
631                         $info->default_value = reset($parts);
632                         $info->default_value = trim($info->default_value, "'");
633                     } else {
634                         $info->default_value = $rawcolumn->adsrc;
635                     }
636                 } else {
637                     $info->default_value = null;
638                 }
639                 $info->primary_key   = false;
640                 $info->binary        = false;
641                 $info->unsigned      = null;
642                 $info->auto_increment= false;
643                 $info->unique        = null;
645             } else if ($rawcolumn->type === 'bytea') {
646                 $info->type          = $rawcolumn->type;
647                 $info->meta_type     = 'B';
648                 $info->max_length    = -1;
649                 $info->scale         = null;
650                 $info->not_null      = ($rawcolumn->attnotnull === 't');
651                 $info->has_default   = false;
652                 $info->default_value = null;
653                 $info->primary_key   = false;
654                 $info->binary        = true;
655                 $info->unsigned      = null;
656                 $info->auto_increment= false;
657                 $info->unique        = null;
659             }
661             $structure[$info->name] = new database_column_info($info);
662         }
664         pg_free_result($result);
666         return $structure;
667     }
669     /**
670      * Normalise values based in RDBMS dependencies (booleans, LOBs...)
671      *
672      * @param database_column_info $column column metadata corresponding with the value we are going to normalise
673      * @param mixed $value value we are going to normalise
674      * @return mixed the normalised value
675      */
676     protected function normalise_value($column, $value) {
677         $this->detect_objects($value);
679         if (is_bool($value)) { // Always, convert boolean to int
680             $value = (int)$value;
682         } else if ($column->meta_type === 'B') {
683             if (!is_null($value)) {
684                 // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
685                 // \ and produce data errors.  This is set on the connection.
686                 $value = pg_escape_bytea($this->pgsql, $value);
687             }
689         } else if ($value === '') {
690             if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
691                 $value = 0; // prevent '' problems in numeric fields
692             }
693         }
694         return $value;
695     }
697     /**
698      * Is db in unicode mode?
699      * @return bool
700      */
701     public function setup_is_unicodedb() {
702         // Get PostgreSQL server_encoding value
703         $sql = "SHOW server_encoding";
704         $this->query_start($sql, null, SQL_QUERY_AUX);
705         $result = pg_query($this->pgsql, $sql);
706         $this->query_end($result);
708         if (!$result) {
709             return false;
710         }
711         $rawcolumn = pg_fetch_object($result);
712         $encoding = $rawcolumn->server_encoding;
713         pg_free_result($result);
715         return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
716     }
718     /**
719      * Do NOT use in code, to be used by database_manager only!
720      * @param string|array $sql query
721      * @param array|null $tablenames an array of xmldb table names affected by this request.
722      * @return bool true
723      * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
724      */
725     public function change_database_structure($sql, $tablenames = null) {
726         $this->get_manager(); // Includes DDL exceptions classes ;-)
727         if (is_array($sql)) {
728             $sql = implode("\n;\n", $sql);
729         }
730         if (!$this->is_transaction_started()) {
731             // It is better to do all or nothing, this helps with recovery...
732             $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
733         }
735         try {
736             $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
737             $result = pg_query($this->pgsql, $sql);
738             $this->query_end($result);
739             pg_free_result($result);
740         } catch (ddl_change_structure_exception $e) {
741             if (!$this->is_transaction_started()) {
742                 $result = @pg_query($this->pgsql, "ROLLBACK");
743                 @pg_free_result($result);
744             }
745             $this->reset_caches($tablenames);
746             throw $e;
747         }
749         $this->reset_caches($tablenames);
750         return true;
751     }
753     /**
754      * Execute general sql query. Should be used only when no other method suitable.
755      * Do NOT use this to make changes in db structure, use database_manager methods instead!
756      * @param string $sql query
757      * @param array $params query parameters
758      * @return bool true
759      * @throws dml_exception A DML specific exception is thrown for any errors.
760      */
761     public function execute($sql, array $params=null) {
762         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
764         if (strpos($sql, ';') !== false) {
765             throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
766         }
768         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
769         $result = pg_query_params($this->pgsql, $sql, $params);
770         $this->query_end($result);
772         pg_free_result($result);
773         return true;
774     }
776     /**
777      * Get a number of records as a moodle_recordset using a SQL statement.
778      *
779      * Since this method is a little less readable, use of it should be restricted to
780      * code where it's possible there might be large datasets being returned.  For known
781      * small datasets use get_records_sql - it leads to simpler code.
782      *
783      * The return type is like:
784      * @see function get_recordset.
785      *
786      * @param string $sql the SQL select query to execute.
787      * @param array $params array of sql parameters
788      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
789      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
790      * @return moodle_recordset instance
791      * @throws dml_exception A DML specific exception is thrown for any errors.
792      */
793     public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
795         list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
797         if ($limitnum) {
798             $sql .= " LIMIT $limitnum";
799         }
800         if ($limitfrom) {
801             $sql .= " OFFSET $limitfrom";
802         }
804         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
806         // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
807         // loading the entire thing (unless the config setting is turned off).
808         $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
809         if ($usecursors) {
810             // Work out the cursor unique identifer. This is based on a simple count used which
811             // should be OK because the identifiers only need to be unique within the current
812             // transaction.
813             $this->cursorcount++;
814             $cursorname = 'crs' . $this->cursorcount;
816             // Do the query to a cursor.
817             $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
818         } else {
819             $cursorname = '';
820         }
822         $this->query_start($sql, $params, SQL_QUERY_SELECT);
824         $result = pg_query_params($this->pgsql, $sql, $params);
826         $this->query_end($result);
827         if ($usecursors) {
828             pg_free_result($result);
829             $result = null;
830         }
832         return new pgsql_native_moodle_recordset($result, $this, $cursorname);
833     }
835     /**
836      * Gets size of fetch buffer used for recordset queries.
837      *
838      * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
839      * memory as needed for the Postgres library to hold the entire query results in memory.
840      *
841      * @return int Fetch buffer size or 0 indicating not to use cursors
842      */
843     protected function get_fetch_buffer_size() {
844         if (array_key_exists('fetchbuffersize', $this->dboptions)) {
845             return (int)$this->dboptions['fetchbuffersize'];
846         } else {
847             return self::DEFAULT_FETCH_BUFFER_SIZE;
848         }
849     }
851     /**
852      * Retrieves data from cursor. For use by recordset only; do not call directly.
853      *
854      * Return value contains the next batch of Postgres data, and a boolean indicating if this is
855      * definitely the last batch (if false, there may be more)
856      *
857      * @param string $cursorname Name of cursor to read from
858      * @return array Array with 2 elements (next data batch and boolean indicating last batch)
859      */
860     public function fetch_from_cursor($cursorname) {
861         $count = $this->get_fetch_buffer_size();
863         $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
865         $this->query_start($sql, [], SQL_QUERY_AUX);
866         $result = pg_query($this->pgsql, $sql);
867         $last = pg_num_rows($result) !== $count;
869         $this->query_end($result);
871         return [$result, $last];
872     }
874     /**
875      * Closes a cursor. For use by recordset only; do not call directly.
876      *
877      * @param string $cursorname Name of cursor to close
878      * @return bool True if we actually closed one, false if the transaction was cancelled
879      */
880     public function close_cursor($cursorname) {
881         // If the transaction got cancelled, then ignore this request.
882         $sql = 'CLOSE ' . $cursorname;
883         $this->query_start($sql, [], SQL_QUERY_AUX);
884         $result = pg_query($this->pgsql, $sql);
885         $this->query_end($result);
886         if ($result) {
887             pg_free_result($result);
888         }
889         return true;
890     }
892     /**
893      * Get a number of records as an array of objects using a SQL statement.
894      *
895      * Return value is like:
896      * @see function get_records.
897      *
898      * @param string $sql the SQL select query to execute. The first column of this SELECT statement
899      *   must be a unique value (usually the 'id' field), as it will be used as the key of the
900      *   returned array.
901      * @param array $params array of sql parameters
902      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
903      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
904      * @return array of objects, or empty array if no records were found
905      * @throws dml_exception A DML specific exception is thrown for any errors.
906      */
907     public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) {
908         list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
910         if ($limitnum) {
911             $sql .= " LIMIT $limitnum";
912         }
913         if ($limitfrom) {
914             $sql .= " OFFSET $limitfrom";
915         }
917         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
918         $this->query_start($sql, $params, SQL_QUERY_SELECT);
919         $result = pg_query_params($this->pgsql, $sql, $params);
920         $this->query_end($result);
922         // find out if there are any blobs
923         $numfields = pg_num_fields($result);
924         $blobs = array();
925         for ($i = 0; $i < $numfields; $i++) {
926             $type = pg_field_type($result, $i);
927             if ($type == 'bytea') {
928                 $blobs[] = pg_field_name($result, $i);
929             }
930         }
932         $return = [];
933         while ($row = pg_fetch_assoc($result)) {
934             $id = reset($row);
935             if ($blobs) {
936                 foreach ($blobs as $blob) {
937                     $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
938                 }
939             }
940             if (isset($return[$id])) {
941                 $colname = key($row);
942                 debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
943             }
944             $return[$id] = (object) $row;
945         }
947         return $return;
948     }
950     /**
951      * Selects records and return values (first field) as an array using a SQL statement.
952      *
953      * @param string $sql The SQL query
954      * @param array $params array of sql parameters
955      * @return array of values
956      * @throws dml_exception A DML specific exception is thrown for any errors.
957      */
958     public function get_fieldset_sql($sql, array $params=null) {
959         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
961         $this->query_start($sql, $params, SQL_QUERY_SELECT);
962         $result = pg_query_params($this->pgsql, $sql, $params);
963         $this->query_end($result);
965         $return = pg_fetch_all_columns($result, 0);
967         if (pg_field_type($result, 0) == 'bytea') {
968             foreach ($return as $key => $value) {
969                 $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
970             }
971         }
973         pg_free_result($result);
975         return $return;
976     }
978     /**
979      * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
980      * @param string $table name
981      * @param mixed $params data record as object or array
982      * @param bool $returnit return it of inserted record
983      * @param bool $bulk true means repeated inserts expected
984      * @param bool $customsequence true if 'id' included in $params, disables $returnid
985      * @return bool|int true or new id
986      * @throws dml_exception A DML specific exception is thrown for any errors.
987      */
988     public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
989         if (!is_array($params)) {
990             $params = (array)$params;
991         }
993         $returning = "";
995         if ($customsequence) {
996             if (!isset($params['id'])) {
997                 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
998             }
999             $returnid = false;
1000         } else {
1001             if ($returnid) {
1002                 $returning = "RETURNING id";
1003                 unset($params['id']);
1004             } else {
1005                 unset($params['id']);
1006             }
1007         }
1009         if (empty($params)) {
1010             throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
1011         }
1013         $fields = implode(',', array_keys($params));
1014         $values = array();
1015         $i = 1;
1016         foreach ($params as $value) {
1017             $this->detect_objects($value);
1018             $values[] = "\$".$i++;
1019         }
1020         $values = implode(',', $values);
1022         $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
1023         $this->query_start($sql, $params, SQL_QUERY_INSERT);
1024         $result = pg_query_params($this->pgsql, $sql, $params);
1025         $this->query_end($result);
1027         if ($returning !== "") {
1028             $row = pg_fetch_assoc($result);
1029             $params['id'] = reset($row);
1030         }
1031         pg_free_result($result);
1033         if (!$returnid) {
1034             return true;
1035         }
1037         return (int)$params['id'];
1038     }
1040     /**
1041      * Insert a record into a table and return the "id" field if required.
1042      *
1043      * Some conversions and safety checks are carried out. Lobs are supported.
1044      * If the return ID isn't required, then this just reports success as true/false.
1045      * $data is an object containing needed data
1046      * @param string $table The database table to be inserted into
1047      * @param object $data A data object with values for one or more fields in the record
1048      * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
1049      * @return bool|int true or new id
1050      * @throws dml_exception A DML specific exception is thrown for any errors.
1051      */
1052     public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
1053         $dataobject = (array)$dataobject;
1055         $columns = $this->get_columns($table);
1056         if (empty($columns)) {
1057             throw new dml_exception('ddltablenotexist', $table);
1058         }
1060         $cleaned = array();
1062         foreach ($dataobject as $field=>$value) {
1063             if ($field === 'id') {
1064                 continue;
1065             }
1066             if (!isset($columns[$field])) {
1067                 continue;
1068             }
1069             $column = $columns[$field];
1070             $cleaned[$field] = $this->normalise_value($column, $value);
1071         }
1073         return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1075     }
1077     /**
1078      * Insert multiple records into database as fast as possible.
1079      *
1080      * Order of inserts is maintained, but the operation is not atomic,
1081      * use transactions if necessary.
1082      *
1083      * This method is intended for inserting of large number of small objects,
1084      * do not use for huge objects with text or binary fields.
1085      *
1086      * @since Moodle 2.7
1087      *
1088      * @param string $table  The database table to be inserted into
1089      * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1090      * @return void does not return new record ids
1091      *
1092      * @throws coding_exception if data objects have different structure
1093      * @throws dml_exception A DML specific exception is thrown for any errors.
1094      */
1095     public function insert_records($table, $dataobjects) {
1096         if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1097             throw new coding_exception('insert_records() passed non-traversable object');
1098         }
1100         // PostgreSQL does not seem to have problems with huge queries.
1101         $chunksize = 500;
1102         if (!empty($this->dboptions['bulkinsertsize'])) {
1103             $chunksize = (int)$this->dboptions['bulkinsertsize'];
1104         }
1106         $columns = $this->get_columns($table, true);
1108         $fields = null;
1109         $count = 0;
1110         $chunk = array();
1111         foreach ($dataobjects as $dataobject) {
1112             if (!is_array($dataobject) and !is_object($dataobject)) {
1113                 throw new coding_exception('insert_records() passed invalid record object');
1114             }
1115             $dataobject = (array)$dataobject;
1116             if ($fields === null) {
1117                 $fields = array_keys($dataobject);
1118                 $columns = array_intersect_key($columns, $dataobject);
1119                 unset($columns['id']);
1120             } else if ($fields !== array_keys($dataobject)) {
1121                 throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1122             }
1124             $count++;
1125             $chunk[] = $dataobject;
1127             if ($count === $chunksize) {
1128                 $this->insert_chunk($table, $chunk, $columns);
1129                 $chunk = array();
1130                 $count = 0;
1131             }
1132         }
1134         if ($count) {
1135             $this->insert_chunk($table, $chunk, $columns);
1136         }
1137     }
1139     /**
1140      * Insert records in chunks, strict param types...
1141      *
1142      * Note: can be used only from insert_records().
1143      *
1144      * @param string $table
1145      * @param array $chunk
1146      * @param database_column_info[] $columns
1147      */
1148     protected function insert_chunk($table, array $chunk, array $columns) {
1149         $i = 1;
1150         $params = array();
1151         $values = array();
1152         foreach ($chunk as $dataobject) {
1153             $vals = array();
1154             foreach ($columns as $field => $column) {
1155                 $params[] = $this->normalise_value($column, $dataobject[$field]);
1156                 $vals[] = "\$".$i++;
1157             }
1158             $values[] = '('.implode(',', $vals).')';
1159         }
1161         $fieldssql = '('.implode(',', array_keys($columns)).')';
1162         $valuessql = implode(',', $values);
1164         $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1165         $this->query_start($sql, $params, SQL_QUERY_INSERT);
1166         $result = pg_query_params($this->pgsql, $sql, $params);
1167         $this->query_end($result);
1168         pg_free_result($result);
1169     }
1171     /**
1172      * Import a record into a table, id field is required.
1173      * Safety checks are NOT carried out. Lobs are supported.
1174      *
1175      * @param string $table name of database table to be inserted into
1176      * @param object $dataobject A data object with values for one or more fields in the record
1177      * @return bool true
1178      * @throws dml_exception A DML specific exception is thrown for any errors.
1179      */
1180     public function import_record($table, $dataobject) {
1181         $dataobject = (array)$dataobject;
1183         $columns = $this->get_columns($table);
1184         $cleaned = array();
1186         foreach ($dataobject as $field=>$value) {
1187             $this->detect_objects($value);
1188             if (!isset($columns[$field])) {
1189                 continue;
1190             }
1191             $column = $columns[$field];
1192             $cleaned[$field] = $this->normalise_value($column, $value);
1193         }
1195         return $this->insert_record_raw($table, $cleaned, false, true, true);
1196     }
1198     /**
1199      * Update record in database, as fast as possible, no safety checks, lobs not supported.
1200      * @param string $table name
1201      * @param mixed $params data record as object or array
1202      * @param bool true means repeated updates expected
1203      * @return bool true
1204      * @throws dml_exception A DML specific exception is thrown for any errors.
1205      */
1206     public function update_record_raw($table, $params, $bulk=false) {
1207         $params = (array)$params;
1209         if (!isset($params['id'])) {
1210             throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1211         }
1212         $id = $params['id'];
1213         unset($params['id']);
1215         if (empty($params)) {
1216             throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1217         }
1219         $i = 1;
1221         $sets = array();
1222         foreach ($params as $field=>$value) {
1223             $this->detect_objects($value);
1224             $sets[] = "$field = \$".$i++;
1225         }
1227         $params[] = $id; // last ? in WHERE condition
1229         $sets = implode(',', $sets);
1230         $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1232         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1233         $result = pg_query_params($this->pgsql, $sql, $params);
1234         $this->query_end($result);
1236         pg_free_result($result);
1237         return true;
1238     }
1240     /**
1241      * Update a record in a table
1242      *
1243      * $dataobject is an object containing needed data
1244      * Relies on $dataobject having a variable "id" to
1245      * specify the record to update
1246      *
1247      * @param string $table The database table to be checked against.
1248      * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
1249      * @param bool true means repeated updates expected
1250      * @return bool true
1251      * @throws dml_exception A DML specific exception is thrown for any errors.
1252      */
1253     public function update_record($table, $dataobject, $bulk=false) {
1254         $dataobject = (array)$dataobject;
1256         $columns = $this->get_columns($table);
1257         $cleaned = array();
1259         foreach ($dataobject as $field=>$value) {
1260             if (!isset($columns[$field])) {
1261                 continue;
1262             }
1263             $column = $columns[$field];
1264             $cleaned[$field] = $this->normalise_value($column, $value);
1265         }
1267         $this->update_record_raw($table, $cleaned, $bulk);
1269         return true;
1270     }
1272     /**
1273      * Set a single field in every table record which match a particular WHERE clause.
1274      *
1275      * @param string $table The database table to be checked against.
1276      * @param string $newfield the field to set.
1277      * @param string $newvalue the value to set the field to.
1278      * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1279      * @param array $params array of sql parameters
1280      * @return bool true
1281      * @throws dml_exception A DML specific exception is thrown for any errors.
1282      */
1283     public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1285         if ($select) {
1286             $select = "WHERE $select";
1287         }
1288         if (is_null($params)) {
1289             $params = array();
1290         }
1291         list($select, $params, $type) = $this->fix_sql_params($select, $params);
1292         $i = count($params)+1;
1294         // Get column metadata
1295         $columns = $this->get_columns($table);
1296         $column = $columns[$newfield];
1298         $normalisedvalue = $this->normalise_value($column, $newvalue);
1300         $newfield = "$newfield = \$" . $i;
1301         $params[] = $normalisedvalue;
1302         $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1304         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1305         $result = pg_query_params($this->pgsql, $sql, $params);
1306         $this->query_end($result);
1308         pg_free_result($result);
1310         return true;
1311     }
1313     /**
1314      * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1315      *
1316      * @param string $table The database table to be checked against.
1317      * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1318      * @param array $params array of sql parameters
1319      * @return bool true
1320      * @throws dml_exception A DML specific exception is thrown for any errors.
1321      */
1322     public function delete_records_select($table, $select, array $params=null) {
1323         if ($select) {
1324             $select = "WHERE $select";
1325         }
1326         $sql = "DELETE FROM {$this->prefix}$table $select";
1328         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1330         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1331         $result = pg_query_params($this->pgsql, $sql, $params);
1332         $this->query_end($result);
1334         pg_free_result($result);
1336         return true;
1337     }
1339     /**
1340      * Returns 'LIKE' part of a query.
1341      *
1342      * @param string $fieldname usually name of the table column
1343      * @param string $param usually bound query parameter (?, :named)
1344      * @param bool $casesensitive use case sensitive search
1345      * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1346      * @param bool $notlike true means "NOT LIKE"
1347      * @param string $escapechar escape char for '%' and '_'
1348      * @return string SQL code fragment
1349      */
1350     public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1351         if (strpos($param, '%') !== false) {
1352             debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1353         }
1355         // postgresql does not support accent insensitive text comparisons, sorry
1356         if ($casesensitive) {
1357             $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1358         } else {
1359             $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1360         }
1361         return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1362     }
1364     public function sql_bitxor($int1, $int2) {
1365         return '((' . $int1 . ') # (' . $int2 . '))';
1366     }
1368     public function sql_cast_char2int($fieldname, $text=false) {
1369         return ' CAST(' . $fieldname . ' AS INT) ';
1370     }
1372     public function sql_cast_char2real($fieldname, $text=false) {
1373         return " $fieldname::real ";
1374     }
1376     public function sql_concat() {
1377         $arr = func_get_args();
1378         $s = implode(' || ', $arr);
1379         if ($s === '') {
1380             return " '' ";
1381         }
1382         // Add always empty string element so integer-exclusive concats
1383         // will work without needing to cast each element explicitly
1384         return " '' || $s ";
1385     }
1387     public function sql_concat_join($separator="' '", $elements=array()) {
1388         for ($n=count($elements)-1; $n > 0 ; $n--) {
1389             array_splice($elements, $n, 0, $separator);
1390         }
1391         $s = implode(' || ', $elements);
1392         if ($s === '') {
1393             return " '' ";
1394         }
1395         return " $s ";
1396     }
1398     public function sql_regex_supported() {
1399         return true;
1400     }
1402     public function sql_regex($positivematch = true, $casesensitive = false) {
1403         if ($casesensitive) {
1404             return $positivematch ? '~' : '!~';
1405         } else {
1406             return $positivematch ? '~*' : '!~*';
1407         }
1408     }
1410     /**
1411      * Does this driver support tool_replace?
1412      *
1413      * @since Moodle 2.6.1
1414      * @return bool
1415      */
1416     public function replace_all_text_supported() {
1417         return true;
1418     }
1420     public function session_lock_supported() {
1421         return true;
1422     }
1424     /**
1425      * Obtain session lock
1426      * @param int $rowid id of the row with session record
1427      * @param int $timeout max allowed time to wait for the lock in seconds
1428      * @return bool success
1429      */
1430     public function get_session_lock($rowid, $timeout) {
1431         // NOTE: there is a potential locking problem for database running
1432         //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1433         //       luckily there is not a big chance that they would collide
1434         if (!$this->session_lock_supported()) {
1435             return;
1436         }
1438         parent::get_session_lock($rowid, $timeout);
1440         $timeoutmilli = $timeout * 1000;
1442         $sql = "SET statement_timeout TO $timeoutmilli";
1443         $this->query_start($sql, null, SQL_QUERY_AUX);
1444         $result = pg_query($this->pgsql, $sql);
1445         $this->query_end($result);
1447         if ($result) {
1448             pg_free_result($result);
1449         }
1451         $sql = "SELECT pg_advisory_lock($rowid)";
1452         $this->query_start($sql, null, SQL_QUERY_AUX);
1453         $start = time();
1454         $result = pg_query($this->pgsql, $sql);
1455         $end = time();
1456         try {
1457             $this->query_end($result);
1458         } catch (dml_exception $ex) {
1459             if ($end - $start >= $timeout) {
1460                 throw new dml_sessionwait_exception();
1461             } else {
1462                 throw $ex;
1463             }
1464         }
1466         if ($result) {
1467             pg_free_result($result);
1468         }
1470         $sql = "SET statement_timeout TO DEFAULT";
1471         $this->query_start($sql, null, SQL_QUERY_AUX);
1472         $result = pg_query($this->pgsql, $sql);
1473         $this->query_end($result);
1475         if ($result) {
1476             pg_free_result($result);
1477         }
1478     }
1480     public function release_session_lock($rowid) {
1481         if (!$this->session_lock_supported()) {
1482             return;
1483         }
1484         if (!$this->used_for_db_sessions) {
1485             return;
1486         }
1488         parent::release_session_lock($rowid);
1490         $sql = "SELECT pg_advisory_unlock($rowid)";
1491         $this->query_start($sql, null, SQL_QUERY_AUX);
1492         $result = pg_query($this->pgsql, $sql);
1493         $this->query_end($result);
1495         if ($result) {
1496             pg_free_result($result);
1497         }
1498     }
1500     /**
1501      * Driver specific start of real database transaction,
1502      * this can not be used directly in code.
1503      * @return void
1504      */
1505     protected function begin_transaction() {
1506         $this->savepointpresent = true;
1507         $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1508         $this->query_start($sql, null, SQL_QUERY_AUX);
1509         $result = pg_query($this->pgsql, $sql);
1510         $this->query_end($result);
1512         pg_free_result($result);
1513     }
1515     /**
1516      * Driver specific commit of real database transaction,
1517      * this can not be used directly in code.
1518      * @return void
1519      */
1520     protected function commit_transaction() {
1521         $this->savepointpresent = false;
1522         $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1523         $this->query_start($sql, null, SQL_QUERY_AUX);
1524         $result = pg_query($this->pgsql, $sql);
1525         $this->query_end($result);
1527         pg_free_result($result);
1528     }
1530     /**
1531      * Driver specific abort of real database transaction,
1532      * this can not be used directly in code.
1533      * @return void
1534      */
1535     protected function rollback_transaction() {
1536         $this->savepointpresent = false;
1537         $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1538         $this->query_start($sql, null, SQL_QUERY_AUX);
1539         $result = pg_query($this->pgsql, $sql);
1540         $this->query_end($result);
1542         pg_free_result($result);
1543     }
1545     /**
1546      * Helper function trimming (whitespace + quotes) any string
1547      * needed because PG uses to enclose with double quotes some
1548      * fields in indexes definition and others
1549      *
1550      * @param string $str string to apply whitespace + quotes trim
1551      * @return string trimmed string
1552      */
1553     private function trim_quotes($str) {
1554         return trim(trim($str), "'\"");
1555     }
1557     /**
1558      * Postgresql supports full-text search indexes.
1559      *
1560      * @return bool
1561      */
1562     public function is_fulltext_search_supported() {
1563         return true;
1564     }