Sunday 10 September 2023

Riak-like secondary index queries for S3

This is an idea for how to provide secondary index queries, similar to Riak 2i, on top of Amazon S3, using nothing but S3, boto3 and some Python.

This code hasn't been anywhere near a production environment, never benchmarked, only processed trivial amounts of data and tested only against localstack. It's not even commented. As such, it should not be used by anybody for any reason - ever.

If you do give it a try, let me know how it went.

s32i.sh

from concurrent.futures.thread import ThreadPoolExecutor
import re

from botocore.exceptions import ClientError


class S32iDatastore():

    __EXECUTOR = ThreadPoolExecutor(max_workers=os. cpu_count() - 1)

    INDEXES_FOLDER = 'indexes'
    LIST_OBJECTS = 'list_objects_v2'

    def __init__(self, s3_resource, bucket_name):

        self.s3_resource = s3_resource
        self.bucket_name = bucket_name

    def __run_in_thread(self, fn, *args):

        return self.__EXECUTOR.submit(fn, *args)

    def get(self, key):

        record = self.s3_resource.Object(self.bucket_name, key).get()
        indexes = record['Metadata']
        data = record['Body'].read()

        return data, indexes

    def head(self, key):

        record = self.s3_resource.meta.client.head_object(Bucket=self.bucket_name, Key=key)
        return record['Metadata']

    def exists(self, key):

        try:
            self.head(key)
            return True
        except ClientError:
            return False

    def put(self, key, data='', indexes={}):

        self.__run_in_thread(self.create_secondary_indexes, key, indexes)

        return self.s3_resource.Object(self.bucket_name, key).put(
            Body=data,
            Metadata=indexes)

    def delete(self, key):

        self.__run_in_thread(self.remove_secondary_indexes, key, self.head(key))

        return self.s3_resource.Object(self.bucket_name, key).delete()

    def create_secondary_indexes(self, key, indexes):

        for index, values in indexes.items():
            for value in values.split(','):
                self.put(f'{self.INDEXES_FOLDER}/{index}/{value}/{key}')

    def remove_secondary_indexes(self, key, indexes):

        for index, values in indexes.items():
            for value in values.split(','):
                self.s3_resource.Object(self.bucket_name, f'{self.INDEXES_FOLDER}/{index}/{value}/{key}').delete()

    def secondary_index_range_query(self,
                                    index,
                                    start, end=None,
                                    page_size=1000, max_results=10000,
                                    term_regex=None, return_terms=False):

        if end is None:
            end = start

        if term_regex:
            pattern = re.compile(f'^{self.INDEXES_FOLDER}/{index}/{term_regex}$')

        start_key = f'{self.INDEXES_FOLDER}/{index}/{start}'
        end_key = f'{self.INDEXES_FOLDER}/{index}/{end}'

        paginator = self.s3_resource.meta.client.get_paginator(self.LIST_OBJECTS)
        pages = paginator.paginate(
            Bucket=self.bucket_name,
            StartAfter=start_key,
            PaginationConfig={
                'MaxItems': max_results,
                'PageSize': page_size})

        for page in pages:
            for result in page['Contents']:

                result_key = result['Key']

                if result_key[0:len(end_key)] > end_key:
                    return

                if term_regex and not pattern.match(result_key):
                    continue

                parts = result_key.split('/')

                if return_terms:
                    yield (parts[-1], parts[-2])
                else:
                    yield parts[-1]

s32i_test.sh

import json
import unittest

import boto3

from s32i import S32iDatastore


class S32iDatastoreTest(unittest.TestCase):

    LOCALSTACK_ENDPOINT_URL = "http://localhost.localstack.cloud:4566"
    TEST_BUCKET = 's32idatastore-test-bucket'

    @classmethod
    def setUpClass(cls):

        cls.s3_resource = cls.create_s3_resource()
        cls.bucket = cls.create_bucket(cls.TEST_BUCKET)
        cls.datastore = S32iDatastore(cls.s3_resource, cls.TEST_BUCKET)
        cls.create_test_data()

    @classmethod
    def tearDownClass(cls):

        cls.delete_bucket()

    @classmethod
    def create_s3_resource(cls, endpoint_url=LOCALSTACK_ENDPOINT_URL):

        return boto3.resource(
        's3',
        endpoint_url=endpoint_url)

    @classmethod
    def create_bucket(cls, bucket_name):

        return cls.s3_resource.create_bucket(Bucket=bucket_name)

    @classmethod
    def delete_bucket(cls):

        cls.bucket.objects.all().delete()

    @classmethod
    def create_test_data(cls):

        cls.datastore.put(
            'KEY0001',
            json.dumps({'name': 'Alice', 'dob': '19700101', 'gender': '2'}),
            {'idx-gender-dob': '2|19700101'})

        cls.datastore.put(
            'KEY0002',
            json.dumps({'name': 'Bob', 'dob': '19800101', 'gender': '1'}),
            {'idx-gender-dob': '1|19800101'})

        cls.datastore.put(
            'KEY0003',
            json.dumps({'name': 'Carol', 'dob': '19900101', 'gender': '2'}),
            {'idx-gender-dob': '2|19900101'})

        cls.datastore.put(
            'KEY0004',
            json.dumps({'name': 'Dan', 'dob': '20000101', 'gender': '1'}),
            {'idx-gender-dob': '1|20000101'})

        cls.datastore.put(
            'KEY0005',
            json.dumps({'name': 'Eve', 'dob': '20100101', 'gender': '2'}),
            {'idx-gender-dob': '2|20100101'})

        cls.datastore.put(
            'KEY0006',
            json.dumps({'name': ['Faythe', 'Grace'], 'dob': '20200101', 'gender': '2'}),
            {'idx-gender-dob': '2|20200101', 'idx-name': 'Faythe,Grace'})

        cls.datastore.put('KEY0007', indexes={'idx-same': 'same'})
        cls.datastore.put('KEY0008', indexes={'idx-same': 'same'})
        cls.datastore.put('KEY0009', indexes={'idx-same': 'same'})

        cls.datastore.put(
            'KEY9999',
            json.dumps({'name': 'DELETE ME', 'dob': '99999999', 'gender': '9'}),
            {'idx-gender-dob': '9|99999999'})

    def test_get_record(self):

        data, indexes = self.datastore.get('KEY0001')

        self.assertDictEqual({'name': 'Alice', 'dob': '19700101', 'gender': '2'}, json.loads(data))
        self.assertDictEqual({'idx-gender-dob': '2|19700101'}, indexes)

    def test_head_record(self):

        indexes = self.datastore.head('KEY0002')

        self.assertDictEqual({'idx-gender-dob': '1|19800101'}, indexes)

    def test_2i_no_results(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '3|30100101')

        self.assertListEqual([], list(keys))

    def test_2i_index_does_not_exist(self):

        keys = self.datastore.secondary_index_range_query('idx-does-not-exist', '3|30100101')

        self.assertListEqual([], list(keys))

    def test_2i_exact_value(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|20100101')

        self.assertListEqual(['KEY0005'], list(keys))

    def test_2i_gender_2(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|')

        self.assertListEqual(['KEY0001', 'KEY0003', 'KEY0005', 'KEY0006'], sorted(list(keys)))

    def test_2i_gender_2_max_results_2(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|', max_results=2)

        self.assertListEqual(['KEY0001', 'KEY0003'], sorted(list(keys)))

    def test_2i_gender_1_dob_19(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '1|19')

        self.assertListEqual(['KEY0002'], list(keys))

    def test_2i_gender_2_dob_19(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|19')

        self.assertListEqual(['KEY0001', 'KEY0003'], sorted(list(keys)))

    def test_2i_gender_2_dob_1990_2000(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '2|1990', '2|2000')

        self.assertListEqual(['KEY0003'], list(keys))

    def test_2i_term_regex(self):

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '1|', '2|', term_regex='[1|2]\|20[1|2]0.*')

        self.assertListEqual(['KEY0005', 'KEY0006'], list(keys))

    def test_2i_return_terms(self):

        key_terms = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|',
            return_terms=True)

        self.assertListEqual([
            ('KEY0001', '2|19700101'),
            ('KEY0002', '1|19800101'),
            ('KEY0003', '2|19900101'),
            ('KEY0004', '1|20000101'),
            ('KEY0005', '2|20100101'),
            ('KEY0006', '2|20200101')],
            sorted(list(key_terms)))

    def test_2i_term_regex_return_terms(self):

        key_terms = self.datastore.secondary_index_range_query(
            'idx-gender-dob', '1|', '2|',
            term_regex='[1|2]\|20[1|2]0.*',
            return_terms=True)

        self.assertListEqual([('KEY0005', '2|20100101'), ('KEY0006', '2|20200101')], list(key_terms))

    def test_exists(self):

        self.assertTrue(self.datastore.exists('KEY0001'))
        self.assertFalse(self.datastore.exists('1000YEK'))

    def test_multiple_index_values(self):

        indexes = self.datastore.head('KEY0006')
        self.assertDictEqual({'idx-gender-dob': '2|20200101', 'idx-name': 'Faythe,Grace'}, indexes)

        keys = self.datastore.secondary_index_range_query('idx-name', 'Faythe')
        self.assertListEqual(['KEY0006'], list(keys))

        keys = self.datastore.secondary_index_range_query('idx-name', 'Grace')
        self.assertListEqual(['KEY0006'], list(keys))

    def test_multiple_keys_same_index(self):

        keys = self.datastore.secondary_index_range_query('idx-same', 'same')
        self.assertListEqual(['KEY0007', 'KEY0008', 'KEY0009'], sorted(list(keys)))

    def test_delete(self):

        self.assertTrue(self.datastore.exists('KEY9999'))

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '9|99999999')
        self.assertListEqual(['KEY9999'], list(keys))

        self.datastore.delete('KEY9999')

        self.assertFalse(self.datastore.exists('KEY9999'))

        keys = self.datastore.secondary_index_range_query('idx-gender-dob', '9|99999999')
        self.assertListEqual([], list(keys))