aerospike_helpers.batch package¶
aerospike_helpers.batch.recordsmodule¶
Classes for the use with client batch APIs batch_write()
, batch_operate()
, batch_apply()
, batch_remove()
.
records.py defines objects for use with aerospike client batch APIs. Currently batch_write, batch_operate, batch_remove, and batch_apply make use of objects in this file. Typically BatchReacords and underlying BatchRecord objects are used as input and output for the aformentioned client methods.
Note
APIs that utitlize these objects require server >= 6.0.0.
Example:
import aerospike
from aerospike import exception as ex
from aerospike_helpers.batch import records as br
import aerospike_helpers.expressions as exp
from aerospike_helpers.operations import operations as op
import sys
# Configure the client.
config = {"hosts": [("127.0.0.1", 3000)]}
# Create a client and connect it to the cluster.
try:
client = aerospike.client(config).connect()
except ex.ClientError as e:
print("Error: {0} [{1}]".format(e.msg, e.code))
sys.exit(1)
# setup records
namespace = "test"
set = "demo"
keys = [(namespace, set, i) for i in range(1000)]
records = [{"id": i, "balance": i * 10} for i in range(1000)]
for key, rec in zip(keys, records):
client.put(key, rec)
print("===== BATCH_OPERATE EXAMPLE =====")
# Batch add 10 to balance and read it if it's over
# 1000 NOTE: batch_operate ops must include a write op
# get_batch_ops or get_many can be used for all read ops.
expr = exp.GT(exp.IntBin("balance"), 1000).compile()
ops = [
op.increment("balance", 10),
op.read("balance")
]
policy_batch = {"expressions": expr}
res = client.batch_operate(keys, ops, policy_batch)
# res is an instance of BatchRecords
# the field, batch_records, contains a BatchRecord instance
# for each key used by the batch_operate call.
# the field, results, is 0 if all batch subtransactions completed succesfully
# or the only failures are FILTERED_OUT or RECORD_NOT_FOUND.
# Otherwise its value corresponds to an as_status and signifies that
# one or more of the batch subtransactions failed. Each BatchRecord instance
# also has a results field that signifies the status of that batch subtransaction.
if res.result == 0:
# BatchRecord 100 should have a result code of 27 meaning it was filtered out by an expression.
print("BatchRecord 100 result: {result}".format(result=res.batch_records[100].result))
# BatchRecord 100,record should be None.
print("BatchRecord 100 record: {record}".format(record=res.batch_records[100].record))
# BatchRecord 101 should have a result code of 0 meaning it succeeded.
print("BatchRecord 101 result: {result}".format(result=res.batch_records[101].result))
# BatchRecord 101, record should be populated.
print("BatchRecord 101 record: {record}".format(record=res.batch_records[101].record))
else:
# Some batch sub transaction failed.
print("res result: {result}".format(result=res.result))
print("===== BATCH_WRITE EXAMPLE =====")
# Apply different operations to different keys
# using batch_write.
batch_writes = br.BatchRecords(
[
br.Remove(
key=(namespace, set, 1),
policy={}
),
br.Write(
key=(namespace, set, 100),
ops=[
op.write("id", 100),
op.write("balance", 100),
op.read("id"),
op.read("balance"),
],
policy={"expressions": exp.GT(exp.IntBin("balance"), 2000).compile()}
),
br.Read(
key=(namespace, set, 333),
ops=[
op.read("id")
],
policy=None
),
]
)
# batch_write modifies its BatchRecords argument.
# Results for each requested key will be set in
# their coresponding BatchRecord result,
# record, and in_doubt fields.
client.batch_write(batch_writes)
print("batch_writes result: {result}".format(result=batch_writes.result))
# should have bins {'id': 333}.
print("batch_writes batch Write record: {result}".format(result=batch_writes.batch_records[2].record))
print("===== BATCH_APPLY EXAMPLE =====")
# Apply a user defined function (UDF) to a batch
# of records using batch_apply.
module = "test_record_udf"
path_to_module = "/path/to/test_record_udf.lua"
function = "bin_udf_operation_integer"
args = ["balance", 10, 5]
client.udf_put(path_to_module)
# This should add 15 to each balance bin.
res = client.batch_apply(keys, module, function, args)
# NOTE res.result should be -16 (one or more batch sub transactions failed)
# because the UDF failed on record 1 which was previously removed.
print("res result: {result}".format(result=res.result))
res_rec = res.batch_records[90].record
bins = res_rec[2]
# Should be 915.
print("res BatchRecord 90 bins: {result}".format(result=bins))
print("===== BATCH_REMOVE EXAMPLE =====")
# Delete the records using batch_remove.
res = client.batch_remove(keys)
# Should be 0 signifying success.
print("BatchRecords result: {result}".format(result=res.result))
- class aerospike_helpers.batch.records.Apply(key: tuple, module: str, function: str, args: List[Any], policy: Optional[Dict] = None)¶
Bases:
aerospike_helpers.batch.records.BatchRecord
BatchApply is used for executing Batch UDF (user defined function) apply operations with batch_write and retrieving results.
- args¶
List of arguments to pass to the UDF.
- Type
TypeUDFArgs
- in_doubt¶
Is it possible that the write transaction completed even though an error was generated. This may be the case when a client error occurs (like timeout) after the command was sent to the server.
- Type
- ops¶
A list of aerospike operation dictionaries to perform on the record at key.
- Type
TypeOps
- policy¶
An optional dictionary of batch apply policy flags.
- Type
TypeBatchPolicyApply, optional
- __init__(key: tuple, module: str, function: str, args: List[Any], policy: Optional[Dict] = None) None ¶
Example:
# Create a batch Apply to apply UDF "test_func" to bin "a" from the record. # Assume that "test_func" takes a bin name string as an argument. # Assume the appropriate UDF module has already been registerd. import aerospike_helpers.operations as op module = "my_lua" function = "test_func" bin_name = "a" args = [ bin_name ] namespace = "test" set = "demo" user_key = 1 key = (namespace, set, user_key) ba = Apply(key, module, function, args)
- class aerospike_helpers.batch.records.BatchRecord(key: tuple)¶
Bases:
object
BatchRecord provides the base fields for BatchRecord objects.
BatchRecord should usually be read from as a result and not created by the user. Its subclasses can be used as input to batch_write. Client methods
batch_apply()
,batch_operate()
,batch_remove()
with batch_records field as a list of these BatchRecord objects containing the batch request results.- record¶
The record corresponding to the requested key.
- Type
TypeRecord
- in_doubt¶
Is it possible that the write transaction completed even though an error was generated. This may be the case when a client error occurs (like timeout) after the command was sent to the server.
- Type
- __weakref__¶
list of weak references to the object (if defined)
- class aerospike_helpers.batch.records.BatchRecords(batch_records: List[aerospike_helpers.batch.records.BatchRecord] = [])¶
Bases:
object
BatchRecords is used as input and output for multiple batch APIs.
- batch_records¶
A list of BatchRecord subtype objects used to define batch operations and hold results. BatchRecord Types can be Remove, Write, Read, and Apply.
- Type
TypeBatchRecordList
- 0 if all batch subtransactions succeeded
- Type
or if the only failures were FILTERED_OUT or RECORD_NOT_FOUND
- non 0 if an error occured. The most common error being -16
- Type
One or more batch sub transactions failed
- __init__(batch_records: List[aerospike_helpers.batch.records.BatchRecord] = []) None ¶
Example:
# Create a BatchRecords to remove a record, write a bin, and read a bin. # Assume client is an instantiated and connected aerospike cleint. import aerospike_helpers.operations as op namespace = "test" set = "demo" bin_name = "id" keys = [ (namespace, set, 1), (namespace, set, 2), (namespace, set, 3) ] brs = BatchRecords( [ Remove( key=(namespace, set, 1), ), Write( key=(namespace, set, 100), ops=[ op.write(bin_name, 100), op.read(bin_name), ] ), BatchRead( key=(namespace, set, 333), ops=[ op.read(bin_name) ] ) ] ) # Note this call will mutate brs and set results in it. client.batch_write(brs)
- __weakref__¶
list of weak references to the object (if defined)
- class aerospike_helpers.batch.records.Read(key: tuple, ops: Optional[List[Dict]], read_all_bins: bool = False, policy: Optional[Dict] = None)¶
Bases:
aerospike_helpers.batch.records.BatchRecord
Read is used for executing Batch read operations with batch_write and retrieving results.
- in_doubt¶
Is it possible that the write transaction completed even though an error was generated. This may be the case when a client error occurs (like timeout) after the command was sent to the server.
- Type
- ops¶
list of aerospike operation dictionaries to perform on the record at key.
- Type
TypeOps
- policy¶
An optional dictionary of batch read policy flags.
- Type
TypeBatchPolicyRead, optional
- __init__(key: tuple, ops: Optional[List[Dict]], read_all_bins: bool = False, policy: Optional[Dict] = None) None ¶
Example:
# Create a batch Read to read bin "a" from the record. import aerospike_helpers.operations as op bin_name = "a" namespace = "test" set = "demo" user_key = 1 key = (namespace, set, user_key) ops = [ op.read(bin_name) ] br = Write(key, ops)
- class aerospike_helpers.batch.records.Remove(key: tuple, policy: Optional[Dict] = None)¶
Bases:
aerospike_helpers.batch.records.BatchRecord
Remove is used for executing Batch remove operations with batch_write and retrieving results.
- in_doubt¶
Is it possible that the write transaction completed even though an error was generated. This may be the case when a client error occurs (like timeout) after the command was sent to the server.
- Type
- ops¶
A list of aerospike operation dictionaries to perform on the record at key.
- Type
TypeOps
- policy¶
An optional dictionary of batch remove policy flags.
- Type
TypeBatchPolicyRemove, optional
- class aerospike_helpers.batch.records.Write(key: tuple, ops: List[Dict], policy: Optional[Dict] = None)¶
Bases:
aerospike_helpers.batch.records.BatchRecord
Write is used for executing Batch write operations with batch_write and retrieving batch write results.
- in_doubt¶
Is it possible that the write transaction completed even though an error was generated. This may be the case when a client error occurs (like timeout) after the command was sent to the server.
- Type
- ops¶
A list of aerospike operation dictionaries to perform on the record at key.
- Type
TypeOps
- policy¶
An optional dictionary of batch write policy flags.
- Type
TypeBatchPolicyWrite, optional
- __init__(key: tuple, ops: List[Dict], policy: Optional[Dict] = None) None ¶
Example:
# Create a batch Write to increment bin "a" by 10 and read the result from the record. import aerospike_helpers.operations as op bin_name = "a" namespace = "test" set = "demo" user_key = 1 key = (namespace, set, user_key) ops = [ op.increment(bin_name, 10), op.read(bin_name) ] bw = Write(key, ops)