Source code for feathr.sink

from typing import List, Optional
from jinja2 import Template
from feathr.frameconfig import HoconConvertible


[docs]class Sink(HoconConvertible): """A data sink. """ pass
[docs]class RedisSink(Sink): """Redis-based sink use to store online feature data, can be used in batch job or streaming job. Attributes: table_name: output table name streaming: whether it is used in streaming mode streamingTimeoutMs: maximum running time for streaming mode. It is not used in batch mode. """ def __init__(self, table_name: str, streaming: bool=False, streamingTimeoutMs: Optional[int]=None) -> None: self.table_name = table_name self.streaming = streaming self.streamingTimeoutMs = streamingTimeoutMs
[docs] def to_feature_config(self) -> str: """Produce the config used in feature materialization""" tm = Template(""" { name: REDIS params: { table_name: "{{source.table_name}}" {% if source.streaming %} streaming: true {% endif %} {% if source.streamingTimeoutMs %} timeoutMs: {{source.streamingTimeoutMs}} {% endif %} } } """) msg = tm.render(source=self) return msg