Module pipeline
Pipeline Objects
class Pipeline()
Pipeline brings together building blocks to build a complex search pipeline with Haystack & user-defined components.
Under the hood, a pipeline is represented as a directed acyclic graph of component nodes. It enables custom query flows with options to branch queries (e.g., extractive QA vs. keyword match query), merge candidate documents for a Reader from multiple Retrievers, or re-rank candidate documents.
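To make the graph view concrete, here is a minimal sketch that does not use Haystack at all. It models a pipeline as a mapping from each node to the nodes it receives input from and uses Python's standard graphlib module (Python 3.9+) to derive a valid execution order. The node names are illustrative assumptions, and Haystack's own graph handling may differ.

```python
from graphlib import TopologicalSorter

# Each key is a node; each value is the set of nodes it receives input from.
# The names below are illustrative, not taken from a real pipeline.
graph = {
    "ESRetriever": {"Query"},
    "DPRRetriever": {"Query"},
    "JoinResults": {"ESRetriever", "DPRRetriever"},
    "Reader": {"JoinResults"},
}

# static_order() yields every node after all of its predecessors.
# It raises graphlib.CycleError if the graph is not acyclic.
execution_order = list(TopologicalSorter(graph).static_order())
print(execution_order)
```

The two retrievers may appear in either order, since neither depends on the other; only the predecessor-before-successor constraint is guaranteed.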
add_node
| add_node(component, name: str, inputs: List[str])
Add a new node to the pipeline.
Arguments:
component
: The object to be called when data is passed to the node. It can be a Haystack component (like Retriever, Reader, or Generator) or a user-defined object that implements a run() method to process incoming data from the predecessor node.
name
: The name for the node. It must not contain any dots.
inputs
: A list of inputs to the node. If the predecessor node has a single outgoing edge, just the name of the node is sufficient. For instance, an 'ElasticsearchRetriever' node always outputs a single edge with a list of documents, so it can be represented as ["ElasticsearchRetriever"]. When the predecessor node has multiple outputs, e.g., a "QueryClassifier", the output must be specified explicitly, as in "QueryClassifier.output_2".
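The naming rules for inputs can be illustrated with a small standalone sketch. The helper names split_input and validate_node_name are hypothetical and not part of Haystack, and treating a bare node name as referring to "output_1" is an assumption made for this example.

```python
from typing import Tuple


def split_input(input_name: str) -> Tuple[str, str]:
    """Split an entry of `inputs` into (node_name, output_edge).

    Assumption for this sketch: a bare node name refers to "output_1".
    """
    if "." in input_name:
        node_name, edge = input_name.split(".", 1)
        return node_name, edge
    return input_name, "output_1"


def validate_node_name(name: str) -> None:
    """Reject node names containing dots, since the dot separates node and edge."""
    if "." in name:
        raise ValueError(f"Node name {name!r} must not contain dots")


print(split_input("ElasticsearchRetriever"))
print(split_input("QueryClassifier.output_2"))
```

The sketch also shows why dots are disallowed in node names: a dot in the name would make the node/edge split ambiguous.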
get_node
| get_node(name: str) -> Optional[BaseComponent]
Get a node from the Pipeline.
Arguments:
name
: The name of the node.
set_node
| set_node(name: str, component)
Set the component for a node in the Pipeline.
Arguments:
name
: The name of the node.
component
: The component object to be set at the node.
draw
| draw(path: Path = Path("pipeline.png"))
Create a Graphviz visualization of the pipeline.
Arguments:
path
: the path to save the image.
load_from_yaml
| @classmethod
| load_from_yaml(cls, path: Path, pipeline_name: Optional[str] = None, overwrite_with_env_variables: bool = True)
Load Pipeline from a YAML file defining the individual components and how they're tied together to form a Pipeline. A single YAML can declare multiple Pipelines, in which case an explicit pipeline_name must be passed.
Here's a sample configuration:
```yaml
version: '0.8'

components:    # define all the building-blocks for Pipeline
  - name: MyReader       # custom-name for the component; helpful for visualization & debugging
    type: FARMReader     # Haystack Class name for the component
    params:
      no_ans_boost: -10
      model_name_or_path: deepset/roberta-base-squad2
  - name: MyESRetriever
    type: ElasticsearchRetriever
    params:
      document_store: MyDocumentStore    # params can reference other components defined in the YAML
      custom_query: null
  - name: MyDocumentStore
    type: ElasticsearchDocumentStore
    params:
      index: haystack_test

pipelines:    # multiple Pipelines can be defined using the components from above
  - name: my_query_pipeline    # a simple extractive-qa Pipeline
    nodes:
      - name: MyESRetriever
        inputs: [Query]
      - name: MyReader
        inputs: [MyESRetriever]
```
Arguments:
path
: path of the YAML file.
pipeline_name
: if the YAML contains multiple pipelines, the pipeline_name to load must be set.
overwrite_with_env_variables
: Overwrite the YAML configuration with environment variables. For example, to change the index name param for an ElasticsearchDocumentStore, an env variable 'MYDOCSTORE_PARAMS_INDEX=documents-2021' can be set. Note that an underscore (_) must be used to specify nested hierarchical properties.
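The override mechanism can be sketched without Haystack as follows. The helper apply_env_overrides is hypothetical, and the matching rule (upper-cased component name, then _PARAMS_, then upper-cased parameter name) is an assumption inferred from the MYDOCSTORE_PARAMS_INDEX example; the library's actual lookup may differ.

```python
import copy
from typing import Dict, List, Mapping


def apply_env_overrides(components: List[Dict], env: Mapping[str, str]) -> List[Dict]:
    """Return a copy of `components` with params overridden from `env`.

    Assumed naming rule (inferred from the example above):
    <COMPONENT NAME>_PARAMS_<PARAM NAME>, all upper-cased.
    """
    result = copy.deepcopy(components)
    for component in result:
        params = component.setdefault("params", {})
        prefix = f"{component['name'].upper()}_PARAMS_"
        for key, value in env.items():
            if key.startswith(prefix):
                params[key[len(prefix):].lower()] = value
    return result


components = [
    {"name": "MyDocStore", "type": "ElasticsearchDocumentStore",
     "params": {"index": "haystack_test"}},
]
# In real use the mapping would be os.environ; a plain dict keeps the sketch hermetic.
env = {"MYDOCSTORE_PARAMS_INDEX": "documents-2021"}

overridden = apply_env_overrides(components, env)
print(overridden[0]["params"]["index"])
```

Values taken from the environment are always strings in this sketch, so numeric or boolean params would need an explicit conversion.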
save_to_yaml
| save_to_yaml(path: Path, return_defaults: bool = False)
Save a YAML configuration for the Pipeline that can be used with Pipeline.load_from_yaml().
Arguments:
path
: path of the output YAML file.
return_defaults
: whether to output parameters that have the default values.
BaseStandardPipeline Objects
class BaseStandardPipeline(ABC)
add_node
| add_node(component, name: str, inputs: List[str])
Add a new node to the pipeline.
Arguments:
component
: The object to be called when data is passed to the node. It can be a Haystack component (like Retriever, Reader, or Generator) or a user-defined object that implements a run() method to process incoming data from the predecessor node.
name
: The name for the node. It must not contain any dots.
inputs
: A list of inputs to the node. If the predecessor node has a single outgoing edge, just the name of the node is sufficient. For instance, an 'ElasticsearchRetriever' node always outputs a single edge with a list of documents, so it can be represented as ["ElasticsearchRetriever"]. When the predecessor node has multiple outputs, e.g., a "QueryClassifier", the output must be specified explicitly, as in "QueryClassifier.output_2".
get_node
| get_node(name: str)
Get a node from the Pipeline.
Arguments:
name
: The name of the node.
set_node
| set_node(name: str, component)
Set the component for a node in the Pipeline.
Arguments:
name
: The name of the node.
component
: The component object to be set at the node.
draw
| draw(path: Path = Path("pipeline.png"))
Create a Graphviz visualization of the pipeline.
Arguments:
path
: the path to save the image.
ExtractiveQAPipeline Objects
class ExtractiveQAPipeline(BaseStandardPipeline)
__init__
| __init__(reader: BaseReader, retriever: BaseRetriever)
Initialize a Pipeline for Extractive Question Answering.
Arguments:
reader
: Reader instance
retriever
: Retriever instance
DocumentSearchPipeline Objects
class DocumentSearchPipeline(BaseStandardPipeline)
__init__
| __init__(retriever: BaseRetriever)
Initialize a Pipeline for semantic document search.
Arguments:
retriever
: Retriever instance
GenerativeQAPipeline Objects
class GenerativeQAPipeline(BaseStandardPipeline)
__init__
| __init__(generator: BaseGenerator, retriever: BaseRetriever)
Initialize a Pipeline for Generative Question Answering.
Arguments:
generator
: Generator instance
retriever
: Retriever instance
SearchSummarizationPipeline Objects
class SearchSummarizationPipeline(BaseStandardPipeline)
__init__
| __init__(summarizer: BaseSummarizer, retriever: BaseRetriever)
Initialize a Pipeline that retrieves documents for a query and then summarizes those documents.
Arguments:
summarizer
: Summarizer instance
retriever
: Retriever instance
run
| run(query: str, filters: Optional[Dict] = None, top_k_retriever: Optional[int] = None, generate_single_summary: Optional[bool] = None, return_in_answer_format: bool = False)
Arguments:
query
: Your search query
filters
:
top_k_retriever
: Number of top docs the retriever should pass to the summarizer. The higher this value, the slower your pipeline.
generate_single_summary
: Whether to generate a single summary from all retrieved docs (True) or one per doc (False).
return_in_answer_format
: Whether the results should be returned as documents (False) or in the answer format used in other QA pipelines (True). With the latter, you can use this pipeline as a "drop-in replacement" for other QA pipelines.
FAQPipeline Objects
class FAQPipeline(BaseStandardPipeline)
__init__
| __init__(retriever: BaseRetriever)
Initialize a Pipeline for finding similar FAQs using semantic document search.
Arguments:
retriever
: Retriever instance
TranslationWrapperPipeline Objects
class TranslationWrapperPipeline(BaseStandardPipeline)
Takes an existing search pipeline and adds one "input translation" node after the Query and one "output translation" node just before returning the results.
__init__
| __init__(input_translator: BaseTranslator, output_translator: BaseTranslator, pipeline: BaseStandardPipeline)
Wrap a given pipeline with the input_translator and output_translator.
Arguments:
input_translator
: A Translator node that shall translate the input query from language A to B
output_translator
: A Translator node that shall translate the pipeline results from language B to A
pipeline
: The pipeline object (e.g. ExtractiveQAPipeline) you want to "wrap". Note that pipelines with split or merge nodes are currently not supported.
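The wrapping idea can be sketched with plain callables, independent of Haystack. Everything in this example is a hypothetical stand-in: wrap_with_translation, the toy dictionary translators, and the echo pipeline are assumptions chosen only to show the order of operations (translate the query, run the pipeline, translate the results back).

```python
from typing import Callable, List


def wrap_with_translation(
    pipeline: Callable[[str], List[str]],
    input_translator: Callable[[str], str],
    output_translator: Callable[[str], str],
) -> Callable[[str], List[str]]:
    """Return a callable that translates the query, runs the pipeline, then translates each result."""

    def run(query: str) -> List[str]:
        translated_query = input_translator(query)      # language A -> B
        results = pipeline(translated_query)            # the search runs in language B
        return [output_translator(r) for r in results]  # language B -> A

    return run


# Toy stand-ins: a one-word "dictionary" and a pipeline that echoes the query it received.
de_to_en = {"hallo": "hello"}
en_to_de = {v: k for k, v in de_to_en.items()}
seen_queries: List[str] = []


def toy_pipeline(query: str) -> List[str]:
    seen_queries.append(query)  # record what the inner pipeline actually saw
    return [query]


wrapped = wrap_with_translation(
    pipeline=toy_pipeline,
    input_translator=lambda text: de_to_en.get(text, text),
    output_translator=lambda text: en_to_de.get(text, text),
)
print(wrapped("hallo"), seen_queries)
```

The inner pipeline only ever sees the translated query, which is why a linear pipeline is the simplest case to wrap.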
SklearnQueryClassifier Objects
class SklearnQueryClassifier(BaseComponent)
A node to classify an incoming query into one of two categories using a lightweight sklearn model. Depending on the result, the query flows to a different branch in your pipeline and the further processing can be customized. You can define this by connecting the further pipeline to either output_1 or output_2 from this node.
Example:
| # elastic_retriever and dpr_retriever are retriever instances created beforehand
| pipe = Pipeline()
| pipe.add_node(component=SklearnQueryClassifier(), name="QueryClassifier", inputs=["Query"])
| pipe.add_node(component=elastic_retriever, name="ElasticRetriever", inputs=["QueryClassifier.output_2"])
| pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_1"])
| # Keyword queries will use the ElasticRetriever
| pipe.run(query="kubernetes aws")
| # Semantic queries (questions, statements, sentences ...) will leverage the DPR retriever
| pipe.run(query="How to manage kubernetes on aws")
Models:
Pass your own Sklearn binary classification model or use one of the following pretrained ones:
- Keywords vs. Questions/Statements (Default)
  query_classifier can be found here
  query_vectorizer can be found here
  output_1 => question/statement
  output_2 => keyword query
  Readme
- Questions vs. Statements
  query_classifier can be found here
  query_vectorizer can be found here
  output_1 => question
  output_2 => statement
  Readme
See also the tutorial on pipelines.
__init__
| __init__(model_name_or_path: Union[str, Any] = "https://ext-models-haystack.s3.eu-central-1.amazonaws.com/gradboost_query_classifier/model.pickle", vectorizer_name_or_path: Union[str, Any] = "https://ext-models-haystack.s3.eu-central-1.amazonaws.com/gradboost_query_classifier/vectorizer.pickle")
Arguments:
model_name_or_path
: Gradient boosting based binary classifier to classify between keyword vs. statement/question queries or statement vs. question queries.
vectorizer_name_or_path
: An n-gram based TF-IDF vectorizer for extracting features from the query.
TransformersQueryClassifier Objects
class TransformersQueryClassifier(BaseComponent)
A node to classify an incoming query into one of two categories using a (small) BERT transformer model. Depending on the result, the query flows to a different branch in your pipeline and the further processing can be customized. You can define this by connecting the further pipeline to either output_1 or output_2 from this node.
Example:
| # elastic_retriever and dpr_retriever are retriever instances created beforehand
| pipe = Pipeline()
| pipe.add_node(component=TransformersQueryClassifier(), name="QueryClassifier", inputs=["Query"])
| pipe.add_node(component=elastic_retriever, name="ElasticRetriever", inputs=["QueryClassifier.output_2"])
| pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_1"])
| # Keyword queries will use the ElasticRetriever
| pipe.run(query="kubernetes aws")
| # Semantic queries (questions, statements, sentences ...) will leverage the DPR retriever
| pipe.run(query="How to manage kubernetes on aws")
Models:
Pass your own Transformer binary classification model from file/Hugging Face or use one of the following pretrained ones hosted on Hugging Face:
- Keywords vs. Questions/Statements (Default)
  model_name_or_path="shahrukhx01/bert-mini-finetune-question-detection"
  output_1 => question/statement
  output_2 => keyword query
  Readme
- Questions vs. Statements
  model_name_or_path="shahrukhx01/question-vs-statement-classifier"
  output_1 => question
  output_2 => statement
  Readme
See also the tutorial on pipelines.
__init__
| __init__(model_name_or_path: Union[Path, str] = "shahrukhx01/bert-mini-finetune-question-detection")
Arguments:
model_name_or_path
: Transformer-based, fine-tuned mini BERT model for query classification.
JoinDocuments Objects
class JoinDocuments(BaseComponent)
A node to join documents output by multiple retriever nodes.
The node allows multiple join modes:
- concatenate: combine the documents from multiple nodes. Any duplicate documents are discarded.
- merge: merge scores of documents from multiple nodes. Optionally, each input score can be given a different weight & a top_k limit can be set. This mode can also be used for "reranking" retrieved documents.
__init__
| __init__(join_mode: str = "concatenate", weights: Optional[List[float]] = None, top_k_join: Optional[int] = None)
Arguments:
join_mode
: concatenate to combine documents from multiple retrievers or merge to aggregate scores of individual documents.
weights
: A node-wise list (length of list must be equal to the number of input nodes) of weights for adjusting document scores when using the merge join_mode. By default, equal weight is given to each retriever score. This param is not compatible with the concatenate join_mode.
top_k_join
: Limit documents to top_k based on the resulting scores of the join.
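The two join modes can be sketched in plain Python as follows. This is not the library's implementation: the function join_documents and the (document_id, score) representation are assumptions made for illustration, as are the choices to use 1/n as the default equal weight and to sort the result by score in both modes.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Each retriever result is modelled as a list of (document_id, score) pairs.
Results = List[Tuple[str, float]]


def join_documents(
    inputs: List[Results],
    join_mode: str = "concatenate",
    weights: Optional[List[float]] = None,
    top_k_join: Optional[int] = None,
) -> Results:
    if join_mode == "concatenate":
        if weights is not None:
            raise ValueError("weights are not compatible with 'concatenate'")
        merged: Dict[str, float] = {}
        for results in inputs:
            for doc_id, score in results:
                merged.setdefault(doc_id, score)  # keep the first copy, drop duplicates
    elif join_mode == "merge":
        if weights is None:
            weights = [1 / len(inputs)] * len(inputs)  # assumed default: equal weights
        if len(weights) != len(inputs):
            raise ValueError("need exactly one weight per input node")
        totals: Dict[str, float] = defaultdict(float)
        for results, weight in zip(inputs, weights):
            for doc_id, score in results:
                totals[doc_id] += weight * score  # weighted sum of scores per document
        merged = dict(totals)
    else:
        raise ValueError(f"unknown join_mode: {join_mode}")

    # Assumption: rank by score (descending) before applying the optional limit.
    ranked = sorted(merged.items(), key=lambda item: item[1], reverse=True)
    return ranked[:top_k_join] if top_k_join is not None else ranked


keyword_results = [("doc_a", 0.9), ("doc_b", 0.4)]
dense_results = [("doc_b", 0.8), ("doc_c", 0.6)]

print(join_documents([keyword_results, dense_results]))
print(join_documents([keyword_results, dense_results],
                     join_mode="merge", weights=[0.5, 0.5], top_k_join=2))
```

In merge mode a document found by both retrievers accumulates score from each, which is how this mode can act as a simple re-ranker.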