Overview
By default, AIE uses "non-streaming" queries. These queries were designed for the typical search use case, where the user reviews relatively few returned documents before refining the criteria and searching again. To reduce memory usage and boost performance, these queries are limited to 5,000 returned documents. If the query matches more than 5,000 documents, AIE returns an error. (This limit is configurable.)
However, some applications need to retrieve all matching documents, no matter how many there might be. For these applications, AIE offers "streaming" queries. A streaming query allows developers to request all results, facets, or document IDs back from a query. In addition, streaming can be used to send large query filters to AIE to reduce the number of returned results.
While the streaming API is very easy to use, it can require large amounts of client and back-end resources to fulfill a request. For example, running a query for "*:*" will stream every document out of the index.
Streaming and Response Workflows
The results returned by a streaming query pass directly from the index engine to the client application without passing through response workflows. As a result, response document transformers cannot operate on streaming results. (Streaming results can be customized through the use of Field Expressions, however.)
Streaming and Field Collapsing
Note that a streaming query cannot use Field Collapsing.
Streaming Query API Overview
The Streaming Query API is a part of the Java Client API. The API has a single interface:
com.attivio.sdk.client.streaming | Description |
---|---|
StreamingQueryResponse | An interface that allows the developer to retrieve results from a streaming search query. |
Create a Streaming Application in Java
This section presents the simplest-possible example of a streaming search application that sends a query to AIE from the command line and displays the search results in the console window.
Create the Project
This project is the standard "FactBook" example from the Quick Start Tutorial. The project was created in AIE Designer by following the step-by-step directions on the Using Java APIs page.
Sample Data
The query application demonstrated here is sufficiently generic to use with any AIE project. For the sake of demonstration, we started the FactBook demo and loaded the "city" feed. At the end of the example we'll search for "London" and AIE will return multiple matches.
Create a New Class
We'll begin by using the plusjava project that we created on the Using Java APIs page.
The example application, StreamQueryTest.java, is attached to this page.
To create a new Java class for this application:
- In Designer, right-click the project node in the Project Explorer view. In this example the project is plusjava.
- Select New > Class from the context menu. This exposes the New Java Class dialog.
- In the New Java Class dialog, set the Source Folder to plusjava/src.
- In the New Java Class dialog, enter the com.acme.client package name. (You can make up a new package name if you prefer.)
- Continuing in the Java Class dialog, enter a name for the new class. In this example we used StreamQueryTest.
- Click the Finish button to create the class. This will open the new class in an editing view.
- For the purposes of this exercise, open the StreamQueryTest.java file (attached to this page) and paste the content into the editor. Save the file.
The saved file should be visible in the <project-dir>\plusjava\src\com\acme\client\ subdirectory. Remember that com.acme.client is the new package name we supplied in step 4, above. If you used a different package name, the file will appear in the corresponding location.
Import Required Classes
The StreamQueryTest.java file begins with a package statement and a list of imported classes. (The library files that support these classes are already part of the Eclipse project generated by createproject.)
/**
 * Copyright 2016 Attivio Inc., All rights reserved.
 */
// StreamQueryTest.java for the new SDK (AIE 5.0)
package com.acme.client;

import java.io.IOException;
import java.util.ListIterator;

import com.attivio.sdk.search.QueryRequest;
import com.attivio.sdk.search.QueryResponse;
import com.attivio.sdk.search.SearchFieldValue;
import com.attivio.sdk.search.StreamingQueryRequest;
import com.attivio.sdk.search.StreamingQueryRequest.DocumentStreamingMode;
import com.attivio.sdk.AttivioException;
import com.attivio.sdk.search.QueryLanguages;
import com.attivio.sdk.search.SearchField;
import com.attivio.sdk.search.SearchDocument;
import com.attivio.sdk.search.facet.FacetBucket;
import com.attivio.sdk.search.facet.FacetRequest;
import com.attivio.sdk.search.facet.FacetResponse;
import com.attivio.sdk.client.SearchClient;
import com.attivio.sdk.client.streaming.StreamingFacetResponse;
import com.attivio.sdk.client.streaming.StreamingQueryResponse;
import com.attivio.sdk.service.ServiceFactory;
import com.attivio.sdk.service.Platform;
Class Declaration
The StreamQueryTest class will connect to an AIE server.
public class StreamQueryTest {

    // Change these to point to your AIE server
    private static final String host = "localhost";
    private static final int port = 17000;
Note the host and port of your AIE zookeeper instance.
Setting up Main
To run this class as an independent application, we must supply it with a main method.
    public static void main(String[] args) throws IOException {
        System.setProperty("AIE_ZOOKEEPER", "localhost:16980");
        Platform.instance.setProjectName("remote"); // must be the name of the remote project
        StreamingQueryResponse sResponse = null; // scoped outside of try
The main() method expects a string array of arguments, which will be fed in from the command line. It is declared to throw IOException. Before doing anything else, it sets the AIE_ZOOKEEPER system property and the name of the remote project.
We also need an sResponse variable declared outside the scope of the following try/catch/finally construction, so we can reference it in both try and finally.
Validate Arguments
The first task is to examine the array of argument strings and do some validity checking. The criterion is simple: there must be at least two arguments. The first is the name of the query workflow ("search"). The second and subsequent arguments will be search keywords.
        try {
            // Check required arguments (workflow, query string)
            if (args.length < 2) {
                System.out.println("Insufficient number of arguments (" + args.length + "). "
                        + "StreamQueryTest requires at least two arguments:" + " "
                        + "(workflow, query string).");
                // Stop here; otherwise args[1] below would throw
                // an ArrayIndexOutOfBoundsException.
                System.exit(1);
            }
            // Loop over remaining arguments as query term(s)
            String queryString = args[1];
            for (int i = 2; i < args.length; i++) {
                queryString += " " + args[i];
            }
We know that the first argument (args[0]) is the workflow name. The code copies the second argument (args[1]) into a new string (queryString), and then iterates over any remaining arguments and appends them to queryString.
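The same argument handling can also be written with the standard library. The following is a minimal sketch, not part of the attached StreamQueryTest.java: the class name QueryArgs and its methods are hypothetical, and it uses only java.util classes so that it runs without AIE.

```java
import java.util.Arrays;

// Hypothetical helper (not part of the AIE SDK or of StreamQueryTest.java).
final class QueryArgs {

    // Returns the workflow name (args[0]) after checking the argument count.
    static String workflow(String[] args) {
        requireTwo(args);
        return args[0];
    }

    // Joins args[1..n] with single spaces to form the query string.
    static String queryString(String[] args) {
        requireTwo(args);
        return String.join(" ", Arrays.copyOfRange(args, 1, args.length));
    }

    // Rejects argument arrays that lack a workflow or a query term.
    private static void requireTwo(String[] args) {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException(
                    "Expected at least two arguments: (workflow, query string)");
        }
    }

    public static void main(String[] args) {
        String[] sample = {"search", "London", "Ontario"};
        System.out.println(workflow(sample) + " -> " + queryString(sample));
    }
}
```

Throwing an exception instead of printing a message keeps the check reusable; a command-line wrapper can catch it and print usage text.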
Create a Search Client
Use the ServiceFactory to instantiate a SearchClient object.
            // Create a search client
            SearchClient searchClient = ServiceFactory.getService(SearchClient.class);
Once we have the searchClient object, we can set its client workflow by calling setClientWorkflow(). The value is the first of the incoming arguments (args[0]).
            // Must set a workflow: default workflow is "search"
            searchClient.setClientWorkflow(args[0]);
Create a Query Request
Now we can create a QueryRequest object and set some of its properties.
            // Create query request
            QueryRequest request = new QueryRequest();
            // Set maximum number of rows to return.
            request.setRows(Long.MAX_VALUE);
            // Set query string and language
            request.setQuery(queryString, QueryLanguages.SIMPLE);
            // Return all fields.
            request.addField("*");
Our new QueryRequest object is called request.
Since this is a streaming query, it might be important to set a maximum number of rows to return. In this case, we set it to Long.MAX_VALUE, which means "no limit."
Next we call request.setQuery() with the string of keywords (queryString) and indicate that this will be a Simple Query Language request.
This example asks AIE to return all of the fields of the index record, using a wildcard in the request.addField() method. Real applications tend to be more specific at this point.
Request Facets
If we want the search to return facet lists, we must request them at this point.
            // Request facet buckets for specific fields.
            request.addFacet(new FacetRequest("language"));
            request.addFacet(new FacetRequest("country"));
            request.addFacet(new FacetRequest("location"));
Create a Streaming Query Request
The next step is to create a StreamingQueryRequest. Note that the request is bundled into this step as a parameter of the StreamingQueryRequest. The other parameter is the DocumentStreamingMode, which in this case is FULL_DOCUMENTS.
            // Create a streaming request
            StreamingQueryRequest sRequest =
                    new StreamingQueryRequest(request, DocumentStreamingMode.FULL_DOCUMENTS);
Enable Streaming Facet Collection
It is necessary to explicitly enable facet reporting on the StreamingQueryRequest.
            // Enable streaming facet collection
            sRequest.setStreamFacets(true);
If you omit this step and then try to display facets, you'll get an IO error that says, "Facet Stream not available: state=DONE".
Execute the Query (StreamingQueryResponse)
Execute the query by calling the searchClient.search() method, using sRequest as the parameter. This returns a StreamingQueryResponse object.
            // Create a streaming response object
            sResponse = searchClient.search(sRequest);
Display Returned Documents
The next step is to iterate over the matching documents. The sResponse.getDocuments() method provides a constantly-renewed buffer of streaming documents. From the client side it looks as if we were processing a single SearchDocument.
            // Iterate through documents
            for (SearchDocument document : sResponse.getDocuments()) {
                System.out.println("");
                // Print document ID
                System.out.println("document ID: " + document.getId());
Each block of document output begins with the document ID number, as shown above.
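To make the idea of a "constantly-renewed buffer" concrete, here is a small stand-alone sketch of an Iterable that refills an internal buffer only when the caller has drained it. This is an illustration under assumptions, not the SDK's implementation of getDocuments(): the class BufferedDocs, its fake "doc-N" IDs, and the refills counter are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustration only: a hypothetical Iterable that refills a small buffer on
// demand. It is not the SDK's implementation of getDocuments().
final class BufferedDocs implements Iterable<String> {

    private final int total;      // number of documents the fake source holds
    private final int batchSize;  // how many documents one refill fetches
    int refills = 0;              // counts refills so the behavior is observable

    BufferedDocs(int total, int batchSize) {
        this.total = total;
        this.batchSize = batchSize;
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            private final Deque<String> buffer = new ArrayDeque<>();
            private int produced = 0;

            @Override
            public boolean hasNext() {
                // Refill only when the buffer is drained and more documents remain.
                if (buffer.isEmpty() && produced < total) {
                    refills++;
                    int end = Math.min(produced + batchSize, total);
                    while (produced < end) {
                        buffer.addLast("doc-" + produced++);
                    }
                }
                return !buffer.isEmpty();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return buffer.removeFirst();
            }
        };
    }

    public static void main(String[] args) {
        BufferedDocs docs = new BufferedDocs(5, 2);
        for (String id : docs) {
            System.out.println(id);
        }
        System.out.println("refills: " + docs.refills);
    }
}
```

The caller's for-each loop never sees the batches; it only sees one element at a time, which is why client code can treat a very large result set like an ordinary collection.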
Next is the loop that processes the fields within the document:
                // Iterate through fields
                for (SearchField field : document) {
                    // Print field name
                    System.out.print(" " + field.getName());
This is followed by the nested loop that prints out the values of the field. Most fields are single-valued, but some are multi-valued. The logic has to check and branch to handle the two situations.
                    // Check field value count
                    if (field.size() == 1) {
                        // Print single value
                        System.out.println(": " + field.getFirstValue()); // Was getDisplayValue()
                    } else {
                        // Print multiple values
                        int numFieldValues = field.size(); // Was field.getValues().toArray().length
                        System.out.print(" {" + numFieldValues + " values}:");
                        ListIterator<SearchFieldValue> itr = field.iterator();
                        while (itr.hasNext()) {
                            Object element = itr.next();
                            System.out.print(element + " ");
                        }
                        System.out.println("");
                    }
                }
            }
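The single-value versus multi-value branch can be tried out without an AIE server. The sketch below reproduces the same output format using plain Java lists in place of the SDK's SearchField; FieldFormatter and its format() method are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for the field-printing logic; uses plain Java lists instead of
// the SDK's SearchField so it runs without AIE. All names are hypothetical.
final class FieldFormatter {

    // Formats one field the same way the printing loop does.
    static String format(String name, List<?> values) {
        StringBuilder out = new StringBuilder(" ").append(name);
        if (values.size() == 1) {
            // Single value: " name: value"
            out.append(": ").append(values.get(0));
        } else {
            // Multiple values: " name {n values}:" followed by each value and a space
            out.append(" {").append(values.size()).append(" values}:");
            for (Object value : values) {
                out.append(value).append(' ');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(format("title", Arrays.asList("London")));
        System.out.println(format("languages", Arrays.asList("English", "French")));
    }
}
```

Returning a string instead of printing directly makes the formatting easy to check in a unit test.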
Display Facets
Facet output cannot be accessed until all documents have been processed. At that point we can iterate over the streamed facets. Note that sResponse.getFacets() returns one StreamingFacetResponse per facet. Each one exposes the facet's header information through getFacetResponse() and can be iterated to obtain that facet's buckets (a facet value and a count), so the buckets arrive already grouped by facet.
The code below prints each facet's field name, followed by one line per bucket.
            // Iterate through facets and buckets
            // Note that facets cannot be retrieved until all documents
            // have been processed.
            for (StreamingFacetResponse facet : sResponse.getFacets()) {
                FacetResponse headerInfo = facet.getFacetResponse();
                System.out.println("Facet: " + headerInfo.getField());
                for (FacetBucket bucket : facet) {
                    System.out.println(" " + bucket.getDisplayValue() + " " + bucket.getCount());
                }
            }
The code above prints out the name of a facet (country) and then lists its buckets (Canada 1) below it.
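If an application needs the facet counts after the stream has been consumed, one option is to copy them into ordinary collections while iterating. The following is a minimal sketch with stand-in types: FacetTable and its methods are hypothetical, and the facet name, bucket value, and count are passed in as plain values rather than SDK objects.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in sketch: collects (facet, value, count) triples into nested maps.
// FacetTable and its methods are hypothetical, not SDK classes.
final class FacetTable {

    // facet name -> (bucket display value -> count), in arrival order
    private final Map<String, Map<String, Long>> facets = new LinkedHashMap<>();

    // Call once per bucket while iterating over the streamed facets.
    void add(String facet, String value, long count) {
        facets.computeIfAbsent(facet, k -> new LinkedHashMap<>()).put(value, count);
    }

    // Returns the stored count, or 0 if the facet or bucket was never added.
    long count(String facet, String value) {
        return facets
                .getOrDefault(facet, Collections.<String, Long>emptyMap())
                .getOrDefault(value, 0L);
    }

    // Renders the table in the same layout the tutorial prints.
    String render() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Map<String, Long>> facet : facets.entrySet()) {
            out.append("Facet: ").append(facet.getKey()).append('\n');
            for (Map.Entry<String, Long> bucket : facet.getValue().entrySet()) {
                out.append(' ').append(bucket.getKey())
                   .append(' ').append(bucket.getValue()).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        FacetTable table = new FacetTable();
        table.add("country", "Canada", 1);
        table.add("country", "United Kingdom", 1);
        System.out.print(table.render());
    }
}
```

LinkedHashMap is used so that facets and buckets are reported in the order they arrived from the stream.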
Get the QueryResponse
Retrieve the QueryResponse message using sResponse.getQueryResponse(). This method should always be invoked when using streaming queries in order to ensure that resources are cleaned up after the query. It is not available until all of the documents and all of the facet buckets have been processed.
In this case we have retrieved the QueryResponse in order to report how many documents were matched by the query.
            // All documents and facets must be processed before the
            // QueryResponse becomes available.
            // getQueryResponse() is required to clean up resources.
            QueryResponse queryResponse = sResponse.getQueryResponse();
            System.out.println("");
            System.out.println("Total documents matched: "
                    + queryResponse.getDocuments().getNumberFound());
            // The stream itself is closed in the finally block.
Catch Exceptions
We have to include some error-catching boilerplate at the end. This independent Java application might have to handle an AttivioException generated by the Java Client library. Note that this application is not a part of AIE, however, and therefore should not throw an AttivioException.
        } catch (AttivioException e) {
            System.err.println("Category: " + e.getErrorCode().getCategory());
            System.err.println("Code: " + e.getErrorCode().getCode());
            e.printStackTrace();
        }
Finally Close the Stream
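It is essential that the application explicitly close the query stream. Otherwise resources will be allocated but never freed. Because sResponse is still null if the query was never executed, the code checks it before closing. The finally clause follows directly after the closing brace of the catch block.
        finally {
            // Close the stream to clean up resources
            if (sResponse != null) {
                sResponse.close();
            }
        }
    }
}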
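The pattern of declaring the stream outside try and closing it in finally can be exercised without AIE. The sketch below is an illustration under assumptions: FakeStream is a hypothetical stand-in that only records whether close() was called, and CloseDemo.run() simulates a failure that happens before the stream is opened.

```java
import java.io.Closeable;

// Stand-in sketch of the close-in-finally pattern. FakeStream is hypothetical
// and only records whether close() was called; it is not an SDK class.
final class CloseDemo {

    static final class FakeStream implements Closeable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Opens a stream unless failBeforeOpen is set. The finally block always runs.
    // Returns the stream, or null if it was never opened.
    static FakeStream run(boolean failBeforeOpen) {
        FakeStream stream = null; // declared outside try so finally can see it
        try {
            if (failBeforeOpen) {
                throw new IllegalStateException("failed before the stream was opened");
            }
            stream = new FakeStream();
            return stream;
        } catch (IllegalStateException e) {
            return null;
        } finally {
            // The null check avoids a NullPointerException when nothing was opened.
            if (stream != null) {
                stream.close();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("closed after normal run: " + run(false).closed);
        System.out.println("stream after early failure: " + run(true));
    }
}
```

The finally block runs on both paths, so the stream is closed after a normal run and the early-failure path exits cleanly.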
Compile the Class
To compile StreamQueryTest in Designer, select the project in the Project Explorer. Click the Project menu, and select Clean. Then select Project > Build Project.
Run the Program
To run StreamQueryTest from Designer, we'll have to create a Run Configuration for it.
- In Designer, navigate from the Run menu to Run Configurations. This opens the Run Configurations dialog box.
- Right-click Java Applications and select New.
- Create a new StreamQueryTest run application in project plusjava.
- On the Arguments tab, paste in the following program arguments. (The query workflow is "search"; the query locates all documents that mention "London".)
  search London
- On the Arguments tab, paste in the following VM arguments. (Edit the file path to suit your situation.)
  -Dattivio.log.printStackTraces=true -Dattivio.log.level=INFO -Dattivio.log.directory="C:\attivio-projects\plusjava\build\logs"
- Apply the changes. Close the dialog box.
Example Search Run
AIE must be running!
The AIE server must be up and running before you attempt to run StreamQueryTest.
To run the application, use the Designer Run menu. Select the Run command.
The parameters are the search workflow and the keyword "London". If the target AIE project is a FactBook demo with the city feed loaded, our query should return multiple cities. This is one of them:
document ID: city-CITY-Canada-London-Ontario
 .score: 0.0
 .id: city-CITY-Canada-London-Ontario
 .zone: default
 title: London
 teaser: Ontario
 language: English
 languages: English
 size: 303165
 table: city
 date: Mon Jan 11 19:43:52 PST 2016
 text: Ontario
 position: -81.2462,42.9869
 latitude: 42.986895
 longitude: -81.246214
 sourcepath: C:\attivio51\conf\factbook\content\cities.csv
 filename: cities.csv
 country: Canada
 sentiment: neg
 sentiment.score: 0.6017876
 security.read: anonymous:anonymous
Following the document output comes the facet output:
Facet: language
 English 3
Facet: country
 Canada 1
 South Africa 1
 United Kingdom 1
Facet: sentiment
 neg 1
 pos 2

Total documents matched: 3
Here we see the facet buckets grouped by facet, with their counts. Finally, there is the document count from the QueryResponse object.