Reputation: 1
I am currently developing a custom SQLGlot dialect for Apache Flink. To achieve this, I am extending the Hive dialect to create a custom Flink dialect. I have successfully implemented support for the window functions TUMBLE, HOP, and CUMULATE. However, I am encountering numerous errors while trying to implement support for the SESSION function.
I would greatly appreciate any assistance or guidance you can provide in resolving these issues. Below is the code structure and the implementation I have prepared so far:
**FlinkDialect.py:**
from sqlglot import exp
from sqlglot.dialects.hive import Hive
from sqlglot.generator import Generator
from sqlglot.tokens import Tokenizer, TokenType
from sqlglot.parser import Parser
from utility.custom import Tumble, Hop, Cumulate
class Flink(Hive):
    """Apache Flink dialect: Hive plus Flink's windowing table-valued functions.

    Adds TUMBLE, HOP and CUMULATE to the tokenizer, parser and generator. Each
    nested component subclasses the corresponding *Hive* component (not the
    generic sqlglot base class), so Hive's tokenizer settings, parser rules and
    generator overrides are actually inherited.
    """

    class Tokenizer(Hive.Tokenizer):
        # Keep Hive's keywords and lex the window TVF names as FUNCTION tokens;
        # Parser._parse_function below recognises them by token type + text.
        KEYWORDS = {
            **Hive.Tokenizer.KEYWORDS,
            "TUMBLE": TokenType.FUNCTION,
            "HOP": TokenType.FUNCTION,
            "CUMULATE": TokenType.FUNCTION,
        }

    class Generator(Hive.Generator):
        # sqlglot's Generator.sql() consults TRANSFORMS first, so these entries
        # route the custom nodes to the methods below. Do not override sql()
        # itself: its base signature is sql(expression, key=None, comment=True)
        # and sqlglot calls it internally with a key argument.
        TRANSFORMS = {
            **Hive.Generator.TRANSFORMS,
            Tumble: lambda self, e: self.tumble(e),
            Hop: lambda self, e: self.hop(e),
            Cumulate: lambda self, e: self.cumulate(e),
        }

        def _window_tvf_sql(self, name: str, e: exp.Expression, *keys: str) -> str:
            """Render ``NAME(this, <keys...>[, offset])``.

            Operands are read from ``e.args`` in the order given by *keys*; the
            trailing ``offset`` argument is emitted only when it is set.
            """
            args = [self.sql(e.this)]
            args.extend(self.sql(e.args.get(key)) for key in keys)
            offset = e.args.get("offset")
            if offset:
                args.append(self.sql(offset))
            return f"{name}({', '.join(args)})"

        def tumble(self, e: Tumble) -> str:
            """Generate ``TUMBLE(table, timecol, size[, offset])``."""
            return self._window_tvf_sql("TUMBLE", e, "timecol", "size")

        def hop(self, e: Hop) -> str:
            """Generate ``HOP(table, timecol, slide, size[, offset])``."""
            return self._window_tvf_sql("HOP", e, "timecol", "slide", "size")

        def cumulate(self, e: Cumulate) -> str:
            """Generate ``CUMULATE(table, timecol, step, size[, offset])``."""
            return self._window_tvf_sql("CUMULATE", e, "timecol", "step", "size")

    class Parser(Hive.Parser):
        # FUNCTIONS values are used by sqlglot as builders called with the list
        # of parsed arguments, so unpack that list positionally rather than
        # handing it to the constructor as `this` (TODO confirm for the
        # installed sqlglot version).
        FUNCTIONS = {
            **Hive.Parser.FUNCTIONS,
            "TUMBLE": lambda args: Tumble(*args),
            "HOP": lambda args: Hop(*args),
            "CUMULATE": lambda args: Cumulate(*args),
        }

        # Function name -> (node class, number of INTERVAL arguments between the
        # time column and the optional trailing offset).
        _FLINK_WINDOW_TVFS = {
            "TUMBLE": (Tumble, 1),  # size
            "HOP": (Hop, 2),  # slide, size
            "CUMULATE": (Cumulate, 2),  # step, size
        }

        def _parse_function(self, *args, **kwargs):
            """Parse TUMBLE/HOP/CUMULATE calls; defer everything else to Hive.

            Grammar handled here::

                NAME(table, timecol, <interval>{1 or 2}[, offset_interval])

            The current token must be a FUNCTION token whose text names one of
            the window TVFs. Arguments are forwarded untouched to the base
            implementation, whose parameter list (first positional parameter
            ``functions``, ``optional_parens`` defaulting to True in recent
            sqlglot releases) should not be re-declared here.

            Missing ``(``, ``,`` or ``)`` is reported through ``raise_error``
            instead of being silently skipped.
            """
            curr = self._curr
            spec = None
            if curr is not None and curr.token_type == TokenType.FUNCTION:
                # _match(token_type, advance) never compares token text, so the
                # function name has to be checked explicitly.
                spec = self._FLINK_WINDOW_TVFS.get(curr.text.upper())
            if spec is None:
                return super()._parse_function(*args, **kwargs)

            expression_class, interval_count = spec
            self._match(TokenType.FUNCTION)  # consume the function name
            self._match_l_paren()
            this = self._parse_table()
            self._expect_comma()
            timecol = self._parse_column()
            intervals = []
            for _ in range(interval_count):
                self._expect_comma()
                intervals.append(self._parse_interval())
            offset = self._parse_interval() if self._match(TokenType.COMMA) else None
            self._match_r_paren()
            return expression_class(this, timecol, *intervals, offset)

        def _expect_comma(self):
            """Consume a comma, or report a parse error via ``raise_error``."""
            if not self._match(TokenType.COMMA):
                self.raise_error("Expecting ,")
**And the custom.py file contains:**
from sqlglot import exp
class Tumble(exp.Expression):
    """Flink ``TUMBLE(table, timecol, size[, offset])`` window table-valued function.

    All operands are stored in ``self.args`` by ``exp.Expression.__init__``, so
    sqlglot's tree utilities (parent links, copying, transforms) see them. The
    ``timecol`` / ``size`` / ``offset`` attributes are read/write views onto
    ``self.args`` and therefore never drift out of sync with the tree.
    """

    arg_types = {"this": True, "timecol": True, "size": True, "offset": False}

    def __init__(self, this=None, timecol=None, size=None, offset=None, **kwargs):
        # Defaults let sqlglot rebuild the node from keyword args only (e.g.
        # when copying); any extra keyword args are passed to Expression.
        super().__init__(this=this, timecol=timecol, size=size, offset=offset, **kwargs)

    @property
    def timecol(self):
        """Time attribute column the window is computed over."""
        return self.args.get("timecol")

    @timecol.setter
    def timecol(self, value):
        self.set("timecol", value)

    @property
    def size(self):
        """Window size expression."""
        return self.args.get("size")

    @size.setter
    def size(self, value):
        self.set("size", value)

    @property
    def offset(self):
        """Optional window offset expression (``None`` when absent)."""
        return self.args.get("offset")

    @offset.setter
    def offset(self, value):
        self.set("offset", value)

    def __repr__(self):
        args_str = ", ".join(f"{k}={v}" for k, v in self.args.items())
        return f"Tumble({args_str})"
class Hop(exp.Expression):
    """Flink ``HOP(table, timecol, slide, size[, offset])`` window table-valued function.

    All operands are stored in ``self.args`` by ``exp.Expression.__init__``, so
    sqlglot's tree utilities (parent links, copying, transforms) see them. The
    ``timecol`` / ``slide`` / ``size`` / ``offset`` attributes are read/write
    views onto ``self.args`` and therefore never drift out of sync with the tree.
    """

    arg_types = {"this": True, "timecol": True, "slide": True, "size": True, "offset": False}

    def __init__(self, this=None, timecol=None, slide=None, size=None, offset=None, **kwargs):
        # Defaults let sqlglot rebuild the node from keyword args only (e.g.
        # when copying); any extra keyword args are passed to Expression.
        super().__init__(
            this=this, timecol=timecol, slide=slide, size=size, offset=offset, **kwargs
        )

    @property
    def timecol(self):
        """Time attribute column the window is computed over."""
        return self.args.get("timecol")

    @timecol.setter
    def timecol(self, value):
        self.set("timecol", value)

    @property
    def slide(self):
        """Interval between the starts of consecutive hopping windows."""
        return self.args.get("slide")

    @slide.setter
    def slide(self, value):
        self.set("slide", value)

    @property
    def size(self):
        """Window size expression."""
        return self.args.get("size")

    @size.setter
    def size(self, value):
        self.set("size", value)

    @property
    def offset(self):
        """Optional window offset expression (``None`` when absent)."""
        return self.args.get("offset")

    @offset.setter
    def offset(self, value):
        self.set("offset", value)

    def __repr__(self):
        args_str = ", ".join(f"{k}={v}" for k, v in self.args.items())
        return f"Hop({args_str})"
class Cumulate(exp.Expression):
    """Flink ``CUMULATE(table, timecol, step, size[, offset])`` window table-valued function.

    All operands are stored in ``self.args`` by ``exp.Expression.__init__``, so
    sqlglot's tree utilities (parent links, copying, transforms) see them. The
    ``timecol`` / ``step`` / ``size`` / ``offset`` attributes are read/write
    views onto ``self.args`` and therefore never drift out of sync with the tree.
    """

    arg_types = {"this": True, "timecol": True, "step": True, "size": True, "offset": False}

    def __init__(self, this=None, timecol=None, step=None, size=None, offset=None, **kwargs):
        # Defaults let sqlglot rebuild the node from keyword args only (e.g.
        # when copying); any extra keyword args are passed to Expression.
        super().__init__(
            this=this, timecol=timecol, step=step, size=size, offset=offset, **kwargs
        )

    @property
    def timecol(self):
        """Time attribute column the window is computed over."""
        return self.args.get("timecol")

    @timecol.setter
    def timecol(self, value):
        self.set("timecol", value)

    @property
    def step(self):
        """Increment by which each cumulating window grows."""
        return self.args.get("step")

    @step.setter
    def step(self, value):
        self.set("step", value)

    @property
    def size(self):
        """Maximum window size expression."""
        return self.args.get("size")

    @size.setter
    def size(self, value):
        self.set("size", value)

    @property
    def offset(self):
        """Optional window offset expression (``None`` when absent)."""
        return self.args.get("offset")

    @offset.setter
    def offset(self, value):
        self.set("offset", value)

    def __repr__(self):
        args_str = ", ".join(f"{k}={v}" for k, v in self.args.items())
        return f"Cumulate({args_str})"
def parse_one(sql, read):
    """Parse *sql* with the dialect *read* and return the result of its ``parse``.

    Args:
        sql: SQL text to parse.
        read: A dialect instance, or a dialect class (e.g. ``Flink``). A class
            is instantiated with no arguments first, because sqlglot's
            ``Dialect.parse`` is an instance method and calling it on the class
            would bind *sql* to ``self``.

    Returns:
        Whatever ``parse`` returns, unchanged. For sqlglot dialects that is a
        list with one expression per statement; despite this function's name it
        is not unwrapped, so existing callers keep receiving a list.
    """
    dialect = read() if isinstance(read, type) else read
    return dialect.parse(sql)
**Thanks in advance :)**
What I Expected:
The SESSION function should be parsed correctly without errors, similar to how TUMBLE and HOP are handled.
Expected to correctly parse and handle queries using the SESSION function.
What Actually Resulted:
Encountered multiple errors during the parsing process.
Errors include syntax errors and unexpected token errors which prevent the correct parsing of the SESSION function.
Upvotes: 0
Views: 70