aerospike.Query — Query Class

Overview

Constructing A Query

The query object is used for executing queries over a secondary index of a specified set. It can be created by calling aerospike.Client.query().

A query without a secondary index filter will apply to all records in the namespace, similar to a Scan.

Otherwise, the query can optionally be assigned one of the secondary index filters in aerospike.predicates to filter out records using their bin values. These secondary index filters are applied to the query using where(). In this case, if the set is initialized to None, then the query will only apply to records without a set.
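For instance, a minimal sketch of constructing a query and attaching a secondary index filter (the namespace, set, and bin names mirror the examples further below):

import aerospike
from aerospike import predicates

config = {'hosts': [('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

# Query the "demo" set of the "test" namespace
query = client.query('test', 'demo')

# Only match records whose "score" bin equals 100
# (assumes a secondary index exists on the "score" bin)
query.where(predicates.equals('score', 100))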

Note

The secondary index filters in aerospike.predicates are not the same as the deprecated predicate expressions. For more details, read this guide.

Writing Using Query

If a list of write operations is added to the query with add_ops(), they will be applied to each record processed by the query. See available write operations at aerospike_helpers.operations.
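As a brief sketch (a complete, runnable example appears under execute_background() below), write operations might be attached like this, assuming an integer bin "score":

import aerospike
from aerospike_helpers.operations import operations

client = aerospike.client({'hosts': [('127.0.0.1', 3000)]}).connect()
query = client.query('test', 'demo')

# Increment the "score" bin of every record the query touches
query.add_ops([operations.increment("score", 1)])
job_id = query.execute_background()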

Query Aggregations

A stream UDF may be applied with apply(). It will aggregate results out of the records streaming back from the query.
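As a sketch, assuming a Lua module "mymodule" containing a stream UDF "myfunction" has already been registered with udf_put() (both names here are placeholders; a complete aggregation example appears under apply() below):

import aerospike

client = aerospike.client({'hosts': [('127.0.0.1', 3000)]}).connect()
query = client.query('test', 'demo')

# Apply the (placeholder) stream UDF to aggregate the record stream
query.apply('mymodule', 'myfunction')
results = query.results()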

Getting Results From Query

The returned bins can be filtered by using select().

Finally, the query is invoked using one of these methods: results(), foreach(), or execute_background().

See also

Queries and Managing Queries.

Fields

class aerospike.Query
max_records (int)

Approximate number of records to return to client.

This number is divided by the number of nodes involved in the query. The actual number of records returned may be less than max_records if node record counts are small and unbalanced across nodes.

Default: 0 (no limit)

Note

Requires server version >= 6.0.0

records_per_second (int)

Limit the query to process records at records_per_second. Requires server version >= 6.0.0

Default: 0 (no limit)

ttl (int)

The time-to-live (expiration) of the record in seconds. If set to aerospike.TTL_CLIENT_DEFAULT, use the client’s default write policy ttl.

See TTL Constants for more possible special values.

Note

The TTL value is applied ONLY on background query writes.

Requires server version >= 6.0.0

Default: 0 (record will adopt the default TTL value from the namespace)
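These fields are plain attributes set on the query instance before it is executed. A minimal sketch, with illustrative values only:

import aerospike

client = aerospike.client({'hosts': [('127.0.0.1', 3000)]}).connect()
query = client.query('test', 'demo')

# Return roughly 1,000 records at most
query.max_records = 1000

# Throttle the query to about 4,000 records per second
query.records_per_second = 4000

# TTL used only by background query writes
query.ttl = aerospike.TTL_CLIENT_DEFAULT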

Methods

Assume this boilerplate code is run before all examples below:

import aerospike
import sys
from aerospike import exception as ex

config = {'hosts': [('127.0.0.1', 3000)]}

# Create a client and connect it to the cluster
try:
    client = aerospike.client(config).connect()
    client.truncate('test', "demo", 0)
except ex.ClientError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    sys.exit(1)

# Remove old indices
for indexName in ("scoreIndex", "eloIndex"):
    try:
        client.index_remove("test", indexName)
    except ex.AerospikeError:
        # Ignore if no indices found
        pass

# Insert 4 records
keyTuples = [("test", "demo", f"player{i}") for i in range(4)]
bins = [
    {"score": 100, "elo": 1400},
    {"score": 20, "elo": 1500},
    {"score": 10, "elo": 1100},
    {"score": 200, "elo": 900}
]
for keyTuple, recordBins in zip(keyTuples, bins):
    client.put(keyTuple, recordBins)

query = client.query('test', 'demo')

# Queries with a where() filter require a secondary index on the queried bin
client.index_integer_create("test", "demo", "score", "scoreIndex")
client.index_integer_create("test", "demo", "elo", "eloIndex")
class aerospike.Query
select(bin1[, bin2[, bin3..]])

Set a filter on the record bins resulting from results() or foreach().

If a selected bin does not exist in a record it will not appear in the bins portion of that record tuple.

where(predicate[, ctx])

Set a where predicate for the query.

You can assign at most one predicate to the query. If this function isn’t called, the query will behave similarly to aerospike.Scan.

Parameters:
  • predicate (tuple) – the secondary index filter produced by one of the functions in aerospike.predicates.

  • ctx (list) – an optional list of CDT context operations (see aerospike_helpers.cdt_ctx) identifying the nested list or map element to filter on.
results([policy[, options]]) -> list of (key, meta, bins)

Buffer the records resulting from the query, and return them as a list of records.

Parameters:
  • policy (dict) – optional Policies.

  • options (dict) – optional Options.
Returns:

a list of Record Tuple.

from aerospike import predicates

query.select('score')
query.where(predicates.equals('score', 100))

records = query.results()
# Matches one record
print(records)
# [(('test', 'demo', None, bytearray(b'...')), {'ttl': 2592000, 'gen': 1}, {'score': 100})]

Note

As of client 7.0.0, and with server >= 6.0, results() and the query policy “partition_filter” (see Partition Objects) can be used to specify which partitions/records results() will query. See the example below.

# This is an example of querying partitions 1000 - 1003.
import aerospike


query = client.query("test", "demo")

policy = {
    "partition_filter": {
        "begin": 1000,
        "count": 4
    },
}

# NOTE that these will only be non-0 if there are records in partitions 1000 - 1003
# results will be the records in partitions 1000 - 1003
results = query.results(policy=policy)

foreach(callback[, policy[, options]])

Invoke the callback function for each of the records streaming back from the query.

A Record Tuple is passed as the argument to the callback function. If the query is using the “partition_filter” query policy, the callback will receive two arguments: the first is an int representing the partition ID, and the second is the same Record Tuple as a normal callback.

Parameters:
  • callback (callable) – the function to invoke for each record.

  • policy (dict) – optional Policies.

  • options (dict) – optional Options.

# Callback function
# Calculates new elo for a player
def updateElo(record):
    keyTuple, _, bins = record
    # Add score to elo
    bins["elo"] = bins["elo"] + bins["score"]
    client.put(keyTuple, bins)

query.foreach(updateElo)

# Player elos should be updated
records = client.get_many(keyTuples)
for _, _, bins in records:
    print(bins)
# {'score': 100, 'elo': 1500}
# {'score': 20, 'elo': 1520}
# {'score': 10, 'elo': 1110}
# {'score': 200, 'elo': 1100}

Note

To stop the stream, return False from the callback function.

# Adds record keys from a stream to a list
# But limits the number of keys to "lim"
def limit(lim: int, result: list):
    # Integers are immutable
    # so a list (mutable) is used for the counter
    c = [0]
    def key_add(record):
        key, metadata, bins = record
        if c[0] < lim:
            result.append(key)
            c[0] = c[0] + 1
        else:
            return False
    return key_add

keys = []
query.foreach(limit(2, keys))
print(len(keys)) # 2

Note

As of client 7.0.0, and with server >= 6.0, foreach() and the query policy “partition_filter” (see Partition Objects) can be used to specify which partitions/records foreach() will query. See the example below.

# This is an example of querying partitions 1000 - 1003.
import aerospike


partitions = []

def callback(part_id, input_tuple):
    print(part_id)
    partitions.append(part_id)

query = client.query("test", "demo")

policy = {
    "partition_filter": {
        "begin": 1000,
        "count": 4
    },
}

query.foreach(callback, policy)


# NOTE that these will only be non 0 if there are records in partitions 1000 - 1003
# should be 4
print(len(partitions))

# should be [1000, 1001, 1002, 1003]
print(partitions)
apply(module, function[, arguments])

Aggregate the results() using a stream UDF. If no predicate is attached to the Query, the stream UDF will aggregate over all the records in the specified set.

Parameters:
  • module (str) – the name of the Lua module.

  • function (str) – the name of the Lua function within the module.

  • arguments (list) – optional arguments to pass to the function. NOTE: these arguments must be types supported by Aerospike; see supported data types. If you need to use an unsupported type (e.g. set or tuple), you must use your own serializer.

Example: find the first name distribution of users who are 21 or older using a query aggregation:

-- Filter function
-- Filters records with a bin value >= a threshold
local function is_greater_than_or_equal(binname, threshold)
    return function(rec)
        if rec[binname] < threshold then
            return false
        end
        return true
    end
end

-- Creates an aggregate function that counts the number of times a specific bin value is found
local function count(bin_name)
    return function(counts_map, rec)
        -- Does record have that specific bin?
        if rec[bin_name] then
            -- Account for that bin value
            local bin_value = rec[bin_name]
            counts_map[bin_value] = (counts_map[bin_value] or 0) + 1
        end
        -- No changes to bin value counts
        return counts_map
    end
end

-- Helper function for reduce
local function add_values(val1, val2)
    return val1 + val2
end

-- Combines count maps into one
-- Need this function when the database runs multiple aggregations in parallel
local function reduce_groups(a, b)
    return map.merge(a, b, add_values)
end

-- First filter records with a bin binname that has value >= threshold (if those arguments are passed in)
-- Then count the number of times a value in bin "binname_to_group" is found
function group_count(stream, binname_to_group, binname, threshold)
    if binname and threshold then
        local filter = is_greater_than_or_equal(binname, threshold)
        return stream : filter(filter) : aggregate(map{}, count(binname_to_group)) : reduce(reduce_groups)
    else
        -- Don't filter records in this case
        return stream : aggregate(map{}, count(binname_to_group)) : reduce(reduce_groups)
    end
end

Assume the example code above is in a file called “example.lua”, and is in the same folder as the following script.

import aerospike

config = {'hosts': [('127.0.0.1', 3000)],
          'lua': {'system_path': '/usr/local/aerospike/lua/',
                  'user_path': './'}}
client = aerospike.client(config).connect()
client.udf_put("example.lua")

# Remove index if it already exists
from aerospike import exception as ex
try:
    client.index_remove("test", "ageIndex")
except ex.IndexNotFound:
    pass

bins = [
    {"name": "Jeff", "age": 20},
    {"name": "Derek", "age": 24},
    {"name": "Derek", "age": 21},
    {"name": "Derek", "age": 29},
    {"name": "Jeff", "age": 29},
]
keys = [("test", "users", f"user{i}") for i in range(len(bins))]
for key, recordBins in zip(keys, bins):
    client.put(key, recordBins)

client.index_integer_create("test", "users", "age", "ageIndex")

query = client.query('test', 'users')
query.apply('example', 'group_count', ['name', 'age', 21])
names = query.results()

# we expect a dict (map) whose keys are names, each with a count value
print(names)
# One of the Jeffs is excluded because he is under 21
# [{'Derek': 3, 'Jeff': 1}]

# Cleanup
client.index_remove("test", "ageIndex")
client.batch_remove(keys)
client.close()

With stream UDFs, the final reduce step (which ties together the results from the reducers of the cluster nodes) executes on the client side. Explicitly setting the Lua user_path in the config helps the client find the local copy of the module containing the stream UDF. The system_path is constructed when the Python package is installed, and contains system modules such as aerospike.lua, as.lua, and stream_ops.lua. The default value for the Lua system_path is /usr/local/aerospike/lua.

add_ops(ops)

Add a list of write ops to the query. When used with Query.execute_background(), the query will perform the write ops on any records found. If no predicate is attached to the Query, it will apply ops to all the records in the specified set.

Parameters:

  • ops (list) – a list of write operations generated by the aerospike_helpers, e.g. list_operations, map_operations, etc.

Note

Requires server version >= 4.7.0.

execute_background([policy])

Execute a record UDF or write operations on records found by the query in the background. This method returns before the query has completed. A UDF or a list of write operations must have been added to the query with Query.apply() or Query.add_ops() respectively.

Parameters:

policy (dict) – optional Write Policies.

Returns:

a job ID that can be used with job_info() to track the status of the aerospike.JOB_QUERY job as it runs in the background.

# EXAMPLE 1: Increase everyone's score by 100

from aerospike_helpers.operations import operations
ops = [
    operations.increment("score", 100)
]
query.add_ops(ops)
job_id = query.execute_background()

# Allow time for query to complete
import time
time.sleep(3)

for key in keyTuples:
    _, _, bins = client.get(key)
    print(bins)
# {"score": 200, "elo": 1400}
# {"score": 120, "elo": 1500}
# {"score": 110, "elo": 1100}
# {"score": 300, "elo": 900}

# EXAMPLE 2: Increase score by 100 again for those with elos >= 1000
# Use write policy to select players by elo
import aerospike_helpers.expressions as expr
eloGreaterOrEqualTo1000 = expr.GE(expr.IntBin("elo"), 1000).compile()
writePolicy = {
    "expressions": eloGreaterOrEqualTo1000
}
job_id = query.execute_background(policy=writePolicy)

time.sleep(3)

for key in keyTuples:
    _, _, bins = client.get(key)
    print(bins)
# {"score": 300, "elo": 1400} <--
# {"score": 220, "elo": 1500} <--
# {"score": 210, "elo": 1100} <--
# {"score": 300, "elo": 900}

# Cleanup and close the connection to the Aerospike cluster.
for key in keyTuples:
    client.remove(key)
client.close()
paginate()

Makes a query instance a paginated query. Call this if you are using max_records and need to query the data in pages.

Note

Calling .paginate() on a query instance causes it to save its partition state. This can be retrieved later using .get_partitions_status(). This can also be done by using the partition_filter policy.

# After inserting 4 records...
# Query 3 pages of 2 records each.

pages = 3
page_size = 2

query.max_records = 2
query.paginate()

# NOTE: The number of pages queried and records returned per page can differ
# if record counts are small or unbalanced across nodes.
for page in range(pages):
    records = query.results()
    print("got page: " + str(page))

    # Print records in each page
    for record in records:
        print(record)

    if query.is_done():
        print("all done")
        break
# got page: 0
# (('test', 'demo', None, bytearray(b'HD\xd1\xfa$L\xa0\xf5\xa2~\xd6\x1dv\x91\x9f\xd6\xfa\xad\x18\x00')), {'ttl': 2591996, 'gen': 1}, {'score': 20, 'elo': 1500})
# (('test', 'demo', None, bytearray(b'f\xa4\t"\xa9uc\xf5\xce\x97\xf0\x16\x9eI\xab\x89Q\xb8\xef\x0b')), {'ttl': 2591996, 'gen': 1}, {'score': 10, 'elo': 1100})
# got page: 1
# (('test', 'demo', None, bytearray(b'\xb6\x9f\xf5\x7f\xfarb.IeaVc\x17n\xf4\x9b\xad\xa7T')), {'ttl': 2591996, 'gen': 1}, {'score': 200, 'elo': 900})
# (('test', 'demo', None, bytearray(b'j>@\xfe\xe0\x94\xd5?\n\xd7\xc3\xf2\xd7\x045\xbc*\x07 \x1a')), {'ttl': 2591996, 'gen': 1}, {'score': 100, 'elo': 1400})
# got page: 2
# all done
is_done()

If using query pagination, did the previous paginated or partition_filter query using this query instance return all records?

Returns:

A bool signifying whether this paginated query instance has returned all records.

get_partitions_status()

Get this query instance’s partition status. That is, which partitions have been queried and which have not. If the query instance is not tracking its partitions, the returned dict will be empty.

Note

A query instance must have had .paginate() called on it, or been used with a partition filter, in order to retrieve its partition status. If .paginate() was not called, or partition_filter was not used, the query instance will not save partition status.

Returns:

See Partition Objects for a description of the partition status return value.

# Only read 2 records

recordCount = 0
def callback(record):
    global recordCount
    if recordCount == 2:
        return False
    recordCount += 1

    print(record)

# Query is set to read ALL records
query = client.query("test", "demo")
query.paginate()
query.foreach(callback)
# (('test', 'demo', None, bytearray(b'...')), {'ttl': 2591996, 'gen': 1}, {'score': 10, 'elo': 1100})
# (('test', 'demo', None, bytearray(b'...')), {'ttl': 2591996, 'gen': 1}, {'score': 20, 'elo': 1500})


# Use this to resume query where we left off
partition_status = query.get_partitions_status()

# Callback must include partition_id parameter
# if partition_filter is included in policy
def resume_callback(partition_id, record):
    print(partition_id, "->", record)

policy = {
    "partition_filter": {
        "partition_status": partition_status
    },
}

query.foreach(resume_callback, policy)
# 1096 -> (('test', 'demo', None, bytearray(b'...')), {'ttl': 2591996, 'gen': 1}, {'score': 100, 'elo': 1400})
# 3690 -> (('test', 'demo', None, bytearray(b'...')), {'ttl': 2591996, 'gen': 1}, {'score': 200, 'elo': 900})

Policies

policy

A dict of optional query policies which are applicable to Query.results() and Query.foreach(). See Policies. A combined example appears after the list below.

  • max_retries int
    Maximum number of retries before aborting the current transaction. The initial attempt is not counted as a retry.

    If max_retries is exceeded, the transaction will return the last suberror that was received.

    Default: 0

    Warning

    Database writes that are not idempotent (such as “add”) should not be retried because the write operation may be performed multiple times if the client timed out previous transaction attempts. It’s important to use a distinct write policy for non-idempotent writes that sets max_retries = 0.

  • sleep_between_retries int
    Milliseconds to sleep between retries. Enter 0 to skip sleep.

    Default: 0
  • socket_timeout int
    Socket idle timeout in milliseconds when processing a database command.

    If socket_timeout is not 0 and the socket has been idle for at least socket_timeout, both max_retries and total_timeout are checked. If max_retries and total_timeout are not exceeded, the transaction is retried.

    If both socket_timeout and total_timeout are non-zero and socket_timeout > total_timeout, then socket_timeout will be set to total_timeout. If socket_timeout is 0, there will be no socket idle limit.

    Default: 30000.
  • total_timeout int
    Total transaction timeout in milliseconds.

    The total_timeout is tracked on the client and sent to the server along with the transaction in the wire protocol. The client will most likely timeout first, but the server also has the capability to timeout the transaction.

    If total_timeout is not 0 and total_timeout is reached before the transaction completes, the transaction will return error AEROSPIKE_ERR_TIMEOUT. If total_timeout is 0, there will be no total time limit.

    Default: 0
  • compress (bool)
    Compress client requests and server responses.

    Use zlib compression on write or batch read commands when the command buffer size is greater than 128 bytes. In addition, tell the server to compress its response on read commands. The server response compression threshold is also 128 bytes.

    This option will increase cpu and memory usage (for extra compressed buffers), but decrease the size of data sent over the network.

    Default: False
  • deserialize bool
    Whether raw bytes representing a list or map should be deserialized into a Python list or dictionary.
    Set to False for backup programs that just need access to raw bytes.

    Default: True
  • expected_duration
    Expected query duration. The server treats the query in different ways depending on the expected duration.
    This field is ignored for aggregation queries, background queries and server versions < 6.0.

    See Query Duration for possible values.

  • short_query bool

    Deprecated: use "expected_duration" instead.

    Is the query expected to return fewer than 100 records? If True, the server will optimize the query for a small record set.

    For backwards compatibility: if "short_query" is true, the query is treated as a short query and "expected_duration" is ignored. If "short_query" is false, "expected_duration" is used and defaults to aerospike.QUERY_DURATION_LONG.

    This field is ignored for aggregation queries, background queries, and server versions less than 6.0.0.

    Mutually exclusive with records_per_second.

    Default: False

  • expressions list
    Compiled Aerospike expressions (see aerospike_helpers.expressions) used for filtering records within a transaction.

    Default: None

    Note

    Requires Aerospike server version >= 5.2.

  • partition_filter dict
    A dictionary of partition information used by the client
    to perform partition queries. Useful for resuming terminated queries and
    querying particular partitions/records.

    See Partition Objects for more information.

    Default: {} (All partitions will be queried).

    Note

    Requires Aerospike server version >= 6.0

  • replica
    Specifies which partition replica to query. See the aerospike.POLICY_REPLICA_* constants for possible values.

    Default: aerospike.POLICY_REPLICA_SEQUENCE
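As an illustrative sketch (the values are arbitrary), several of these policies might be combined like this:

import aerospike
import aerospike_helpers.expressions as expr

client = aerospike.client({'hosts': [('127.0.0.1', 3000)]}).connect()
query = client.query('test', 'demo')

# Only return records whose "elo" bin is at least 1000
eloFilter = expr.GE(expr.IntBin("elo"), 1000).compile()

policy = {
    'total_timeout': 10000,   # give up after 10 seconds overall
    'socket_timeout': 5000,   # 5 second socket idle limit
    'max_retries': 2,         # queries are reads, so retries are safe here
    'expected_duration': aerospike.QUERY_DURATION_LONG,
    'expressions': eloFilter,
}

records = query.results(policy=policy)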

Options

options

A dict of optional query options which are applicable to Query.foreach() and Query.results(). An example appears after the list below.

  • nobins bool
    If True, the bins portion of the Record Tuple is not returned (keys and metadata only).

    Default: False

New in version 3.0.0.
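For example, a sketch of streaming records back without their bins:

import aerospike

client = aerospike.client({'hosts': [('127.0.0.1', 3000)]}).connect()
query = client.query('test', 'demo')

def callback(record):
    # The bins portion should be empty since nobins is set
    print(record)

query.foreach(callback, {}, {'nobins': True})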