<?php

/**
 * A class that implements the Slony 1.0.x support plugin
 *
 * $Id: Slony.php,v 1.15 2007/10/02 21:36:35 ioguix Exp $
 */

include_once('./classes/plugins/Plugin.php');

class Slony extends Plugin {

	var $slony_version;
	var $slony_schema;
	var $slony_cluster;
	var $slony_owner;
	var $slony_comment;
	var $enabled = null;
	
	/**
	 * Constructor
	 */
	function Slony() {
		$this->Plugin('slony');
		$this->isEnabled();
	}

	/**
	 * Determines whether or not Slony is installed in the current
	 * database.
	 * @post Will populate version and schema fields, etc.
	 * @return True if Slony is installed, false otherwise.
	 */
	function isEnabled() {
		// Access cache
		if ($this->enabled !== null) return $this->enabled;
		else $this->enabled = false;
		
		global $data;
		
		// Check for the slonyversion() function and find the schema
		// it's in.  We put an order by and limit 1 in here to guarantee
		// only finding the first one, even if there are somehow two
		// Slony schemas.
		$sql = "SELECT pn.nspname AS schema, pu.usename AS owner, 
                    SUBSTRING(pn.nspname FROM 2) AS cluster,
                    pg_catalog.obj_description(pn.oid, 'pg_namespace') AS 
                    nspcomment
                    FROM pg_catalog.pg_proc pp, pg_catalog.pg_namespace pn, 
                    pg_catalog.pg_user pu 
					WHERE pp.pronamespace=pn.oid
					AND pn.nspowner = pu.usesysid
					AND pp.proname='slonyversion'
					AND pn.nspname LIKE '@_%' ESCAPE '@'  
					ORDER BY pn.nspname LIMIT 1";
		$rs = $data->selectSet($sql);
		if ($rs->recordCount() == 1) {
			$schema = $rs->fields['schema'];
			$this->slony_schema = $schema;
			$this->slony_owner = $rs->fields['owner'];
			$this->slony_comment = $rs->fields['nspcomment'];
			// Cluster name is schema minus "_" prefix.
			$this->slony_cluster = $rs->fields['cluster'];
			$data->fieldClean($schema);
			$sql = "SELECT \"{$schema}\".slonyversion() AS version";
			$version = $data->selectField($sql, 'version');
			if ($version === -1) return false;
			else {
				$this->slony_version = $version;
				$this->enabled = true;
				return true;
			}
		}
		else return false;
	}	

	// CLUSTERS

	/**
	 * Gets the clusters in this database
	 */
	function getClusters() {
		include_once('./classes/ArrayRecordSet.php');

		if ($this->isEnabled()) {
			$clusters = array(array('cluster' => $this->slony_cluster, 'comment' => $this->slony_comment));
		}
		else
			$clusters = array();

		return new ArrayRecordSet($clusters);
	}
	
	/**
	 * Gets a single cluster
	 */
	function getCluster() {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		
		$sql = "SELECT no_id, no_comment, \"{$schema}\".slonyversion() AS version
					FROM \"{$schema}\".sl_local_node_id, \"{$schema}\".sl_node
					WHERE no_id=last_value";
		
		
		return $data->selectSet($sql);
	}

	/**
	 * Drops an entire cluster.
	 */
	function dropCluster() {
		global $data;
		
		$schema = $this->slony_schema;
		$data->fieldClean($schema);

		$sql = "SELECT \"{$schema}\".uninstallnode(); DROP SCHEMA \"{$schema}\" CASCADE";
		
		$status = $data->execute($sql);
		if ($status == 0) {
			$this->enabled = null;
			$enabled = $this->isEnabled();
		}
		return $status;
	}
	
	/**
	 * Helper function to get a file into a string and replace
	 * variables.
	 * @return The file contents, or FALSE on error.
	 */
	function _getFile($file, $cluster) {
		global $data,$misc;
		$schema = '_' . $cluster;
		$data->fieldClean($cluster);
		
		$server_info = $misc->getServerInfo();
		$path = $server_info['slony_sql'] . '/' . $file;
		
		// Check that we can access the file
		if (!file_exists($path) || !is_readable($path)) return false;

		$buffer = null;
		$handle = fopen($path, 'r');
		if ($handle === false) return false;
		while (!feof($handle)) {
		   $temp = fgets($handle, 4096);
		   $temp = str_replace('@CLUSTERNAME@', $cluster, $temp);
		   
		   $temp = str_replace('@NAMESPACE@', $schema, $temp);
		   $buffer .= $temp;
		}
		fclose($handle);
		
		return $buffer;
	}
	
	/**
	 * Initializes a new cluster
	 */
	function initCluster($name, $no_id, $no_comment) {
		global $data, $misc;
		
		// Prevent timeouts since cluster initialization can be slow
		if (!ini_get('safe_mode')) set_time_limit(0);

		$server_info = $misc->getServerInfo();
		
		if (!$data->isSuperUser($server_info['username'])) {
			return -10;
		}

		// Determine Slony compatibility version.
		if ($data->major_version == 7.3)
			$ver = '73';
		elseif ($data->major_version >= 7.4)
			$ver = '74';
		else {
			return -11;
		}
		
		$status = $data->beginTransaction();
		if ($status != 0) return -1;

		// Create the schema
		$status = $data->createSchema('_' . $name);
		if ($status != 0) {
			$data->rollbackTransaction();
			return -2;
		}

		$sql = $this->_getFile("xxid.v{$ver}.sql", $name);
		if ($sql === false) {
			$data->rollbackTransaction();
			return -6;
		}			
		$status = $data->execute($sql);
		if ($status != 0) {
			$data->rollbackTransaction();
			return -3;
		}
		
		$sql = $this->_getFile('slony1_base.sql', $name);
		if ($sql === false) {
			$data->rollbackTransaction();
			return -6;
		}			
		$status = $data->execute($sql);
		if ($status != 0) {
			$data->rollbackTransaction();
			return -3;
		}
/* THIS FILE IS EMPTY AND JUST CAUSES ERRORS
		$sql = $this->_getFile('slony1_base.v74.sql', $name);
		$status = $data->execute($sql);
		if ($status != 0) {
			$data->rollbackTransaction();
			return -3;
		}
*/
		$sql = $this->_getFile('slony1_funcs.sql', $name);
		if ($sql === false) {
			$data->rollbackTransaction();
			return -6;
		}			
		$status = $data->execute($sql);
		if ($status != 0) {
			$data->rollbackTransaction();
			return -3;
		}

		$sql = $this->_getFile("slony1_funcs.v{$ver}.sql", $name);
		if ($sql === false) {
			$data->rollbackTransaction();
			return -6;
		}			
		$status = $data->execute($sql);
		if ($status != 0) {
			$data->rollbackTransaction();
			return -3;
		}
		
		$this->enabled = null;
		$enabled = $this->isEnabled();
		if (!$enabled) {
			$data->rollbackTransaction();
			return -4;
		}
		
		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		$data->clean($no_comment);

		$sql = "SELECT \"{$schema}\".initializelocalnode('{$no_id}', '{$no_comment}'); SELECT \"{$schema}\".enablenode('{$no_id}')";
		$status = $data->execute($sql);
		if ($status != 0) {
			$data->rollbackTransaction();
			return -5;
		}
		
		return $data->endTransaction();
	}
	
	// NODES

	/**
	 * Gets the nodes in this database
	 */
	function getNodes() {
		global $data;
		
		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		
		// We use 10 seconds as the default check time since that is the
		// the default in Slony, and it gives no mechanism to look it up
		$sql = "SELECT no_id, no_active, no_comment, no_spool, ".
				"CASE WHEN st_lag_time > '10 seconds'::interval ". 
				"THEN 'outofsync' ELSE 'insync' END AS no_status ".
				"FROM \"{$schema}\".sl_node ".
				"LEFT JOIN \"{$schema}\".sl_status ON (no_id =st_received) ".
				"ORDER BY no_comment";

		return $data->selectSet($sql);
	}

	/**
	 * Gets a single node
	 */
	function getNode($no_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		
		$sql = "SELECT * FROM \"{$schema}\".sl_node WHERE no_id='{$no_id}'";
		
		return $data->selectSet($sql);
	}
	
	/**
	 * Creates a node
	 */
	function createNode($no_id, $no_comment) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_comment);
		$data->clean($no_id);
		
		if ($no_id != '')
			$sql = "SELECT \"{$schema}\".storenode('{$no_id}', '{$no_comment}')";
		else
			$sql = "SELECT \"{$schema}\".storenode((SELECT COALESCE(MAX(no_id), 0) + 1 FROM \"{$schema}\".sl_node), '{$no_comment}')";

		return $data->execute($sql);
	}

	/**
	 * Drops a node
	 */
	function dropNode($no_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);

		$sql = "SELECT \"{$schema}\".dropnode('{$no_id}')";

		return $data->execute($sql);
	}	
	
	// REPLICATION SETS
	
	/**
	 * Gets the replication sets in this database
	 */
	function getReplicationSets() {
		global $data;
		
		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		
		$sql = "SELECT *, set_locked IS NOT NULL AS is_locked FROM \"{$schema}\".sl_set ORDER BY set_id";
		
		return $data->selectSet($sql);
	}

	/**
	 * Gets a particular replication set
	 */
	function getReplicationSet($set_id) {
		global $data;
		
		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);
		
		$sql = "SELECT *, (SELECT COUNT(*) FROM \"{$schema}\".sl_subscribe ssub WHERE ssub.sub_set=ss.set_id) AS subscriptions,
					set_locked IS NOT NULL AS is_locked
					FROM \"{$schema}\".sl_set ss, \"{$schema}\".sl_node sn
					WHERE ss.set_origin=sn.no_id
					AND set_id='{$set_id}'";
		
		return $data->selectSet($sql);
	}

	/**
	 * Creates a set
	 */
	function createReplicationSet($set_id, $set_comment) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_comment);
		$data->clean($set_id);

		if ($set_id != '')
			$sql = "SELECT \"{$schema}\".storeset('{$set_id}', '{$set_comment}')";
		else
			$sql = "SELECT \"{$schema}\".storeset((SELECT COALESCE(MAX(set_id), 0) + 1 FROM \"{$schema}\".sl_set), '{$set_comment}')";

		return $data->execute($sql);
	}

	/**
	 * Drops a set
	 */
	function dropReplicationSet($set_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);

		$sql = "SELECT \"{$schema}\".dropset('{$set_id}')";

		return $data->execute($sql);
	}	

	/**
	 * Locks or unlocks a set
	 * @param boolean $lock True to lock, false to unlock
	 */
	function lockReplicationSet($set_id, $lock) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);

		if ($lock)
			$sql = "SELECT \"{$schema}\".lockset('{$set_id}')";
		else
			$sql = "SELECT \"{$schema}\".unlockset('{$set_id}')";		

		return $data->execute($sql);
	}	
		
	/**
	 * Merges two sets
	 */
	function mergeReplicationSet($set_id, $target) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);
		$data->clean($target);

		$sql = "SELECT \"{$schema}\".mergeset('{$target}', '{$set_id}')";

		return $data->execute($sql);
	}

	/**
	 * Moves a set to a new origin
	 */
	function moveReplicationSet($set_id, $new_origin) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);
		$data->clean($new_origin);

		$sql = "SELECT \"{$schema}\".moveset('{$set_id}', '{$new_origin}')";

		return $data->execute($sql);
	}

	/**
	 * Executes schema changing DDL set on nodes
	 */
	function executeReplicationSet($set_id, $script) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);
		$data->clean($script);

		$sql = "SELECT \"{$schema}\".ddlscript('{$set_id}', '{$script}')";

		return $data->execute($sql);
	}	
	
	// TABLES
	
	/**
	 * Return all tables in a replication set
	 * @param $set_id The ID of the replication set
	 * @return Tables in the replication set, sorted alphabetically 
	 */
	function getTables($set_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);		

		$sql = "SELECT st.tab_id, c.relname, n.nspname, n.nspname||'.'||c.relname AS qualname,
					pg_catalog.pg_get_userbyid(c.relowner) AS relowner, 
					reltuples::bigint";
		// Tablespace
		if ($data->hasTablespaces()) {
			$sql .= ", (SELECT spcname FROM pg_catalog.pg_tablespace pt WHERE pt.oid=c.reltablespace) AS tablespace";
		}
		$sql .= " FROM pg_catalog.pg_class c, \"{$schema}\".sl_table st, pg_catalog.pg_namespace n
					WHERE c.oid=st.tab_reloid
					AND c.relnamespace=n.oid
					AND st.tab_set='{$set_id}'
					ORDER BY n.nspname, c.relname";

		return $data->selectSet($sql);
	}

	/**
	 * Adds a table to a replication set
	 */
	function addTable($set_id, $tab_id, $nspname, $relname, $idxname, $comment, $storedtriggers) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);
		$data->clean($tab_id);
		$fqname = $nspname . '.' . $relname;
		$data->clean($fqname);
		$data->clean($nspname);
		$data->clean($relname);
		$data->clean($idxname);
		$data->clean($comment);

		$hastriggers = (sizeof($storedtriggers) > 0);
		if ($hastriggers) {
			// Begin a transaction
			$status = $data->beginTransaction();
			if ($status != 0) return -1;
		}

		if ($tab_id != '')
			$sql = "SELECT \"{$schema}\".setaddtable('{$set_id}', '{$tab_id}', '{$fqname}', '{$idxname}', '{$comment}')";
		else {
			$sql = "SELECT \"{$schema}\".setaddtable('{$set_id}', (SELECT COALESCE(MAX(tab_id), 0) + 1 FROM \"{$schema}\".sl_table), '{$fqname}', '{$idxname}', '{$comment}')";			
		}

		$status = $data->execute($sql);	
		if ($status != 0) {
			if ($hastriggers) $data->rollbackTransaction();
			return -3;
		}		

		// If we are storing triggers, we need to know the tab_id that was assigned to the table
		if ($tab_id == '' && $hastriggers) {
			$sql = "SELECT tab_id
						FROM \"{$schema}\".sl_table
						WHERE tab_set='{$set_id}'
						AND tab_reloid=(SELECT pc.oid FROM pg_catalog.pg_class pc, pg_namespace pn 
												WHERE pc.relnamespace=pn.oid AND pc.relname='{$relname}'
												AND pn.nspname='{$nspname}')";
			$tab_id = $data->selectField($sql, 'tab_id');
			if ($tab_id === -1) {
				$data->rollbackTransaction();
				return -4;
			}
		}
		
		// Store requested triggers
		if ($hastriggers) {
			foreach ($storedtriggers as $tgname) {
				$data->clean($tgname);
				$sql = "SELECT \"{$schema}\".storetrigger('{$tab_id}', '{$tgname}')";
				$status = $data->execute($sql);	
				if ($status != 0) {
					$data->rollbackTransaction();
					return -5;
				}		
			}				
		}

		if ($hastriggers)
			return $data->endTransaction();
		else
			return $status;
	}
		
	/**
	 * Drops a table from a replication set
	 */
	function dropTable($tab_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($tab_id);

		$sql = "SELECT \"{$schema}\".setdroptable('{$tab_id}')";

		return $data->execute($sql);
	}		

	/**
	 * Moves a table to another replication set
	 */
	function moveTable($tab_id, $new_set_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($tab_id);
		$data->clean($new_set_id);

		$sql = "SELECT \"{$schema}\".setmovetable('{$tab_id}', '{$new_set_id}')";

		return $data->execute($sql);
	}		

    /**
     * Return all tables we are not current in a replication set
     */
    function getNonRepTables()
    {
		global $data;
        /*
         * we cannot just query pg_tables as we want the OID of the table 
         * for the subquery against the slony table. We could match on
         * on schema name and table name, but the slony info isn't updated
         * if the user renames a table which is in a replication set
         */
        $sql = "SELECT c.relname, n.nspname, n.nspname||'.'||c.relname AS qualname, 
                    pg_get_userbyid(c.relowner) AS relowner
                    FROM pg_class c
                    LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
                    WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 
                    'information_schema', 'pg_toast') AND
                    NOT(n.nspname = '_{$this->slony_cluster}' AND
                    relname LIKE 'sl_%') AND 
                    NOT EXISTS(SELECT 1 FROM _{$this->slony_cluster}.sl_table s
                    WHERE s.tab_reloid = c.oid)
                    ORDER BY n.nspname, c.relname";
        return $data->selectSet($sql);
    }

	// SEQUENCES
	
	/**
	 * Return all sequences in a replication set
	 * @param $set_id The ID of the replication set
	 * @return Sequences in the replication set, sorted alphabetically 
	 */
	function getSequences($set_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);		

		$sql = "SELECT ss.seq_id, c.relname AS seqname, n.nspname, n.nspname||'.'||c.relname AS qualname,
						pg_catalog.obj_description(c.oid, 'pg_class') AS seqcomment,
						pg_catalog.pg_get_userbyid(c.relowner) AS seqowner 
					FROM pg_catalog.pg_class c, \"{$schema}\".sl_sequence ss, pg_catalog.pg_namespace n
					WHERE c.oid=ss.seq_reloid
					AND c.relnamespace=n.oid
					AND ss.seq_set='{$set_id}'
					ORDER BY n.nspname, c.relname";

		return $data->selectSet($sql);
	}

	/**
	 * Adds a sequence to a replication set
	 */
	function addSequence($set_id, $seq_id, $fqname, $comment) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);
		$data->clean($seq_id);
		$data->clean($fqname);
		$data->clean($comment);

		if ($seq_id != '')
			$sql = "SELECT \"{$schema}\".setaddsequence('{$set_id}', '{$seq_id}', '{$fqname}', '{$comment}')";
		else
			$sql = "SELECT \"{$schema}\".setaddsequence('{$set_id}', (SELECT COALESCE(MAX(seq_id), 0) + 1 FROM \"{$schema}\".sl_sequence), '{$fqname}', '{$comment}')";

		return $data->execute($sql);	}		
		
	/**
	 * Drops a sequence from a replication set
	 */
	function dropSequence($seq_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($seq_id);

		$sql = "SELECT \"{$schema}\".setdropsequence('{$seq_id}')";

		return $data->execute($sql);
	}		

	/**
	 * Moves a sequence to another replication set
	 */
	function moveSequence($seq_id, $new_set_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($seq_id);
		$data->clean($new_set_id);

		$sql = "SELECT \"{$schema}\".setmovesequence('{$seq_id}', '{$new_set_id}')";

		return $data->execute($sql);
	}		
	
	// SUBSCRIPTIONS
	
	/**
	 * Gets all nodes subscribing to a set
	 * @param $set_id The ID of the replication set
	 * @return Nodes subscribing to this set
	 */
	function getSubscribedNodes($set_id) {
		global $data;
		
		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);		
		
		$sql = "SELECT sn.*, ss.sub_set
					FROM \"{$schema}\".sl_subscribe ss, \"{$schema}\".sl_node sn
					WHERE ss.sub_set='{$set_id}'
					AND ss.sub_receiver = sn.no_id
					ORDER BY sn.no_comment";
		
		return $data->selectSet($sql);
	}

	/**
	 * Gets all nodes subscribing to a set
	 * @param $set_id The ID of the replication set
	 * @return Nodes subscribing to this set
	 */
	function getSubscription($set_id, $no_id) {
		global $data;
		
		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($set_id);
		$data->clean($no_id);
		
		$sql = "SELECT ss.*, sn.no_comment AS receiver, sn2.no_comment AS provider
					FROM \"{$schema}\".sl_subscribe ss, \"{$schema}\".sl_node sn, \"{$schema}\".sl_node sn2
					WHERE ss.sub_set='{$set_id}'
					AND ss.sub_receiver = sn.no_id
					AND ss.sub_provider = sn2.no_id
					AND sn.no_id='{$no_id}'";
		
		return $data->selectSet($sql);
	}
	
	// NODES
	
	/**
	 * Gets node paths
	 */
	function getPaths($no_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		
		$sql = "SELECT * FROM \"{$schema}\".sl_path sp, \"{$schema}\".sl_node sn
					WHERE sp.pa_server=sn.no_id 
					AND sp.pa_client='{$no_id}'
					ORDER BY sn.no_comment";
		
		return $data->selectSet($sql);
	}

	/**
	 * Gets node path details
	 */
	function getPath($no_id, $path_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		$data->clean($path_id);
		
		$sql = "SELECT * FROM \"{$schema}\".sl_path sp, \"{$schema}\".sl_node sn
					WHERE sp.pa_server=sn.no_id 
					AND sp.pa_client='{$no_id}'
					AND sn.no_id='{$path_id}'";
		
		return $data->selectSet($sql);
	}

	/**
	 * Creates a path
	 */
	function createPath($no_id, $server, $conn, $retry) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		$data->clean($server);
		$data->clean($conn);
		$data->clean($retry);

		$sql = "SELECT \"{$schema}\".storepath('{$server}', '{$no_id}', '{$conn}', '{$retry}')";

		return $data->execute($sql);
	}

	/**
	 * Drops a path
	 */
	function dropPath($no_id, $path_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		$data->clean($path_id);

		$sql = "SELECT \"{$schema}\".droppath('{$path_id}', '{$no_id}')";

		return $data->execute($sql);
	}	
	
	// LISTENS
	
	/**
	 * Gets node listens
	 */
	function getListens($no_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		
		$sql = "SELECT * FROM \"{$schema}\".sl_listen sl, \"{$schema}\".sl_node sn
					WHERE sl.li_provider=sn.no_id 
					AND sl.li_receiver='{$no_id}'
					ORDER BY sn.no_comment";
		
		return $data->selectSet($sql);
	}

	/**
	 * Gets node listen details
	 */
	function getListen($no_id, $listen_id) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		$data->clean($listen_id);
		
		$sql = "SELECT sl.*, sn.*, sn2.no_comment AS origin FROM \"{$schema}\".sl_listen sl, \"{$schema}\".sl_node sn, \"{$schema}\".sl_node sn2
					WHERE sl.li_provider=sn.no_id 
					AND sl.li_receiver='{$no_id}'
					AND sn.no_id='{$listen_id}'
					AND sn2.no_id=sl.li_origin";
		
		return $data->selectSet($sql);
	}

	/**
	 * Creates a listen
	 */
	function createListen($no_id, $origin, $provider) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		$data->clean($origin);
		$data->clean($provider);

		$sql = "SELECT \"{$schema}\".storelisten('{$origin}', '{$provider}', '{$no_id}')";

		return $data->execute($sql);
	}

	/**
	 * Drops a listen
	 */
	function dropListen($no_id, $origin, $provider) {
		global $data;

		$schema = $this->slony_schema;
		$data->fieldClean($schema);
		$data->clean($no_id);
		$data->clean($origin);
		$data->clean($provider);

		$sql = "SELECT \"{$schema}\".droplisten('{$origin}', '{$provider}', '{$no_id}')";

		return $data->execute($sql);
	}	
	
	// ACTIONS
	
	

}

?>