Query Class — Query

The query object, created by calling Client.query(), is used for executing queries over a secondary index of a specified set (which can be omitted or None). For queries, the None set contains those records that are not part of any named set.

The query can optionally be assigned a predicate through where(), produced by one of the aerospike.predicates methods.

A query without a predicate will match all the records in the given set, similar to a Scan.

The query is invoked using foreach(), results(), or execute_background(). The bins returned can be filtered using select().

If a list of write operations is added to the query with add_ops(), they will be applied to each record processed by the query. See aerospike_helpers for the available write operations.

Finally, a stream UDF may be applied with apply(). It will aggregate results out of the records streaming back from the query.
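
A minimal sketch of that lifecycle (assuming a server reachable on localhost and a secondary index on the 'age' bin):

import aerospike
from aerospike import predicates as p

config = {'hosts': [('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

# pass None as the set (or omit it) to query records outside any named set
query = client.query('test', 'demo')
query.select('name', 'age')            # only these bins are returned
query.where(p.between('age', 20, 30))  # needs a secondary index on 'age'

for _key, _meta, bins in query.results():
    print(bins)

client.close()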

See also

Queries and Managing Queries.

Query Fields and Methods

class aerospike.Query

Fields

Fieldname max_records

int Approximate number of records to return to the client. This number is divided by the number of nodes involved in the query. The actual number of records returned may be less than max_records if record counts are small and unbalanced across nodes. Requires server version >= 6.0.0

Default: 0 (no limit).

Fieldname records_per_second

int Limit the query to process records at records_per_second. Requires server version >= 6.0.0

Default: 0 (no limit).
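
Both fields are set as plain attributes on a query instance (a brief sketch, assuming a connected client):

query = client.query('test', 'demo')
query.max_records = 1000        # approximate cap on returned records
query.records_per_second = 500  # throttle the rate records are processed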

Note

Client version >= 5.0.0 supports Aerospike expressions for results(), foreach(), and execute_background(); see the aerospike_helpers.expressions package. Requires server version >= 5.2.0.

import aerospike
from aerospike_helpers import expressions as exp
from aerospike import exception as ex
import sys
import time

config = {"hosts": [("127.0.0.1", 3000)]}
client = aerospike.client(config).connect()

# register udf
try:
    client.udf_put(
        "/path/to/my_udf.lua"
    )
except ex.AerospikeError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    client.close()
    sys.exit(1)

# put records and apply udf
try:
    keys = [("test", "demo", 1), ("test", "demo", 2), ("test", "demo", 3)]
    records = [{"number": 1}, {"number": 2}, {"number": 3}]
    for i in range(3):
        client.put(keys[i], records[i])

    try:
        client.index_integer_create("test", "demo", "number", "test_demo_number_idx")
    except ex.IndexFoundError:
        pass

    query = client.query("test", "demo")
    query.apply("my_udf", "my_udf", ["number", 10])

    # only affect records with "number" bin greater than 1
    expr = exp.GT(exp.IntBin("number"), 1).compile()
    policy = {"expressions": expr}

    job_id = query.execute_background(policy)

    # wait for job to finish
    while True:
        response = client.job_info(job_id, aerospike.JOB_SCAN)
        print(response)
        if response["status"] != aerospike.JOB_STATUS_INPROGRESS:
            break
        time.sleep(0.25)

    records = client.get_many(keys)
    print(records)
except ex.AerospikeError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    sys.exit(1)
finally:
    client.close()
# EXPECTED OUTPUT:
# [
#   (('test', 'demo', 1, bytearray(b'\xb7\xf4\xb88\x89\xe2\xdag\xdeh>\x1d\xf6\x91\x9a\x1e\xac\xc4F\xc8')), {'gen': 2, 'ttl': 2591999}, {'number': 1}),
#   (('test', 'demo', 2, bytearray(b'\xaejQ_7\xdeJ\xda\xccD\x96\xe2\xda\x1f\xea\x84\x8c:\x92p')), {'gen': 12, 'ttl': 2591999}, {'number': 12}),
#   (('test', 'demo', 3, bytearray(b'\xb1\xa5`g\xf6\xd4\xa8\xa4D9\xd3\xafb\xbf\xf8ha\x01\x94\xcd')), {'gen': 13, 'ttl': 2591999}, {'number': 13})
# ]

Contents of my_udf.lua:

function my_udf(rec, bin, offset)
    info("my transform: %s", tostring(record.digest(rec)))
    rec[bin] = rec[bin] + offset
    aerospike:update(rec)
end

Note

For a similar example using .results() see aerospike.Scan.results().

Methods

select(bin1[, bin2[, bin3..]])

Set a filter on the record bins resulting from results() or foreach(). If a selected bin does not exist in a record it will not appear in the bins portion of that record tuple.
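
A brief sketch (assuming a connected client):

query = client.query('test', 'demo')
query.select('name', 'age')  # bins absent from a record simply do not appear in its bins dict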

where(predicate)

Set a where predicate for the query, without which the query will behave similarly to aerospike.Scan. The predicate is produced by one of the aerospike.predicates methods equals() and between().

Parameters

predicate (tuple) – the tuple() produced by one of the aerospike.predicates methods.

Note

Currently, you can assign at most one predicate to the query.
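
Since only one predicate is allowed, one way to narrow results further is to pair the predicate with a compiled expression in the query policy (client >= 5.0.0, server >= 5.2.0); a sketch assuming a secondary index on 'age' and a hypothetical integer 'score' bin:

import aerospike
from aerospike import predicates as p
from aerospike_helpers import expressions as exp

query = client.query('test', 'demo')
query.where(p.between('age', 20, 30))  # the single allowed predicate

# additional server-side filtering via an expression
expr = exp.GT(exp.IntBin('score'), 50).compile()
records = query.results({'expressions': expr})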

results([policy[, options]]) -> list of (key, meta, bins)

Buffer the records resulting from the query, and return them as a list of records.

Parameters

  • policy (dict) – optional Query Policies.

  • options (dict) – optional Query Options.

Returns

a list of Record Tuple.

import aerospike
from aerospike import predicates as p
import pprint

config = { 'hosts': [ ('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

pp = pprint.PrettyPrinter(indent=2)
query = client.query('test', 'demo')
query.select('name', 'age') # matched records return with the values of these bins

# assuming there is a secondary index on the 'age' bin of test.demo
query.where(p.equals('age', 40))

records = query.results({'total_timeout': 2000})

pp.pprint(records)
client.close()

Note

As of client 7.0.0, with server >= 6.0, results() and the "partition_filter" query policy (see Partition Objects) can be used to specify which partitions/records to query. See the example below.

# This is an example of querying partitions 1000 - 1003.
import aerospike


query = client.query("test", "demo")

policy = {
    "partition_filter": {
        "begin": 1000,
        "count": 4
    },
}

# NOTE: these results will only be non-empty if there are records in partitions 1000 - 1003
# results will be the records in partitions 1000 - 1003
results = query.results(policy=policy)

foreach(callback[, policy[, options]])

Invoke the callback function for each of the records streaming back from the query.

Parameters

  • callback (callable) – the function to invoke for each record.

  • policy (dict) – optional Query Policies.

  • options (dict) – optional Query Options.

Note

A Record Tuple is passed as the argument to the callback function. If the query is using the "partition_filter" query policy, the callback will receive two arguments: the first is an int representing the partition ID, the second is the same Record Tuple as a normal callback.

import aerospike
from aerospike import predicates as p
import pprint

config = { 'hosts': [ ('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

pp = pprint.PrettyPrinter(indent=2)
query = client.query('test', 'demo')
query.select('name', 'age') # matched records return with the values of these bins
# assuming there is a secondary index on the 'age' bin of test.demo
query.where(p.between('age', 20, 30))
names = []
def matched_names(record):
    key, metadata, bins = record
    pp.pprint(bins)
    names.append(bins['name'])

query.foreach(matched_names, {'total_timeout':2000})
pp.pprint(names)
client.close()

Note

To stop the stream return False from the callback function.

import aerospike
from aerospike import predicates as p

config = { 'hosts': [ ('127.0.0.1',3000)]}
client = aerospike.client(config).connect()

def limit(lim, result):
    c = [0] # integers are immutable so a list (mutable) is used for the counter
    def key_add(record):
        key, metadata, bins = record
        if c[0] < lim:
            result.append(key)
            c[0] = c[0] + 1
        else:
            return False
    return key_add

query = client.query('test','user')
query.where(p.between('age', 20, 30))
keys = []
query.foreach(limit(100, keys))
print(len(keys)) # this will be 100 if the number of matching records > 100
client.close()

Note

As of client 7.0.0, with server >= 6.0, foreach() and the "partition_filter" query policy (see Partition Objects) can be used to specify which partitions/records to query. See the example below.

# This is an example of querying partitions 1000 - 1003.
import aerospike


partitions = []

def callback(part_id, input_tuple):
    print(part_id)
    partitions.append(part_id)

query = client.query("test", "demo")

policy = {
    "partition_filter": {
        "begin": 1000,
        "count": 4
    },
}

query.foreach(callback, policy)


# NOTE: these will only be non-zero if there are records in partitions 1000 - 1003.
# With one record in each of those partitions, this prints 4
print(len(partitions))

# and this prints [1000, 1001, 1002, 1003]
print(partitions)

apply(module, function[, arguments])

Aggregate the results() using a stream UDF. If no predicate is attached to the Query the stream UDF will aggregate over all the records in the specified set.

Parameters
  • module (str) – the name of the Lua module.

  • function (str) – the name of the Lua function within the module.

  • arguments (list) – optional arguments to pass to the function. NOTE: these arguments must be types supported by Aerospike. See: supported data types. If you need to use an unsupported type (e.g. set or tuple), you can use a serializer like pickle first.

Returns

one of the supported types: int, str, float (double), list, dict (map), bytearray (bytes), or bool.
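
As noted above, unsupported argument types such as set or tuple can be serialized first; a minimal sketch using pickle, where the module and function names are hypothetical and the UDF itself would have to deal with the serialized bytes:

import pickle

unsupported = ('a', 1)  # tuples are not a supported argument type
as_bytes = bytearray(pickle.dumps(unsupported))  # bytearray is supported

query = client.query('test', 'demo')
query.apply('my_module', 'my_function', [as_bytes])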

Note

Assume we registered the following Lua module with the cluster as stream_udf.lua using aerospike.udf_put().

local function having_ge_threshold(bin_having, ge_threshold)
    return function(rec)
        debug("group_count::thresh_filter: %s >  %s ?", tostring(rec[bin_having]), tostring(ge_threshold))
        if rec[bin_having] < ge_threshold then
            return false
        end
        return true
    end
end

local function count(group_by_bin)
  return function(group, rec)
    if rec[group_by_bin] then
      local bin_name = rec[group_by_bin]
      group[bin_name] = (group[bin_name] or 0) + 1
      debug("group_count::count: bin %s has value %s which has the count of %s", tostring(bin_name), tostring(group[bin_name]))
    end
    return group
  end
end

local function add_values(val1, val2)
  return val1 + val2
end

local function reduce_groups(a, b)
  return map.merge(a, b, add_values)
end

function group_count(stream, group_by_bin, bin_having, ge_threshold)
  if bin_having and ge_threshold then
    local myfilter = having_ge_threshold(bin_having, ge_threshold)
    return stream : filter(myfilter) : aggregate(map{}, count(group_by_bin)) : reduce(reduce_groups)
  else
    return stream : aggregate(map{}, count(group_by_bin)) : reduce(reduce_groups)
  end
end

Find the first name distribution of users in their twenties using a query aggregation:

import aerospike
from aerospike import predicates as p
import pprint

config = {'hosts': [('127.0.0.1', 3000)],
          'lua': {'system_path':'/usr/local/aerospike/lua/',
                  'user_path':'/usr/local/aerospike/usr-lua/'}}
client = aerospike.client(config).connect()

pp = pprint.PrettyPrinter(indent=2)
query = client.query('test', 'users')
query.where(p.between('age', 20, 29))
query.apply('stream_udf', 'group_count', [ 'first_name' ])
names = query.results()
# we expect a dict (map) whose keys are names, each with a count value
pp.pprint(names)
client.close()

With stream UDFs, the final reduce step (which ties together the results from the reducers of the cluster nodes) executes on the client side. Explicitly setting the Lua user_path in the config helps the client find the local copy of the module containing the stream UDF. The system_path is constructed when the Python package is installed and contains system modules such as aerospike.lua, as.lua, and stream_ops.lua. The default value for the Lua system_path is /usr/local/aerospike/lua.

add_ops(ops)

Add a list of write ops to the query. When used with Query.execute_background() the query will perform the write ops on any records found. If no predicate is attached to the Query it will apply ops to all the records in the specified set.

Parameters

ops (list) – a list of write operations generated by aerospike_helpers, e.g. list_operations, map_operations, etc.

Note

Requires server version >= 4.7.0.

import aerospike
from aerospike_helpers.operations import list_operations
from aerospike_helpers.operations import operations
query = client.query('test', 'demo')

# placeholder bin name and list index, for illustration only
test_bin = 'my_bin'
list_index_to_remove = 0

ops = [
    operations.append(test_bin, 'val_to_append'),
    list_operations.list_remove_by_index(test_bin, list_index_to_remove, aerospike.LIST_RETURN_NONE)
]
query.add_ops(ops)

id = query.execute_background()
client.close()

For a more comprehensive example, see using a list of write ops with Query.execute_background().

execute_background([policy])

Execute a record UDF or write operations on records found by the query in the background. This method returns before the query has completed. A UDF or a list of write operations must have been added to the query with Query.apply() or Query.add_ops() respectively.

Parameters

policy (dict) – optional Write Policies.

Returns

a job ID that can be used with job_info() to track the status of the aerospike.JOB_QUERY job as it runs in the background.

# Using a record UDF
import aerospike
query = client.query('test', 'demo')
query.apply('myudfs', 'myfunction', ['a', 1])
query_id = query.execute_background()
# This id can be used to monitor the progress of the background query
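
One way to monitor that ID is to poll job_info(), along the lines of the expressions example earlier in this section (which passes aerospike.JOB_SCAN; aerospike.JOB_QUERY is the query-specific constant named above):

import time

while True:
    response = client.job_info(query_id, aerospike.JOB_QUERY)
    if response["status"] != aerospike.JOB_STATUS_INPROGRESS:
        break
    time.sleep(0.25)
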
# Using a list of write ops.
import aerospike
from aerospike import predicates
from aerospike import exception as ex
from aerospike_helpers.operations import list_operations
import sys
import time

# Configure the client.
config = {"hosts": [("127.0.0.1", 3000)]}

# Create a client and connect it to the cluster.
try:
    client = aerospike.client(config).connect()
except ex.ClientError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    sys.exit(1)

TEST_NS = "test"
TEST_SET = "demo"
nested_list = [{"name": "John", "id": 100}, {"name": "Bill", "id": 200}]
# Write the records.
try:
    keys = [(TEST_NS, TEST_SET, i) for i in range(5)]
    for i, key in enumerate(keys):
        client.put(key, {"account_number": i, "members": nested_list})
except ex.RecordError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))

# EXAMPLE 1: Append a new account member to all accounts.
try:
    new_member = {"name": "Cindy", "id": 300}

    ops = [list_operations.list_append("members", new_member)]

    query = client.query(TEST_NS, TEST_SET)
    query.add_ops(ops)
    id = query.execute_background()
    # allow for query to complete
    time.sleep(3)
    print("EXAMPLE 1")

    for i, key in enumerate(keys):
        _, _, bins = client.get(key)
        print(bins)
except ex.ClientError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    sys.exit(1)

# EXAMPLE 2: Remove a member from a specific account using predicates.
try:
    # Add an index to the records for use with the equality predicate.
    client.index_integer_create(
        TEST_NS, TEST_SET, "account_number", "test_demo_account_number_idx"
    )

    ops = [
        list_operations.list_remove_by_index("members", 0, aerospike.LIST_RETURN_NONE)
    ]

    query = client.query(TEST_NS, TEST_SET)
    number_predicate = predicates.equals("account_number", 3)
    query.where(number_predicate)
    query.add_ops(ops)
    id = query.execute_background()
    # allow for query to complete
    time.sleep(3)
    print("EXAMPLE 2")

    for i, key in enumerate(keys):
        _, _, bins = client.get(key)
        print(bins)
except ex.ClientError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    sys.exit(1)

# Cleanup and close the connection to the Aerospike cluster.
for i, key in enumerate(keys):
    client.remove(key)
client.index_remove(TEST_NS, "test_demo_account_number_idx")
client.close()

"""
EXPECTED OUTPUT:
EXAMPLE 1
{'account_number': 0, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 1, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 2, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 3, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 4, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
EXAMPLE 2
{'account_number': 0, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 1, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 2, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 3, 'members': [{'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
{'account_number': 4, 'members': [{'name': 'John', 'id': 100}, {'name': 'Bill', 'id': 200}, {'name': 'Cindy', 'id': 300}]}
"""

paginate()

Makes a query instance a paginated query. Call this if you are using max_records and need to query data in pages.

Note

Calling .paginate() on a query instance causes it to save its partition state. This can be retrieved later using .get_partitions_status(). This can also be done by using the partition_filter policy.

# Query 3 pages of 1000 records each.

import aerospike

pages = 3
page_size = 1000

query = client.query('test', 'demo')
query.max_records = page_size

query.paginate()

# NOTE: The number of pages queried and records returned per page can differ
# if record counts are small or unbalanced across nodes.
for page in range(pages):
    records = query.results()

    print("got page: " + str(page))

    if query.is_done():
        print("all done")
        break

is_done()

If using query pagination, returns whether the previous paginated or partition_filter query with this query instance returned all of its records.

Returns

A bool signifying whether this paginated query instance has returned all records.

import aerospike

query = client.query('test', 'demo')
query.max_records = 1000

query.paginate()

records = query.results()

if query.is_done():
    print("all done")

get_partitions_status()

Get this query instance’s partition status; that is, which partitions have been queried and which have not. The returned value is a dict with partition IDs (int) as keys and tuples as values. If the query instance is not tracking its partitions, the returned dict will be empty.

Note

A query instance must have had .paginate() called on it, or have been used with a partition filter, in order to retrieve its partition status. If .paginate() was not called, or partition_filter was not used, the query instance will not save its partition status.

Returns

a dict mapping each partition ID to a tuple of the form (id: int, init: bool, done: bool, digest: bytearray). See Partition Objects for more information.

# This is an example of resuming a query using partition status.
import aerospike


for i in range(15):
    key = ("test", "demo", i)
    bins = {"id": i}
    client.put(key, bins)

records = []
resumed_records = []

def callback(input_tuple):
    key, _, _ = input_tuple  # a record tuple is (key, meta, bins)

    # stop the stream after 5 records
    if len(records) == 5:
        return False

    records.append(key)

query = client.query("test", "demo")
query.paginate()

query.foreach(callback)

# The first query should stop after 5 records.
assert len(records) == 5

partition_status = query.get_partitions_status()

def resume_callback(part_id, input_tuple):
    key, _, _ = input_tuple
    resumed_records.append(key)

query_resume = client.query("test", "demo")

policy = {
    "partition_filter": {
        "partition_status": partition_status
    },
}

query_resume.foreach(resume_callback, policy)

# should be 15
total_records = len(records) + len(resumed_records)
print(total_records)

# cleanup
for i in range(15):
    key = ("test", "demo", i)
    client.remove(key)

Query Policies

policy

A dict of optional query policies which are applicable to Query.results() and Query.foreach(). See Policy Options. A combined example follows this list.

  • max_retries int
    Maximum number of retries before aborting the current transaction. The initial attempt is not counted as a retry.

    If max_retries is exceeded, the transaction will return error AEROSPIKE_ERR_TIMEOUT.

    Default: 0

    Warning

    Database writes that are not idempotent (such as “add”) should not be retried because the write operation may be performed multiple times if the client timed out previous transaction attempts. It’s important to use a distinct write policy for non-idempotent writes that sets max_retries = 0.

  • sleep_between_retries int
    Milliseconds to sleep between retries. Enter 0 to skip sleep.

    Default: 0
  • socket_timeout int
    Socket idle timeout in milliseconds when processing a database command.

    If socket_timeout is not 0 and the socket has been idle for at least socket_timeout, both max_retries and total_timeout are checked. If max_retries and total_timeout are not exceeded, the transaction is retried.

    If both socket_timeout and total_timeout are non-zero and socket_timeout > total_timeout, then socket_timeout will be set to total_timeout. If socket_timeout is 0, there will be no socket idle limit.

    Default: 30000.
  • total_timeout int
    Total transaction timeout in milliseconds.

    The total_timeout is tracked on the client and sent to the server along with the transaction in the wire protocol. The client will most likely timeout first, but the server also has the capability to timeout the transaction.

    If total_timeout is not 0 total_timeout is reached before the transaction completes, the transaction will return error AEROSPIKE_ERR_TIMEOUT. If total_timeout is 0, there will be no total time limit.

    Default: 0
  • compress (bool)
    Compress client requests and server responses.

    Use zlib compression on write or batch read commands when the command buffer size is greater than 128 bytes. In addition, tell the server to compress its response on read commands. The server response compression threshold is also 128 bytes.

    This option will increase CPU and memory usage (for extra compressed buffers), but decrease the size of data sent over the network.

    Default: False
  • deserialize bool
    Whether raw bytes representing a list or map should be deserialized to a list or dictionary.
    Set to False for backup programs that only need access to raw bytes.

    Default: True
  • fail_on_cluster_change bool
    Deprecated in 6.0.0: no longer has any effect.
    Previously terminated the query if the cluster was in a migration state.

    Default: False
  • short_query bool
    Whether the query is expected to return fewer than 100 records.
    If True, the server will optimize the query for a small record set.
    This field is ignored for aggregation queries, background queries,
    and server versions less than 6.0.0.

    Mutually exclusive with records_per_second.
    Default: False
  • expressions list
    Compiled Aerospike expressions (see aerospike_helpers) used for filtering records within a transaction.

    Default: None

    Note

    Requires Aerospike server version >= 5.2.

  • partition_filter dict
    A dictionary of partition information used by the client
    to perform partition queries. Useful for resuming terminated queries and
    querying particular partitions/records.

    See Partition Objects for more information.

    Default: {} (All partitions will be queried).

    Note

    Requires Aerospike server version >= 6.0
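
A sketch combining several of these policies into one dict (assuming a connected client and a query instance):

policy = {
    'max_retries': 0,        # do not retry
    'total_timeout': 2000,   # 2 second overall cap
    'socket_timeout': 1000,  # socket idle limit
    'short_query': True,     # expected to match fewer than 100 records
}
records = query.results(policy)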

Query Options

options

A dict of optional query options which are applicable to Query.foreach() and Query.results().

  • nobins bool
    Whether to omit the bins portion of the Record Tuple.
    If True, only the key and metadata of each record are returned.

    Default: False (bins are returned).

New in version 3.0.0.
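
A brief sketch passing nobins (note the empty policy dict occupying the first positional argument):

query = client.query('test', 'demo')
records = query.results({}, {'nobins': True})  # record tuples come back without bins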