Source code for feathr.definition.transformation
import enum
from typing import Type, Union, List, Optional
from abc import ABC, abstractmethod
from jinja2 import Template
from feathr.definition.feathrconfig import HoconConvertible
class Transformation(HoconConvertible):
"""Base class for all transformations that produce feature values."""
pass
class RowTransformation(Transformation):
"""Base class for all row-level transformations."""
pass
class ExpressionTransformation(RowTransformation):
"""A row-level transformations that is defined with the Feathr expression language.
Attributes:
expr: expression that transforms the raw value into a new value, e.g. amount * 10.
"""
def __init__(self, expr: str) -> None:
super().__init__()
self.expr = expr
def to_feature_config(self, with_def_field_name: Optional[bool] = True) -> str:
tm = Template("""
{% if with_def_field_name %}
def.sqlExpr: "{{expr}}"
{% else %}
"{{expr}}"
{% endif %}
""")
return tm.render(expr=self.expr, with_def_field_name=with_def_field_name)
[docs]class WindowAggTransformation(Transformation):
"""Aggregate the value of an expression over a fixed time window. E.g. sum(amount*10) over last 3 days.
Attributes:
agg_expr: expression that transforms the raw value into a new value, e.g. amount * 10
agg_func: aggregation function. Available values: `SUM`, `COUNT`, `MAX`, `MIN`, `AVG`, `MAX_POOLING`, `MIN_POOLING`, `AVG_POOLING`, `LATEST`
window: Time window length to apply the aggregation. support 4 type of units: d(day), h(hour), m(minute), s(second). The example value are "7d' or "5h" or "3m" or "1s"
group_by: Feathr expressions applied after the `agg_expr` transformation as groupby field, before aggregation, same as 'group by' in SQL
filter: Feathr expression applied to each row as a filter before aggregation
"""
def __init__(self, agg_expr: str, agg_func: str, window: str, group_by: Optional[str] = None, filter: Optional[str] = None, limit: Optional[int] = None) -> None:
super().__init__()
self.def_expr = agg_expr
self.agg_func = agg_func
self.window = window
self.group_by = group_by
self.filter = filter
self.limit = limit
[docs] def to_feature_config(self, with_def_field_name: Optional[bool] = True) -> str:
tm = Template("""
def:"{{windowAgg.def_expr}}"
window: {{windowAgg.window}}
aggregation: {{windowAgg.agg_func}}
{% if windowAgg.group_by is not none %}
groupBy: {{windowAgg.group_by}}
{% endif %}
{% if windowAgg.filter is not none %}
filter: {{windowAgg.filter}}
{% endif %}
{% if windowAgg.limit is not none %}
limit: {{windowAgg.limit}}
{% endif %}
""")
return tm.render(windowAgg = self)
class UdfTransform(Transformation):
"""User defined transformation. To be supported.
Attributes:
name: name of the user defined function
"""
def __init__(self, name: str) -> None:
"""
:param name:
"""
super().__init__()
self.name = name
def to_feature_config(self) -> str:
pass