MDL-14990 dml: added __destruct method, will be needed for logging
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
CommitLineData
158622bd 1<?php //$Id$
2
3require_once($CFG->libdir.'/dml/moodle_database.php');
4require_once($CFG->libdir.'/dml/pgsql_native_moodle_recordset.php');
5
6/**
7 * Native pgsql class representing moodle database interface.
8 * @package dml
9 */
10class pgsql_native_moodle_database extends moodle_database {
11
db7aea38 12 protected $pgsql = null;
13 protected $debug = false;
14 protected $bytea_oid = null;
158622bd 15
16 /**
17 * Detects if all needed PHP stuff installed.
18 * Note: can be used before connect()
19 * @return mixed true if ok, string if something
20 */
21 public function driver_installed() {
22 if (!extension_loaded('pgsql')) {
23 return get_string('pgsqlextensionisnotpresentinphp', 'install');
24 }
25 return true;
26 }
27
28 /**
29 * Returns database family type - describes SQL dialect
30 * Note: can be used before connect()
31 * @return string db family name (mysql, postgres, mssql, oracle, etc.)
32 */
33 public function get_dbfamily() {
34 return 'postgres';
35 }
36
37 /**
38 * Returns more specific database driver type
39 * Note: can be used before connect()
40 * @return string db type mysql, pgsql, postgres7
41 */
42 protected function get_dbtype() {
43 return 'pgsql';
44 }
45
46 /**
47 * Returns general database library name
48 * Note: can be used before connect()
49 * @return string db type adodb, pdo, native
50 */
51 protected function get_dblibrary() {
52 return 'native';
53 }
54
55 /**
56 * Returns localised database type name
57 * Note: can be used before connect()
58 * @return string
59 */
60 public function get_name() {
61 return get_string('nativepgsql', 'install'); // TODO: localise
62 }
63
64 /**
65 * Returns localised database description
66 * Note: can be used before connect()
67 * @return string
68 */
69 public function get_configuration_hints() {
70 return get_string('databasesettingssub_postgres7', 'install'); // TODO: improve
71 }
72
73 /**
74 * Connect to db
75 * Must be called before other methods.
76 * @param string $dbhost
77 * @param string $dbuser
78 * @param string $dbpass
79 * @param string $dbname
158622bd 80 * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
81 * @param array $dboptions driver specific options
82 * @return bool success
83 */
beaa43db 84 public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
158622bd 85 global $CFG;
86
beaa43db 87 $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
158622bd 88
89 //TODO: handle both port and socket connection
90 $this->pgsql = pg_connect("host='{$this->dbhost}' user='{$this->dbuser}' password='{$this->dbpass}' dbname='{$this->dbname}'");
91 $status = pg_connection_status($this->pgsql);
92 if ($status === PGSQL_CONNECTION_BAD) {
93 $this->pgsql = null;
94 return false;
95 }
96 pg_set_client_encoding($this->pgsql, 'utf8');
db7aea38 97 // find out the bytea oid
98 $sql = "select oid from pg_type where typname = 'bytea'";
99 $result = pg_query($this->pgsql, $sql);
100 if ($result === false) {
101 return false;
102 }
103 $this->bytea_oid = pg_fetch_result($result, 0);
104 pg_free_result($result);
105 if ($this->bytea_oid === false) {
106 return false;
107 }
158622bd 108 return true;
109 }
110
111 /**
112 * Close database connection and release all resources
113 * and memory (especially circular memory references).
114 * Do NOT use connect() again, create a new instance if needed.
115 */
116 public function dispose() {
117 if ($this->pgsql) {
118 pg_close($this->pgsql);
119 $this->pgsql = null;
120 }
121 parent::dispose();
122 }
123
124 /**
125 * Returns database server info array
126 * @return array
127 */
128 public function get_server_info() {
129 static $info;
130 if (!$info) {
131 $info = pg_version($this->pgsql);
132 }
133 return array('description'=>$info['server'], 'version'=>$info['server']);
134 }
135
136 protected function is_min_version($version) {
137 $server = $this->get_server_info();
138 $server = $server['version'];
139 return version_compare($server, $version, '>=');
140 }
141
142 /**
143 * Returns supported query parameter types
144 * @return bitmask
145 */
146 protected function allowed_param_types() {
147 return SQL_PARAMS_DOLLAR;
148 }
149
150 /**
151 * Returns last error reported by database engine.
152 */
153 public function get_last_error() {
154 return pg_last_error($this->pgsql);
155 }
156
157 /**
158 * Return tables in database WITHOUT current prefix
159 * @return array of table names in lowercase and without prefix
160 */
161 public function get_tables() {
162 $this->reads++;
163 $tables = array();
164 $prefix = str_replace('_', '\\\\_', $this->prefix);
165 $sql = "SELECT tablename
166 FROM pg_catalog.pg_tables
167 WHERE tablename LIKE '$prefix%'";
168 if ($result = pg_query($this->pgsql, $sql)) {
169 while ($row = pg_fetch_row($result)) {
170 $tablename = reset($row);
171 if (strpos($tablename, $this->prefix) !== 0) {
172 continue;
173 }
174 $tablename = substr($tablename, strlen($this->prefix));
175 $tables[$tablename] = $tablename;
176 }
177 pg_free_result($result);
178 }
179 return $tables;
180 }
181
182 /**
183 * Return table indexes - everything lowercased
184 * @return array of arrays
185 */
186 public function get_indexes($table) {
187 $indexes = array();
188 $tablename = $this->prefix.$table;
189
190 $sql = "SELECT *
191 FROM pg_catalog.pg_indexes
192 WHERE tablename = '$tablename'";
193 if ($result = pg_query($this->pgsql, $sql)) {
194 while ($row = pg_fetch_assoc($result)) {
195 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON '.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
196 continue;
197 }
198 if ($matches[4] === 'id') {
199 continue;
200 }
80ffbad3 201 $columns = explode(',', $matches[4]);
202 $columns = array_map('trim', $columns);
158622bd 203 $indexes[$matches[2]] = array('unique'=>!empty($matches[1]),
80ffbad3 204 'columns'=>$columns);
158622bd 205 }
206 pg_free_result($result);
207 }
208 return $indexes;
209 }
210
211 /**
212 * Returns datailed information about columns in table. This information is cached internally.
213 * @param string $table name
214 * @param bool $usecache
215 * @return array array of database_column_info objects indexed with column names
216 */
217 public function get_columns($table, $usecache=true) {
218 if ($usecache and isset($this->columns[$table])) {
219 return $this->columns[$table];
220 }
221
222 $this->columns[$table] = array();
223
224 $tablename = $this->prefix.$table;
225
226 $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
227 FROM pg_catalog.pg_class c
228 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
229 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
230 LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
231 WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
232 ORDER BY a.attnum";
233
234 if (!$result = pg_query($this->pgsql, $sql)) {
235 return array();
236 }
237 while ($rawcolumn = pg_fetch_object($result)) {
238
239 $info = new object();
240 $info->name = $rawcolumn->field;
241 $matches = null;
242
243 if ($rawcolumn->type === 'varchar') {
244 //TODO add some basic enum support here
245 $info->type = 'varchar';
246 $info->meta_type = 'C';
247 $info->max_length = $rawcolumn->atttypmod - 4;
248 $info->scale = null;
298d9250 249 $info->not_null = ($rawcolumn->attnotnull === 't');
250 $info->has_default = ($rawcolumn->atthasdef === 't');
158622bd 251 if ($info->has_default) {
252 $parts = explode('::', $rawcolumn->adsrc);
253 if (count($parts) > 1) {
254 $info->default_value = reset($parts);
255 $info->default_value = trim($info->default_value, "'");
256 } else {
257 $info->default_value = $rawcolumn->adsrc;
258 }
259 } else {
260 $info->default_value = null;
261 }
262 $info->primary_key = false;
263 $info->binary = false;
264 $info->unsigned = null;
265 $info->auto_increment= false;
266 $info->unique = null;
267
268 } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
269 $info->type = 'int';
270 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
271 $info->primary_key = true;
272 $info->meta_type = 'R';
273 $info->unique = true;
274 $info->auto_increment= true;
275 $info->has_default = false;
276 } else {
277 $info->primary_key = false;
278 $info->meta_type = 'I';
279 $info->unique = null;
280 $info->auto_increment= false;
298d9250 281 $info->has_default = ($rawcolumn->atthasdef === 't');
158622bd 282 }
283 $info->max_length = $matches[1];
284 $info->scale = null;
298d9250 285 $info->not_null = ($rawcolumn->attnotnull === 't');
158622bd 286 if ($info->has_default) {
287 $info->default_value = $rawcolumn->adsrc;
288 } else {
289 $info->default_value = null;
290 }
291 $info->binary = false;
292 $info->unsigned = false;
293
294 } else if ($rawcolumn->type === 'numeric') {
295 $info->type = $rawcolumn->type;
296 $info->meta_type = 'N';
297 $info->primary_key = false;
298 $info->binary = false;
299 $info->unsigned = null;
300 $info->auto_increment= false;
301 $info->unique = null;
298d9250 302 $info->not_null = ($rawcolumn->attnotnull === 't');
303 $info->has_default = ($rawcolumn->atthasdef === 't');
158622bd 304 if ($info->has_default) {
305 $info->default_value = $rawcolumn->adsrc;
306 } else {
307 $info->default_value = null;
308 }
309 $info->max_length = $rawcolumn->atttypmod >> 16;
310 $info->scale = ($rawcolumn->atttypmod & 0xFFFF) - 4;
311
312 } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
313 $info->type = 'float';
314 $info->meta_type = 'N';
315 $info->primary_key = false;
316 $info->binary = false;
317 $info->unsigned = null;
318 $info->auto_increment= false;
319 $info->unique = null;
298d9250 320 $info->not_null = ($rawcolumn->attnotnull === 't');
321 $info->has_default = ($rawcolumn->atthasdef === 't');
158622bd 322 if ($info->has_default) {
323 $info->default_value = $rawcolumn->adsrc;
324 } else {
325 $info->default_value = null;
326 }
327 // just guess expected number of deciaml places :-(
328 if ($matches[1] == 8) {
329 // total 15 digits
330 $info->max_length = 8;
331 $info->scale = 7;
332 } else {
333 // total 6 digits
334 $info->max_length = 4;
335 $info->scale = 2;
336 }
337
338 } else if ($rawcolumn->type === 'text') {
339 $info->type = $rawcolumn->type;
340 $info->meta_type = 'X';
341 $info->max_length = -1;
342 $info->scale = null;
298d9250 343 $info->not_null = ($rawcolumn->attnotnull === 't');
344 $info->has_default = ($rawcolumn->atthasdef === 't');
158622bd 345 if ($info->has_default) {
346 $parts = explode('::', $rawcolumn->adsrc);
347 if (count($parts) > 1) {
348 $info->default_value = reset($parts);
349 $info->default_value = trim($info->default_value, "'");
350 } else {
351 $info->default_value = $rawcolumn->adsrc;
352 }
353 } else {
354 $info->default_value = null;
355 }
356 $info->primary_key = false;
357 $info->binary = false;
358 $info->unsigned = null;
359 $info->auto_increment= false;
360 $info->unique = null;
361
362 } else if ($rawcolumn->type === 'bytea') {
363 $info->type = $rawcolumn->type;
364 $info->meta_type = 'B';
365 $info->max_length = -1;
366 $info->scale = null;
298d9250 367 $info->not_null = ($rawcolumn->attnotnull === 't');
158622bd 368 $info->has_default = false;
369 $info->default_value = null;
370 $info->primary_key = false;
371 $info->binary = true;
372 $info->unsigned = null;
373 $info->auto_increment= false;
374 $info->unique = null;
375
376 }
377
378 $this->columns[$table][$info->name] = new database_column_info($info);
379 }
380
e4f9c142 381 pg_free_result($result);
382
158622bd 383 return $this->columns[$table];
384 }
385
386 /**
387 * Reset a sequence to the id field of a table.
388 * @param string $table name of table
389 * @return success
390 */
391 public function reset_sequence($table) {
392 if (!$this->get_manager()->table_exists($table)) {
393 return false;
394 }
395 $value = (int)$this->get_field_sql('SELECT MAX(id) FROM {'.$table.'}');
396 $value++;
397 return $this->change_database_structure("ALTER SEQUENCE $this->prefix{$table}_id_seq RESTART WITH $value");
398 }
399
400 /**
401 * Is db in unicode mode?
402 * @return bool
403 */
404 public function setup_is_unicodedb() {
405 /// Get PostgreSQL server_encoding value
406 $this->reads++;
407 if (!$result = pg_query($this->pgsql, "SHOW server_encoding")) {
408 return false;
409 }
410 $rawcolumn = pg_fetch_object($result);
411 $encoding = $rawcolumn->server_encoding;
412 pg_free_result($result);
413
414 return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
415 }
416
417 /**
418 * Enable/disable very detailed debugging
419 * @param bool $state
420 */
421 public function set_debug($state) {
422 $this->debug = $state;
423 }
424
425 /**
426 * Returns debug status
427 * @return bool $state
428 */
429 public function get_debug() {
430 return $this->debug;
431 }
432
433 /**
434 * Enable/disable detailed sql logging
435 * @param bool $state
436 */
437 public function set_logging($state) {
438 //TODO
439 }
440
441 /**
442 * Do NOT use in code, to be used by database_manager only!
443 * @param string $sql query
444 * @return bool success
445 */
446 public function change_database_structure($sql) {
447 $this->writes++;
448 $this->print_debug($sql);
449 $result = pg_query($this->pgsql, $sql);
450 $this->reset_columns();
451 if ($result === false) {
452 $this->report_error($sql);
453 return false;
454 }
e4f9c142 455 pg_free_result($result);
158622bd 456 return true;
457 }
458
459 /**
460 * Execute general sql query. Should be used only when no other method suitable.
461 * Do NOT use this to make changes in db structure, use database_manager::execute_sql() instead!
462 * @param string $sql query
463 * @param array $params query parameters
464 * @return bool success
465 */
466 public function execute($sql, array $params=null) {
467 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
468
469 if (strpos($sql, ';') !== false) {
470 debugging('Error: Multiple sql statements found or bound parameters not used properly in query!');
471 return false;
472 }
473
474 $this->writes++;
475 $this->print_debug($sql, $params);
476 $result = pg_query_params($this->pgsql, $sql, $params);
477
478 if ($result === false) {
479 $this->report_error($sql, $params);
480 return false;
481
482 }
e4f9c142 483 pg_free_result($result);
158622bd 484 return true;
485 }
486
487 /**
488 * Get a number of records as a moodle_recordset using a SQL statement.
489 *
490 * Since this method is a little less readable, use of it should be restricted to
491 * code where it's possible there might be large datasets being returned. For known
492 * small datasets use get_records_sql - it leads to simpler code.
493 *
494 * The return type is as for @see function get_recordset.
495 *
496 * @param string $sql the SQL select query to execute.
497 * @param array $params array of sql parameters
498 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
499 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
500 * @return mixed an moodle_recorset object, or false if an error occured.
501 */
502 public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
503 if ($limitfrom or $limitnum) {
504 $limitfrom = (int)$limitfrom;
505 $limitnum = (int)$limitnum;
506 if ($limitnum < 1) {
507 $limitnum = "18446744073709551615";
508 }
509 $sql .= " LIMIT $limitnum OFFSET $limitfrom";
510 }
511
512 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
513
514 $this->reads++;
515 $this->print_debug($sql, $params);
516 $result = pg_query_params($this->pgsql, $sql, $params);
517
518 if ($result === false) {
519 $this->report_error($sql, $params);
520 return false;
521 }
522
523 return $this->create_recordset($result);
524 }
525
526 protected function create_recordset($result) {
db7aea38 527 return new pgsql_native_moodle_recordset($result, $this->bytea_oid);
158622bd 528 }
529
530 /**
531 * Get a number of records as an array of objects using a SQL statement.
532 *
533 * Return value as for @see function get_records.
534 *
535 * @param string $sql the SQL select query to execute. The first column of this SELECT statement
536 * must be a unique value (usually the 'id' field), as it will be used as the key of the
537 * returned array.
538 * @param array $params array of sql parameters
539 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
540 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
541 * @return mixed an array of objects, or empty array if no records were found, or false if an error occured.
542 */
543 public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
544 if ($limitfrom or $limitnum) {
545 $limitfrom = (int)$limitfrom;
546 $limitnum = (int)$limitnum;
547 if ($limitnum < 1) {
548 $limitnum = "18446744073709551615";
549 }
550 $sql .= " LIMIT $limitnum OFFSET $limitfrom";
551 }
552
553 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
554 $this->reads++;
555 $this->print_debug($sql, $params);
556 $result = pg_query_params($this->pgsql, $sql, $params);
557
558 if ($result === false) {
559 $this->report_error($sql, $params);
560 return false;
561 }
db7aea38 562 // find out if there are any blobs
563 $numrows = pg_num_fields($result);
564 $blobs = array();
565 for($i=0; $i<$numrows; $i++) {
566 $type_oid = pg_field_type_oid($result, $i);
567 if ($type_oid == $this->bytea_oid) {
568 $blobs[] = pg_field_name($result, $i);
569 }
570 }
158622bd 571
572 $rows = pg_fetch_all($result);
573 pg_free_result($result);
574
575 $return = array();
576 if ($rows) {
577 foreach ($rows as $row) {
578 $id = reset($row);
db7aea38 579 if ($blobs) {
580 foreach ($blobs as $blob) {
581 $row[$blob] = pg_unescape_bytea($row[$blob]);
582 }
583 }
158622bd 584 $return[$id] = (object)$row;
585 }
586 }
db7aea38 587
158622bd 588 return $return;
589 }
590
591 /**
592 * Selects records and return values (first field) as an array using a SQL statement.
593 *
594 * @param string $sql The SQL query
595 * @param array $params array of sql parameters
596 * @return mixed array of values or false if an error occured
597 */
598 public function get_fieldset_sql($sql, array $params=null) {
599 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
600
601 $this->reads++;
602 $this->print_debug($sql, $params);
603 $result = pg_query_params($this->pgsql, $sql, $params);
604
605 if ($result === false) {
606 $this->report_error($sql, $params);
607 return false;
608 }
609
610 $return = pg_fetch_all_columns($result, 0);
611 pg_free_result($result);
612
613 return $return;
614 }
615
616 /**
617 * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
618 * @param string $table name
619 * @param mixed $params data record as object or array
620 * @param bool $returnit return it of inserted record
621 * @param bool $bulk true means repeated inserts expected
622 * @param bool $customsequence true if 'id' included in $params, disables $returnid
623 * @return mixed success or new id
624 */
625 public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
626 if (!is_array($params)) {
627 $params = (array)$params;
628 }
629
630 $returning = "";
631
632 if ($customsequence) {
633 if (!isset($params['id'])) {
634 return false;
635 }
636 $returnid = false;
637 } else {
638 if ($returnid) {
639 if ($this->is_min_version('8.2.0')) {
640 $returning = "RETURNING id";
641 unset($params['id']);
642 } else {
643 //ugly workaround for pg < 8.2
644 $this->reads++;
645 $seqsql = "SELECT NEXTVAL({$this->prefix}{$table}_id_seq) AS id";
646 $result = pg_query($this->pgsql, $seqsql);
647 if ($result === false) {
648 throw new dml_exception('missingidsequence', "{$this->prefix}{$table}"); // TODO: add localised string
649 }
650 $row = pg_fetch_assoc($result);
651 $params['id'] = reset($row);
652 pg_free_result($result);
653 }
654 } else {
655 unset($params['id']);
656 }
657 }
658
659 if (empty($params)) {
660 return false;
661 }
662
663 $fields = implode(',', array_keys($params));
664 $values = array();
665 $count = count($params);
666 for ($i=1; $i<=$count; $i++) {
667 $values[] = "\$".$i;
668 }
669 $values = implode(',', $values);
670
671 $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
672 $this->writes++;
673 $this->print_debug($sql, $params);
674 $result = pg_query_params($this->pgsql, $sql, $params);
675
676 if ($result === false) {
677 $this->report_error($sql, $params);
678 return false;
679 }
680
681 if ($returning !== "") {
682 $row = pg_fetch_assoc($result);
683 $params['id'] = reset($row);
158622bd 684 }
e4f9c142 685 pg_free_result($result);
158622bd 686
687 if (!$returnid) {
688 return true;
689 }
690
691 return (int)$params['id'];
692 }
693
694 /**
695 * Insert a record into a table and return the "id" field if required.
696 *
697 * Some conversions and safety checks are carried out. Lobs are supported.
698 * If the return ID isn't required, then this just reports success as true/false.
699 * $data is an object containing needed data
700 * @param string $table The database table to be inserted into
701 * @param object $data A data object with values for one or more fields in the record
702 * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
703 * @return mixed success or new ID
704 */
705 public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
706 if (!is_object($dataobject)) {
707 $dataobject = (object)$dataobject;
708 }
709
710 $columns = $this->get_columns($table);
711
712 unset($dataobject->id);
713 $cleaned = array();
714 $blobs = array();
715
716 foreach ($dataobject as $field=>$value) {
717 if (!isset($columns[$field])) {
718 continue;
719 }
720 $column = $columns[$field];
721 if ($column->meta_type == 'B') {
722 if (is_null($value)) {
723 $cleaned[$field] = null;
724 } else {
725 $blobs[$field] = $value;
726 $cleaned[$field] = '@#BLOB#@';
727 }
728 continue;
729
730 } else if (is_bool($value)) {
731 $value = (int)$value; // prevent false '' problems
732
733 } else if ($value === '') {
734 if ($column->meta_type == 'I' or $column->meta_type == 'F' or $column->meta_type == 'N') {
735 $value = 0; // prevent '' problems in numeric fields
736 }
737 }
738
739 $cleaned[$field] = $value;
740 }
741
742 if (empty($cleaned)) {
743 return false;
744 }
745
746 if (empty($blobs)) {
747 return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
748 }
749
750 if (!$id = $this->insert_record_raw($table, $cleaned, true, $bulk)) {
751 return false;
752 }
753
754 foreach ($blobs as $key=>$value) {
755 $this->writes++;
756 $value = pg_escape_bytea($this->pgsql, $value);
757 $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
e4f9c142 758 $result = pg_query($this->pgsql, $sql);
759 if ($result !== false) {
760 pg_free_result($result);
761 }
158622bd 762 }
763
764 return ($returnid ? $id : true);
765
766 }
767
768 /**
769 * Import a record into a table, id field is required.
770 * Safety checks are NOT carried out. Lobs are supported.
771 *
772 * @param string $table name of database table to be inserted into
773 * @param object $dataobject A data object with values for one or more fields in the record
774 * @return bool success
775 */
776 public function import_record($table, $dataobject) {
777 $dataobject = (object)$dataobject;
778
779 if (empty($dataobject->id)) {
780 return false;
781 }
782
783 $columns = $this->get_columns($table);
784 $cleaned = array();
785
786 foreach ($dataobject as $field=>$value) {
787 if (!isset($columns[$field])) {
788 continue;
789 }
790 $cleaned[$field] = $value;
791 }
792
793 return $this->insert_record_raw($table, $cleaned, false, true, true);
794 }
795
796 /**
797 * Update record in database, as fast as possible, no safety checks, lobs not supported.
798 * @param string $table name
799 * @param mixed $params data record as object or array
800 * @param bool true means repeated updates expected
801 * @return bool success
802 */
803 public function update_record_raw($table, $params, $bulk=false) {
804 if (!is_array($params)) {
805 $params = (array)$params;
806 }
807 if (!isset($params['id'])) {
808 return false;
809 }
810 $id = $params['id'];
811 unset($params['id']);
812
813 if (empty($params)) {
814 return false;
815 }
816
817 $i = 1;
818
819 $sets = array();
820 foreach ($params as $field=>$value) {
821 $sets[] = "$field = \$".$i++;
822 }
823
824 $params[] = $id; // last ? in WHERE condition
825
826 $sets = implode(',', $sets);
827 $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
828
829 $this->writes++;
830 $this->print_debug($sql, $params);
831 $result = pg_query_params($this->pgsql, $sql, $params);
832
833 if ($result === false) {
834 $this->report_error($sql, $params);
835 return false;
836 }
837
e4f9c142 838 pg_free_result($result);
158622bd 839 return true;
840 }
841
842 /**
843 * Update a record in a table
844 *
845 * $dataobject is an object containing needed data
846 * Relies on $dataobject having a variable "id" to
847 * specify the record to update
848 *
849 * @param string $table The database table to be checked against.
850 * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
851 * @param bool true means repeated updates expected
852 * @return bool success
853 */
854 public function update_record($table, $dataobject, $bulk=false) {
855 if (!is_object($dataobject)) {
856 $dataobject = (object)$dataobject;
857 }
858
859 if (!isset($dataobject->id) ) {
860 return false;
861 }
862
d246cdd2 863 $id = (int)$dataobject->id;
864
158622bd 865 $columns = $this->get_columns($table);
866 $cleaned = array();
d246cdd2 867 $blobs = array();
158622bd 868
869 foreach ($dataobject as $field=>$value) {
870 if (!isset($columns[$field])) {
871 continue;
872 }
d246cdd2 873 $column = $columns[$field];
874 if ($column->meta_type == 'B') {
875 if (is_null($value)) {
876 $cleaned[$field] = null;
877 } else {
878 $blobs[$field] = $value;
879 $cleaned[$field] = '@#BLOB#@';
880 }
881 continue;
882
883 } else if (is_bool($value)) {
884 $value = (int)$value; // prevent false '' problems
885
886 } else if ($value === '') {
887 if ($column->meta_type == 'I' or $column->meta_type == 'F' or $column->meta_type == 'N') {
888 $value = 0; // prevent '' problems in numeric fields
889 }
158622bd 890 }
d246cdd2 891
158622bd 892 $cleaned[$field] = $value;
893 }
894
d246cdd2 895 if (!$this->update_record_raw($table, $cleaned, $bulk)) {
896 return false;
897 }
898
899 if (empty($blobs)) {
900 return true;
901 }
902
903 foreach ($blobs as $key=>$value) {
904 $this->writes++;
905 $value = pg_escape_bytea($this->pgsql, $value);
906 $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
907 $result = pg_query($this->pgsql, $sql);
908 if ($result === false) {
909 return false;
910 }
911 pg_free_result($result);
912 }
913
914 return true;
158622bd 915 }
916
917 /**
918 * Set a single field in every table record which match a particular WHERE clause.
919 *
920 * @param string $table The database table to be checked against.
921 * @param string $newfield the field to set.
922 * @param string $newvalue the value to set the field to.
923 * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
924 * @param array $params array of sql parameters
925 * @return bool success
926 */
927 public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
928 if ($select) {
929 $select = "WHERE $select";
930 }
931 if (is_null($params)) {
932 $params = array();
933 }
934 list($select, $params, $type) = $this->fix_sql_params($select, $params);
935 $i = count($params)+1;
936
937 if (is_bool($newvalue)) {
938 $newvalue = (int)$newvalue; // prevent "false" problems
939 }
940 if (is_null($newvalue)) {
941 $newfield = "$newfield = NULL";
942 } else {
943 $newfield = "$newfield = \$".$i;
944 $params[] = $newvalue;
945 }
946 $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
947
948 $this->writes++;
949 $this->print_debug($sql, $params);
950 $result = pg_query_params($this->pgsql, $sql, $params);
951
952 if ($result === false) {
953 $this->report_error($sql, $params);
954 return false;
955 }
e4f9c142 956 pg_free_result($result);
158622bd 957
958 return true;
959 }
960
961 /**
962 * Delete one or more records from a table which match a particular WHERE clause.
963 *
964 * @param string $table The database table to be checked against.
965 * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
966 * @param array $params array of sql parameters
967 * @return returns success.
968 */
969 public function delete_records_select($table, $select, array $params=null) {
970 if ($select) {
971 $select = "WHERE $select";
972 }
973 $sql = "DELETE FROM {$this->prefix}$table $select";
974
975 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
976
977 $this->writes++;
978 $this->print_debug($sql, $params);
979 $result = pg_query_params($this->pgsql, $sql, $params);
980
981 if ($result === false) {
982 $this->report_error($sql, $params);
983 return false;
984 }
e4f9c142 985 pg_free_result($result);
158622bd 986
987 return true;
988 }
989
990 public function sql_ilike() {
991 return 'ILIKE';
992 }
993
994 public function sql_bitxor($int1, $int2) {
995 return '(' . $this->sql_bitor($int1, $int2) . ' - ' . $this->sql_bitand($int1, $int2) . ')';
996 }
997
998 public function sql_cast_char2int($fieldname, $text=false) {
999 return ' CAST(' . $fieldname . ' AS INT) ';
1000 }
1001
1002 public function sql_cast_char2real($fieldname, $text=false) {
1003 return " $fieldname::real ";
1004 }
1005
1006 public function sql_concat() {
1007 $arr = func_get_args();
1008 $s = implode(' || ', $arr);
1009 if ($s === '') {
1010 return " '' ";
1011 }
1012 return " $s ";
1013 }
1014
1015 public function sql_concat_join($separator="' '", $elements=array()) {
1016 for ($n=count($elements)-1; $n > 0 ; $n--) {
1017 array_splice($elements, $n, 0, $separator);
1018 }
1019 $s = implode(' || ', $elements);
1020 if ($s === '') {
1021 return " '' ";
1022 }
1023 return " $s ";
1024 }
1025
1026 public function sql_substr() {
1027 return "SUBSTRING";
1028 }
1029
1030 public function sql_regex_supported() {
1031 return true;
1032 }
1033
1034 public function sql_regex($positivematch=true) {
1035 return $positivematch ? '~*' : '!~*';
1036 }
1037
fb76304b 1038/// transactions
1039 /**
1040 * on DBs that support it, switch to transaction mode and begin a transaction
1041 * you'll need to ensure you call commit_sql() or your changes *will* be lost.
1042 *
1043 * this is _very_ useful for massive updates
1044 */
1045 public function begin_sql() {
2958a49b 1046 $result = pg_query($this->pgsql, "BEGIN ISOLATION LEVEL READ COMMITTED");
fb76304b 1047 if ($result === false) {
1048 return false;
1049 }
1050 pg_free_result($result);
1051 return true;
1052 }
1053
1054 /**
1055 * on DBs that support it, commit the transaction
1056 */
1057 public function commit_sql() {
2958a49b 1058 $result = pg_query($this->pgsql, "COMMIT");
fb76304b 1059 if ($result === false) {
1060 return false;
1061 }
1062 pg_free_result($result);
1063 return true;
1064 }
1065
1066 /**
1067 * on DBs that support it, rollback the transaction
1068 */
1069 public function rollback_sql() {
2958a49b 1070 $result = pg_query($this->pgsql, "ROLLBACK");
fb76304b 1071 if ($result === false) {
1072 return false;
1073 }
1074 pg_free_result($result);
1075 return true;
1076 }
158622bd 1077}