# Source code for feathr.definition.sink

from typing import List, Optional
from jinja2 import Template
from feathr.definition.feathrconfig import HoconConvertible


class Sink(HoconConvertible):
    """Base type for data sinks.

    Declares no members of its own; it only subclasses ``HoconConvertible``.
    """


class RedisSink(Sink):
    """Redis-based sink used to store online feature data.

    Can be used in a batch job or a streaming job.

    Attributes:
        table_name: output table name
        streaming: whether it is used in streaming mode
        streamingTimeoutMs: maximum running time for streaming mode. It is
            not used in batch mode.
    """

    def __init__(self, table_name: str, streaming: bool = False, streamingTimeoutMs: Optional[int] = None) -> None:
        self.table_name = table_name
        self.streaming = streaming
        self.streamingTimeoutMs = streamingTimeoutMs

    def to_feature_config(self) -> str:
        """Produce the config used in feature materialization.

        Returns:
            The rendered HOCON object for this sink. ``streaming: true`` is
            emitted only when ``self.streaming`` is truthy, and ``timeoutMs``
            only when ``self.streamingTimeoutMs`` is truthy (so ``None`` and
            ``0`` both omit it).
        """
        # One field per line: HOCON requires a newline or a comma between
        # fields, so the layout of this literal is significant.
        tm = Template("""
            {
                name: REDIS
                params: {
                    table_name: "{{source.table_name}}"
                    {% if source.streaming %}
                    streaming: true
                    {% endif %}
                    {% if source.streamingTimeoutMs %}
                    timeoutMs: {{source.streamingTimeoutMs}}
                    {% endif %}
                }
            }
            """)
        # The template refers to this instance under the name ``source``.
        return tm.render(source=self)
class HdfsSink(Sink):
    """Offline Hadoop HDFS-compatible sink that is used to store feature data.

    Covers HDFS-compatible storage (HDFS, Delta Lake, Azure Blob Storage,
    etc.). The result is in AVRO format.

    Attributes:
        output_path: output path
    """

    def __init__(self, output_path: str) -> None:
        self.output_path = output_path

    # Sample generated HOCON config:
    # operational: {
    #   name: testFeatureGen
    #   endTime: 2019-05-01
    #   endTimeFormat: "yyyy-MM-dd"
    #   resolution: DAILY
    #   output:[
    #     {
    #       name: HDFS
    #       params: {
    #         path: "/user/featureGen/hdfsResult/"
    #       }
    #     }
    #   ]
    # }
    # features: [mockdata_a_ct_gen, mockdata_a_sample_gen]
    def to_feature_config(self) -> str:
        """Produce the config used in feature materialization.

        Returns:
            The rendered HOCON object for this sink, i.e. the element that
            goes inside the ``output`` list shown in the sample above, with
            ``path`` set to ``self.output_path``.
        """
        # One field per line: HOCON requires a newline or a comma between
        # fields, so the layout of this literal is significant.
        tm = Template("""
            {
                name: HDFS
                params: {
                    path: "{{sink.output_path}}"
                }
            }
            """)
        # The template refers to this instance under the name ``sink``.
        return tm.render(sink=self)