This is an idea for how to provide secondary index queries, similar to Riak 2i, on top of Amazon S3, using nothing but S3, boto3 and some Python.
This code hasn't been anywhere near a production environment: it has never been benchmarked, has only processed trivial amounts of data, and has only been tested against LocalStack. It's not even commented. As such, it should not be used by anybody for any reason — ever.
If you do give it a try, let me know how it went.
s32i.py
import os
import re
from concurrent.futures.thread import ThreadPoolExecutor

from botocore.exceptions import ClientError
class S32iDatastore:
    """Key/value store on Amazon S3 with Riak-2i-style secondary indexes.

    A record is one S3 object: the body holds the data and the user metadata
    maps index names to comma-separated index terms. For every (index, term)
    pair an empty marker object is written at
    ``<INDEXES_FOLDER>/<index>/<term>/<key>``, so a range query over terms
    becomes a lexicographic ``list_objects_v2`` scan over marker keys.

    NOTE(review): S3 documents that user-defined metadata keys are stored in
    lowercase. ``delete`` reads index names back via ``head``, so an index
    name containing upper-case letters would leave its markers behind —
    keep index names lowercase.
    NOTE(review): re-putting an existing key does not remove the markers of
    index terms that the new version no longer carries.
    NOTE(review): terms and keys are recovered by splitting marker keys on
    '/', so neither should contain '/'; terms are split on ',' when written,
    so a single term cannot contain ','.
    """

    # Worker pool shared by every instance for index-marker I/O. At least one
    # worker is required: os.cpu_count() may return None, and on a single-CPU
    # host cpu_count() - 1 == 0, which ThreadPoolExecutor rejects.
    __EXECUTOR = ThreadPoolExecutor(max_workers=max(1, (os.cpu_count() or 1) - 1))
    INDEXES_FOLDER = 'indexes'
    LIST_OBJECTS = 'list_objects_v2'
    # ClientError codes that mean "object does not exist" for a HEAD request.
    _NOT_FOUND_CODES = ('404', 'NoSuchKey', 'NotFound')

    def __init__(self, s3_resource, bucket_name):
        """Bind the store to a bucket.

        :param s3_resource: S3 resource object, e.g. ``boto3.resource('s3')``.
        :param bucket_name: bucket holding both records and index markers.
        """
        self.s3_resource = s3_resource
        self.bucket_name = bucket_name

    def __run_in_thread(self, fn, *args):
        """Submit ``fn(*args)`` to the shared executor and return its Future."""
        return self.__EXECUTOR.submit(fn, *args)

    def __index_keys(self, key, indexes):
        """Yield the marker object key for every (index, term) pair of ``key``."""
        for index, values in indexes.items():
            for value in values.split(','):
                yield f'{self.INDEXES_FOLDER}/{index}/{value}/{key}'

    def get(self, key):
        """Return ``(data, indexes)`` for ``key``.

        ``data`` is the object body as read from the response stream and
        ``indexes`` is the object's user metadata.
        """
        record = self.s3_resource.Object(self.bucket_name, key).get()
        indexes = record['Metadata']
        data = record['Body'].read()
        return data, indexes

    def head(self, key):
        """Return only the index metadata of ``key`` (no body is fetched)."""
        record = self.s3_resource.meta.client.head_object(Bucket=self.bucket_name, Key=key)
        return record['Metadata']

    def exists(self, key):
        """Return True if ``key`` exists, False if S3 reports it as missing.

        Other client errors (throttling, missing bucket, ...) are re-raised
        instead of being reported as "does not exist".
        NOTE(review): without s3:ListBucket permission S3 answers 403 rather
        than 404 for a missing key; that case is re-raised here.
        """
        try:
            self.head(key)
        except ClientError as err:
            if err.response.get('Error', {}).get('Code') in self._NOT_FOUND_CODES:
                return False
            raise
        return True

    def put(self, key, data='', indexes=None):
        """Store ``data`` under ``key`` together with its secondary indexes.

        :param key: object key of the record.
        :param data: object body; defaults to an empty body.
        :param indexes: mapping of index name -> comma-separated terms, stored
            as the object's user metadata. ``None`` means no indexes.
        :return: the response of the S3 ``Object.put`` call for the record.

        Index markers are written on the shared executor concurrently with
        the record, but this method waits for them before returning, so a
        query issued afterwards sees the new entries and any error raised
        while writing them reaches the caller.
        """
        if indexes is None:
            indexes = {}
        index_job = self.__run_in_thread(self.create_secondary_indexes, key, indexes)
        response = self.s3_resource.Object(self.bucket_name, key).put(
            Body=data,
            Metadata=indexes)
        index_job.result()
        return response

    def delete(self, key):
        """Delete ``key`` and the index markers listed in its metadata.

        The index names/terms are read with ``head`` first, so a missing key
        raises the ``ClientError`` from ``head_object``. Waits for marker
        removal before returning.
        """
        index_job = self.__run_in_thread(self.remove_secondary_indexes, key, self.head(key))
        response = self.s3_resource.Object(self.bucket_name, key).delete()
        index_job.result()
        return response

    def create_secondary_indexes(self, key, indexes):
        """Write an empty marker object for every (index, term) of ``key``.

        Markers are written directly instead of through ``put``: this method
        runs on the shared executor, and ``put`` would submit to and wait on
        that same executor, which can deadlock a small pool.
        """
        for index_key in self.__index_keys(key, indexes):
            self.s3_resource.Object(self.bucket_name, index_key).put(
                Body='',
                Metadata={})

    def remove_secondary_indexes(self, key, indexes):
        """Delete the marker object of every (index, term) of ``key``."""
        for index_key in self.__index_keys(key, indexes):
            self.s3_resource.Object(self.bucket_name, index_key).delete()

    def secondary_index_range_query(self,
                                    index,
                                    start, end=None,
                                    page_size=1000, max_results=10000,
                                    term_regex=None, return_terms=False):
        """Lazily yield keys whose ``index`` terms lie in ``start``..``end``.

        :param index: index name to query.
        :param start: lower bound; marker keys must sort strictly after
            ``<INDEXES_FOLDER>/<index>/<start>``, so terms equal to or
            starting with ``start`` are included.
        :param end: upper bound, defaulting to ``start``. A marker stops the
            scan once its first ``len(<prefix><end>)`` characters sort after
            ``<prefix><end>``, so terms starting with ``end`` are included and
            ``start == end`` behaves as a prefix match.
        :param page_size: page size requested from ``list_objects_v2``.
        :param max_results: cap on marker objects listed (``MaxItems``);
            entries rejected by ``term_regex`` count towards it.
        :param term_regex: regex that must match the marker key remainder
            ``<term>/<key>`` in full; end it with ``.*`` to constrain only
            the term.
        :param return_terms: yield ``(key, term)`` tuples instead of keys.
        """
        if end is None:
            end = start
        index_prefix = f'{self.INDEXES_FOLDER}/{index}/'
        start_key = index_prefix + start
        end_key = index_prefix + end
        pattern = None
        if term_regex:
            # Escape the literal prefix (index names may contain regex
            # metacharacters) and group the user regex so a top-level '|'
            # cannot bypass the anchors or the prefix.
            pattern = re.compile(f'^{re.escape(index_prefix)}(?:{term_regex})$')
        paginator = self.s3_resource.meta.client.get_paginator(self.LIST_OBJECTS)
        pages = paginator.paginate(
            Bucket=self.bucket_name,
            # Keys outside this index that sort after start_key would stop the
            # scan anyway; the prefix just keeps S3 from listing them.
            Prefix=index_prefix,
            StartAfter=start_key,
            PaginationConfig={
                'MaxItems': max_results,
                'PageSize': page_size})
        for page in pages:
            # list_objects_v2 omits 'Contents' from pages with no keys.
            for result in page.get('Contents', []):
                result_key = result['Key']
                if result_key[:len(end_key)] > end_key:
                    return
                if pattern is not None and not pattern.match(result_key):
                    continue
                parts = result_key.split('/')
                if return_terms:
                    yield (parts[-1], parts[-2])
                else:
                    yield parts[-1]
s32i_test.py
import json
import unittest
import boto3
from s32i import S32iDatastore
class S32iDatastoreTest(unittest.TestCase):
    """Integration tests for S32iDatastore against a LocalStack S3 endpoint.

    Fixtures are written once in ``setUpClass``; ``tearDownClass`` empties
    and then deletes the test bucket.
    """

    LOCALSTACK_ENDPOINT_URL = "http://localhost.localstack.cloud:4566"
    TEST_BUCKET = 's32idatastore-test-bucket'
    # Term regex selecting gender 1 or 2 with a DOB in 2010 or 2020. The
    # trailing '.*' is needed because the datastore matches the regex against
    # '<term>/<key>'. Raw string so '\|' is a regex escape, not an invalid
    # Python string escape (a SyntaxWarning since Python 3.12).
    DOB_2010_OR_2020_REGEX = r'[12]\|20[12]0.*'

    @classmethod
    def setUpClass(cls):
        cls.s3_resource = cls.create_s3_resource()
        cls.bucket = cls.create_bucket(cls.TEST_BUCKET)
        cls.datastore = S32iDatastore(cls.s3_resource, cls.TEST_BUCKET)
        cls.create_test_data()

    @classmethod
    def tearDownClass(cls):
        cls.delete_bucket()

    @classmethod
    def create_s3_resource(cls, endpoint_url=LOCALSTACK_ENDPOINT_URL):
        return boto3.resource(
            's3',
            endpoint_url=endpoint_url)

    @classmethod
    def create_bucket(cls, bucket_name):
        return cls.s3_resource.create_bucket(Bucket=bucket_name)

    @classmethod
    def delete_bucket(cls):
        """Empty the test bucket, then delete it (S3 refuses non-empty buckets)."""
        cls.bucket.objects.all().delete()
        cls.bucket.delete()

    @classmethod
    def create_test_data(cls):
        cls.datastore.put(
            'KEY0001',
            json.dumps({'name': 'Alice', 'dob': '19700101', 'gender': '2'}),
            {'idx-gender-dob': '2|19700101'})
        cls.datastore.put(
            'KEY0002',
            json.dumps({'name': 'Bob', 'dob': '19800101', 'gender': '1'}),
            {'idx-gender-dob': '1|19800101'})
        cls.datastore.put(
            'KEY0003',
            json.dumps({'name': 'Carol', 'dob': '19900101', 'gender': '2'}),
            {'idx-gender-dob': '2|19900101'})
        cls.datastore.put(
            'KEY0004',
            json.dumps({'name': 'Dan', 'dob': '20000101', 'gender': '1'}),
            {'idx-gender-dob': '1|20000101'})
        cls.datastore.put(
            'KEY0005',
            json.dumps({'name': 'Eve', 'dob': '20100101', 'gender': '2'}),
            {'idx-gender-dob': '2|20100101'})
        cls.datastore.put(
            'KEY0006',
            json.dumps({'name': ['Faythe', 'Grace'], 'dob': '20200101', 'gender': '2'}),
            {'idx-gender-dob': '2|20200101', 'idx-name': 'Faythe,Grace'})
        cls.datastore.put('KEY0007', indexes={'idx-same': 'same'})
        cls.datastore.put('KEY0008', indexes={'idx-same': 'same'})
        cls.datastore.put('KEY0009', indexes={'idx-same': 'same'})
        cls.datastore.put(
            'KEY9999',
            json.dumps({'name': 'DELETE ME', 'dob': '99999999', 'gender': '9'}),
            {'idx-gender-dob': '9|99999999'})

    def test_get_record(self):
        data, indexes = self.datastore.get('KEY0001')
        self.assertDictEqual({'name': 'Alice', 'dob': '19700101', 'gender': '2'}, json.loads(data))
        self.assertDictEqual({'idx-gender-dob': '2|19700101'}, indexes)

    def test_head_record(self):
        indexes = self.datastore.head('KEY0002')
        self.assertDictEqual({'idx-gender-dob': '1|19800101'}, indexes)

    def test_2i_no_results(self):
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '3|30100101')
        self.assertListEqual([], list(keys))

    def test_2i_index_does_not_exist(self):
        keys = self.datastore.secondary_index_range_query('idx-does-not-exist', '3|30100101')
        self.assertListEqual([], list(keys))

    def test_2i_exact_value(self):
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|20100101')
        self.assertListEqual(['KEY0005'], list(keys))

    def test_2i_gender_2(self):
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|')
        self.assertListEqual(['KEY0001', 'KEY0003', 'KEY0005', 'KEY0006'], sorted(keys))

    def test_2i_gender_2_max_results_2(self):
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|', max_results=2)
        self.assertListEqual(['KEY0001', 'KEY0003'], sorted(keys))

    def test_2i_gender_1_dob_19(self):
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '1|19')
        self.assertListEqual(['KEY0002'], list(keys))

    def test_2i_gender_2_dob_19(self):
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|19')
        self.assertListEqual(['KEY0001', 'KEY0003'], sorted(keys))

    def test_2i_gender_2_dob_1990_2000(self):
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|1990', '2|2000')
        self.assertListEqual(['KEY0003'], list(keys))

    def test_2i_term_regex(self):
        keys = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|',
            term_regex=self.DOB_2010_OR_2020_REGEX)
        self.assertListEqual(['KEY0005', 'KEY0006'], list(keys))

    def test_2i_return_terms(self):
        key_terms = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|',
            return_terms=True)
        self.assertListEqual([
            ('KEY0001', '2|19700101'),
            ('KEY0002', '1|19800101'),
            ('KEY0003', '2|19900101'),
            ('KEY0004', '1|20000101'),
            ('KEY0005', '2|20100101'),
            ('KEY0006', '2|20200101')],
            sorted(key_terms))

    def test_2i_term_regex_return_terms(self):
        key_terms = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|',
            term_regex=self.DOB_2010_OR_2020_REGEX,
            return_terms=True)
        self.assertListEqual([('KEY0005', '2|20100101'), ('KEY0006', '2|20200101')], list(key_terms))

    def test_exists(self):
        self.assertTrue(self.datastore.exists('KEY0001'))
        self.assertFalse(self.datastore.exists('1000YEK'))

    def test_multiple_index_values(self):
        indexes = self.datastore.head('KEY0006')
        self.assertDictEqual({'idx-gender-dob': '2|20200101', 'idx-name': 'Faythe,Grace'}, indexes)
        keys = self.datastore.secondary_index_range_query('idx-name', 'Faythe')
        self.assertListEqual(['KEY0006'], list(keys))
        keys = self.datastore.secondary_index_range_query('idx-name', 'Grace')
        self.assertListEqual(['KEY0006'], list(keys))

    def test_multiple_keys_same_index(self):
        keys = self.datastore.secondary_index_range_query('idx-same', 'same')
        self.assertListEqual(['KEY0007', 'KEY0008', 'KEY0009'], sorted(keys))

    def test_delete(self):
        self.assertTrue(self.datastore.exists('KEY9999'))
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '9|99999999')
        self.assertListEqual(['KEY9999'], list(keys))
        self.datastore.delete('KEY9999')
        self.assertFalse(self.datastore.exists('KEY9999'))
        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '9|99999999')
        self.assertListEqual([], list(keys))