0ee68de2d3f4f0526201b84a869fa548448bbb97
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
1 <?php
3 // This file is part of Moodle - http://moodle.org/
4 //
5 // Moodle is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // Moodle is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
19 /**
20  * Native pgsql class representing moodle database interface.
21  *
22  * @package    core
23  * @subpackage dml
24  * @copyright  2008 Petr Skoda (http://skodak.org)
25  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
26  */
28 defined('MOODLE_INTERNAL') || die();
30 require_once($CFG->libdir.'/dml/moodle_database.php');
31 require_once($CFG->libdir.'/dml/pgsql_native_moodle_recordset.php');
32 require_once($CFG->libdir.'/dml/pgsql_native_moodle_temptables.php');
34 /**
35  * Native pgsql class representing moodle database interface.
36  */
37 class pgsql_native_moodle_database extends moodle_database {
39     protected $pgsql     = null;
40     protected $bytea_oid = null;
42     protected $last_error_reporting; // To handle pgsql driver default verbosity
44     /**
45      * Detects if all needed PHP stuff installed.
46      * Note: can be used before connect()
47      * @return mixed true if ok, string if something
48      */
49     public function driver_installed() {
50         if (!extension_loaded('pgsql')) {
51             return get_string('pgsqlextensionisnotpresentinphp', 'install');
52         }
53         return true;
54     }
56     /**
57      * Returns database family type - describes SQL dialect
58      * Note: can be used before connect()
59      * @return string db family name (mysql, postgres, mssql, oracle, etc.)
60      */
61     public function get_dbfamily() {
62         return 'postgres';
63     }
65     /**
66      * Returns more specific database driver type
67      * Note: can be used before connect()
68      * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
69      */
70     protected function get_dbtype() {
71         return 'pgsql';
72     }
74     /**
75      * Returns general database library name
76      * Note: can be used before connect()
77      * @return string db type pdo, native
78      */
79     protected function get_dblibrary() {
80         return 'native';
81     }
83     /**
84      * Returns localised database type name
85      * Note: can be used before connect()
86      * @return string
87      */
88     public function get_name() {
89         return get_string('nativepgsql', 'install');
90     }
92     /**
93      * Returns localised database configuration help.
94      * Note: can be used before connect()
95      * @return string
96      */
97     public function get_configuration_help() {
98         return get_string('nativepgsqlhelp', 'install');
99     }
101     /**
102      * Returns localised database description
103      * Note: can be used before connect()
104      * @return string
105      */
106     public function get_configuration_hints() {
107         return get_string('databasesettingssub_postgres7', 'install');
108     }
110     /**
111      * Connect to db
112      * Must be called before other methods.
113      * @param string $dbhost
114      * @param string $dbuser
115      * @param string $dbpass
116      * @param string $dbname
117      * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
118      * @param array $dboptions driver specific options
119      * @return bool true
120      * @throws dml_connection_exception if error
121      */
122     public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
123         if ($prefix == '' and !$this->external) {
124             //Enforce prefixes for everybody but mysql
125             throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
126         }
128         $driverstatus = $this->driver_installed();
130         if ($driverstatus !== true) {
131             throw new dml_exception('dbdriverproblem', $driverstatus);
132         }
134         $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
136         $pass = addcslashes($this->dbpass, "'\\");
138         // Unix socket connections should have lower overhead
139         if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
140             $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
141         } else {
142             $this->dboptions['dbsocket'] = 0;
143             if (empty($this->dbname)) {
144                 // probably old style socket connection - do not add port
145                 $port = "";
146             } else if (empty($this->dboptions['dbport'])) {
147                 $port = "port ='5432'";
148             } else {
149                 $port = "port ='".$this->dboptions['dbport']."'";
150             }
151             $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
152         }
154         ob_start();
155         if (empty($this->dboptions['dbpersist'])) {
156             $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
157         } else {
158             $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
159         }
160         $dberr = ob_get_contents();
161         ob_end_clean();
163         $status = pg_connection_status($this->pgsql);
165         if ($status === false or $status === PGSQL_CONNECTION_BAD) {
166             $this->pgsql = null;
167             throw new dml_connection_exception($dberr);
168         }
170         $this->query_start("--pg_set_client_encoding()", null, SQL_QUERY_AUX);
171         pg_set_client_encoding($this->pgsql, 'utf8');
172         $this->query_end(true);
174         // find out the bytea oid
175         $sql = "SELECT oid FROM pg_type WHERE typname = 'bytea'";
176         $this->query_start($sql, null, SQL_QUERY_AUX);
177         $result = pg_query($this->pgsql, $sql);
178         $this->query_end($result);
180         $this->bytea_oid = pg_fetch_result($result, 0);
181         pg_free_result($result);
182         if ($this->bytea_oid === false) {
183             $this->pgsql = null;
184             throw new dml_connection_exception('Can not read bytea type.');
185         }
187         // Connection stabilished and configured, going to instantiate the temptables controller
188         $this->temptables = new pgsql_native_moodle_temptables($this);
190         return true;
191     }
193     /**
194      * Close database connection and release all resources
195      * and memory (especially circular memory references).
196      * Do NOT use connect() again, create a new instance if needed.
197      */
198     public function dispose() {
199         parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
200         if ($this->pgsql) {
201             pg_close($this->pgsql);
202             $this->pgsql = null;
203         }
204     }
207     /**
208      * Called before each db query.
209      * @param string $sql
210      * @param array array of parameters
211      * @param int $type type of query
212      * @param mixed $extrainfo driver specific extra information
213      * @return void
214      */
215     protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
216         parent::query_start($sql, $params, $type, $extrainfo);
217         // pgsql driver tents to send debug to output, we do not need that ;-)
218         $this->last_error_reporting = error_reporting(0);
219     }
221     /**
222      * Called immediately after each db query.
223      * @param mixed db specific result
224      * @return void
225      */
226     protected function query_end($result) {
227         // reset original debug level
228         error_reporting($this->last_error_reporting);
229         parent::query_end($result);
230     }
232     /**
233      * Returns database server info array
234      * @return array
235      */
236     public function get_server_info() {
237         static $info;
238         if (!$info) {
239             $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
240             $info = pg_version($this->pgsql);
241             $this->query_end(true);
242         }
243         return array('description'=>$info['server'], 'version'=>$info['server']);
244     }
246     protected function is_min_version($version) {
247         $server = $this->get_server_info();
248         $server = $server['version'];
249         return version_compare($server, $version, '>=');
250     }
252     /**
253      * Returns supported query parameter types
254      * @return int bitmask
255      */
256     protected function allowed_param_types() {
257         return SQL_PARAMS_DOLLAR;
258     }
260     /**
261      * Returns last error reported by database engine.
262      * @return string error message
263      */
264     public function get_last_error() {
265         return pg_last_error($this->pgsql);
266     }
268     /**
269      * Return tables in database WITHOUT current prefix
270      * @return array of table names in lowercase and without prefix
271      */
272     public function get_tables($usecache=true) {
273         if ($usecache and $this->tables !== null) {
274             return $this->tables;
275         }
276         $this->tables = array();
277         $prefix = str_replace('_', '\\\\_', $this->prefix);
278         // Get them from information_schema instead of catalog as far as
279         // we want to get only own session temp objects (catalog returns all)
280         $sql = "SELECT table_name
281                   FROM information_schema.tables
282                  WHERE table_name LIKE '$prefix%'
283                    AND table_type IN ('BASE TABLE', 'LOCAL TEMPORARY')";
284         $this->query_start($sql, null, SQL_QUERY_AUX);
285         $result = pg_query($this->pgsql, $sql);
286         $this->query_end($result);
288         if ($result) {
289             while ($row = pg_fetch_row($result)) {
290                 $tablename = reset($row);
291                 if (strpos($tablename, $this->prefix) !== 0) {
292                     continue;
293                 }
294                 $tablename = substr($tablename, strlen($this->prefix));
295                 $this->tables[$tablename] = $tablename;
296             }
297             pg_free_result($result);
298         }
299         return $this->tables;
300     }
302     /**
303      * Return table indexes - everything lowercased
304      * @return array of arrays
305      */
306     public function get_indexes($table) {
307         $indexes = array();
308         $tablename = $this->prefix.$table;
310         $sql = "SELECT *
311                   FROM pg_catalog.pg_indexes
312                  WHERE tablename = '$tablename'";
314         $this->query_start($sql, null, SQL_QUERY_AUX);
315         $result = pg_query($this->pgsql, $sql);
316         $this->query_end($result);
318         if ($result) {
319             while ($row = pg_fetch_assoc($result)) {
320                 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON '.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
321                     continue;
322                 }
323                 if ($matches[4] === 'id') {
324                     continue;
325                 }
326                 $columns = explode(',', $matches[4]);
327                 $columns = array_map(array($this, 'trim_quotes'), $columns);
328                 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
329                                               'columns'=>$columns);
330             }
331             pg_free_result($result);
332         }
333         return $indexes;
334     }
336     /**
337      * Returns detailed information about columns in table. This information is cached internally.
338      * @param string $table name
339      * @param bool $usecache
340      * @return array array of database_column_info objects indexed with column names
341      */
342     public function get_columns($table, $usecache=true) {
343         if ($usecache and isset($this->columns[$table])) {
344             return $this->columns[$table];
345         }
347         $this->columns[$table] = array();
349         $tablename = $this->prefix.$table;
351         $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
352                   FROM pg_catalog.pg_class c
353                   JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
354                   JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
355              LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
356                  WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
357               ORDER BY a.attnum";
359         $this->query_start($sql, null, SQL_QUERY_AUX);
360         $result = pg_query($this->pgsql, $sql);
361         $this->query_end($result);
363         if (!$result) {
364             return array();
365         }
366         while ($rawcolumn = pg_fetch_object($result)) {
368             $info = new object();
369             $info->name = $rawcolumn->field;
370             $matches = null;
372             if ($rawcolumn->type === 'varchar') {
373                 $info->type          = 'varchar';
374                 $info->meta_type     = 'C';
375                 $info->max_length    = $rawcolumn->atttypmod - 4;
376                 $info->scale         = null;
377                 $info->not_null      = ($rawcolumn->attnotnull === 't');
378                 $info->has_default   = ($rawcolumn->atthasdef === 't');
379                 if ($info->has_default) {
380                     $parts = explode('::', $rawcolumn->adsrc);
381                     if (count($parts) > 1) {
382                         $info->default_value = reset($parts);
383                         $info->default_value = trim($info->default_value, "'");
384                     } else {
385                         $info->default_value = $rawcolumn->adsrc;
386                     }
387                 } else {
388                     $info->default_value = null;
389                 }
390                 $info->primary_key   = false;
391                 $info->binary        = false;
392                 $info->unsigned      = null;
393                 $info->auto_increment= false;
394                 $info->unique        = null;
396             } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
397                 $info->type = 'int';
398                 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
399                     $info->primary_key   = true;
400                     $info->meta_type     = 'R';
401                     $info->unique        = true;
402                     $info->auto_increment= true;
403                     $info->has_default   = false;
404                 } else {
405                     $info->primary_key   = false;
406                     $info->meta_type     = 'I';
407                     $info->unique        = null;
408                     $info->auto_increment= false;
409                     $info->has_default   = ($rawcolumn->atthasdef === 't');
410                 }
411                 $info->max_length    = $matches[1];
412                 $info->scale         = null;
413                 $info->not_null      = ($rawcolumn->attnotnull === 't');
414                 if ($info->has_default) {
415                     $info->default_value = $rawcolumn->adsrc;
416                 } else {
417                     $info->default_value = null;
418                 }
419                 $info->binary        = false;
420                 $info->unsigned      = false;
422             } else if ($rawcolumn->type === 'numeric') {
423                 $info->type = $rawcolumn->type;
424                 $info->meta_type     = 'N';
425                 $info->primary_key   = false;
426                 $info->binary        = false;
427                 $info->unsigned      = null;
428                 $info->auto_increment= false;
429                 $info->unique        = null;
430                 $info->not_null      = ($rawcolumn->attnotnull === 't');
431                 $info->has_default   = ($rawcolumn->atthasdef === 't');
432                 if ($info->has_default) {
433                     $info->default_value = $rawcolumn->adsrc;
434                 } else {
435                     $info->default_value = null;
436                 }
437                 $info->max_length    = $rawcolumn->atttypmod >> 16;
438                 $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
440             } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
441                 $info->type = 'float';
442                 $info->meta_type     = 'N';
443                 $info->primary_key   = false;
444                 $info->binary        = false;
445                 $info->unsigned      = null;
446                 $info->auto_increment= false;
447                 $info->unique        = null;
448                 $info->not_null      = ($rawcolumn->attnotnull === 't');
449                 $info->has_default   = ($rawcolumn->atthasdef === 't');
450                 if ($info->has_default) {
451                     $info->default_value = $rawcolumn->adsrc;
452                 } else {
453                     $info->default_value = null;
454                 }
455                 // just guess expected number of deciaml places :-(
456                 if ($matches[1] == 8) {
457                     // total 15 digits
458                     $info->max_length = 8;
459                     $info->scale      = 7;
460                 } else {
461                     // total 6 digits
462                     $info->max_length = 4;
463                     $info->scale      = 2;
464                 }
466             } else if ($rawcolumn->type === 'text') {
467                 $info->type          = $rawcolumn->type;
468                 $info->meta_type     = 'X';
469                 $info->max_length    = -1;
470                 $info->scale         = null;
471                 $info->not_null      = ($rawcolumn->attnotnull === 't');
472                 $info->has_default   = ($rawcolumn->atthasdef === 't');
473                 if ($info->has_default) {
474                     $parts = explode('::', $rawcolumn->adsrc);
475                     if (count($parts) > 1) {
476                         $info->default_value = reset($parts);
477                         $info->default_value = trim($info->default_value, "'");
478                     } else {
479                         $info->default_value = $rawcolumn->adsrc;
480                     }
481                 } else {
482                     $info->default_value = null;
483                 }
484                 $info->primary_key   = false;
485                 $info->binary        = false;
486                 $info->unsigned      = null;
487                 $info->auto_increment= false;
488                 $info->unique        = null;
490             } else if ($rawcolumn->type === 'bytea') {
491                 $info->type          = $rawcolumn->type;
492                 $info->meta_type     = 'B';
493                 $info->max_length    = -1;
494                 $info->scale         = null;
495                 $info->not_null      = ($rawcolumn->attnotnull === 't');
496                 $info->has_default   = false;
497                 $info->default_value = null;
498                 $info->primary_key   = false;
499                 $info->binary        = true;
500                 $info->unsigned      = null;
501                 $info->auto_increment= false;
502                 $info->unique        = null;
504             }
506             $this->columns[$table][$info->name] = new database_column_info($info);
507         }
509         pg_free_result($result);
511         return $this->columns[$table];
512     }
514     /**
515      * Normalise values based in RDBMS dependencies (booleans, LOBs...)
516      *
517      * @param database_column_info $column column metadata corresponding with the value we are going to normalise
518      * @param mixed $value value we are going to normalise
519      * @return mixed the normalised value
520      */
521     protected function normalise_value($column, $value) {
522         if (is_bool($value)) { // Always, convert boolean to int
523             $value = (int)$value;
525         } else if ($column->meta_type == 'B') { // BLOB detected, we return 'blob' array instead of raw value to allow
526             if (!is_null($value)) {             // binding/executing code later to know about its nature
527                 $value = array('blob' => $value);
528             }
530         } else if ($value === '') {
531             if ($column->meta_type == 'I' or $column->meta_type == 'F' or $column->meta_type == 'N') {
532                 $value = 0; // prevent '' problems in numeric fields
533             }
534         }
535         return $value;
536     }
538     /**
539      * Is db in unicode mode?
540      * @return bool
541      */
542     public function setup_is_unicodedb() {
543     /// Get PostgreSQL server_encoding value
544         $sql = "SHOW server_encoding";
545         $this->query_start($sql, null, SQL_QUERY_AUX);
546         $result = pg_query($this->pgsql, $sql);
547         $this->query_end($result);
549         if (!$result) {
550             return false;
551         }
552         $rawcolumn = pg_fetch_object($result);
553         $encoding = $rawcolumn->server_encoding;
554         pg_free_result($result);
556         return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
557     }
559     /**
560      * Do NOT use in code, to be used by database_manager only!
561      * @param string $sql query
562      * @return bool true
563      * @throws dml_exception if error
564      */
565     public function change_database_structure($sql) {
566         $this->reset_caches();
568         $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
569         $result = pg_query($this->pgsql, $sql);
570         $this->query_end($result);
572         pg_free_result($result);
573         return true;
574     }
576     /**
577      * Execute general sql query. Should be used only when no other method suitable.
578      * Do NOT use this to make changes in db structure, use database_manager::execute_sql() instead!
579      * @param string $sql query
580      * @param array $params query parameters
581      * @return bool true
582      * @throws dml_exception if error
583      */
584     public function execute($sql, array $params=null) {
585         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
587         if (strpos($sql, ';') !== false) {
588             throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
589         }
591         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
592         $result = pg_query_params($this->pgsql, $sql, $params);
593         $this->query_end($result);
595         pg_free_result($result);
596         return true;
597     }
599     /**
600      * Get a number of records as a moodle_recordset using a SQL statement.
601      *
602      * Since this method is a little less readable, use of it should be restricted to
603      * code where it's possible there might be large datasets being returned.  For known
604      * small datasets use get_records_sql - it leads to simpler code.
605      *
606      * The return type is as for @see function get_recordset.
607      *
608      * @param string $sql the SQL select query to execute.
609      * @param array $params array of sql parameters
610      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
611      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
612      * @return moodle_recordset instance
613      * @throws dml_exception if error
614      */
615     public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
616         $limitfrom = (int)$limitfrom;
617         $limitnum  = (int)$limitnum;
618         $limitfrom = ($limitfrom < 0) ? 0 : $limitfrom;
619         $limitnum  = ($limitnum < 0)  ? 0 : $limitnum;
620         if ($limitfrom or $limitnum) {
621             if ($limitnum < 1) {
622                 $limitnum = "ALL";
623             }
624             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
625         }
627         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
629         $this->query_start($sql, $params, SQL_QUERY_SELECT);
630         $result = pg_query_params($this->pgsql, $sql, $params);
631         $this->query_end($result);
633         return $this->create_recordset($result);
634     }
636     protected function create_recordset($result) {
637         return new pgsql_native_moodle_recordset($result, $this->bytea_oid);
638     }
640     /**
641      * Get a number of records as an array of objects using a SQL statement.
642      *
643      * Return value as for @see function get_records.
644      *
645      * @param string $sql the SQL select query to execute. The first column of this SELECT statement
646      *   must be a unique value (usually the 'id' field), as it will be used as the key of the
647      *   returned array.
648      * @param array $params array of sql parameters
649      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
650      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
651      * @return array of objects, or empty array if no records were found
652      * @throws dml_exception if error
653      */
654     public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
655         $limitfrom = (int)$limitfrom;
656         $limitnum  = (int)$limitnum;
657         $limitfrom = ($limitfrom < 0) ? 0 : $limitfrom;
658         $limitnum  = ($limitnum < 0)  ? 0 : $limitnum;
659         if ($limitfrom or $limitnum) {
660             if ($limitnum < 1) {
661                 $limitnum = "ALL";
662             }
663             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
664         }
666         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
667         $this->query_start($sql, $params, SQL_QUERY_SELECT);
668         $result = pg_query_params($this->pgsql, $sql, $params);
669         $this->query_end($result);
671         // find out if there are any blobs
672         $numrows = pg_num_fields($result);
673         $blobs = array();
674         for($i=0; $i<$numrows; $i++) {
675             $type_oid = pg_field_type_oid($result, $i);
676             if ($type_oid == $this->bytea_oid) {
677                 $blobs[] = pg_field_name($result, $i);
678             }
679         }
681         $rows = pg_fetch_all($result);
682         pg_free_result($result);
684         $return = array();
685         if ($rows) {
686             foreach ($rows as $row) {
687                 $id = reset($row);
688                 if ($blobs) {
689                     foreach ($blobs as $blob) {
690                         $row[$blob] = $row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null;
691                     }
692                 }
693                 if (isset($return[$id])) {
694                     $colname = key($row);
695                     debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
696                 }
697                 $return[$id] = (object)$row;
698             }
699         }
701         return $return;
702     }
704     /**
705      * Selects records and return values (first field) as an array using a SQL statement.
706      *
707      * @param string $sql The SQL query
708      * @param array $params array of sql parameters
709      * @return array of values
710      * @throws dml_exception if error
711      */
712     public function get_fieldset_sql($sql, array $params=null) {
713         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
715         $this->query_start($sql, $params, SQL_QUERY_SELECT);
716         $result = pg_query_params($this->pgsql, $sql, $params);
717         $this->query_end($result);
719         $return = pg_fetch_all_columns($result, 0);
720         pg_free_result($result);
722         return $return;
723     }
725     /**
726      * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
727      * @param string $table name
728      * @param mixed $params data record as object or array
729      * @param bool $returnit return it of inserted record
730      * @param bool $bulk true means repeated inserts expected
731      * @param bool $customsequence true if 'id' included in $params, disables $returnid
732      * @return bool|int true or new id
733      * @throws dml_exception if error
734      */
735     public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
736         if (!is_array($params)) {
737             $params = (array)$params;
738         }
740         $returning = "";
742         if ($customsequence) {
743             if (!isset($params['id'])) {
744                 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
745             }
746             $returnid = false;
747         } else {
748             if ($returnid) {
749                 $returning = "RETURNING id";
750                 unset($params['id']);
751             } else {
752                 unset($params['id']);
753             }
754         }
756         if (empty($params)) {
757             throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
758         }
760         $fields = implode(',', array_keys($params));
761         $values = array();
762         $count = count($params);
763         for ($i=1; $i<=$count; $i++) {
764             $values[] = "\$".$i;
765         }
766         $values = implode(',', $values);
768         $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
769         $this->query_start($sql, $params, SQL_QUERY_INSERT);
770         $result = pg_query_params($this->pgsql, $sql, $params);
771         $this->query_end($result);
773         if ($returning !== "") {
774             $row = pg_fetch_assoc($result);
775             $params['id'] = reset($row);
776         }
777         pg_free_result($result);
779         if (!$returnid) {
780             return true;
781         }
783         return (int)$params['id'];
784     }
786     /**
787      * Insert a record into a table and return the "id" field if required.
788      *
789      * Some conversions and safety checks are carried out. Lobs are supported.
790      * If the return ID isn't required, then this just reports success as true/false.
791      * $data is an object containing needed data
792      * @param string $table The database table to be inserted into
793      * @param object $data A data object with values for one or more fields in the record
794      * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
795      * @return bool|int true or new id
796      * @throws dml_exception if error
797      */
798     public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
799         $dataobject = (array)$dataobject;
801         $columns = $this->get_columns($table);
802         $cleaned = array();
803         $blobs   = array();
805         foreach ($dataobject as $field=>$value) {
806             if ($field === 'id') {
807                 continue;
808             }
809             if (!isset($columns[$field])) {
810                 continue;
811             }
812             $column = $columns[$field];
813             $normalised_value = $this->normalise_value($column, $value);
814             if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
815                 $cleaned[$field] = '@#BLOB#@';
816                 $blobs[$field] = $normalised_value['blob'];
817             } else {
818                 $cleaned[$field] = $normalised_value;
819             }
820         }
822         if (empty($blobs)) {
823             return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
824         }
826         $id = $this->insert_record_raw($table, $cleaned, true, $bulk);
828         foreach ($blobs as $key=>$value) {
829             $value = pg_escape_bytea($this->pgsql, $value);
830             $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
831             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
832             $result = pg_query($this->pgsql, $sql);
833             $this->query_end($result);
834             if ($result !== false) {
835                 pg_free_result($result);
836             }
837         }
839         return ($returnid ? $id : true);
841     }
843     /**
844      * Import a record into a table, id field is required.
845      * Safety checks are NOT carried out. Lobs are supported.
846      *
847      * @param string $table name of database table to be inserted into
848      * @param object $dataobject A data object with values for one or more fields in the record
849      * @return bool true
850      * @throws dml_exception if error
851      */
852     public function import_record($table, $dataobject) {
853         $dataobject = (array)$dataobject;
855         $columns = $this->get_columns($table);
856         $cleaned = array();
858         foreach ($dataobject as $field=>$value) {
859             if (!isset($columns[$field])) {
860                 continue;
861             }
862             $cleaned[$field] = $value;
863         }
865         return $this->insert_record_raw($table, $cleaned, false, true, true);
866     }
868     /**
869      * Update record in database, as fast as possible, no safety checks, lobs not supported.
870      * @param string $table name
871      * @param mixed $params data record as object or array
872      * @param bool true means repeated updates expected
873      * @return bool true
874      * @throws dml_exception if error
875      */
876     public function update_record_raw($table, $params, $bulk=false) {
877         $params = (array)$params;
879         if (!isset($params['id'])) {
880             throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
881         }
882         $id = $params['id'];
883         unset($params['id']);
885         if (empty($params)) {
886             throw new coding_exception('moodle_database::update_record_raw() no fields found.');
887         }
889         $i = 1;
891         $sets = array();
892         foreach ($params as $field=>$value) {
893             $sets[] = "$field = \$".$i++;
894         }
896         $params[] = $id; // last ? in WHERE condition
898         $sets = implode(',', $sets);
899         $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
901         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
902         $result = pg_query_params($this->pgsql, $sql, $params);
903         $this->query_end($result);
905         pg_free_result($result);
906         return true;
907     }
909     /**
910      * Update a record in a table
911      *
912      * $dataobject is an object containing needed data
913      * Relies on $dataobject having a variable "id" to
914      * specify the record to update
915      *
916      * @param string $table The database table to be checked against.
917      * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
918      * @param bool true means repeated updates expected
919      * @return bool true
920      * @throws dml_exception if error
921      */
922     public function update_record($table, $dataobject, $bulk=false) {
923         $dataobject = (array)$dataobject;
925         $columns = $this->get_columns($table);
926         $cleaned = array();
927         $blobs   = array();
929         foreach ($dataobject as $field=>$value) {
930             if (!isset($columns[$field])) {
931                 continue;
932             }
933             $column = $columns[$field];
934             $normalised_value = $this->normalise_value($column, $value);
935             if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
936                 $cleaned[$field] = '@#BLOB#@';
937                 $blobs[$field] = $normalised_value['blob'];
938             } else {
939                 $cleaned[$field] = $normalised_value;
940             }
941         }
943         $this->update_record_raw($table, $cleaned, $bulk);
945         if (empty($blobs)) {
946             return true;
947         }
949         $id = (int)$dataobject['id'];
951         foreach ($blobs as $key=>$value) {
952             $value = pg_escape_bytea($this->pgsql, $value);
953             $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
954             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
955             $result = pg_query($this->pgsql, $sql);
956             $this->query_end($result);
958             pg_free_result($result);
959         }
961         return true;
962     }
964     /**
965      * Set a single field in every table record which match a particular WHERE clause.
966      *
967      * @param string $table The database table to be checked against.
968      * @param string $newfield the field to set.
969      * @param string $newvalue the value to set the field to.
970      * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
971      * @param array $params array of sql parameters
972      * @return bool true
973      * @throws dml_exception if error
974      */
975     public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
977         if ($select) {
978             $select = "WHERE $select";
979         }
980         if (is_null($params)) {
981             $params = array();
982         }
983         list($select, $params, $type) = $this->fix_sql_params($select, $params);
984         $i = count($params)+1;
986     /// Get column metadata
987         $columns = $this->get_columns($table);
988         $column = $columns[$newfield];
990         $normalised_value = $this->normalise_value($column, $newvalue);
991         if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
992         /// Update BYTEA and return
993             $normalised_value = pg_escape_bytea($this->pgsql, $normalised_value['blob']);
994             $sql = "UPDATE {$this->prefix}$table SET $newfield = '$normalised_value'::bytea $select";
995             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
996             $result = pg_query_params($this->pgsql, $sql, $params);
997             $this->query_end($result);
998             pg_free_result($result);
999             return true;
1000         }
1002         if (is_null($normalised_value)) {
1003             $newfield = "$newfield = NULL";
1004         } else {
1005             $newfield = "$newfield = \$".$i;
1006             $params[] = $normalised_value;
1007         }
1008         $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1010         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1011         $result = pg_query_params($this->pgsql, $sql, $params);
1012         $this->query_end($result);
1014         pg_free_result($result);
1016         return true;
1017     }
1019     /**
1020      * Delete one or more records from a table which match a particular WHERE clause.
1021      *
1022      * @param string $table The database table to be checked against.
1023      * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1024      * @param array $params array of sql parameters
1025      * @return bool true
1026      * @throws dml_exception if error
1027      */
1028     public function delete_records_select($table, $select, array $params=null) {
1029         if ($select) {
1030             $select = "WHERE $select";
1031         }
1032         $sql = "DELETE FROM {$this->prefix}$table $select";
1034         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1036         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1037         $result = pg_query_params($this->pgsql, $sql, $params);
1038         $this->query_end($result);
1040         pg_free_result($result);
1042         return true;
1043     }
1045     public function sql_ilike() {
1046         return 'ILIKE';
1047     }
1049     public function sql_bitxor($int1, $int2) {
1050         return '(' . $this->sql_bitor($int1, $int2) . ' - ' . $this->sql_bitand($int1, $int2) . ')';
1051     }
1053     public function sql_cast_char2int($fieldname, $text=false) {
1054         return ' CAST(' . $fieldname . ' AS INT) ';
1055     }
1057     public function sql_cast_char2real($fieldname, $text=false) {
1058         return " $fieldname::real ";
1059     }
1061     public function sql_concat() {
1062         $arr = func_get_args();
1063         $s = implode(' || ', $arr);
1064         if ($s === '') {
1065             return " '' ";
1066         }
1067         return " $s ";
1068     }
1070     public function sql_concat_join($separator="' '", $elements=array()) {
1071         for ($n=count($elements)-1; $n > 0 ; $n--) {
1072             array_splice($elements, $n, 0, $separator);
1073         }
1074         $s = implode(' || ', $elements);
1075         if ($s === '') {
1076             return " '' ";
1077         }
1078         return " $s ";
1079     }
1081     public function sql_regex_supported() {
1082         return true;
1083     }
1085     public function sql_regex($positivematch=true) {
1086         return $positivematch ? '~*' : '!~*';
1087     }
1089 /// session locking
1090     public function session_lock_supported() {
1091         return true;
1092     }
1094     public function get_session_lock($rowid) {
1095         // NOTE: there is a potential locking problem for database running
1096         //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1097         //       luckily there is not a big chance that they would collide
1098         if (!$this->session_lock_supported()) {
1099             return;
1100         }
1102         parent::get_session_lock($rowid);
1103         $sql = "SELECT pg_advisory_lock($rowid)";
1104         $this->query_start($sql, null, SQL_QUERY_AUX);
1105         $result = pg_query($this->pgsql, $sql);
1106         $this->query_end($result);
1108         if ($result) {
1109             pg_free_result($result);
1110         }
1111     }
1113     public function release_session_lock($rowid) {
1114         if (!$this->session_lock_supported()) {
1115             return;
1116         }
1117         parent::release_session_lock($rowid);
1119         $sql = "SELECT pg_advisory_unlock($rowid)";
1120         $this->query_start($sql, null, SQL_QUERY_AUX);
1121         $result = pg_query($this->pgsql, $sql);
1122         $this->query_end($result);
1124         if ($result) {
1125             pg_free_result($result);
1126         }
1127     }
1129 /// transactions
1130     /**
1131      * Driver specific start of real database transaction,
1132      * this can not be used directly in code.
1133      * @return void
1134      */
1135     protected function begin_transaction() {
1136         $sql = "BEGIN ISOLATION LEVEL READ COMMITTED";
1137         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1138         $result = pg_query($this->pgsql, $sql);
1139         $this->query_end($result);
1141         pg_free_result($result);
1142     }
1144     /**
1145      * Driver specific commit of real database transaction,
1146      * this can not be used directly in code.
1147      * @return void
1148      */
1149     protected function commit_transaction() {
1150         $sql = "COMMIT";
1151         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1152         $result = pg_query($this->pgsql, $sql);
1153         $this->query_end($result);
1155         pg_free_result($result);
1156     }
1158     /**
1159      * Driver specific abort of real database transaction,
1160      * this can not be used directly in code.
1161      * @return void
1162      */
1163     protected function rollback_transaction() {
1164         $sql = "ROLLBACK";
1165         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1166         $result = pg_query($this->pgsql, $sql);
1167         $this->query_end($result);
1169         pg_free_result($result);
1170     }
1172     /**
1173      * Helper function trimming (whitespace + quotes) any string
1174      * needed because PG uses to enclose with double quotes some
1175      * fields in indexes definition and others
1176      *
1177      * @param string $str string to apply whitespace + quotes trim
1178      * @return string trimmed string
1179      */
1180     private function trim_quotes($str) {
1181         return trim(trim($str), "'\"");
1182     }