Faisal Khan

Reputation: 1

PARTITION BY keyword not recognized in SESSION function

I am developing a custom SQLGlot dialect for Apache Flink by extending the built-in Hive dialect. I have successfully added support for window functions such as TUMBLE and HOP, but I run into numerous errors when I try to add support for the SESSION function — in particular, its PARTITION BY clause is not recognized.

I would greatly appreciate any help or guidance in resolving these issues. Below is the code structure and the implementation I have so far (it covers TUMBLE, HOP and CUMULATE):

**FlinkDialect.py:**

from sqlglot import exp
from sqlglot.dialects.hive import Hive
from sqlglot.generator import Generator
from sqlglot.tokens import Tokenizer, TokenType
from sqlglot.parser import Parser
from utility.custom import Tumble, Hop, Cumulate

class Flink(Hive):
    """SQLGlot dialect for Apache Flink SQL, built on top of the Hive dialect.

    Adds the windowing table-valued functions TUMBLE, HOP and CUMULATE. They are
    parsed into the ``Tumble`` / ``Hop`` / ``Cumulate`` nodes from
    ``utility.custom`` and rendered back to SQL by the generator.
    """

    class Tokenizer(Hive.Tokenizer):
        # Subclass Hive's tokenizer (not sqlglot's base Tokenizer) so every Hive
        # tokenizer setting is inherited, not just KEYWORDS.
        # The window TVF names share TokenType.FUNCTION with the FUNCTION keyword,
        # so the parser must tell them apart by token text.
        KEYWORDS = {
            **Hive.Tokenizer.KEYWORDS,
            "TUMBLE": TokenType.FUNCTION,
            "HOP": TokenType.FUNCTION,
            "CUMULATE": TokenType.FUNCTION,
        }

    class Generator(Hive.Generator):
        # Subclass Hive's generator so its other settings and methods are
        # inherited. TRANSFORMS routes the custom nodes to the methods below.
        # Deliberately no ``sql`` override: the base ``Generator.sql`` also
        # accepts ``key`` / ``comment`` arguments that inherited code may pass,
        # and these TRANSFORMS entries already dispatch the custom nodes.
        TRANSFORMS = {
            **Hive.Generator.TRANSFORMS,
            Tumble: lambda self, e: self.tumble(e),
            Hop: lambda self, e: self.hop(e),
            Cumulate: lambda self, e: self.cumulate(e),
        }

        def _window_tvf_sql(self, name, args, offset=None) -> str:
            """Render ``NAME(arg, ...[, offset])`` for a window TVF node.

            Each of *args* is rendered with ``self.sql``. *offset* is appended
            only when it is truthy (i.e. it was supplied).
            """
            parts = [self.sql(arg) for arg in args]
            if offset:
                parts.append(self.sql(offset))
            return f"{name}({', '.join(parts)})"

        def tumble(self, e: Tumble) -> str:
            """Render ``TUMBLE(table, timecol, size[, offset])``."""
            return self._window_tvf_sql("TUMBLE", (e.this, e.timecol, e.size), e.offset)

        def hop(self, e: Hop) -> str:
            """Render ``HOP(table, timecol, slide, size[, offset])``."""
            return self._window_tvf_sql(
                "HOP", (e.this, e.timecol, e.slide, e.size), e.offset
            )

        def cumulate(self, e: Cumulate) -> str:
            """Render ``CUMULATE(table, timecol, step, size[, offset])``."""
            return self._window_tvf_sql(
                "CUMULATE", (e.this, e.timecol, e.step, e.size), e.offset
            )

    class Parser(Hive.Parser):
        # Subclass Hive's parser so all of its behaviour is inherited, not just
        # the FUNCTIONS table copied below.
        FUNCTIONS = {
            **Hive.Parser.FUNCTIONS,
            # NOTE(review): sqlglot calls FUNCTIONS entries with the list of
            # parsed arguments, so the node classes (positional constructors) are
            # wrapped. Calls followed by "(" are intercepted by _parse_function
            # below before this table would be consulted.
            "TUMBLE": lambda args: Tumble(*args),
            "HOP": lambda args: Hop(*args),
            "CUMULATE": lambda args: Cumulate(*args),
        }

        # Window TVF name -> (node class, number of mandatory interval arguments).
        _WINDOW_TVFS = {
            "TUMBLE": (Tumble, 1),  # size
            "HOP": (Hop, 2),  # slide, size
            "CUMULATE": (Cumulate, 2),  # step, size
        }

        def _parse_function(self, *args, **kwargs):
            """Parse Flink window TVF calls; defer everything else to Hive.

            A call is recognised when the current token is a FUNCTION token whose
            text is one of ``_WINDOW_TVFS`` and the next token is ``(``. The text
            check matters because the FUNCTION keyword itself has the same token
            type. All arguments are forwarded untouched to the base method, so
            this override does not have to mirror its signature or defaults.
            """
            curr, nxt = self._curr, self._next
            if (
                curr is not None
                and curr.token_type == TokenType.FUNCTION
                and nxt is not None
                and nxt.token_type == TokenType.L_PAREN
                and curr.text.upper() in self._WINDOW_TVFS
            ):
                self._advance()  # consume the function name
                return self._parse_window_tvf(curr.text.upper())
            return super()._parse_function(*args, **kwargs)

        def _parse_window_tvf(self, name):
            """Parse ``(table, timecol, interval...[, offset])`` for TVF *name*.

            The number of mandatory intervals comes from ``_WINDOW_TVFS``; one
            further comma-separated interval is taken as the optional offset.
            A missing closing parenthesis is reported through the parser's error
            handling instead of being silently ignored.
            """
            node_cls, n_intervals = self._WINDOW_TVFS[name]
            self._match_l_paren()
            this = self._parse_table()
            self._match(TokenType.COMMA)
            timecol = self._parse_column()
            intervals = []
            for _ in range(n_intervals):
                self._match(TokenType.COMMA)
                intervals.append(self._parse_interval())
            offset = self._parse_interval() if self._match(TokenType.COMMA) else None
            self._match_r_paren()
            return node_cls(this, timecol, *intervals, offset)

**And the custom.py file contains:**

from sqlglot import exp

def _arg_property(name):
    """Build a property that proxies ``self.args[name]``.

    Reading returns the stored argument, or None when it is absent. Assigning
    goes through ``Expression.set``, so the value lands in ``args`` too.
    Keeping ``args`` as the single source of truth means the attribute and the
    tree cannot drift apart if the node's arguments are replaced later.
    """
    return property(
        lambda self: self.args.get(name),
        lambda self, value: self.set(name, value),
        doc=f"The ``{name}`` argument of this node, or None if absent.",
    )


class _WindowFunction(exp.Expression):
    """Shared base for the Flink windowing table-valued function nodes.

    Subclasses declare their arguments in ``arg_types`` and build the node by
    passing them as keywords to ``exp.Expression.__init__``. That lets the base
    class initialise its own bookkeeping instead of having it set by hand.
    ``_type`` is deliberately not set to a name string. NOTE(review): in
    sqlglot that slot appears to hold the node's inferred data type (read back
    via ``Expression.type``) — confirm for your sqlglot version.
    """

    def __repr__(self):
        # e.g. "Tumble(this=..., timecol=..., size=..., offset=None)"
        args_str = ", ".join(f"{k}={v}" for k, v in self.args.items())
        return f"{type(self).__name__}({args_str})"


class Tumble(_WindowFunction):
    """``TUMBLE(table, time_column, size[, offset])`` window TVF node."""

    arg_types = {"this": True, "timecol": True, "size": True, "offset": False}

    timecol = _arg_property("timecol")
    size = _arg_property("size")
    offset = _arg_property("offset")

    def __init__(self, this=None, timecol=None, size=None, offset=None):
        # Defaults allow keyword or no-argument construction as well (presumably
        # how sqlglot re-creates nodes when copying — confirm for your version).
        super().__init__(this=this, timecol=timecol, size=size, offset=offset)


class Hop(_WindowFunction):
    """``HOP(table, time_column, slide, size[, offset])`` window TVF node."""

    arg_types = {
        "this": True,
        "timecol": True,
        "slide": True,
        "size": True,
        "offset": False,
    }

    timecol = _arg_property("timecol")
    slide = _arg_property("slide")
    size = _arg_property("size")
    offset = _arg_property("offset")

    def __init__(self, this=None, timecol=None, slide=None, size=None, offset=None):
        super().__init__(
            this=this, timecol=timecol, slide=slide, size=size, offset=offset
        )


class Cumulate(_WindowFunction):
    """``CUMULATE(table, time_column, step, size[, offset])`` window TVF node."""

    arg_types = {
        "this": True,
        "timecol": True,
        "step": True,
        "size": True,
        "offset": False,
    }

    timecol = _arg_property("timecol")
    step = _arg_property("step")
    size = _arg_property("size")
    offset = _arg_property("offset")

    def __init__(self, this=None, timecol=None, step=None, size=None, offset=None):
        super().__init__(
            this=this, timecol=timecol, step=step, size=size, offset=offset
        )

def parse_one(sql, read):
    """Parse *sql* with the dialect *read* and return the result of its ``parse``.

    *read* may be a dialect class (e.g. ``Flink``) or an instance of one. A
    class is instantiated first, because ``parse`` is invoked as an instance
    method (in sqlglot, ``Dialect.parse`` is an instance method, so calling it
    on the class would bind *sql* to ``self``).

    NOTE(review): despite the name, this returns exactly what ``read.parse``
    returns — presumably a list with one tree per statement. Callers wanting a
    single tree should take element ``[0]``.
    """
    dialect = read() if isinstance(read, type) else read
    return dialect.parse(sql)

**Thanks in advance :)**

What I Expected:

The SESSION function should parse without errors, just as TUMBLE and HOP do.
Queries that use the SESSION function should be parsed and handled correctly.

What Actually Resulted:

Parsing fails with multiple errors.
They include syntax errors and unexpected-token errors, which prevent the SESSION function from being parsed correctly.

Upvotes: 0

Views: 70

Answers (0)

Related Questions