ach

Reputation: 1

Making a comparison of 2 tables faster (Postgres/SQLAlchemy)

I wrote a Python script to manipulate a table in my database, using SQLAlchemy. Table 1 has 2,500,000 entries; Table 2 has 200,000. The script compares the source IP and destination IP of each row in Table 1 against the source IP and destination IP in Table 2. If there is a match, it replaces the source and destination IPs in Table 1 with the matching data from Table 2 and adds the resulting entry to Table 3. It also checks whether the entry is already in the new table; if so, it skips it and moves on to the next row.

My problem is that it is extremely slow. I launched the script yesterday, and in 24 hours it has only processed 47,000 of the 2,500,000 entries. Is there any way to speed this up? It's a Postgres database, and I can't tell whether this running time is reasonable or whether something is wrong. If anyone has had a similar experience with something like this, how long did it take to complete? Many thanks.

session = Session()
i = 0
start_id = 1
flows = session.query(Table1).filter(Table1.id >= start_id).all()
result_number = len(flows)
vlan_list = {"['0050']", "['0130']", "['0120']", "['0011']", "['0110']"}
while i < result_number:
    for flow in flows:
        if flow.vlan_destination in vlan_list:
            usage = session.query(Table2).filter(
                Table2.ip == str(flow.ip_destination)).all()
            if len(usage) > 0:
                usage = usage[0].usage
            else:
                usage = str(flow.ip_destination)
            usage_ip_src = session.query(Table2).filter(
                Table2.ip == str(flow.ip_source)).all()
            if len(usage_ip_src) > 0:
                usage_ip_src = usage_ip_src[0].usage
            else:
                usage_ip_src = str(flow.ip_source)
            if flow.protocol == "17":
                protocol = func.REPLACE(flow.protocol, "17", 'UDP')
            elif flow.protocol == "1":
                protocol = func.REPLACE(flow.protocol, "1", 'ICMP')
            elif flow.protocol == "6":
                protocol = func.REPLACE(flow.protocol, "6", 'TCP')
            else:
                protocol = flow.protocol
            is_in_db = session.query(Table3).\
                filter(Table3.protocol == protocol).\
                filter(Table3.application == flow.application).\
                filter(Table3.destination_port == flow.destination_port).\
                filter(Table3.vlan_destination == flow.vlan_destination).\
                filter(Table3.usage_source == usage_ip_src).\
                filter(Table3.state == flow.state).\
                filter(Table3.usage_destination == usage).count()
            if is_in_db == 0:
                to_add = Table3(usage_ip_src, usage, protocol,
                                flow.application, flow.destination_port,
                                flow.vlan_destination, flow.state)
                session.add(to_add)
                session.flush()
                session.commit()
                print("added " + str(i))
            else:
                print("usage already in DB")
        i = i + 1

session.close()
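One incremental improvement, short of moving the whole job into the database, is to stop issuing two Table2 queries per row: with only 200,000 entries, Table2 fits in memory, so it can be loaded once into a dict and each lookup becomes O(1). A rough sketch under that assumption (the helper names here are illustrative, not from the original code):

```python
# Build the lookup once, e.g. from session.query(Table2.ip, Table2.usage).
def build_usage_map(rows):
    """rows: iterable of (ip, usage) pairs from Table2."""
    return {ip: usage for ip, usage in rows}

# Plain string mapping instead of func.REPLACE, which builds an SQL
# expression rather than a Python string.
PROTOCOL_NAMES = {"17": "UDP", "1": "ICMP", "6": "TCP"}

def translate(ip_source, ip_destination, protocol, usage_by_ip):
    # Fall back to the raw IP when Table2 has no match, matching the
    # else branches of the original script.
    usage_src = usage_by_ip.get(ip_source, ip_source)
    usage_dst = usage_by_ip.get(ip_destination, ip_destination)
    return usage_src, usage_dst, PROTOCOL_NAMES.get(protocol, protocol)
```

This removes two round-trips per row, but the per-row `NOT EXISTS` check against Table3 still remains; the answer below eliminates that as well by pushing everything into one statement.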

EDIT As requested, here are more details: Table 1 has 11 columns; the two we are interested in are source IP and destination IP (screenshot: Table 1). Table 2 has an IP and a Usage column (screenshot: Table 2). My script takes the source IP and destination IP from Table 1 and looks for a match in Table 2. If there is one, it replaces the IP address with the usage, and adds this, along with some of the columns of Table 1, into Table 3 (screenshot: Table 3). While doing this, when writing the protocol column into Table 3, it writes the protocol name instead of the number, just to make it more readable.

EDIT 2 I am trying to think about this differently, so I made a diagram of my problem (the X problem). What I am trying to figure out is whether my code (the Y solution) works as intended. I have only been coding in Python for a month, and I feel like I am messing something up. My code is supposed to take every row from Table 1, compare it to Table 2, and add data to Table 3. Table 1 has over 2 million entries, so it is understandable that this should take a while, but it is too slow. For example, loading the data from the API into the database went faster than these comparisons against data that is already in the database. I am running the code on a virtual machine with sufficient memory, so I am sure it is my code that is lacking, and I need direction as to what can be improved. Screenshots of my tables:

Table 2 (screenshot)

Table 3 (screenshot)

Table 1 (screenshot)

EDIT 3: the PostgreSQL query being generated:

SELECT
  coalesce(table2_1.usage, table1.ip_source) AS coalesce_1,
  coalesce(table2_2.usage, table1.ip_destination) AS coalesce_2,
  CASE table1.protocol WHEN %(param_1)s THEN %(param_2)s WHEN %(param_3)s THEN %(param_4)s WHEN %(param_5)s THEN %(param_6)s ELSE table1.protocol END AS anon_1,
  table1.application AS table1_application,
  table1.destination_port AS table1_destination_port,
  table1.vlan_destination AS table1_vlan_destination,
  table1.state AS table1_state
FROM
  table1
  LEFT OUTER JOIN table2 AS table2_2 ON table2_2.ip = table1.ip_destination
  LEFT OUTER JOIN table2 AS table2_1 ON table2_1.ip = table1.ip_source
WHERE
  table1.vlan_destination IN (
    %(vlan_destination_1)s,
    %(vlan_destination_2)s,
    %(vlan_destination_3)s,
    %(vlan_destination_4)s,
    %(vlan_destination_5)s
  )
  AND NOT (
    EXISTS (
      SELECT
        1
      FROM
        table3
      WHERE
        table3.usage_source = coalesce(table2_1.usage, table1.ip_source)
        AND table3.usage_destination = coalesce(table2_2.usage, table1.ip_destination)
        AND table3.protocol = CASE table1.protocol WHEN %(param_1)s THEN %(param_2)s WHEN %(param_3)s THEN %(param_4)s WHEN %(param_5)s THEN %(param_6)s ELSE table1.protocol END
        AND table3.application = table1.application
        AND table3.destination_port = table1.destination_port
        AND table3.vlan_destination = table1.vlan_destination
        AND table3.state = table1.state
    )
  )


Upvotes: 0

Views: 602

Answers (1)

Ilja Everilä

Reputation: 52949

Given the current question, I think this at least comes close to what you might be after. The idea is to perform the entire operation in the database, instead of fetching everything – the whole 2,500,000 rows – and filtering in Python etc.:

from sqlalchemy import func, case, insert
from sqlalchemy.orm import aliased


def newhotness(session, vlan_list):
    # The query needs to join Table2 twice, so it has to be aliased
    dst = aliased(Table2)
    src = aliased(Table2)

    # Prepare required SQL expressions
    usage = func.coalesce(dst.usage, Table1.ip_destination)
    usage_ip_src = func.coalesce(src.usage, Table1.ip_source)
    protocol = case({"17": "UDP",
                     "1": "ICMP",
                     "6": "TCP"},
                    value=Table1.protocol,
                    else_=Table1.protocol)

    # Form a query producing the data to insert to Table3
    flows = session.query(
            usage_ip_src,
            usage,
            protocol,
            Table1.application,
            Table1.destination_port,
            Table1.vlan_destination,
            Table1.state).\
        outerjoin(dst, dst.ip == Table1.ip_destination).\
        outerjoin(src, src.ip == Table1.ip_source).\
        filter(Table1.vlan_destination.in_(vlan_list),
               ~session.query(Table3).
                   filter_by(usage_source=usage_ip_src,
                             usage_destination=usage,
                             protocol=protocol,
                             application=Table1.application,
                             destination_port=Table1.destination_port,
                             vlan_destination=Table1.vlan_destination,
                             state=Table1.state).
                   exists())

    stmt = insert(Table3).from_select(
        ["usage_source", "usage_destination", "protocol", "application",
         "destination_port", "vlan_destination", "state"],
        flows)

    return session.execute(stmt)

If the vlan_list is selective (in other words, if it filters out most rows), this will perform far fewer operations in the database. Depending on the size of Table2 you may benefit from indexing Table2.ip, but do test first; if it is relatively small, PostgreSQL will likely perform a hash or nested loop join there.

If some combination of the columns used to filter out duplicates in Table3 is unique, you could perform an INSERT ... ON CONFLICT ... DO NOTHING instead of removing duplicates in the SELECT using the NOT EXISTS subquery expression (which PostgreSQL will perform as an antijoin). If there is a possibility that the flows query may itself produce duplicates, add a call to Query.distinct() to it.
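Where such a unique constraint exists, the ON CONFLICT variant can be sketched with SQLAlchemy's PostgreSQL dialect. The trimmed-down table3 definition below is hypothetical, standing in for the real model:

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()

# Hypothetical minimal definition of table3; the real table has more
# columns and a unique constraint over the deduplication columns.
table3 = Table(
    "table3", metadata,
    Column("usage_source", String),
    Column("usage_destination", String),
    Column("protocol", String),
)

# DO NOTHING silently skips any row that would violate the unique
# constraint, taking over the job of the NOT EXISTS antijoin.
stmt = pg_insert(table3).values(
    usage_source="web",
    usage_destination="db",
    protocol="UDP",
).on_conflict_do_nothing()
```

The same `on_conflict_do_nothing()` call can be chained onto an `Insert` built with `from_select()`, so the insert-from-query shape of the answer above carries over unchanged.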

Upvotes: 1

Related Questions