API¶
- class pipeline.Message(*, kind: Kind = Kind.Message, id: str = None, created: datetime = None, logs: List[Log] = [], content: Dict[str, Any] = {})[source]¶
- as_model(model_class: Type[BaseModel], mappings: Optional[Dict[str, str]] = None) BaseModel ¶
Return the message content as an instance of another BaseModel class.
- Parameters
model_class (class) – the model class to return
- Returns
a BaseModel instance
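The content-to-model conversion above can be sketched with a stdlib-only stand-in (dataclasses instead of pydantic models; the direction of the `mappings` dict, model field name to content field name, is an assumption not confirmed by the source):

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional, Type

@dataclass
class Article:  # hypothetical target model, standing in for a pydantic BaseModel
    title: str
    body: str

def as_model(content: Dict[str, Any], model_class: Type,
             mappings: Optional[Dict[str, str]] = None) -> Any:
    # Build an instance of model_class from the message content,
    # renaming fields via mappings ({model_field: content_field}).
    mappings = mappings or {}
    kwargs = {f.name: content[mappings.get(f.name, f.name)]
              for f in fields(model_class)}
    return model_class(**kwargs)

msg_content = {"headline": "Hello", "body": "World"}
article = as_model(msg_content, Article, mappings={"title": "headline"})
```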
- classmethod construct(_fields_set: Optional[SetStr] = None, **values: Any) Model ¶
Creates a new model setting __dict__ and __fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed. Behaves as if Config.extra = 'allow' was set, since it adds all passed values.
- copy(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, update: Optional[DictStrAny] = None, deep: bool = False) Model ¶
Duplicate a model, optionally choose which fields to include, exclude and change.
- Parameters
include – fields to include in new model
exclude – fields to exclude from new model, as with values this takes precedence over include
update – values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
deep – set to True to make a deep copy of the model
- Returns
new model instance
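The include/exclude/update precedence described above can be illustrated on a plain dict of field values (a conceptual sketch only, not pydantic's implementation):

```python
import copy

def copy_fields(data, include=None, exclude=None, update=None, deep=False):
    # Mimic BaseModel.copy() semantics on a plain dict of field values.
    out = {k: v for k, v in data.items() if include is None or k in include}
    if exclude:
        # exclude takes precedence over include, as documented above
        out = {k: v for k, v in out.items() if k not in exclude}
    if update:
        out.update(update)  # not validated, mirroring the warning above
    return copy.deepcopy(out) if deep else out

base = {"a": 1, "b": 2, "c": 3}
copied = copy_fields(base, include={"a", "b"}, exclude={"b"}, update={"d": 4})
```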
- dict(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) DictStrAny ¶
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- get(key: str, default: Optional[Any] = None) Any ¶
Access any field in the message content.
- Parameters
key (str) – field name
- Returns
the field value (Any)
- json(*, include: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, exclude: Optional[Union[AbstractSetIntStr, MappingIntStrAny]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) unicode ¶
Generate a JSON representation of the model, include and exclude arguments as per dict().
encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().
- update_content(other: BaseModel) KeysView[str] ¶
Add fields from another model to update the message's content.
- Parameters
other (BaseModel) – the model whose fields will be merged into the content
- Returns
the keys that were updated (KeysView[str])
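The merge-and-report behavior above can be sketched with plain dicts standing in for the message content and the other model's field values (an illustrative stand-in, not the library's code):

```python
from typing import Any, Dict, KeysView

def update_content(content: Dict[str, Any],
                   other_fields: Dict[str, Any]) -> KeysView[str]:
    # Merge fields from another model (represented here as a dict of its
    # field values) into the message content, and return the keys that
    # were added or updated.
    content.update(other_fields)
    return other_fields.keys()

content = {"title": "Hello"}
updated = update_content(content, {"body": "World", "lang": "en"})
```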
- classmethod update_forward_refs(**localns: Any) None ¶
Try to update ForwardRefs on fields based on this Model, globalns and localns.
- class pipeline.Producer(settings: ~pipeline.worker.ProducerSettings, output_class: ~typing.Type[~pydantic.main.BaseModel], logger: ~logging.Logger = <Logger pipeline (INFO)>)[source]¶
Producer is a worker that generates new messages. For example, a web crawler can be a producer. It reads no input and produces output until it exits.
- Parameters
settings (ProducerSettings) – settings
output_class (Type[BaseModel]) – output class
logger (Logger) – logger
Usage:
>>> from pydantic import BaseModel
>>>
>>> class Output(BaseModel):
...     pass
>>>
>>> settings = ProducerSettings(name='', version='', description='', out_kind='MEM')
>>> producer = Producer(settings, output_class=Output)
>>> producer.parse_args()
>>> #producer.start()
- duplicate_destination(topic)¶
helper function
- parse_args(args: Union[List[str], str] = ...) None ¶
Parse command line arguments. args: a list of arguments or a single command-line string.
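Accepting either an argv-style list or a single command-line string, as parse_args does, is commonly handled with shlex; the sketch below is an assumption about the behavior, not the library's actual implementation:

```python
import shlex
from typing import List, Union

def normalize_args(args: Union[List[str], str]) -> List[str]:
    # Split a command-line string into argv-style tokens,
    # or pass an existing list through unchanged.
    return shlex.split(args) if isinstance(args, str) else list(args)

tokens = normalize_args("--name crawler --out-kind MEM")
```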
- setup() None ¶
loading code goes here
- shutdown() None ¶
clean up code goes here
- class pipeline.Splitter(settings: ~pipeline.worker.SplitterSettings, logger: ~logging.Logger = <Logger pipeline (INFO)>)[source]¶
Splitter will write to a topic whose name is determined by a function.
- duplicate_destination(topic)¶
helper function
- parse_args(args: Union[List[str], str] = ...) None ¶
Parse command line arguments. args: a list of arguments or a single command-line string.
- setup() None ¶
loading code goes here
- shutdown() None ¶
clean up code goes here
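The topic-naming behavior of Splitter can be sketched as a routing function; the field name and suffix scheme below are hypothetical, since the source does not show how the real function is defined:

```python
from typing import Any, Dict

def destination_topic(base_topic: str, message: Dict[str, Any]) -> str:
    # Derive the destination topic name from a field of the message;
    # the real Splitter computes the name with a user-supplied function.
    return f"{base_topic}-{message['language']}"

topic = destination_topic("articles", {"language": "en", "title": "Hello"})
```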
- class pipeline.Processor(settings: ~pipeline.worker.ProcessorSettings, input_class: ~typing.Type[~pydantic.main.BaseModel], output_class: ~typing.Type[~pydantic.main.BaseModel], logger: ~logging.Logger = <Logger pipeline (INFO)>)[source]¶
Processor is a worker that processes incoming messages and outputs new messages.
- duplicate_destination(topic)¶
helper function
- parse_args(args: Union[List[str], str] = ...) None ¶
Parse command line arguments. args: a list of arguments or a single command-line string.
- process(message_content: BaseModel, message_id: str) BaseModel [source]¶
Process function to be overridden by users. For streaming processing, this function needs to update msg.dct in place; for batch processing, it returns an error or a list of errors. A message that has been terminated through .terminates() will be skipped in the output.
A typical process definition will be:
newValue = msg.value
return OutputModel(value=newValue)
- setup() None ¶
loading code goes here
- shutdown() None ¶
clean up code goes here
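A self-contained sketch of a process override, using dataclasses as stand-ins for the pydantic input and output model classes (all names here are hypothetical):

```python
from dataclasses import dataclass

@dataclass
class Input:   # stand-in for the Processor's input_class
    value: int

@dataclass
class Output:  # stand-in for the Processor's output_class
    value: int

def process(message_content: Input, message_id: str) -> Output:
    # Read the incoming message content and return a new output model.
    return Output(value=message_content.value * 2)

result = process(Input(value=21), "msg-1")
```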
- class pipeline.Pipeline(**kwargs: Any)[source]¶
Pipeline manages SourceTap and DestinationTap when you don't want to use predefined worker logic. Instead, you have access to SourceTap and DestinationTap directly.
Usage:
from .tap import TapKind, MemorySourceSettings, MemoryDestinationSettings
in_settings = MemorySourceSettings()
out_settings = MemoryDestinationSettings()
pipeline = Pipeline(
    in_kind=TapKind.MEM, in_settings=in_settings,
    out_kind=TapKind.MEM, out_settings=out_settings
)
Take command line arguments:
pipeline = Pipeline(args=sys.argv)
Take environment settings:
pipeline = Pipeline()
- add_destination_topic(name: str) None [source]¶
Add a new DestinationTap with a defined topic (queue) name.
- Parameters
name – a name given for the destination topic
- add_source_topic(name: str) None [source]¶
Add a new SourceTap with a defined topic (queue) name.
- Parameters
name – a name given for the source topic
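Working with the taps directly, as Pipeline allows, amounts to a read-process-write loop. The sketch below uses a plain iterable and a list in place of the real SourceTap and DestinationTap APIs, whose method names are not shown in the source:

```python
from typing import Any, Callable, Iterable, List

def run_pipeline(source: Iterable[Any],
                 process: Callable[[Any], Any]) -> List[Any]:
    # Read each message from the source tap stand-in, process it,
    # and write the result to the destination tap stand-in.
    destination: List[Any] = []
    for message in source:
        destination.append(process(message))
    return destination

results = run_pipeline([1, 2, 3], lambda x: x * 10)
```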