
Overview

By default, AIE uses "non-streaming" queries. These queries were designed for the typical search use case, where the user reviews relatively few returned documents before refining the criteria and searching again. To reduce memory usage and boost performance, these queries are limited to 5,000 returned documents. If the query matches more than 5,000 documents, AIE returns an error. (This limit is configurable.)

However, some applications need to retrieve all matching documents, no matter how many there might be. For these applications, AIE offers "streaming" queries. A streaming query allows developers to request all results, facets, or document IDs back from a query. In addition, streaming can be used to send large query filters to AIE to reduce the number of returned results.

While the streaming API is very easy to use, it can require large amounts of client and back-end resources to fulfill a request. For example, running a query for "*:*" will stream every document out of the index.

Streaming and Response Workflows

The results returned by a streaming query pass directly from the index engine to the client application without passing through response workflows. As a result, response document transformers cannot operate on streaming results. (Streaming results can, however, be customized through Field Expressions.)

Streaming and Field Collapsing

Note that a streaming query cannot use Field Collapsing.

 


Streaming Query API Overview

The Streaming Query API is a part of the Java Client API. The API has a single interface:

com.attivio.sdk.client.streaming    Description
StreamingQueryResponse              An interface that allows the developer to retrieve results from a streaming search query.

 

Create a Streaming Application in Java

This section presents the simplest-possible example of a streaming search application that sends a query to AIE from the command line and displays the search results in the console window.

Create the Project

Sample Data

The query application demonstrated here is sufficiently generic to use with any AIE project. For the sake of demonstration, we started the FactBook demo and loaded the "city" feed. At the end of the example we'll search for "London" and AIE will return multiple matches.

Create a New Class

We'll begin by using the plusjava project that we created on the Using Java APIs page.

The example application, StreamQueryTest.java, is attached to this page.

To create a new Java class for this application:

  1. In Designer, right-click the project node in the Project Explorer view. In this example the project is plusjava.
  2. Select New > Class from the context menu. This exposes the New Java Class dialog.
  3. In the New Java Class dialog, set the Source Folder to plusjava/src.
  4. In the New Java Class dialog, enter the com.acme.client package name. (You can make up a new package name if you prefer.)
  5. Continuing in the Java Class dialog, enter a name for the new class. In this example we used StreamQueryTest.
  6. Click the Finish button to create the class. This will open the new class in an editing view.
  7. For the purposes of this exercise, open the StreamQueryTest.java file (attached to this page) and paste the content into the editor. Save the file.

The saved file should be visible in the <project-dir>\plusjava\src\com\acme\client\ subdirectory. Remember that com.acme.client is the new package name we supplied in step 4, above. If you used a different package name, the file will appear in the corresponding location.

Import Required Classes

The StreamQueryTest.java file begins with a package statement and a list of imported classes. (The library files that support these classes are already part of the Eclipse project generated by createproject.)

StreamQueryTest.java
/**
 * Copyright 2016 Attivio Inc., All rights reserved.
 */
// StreamQueryTest.java for the new SDK (AIE 5.0)

package com.acme.client;

import java.io.IOException;
import java.util.ListIterator;

import com.attivio.sdk.search.QueryRequest;
import com.attivio.sdk.search.QueryResponse;
import com.attivio.sdk.search.SearchFieldValue;
import com.attivio.sdk.search.StreamingQueryRequest;
import com.attivio.sdk.search.StreamingQueryRequest.DocumentStreamingMode;
import com.attivio.sdk.AttivioException;
import com.attivio.sdk.search.QueryLanguages;
import com.attivio.sdk.search.SearchField;
import com.attivio.sdk.search.SearchDocument;
import com.attivio.sdk.search.facet.FacetBucket;
import com.attivio.sdk.search.facet.FacetRequest;
import com.attivio.sdk.search.facet.FacetResponse;
import com.attivio.sdk.client.SearchClient;
import com.attivio.sdk.client.streaming.StreamingFacetResponse;
import com.attivio.sdk.client.streaming.StreamingQueryResponse;

import com.attivio.sdk.service.ServiceFactory;
import com.attivio.sdk.service.Platform;

Class Declaration

The StreamQueryTest class will connect to an AIE server.

StreamQueryTest.java
public class StreamQueryTest {

    // Change these to point to your AIE server
    private static final String host = "localhost";
    private static final int port = 17000;

Note the host and port of your AIE ZooKeeper instance; main() needs them for the AIE_ZOOKEEPER property.

Setting up Main

To run this class as an independent application, we must supply it with a main method.

StreamQueryTest.java
    public static void main(String[] args) throws IOException {
        
        System.setProperty("AIE_ZOOKEEPER", "localhost:16980");
        Platform.instance.setProjectName("remote"); // must be the name of the remote project
        
        StreamingQueryResponse sResponse = null; //scoped outside of try

The main() method expects a string array of arguments, which will be fed in from the command line. It declares that it may throw an IOException.

We also need an sResponse variable declared outside the scope of the following try/catch/finally construction, so we can reference it in both try and finally.
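
One consequence of declaring the variable outside the try block is that it may still be null when finally runs, for instance if the failure happens before the query is ever executed. The sketch below shows the pattern in plain Java. FakeStream and CloseGuardSketch are hypothetical stand-ins invented for this illustration and are not part of the AIE SDK.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

final class CloseGuardSketch {
    /** Hypothetical stand-in for a streaming response; records open and close events. */
    static final class FakeStream implements Closeable {
        private final List<String> log;

        FakeStream(List<String> log) {
            this.log = log;
            log.add("opened");
        }

        @Override
        public void close() {
            log.add("closed");
        }
    }

    /** Runs the declare-outside / try / finally pattern and returns what happened. */
    static List<String> run(boolean failBeforeOpen) {
        List<String> log = new ArrayList<>();
        FakeStream stream = null; // scoped outside of try
        try {
            if (failBeforeOpen) {
                throw new IllegalStateException("query never started");
            }
            stream = new FakeStream(log);
            log.add("processed");
        } catch (IllegalStateException e) {
            log.add("error: " + e.getMessage());
        } finally {
            // The variable is still null when the failure came before the stream existed
            if (stream != null) {
                stream.close();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [opened, processed, closed]
        System.out.println(run(true));  // [error: query never started]
    }
}
```

Without the null check, the second call would fail inside finally with a NullPointerException and hide the original error.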

Validate Arguments

The first task is to examine the array of argument strings and do some validity checking. The criterion is simple: there must be at least two arguments. The first is the name of the query workflow ("search"). The second and subsequent arguments will be search keywords.

StreamQueryTest.java
        try {
            // Check required arguments (workflow, query string)
            if (args.length < 2) {
                System.out.println("Insufficient number of arguments ("
                        + args.length + "). "
                        + "StreamQueryTest requires at least two arguments:"
                        + " (workflow, query string).");
                // Stop here; otherwise args[1] below would be out of bounds
                return;
            }
            // Loop over remaining arguments as query term(s)
            String queryString = args[1];
            for (int i = 2; i < args.length; i++) {
                queryString += " " + args[i];
            }

We know that the first argument (args[0]) is the workflow name. The code copies the second argument (args[1]) into a new string (queryString), and then iterates over any remaining arguments and appends them to queryString.
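
The same argument handling can be written more compactly and tested on its own. The following is a minimal sketch in plain Java; QueryArgs and its methods are hypothetical helper names chosen for this illustration, not part of the AIE SDK or of the attached sample.

```java
import java.util.Arrays;

/** Hypothetical helper for splitting command-line arguments; not an SDK class. */
final class QueryArgs {
    private QueryArgs() {
    }

    /** Returns the workflow name, which is the first argument. */
    static String workflow(String[] args) {
        requireAtLeastTwo(args);
        return args[0];
    }

    /** Joins the second and later arguments into one space-separated query string. */
    static String queryString(String[] args) {
        requireAtLeastTwo(args);
        return String.join(" ", Arrays.copyOfRange(args, 1, args.length));
    }

    private static void requireAtLeastTwo(String[] args) {
        if (args == null || args.length < 2) {
            int n = (args == null) ? 0 : args.length;
            throw new IllegalArgumentException(
                    "Expected at least two arguments (workflow, query string) but got " + n);
        }
    }

    public static void main(String[] args) {
        String[] sample = {"search", "London", "Ontario"};
        // prints "search -> London Ontario"
        System.out.println(workflow(sample) + " -> " + queryString(sample));
    }
}
```

Throwing on bad input, rather than only printing a message, guarantees that the code never goes on to read args[1] from an array that is too short.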

Create a Search Client

Use the ServiceFactory to instantiate a SearchClient object.

StreamQueryTest.java
            // Create a search client
            SearchClient searchClient = ServiceFactory.getService(SearchClient.class);

Once we have the searchClient object, we can set its client workflow. This is the first of the incoming arguments (args[0]).

StreamQueryTest.java
            // Must set a workflow: default workflow is "search"
            searchClient.setClientWorkflow(args[0]);

Create a Query Request

Now we can create a QueryRequest  object and set some of its properties.


StreamQueryTest.java
            // Create query request
            QueryRequest request = new QueryRequest();

            // Set maximum number of rows to return.
            request.setRows(Long.MAX_VALUE);

            // Set query string and language
            request.setQuery(queryString, QueryLanguages.SIMPLE);

            // Return all fields.
            request.addField("*");

Our new QueryRequest object is called request.

Because a streaming query can return every matching document, consider whether to cap the number of rows returned. In this case, we set it to Long.MAX_VALUE, which means "no limit."

Next we call request.setQuery() with the string of keywords (queryString) and indicate that this will be a Simple Query Language request.

This example asks AIE to return all of the fields of the index record, using a wildcard in the request.addField() method.  Real applications tend to be more specific at this point.

Request Facets

If we want the search to return facet lists, we must request them at this point.

StreamQueryTest.java
            // Request facet buckets for specific fields.
            request.addFacet(new FacetRequest("language"));
            request.addFacet(new FacetRequest("country"));
            request.addFacet(new FacetRequest("location"));

Create a Streaming Query Request

The next step is to create a StreamingQueryRequest. Note that the QueryRequest (request) is passed in as a constructor parameter. The other parameter is the DocumentStreamingMode, which in this case is FULL_DOCUMENTS.


StreamQueryTest.java
            // Create a streaming request
            StreamingQueryRequest sRequest =
                    new StreamingQueryRequest(request, DocumentStreamingMode.FULL_DOCUMENTS);

Enable Streaming Facet Collection

It is necessary to explicitly enable facet reporting on the StreamingQueryRequest. 

StreamQueryTest.java
            // Enable streaming facet collection
            sRequest.setStreamFacets(true);

If you omit this step and then try to display facets, you'll get an IO error that says, "Facet Stream not available: state=DONE".

Execute the Query (StreamingQueryResponse)

Execute the query by calling the searchClient.search() method, using sRequest as the parameter. This returns a StreamingQueryResponse object.

StreamQueryTest.java
            // Create a streaming response object
            sResponse = searchClient.search(sRequest);

 

Display Returned Documents

The next step is to iterate over the matching documents. The sResponse.getDocuments() method provides a constantly-renewed buffer of streaming documents. From the client side, it looks like iterating over a single collection of SearchDocument objects.

StreamQueryTest.java
            // Iterate through documents
            for (SearchDocument document : sResponse.getDocuments()) {
                System.out.println("");
                // Print document ID
                System.out.println("document ID: " + document.getId());

Each block of document output begins with the document ID number, as shown above.

Next is the loop that processes the fields within the document:

StreamQueryTest.java
                // Iterate through fields
                for (SearchField field : document) {

                    // Print field name
                    System.out.print("  " + field.getName());

This is followed by the nested loop that prints out the values of the field. Most fields are single-valued, but some are multi-valued. The logic has to check and branch to handle the two situations.

StreamQueryTest.java
                    // Check field value count
                    if (field.size() == 1) {
                        // Print single value
                        System.out.println(": " + field.getFirstValue()); //Was getDisplayValue()
                    } else {
                        // Print multiple values
                        int numFieldValues = field.size(); //Was field.getValues().toArray().length
                        System.out.print(" {" + numFieldValues + " values}:");
                        ListIterator<SearchFieldValue> itr = field.iterator();
                        while(itr.hasNext()) {
                            Object element = itr.next();
                            System.out.print(element + " ");
                        }
                        System.out.println("");
                    }
                }
            }
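
The single-value versus multi-value branch is easier to check when it returns a string instead of printing directly. Here is a minimal sketch in plain Java that uses a List in place of the SDK's SearchField. FieldFormatSketch is a hypothetical name, and the spacing of the multi-value output is similar to, but not identical to, what the loop above prints.

```java
import java.util.Arrays;
import java.util.List;

final class FieldFormatSketch {
    private FieldFormatSketch() {
    }

    /** Formats one field: "name: value" for a single value, a counted list otherwise. */
    static String format(String name, List<?> values) {
        if (values.size() == 1) {
            return "  " + name + ": " + values.get(0);
        }
        StringBuilder sb = new StringBuilder();
        sb.append("  ").append(name)
          .append(" {").append(values.size()).append(" values}:");
        for (Object value : values) {
            sb.append(' ').append(value);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(format("title", Arrays.asList("London")));
        System.out.println(format("languages", Arrays.asList("English", "French")));
    }
}
```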

Display Facets

Facet output cannot be accessed until all documents have been processed. At that point we can iterate over the streamed facets. In the code below, sResponse.getFacets() yields one StreamingFacetResponse per facet, and iterating over that object yields the facet's individual buckets (a facet value and a count). Each bucket knows which facet it belongs to, and all of a facet's buckets arrive in a block, so it is pretty easy to separate the buckets into facet lists.

The code below prints each facet name, followed by that facet's buckets, using System.out.println().

StreamQueryTest.java
            // Iterate through facets and buckets
            // Note that facets cannot be retrieved until all documents
            // have been processed. 
            for (StreamingFacetResponse facet : sResponse.getFacets()) {
                FacetResponse headerInfo = facet.getFacetResponse();
                System.out.println("Facet: "+headerInfo.getField());
                for (FacetBucket bucket : facet) {
                    System.out.println(" " + bucket.getDisplayValue() + " " + bucket.getCount());
                }
            }

The code above prints out the name of a facet (country) and then lists its buckets (Canada 1) below it.
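
An application that needs the facet lists after the loop has finished could collect the buckets into a map as they arrive. The sketch below shows one way to do that in plain Java. Bucket and FacetGroupingSketch are hypothetical stand-ins invented for this illustration (a facet name, a display value, and a count); they are not SDK types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FacetGroupingSketch {
    /** Hypothetical flat bucket: facet name, display value, count. Not an SDK type. */
    static final class Bucket {
        final String facet;
        final String value;
        final long count;

        Bucket(String facet, String value, long count) {
            this.facet = facet;
            this.value = value;
            this.count = count;
        }
    }

    /** Groups buckets by facet name, preserving the order in which facets first appear. */
    static Map<String, List<String>> group(Iterable<Bucket> buckets) {
        Map<String, List<String>> byFacet = new LinkedHashMap<>();
        for (Bucket b : buckets) {
            byFacet.computeIfAbsent(b.facet, k -> new ArrayList<>())
                   .add(b.value + " " + b.count);
        }
        return byFacet;
    }

    public static void main(String[] args) {
        List<Bucket> stream = Arrays.asList(
                new Bucket("language", "English", 3),
                new Bucket("country", "Canada", 1),
                new Bucket("country", "South Africa", 1),
                new Bucket("country", "United Kingdom", 1));
        for (Map.Entry<String, List<String>> e : group(stream).entrySet()) {
            System.out.println("Facet: " + e.getKey());
            for (String line : e.getValue()) {
                System.out.println(" " + line);
            }
        }
    }
}
```

A LinkedHashMap keeps the facets in arrival order, so the printed lists come out in the same sequence as the streamed blocks.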

Get the QueryResponse

Retrieve the QueryResponse  message using sResponse.getQueryResponse(). This method should always be invoked when using streaming queries in order to ensure that resources are cleaned up after the query. It is not available until all of the documents and all of the facet buckets have been processed. 


In this case we have retrieved the QueryResponse in order to report how many documents were matched by the query.

StreamQueryTest.java
            // All documents and facets must be processed before the
            // QueryResponse becomes available.

            // getQueryResponse() is required to clean up resources.
            QueryResponse queryResponse = sResponse.getQueryResponse();
            System.out.println("");
            System.out.println("Total documents matched: " +
                    queryResponse.getDocuments().getNumberFound());
            sResponse.close();
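
The required reading order (documents first, then facets, then the QueryResponse) can be pictured as a small state machine. The sketch below is only a toy model of that ordering rule. PhasedResponse, its method names, its sample values, and the IllegalStateException are all assumptions made for illustration and do not describe how the SDK behaves internally.

```java
import java.util.Arrays;
import java.util.List;

final class PhaseOrderSketch {
    enum Phase { DOCUMENTS, FACETS, SUMMARY, DONE }

    /** Hypothetical stand-in that refuses out-of-order reads. Not an SDK class. */
    static final class PhasedResponse {
        private Phase phase = Phase.DOCUMENTS;

        List<String> drainDocuments() {
            advance(Phase.DOCUMENTS, Phase.FACETS);
            return Arrays.asList("doc-1", "doc-2"); // made-up sample IDs
        }

        List<String> drainFacets() {
            advance(Phase.FACETS, Phase.SUMMARY);
            return Arrays.asList("country: Canada 1");
        }

        long totalFound() {
            advance(Phase.SUMMARY, Phase.DONE);
            return 2;
        }

        /** Moves to the next phase, or fails if the caller skipped a step. */
        private void advance(Phase expected, Phase next) {
            if (phase != expected) {
                throw new IllegalStateException(
                        "expected phase " + expected + " but stream is in " + phase);
            }
            phase = next;
        }
    }

    public static void main(String[] args) {
        PhasedResponse response = new PhasedResponse();
        System.out.println(response.drainDocuments());
        System.out.println(response.drainFacets());
        System.out.println("total: " + response.totalFound());
    }
}
```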

Catch Exceptions

We have to include some error-catching boilerplate at the end. This independent Java application might have to handle an AttivioException generated by the Java Client library. Note that this application is not a part of AIE, however, and therefore should not throw an AttivioException.

StreamQueryTest.java
        } catch (AttivioException e) {
            System.err.println("Category: " + e.getErrorCode().getCategory());
            System.err.println("Code: " + e.getErrorCode().getCode());
            e.printStackTrace();

Finally Close the Stream

It is essential that the application explicitly close the query stream. Otherwise resources will be allocated but never freed.

StreamQueryTest.java
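        } finally {
            // Close the stream to clean up resources.
            // sResponse is still null if the failure came before search() returned.
            if (sResponse != null) {
                sResponse.close();
            }
        }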
    }
}

 

Compile the Class

To compile StreamQueryTest in Designer, select the project in the Project Explorer. Click the Project menu, and select Clean. Then select Project > Build Project.

Run the Program

To run StreamQueryTest from Designer, we'll have to create a Run Configuration for it.

  1. In Designer, navigate from the Run menu to Run Configurations.  This opens the Run Configurations dialog box.
  2. Right-click Java Applications and select New.
  3. Create a new StreamQueryTest run configuration in project plusjava.
  4. On the Arguments Tab, paste in the following program arguments. (The query workflow is "search".  Locate all documents that mention "London".)

    search London
  5. On the Arguments Tab, paste in the following VM arguments. (Edit the file path to suit your situation.)

    -Dattivio.log.printStackTraces=true -Dattivio.log.level=INFO 
    -Dattivio.log.directory="C:\attivio-projects\plusjava\build\logs"
  6. Apply the changes.   Close the dialog box.
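
The -D entries in step 5 are standard JVM system properties, so plain Java can read them back. As a quick check that the run configuration was applied, a throwaway class along these lines prints what the JVM actually received. VmArgsSketch is a hypothetical name, not part of the SDK, and the sketch makes no claim about how AIE itself uses these properties.

```java
final class VmArgsSketch {
    private VmArgsSketch() {
    }

    /** Returns "name=value" for a JVM system property, or "name=(not set)". */
    static String describe(String name) {
        String value = System.getProperty(name);
        return name + "=" + (value == null ? "(not set)" : value);
    }

    public static void main(String[] args) {
        // Property names are the ones passed as -D flags in step 5
        System.out.println(describe("attivio.log.printStackTraces"));
        System.out.println(describe("attivio.log.level"));
        System.out.println(describe("attivio.log.directory"));
    }
}
```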
 

Example Search Run

AIE must be running!

It should go without saying that the AIE server should be up and running before you attempt to run StreamQueryTest.

To run the application, use the Designer Run menu.  Select the Run command.

The parameters are the search workflow and the keyword "London".  If the target AIE project is a FactBook demo with the city feed loaded, our query should return multiple cities.  This is one of them:

Command Line
document ID: city-CITY-Canada-London-Ontario
  .score: 0.0
  .id: city-CITY-Canada-London-Ontario
  .zone: default
  title: London
  teaser: Ontario
  language: English
  languages: English
  size: 303165
  table: city
  date: Mon Jan 11 19:43:52 PST 2016
  text: Ontario
  position: -81.2462,42.9869
  latitude: 42.986895
  longitude: -81.246214
  sourcepath: C:\attivio51\conf\factbook\content\cities.csv
  filename: cities.csv
  country: Canada
  sentiment: neg
  sentiment.score: 0.6017876
  security.read: anonymous:anonymous

Following the document output comes the facet output:

Command Line
Facet: language
 English 3
Facet: country
 Canada 1
 South Africa 1
 United Kingdom 1
Facet: sentiment
 neg 1
 pos 2
Total documents matched: 3

Here we see the facet buckets grouped by facet, with their counts.  Finally, there is the document count from the QueryResponse object.