wfgeo

Reputation: 3118

Is this psycopg2 code safe from injection?

I am writing a class that allows users to perform various spatial analyses on PostGIS tables. Because of this, users have to be able to select tables by name, which I receive as parameters. I understand the danger of allowing user input like this, but I don't have a choice.

I sanitize the table name ahead of time with another function, which checks whether the input string for that parameter matches one of the table names retrieved from the database. Then I pass it using AsIs(), which I know is not recommended, but as I said, I have already verified that the name belongs to an existing table in the database. However, I still have one parameter left: a code representing the spatial coordinate system.
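The existence check described above can also be expressed as a single parameterized query instead of fetching every table name and comparing in Python. The following is a minimal sketch under stated assumptions: `table_exists` and `TABLE_EXISTS_QUERY` are hypothetical names, and `cur` is assumed to be any DB-API cursor that uses `%s` placeholders, such as a psycopg2 cursor.

```python
# Hypothetical helper: checks a candidate name against pg_class using a bound
# parameter, so the name is never spliced into the SQL text itself.
TABLE_EXISTS_QUERY = (
    "SELECT 1 FROM pg_class "
    "WHERE relkind = 'r' AND relname !~ '^(pg_|sql_)' AND relname = %s"
)


def table_exists(cur, table_name):
    """Return True if table_name names an ordinary, non-system table.

    cur is assumed to be a DB-API cursor with %s-style placeholders.
    """
    cur.execute(TABLE_EXISTS_QUERY, (table_name,))
    return cur.fetchone() is not None
```

Note that pg_class lists relations from every schema, so a match on relname alone does not say which schema the table lives in.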

I am trying to write an injection myself to see whether this is an issue. I am not using AsIs() for this variable, but I am paranoid and want to make sure it is safe. So far I have not been able to pass a value that performs an injection (I tried to drop a table called "deletetest").

This is my code:

import psycopg2
from psycopg2.extensions import AsIs

class myClass(object):

    def __init__(self, conn_string, srid):

        self.connString = conn_string
        self.conn = psycopg2.connect(self.connString)
        self.srid = srid

        return None

    def sanitized(self, input_text):

        """
        Makes sure that the input matches an existing table name to make sure that the input name is not an SQL
        injection attempt.  True if the table name is found, False if not.
        :param input_text: String to be sanitized.
        :return: boolean
        """

        query = "SELECT relname FROM pg_class WHERE relkind='r' AND relname !~ '^(pg_|sql_)';"

        cur = self.conn.cursor()
        cur.execute(query)

        for tbl in [i[0] for i in cur.fetchall()]:
            if input_text == tbl:
                return True

        return False

    def interallocate(self, features):

        if self.sanitized(features):

            query = """

                    DROP TABLE IF EXISTS parking_lots_interallocation_result;
                    CREATE TABLE parking_lots_interallocation_result (pk_id SERIAL PRIMARY KEY, from_pl_id varchar(50), to_pl_id varchar(50), distance real);
                    SELECT AddGeometryColumn('public', 'parking_lots_interallocation_result', 'geom', %(srid)s, 'LINESTRING', 2);

                    DROP TABLE IF EXISTS interallocation_duplicate;
                    CREATE TABLE interallocation_duplicate AS TABLE %(features)s;

                    INSERT INTO parking_lots_interallocation_result (from_pl_id, to_pl_id, distance, geom)
                      SELECT
                        %(features)s.pl_id AS from_pl_id,
                        interallocation_duplicate.pl_id AS to_pl_id,
                        ST_Distance(%(features)s.geom, interallocation_duplicate.geom) AS distance,
                        ST_ShortestLine(%(features)s.geom, interallocation_duplicate.geom) AS geom
                      FROM
                        %(features)s
                      LEFT JOIN
                        interallocation_duplicate ON ST_DWithin(%(features)s.geom, interallocation_duplicate.geom, 700)
                      WHERE
                        interallocation_duplicate.pl_id IS NOT NULL AND %(features)s.pl_id != interallocation_duplicate.pl_id
                      ORDER BY
                        %(features)s.pl_id,
                        ST_Distance(%(features)s.geom, interallocation_duplicate.geom);

                    """

            print(query)

            cur = self.conn.cursor()
            cur.execute(query, {
                'features': AsIs(features), # Can use AsIs because we made sure that this string matches an existing table name.
                'srid': self.srid})
            self.conn.commit()

        else:
            raise KeyError('Table {0} was not found.'.format(features))

As far as I am aware, cur.execute() SHOULD sanitize the inputs, and AsIs() is what bypasses this step. But I would like other opinions on whether this code is still open to injection.
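Since the srid value is passed as an ordinary bound parameter, it is not spliced into the SQL text the way an AsIs() value is. As extra hardening, it could also be forced to an integer before it ever reaches the query. This is only a sketch: `validate_srid` is a hypothetical helper, and rejecting non-positive values is an assumption about what this class should accept.

```python
def validate_srid(value):
    """Return value as a positive int, or raise ValueError.

    Hypothetical helper; the "must be positive" rule is an assumption.
    """
    if isinstance(value, bool):
        # bool is a subclass of int, so reject it explicitly.
        raise ValueError("SRID must be an integer, got a boolean")
    try:
        srid = int(str(value).strip())
    except ValueError:
        raise ValueError("SRID must be an integer: {0!r}".format(value))
    if srid <= 0:
        raise ValueError("SRID must be positive: {0!r}".format(value))
    return srid
```

It could be called once in `__init__`, for example `self.srid = validate_srid(srid)`, so that anything other than a plain integer code is rejected before a query is built.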

Upvotes: 1

Views: 2913

Answers (1)

Clodoaldo Neto

Reputation: 125444

Use the psycopg2.sql module (added in psycopg2 2.7) and pass the table name as an sql.Identifier instead of AsIs():

from psycopg2 import sql

features = 'Table_Name'
insert_query = sql.SQL("""
INSERT INTO parking_lots_interallocation_result (from_pl_id, to_pl_id, distance, geom)
SELECT
    {0}.pl_id AS from_pl_id,
    interallocation_duplicate.pl_id AS to_pl_id,
    ST_Distance({0}.geom, interallocation_duplicate.geom) AS distance,
    ST_ShortestLine({0}.geom, interallocation_duplicate.geom) AS geom
FROM
    {0}
    LEFT JOIN
    interallocation_duplicate ON ST_DWithin({0}.geom, interallocation_duplicate.geom, 700)
WHERE
    interallocation_duplicate.pl_id IS NOT NULL AND {0}.pl_id != interallocation_duplicate.pl_id
ORDER BY
    {0}.pl_id,
    ST_Distance({0}.geom, interallocation_duplicate.geom);
""")

# conn is an open psycopg2 connection, e.g. psycopg2.connect(conn_string)
print(insert_query.format(sql.Identifier(features)).as_string(conn))

Output:

INSERT INTO parking_lots_interallocation_result (from_pl_id, to_pl_id, distance, geom)
SELECT
    "Table_Name".pl_id AS from_pl_id,
    interallocation_duplicate.pl_id AS to_pl_id,
    ST_Distance("Table_Name".geom, interallocation_duplicate.geom) AS distance,
    ST_ShortestLine("Table_Name".geom, interallocation_duplicate.geom) AS geom
FROM
    "Table_Name"
    LEFT JOIN
    interallocation_duplicate ON ST_DWithin("Table_Name".geom, interallocation_duplicate.geom, 700)
WHERE
    interallocation_duplicate.pl_id IS NOT NULL AND "Table_Name".pl_id != interallocation_duplicate.pl_id
ORDER BY
    "Table_Name".pl_id,
    ST_Distance("Table_Name".geom, interallocation_duplicate.geom);
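The double quotes in the output are the point: sql.Identifier renders the name as a quoted PostgreSQL identifier rather than as raw SQL. As a rough illustration of the quoting rule only (wrap the name in double quotes and double any embedded double quote), here is a hypothetical pure-Python `quote_ident_sketch`. It is not psycopg2's implementation, and real code should keep using sql.Identifier.

```python
def quote_ident_sketch(name):
    """Illustrative only: quote name the way a PostgreSQL identifier is quoted."""
    if "\x00" in name:
        raise ValueError("identifiers cannot contain NUL bytes")
    # Doubling embedded double quotes keeps the whole string inside one identifier.
    return '"' + name.replace('"', '""') + '"'


print(quote_ident_sketch("Table_Name"))
print(quote_ident_sketch('x"; DROP TABLE deletetest; --'))
```

With this rule an injection attempt stays a single (nonexistent) identifier instead of becoming extra statements. One consequence is that quoted identifiers are case-sensitive, so the name passed in has to match the stored table name exactly.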

Upvotes: 1
