MDL-20734 normalise_value() - moving from private to protected everywhere and abstracting
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
1 <?php
3 // This file is part of Moodle - http://moodle.org/
4 //
5 // Moodle is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // Moodle is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
19 /**
20  * Native pgsql class representing moodle database interface.
21  *
22  * @package    moodlecore
23  * @subpackage DML
24  * @copyright  2008 Petr Skoda (http://skodak.org)
25  * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
26  */
28 require_once($CFG->libdir.'/dml/moodle_database.php');
29 require_once($CFG->libdir.'/dml/pgsql_native_moodle_recordset.php');
31 /**
32  * Native pgsql class representing moodle database interface.
33  */
34 class pgsql_native_moodle_database extends moodle_database {
36     protected $pgsql     = null;
37     protected $bytea_oid = null;
39     protected $last_error_reporting; // To handle pgsql driver default verbosity
41     /**
42      * Detects if all needed PHP stuff installed.
43      * Note: can be used before connect()
44      * @return mixed true if ok, string if something
45      */
46     public function driver_installed() {
47         if (!extension_loaded('pgsql')) {
48             return get_string('pgsqlextensionisnotpresentinphp', 'install');
49         }
50         return true;
51     }
53     /**
54      * Returns database family type - describes SQL dialect
55      * Note: can be used before connect()
56      * @return string db family name (mysql, postgres, mssql, oracle, etc.)
57      */
58     public function get_dbfamily() {
59         return 'postgres';
60     }
62     /**
63      * Returns more specific database driver type
64      * Note: can be used before connect()
65      * @return string db type mysql, pgsql, postgres7
66      */
67     protected function get_dbtype() {
68         return 'pgsql';
69     }
71     /**
72      * Returns general database library name
73      * Note: can be used before connect()
74      * @return string db type pdo, native
75      */
76     protected function get_dblibrary() {
77         return 'native';
78     }
80     /**
81      * Returns localised database type name
82      * Note: can be used before connect()
83      * @return string
84      */
85     public function get_name() {
86         return get_string('nativepgsql', 'install');
87     }
89     /**
90      * Returns localised database configuration help.
91      * Note: can be used before connect()
92      * @return string
93      */
94     public function get_configuration_help() {
95         return get_string('nativepgsqlhelp', 'install');
96     }
98     /**
99      * Returns localised database description
100      * Note: can be used before connect()
101      * @return string
102      */
103     public function get_configuration_hints() {
104         return get_string('databasesettingssub_postgres7', 'install');
105     }
107     /**
108      * Connect to db
109      * Must be called before other methods.
110      * @param string $dbhost
111      * @param string $dbuser
112      * @param string $dbpass
113      * @param string $dbname
114      * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
115      * @param array $dboptions driver specific options
116      * @return bool true
117      * @throws dml_connection_exception if error
118      */
119     public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
120         if ($prefix == '' and !$this->external) {
121             //Enforce prefixes for everybody but mysql
122             throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
123         }
125         $driverstatus = $this->driver_installed();
127         if ($driverstatus !== true) {
128             throw new dml_exception('dbdriverproblem', $driverstatus);
129         }
131         $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
133         $pass = addcslashes($this->dbpass, "'\\");
135         // Unix socket connections should have lower overhead
136         if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
137             $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
138         } else {
139             $this->dboptions['dbsocket'] = 0;
140             if (empty($this->dbname)) {
141                 // probably old style socket connection - do not add port
142                 $port = "";
143             } else if (empty($this->dboptions['dbport'])) {
144                 $port = "port ='5432'";
145             } else {
146                 $port = "port ='".$this->dboptions['dbport']."'";
147             }
148             $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
149         }
151         ob_start();
152         if (empty($this->dboptions['dbpersit'])) {
153             $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
154         } else {
155             $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
156         }
157         $dberr = ob_get_contents();
158         ob_end_clean();
160         $status = pg_connection_status($this->pgsql);
162         if ($status === false or $status === PGSQL_CONNECTION_BAD) {
163             $this->pgsql = null;
164             throw new dml_connection_exception($dberr);
165         }
167         $this->query_start("--pg_set_client_encoding()", null, SQL_QUERY_AUX);
168         pg_set_client_encoding($this->pgsql, 'utf8');
169         $this->query_end(true);
171         // find out the bytea oid
172         $sql = "SELECT oid FROM pg_type WHERE typname = 'bytea'";
173         $this->query_start($sql, null, SQL_QUERY_AUX);
174         $result = pg_query($this->pgsql, $sql);
175         $this->query_end($result);
177         $this->bytea_oid = pg_fetch_result($result, 0);
178         pg_free_result($result);
179         if ($this->bytea_oid === false) {
180             $this->pgsql = null;
181             throw new dml_connection_exception('Can not read bytea type.');
182         }
184         return true;
185     }
187     /**
188      * Close database connection and release all resources
189      * and memory (especially circular memory references).
190      * Do NOT use connect() again, create a new instance if needed.
191      */
192     public function dispose() {
193         parent::dispose(); // Call parent dispose to write/close session and other common stuff before clossing conn
194         if ($this->pgsql) {
195             pg_close($this->pgsql);
196             $this->pgsql = null;
197         }
198     }
201     /**
202      * Called before each db query.
203      * @param string $sql
204      * @param array array of parameters
205      * @param int $type type of query
206      * @param mixed $extrainfo driver specific extra information
207      * @return void
208      */
209     protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
210         parent::query_start($sql, $params, $type, $extrainfo);
211         // pgsql driver tents to send debug to output, we do not need that ;-)
212         $this->last_error_reporting = error_reporting(0);
213     }
215     /**
216      * Called immediately after each db query.
217      * @param mixed db specific result
218      * @return void
219      */
220     protected function query_end($result) {
221         // reset original debug level
222         error_reporting($this->last_error_reporting);
223         parent::query_end($result);
224     }
226     /**
227      * Returns database server info array
228      * @return array
229      */
230     public function get_server_info() {
231         static $info;
232         if (!$info) {
233             $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
234             $info = pg_version($this->pgsql);
235             $this->query_end(true);
236         }
237         return array('description'=>$info['server'], 'version'=>$info['server']);
238     }
240     protected function is_min_version($version) {
241         $server = $this->get_server_info();
242         $server = $server['version'];
243         return version_compare($server, $version, '>=');
244     }
246     /**
247      * Returns supported query parameter types
248      * @return bitmask
249      */
250     protected function allowed_param_types() {
251         return SQL_PARAMS_DOLLAR;
252     }
254     /**
255      * Returns last error reported by database engine.
256      */
257     public function get_last_error() {
258         return pg_last_error($this->pgsql);
259     }
261     /**
262      * Return tables in database WITHOUT current prefix
263      * @return array of table names in lowercase and without prefix
264      */
265     public function get_tables($usecache=true) {
266         if ($usecache and $this->tables !== null) {
267             return $this->tables;
268         }
269         $this->tables = array();
270         $prefix = str_replace('_', '\\\\_', $this->prefix);
271         $sql = "SELECT tablename
272                   FROM pg_catalog.pg_tables
273                  WHERE tablename LIKE '$prefix%'";
274         $this->query_start($sql, null, SQL_QUERY_AUX);
275         $result = pg_query($this->pgsql, $sql);
276         $this->query_end($result);
278         if ($result) {
279             while ($row = pg_fetch_row($result)) {
280                 $tablename = reset($row);
281                 if (strpos($tablename, $this->prefix) !== 0) {
282                     continue;
283                 }
284                 $tablename = substr($tablename, strlen($this->prefix));
285                 $this->tables[$tablename] = $tablename;
286             }
287             pg_free_result($result);
288         }
289         return $this->tables;
290     }
292     /**
293      * Return table indexes - everything lowercased
294      * @return array of arrays
295      */
296     public function get_indexes($table) {
297         $indexes = array();
298         $tablename = $this->prefix.$table;
300         $sql = "SELECT *
301                   FROM pg_catalog.pg_indexes
302                  WHERE tablename = '$tablename'";
304         $this->query_start($sql, null, SQL_QUERY_AUX);
305         $result = pg_query($this->pgsql, $sql);
306         $this->query_end($result);
308         if ($result) {
309             while ($row = pg_fetch_assoc($result)) {
310                 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON '.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
311                     continue;
312                 }
313                 if ($matches[4] === 'id') {
314                     continue;
315                 }
316                 $columns = explode(',', $matches[4]);
317                 $columns = array_map(array($this, 'trim_quotes'), $columns);
318                 $indexes[$matches[2]] = array('unique'=>!empty($matches[1]),
319                                               'columns'=>$columns);
320             }
321             pg_free_result($result);
322         }
323         return $indexes;
324     }
326     /**
327      * Returns datailed information about columns in table. This information is cached internally.
328      * @param string $table name
329      * @param bool $usecache
330      * @return array array of database_column_info objects indexed with column names
331      */
332     public function get_columns($table, $usecache=true) {
333         if ($usecache and isset($this->columns[$table])) {
334             return $this->columns[$table];
335         }
337         $this->columns[$table] = array();
339         $tablename = $this->prefix.$table;
341         $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
342                   FROM pg_catalog.pg_class c
343                   JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
344                   JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
345              LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
346                  WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
347               ORDER BY a.attnum";
349         $this->query_start($sql, null, SQL_QUERY_AUX);
350         $result = pg_query($this->pgsql, $sql);
351         $this->query_end($result);
353         if (!$result) {
354             return array();
355         }
356         while ($rawcolumn = pg_fetch_object($result)) {
358             $info = new object();
359             $info->name = $rawcolumn->field;
360             $matches = null;
362             if ($rawcolumn->type === 'varchar') {
363                 $info->type          = 'varchar';
364                 $info->meta_type     = 'C';
365                 $info->max_length    = $rawcolumn->atttypmod - 4;
366                 $info->scale         = null;
367                 $info->not_null      = ($rawcolumn->attnotnull === 't');
368                 $info->has_default   = ($rawcolumn->atthasdef === 't');
369                 if ($info->has_default) {
370                     $parts = explode('::', $rawcolumn->adsrc);
371                     if (count($parts) > 1) {
372                         $info->default_value = reset($parts);
373                         $info->default_value = trim($info->default_value, "'");
374                     } else {
375                         $info->default_value = $rawcolumn->adsrc;
376                     }
377                 } else {
378                     $info->default_value = null;
379                 }
380                 $info->primary_key   = false;
381                 $info->binary        = false;
382                 $info->unsigned      = null;
383                 $info->auto_increment= false;
384                 $info->unique        = null;
386             } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
387                 $info->type = 'int';
388                 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
389                     $info->primary_key   = true;
390                     $info->meta_type     = 'R';
391                     $info->unique        = true;
392                     $info->auto_increment= true;
393                     $info->has_default   = false;
394                 } else {
395                     $info->primary_key   = false;
396                     $info->meta_type     = 'I';
397                     $info->unique        = null;
398                     $info->auto_increment= false;
399                     $info->has_default   = ($rawcolumn->atthasdef === 't');
400                 }
401                 $info->max_length    = $matches[1];
402                 $info->scale         = null;
403                 $info->not_null      = ($rawcolumn->attnotnull === 't');
404                 if ($info->has_default) {
405                     $info->default_value = $rawcolumn->adsrc;
406                 } else {
407                     $info->default_value = null;
408                 }
409                 $info->binary        = false;
410                 $info->unsigned      = false;
412             } else if ($rawcolumn->type === 'numeric') {
413                 $info->type = $rawcolumn->type;
414                 $info->meta_type     = 'N';
415                 $info->primary_key   = false;
416                 $info->binary        = false;
417                 $info->unsigned      = null;
418                 $info->auto_increment= false;
419                 $info->unique        = null;
420                 $info->not_null      = ($rawcolumn->attnotnull === 't');
421                 $info->has_default   = ($rawcolumn->atthasdef === 't');
422                 if ($info->has_default) {
423                     $info->default_value = $rawcolumn->adsrc;
424                 } else {
425                     $info->default_value = null;
426                 }
427                 $info->max_length    = $rawcolumn->atttypmod >> 16;
428                 $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
430             } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
431                 $info->type = 'float';
432                 $info->meta_type     = 'N';
433                 $info->primary_key   = false;
434                 $info->binary        = false;
435                 $info->unsigned      = null;
436                 $info->auto_increment= false;
437                 $info->unique        = null;
438                 $info->not_null      = ($rawcolumn->attnotnull === 't');
439                 $info->has_default   = ($rawcolumn->atthasdef === 't');
440                 if ($info->has_default) {
441                     $info->default_value = $rawcolumn->adsrc;
442                 } else {
443                     $info->default_value = null;
444                 }
445                 // just guess expected number of deciaml places :-(
446                 if ($matches[1] == 8) {
447                     // total 15 digits
448                     $info->max_length = 8;
449                     $info->scale      = 7;
450                 } else {
451                     // total 6 digits
452                     $info->max_length = 4;
453                     $info->scale      = 2;
454                 }
456             } else if ($rawcolumn->type === 'text') {
457                 $info->type          = $rawcolumn->type;
458                 $info->meta_type     = 'X';
459                 $info->max_length    = -1;
460                 $info->scale         = null;
461                 $info->not_null      = ($rawcolumn->attnotnull === 't');
462                 $info->has_default   = ($rawcolumn->atthasdef === 't');
463                 if ($info->has_default) {
464                     $parts = explode('::', $rawcolumn->adsrc);
465                     if (count($parts) > 1) {
466                         $info->default_value = reset($parts);
467                         $info->default_value = trim($info->default_value, "'");
468                     } else {
469                         $info->default_value = $rawcolumn->adsrc;
470                     }
471                 } else {
472                     $info->default_value = null;
473                 }
474                 $info->primary_key   = false;
475                 $info->binary        = false;
476                 $info->unsigned      = null;
477                 $info->auto_increment= false;
478                 $info->unique        = null;
480             } else if ($rawcolumn->type === 'bytea') {
481                 $info->type          = $rawcolumn->type;
482                 $info->meta_type     = 'B';
483                 $info->max_length    = -1;
484                 $info->scale         = null;
485                 $info->not_null      = ($rawcolumn->attnotnull === 't');
486                 $info->has_default   = false;
487                 $info->default_value = null;
488                 $info->primary_key   = false;
489                 $info->binary        = true;
490                 $info->unsigned      = null;
491                 $info->auto_increment= false;
492                 $info->unique        = null;
494             }
496             $this->columns[$table][$info->name] = new database_column_info($info);
497         }
499         pg_free_result($result);
501         return $this->columns[$table];
502     }
504     /**
505      * Normalise values based in RDBMS dependencies (booleans, LOBs...)
506      *
507      * @param database_column_info $column column metadata corresponding with the value we are going to normalise
508      * @param mixed $value value we are going to normalise
509      * @return mixed the normalised value
510      */
511     protected function normalise_value($column, $value) {
512         if (is_bool($value)) { // Always, convert boolean to int
513             $value = (int)$value;
515         } else if ($column->meta_type == 'B') { // BLOB detected, we return 'blob' array instead of raw value to allow
516             if (!is_null($value)) {             // binding/executing code later to know about its nature
517                 $value = array('blob' => $value);
518             }
520         } else if ($value === '') {
521             if ($column->meta_type == 'I' or $column->meta_type == 'F' or $column->meta_type == 'N') {
522                 $value = 0; // prevent '' problems in numeric fields
523             }
524         }
525         return $value;
526     }
528     /**
529      * Is db in unicode mode?
530      * @return bool
531      */
532     public function setup_is_unicodedb() {
533     /// Get PostgreSQL server_encoding value
534         $sql = "SHOW server_encoding";
535         $this->query_start($sql, null, SQL_QUERY_AUX);
536         $result = pg_query($this->pgsql, $sql);
537         $this->query_end($result);
539         if (!$result) {
540             return false;
541         }
542         $rawcolumn = pg_fetch_object($result);
543         $encoding = $rawcolumn->server_encoding;
544         pg_free_result($result);
546         return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
547     }
549     /**
550      * Do NOT use in code, to be used by database_manager only!
551      * @param string $sql query
552      * @return bool true
553      * @throws dml_exception if error
554      */
555     public function change_database_structure($sql) {
556         $this->reset_caches();
558         $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
559         $result = pg_query($this->pgsql, $sql);
560         $this->query_end($result);
562         pg_free_result($result);
563         return true;
564     }
566     /**
567      * Execute general sql query. Should be used only when no other method suitable.
568      * Do NOT use this to make changes in db structure, use database_manager::execute_sql() instead!
569      * @param string $sql query
570      * @param array $params query parameters
571      * @return bool true
572      * @throws dml_exception if error
573      */
574     public function execute($sql, array $params=null) {
575         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
577         if (strpos($sql, ';') !== false) {
578             throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
579         }
581         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
582         $result = pg_query_params($this->pgsql, $sql, $params);
583         $this->query_end($result);
585         pg_free_result($result);
586         return true;
587     }
589     /**
590      * Get a number of records as a moodle_recordset using a SQL statement.
591      *
592      * Since this method is a little less readable, use of it should be restricted to
593      * code where it's possible there might be large datasets being returned.  For known
594      * small datasets use get_records_sql - it leads to simpler code.
595      *
596      * The return type is as for @see function get_recordset.
597      *
598      * @param string $sql the SQL select query to execute.
599      * @param array $params array of sql parameters
600      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
601      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
602      * @return mixed an moodle_recordset object
603      * @throws dml_exception if error
604      */
605     public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
606         $limitfrom = (int)$limitfrom;
607         $limitnum  = (int)$limitnum;
608         $limitfrom = ($limitfrom < 0) ? 0 : $limitfrom;
609         $limitnum  = ($limitnum < 0)  ? 0 : $limitnum;
610         if ($limitfrom or $limitnum) {
611             if ($limitnum < 1) {
612                 $limitnum = "ALL";
613             }
614             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
615         }
617         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
619         $this->query_start($sql, $params, SQL_QUERY_SELECT);
620         $result = pg_query_params($this->pgsql, $sql, $params);
621         $this->query_end($result);
623         return $this->create_recordset($result);
624     }
626     protected function create_recordset($result) {
627         return new pgsql_native_moodle_recordset($result, $this->bytea_oid);
628     }
630     /**
631      * Get a number of records as an array of objects using a SQL statement.
632      *
633      * Return value as for @see function get_records.
634      *
635      * @param string $sql the SQL select query to execute. The first column of this SELECT statement
636      *   must be a unique value (usually the 'id' field), as it will be used as the key of the
637      *   returned array.
638      * @param array $params array of sql parameters
639      * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
640      * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
641      * @return mixed an array of objects, or empty array if no records were found
642      * @throws dml_exception if error
643      */
644     public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
645         $limitfrom = (int)$limitfrom;
646         $limitnum  = (int)$limitnum;
647         $limitfrom = ($limitfrom < 0) ? 0 : $limitfrom;
648         $limitnum  = ($limitnum < 0)  ? 0 : $limitnum;
649         if ($limitfrom or $limitnum) {
650             if ($limitnum < 1) {
651                 $limitnum = "ALL";
652             }
653             $sql .= " LIMIT $limitnum OFFSET $limitfrom";
654         }
656         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
657         $this->query_start($sql, $params, SQL_QUERY_SELECT);
658         $result = pg_query_params($this->pgsql, $sql, $params);
659         $this->query_end($result);
661         // find out if there are any blobs
662         $numrows = pg_num_fields($result);
663         $blobs = array();
664         for($i=0; $i<$numrows; $i++) {
665             $type_oid = pg_field_type_oid($result, $i);
666             if ($type_oid == $this->bytea_oid) {
667                 $blobs[] = pg_field_name($result, $i);
668             }
669         }
671         $rows = pg_fetch_all($result);
672         pg_free_result($result);
674         $return = array();
675         if ($rows) {
676             foreach ($rows as $row) {
677                 $id = reset($row);
678                 if ($blobs) {
679                     foreach ($blobs as $blob) {
680                         $row[$blob] = $row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null;
681                     }
682                 }
683                 if (isset($return[$id])) {
684                     $colname = key($row);
685                     debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
686                 }
687                 $return[$id] = (object)$row;
688             }
689         }
691         return $return;
692     }
694     /**
695      * Selects records and return values (first field) as an array using a SQL statement.
696      *
697      * @param string $sql The SQL query
698      * @param array $params array of sql parameters
699      * @return mixed array of values
700      * @throws dml_exception if error
701      */
702     public function get_fieldset_sql($sql, array $params=null) {
703         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
705         $this->query_start($sql, $params, SQL_QUERY_SELECT);
706         $result = pg_query_params($this->pgsql, $sql, $params);
707         $this->query_end($result);
709         $return = pg_fetch_all_columns($result, 0);
710         pg_free_result($result);
712         return $return;
713     }
715     /**
716      * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
717      * @param string $table name
718      * @param mixed $params data record as object or array
719      * @param bool $returnit return it of inserted record
720      * @param bool $bulk true means repeated inserts expected
721      * @param bool $customsequence true if 'id' included in $params, disables $returnid
722      * @return true or new id
723      * @throws dml_exception if error
724      */
725     public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
726         if (!is_array($params)) {
727             $params = (array)$params;
728         }
730         $returning = "";
732         if ($customsequence) {
733             if (!isset($params['id'])) {
734                 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
735             }
736             $returnid = false;
737         } else {
738             if ($returnid) {
739                 $returning = "RETURNING id";
740                 unset($params['id']);
741             } else {
742                 unset($params['id']);
743             }
744         }
746         if (empty($params)) {
747             throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
748         }
750         $fields = implode(',', array_keys($params));
751         $values = array();
752         $count = count($params);
753         for ($i=1; $i<=$count; $i++) {
754             $values[] = "\$".$i;
755         }
756         $values = implode(',', $values);
758         $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
759         $this->query_start($sql, $params, SQL_QUERY_INSERT);
760         $result = pg_query_params($this->pgsql, $sql, $params);
761         $this->query_end($result);
763         if ($returning !== "") {
764             $row = pg_fetch_assoc($result);
765             $params['id'] = reset($row);
766         }
767         pg_free_result($result);
769         if (!$returnid) {
770             return true;
771         }
773         return (int)$params['id'];
774     }
776     /**
777      * Insert a record into a table and return the "id" field if required.
778      *
779      * Some conversions and safety checks are carried out. Lobs are supported.
780      * If the return ID isn't required, then this just reports success as true/false.
781      * $data is an object containing needed data
782      * @param string $table The database table to be inserted into
783      * @param object $data A data object with values for one or more fields in the record
784      * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
785      * @return true or new id
786      * @throws dml_exception if error
787      */
788     public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
789         if (!is_object($dataobject)) {
790             $dataobject = (object)$dataobject;
791         }
793         $columns = $this->get_columns($table);
795         unset($dataobject->id);
796         $cleaned = array();
797         $blobs   = array();
799         foreach ($dataobject as $field=>$value) {
800             if (!isset($columns[$field])) {
801                 continue;
802             }
803             $column = $columns[$field];
804             $normalised_value = $this->normalise_value($column, $value);
805             if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
806                 $cleaned[$field] = '@#BLOB#@';
807                 $blobs[$field] = $normalised_value['blob'];
808             } else {
809                 $cleaned[$field] = $normalised_value;
810             }
811         }
813         if (empty($blobs)) {
814             return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
815         }
817         $id = $this->insert_record_raw($table, $cleaned, true, $bulk);
819         foreach ($blobs as $key=>$value) {
820             $value = pg_escape_bytea($this->pgsql, $value);
821             $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
822             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
823             $result = pg_query($this->pgsql, $sql);
824             $this->query_end($result);
825             if ($result !== false) {
826                 pg_free_result($result);
827             }
828         }
830         return ($returnid ? $id : true);
832     }
834     /**
835      * Import a record into a table, id field is required.
836      * Safety checks are NOT carried out. Lobs are supported.
837      *
838      * @param string $table name of database table to be inserted into
839      * @param object $dataobject A data object with values for one or more fields in the record
840      * @return bool true
841      * @throws dml_exception if error
842      */
843     public function import_record($table, $dataobject) {
844         $dataobject = (object)$dataobject;
846         $columns = $this->get_columns($table);
847         $cleaned = array();
849         foreach ($dataobject as $field=>$value) {
850             if (!isset($columns[$field])) {
851                 continue;
852             }
853             $cleaned[$field] = $value;
854         }
856         return $this->insert_record_raw($table, $cleaned, false, true, true);
857     }
859     /**
860      * Update record in database, as fast as possible, no safety checks, lobs not supported.
861      * @param string $table name
862      * @param mixed $params data record as object or array
863      * @param bool true means repeated updates expected
864      * @return bool true
865      * @throws dml_exception if error
866      */
867     public function update_record_raw($table, $params, $bulk=false) {
868         if (!is_array($params)) {
869             $params = (array)$params;
870         }
871         if (!isset($params['id'])) {
872             throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
873         }
874         $id = $params['id'];
875         unset($params['id']);
877         if (empty($params)) {
878             throw new coding_exception('moodle_database::update_record_raw() no fields found.');
879         }
881         $i = 1;
883         $sets = array();
884         foreach ($params as $field=>$value) {
885             $sets[] = "$field = \$".$i++;
886         }
888         $params[] = $id; // last ? in WHERE condition
890         $sets = implode(',', $sets);
891         $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
893         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
894         $result = pg_query_params($this->pgsql, $sql, $params);
895         $this->query_end($result);
897         pg_free_result($result);
898         return true;
899     }
901     /**
902      * Update a record in a table
903      *
904      * $dataobject is an object containing needed data
905      * Relies on $dataobject having a variable "id" to
906      * specify the record to update
907      *
908      * @param string $table The database table to be checked against.
909      * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
910      * @param bool true means repeated updates expected
911      * @return bool true
912      * @throws dml_exception if error
913      */
914     public function update_record($table, $dataobject, $bulk=false) {
915         if (!is_object($dataobject)) {
916             $dataobject = (object)$dataobject;
917         }
919         $columns = $this->get_columns($table);
920         $cleaned = array();
921         $blobs   = array();
923         foreach ($dataobject as $field=>$value) {
924             if (!isset($columns[$field])) {
925                 continue;
926             }
927             $column = $columns[$field];
928             $normalised_value = $this->normalise_value($column, $value);
929             if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
930                 $cleaned[$field] = '@#BLOB#@';
931                 $blobs[$field] = $normalised_value['blob'];
932             } else {
933                 $cleaned[$field] = $normalised_value;
934             }
935         }
937         $this->update_record_raw($table, $cleaned, $bulk);
939         if (empty($blobs)) {
940             return true;
941         }
943         $id = (int)$dataobject->id;
945         foreach ($blobs as $key=>$value) {
946             $value = pg_escape_bytea($this->pgsql, $value);
947             $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
948             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
949             $result = pg_query($this->pgsql, $sql);
950             $this->query_end($result);
952             pg_free_result($result);
953         }
955         return true;
956     }
958     /**
959      * Set a single field in every table record which match a particular WHERE clause.
960      *
961      * @param string $table The database table to be checked against.
962      * @param string $newfield the field to set.
963      * @param string $newvalue the value to set the field to.
964      * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
965      * @param array $params array of sql parameters
966      * @return bool true
967      * @throws dml_exception if error
968      */
969     public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
971         if ($select) {
972             $select = "WHERE $select";
973         }
974         if (is_null($params)) {
975             $params = array();
976         }
977         list($select, $params, $type) = $this->fix_sql_params($select, $params);
978         $i = count($params)+1;
980     /// Get column metadata
981         $columns = $this->get_columns($table);
982         $column = $columns[$newfield];
984         $normalised_value = $this->normalise_value($column, $newvalue);
985         if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
986         /// Update BYTEA and return
987             $normalised_value = pg_escape_bytea($this->pgsql, $normalised_value['blob']);
988             $sql = "UPDATE {$this->prefix}$table SET $newfield = '$normalised_value'::bytea $select";
989             $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
990             $result = pg_query_params($this->pgsql, $sql, $params);
991             $this->query_end($result);
992             pg_free_result($result);
993             return true;
994         }
996         if (is_null($normalised_value)) {
997             $newfield = "$newfield = NULL";
998         } else {
999             $newfield = "$newfield = \$".$i;
1000             $params[] = $normalised_value;
1001         }
1002         $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1004         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1005         $result = pg_query_params($this->pgsql, $sql, $params);
1006         $this->query_end($result);
1008         pg_free_result($result);
1010         return true;
1011     }
1013     /**
1014      * Delete one or more records from a table which match a particular WHERE clause.
1015      *
1016      * @param string $table The database table to be checked against.
1017      * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1018      * @param array $params array of sql parameters
1019      * @return bool true
1020      * @throws dml_exception if error
1021      */
1022     public function delete_records_select($table, $select, array $params=null) {
1023         if ($select) {
1024             $select = "WHERE $select";
1025         }
1026         $sql = "DELETE FROM {$this->prefix}$table $select";
1028         list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1030         $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1031         $result = pg_query_params($this->pgsql, $sql, $params);
1032         $this->query_end($result);
1034         pg_free_result($result);
1036         return true;
1037     }
1039     public function sql_ilike() {
1040         return 'ILIKE';
1041     }
1043     public function sql_bitxor($int1, $int2) {
1044         return '(' . $this->sql_bitor($int1, $int2) . ' - ' . $this->sql_bitand($int1, $int2) . ')';
1045     }
1047     public function sql_cast_char2int($fieldname, $text=false) {
1048         return ' CAST(' . $fieldname . ' AS INT) ';
1049     }
1051     public function sql_cast_char2real($fieldname, $text=false) {
1052         return " $fieldname::real ";
1053     }
1055     public function sql_concat() {
1056         $arr = func_get_args();
1057         $s = implode(' || ', $arr);
1058         if ($s === '') {
1059             return " '' ";
1060         }
1061         return " $s ";
1062     }
1064     public function sql_concat_join($separator="' '", $elements=array()) {
1065         for ($n=count($elements)-1; $n > 0 ; $n--) {
1066             array_splice($elements, $n, 0, $separator);
1067         }
1068         $s = implode(' || ', $elements);
1069         if ($s === '') {
1070             return " '' ";
1071         }
1072         return " $s ";
1073     }
1075     public function sql_regex_supported() {
1076         return true;
1077     }
1079     public function sql_regex($positivematch=true) {
1080         return $positivematch ? '~*' : '!~*';
1081     }
1083 /// session locking
1084     public function session_lock_supported() {
1085         return true;
1086     }
1088     public function get_session_lock($rowid) {
1089         // NOTE: there is a potential locking problem for database running
1090         //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1091         //       luckily there is not a big chance that they would collide
1092         if (!$this->session_lock_supported()) {
1093             return;
1094         }
1096         parent::get_session_lock($rowid);
1097         $sql = "SELECT pg_advisory_lock($rowid)";
1098         $this->query_start($sql, null, SQL_QUERY_AUX);
1099         $result = pg_query($this->pgsql, $sql);
1100         $this->query_end($result);
1102         if ($result) {
1103             pg_free_result($result);
1104         }
1105     }
1107     public function release_session_lock($rowid) {
1108         if (!$this->session_lock_supported()) {
1109             return;
1110         }
1111         parent::release_session_lock($rowid);
1113         $sql = "SELECT pg_advisory_unlock($rowid)";
1114         $this->query_start($sql, null, SQL_QUERY_AUX);
1115         $result = pg_query($this->pgsql, $sql);
1116         $this->query_end($result);
1118         if ($result) {
1119             pg_free_result($result);
1120         }
1121     }
1123 /// transactions
1124     /**
1125      * on DBs that support it, switch to transaction mode and begin a transaction
1126      * you'll need to ensure you call commit_sql() or your changes *will* be lost.
1127      *
1128      * this is _very_ useful for massive updates
1129      */
1130     public function begin_sql() {
1131         if (!parent::begin_sql()) {
1132             return false;
1133         }
1134         $sql = "BEGIN ISOLATION LEVEL READ COMMITTED";
1135         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1136         $result = pg_query($this->pgsql, $sql);
1137         $this->query_end($result);
1139         pg_free_result($result);
1140         return true;
1141     }
1143     /**
1144      * on DBs that support it, commit the transaction
1145      */
1146     public function commit_sql() {
1147         if (!parent::commit_sql()) {
1148             return false;
1149         }
1150         $sql = "COMMIT";
1151         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1152         $result = pg_query($this->pgsql, $sql);
1153         $this->query_end($result);
1155         pg_free_result($result);
1156         return true;
1157     }
1159     /**
1160      * on DBs that support it, rollback the transaction
1161      */
1162     public function rollback_sql() {
1163         if (!parent::rollback_sql()) {
1164             return false;
1165         }
1166         $sql = "ROLLBACK";
1167         $this->query_start($sql, NULL, SQL_QUERY_AUX);
1168         $result = pg_query($this->pgsql, $sql);
1169         $this->query_end($result);
1171         pg_free_result($result);
1172         return true;
1173     }
1175     /**
1176      * Helper function trimming (whitespace + quotes) any string
1177      * needed because PG uses to enclose with double quotes some
1178      * fields in indexes definition and others
1179      *
1180      * @param string $str string to apply whitespace + quotes trim
1181      * @return string trimmed string
1182      */
1183     private function trim_quotes($str) {
1184         return trim(trim($str), "'\"");
1185     }