MDL-27685 fix pg LIKE escape trouble
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
1 <?php
3 // This file is part of Moodle - http://moodle.org/
4 //
5 // Moodle is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // Moodle is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
19 /**
20  * Native pgsql class representing moodle database interface.
21  *
22  * @package    core
23  * @subpackage dml
24  * @copyright  2008 Petr Skoda (http://skodak.org)
25  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
26  */
28 defined('MOODLE_INTERNAL') || die();
30 require_once($CFG->libdir.'/dml/moodle_database.php');
31 require_once($CFG->libdir.'/dml/pgsql_native_moodle_recordset.php');
32 require_once($CFG->libdir.'/dml/pgsql_native_moodle_temptables.php');
34 /**
35  * Native pgsql class representing moodle database interface.
36  */
37 class pgsql_native_moodle_database extends moodle_database {
39     protected $pgsql     = null;
40     protected $bytea_oid = null;
42     protected $last_error_reporting; // To handle pgsql driver default verbosity
44     /**
45      * Detects if all needed PHP stuff installed.
46      * Note: can be used before connect()
47      * @return mixed true if ok, string if something
48      */
49     public function driver_installed() {
50         if (!extension_loaded('pgsql')) {
51             return get_string('pgsqlextensionisnotpresentinphp', 'install');
52         }
53         return true;
54     }
56     /**
57      * Returns database family type - describes SQL dialect
58      * Note: can be used before connect()
59      * @return string db family name (mysql, postgres, mssql, oracle, etc.)
60      */
61     public function get_dbfamily() {
62         return 'postgres';
63     }
65     /**
66      * Returns more specific database driver type
67      * Note: can be used before connect()
68      * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
69      */
70     protected function get_dbtype() {
71         return 'pgsql';
72     }
74     /**
75      * Returns general database library name
76      * Note: can be used before connect()
77      * @return string db type pdo, native
78      */
79     protected function get_dblibrary() {
80         return 'native';
81     }
83     /**
84      * Returns localised database type name
85      * Note: can be used before connect()
86      * @return string
87      */
88     public function get_name() {
89         return get_string('nativepgsql', 'install');
90     }
92     /**
93      * Returns localised database configuration help.
94      * Note: can be used before connect()
95      * @return string
96      */
97     public function get_configuration_help() {
98         return get_string('nativepgsqlhelp', 'install');
99     }
101     /**
102      * Returns localised database description
103      * Note: can be used before connect()
104      * @return string
105      */
106     public function get_configuration_hints() {
107         return get_string('databasesettingssub_postgres7', 'install');
108     }
110     /**
111      * Connect to db
112      * Must be called before other methods.
113      * @param string $dbhost
114      * @param string $dbuser
115      * @param string $dbpass
116      * @param string $dbname
117      * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
118      * @param array $dboptions driver specific options
119      * @return bool true
120      * @throws dml_connection_exception if error
121      */
122     public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
123         if ($prefix == '' and !$this->external) {
124             //Enforce prefixes for everybody but mysql
125             throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
126         }
128         $driverstatus = $this->driver_installed();
130         if ($driverstatus !== true) {
131             throw new dml_exception('dbdriverproblem', $driverstatus);
132         }
134         $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
136         $pass = addcslashes($this->dbpass, "'\\");
138         // Unix socket connections should have lower overhead
139         if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
140             $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
141             if (strpos($this->dboptions['dbsocket'], '/') !== false) {
142                 $connection = $connection." host='".$this->dboptions['dbsocket']."'";
143             }
144         } else {
145             $this->dboptions['dbsocket'] = '';
146             if (empty($this->dbname)) {
147                 // probably old style socket connection - do not add port
148                 $port = "";
149             } else if (empty($this->dboptions['dbport'])) {
150                 $port = "port ='5432'";
151             } else {
152                 $port = "port ='".$this->dboptions['dbport']."'";
153             }
154             $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
155         }
157         ob_start();
158         if (empty($this->dboptions['dbpersist'])) {
159             $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
160         } else {
161             $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
162         }
163         $dberr = ob_get_contents();
164         ob_end_clean();
166         $status = pg_connection_status($this->pgsql);
168         if ($status === false or $status === PGSQL_CONNECTION_BAD) {
169             $this->pgsql = null;
170             throw new dml_connection_exception($dberr);
171         }
173         $this->query_start("--pg_set_client_encoding()", null, SQL_QUERY_AUX);
174         pg_set_client_encoding($this->pgsql, 'utf8');
175         $this->query_end(true);
177         // find out the bytea oid
178         $sql = "SELECT oid FROM pg_type WHERE typname = 'bytea'";
179         $this->query_start($sql, null, SQL_QUERY_AUX);
180         $result = pg_query($this->pgsql, $sql);
181         $this->query_end($result);
183         $this->bytea_oid = pg_fetch_result($result, 0, 0);
184         pg_free_result($result);
185         if ($this->bytea_oid === false) {
186             $this->pgsql = null;
187             throw new dml_connection_exception('Can not read bytea type.');
188         }
190         // Connection stabilised and configured, going to instantiate the temptables controller
191         $this->temptables = new pgsql_native_moodle_temptables($this);
193         return true;
194     }
196     /**
197      * Close database connection and release all resources
198      * and memory (especially circular memory references).
199      * Do NOT use connect() again, create a new instance if needed.
200      */
201     public function dispose() {
202         parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
203         if ($this->pgsql) {
204             pg_close($this->pgsql);
205             $this->pgsql = null;
206         }
207     }
210     /**
211      * Called before each db query.
212      * @param string $sql
213      * @param array array of parameters
214      * @param int $type type of query
215      * @param mixed $extrainfo driver specific extra information
216      * @return void
217      */
218     protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
219         parent::query_start($sql, $params, $type, $extrainfo);
220         // pgsql driver tents to send debug to output, we do not need that ;-)
221         $this->last_error_reporting = error_reporting(0);
222     }
224     /**
225      * Called immediately after each db query.
226      * @param mixed db specific result
227      * @return void
228      */
229     protected function query_end($result) {
230         // reset original debug level
231         error_reporting($this->last_error_reporting);
232         parent::query_end($result);
233     }
235     /**
236      * Returns database server info array
237      * @return array
238      */
239     public function get_server_info() {
240         static $info;
241         if (!$info) {
242             $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
243             $info = pg_version($this->pgsql);
244             $this->query_end(true);
245         }
246         return array('description'=>$info['server'], 'version'=>$info['server']);
247     }
249     protected function is_min_version($version) {
250         $server = $this->get_server_info();
251         $server = $server['version'];
252         return version_compare($server, $version, '>=');
253     }
255     /**
256      * Returns supported query parameter types
257      * @return int bitmask
258      */
259     protected function allowed_param_types() {
260         return SQL_PARAMS_DOLLAR;
261     }
263     /**
264      * Returns last error reported by database engine.
265      * @return string error message
266      */
267     public function get_last_error() {
268         return pg_last_error($this->pgsql);
269     }
271     /**
272      * Return tables in database WITHOUT current prefix
273      * @return array of table names in lowercase and without prefix
274      */
275     public function get_tables($usecache=true) {
276         if ($usecache and $this->tables !== null) {
277             return $this->tables;
278         }
279         $this->tables = array();
280         $prefix = str_replace('_', '|_', $this->prefix);
281         // Get them from information_schema instead of catalog as far as
282         // we want to get only own session temp objects (catalog returns all)
283         $sql = "SELECT table_name
284                   FROM information_schema.tables
285                  WHERE table_name LIKE '$prefix%' ESCAPE '|'
286                    AND table_type IN ('BASE TABLE', 'LOCAL TEMPORARY')";
287         $this->query_start($sql, null, SQL_QUERY_AUX);
288         $result = pg_query($this->pgsql, $sql);
289         $this->query_end($result);
291         if ($result) {
292             while ($row = pg_fetch_row($result)) {
293                 $tablename = reset($row);
294                 if (strpos($tablename, $this->prefix) !== 0) {
295                     continue;
296                 }
297                 $tablename = substr($tablename, strlen($this->prefix));
298                 $this->tables[$tablename] = $tablename;
299             }
300             pg_free_result($result);
301         }
302         return $this->tables;
303     }
305     /**
306      * Return table indexes - everything lowercased
307      * @return array of arrays
308      */
309     public function get_indexes($table) {
310         $indexes = array();
311         $tablename = $this->prefix.$table;
313         $sql = "SELECT *
314                   FROM pg_catalog.pg_indexes
315                  WHERE tablename = '$tablename'";
317         $this->query_start($sql, null, SQL_QUERY_AUX);
318         $result = pg_query($this->pgsql, $sql);
319         $this->query_end($result);
321         if ($result) {
322             while ($row = pg_fetch_assoc($result)) {
323                 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON '.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
324                     continue;
325                 }
326                 if ($matches[4] === 'id') {
327                     continue;
328                 }
329                 $columns = explode(',', $matches[4]);
330                 $columns = array_map(array($this, 'trim_quotes'), $columns);
331                 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
332                                               'columns'=>$columns);
333             }
334             pg_free_result($result);
335         }
336         return $indexes;
337     }
339     /**
340      * Returns detailed information about columns in table. This information is cached internally.
341      * @param string $table name
342      * @param bool $usecache
343      * @return array array of database_column_info objects indexed with column names
344      */
345     public function get_columns($table, $usecache=true) {
346         if ($usecache and isset($this->columns[$table])) {
347             return $this->columns[$table];
348         }
350         $this->columns[$table] = array();
352         $tablename = $this->prefix.$table;
354         $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
355                   FROM pg_catalog.pg_class c
356                   JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
357                   JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
358              LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
359                  WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
360               ORDER BY a.attnum";
362         $this->query_start($sql, null, SQL_QUERY_AUX);
363         $result = pg_query($this->pgsql, $sql);
364         $this->query_end($result);
366         if (!$result) {
367             return array();
368         }
369         while ($rawcolumn = pg_fetch_object($result)) {
371             $info = new stdClass();
372             $info->name = $rawcolumn->field;
373             $matches = null;
375             if ($rawcolumn->type === 'varchar') {
376                 $info->type          = 'varchar';
377                 $info->meta_type     = 'C';
378                 $info->max_length    = $rawcolumn->atttypmod - 4;
379                 $info->scale         = null;
380                 $info->not_null      = ($rawcolumn->attnotnull === 't');
381                 $info->has_default   = ($rawcolumn->atthasdef === 't');
382                 if ($info->has_default) {
383                     $parts = explode('::', $rawcolumn->adsrc);
384                     if (count($parts) > 1) {
385                         $info->default_value = reset($parts);
386                         $info->default_value = trim($info->default_value, "'");
387                     } else {
388                         $info->default_value = $rawcolumn->adsrc;
389                     }
390                 } else {
391                     $info->default_value = null;
392                 }
393                 $info->primary_key   = false;
394                 $info->binary        = false;
395                 $info->unsigned      = null;
396                 $info->auto_increment= false;
397                 $info->unique        = null;
399             } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
400                 $info->type = 'int';
401                 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
402                     $info->primary_key   = true;
403                     $info->meta_type     = 'R';
404                     $info->unique        = true;
405                     $info->auto_increment= true;
406                     $info->has_default   = false;
407                 } else {
408                     $info->primary_key   = false;
409                     $info->meta_type     = 'I';
410                     $info->unique        = null;
411                     $info->auto_increment= false;
412                     $info->has_default   = ($rawcolumn->atthasdef === 't');
413                 }
414                 $info->max_length    = $matches[1];
415                 $info->scale         = null;
416                 $info->not_null      = ($rawcolumn->attnotnull === 't');
417                 if ($info->has_default) {
418                     $info->default_value = $rawcolumn->adsrc;
419                 } else {
420                     $info->default_value = null;
421                 }
422                 $info->binary        = false;
423                 $info->unsigned      = false;
425             } else if ($rawcolumn->type === 'numeric') {
426                 $info->type = $rawcolumn->type;
427                 $info->meta_type     = 'N';
428                 $info->primary_key   = false;
429                 $info->binary        = false;
430                 $info->unsigned      = null;
431                 $info->auto_increment= false;
432                 $info->unique        = null;
433                 $info->not_null      = ($rawcolumn->attnotnull === 't');
434                 $info->has_default   = ($rawcolumn->atthasdef === 't');
435                 if ($info->has_default) {
436                     $info->default_value = $rawcolumn->adsrc;
437                 } else {
438                     $info->default_value = null;
439                 }
440                 $info->max_length    = $rawcolumn->atttypmod >> 16;
441                 $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
443             } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
444                 $info->type = 'float';
445                 $info->meta_type     = 'N';
446                 $info->primary_key   = false;
447                 $info->binary        = false;
448                 $info->unsigned      = null;
449                 $info->auto_increment= false;
450                 $info->unique        = null;
451                 $info->not_null      = ($rawcolumn->attnotnull === 't');
452                 $info->has_default   = ($rawcolumn->atthasdef === 't');
453                 if ($info->has_default) {
454                     $info->default_value = $rawcolumn->adsrc;
455                 } else {
456                     $info->default_value = null;
457                 }
458                 // just guess expected number of deciaml places :-(
459                 if ($matches[1] == 8) {
460                     // total 15 digits
461                     $info->max_length = 8;
462                     $info->scale      = 7;
463                 } else {
464                     // total 6 digits
465                     $info->max_length = 4;
466                     $info->scale      = 2;
467                 }
469             } else if ($rawcolumn->type === 'text') {
470                 $info->type          = $rawcolumn->type;
471                 $info->meta_type     = 'X';
472                 $info->max_length    = -1;
473                 $info->scale         = null;
474                 $info->not_null      = ($rawcolumn->attnotnull === 't');
475                 $info->has_default   = ($rawcolumn->atthasdef === 't');
476                 if ($info->has_default) {
477                     $parts = explode('::', $rawcolumn->adsrc);
478                     if (count($parts) > 1) {
479                         $info->default_value = reset($parts);
480                         $info->default_value = trim($info->default_value, "'");
481                     } else {
482                         $info->default_value = $rawcolumn->adsrc;
483                     }
484                 } else {
485                     $info->default_value = null;
486                 }
487                 $info->primary_key   = false;
488                 $info->binary        = false;
489                 $info->unsigned      = null;
490                 $info->auto_increment= false;
491                 $info->unique        = null;
493             } else if ($rawcolumn->type === 'bytea') {
494                 $info->type          = $rawcolumn->type;
495                 $info->meta_type     = 'B';
496                 $info->max_length    = -1;
497                 $info->scale         = null;
498                 $info->not_null      = ($rawcolumn->attnotnull === 't');
499                 $info->has_default   = false;
500                 $info->default_value = null;
501                 $info->primary_key   = false;
502                 $info->binary        = true;
503                 $info->unsigned      = null;
504                 $info->auto_increment= false;
505                 $info->unique        = null;
507             }
509             $this->columns[$table][$info->name] = new database_column_info($info);
510         }
512         pg_free_result($result);
514         return $this->columns[$table];
515     }
517     /**
518      * Normalise values based in RDBMS dependencies (booleans, LOBs...)
519      *
520      * @param database_column_info $column column metadata corresponding with the value we are going to normalise
521      * @param mixed $value value we are going to normalise
522      * @return mixed the normalised value
523      */
524     protected function normalise_value($column, $value) {
525         if (is_bool($value)) { // Always, convert boolean to int
526             $value = (int)$value;
528         } else if ($column->meta_type === 'B') { // BLOB detected, we return 'blob' array instead of raw value to allow
529             if (!is_null($value)) {             // binding/executing code later to know about its nature
530                 $value = array('blob' => $value);
531             }
533         } else if ($value === '') {
534             if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
535                 $value = 0; // prevent '' problems in numeric fields
536             }
537         }
538         return $value;
539     }
541     /**
542      * Is db in unicode mode?
543      * @return bool
544      */
545     public function setup_is_unicodedb() {
546     /// Get PostgreSQL server_encoding value
547         $sql = "SHOW server_encoding";
548         $this->query_start($sql, null, SQL_QUERY_AUX);
549         $result = pg_query($this->pgsql, $sql);
550         $this->query_end($result);
552         if (!$result) {
553             return false;
554         }
555         $rawcolumn = pg_fetch_object($result);
556         $encoding = $rawcolumn->server_encoding;
557         pg_free_result($result);
559         return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
560     }
562     /**
563      * Do NOT use in code, to be used by database_manager only!
564      * @param string $sql query
565      * @return bool true
566      * @throws dml_exception if error
567      */
568     public function change_database_structure($sql) {
569         $this->reset_caches();
571         $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
572         $result = pg_query($this->pgsql, $sql);
573         $this->query_end($result);
575         pg_free_result($result);
576         return true;
577     }
579     /**
580      * Execute general sql query. Should be used only when no other method suitable.
581      * Do NOT use this to make changes in db structure, use database_manager::execute_sql() instead!
582      * @param string $sql query
583      * @param array $params query parameters
584      * @return bool true
585      * @throws dml_exception if error
586      */
587     public function execute($sql, array $params=null) {
588         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
590         if (strpos($sql, ';') !== false) {
591             throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
592         }
594         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
595         $result = pg_query_params($this->pgsql, $sql, $params);
596         $this->query_end($result);
598         pg_free_result($result);
599         return true;
600     }
602     /**
603      * Get a number of records as a moodle_recordset using a SQL statement.
604      *
605      * Since this method is a little less readable, use of it should be restricted to
606      * code where it's possible there might be large datasets being returned.  For known
607      * small datasets use get_records_sql - it leads to simpler code.
608      *
609      * The return type is as for @see function get_recordset.
610      *
611      * @param string $sql the SQL select query to execute.
612      * @param array $params array of sql parameters
613      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
614      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
615      * @return moodle_recordset instance
616      * @throws dml_exception if error
617      */
618     public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
619         $limitfrom = (int)$limitfrom;
620         $limitnum  = (int)$limitnum;
621         $limitfrom = ($limitfrom < 0) ? 0 : $limitfrom;
622         $limitnum  = ($limitnum < 0)  ? 0 : $limitnum;
623         if ($limitfrom or $limitnum) {
624             if ($limitnum < 1) {
625                 $limitnum = "ALL";
626             } else if (PHP_INT_MAX - $limitnum < $limitfrom) {
627                 // this is a workaround for weird max int problem
628                 $limitnum = "ALL";
629             }
630             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
631         }
633         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
635         $this->query_start($sql, $params, SQL_QUERY_SELECT);
636         $result = pg_query_params($this->pgsql, $sql, $params);
637         $this->query_end($result);
639         return $this->create_recordset($result);
640     }
642     protected function create_recordset($result) {
643         return new pgsql_native_moodle_recordset($result, $this->bytea_oid);
644     }
646     /**
647      * Get a number of records as an array of objects using a SQL statement.
648      *
649      * Return value as for @see function get_records.
650      *
651      * @param string $sql the SQL select query to execute. The first column of this SELECT statement
652      *   must be a unique value (usually the 'id' field), as it will be used as the key of the
653      *   returned array.
654      * @param array $params array of sql parameters
655      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
656      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
657      * @return array of objects, or empty array if no records were found
658      * @throws dml_exception if error
659      */
660     public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
661         $limitfrom = (int)$limitfrom;
662         $limitnum  = (int)$limitnum;
663         $limitfrom = ($limitfrom < 0) ? 0 : $limitfrom;
664         $limitnum  = ($limitnum < 0)  ? 0 : $limitnum;
665         if ($limitfrom or $limitnum) {
666             if ($limitnum < 1) {
667                 $limitnum = "ALL";
668             } else if (PHP_INT_MAX - $limitnum < $limitfrom) {
669                 // this is a workaround for weird max int problem
670                 $limitnum = "ALL";
671             }
672             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
673         }
675         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
676         $this->query_start($sql, $params, SQL_QUERY_SELECT);
677         $result = pg_query_params($this->pgsql, $sql, $params);
678         $this->query_end($result);
680         // find out if there are any blobs
681         $numrows = pg_num_fields($result);
682         $blobs = array();
683         for($i=0; $i<$numrows; $i++) {
684             $type_oid = pg_field_type_oid($result, $i);
685             if ($type_oid == $this->bytea_oid) {
686                 $blobs[] = pg_field_name($result, $i);
687             }
688         }
690         $rows = pg_fetch_all($result);
691         pg_free_result($result);
693         $return = array();
694         if ($rows) {
695             foreach ($rows as $row) {
696                 $id = reset($row);
697                 if ($blobs) {
698                     foreach ($blobs as $blob) {
699                         // note: in PostgreSQL 9.0 the returned blobs are hexencoded by default - see http://www.postgresql.org/docs/9.0/static/runtime-config-client.html#GUC-BYTEA-OUTPUT
700                         $row[$blob] = $row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null;
701                     }
702                 }
703                 if (isset($return[$id])) {
704                     $colname = key($row);
705                     debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
706                 }
707                 $return[$id] = (object)$row;
708             }
709         }
711         return $return;
712     }
714     /**
715      * Selects records and return values (first field) as an array using a SQL statement.
716      *
717      * @param string $sql The SQL query
718      * @param array $params array of sql parameters
719      * @return array of values
720      * @throws dml_exception if error
721      */
722     public function get_fieldset_sql($sql, array $params=null) {
723         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
725         $this->query_start($sql, $params, SQL_QUERY_SELECT);
726         $result = pg_query_params($this->pgsql, $sql, $params);
727         $this->query_end($result);
729         $return = pg_fetch_all_columns($result, 0);
730         pg_free_result($result);
732         return $return;
733     }
735     /**
736      * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
737      * @param string $table name
738      * @param mixed $params data record as object or array
739      * @param bool $returnit return it of inserted record
740      * @param bool $bulk true means repeated inserts expected
741      * @param bool $customsequence true if 'id' included in $params, disables $returnid
742      * @return bool|int true or new id
743      * @throws dml_exception if error
744      */
745     public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
746         if (!is_array($params)) {
747             $params = (array)$params;
748         }
750         $returning = "";
752         if ($customsequence) {
753             if (!isset($params['id'])) {
754                 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
755             }
756             $returnid = false;
757         } else {
758             if ($returnid) {
759                 $returning = "RETURNING id";
760                 unset($params['id']);
761             } else {
762                 unset($params['id']);
763             }
764         }
766         if (empty($params)) {
767             throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
768         }
770         $fields = implode(',', array_keys($params));
771         $values = array();
772         $count = count($params);
773         for ($i=1; $i<=$count; $i++) {
774             $values[] = "\$".$i;
775         }
776         $values = implode(',', $values);
778         $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
779         $this->query_start($sql, $params, SQL_QUERY_INSERT);
780         $result = pg_query_params($this->pgsql, $sql, $params);
781         $this->query_end($result);
783         if ($returning !== "") {
784             $row = pg_fetch_assoc($result);
785             $params['id'] = reset($row);
786         }
787         pg_free_result($result);
789         if (!$returnid) {
790             return true;
791         }
793         return (int)$params['id'];
794     }
796     /**
797      * Insert a record into a table and return the "id" field if required.
798      *
799      * Some conversions and safety checks are carried out. Lobs are supported.
800      * If the return ID isn't required, then this just reports success as true/false.
801      * $data is an object containing needed data
802      * @param string $table The database table to be inserted into
803      * @param object $data A data object with values for one or more fields in the record
804      * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
805      * @return bool|int true or new id
806      * @throws dml_exception if error
807      */
808     public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
809         $dataobject = (array)$dataobject;
811         $columns = $this->get_columns($table);
812         $cleaned = array();
813         $blobs   = array();
815         foreach ($dataobject as $field=>$value) {
816             if ($field === 'id') {
817                 continue;
818             }
819             if (!isset($columns[$field])) {
820                 continue;
821             }
822             $column = $columns[$field];
823             $normalised_value = $this->normalise_value($column, $value);
824             if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
825                 $cleaned[$field] = '@#BLOB#@';
826                 $blobs[$field] = $normalised_value['blob'];
827             } else {
828                 $cleaned[$field] = $normalised_value;
829             }
830         }
832         if (empty($blobs)) {
833             return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
834         }
836         $id = $this->insert_record_raw($table, $cleaned, true, $bulk);
838         foreach ($blobs as $key=>$value) {
839             $value = pg_escape_bytea($this->pgsql, $value);
840             $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
841             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
842             $result = pg_query($this->pgsql, $sql);
843             $this->query_end($result);
844             if ($result !== false) {
845                 pg_free_result($result);
846             }
847         }
849         return ($returnid ? $id : true);
851     }
853     /**
854      * Import a record into a table, id field is required.
855      * Safety checks are NOT carried out. Lobs are supported.
856      *
857      * @param string $table name of database table to be inserted into
858      * @param object $dataobject A data object with values for one or more fields in the record
859      * @return bool true
860      * @throws dml_exception if error
861      */
862     public function import_record($table, $dataobject) {
863         $dataobject = (array)$dataobject;
865         $columns = $this->get_columns($table);
866         $cleaned = array();
867         $blobs   = array();
869         foreach ($dataobject as $field=>$value) {
870             if (!isset($columns[$field])) {
871                 continue;
872             }
873             if ($columns[$field]->meta_type === 'B') {
874                 if (!is_null($value)) {
875                     $cleaned[$field] = '@#BLOB#@';
876                     $blobs[$field] = $value;
877                     continue;
878                 }
879             }
881             $cleaned[$field] = $value;
882         }
884         $this->insert_record_raw($table, $cleaned, false, true, true);
885         $id = $dataobject['id'];
887         foreach ($blobs as $key=>$value) {
888             $value = pg_escape_bytea($this->pgsql, $value);
889             $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
890             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
891             $result = pg_query($this->pgsql, $sql);
892             $this->query_end($result);
893             if ($result !== false) {
894                 pg_free_result($result);
895             }
896         }
898         return true;
899     }
901     /**
902      * Update record in database, as fast as possible, no safety checks, lobs not supported.
903      * @param string $table name
904      * @param mixed $params data record as object or array
905      * @param bool true means repeated updates expected
906      * @return bool true
907      * @throws dml_exception if error
908      */
909     public function update_record_raw($table, $params, $bulk=false) {
910         $params = (array)$params;
912         if (!isset($params['id'])) {
913             throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
914         }
915         $id = $params['id'];
916         unset($params['id']);
918         if (empty($params)) {
919             throw new coding_exception('moodle_database::update_record_raw() no fields found.');
920         }
922         $i = 1;
924         $sets = array();
925         foreach ($params as $field=>$value) {
926             $sets[] = "$field = \$".$i++;
927         }
929         $params[] = $id; // last ? in WHERE condition
931         $sets = implode(',', $sets);
932         $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
934         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
935         $result = pg_query_params($this->pgsql, $sql, $params);
936         $this->query_end($result);
938         pg_free_result($result);
939         return true;
940     }
942     /**
943      * Update a record in a table
944      *
945      * $dataobject is an object containing needed data
946      * Relies on $dataobject having a variable "id" to
947      * specify the record to update
948      *
949      * @param string $table The database table to be checked against.
950      * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
951      * @param bool true means repeated updates expected
952      * @return bool true
953      * @throws dml_exception if error
954      */
955     public function update_record($table, $dataobject, $bulk=false) {
956         $dataobject = (array)$dataobject;
958         $columns = $this->get_columns($table);
959         $cleaned = array();
960         $blobs   = array();
962         foreach ($dataobject as $field=>$value) {
963             if (!isset($columns[$field])) {
964                 continue;
965             }
966             $column = $columns[$field];
967             $normalised_value = $this->normalise_value($column, $value);
968             if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
969                 $cleaned[$field] = '@#BLOB#@';
970                 $blobs[$field] = $normalised_value['blob'];
971             } else {
972                 $cleaned[$field] = $normalised_value;
973             }
974         }
976         $this->update_record_raw($table, $cleaned, $bulk);
978         if (empty($blobs)) {
979             return true;
980         }
982         $id = (int)$dataobject['id'];
984         foreach ($blobs as $key=>$value) {
985             $value = pg_escape_bytea($this->pgsql, $value);
986             $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
987             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
988             $result = pg_query($this->pgsql, $sql);
989             $this->query_end($result);
991             pg_free_result($result);
992         }
994         return true;
995     }
997     /**
998      * Set a single field in every table record which match a particular WHERE clause.
999      *
1000      * @param string $table The database table to be checked against.
1001      * @param string $newfield the field to set.
1002      * @param string $newvalue the value to set the field to.
1003      * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1004      * @param array $params array of sql parameters
1005      * @return bool true
1006      * @throws dml_exception if error
1007      */
1008     public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1010         if ($select) {
1011             $select = "WHERE $select";
1012         }
1013         if (is_null($params)) {
1014             $params = array();
1015         }
1016         list($select, $params, $type) = $this->fix_sql_params($select, $params);
1017         $i = count($params)+1;
1019     /// Get column metadata
1020         $columns = $this->get_columns($table);
1021         $column = $columns[$newfield];
1023         $normalised_value = $this->normalise_value($column, $newvalue);
1024         if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
1025         /// Update BYTEA and return
1026             $normalised_value = pg_escape_bytea($this->pgsql, $normalised_value['blob']);
1027             $sql = "UPDATE {$this->prefix}$table SET $newfield = '$normalised_value'::bytea $select";
1028             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
1029             $result = pg_query_params($this->pgsql, $sql, $params);
1030             $this->query_end($result);
1031             pg_free_result($result);
1032             return true;
1033         }
1035         if (is_null($normalised_value)) {
1036             $newfield = "$newfield = NULL";
1037         } else {
1038             $newfield = "$newfield = \$".$i;
1039             $params[] = $normalised_value;
1040         }
1041         $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1043         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1044         $result = pg_query_params($this->pgsql, $sql, $params);
1045         $this->query_end($result);
1047         pg_free_result($result);
1049         return true;
1050     }
1052     /**
1053      * Delete one or more records from a table which match a particular WHERE clause.
1054      *
1055      * @param string $table The database table to be checked against.
1056      * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1057      * @param array $params array of sql parameters
1058      * @return bool true
1059      * @throws dml_exception if error
1060      */
1061     public function delete_records_select($table, $select, array $params=null) {
1062         if ($select) {
1063             $select = "WHERE $select";
1064         }
1065         $sql = "DELETE FROM {$this->prefix}$table $select";
1067         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1069         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1070         $result = pg_query_params($this->pgsql, $sql, $params);
1071         $this->query_end($result);
1073         pg_free_result($result);
1075         return true;
1076     }
1078     /**
1079      * Returns 'LIKE' part of a query.
1080      *
1081      * @param string $fieldname usually name of the table column
1082      * @param string $param usually bound query parameter (?, :named)
1083      * @param bool $casesensitive use case sensitive search
1084      * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1085      * @param bool $notlike true means "NOT LIKE"
1086      * @param string $escapechar escape char for '%' and '_'
1087      * @return string SQL code fragment
1088      */
1089     public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1090         if (strpos($param, '%') !== false) {
1091             debugging('Potential SQL injection detected, sql_ilike() expects bound parameters (? or :named)');
1092         }
1093         $escapechar = pg_escape_string($this->pgsql, $escapechar); // prevents problems with C-style escapes of enclosing '\'
1095         // postgresql does not support accent insensitive text comparisons, sorry
1096         if ($casesensitive) {
1097             $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1098         } else {
1099             $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1100         }
1101         return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1102     }
1104     public function sql_ilike() {
1105         debugging('sql_ilike() is deprecated, please use sql_like() instead');
1106         return 'ILIKE';
1107     }
1109     public function sql_bitxor($int1, $int2) {
1110         return '((' . $int1 . ') # (' . $int2 . '))';
1111     }
1113     public function sql_cast_char2int($fieldname, $text=false) {
1114         return ' CAST(' . $fieldname . ' AS INT) ';
1115     }
1117     public function sql_cast_char2real($fieldname, $text=false) {
1118         return " $fieldname::real ";
1119     }
1121     public function sql_concat() {
1122         $arr = func_get_args();
1123         $s = implode(' || ', $arr);
1124         if ($s === '') {
1125             return " '' ";
1126         }
1127         // Add always empty string element so integer-exclusive concats
1128         // will work without needing to cast each element explicity
1129         return " '' || $s ";
1130     }
1132     public function sql_concat_join($separator="' '", $elements=array()) {
1133         for ($n=count($elements)-1; $n > 0 ; $n--) {
1134             array_splice($elements, $n, 0, $separator);
1135         }
1136         $s = implode(' || ', $elements);
1137         if ($s === '') {
1138             return " '' ";
1139         }
1140         return " $s ";
1141     }
1143     public function sql_regex_supported() {
1144         return true;
1145     }
1147     public function sql_regex($positivematch=true) {
1148         return $positivematch ? '~*' : '!~*';
1149     }
1151 /// session locking
1152     public function session_lock_supported() {
1153         return true;
1154     }
1156     public function get_session_lock($rowid) {
1157         // NOTE: there is a potential locking problem for database running
1158         //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1159         //       luckily there is not a big chance that they would collide
1160         if (!$this->session_lock_supported()) {
1161             return;
1162         }
1164         parent::get_session_lock($rowid);
1165         $sql = "SELECT pg_advisory_lock($rowid)";
1166         $this->query_start($sql, null, SQL_QUERY_AUX);
1167         $result = pg_query($this->pgsql, $sql);
1168         $this->query_end($result);
1170         if ($result) {
1171             pg_free_result($result);
1172         }
1173     }
1175     public function release_session_lock($rowid) {
1176         if (!$this->session_lock_supported()) {
1177             return;
1178         }
1179         parent::release_session_lock($rowid);
1181         $sql = "SELECT pg_advisory_unlock($rowid)";
1182         $this->query_start($sql, null, SQL_QUERY_AUX);
1183         $result = pg_query($this->pgsql, $sql);
1184         $this->query_end($result);
1186         if ($result) {
1187             pg_free_result($result);
1188         }
1189     }
1191 /// transactions
1192     /**
1193      * Driver specific start of real database transaction,
1194      * this can not be used directly in code.
1195      * @return void
1196      */
1197     protected function begin_transaction() {
1198         $sql = "BEGIN ISOLATION LEVEL READ COMMITTED";
1199         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1200         $result = pg_query($this->pgsql, $sql);
1201         $this->query_end($result);
1203         pg_free_result($result);
1204     }
1206     /**
1207      * Driver specific commit of real database transaction,
1208      * this can not be used directly in code.
1209      * @return void
1210      */
1211     protected function commit_transaction() {
1212         $sql = "COMMIT";
1213         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1214         $result = pg_query($this->pgsql, $sql);
1215         $this->query_end($result);
1217         pg_free_result($result);
1218     }
1220     /**
1221      * Driver specific abort of real database transaction,
1222      * this can not be used directly in code.
1223      * @return void
1224      */
1225     protected function rollback_transaction() {
1226         $sql = "ROLLBACK";
1227         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1228         $result = pg_query($this->pgsql, $sql);
1229         $this->query_end($result);
1231         pg_free_result($result);
1232     }
1234     /**
1235      * Helper function trimming (whitespace + quotes) any string
1236      * needed because PG uses to enclose with double quotes some
1237      * fields in indexes definition and others
1238      *
1239      * @param string $str string to apply whitespace + quotes trim
1240      * @return string trimmed string
1241      */
1242     private function trim_quotes($str) {
1243         return trim(trim($str), "'\"");
1244     }