Scan Class — Scan

Scan

The Scan object is used to return all the records in a specified set (which can be omitted or None). A Scan with a None set returns all the records in the namespace.

The scan can (optionally) be assigned one of the following

The scan is invoked using foreach(), results(), or execute_background(). The bins returned can be filtered using select().

See also

Scans and Managing Scans.

Scan Methods

class aerospike.Scan
select(bin1[, bin2[, bin3..]])

Set a filter on the record bins resulting from results() or foreach(). If a selected bin does not exist in a record it will not appear in the bins portion of that record tuple.
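The projection rule can be modeled locally without a server. The sketch below is a pure-Python illustration of the described behavior, not the client's implementation; `project_bins` is a hypothetical helper name and the digests are placeholders.

```python
def project_bins(record, *selected):
    """Return a copy of a (key, meta, bins) tuple keeping only the selected bins.

    Bins that the record does not have are simply absent from the result,
    which mirrors the behavior described for select().
    """
    key, meta, bins = record
    kept = {name: value for name, value in bins.items() if name in selected}
    return (key, meta, kept)


# Sample record tuples shaped like the ones results() returns.
# The digests are placeholders, not real digest values.
records = [
    (('test', 'test', 'key1', bytearray(b'\x00' * 20)),
     {'gen': 1, 'ttl': 2592000}, {'id': 1, 'a': 1}),
    (('test', 'test', 'key2', bytearray(b'\x01' * 20)),
     {'gen': 1, 'ttl': 2592000}, {'id': 2, 'b': 2}),
]

projected = [project_bins(r, 'id', 'a', 'zzz') for r in records]
for key, meta, bins in projected:
    print(key[2], bins)
```

Neither record has a bin named 'zzz', so it never appears; 'key2' has no bin 'a', so only 'id' is kept for it.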

apply(module, function[, arguments])

Apply a record UDF to each record found by the scan. The UDF runs when the scan is executed with execute_background().

Parameters:
  • module (str) – the name of the Lua module.
  • function (str) – the name of the Lua function within the module.
  • arguments (list) – optional arguments to pass to the function. NOTE: these arguments must be types supported by Aerospike. See: supported data types. If you need to use an unsupported type (e.g. set or tuple), you can use a serializer like pickle first.
Returns:

one of the supported types, int, str, float (double), list, dict (map), bytearray (bytes).
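As a minimal sketch of the serialization advice above, an unsupported Python type can be packed into a bytearray before it is placed in the arguments list. The names `pack_arg` and `unpack_arg` are hypothetical helpers, and the sketch assumes the Lua function only stores or passes the value through, since it would see opaque bytes.

```python
import pickle


def pack_arg(value):
    """Serialize a value of an unsupported type (e.g. set, tuple) to a bytearray."""
    return bytearray(pickle.dumps(value))


def unpack_arg(blob):
    """Restore a value previously packed with pack_arg()."""
    return pickle.loads(bytes(blob))


tags = {'red', 'green'}   # set: not a supported argument type
point = (3, 4)            # tuple: not a supported argument type

# Every element is now a supported type (str or bytearray).
arguments = ['number', pack_arg(tags), pack_arg(point)]

print([type(a).__name__ for a in arguments])
print(unpack_arg(arguments[1]) == tags, unpack_arg(arguments[2]) == point)
```

A list built this way could then be passed as the third argument of apply(module, function, arguments).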

add_ops(ops)

Add a list of write ops to the scan. When used with Scan.execute_background() the scan will perform the write ops on any records found. If no predicate is attached to the scan it will apply ops to all the records in the specified set. See aerospike_helpers for available ops.

Parameters: ops (list) – a list of write operations generated by aerospike_helpers, e.g. list_operations, map_operations, etc.

Note

Requires server version >= 4.7.0.

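import aerospike
from aerospike_helpers.operations import list_operations
from aerospike_helpers.operations import operations

config = {'hosts': [('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

# Placeholder bin name and list index; adjust both to match your data.
test_bin = 'test_bin'
list_index_to_remove = 0

scan = client.scan('test', 'demo')

ops = [
    operations.append(test_bin, 'val_to_append'),
    list_operations.list_remove_by_index(test_bin, list_index_to_remove, aerospike.LIST_RETURN_NONE)
]
scan.add_ops(ops)

# execute_background() returns a job ID that can be used to track the scan
job_id = scan.execute_background()
client.close()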

For a more comprehensive example, see using a list of write ops with Query.execute_background().

results([policy[, nodename]]) -> list of (key, meta, bins)

Buffer the records resulting from the scan, and return them as a list of records.

Parameters:
  • policy (dict) – optional Scan Policies.
  • nodename (str) – optional Node ID of node used to limit the scan to a single node.
Returns:

a list of Record Tuple.

import aerospike
import pprint

pp = pprint.PrettyPrinter(indent=2)
config = { 'hosts': [ ('127.0.0.1',3000)]}
client = aerospike.client(config).connect()

client.put(('test','test','key1'), {'id':1,'a':1},
    policy={'key':aerospike.POLICY_KEY_SEND})
client.put(('test','test','key2'), {'id':2,'b':2},
    policy={'key':aerospike.POLICY_KEY_SEND})

scan = client.scan('test', 'test')
scan.select('id','a','zzz')
res = scan.results()
pp.pprint(res)
client.close()

Note

We expect to see:

[ ( ( 'test',
      'test',
      u'key2',
      bytearray(b'\xb2\x18\n\xd4\xce\xd8\xba:\x96s\xf5\x9ba\xf1j\xa7t\xeem\x01')),
    { 'gen': 52, 'ttl': 2592000},
    { 'id': 2}),
  ( ( 'test',
      'test',
      u'key1',
      bytearray(b'\x1cJ\xce\xa7\xd4Vj\xef+\xdf@W\xa5\xd8o\x8d:\xc9\xf4\xde')),
    { 'gen': 52, 'ttl': 2592000},
    { 'a': 1, 'id': 1})]
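Each element of the returned list is a (key, meta, bins) tuple, and the key is itself a tuple of namespace, set, user key, and digest, as the output above shows. The following sketch reshapes such a list into a dict indexed by user key. It uses hand-written data shaped like that output; `index_by_user_key` is a hypothetical helper, the digests are placeholders, and it assumes the records were written with POLICY_KEY_SEND so that the user key is present.

```python
def index_by_user_key(records):
    """Map each record's user key to its generation and bins."""
    indexed = {}
    for key, meta, bins in records:
        namespace, set_name, user_key, digest = key
        indexed[user_key] = {'gen': meta['gen'], 'bins': bins}
    return indexed


# Data shaped like the result of scan.results(); digests are placeholders.
res = [
    (('test', 'test', 'key2', bytearray(b'\x01' * 20)),
     {'gen': 52, 'ttl': 2592000}, {'id': 2}),
    (('test', 'test', 'key1', bytearray(b'\x00' * 20)),
     {'gen': 52, 'ttl': 2592000}, {'a': 1, 'id': 1}),
]

by_key = index_by_user_key(res)
print(sorted(by_key))
print(by_key['key1']['bins'])
```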

Note

Python client versions >= 3.10.0 support predicate expressions for results(), foreach(), and execute_background(); see predexp. Requires server version >= 4.7.0.

from __future__ import print_function
import aerospike
from aerospike import predexp
from aerospike import exception as ex
import sys
import time

config = { 'hosts': [('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()

# register udf
try:
    client.udf_put('/path/to/my_udf.lua')
except ex.AerospikeError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    client.close()
    sys.exit(1)

# put records and run scan
try:
    keys = [('test', 'demo', 1), ('test', 'demo', 2), ('test', 'demo', 3)]
    records = [{'number': 1}, {'number': 2}, {'number': 3}]
    for i in range(3):
        client.put(keys[i], records[i])

    scan = client.scan('test', 'demo')

    preds = [ # check that the record has value < 2 or value == 3 in bin 'number'
        predexp.integer_bin('number'),
        predexp.integer_value(2),
        predexp.integer_less(),
        predexp.integer_bin('number'),
        predexp.integer_value(3),
        predexp.integer_equal(),
        predexp.predexp_or(2)
    ]

    policy = {
        'predexp': preds
    }

    records = scan.results(policy)
    print(records)
except ex.AerospikeError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    sys.exit(1)
finally:
    client.close()
# the scan only returns records that match the predexp
# EXPECTED OUTPUT:
# [
#   (('test', 'demo', 1, bytearray(b'\xb7\xf4\xb88\x89\xe2\xdag\xdeh>\x1d\xf6\x91\x9a\x1e\xac\xc4F\xc8')), {'gen': 2, 'ttl': 2591999}, {'number': 1}),
#   (('test', 'demo', 3, bytearray(b'\xb1\xa5`g\xf6\xd4\xa8\xa4D9\xd3\xafb\xbf\xf8ha\x01\x94\xcd')), {'gen': 13, 'ttl': 2591999}, {'number': 3})
# ]
# contents of my_udf.lua

function my_udf(rec, bin, offset)
    info("my transform: %s", tostring(record.digest(rec)))
    rec[bin] = rec[bin] + offset
    aerospike:update(rec)
end

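The predexp list in the example above is written in postfix order: the operands come first, then the comparison, and predexp_or(2) combines the two preceding results. The sketch below is a local pure-Python model of that reading order, meant only to show why records 1 and 3 match. It is not how the client or server evaluates predicates, and the tuple encoding and the `evaluate` function are invented for this illustration.

```python
def evaluate(preds, bins):
    """Evaluate a postfix predicate list against one record's bins."""
    stack = []
    for op, arg in preds:
        if op == 'integer_bin':
            stack.append(bins.get(arg))      # push the bin's value
        elif op == 'integer_value':
            stack.append(arg)                # push a constant
        elif op == 'integer_less':
            right = stack.pop()
            left = stack.pop()
            stack.append(left is not None and left < right)
        elif op == 'integer_equal':
            right = stack.pop()
            left = stack.pop()
            stack.append(left == right)
        elif op == 'or':
            # Combine the last `arg` boolean results.
            operands = [stack.pop() for _ in range(arg)]
            stack.append(any(operands))
        else:
            raise ValueError('unknown op: {0}'.format(op))
    return stack.pop()


# Same shape as the predexp list: (number < 2) or (number == 3)
preds = [
    ('integer_bin', 'number'),
    ('integer_value', 2),
    ('integer_less', None),
    ('integer_bin', 'number'),
    ('integer_value', 3),
    ('integer_equal', None),
    ('or', 2),
]

records = [{'number': 1}, {'number': 2}, {'number': 3}]
matches = [bins for bins in records if evaluate(preds, bins)]
print(matches)  # [{'number': 1}, {'number': 3}]
```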
foreach(callback[, policy[, options[, nodename]]])

Invoke the callback function for each of the records streaming back from the scan.

Parameters:
  • callback (callable) – the function to invoke for each record.
  • policy (dict) – optional Scan Policies.
  • options (dict) – the Scan Options that will apply to the scan.
  • nodename (str) – optional Node ID of node used to limit the scan to a single node.

Note

A Record Tuple is passed as the argument to the callback function.

import aerospike
import pprint

pp = pprint.PrettyPrinter(indent=2)
config = { 'hosts': [ ('127.0.0.1',3000)]}
client = aerospike.client(config).connect()

client.put(('test','test','key1'), {'id':1,'a':1},
    policy={'key':aerospike.POLICY_KEY_SEND})
client.put(('test','test','key2'), {'id':2,'b':2},
    policy={'key':aerospike.POLICY_KEY_SEND})

def show_key(record):
    key, meta, bins = record
    print(key)

scan = client.scan('test', 'test')
scan_opts = {
  'concurrent': True,
  'nobins': True,
  'priority': aerospike.SCAN_PRIORITY_MEDIUM
}
scan.foreach(show_key, options=scan_opts)
client.close()

Note

We expect to see:

('test', 'test', u'key2', bytearray(b'\xb2\x18\n\xd4\xce\xd8\xba:\x96s\xf5\x9ba\xf1j\xa7t\xeem\x01'))
('test', 'test', u'key1', bytearray(b'\x1cJ\xce\xa7\xd4Vj\xef+\xdf@W\xa5\xd8o\x8d:\xc9\xf4\xde'))

Note

To stop the stream return False from the callback function.

from __future__ import print_function
import aerospike

config = { 'hosts': [ ('127.0.0.1',3000)]}
client = aerospike.client(config).connect()

def limit(lim, result):
    c = [0] # integers are immutable so a list (mutable) is used for the counter
    def key_add(record):
        key, metadata, bins = record
        if c[0] < lim:
            result.append(key)
            c[0] = c[0] + 1
        else:
            return False
    return key_add

scan = client.scan('test','user')
keys = []
scan.foreach(limit(100, keys))
print(len(keys)) # this will be 100 if the number of matching records > 100
client.close()

execute_background([policy])

Execute a record UDF on records found by the scan in the background. This method returns before the scan has completed. A UDF can be added to the scan with Scan.apply().

Parameters: policy (dict) – optional Write Policies.
Returns: a job ID that can be used with aerospike.Client.job_info() to track the status of the aerospike.JOB_SCAN as it runs in the background.

Note

Scan.execute_background() was added in Python client version 3.10.0.

from __future__ import print_function
import aerospike
from aerospike import exception as ex
import sys
import time

config = {"hosts": [("127.0.0.1", 3000)]}
client = aerospike.client(config).connect()

# register udf
try:
    client.udf_put("/path/to/my_udf.lua")
except ex.AerospikeError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    client.close()
    sys.exit(1)

# put records and apply udf
try:
    keys = [("test", "demo", 1), ("test", "demo", 2), ("test", "demo", 3)]
    records = [{"number": 1}, {"number": 2}, {"number": 3}]
    for i in range(3):
        client.put(keys[i], records[i])

    scan = client.scan("test", "demo")
    scan.apply("my_udf", "my_udf", ["number", 10])
    job_id = scan.execute_background()

    # wait for job to finish
    while True:
        response = client.job_info(job_id, aerospike.JOB_SCAN)
        if response["status"] != aerospike.JOB_STATUS_INPROGRESS:
            break
        time.sleep(0.25)

    records = client.get_many(keys)
    print(records)
except ex.AerospikeError as e:
    print("Error: {0} [{1}]".format(e.msg, e.code))
    sys.exit(1)
finally:
    client.close()
# EXPECTED OUTPUT:
# [
#   (('test', 'demo', 1, bytearray(b'\xb7\xf4\xb88\x89\xe2\xdag\xdeh>\x1d\xf6\x91\x9a\x1e\xac\xc4F\xc8')), {'gen': 2, 'ttl': 2591999}, {'number': 11}),
#   (('test', 'demo', 2, bytearray(b'\xaejQ_7\xdeJ\xda\xccD\x96\xe2\xda\x1f\xea\x84\x8c:\x92p')), {'gen': 12, 'ttl': 2591999}, {'number': 12}),
#   (('test', 'demo', 3, bytearray(b'\xb1\xa5`g\xf6\xd4\xa8\xa4D9\xd3\xafb\xbf\xf8ha\x01\x94\xcd')), {'gen': 13, 'ttl': 2591999}, {'number': 13})
# ]
# contents of my_udf.lua

function my_udf(rec, bin, offset)
    info("my transform: %s", tostring(record.digest(rec)))
    rec[bin] = rec[bin] + offset
    aerospike:update(rec)
end
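The polling loop in the example above waits indefinitely. One possible variation, sketched here under the assumption that a bounded wait is preferable, takes the status lookup as a callable so it can be exercised without a server. `wait_for_job` and its parameters are hypothetical names, and the string statuses stand in for the client's status constants.

```python
import time


def wait_for_job(get_status, in_progress, timeout_s=30.0, interval_s=0.25):
    """Poll get_status() until it stops returning in_progress.

    Raises RuntimeError if the job is still in progress after timeout_s seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status != in_progress:
            return status
        if time.monotonic() >= deadline:
            raise RuntimeError(
                'job still in progress after {0} seconds'.format(timeout_s))
        time.sleep(interval_s)


# Stand-in status source: two "in progress" answers, then completion.
IN_PROGRESS, COMPLETED = 'in_progress', 'completed'
_statuses = iter([IN_PROGRESS, IN_PROGRESS, COMPLETED])

final = wait_for_job(lambda: next(_statuses), IN_PROGRESS,
                     timeout_s=5.0, interval_s=0.01)
print(final)  # completed
```

With a connected client, the callable could be `lambda: client.job_info(job_id, aerospike.JOB_SCAN)["status"]` and the in-progress value `aerospike.JOB_STATUS_INPROGRESS`, matching the loop in the example.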

Scan Policies

policy

A dict of optional scan policies which are applicable to Scan.results() and Scan.foreach(). See Policy Options.

  • max_retries int
    Maximum number of retries before aborting the current transaction. The initial attempt is not counted as a retry.

    If max_retries is exceeded, the transaction will return error AEROSPIKE_ERR_TIMEOUT.

    Default: 0

    Warning

    Database writes that are not idempotent (such as “add”) should not be retried because the write operation may be performed multiple times if the client timed out previous transaction attempts. It’s important to use a distinct write policy for non-idempotent writes which sets max_retries = 0.

  • sleep_between_retries int
    Milliseconds to sleep between retries. Enter 0 to skip sleep.

    Default: 0
  • socket_timeout int
    Socket idle timeout in milliseconds when processing a database command.

    If socket_timeout is not 0 and the socket has been idle for at least socket_timeout, both max_retries and total_timeout are checked. If max_retries and total_timeout are not exceeded, the transaction is retried.

    If both socket_timeout and total_timeout are non-zero and socket_timeout > total_timeout, then socket_timeout will be set to total_timeout. If socket_timeout is 0, there will be no socket idle limit.

    Default: 30000.
  • total_timeout int
    Total transaction timeout in milliseconds.

    The total_timeout is tracked on the client and sent to the server along with the transaction in the wire protocol. The client will most likely timeout first, but the server also has the capability to timeout the transaction.

    If total_timeout is not 0 and total_timeout is reached before the transaction completes, the transaction will return error AEROSPIKE_ERR_TIMEOUT. If total_timeout is 0, there will be no total time limit.

    Default: 0
  • fail_on_cluster_change bool
    Abort the scan if the cluster is not in a stable state.

    Default: False
  • durable_delete bool
    Perform durable delete (requires Enterprise server version >= 3.10).
    If the transaction results in a record deletion, leave a tombstone for the record.

    Default: False
  • records_per_second int
    Limit the scan to process records at records_per_second.
    Requires server version >= 4.7.0.

    Default: 0 (no limit).
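A scan policy is a plain dict, so it can be assembled and inspected before being passed to results() or foreach(). The sketch below merges overrides onto the defaults listed above and mirrors the documented rule that socket_timeout is lowered to total_timeout when both are non-zero and socket_timeout is larger. `build_scan_policy` is a hypothetical helper; the client applies its own rules regardless, so this only makes the effective values visible up front.

```python
# Defaults as listed in the Scan Policies section.
SCAN_POLICY_DEFAULTS = {
    'max_retries': 0,
    'sleep_between_retries': 0,
    'socket_timeout': 30000,
    'total_timeout': 0,
    'fail_on_cluster_change': False,
    'durable_delete': False,
    'records_per_second': 0,
}


def build_scan_policy(**overrides):
    """Return a policy dict: documented defaults plus the given overrides."""
    policy = dict(SCAN_POLICY_DEFAULTS)
    policy.update(overrides)
    socket_timeout = policy['socket_timeout']
    total_timeout = policy['total_timeout']
    # Documented rule: a non-zero socket_timeout never exceeds a non-zero total_timeout.
    if socket_timeout and total_timeout and socket_timeout > total_timeout:
        policy['socket_timeout'] = total_timeout
    return policy


policy = build_scan_policy(total_timeout=5000, records_per_second=1000)
print(policy['socket_timeout'])      # 5000
print(policy['records_per_second'])  # 1000
```

The resulting dict could be passed as `scan.results(policy)` or as the policy argument of `scan.foreach()`.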

Scan Options

options

A dict of optional scan options which are applicable to Scan.foreach().

  • priority
    Scan priority has been replaced by the records_per_second policy; see Scan Policies.
  • nobins bool
    Whether to omit the bins portion of the Record Tuple.

    Default: False
  • concurrent bool
    Whether to run the scan concurrently on all nodes of the cluster.

    Default: False
  • percent int
    Percentage of records to return from the scan.

    Default: 100

New in version 1.0.39.
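Since priority has been replaced by records_per_second, throttling belongs in the policy dict while nobins, concurrent, and percent stay in the options dict. The sketch below builds both dicts side by side. `build_foreach_args` is a hypothetical helper, and the 1 to 100 range check on percent is an assumption of this sketch rather than a documented constraint.

```python
def build_foreach_args(records_per_second=0, nobins=False,
                       concurrent=False, percent=100):
    """Return (policy, options) dicts for a call to Scan.foreach()."""
    # Assumption of this sketch: percent is a whole percentage from 1 to 100.
    if not 1 <= percent <= 100:
        raise ValueError('percent must be between 1 and 100')
    policy = {'records_per_second': records_per_second}
    options = {'nobins': nobins, 'concurrent': concurrent, 'percent': percent}
    return policy, options


policy, options = build_foreach_args(records_per_second=500,
                                     nobins=True, concurrent=True)
print(policy)
print(options)
```

With a scan object in hand, the pair could be passed as `scan.foreach(callback, policy, options)`, following the foreach(callback[, policy[, options[, nodename]]]) signature.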