Sunday, 1 April 2018

Riak - Building a Development Environment From Source

Building a Riak development environment, like anything involving Linux, is needlessly complicated. This method of building from source worked for me on a clean install of Lubuntu 17.10.1:

First, update your package index and install the dependencies and utilities you will need:

$ sudo apt-get update
$ sudo apt-get install build-essential autoconf libncurses5-dev libpam0g-dev openssl libssl-dev fop xsltproc unixodbc-dev git curl

Next, go to your home directory, download kerl, and use it to build and install Basho's fork of Erlang/OTP (WHY?!?!). These steps took a while to complete on my machine, so bear with it:

$ cd ~
$ curl -O https://raw.githubusercontent.com/kerl/kerl/master/kerl
$ chmod a+x kerl
$ ./kerl build git https://github.com/basho/otp.git OTP_R16B02_basho10 R16B02-basho10
$ ./kerl install R16B02-basho10 ~/erlang/R16B02-basho10
$ . ~/erlang/R16B02-basho10/activate

With Erlang installed, clone the Riak source repository from GitHub and build:

$ git clone https://github.com/basho/riak.git
$ cd riak
$ make rel

Next, create 8 separate copies of Riak to use as cluster nodes:

$ make devrel

Start 3 (or more) Riak instances:

$ dev/dev1/bin/riak start
$ dev/dev2/bin/riak start
$ dev/dev3/bin/riak start

Then join instances 2 and 3 to instance 1 to form a cluster:

$ dev/dev2/bin/riak-admin cluster join dev1@127.0.0.1
$ dev/dev3/bin/riak-admin cluster join dev1@127.0.0.1

Check and commit the cluster plan:

$ dev/dev3/bin/riak-admin cluster plan
$ dev/dev3/bin/riak-admin cluster commit

Monitor the cluster status until all pending changes are complete:

$ dev/dev3/bin/riak-admin cluster status
---- Cluster Status ----
Ring ready: false

+--------------------+------+-------+-----+-------+
|        node        |status| avail |ring |pending|
+--------------------+------+-------+-----+-------+
| (C) dev1@127.0.0.1 |valid |  up   |100.0|  34.4 |
|     dev2@127.0.0.1 |valid |  up   |  0.0|  32.8 |
|     dev3@127.0.0.1 |valid |  up   |  0.0|  32.8 |
+--------------------+------+-------+-----+-------+

$ dev/dev3/bin/riak-admin cluster status
---- Cluster Status ----
Ring ready: true

+--------------------+------+-------+-----+-------+
|        node        |status| avail |ring |pending|
+--------------------+------+-------+-----+-------+
| (C) dev1@127.0.0.1 |valid |  up   | 34.4|  --   |
|     dev2@127.0.0.1 |valid |  up   | 32.8|  --   |
|     dev3@127.0.0.1 |valid |  up   | 32.8|  --   |
+--------------------+------+-------+-----+-------+

Check the cluster member status:

$ dev/dev3/bin/riak-admin member-status
================================= Membership ==================================
Status     Ring    Pending    Node
-------------------------------------------------------------------------------
valid      34.4%      --      'dev1@127.0.0.1'
valid      32.8%      --      'dev2@127.0.0.1'
valid      32.8%      --      'dev3@127.0.0.1'
-------------------------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
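Rather than re-running the status commands and reading the tables by eye, the check can be scripted. This is a minimal Python sketch, assuming the riak-admin member-status output keeps the column layout shown above; it treats the cluster as settled when every member is valid and none has a pending ring change. The function names are hypothetical, not part of Riak.

```python
import re

# Matches a member row such as: valid      34.4%      --      'dev1@127.0.0.1'
# Header, separator and summary lines do not match and are ignored.
ROW = re.compile(r"^(\w+)\s+([\d.]+)%\s+(\S+)\s+'([^']+)'$")


def parse_member_status(output):
    """Return (status, ring_percent, pending, node) tuples from member-status text."""
    members = []
    for line in output.splitlines():
        match = ROW.match(line.strip())
        if match:
            status, ring, pending, node = match.groups()
            members.append((status, float(ring), pending, node))
    return members


def is_settled(members):
    """True when there are members, all are valid, and none has a pending change."""
    return bool(members) and all(
        status == "valid" and pending == "--" for status, _, pending, _ in members
    )
```

In a polling loop you could feed it subprocess.run(["dev/dev3/bin/riak-admin", "member-status"], capture_output=True, text=True).stdout and sleep between attempts until is_settled returns True.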

Congratulations, you have a development Riak cluster. Test the cluster by writing some data to a node:

$ curl -XPUT http://127.0.0.1:10018/riak/test/helloworld -H "Content-Type: text/plain" --data-binary 'Hello World!'

Use a browser to read the data from each node:
http://127.0.0.1:10018/riak/test/helloworld
http://127.0.0.1:10028/riak/test/helloworld
http://127.0.0.1:10038/riak/test/helloworld
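The same read can be scripted with Python's standard library. This is a sketch: the port formula is an assumption inferred from the three URLs above (devN serves HTTP on 10000 + 10 * N + 8), and the bucket and key default to the test/helloworld object written by the curl command. The helper names are hypothetical.

```python
import urllib.request


def dev_http_port(n):
    # Inferred from the URLs above: dev1 -> 10018, dev2 -> 10028, dev3 -> 10038.
    return 10000 + 10 * n + 8


def object_url(n, bucket="test", key="helloworld", host="127.0.0.1"):
    """Legacy /riak/<bucket>/<key> HTTP URL for a key on dev node n."""
    return "http://%s:%d/riak/%s/%s" % (host, dev_http_port(n), bucket, key)


def read_from_each_node(nodes=(1, 2, 3)):
    """Fetch the same key from every node, returning {node_number: body}."""
    values = {}
    for n in nodes:
        with urllib.request.urlopen(object_url(n)) as response:
            values[n] = response.read().decode("utf-8")
    return values


def consistent(values):
    """True when every node returned the same body."""
    return len(set(values.values())) == 1
```

With the cluster running, consistent(read_from_each_node()) should be True once the write has replicated.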

Thursday, 8 February 2018

Tell 'em Steve-Dave!

SoundCloud's web interface is rubbish for downloading podcasts, but their API is pretty good, so here's a handy Python script for downloading all of your favourite Tell 'em Steve-Dave! episodes:

import os.path
import re

import requests

API_URL = "https://api.soundcloud.com"
TRACKS_URL = API_URL + "/users/%(USER_ID)s/tracks" \
                       "?client_id=%(CLIENT_ID)s" \
                       "&offset=%(OFFSET)s" \
                       "&limit=%(LIMIT)s" \
                       "&format=json"
DOWNLOAD_URL = API_URL + "/tracks/%(TRACK_ID)s/download?client_id=%(CLIENT_ID)s"
CHUNK_SIZE = 16 * 1024
TESD_USER_ID = "79299245"
CLIENT_ID = "3b6b877942303cb49ff687b6facb0270"
LIMIT = 10
offset = 0

while True:

    url = TRACKS_URL % {
        "USER_ID": TESD_USER_ID,
        "CLIENT_ID": CLIENT_ID,
        "LIMIT": LIMIT,
        "OFFSET": offset
    }

    response = requests.get(url)
    response.raise_for_status()
    tracks = response.json()

    # an empty page means every track has been listed
    if not tracks:
        break

    for track in tracks:

        url = DOWNLOAD_URL % {"TRACK_ID": track["id"], "CLIENT_ID": CLIENT_ID}

        # episode titles can contain characters that are invalid in file names
        filename = "%s.mp3" % re.sub(r'[\\/:*?"<>|]', "_", track["title"])

        # skip episodes downloaded on a previous run
        if os.path.exists(filename):
            continue

        response = requests.get(url, stream=True)

        # don't leave an empty file behind for tracks that can't be downloaded
        if response.status_code != 200:
            print("skipping: %s (HTTP %d)" % (filename, response.status_code))
            continue

        print("downloading: %s" % filename)

        with open(filename, "wb") as fd:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                fd.write(chunk)

    offset = offset + LIMIT
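The offset/limit paging in the loop above is a common pattern, and it can be pulled out into a reusable generator. This is a sketch, where fetch_page is a hypothetical callable that returns one page of results as a list; like the script, it stops at the first empty page.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def paginate(fetch_page: Callable[[int, int], List[T]], limit: int) -> Iterator[T]:
    """Yield every item from an offset/limit paged API.

    fetch_page(offset, limit) returns one page; an empty page means there is
    nothing left to fetch.
    """
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        if not page:
            return
        yield from page
        offset += limit
```

With the constants from the script, fetch_page could be lambda offset, limit: requests.get(TRACKS_URL % {"USER_ID": TESD_USER_ID, "CLIENT_ID": CLIENT_ID, "LIMIT": limit, "OFFSET": offset}).json(). Stopping on an empty page costs one extra request compared with stopping on a short page, but it does not depend on the API always filling every page.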

4 colors 4 life

Saturday, 27 January 2018

Overengineering Shit

I’ve had enough of Flickr, for all the standard reasons.

So I set out to build a scalable, secure, distributed image-sharing platform of my own, using open source components and tried and tested tech, with no bullshit.

It would be great, I thought: I could start small, just hosting my own photos, then open it up to friends and family, working out the bugs as I went and seamlessly scaling up the hardware as required. Then, who knows, I could open it up to the internet! It was going to be awesome!

I wanted nothing fancy for the implementation - only mature, battle-tested tech that wasn’t going to become unsupported any time soon, and only the right tools for the job:
  1. Bulk uploading files over HTTP is bollocks; transferring files is a solved problem, so the system should use FTP.
  2. No custom user database and no hand-rolled permissions pseudo-framework. Don’t reinvent the wheel: authentication and authorisation should be handled by LDAP.
  3. Image storage should be implemented using a distributed, scalable filesystem, so I can just add more nodes when disk space starts running low.
  4. Image processing, such as thumbnail generation, should be asynchronous, with jobs taken from a scalable message queue; that way I can add more message processors when I need to.
  5. Clients should fetch images and image metadata from the server through simple REST web services.
  6. And finally, the web UI should be simple and responsive, and avoid JavaScript framework bloat.

My chosen implementations to satisfy the above included:

I also used imgscalr-lib for preview generation, Apache Avro and Apache Commons Lang for serialization, Apache Tika for image format detection, SLF4J and Logback for logging, and Ansible for deployment.

The components logically hang together something like this:

The code was looking good enough to get something up and running, so it was time to investigate hosting costs. I figured I would need a big-ish box for Apache Cassandra and Apache Directory Server, a small-ish box for Apache Tomcat and the Apache HTTP Server, another small-ish box for Apache FtpServer, and a medium-sized box for Apache Kafka and the consumer processes.

Time to check out some recommended production hardware requirements for Cassandra:

a minimal production server requires at least 2 cores, and at least 8GB of RAM. Typical production servers have 8 or more cores and at least 32GB of RAM

And for Kafka:

A machine with 64 GB of RAM is a decent choice, but 32 GB machines are not uncommon. Less than 32 GB tends to be counterproductive (you end up needing many, many small machines).

Are you fucking kidding me? When did minimum requirements for a database and a queue become 32GB of fucking RAM each?!

At DigitalOcean's current prices, an 8GB droplet is $40 a month, a 32GB droplet is $160 a month, and the smaller droplets cost anywhere between $5 and $20 a month:

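+--------+---------+----------+----------+---------+-----------+
| Memory | vCPUs   | SSD Disk | Transfer | Monthly | Hourly    |
+--------+---------+----------+----------+---------+-----------+
| 1 GB   | 1 vCPU  | 25 GB    | 1 TB     | $5/mo   | $0.007/hr |
| 2 GB   | 1 vCPU  | 50 GB    | 2 TB     | $10/mo  | $0.015/hr |
| 4 GB   | 2 vCPUs | 80 GB    | 4 TB     | $20/mo  | $0.030/hr |
| 8 GB   | 4 vCPUs | 160 GB   | 5 TB     | $40/mo  | $0.060/hr |
| 16 GB  | 6 vCPUs | 320 GB   | 6 TB     | $80/mo  | $0.119/hr |
| 32 GB  | 8 vCPUs | 640 GB   | 7 TB     | $160/mo | $0.238/hr |
| ...    | ...     | ...      | ...      | ...     | ...       |
+--------+---------+----------+----------+---------+-----------+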
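To put rough numbers on the plan, here is a small Python sketch of the monthly bill. The droplet size assigned to each role is a hypothetical guess at the big-ish, small-ish and medium boxes described above, not a measured requirement. The hourly rates in the price table appear to be the monthly price divided by 672 hours (28 days), which the sketch reproduces.

```python
# Monthly droplet prices in USD, keyed by RAM in GB, from the price table.
MONTHLY_PRICE = {1: 5, 2: 10, 4: 20, 8: 40, 16: 80, 32: 160}

# Assumption: hourly rate = monthly price / 672 hours, which matches every
# hourly figure in the table once rounded to three decimal places.
HOURS_PER_BILLING_MONTH = 672


def hourly_price(ram_gb):
    return round(MONTHLY_PRICE[ram_gb] / HOURS_PER_BILLING_MONTH, 3)


def monthly_bill(droplets):
    """Total monthly cost for droplets that run all month (role -> RAM in GB)."""
    return sum(MONTHLY_PRICE[ram_gb] for ram_gb in droplets.values())


# Hypothetical sizing of the four boxes; the sizes are guesses.
PLAN = {
    "Cassandra + Apache Directory Server": 32,
    "Tomcat + Apache HTTP Server": 2,
    "Apache FtpServer": 2,
    "Kafka + consumers": 8,
}
```

Under those guesses monthly_bill(PLAN) comes to $220, and sizing the Kafka box at the recommended 32 GB would push it to $340.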

I guess writing and running scalable systems requires a scalable bank balance - mine does not scale to over $200 a month just to upload some photos. Time to scrap this idea.

What I’ve ended up with is a $10 a month DigitalOcean droplet running nginx and SFTP, plus a half-arsed Python script which uses ImageMagick to generate thumbnails and some static HTML - and you know what? It’s absolutely perfect for me.

Time to reflect on some almost-ten-year-old, but still relevant, wisdom from Ted Dziuba: I'm Going To Scale My Foot Up Your Ass

Python script to generate HTML and thumbnails:

import os
from subprocess import call

CONVERT_CMD = "convert"
PREVIEW_SIZE = "150x150"
PREVIEW_PREFIX = "preview_"
HTML_EXTENSION = ".html"
IMG_EXTENSION = ".jpg"
INDEX = "index.html"
ALBUM_IMG = "/album.png"

LIST_TEMPLATE = """
<!DOCTYPE html>
<html>
  <head>
    <title>taffnaid.photos</title>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link rel="stylesheet" type="text/css" href="/taffnaidphotos.css">
  </head>
  <body>
    <div id="list" class="list">
      <div id="list-nav" class="nav">
        {0}
      </div>
      <div id="list-previews" class="previews">
        {1}
      </div>    
    </div>
  </body>
</html>
"""

LIST_NAV_TEMPLATE = """
<a href="{0}" class="parent">☷</a>
"""

PREVIEW_TEMPLATE = """
<div class="preview">
  <a href="{0}">
    <img src="{1}" alt=":-("/>
    <div class="name">{2}</div>
  </a>
</div>
"""

VIEW_TEMPLATE = """
<!DOCTYPE html>
<html>
  <head>
    <title>taffnaid.photos</title>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <link rel="stylesheet" type="text/css" href="/taffnaidphotos.css">
  </head>
  <body>
    <div id="view" class="view">
      <div id="view-nav" class="nav">
        {0}
      </div>
      <div id="view-image" class="image">
        <img src="{1}" alt=":-("/>
        <link rel="prefetch" href="{2}">
        <link rel="prefetch" href="{3}">
      </div>
    </div>
  </body>
</html>
"""

VIEW_NAV_TEMPLATE = """
<a href="{0}" class="previous">⟨</a>
<a href="{1}" class="parent">☷</a>
<a href="{2}" class="next">⟩</a>
"""

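cwd = os.getcwd()

# walk every album directory below the current directory, writing an
# index.html per directory and an <image>.jpg.html view page per image
for root, dirs, files in os.walk(cwd):

    # sorted descending here and prepended one at a time below, so the
    # album previews end up in ascending order
    dirs = sorted(dirs, reverse=True)

    # originals only: skip non-JPEGs and previously generated thumbnails
    files = filter(lambda file: file.lower().endswith(IMG_EXTENSION), files)
    files = filter(lambda file: not file.startswith(PREVIEW_PREFIX), files)
    files = sorted(files)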

    preview_html = ""

    for dir in dirs:
        preview_html = PREVIEW_TEMPLATE.format(
            os.path.join(dir, INDEX),
            ALBUM_IMG,
            dir) + preview_html

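    for i, file in enumerate(files):
        # navigation wraps around: files[-1] is the last image for the first
        # page, and the modulo takes the last page back to the first image
        previous = files[i - 1]
        parent = os.path.join(root.replace(cwd, ""), INDEX)
        next = files[(i + 1) % len(files)]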

        nav_html = VIEW_NAV_TEMPLATE.format(
            previous + HTML_EXTENSION,
            parent,
            next + HTML_EXTENSION
        )

        preview_html = preview_html + PREVIEW_TEMPLATE.format(
            file + HTML_EXTENSION,
            PREVIEW_PREFIX + file,
            file)

        view_html = VIEW_TEMPLATE.format(
            nav_html,
            file,
            previous,
            next
        )

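        image = os.path.abspath(os.path.join(root, file))
        preview = os.path.abspath(os.path.join(root, PREVIEW_PREFIX + file))

        # thumbnails are only generated once; delete one to regenerate it
        if not os.path.exists(preview):

            # -define jpeg:size lets the JPEG decoder load a reduced-size image,
            # -thumbnail 150x150^ scales it to fill (not fit) the box, and
            # -gravity center -extent 150x150 crops the overflow evenly
            cmd = [
                CONVERT_CMD,
                "-define", "jpeg:size=%s" % PREVIEW_SIZE,
                image,
                "-thumbnail", "%s^" % PREVIEW_SIZE,
                "-gravity", "center",
                "-extent", PREVIEW_SIZE,
                preview]

            call(cmd)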

        view_file = image + HTML_EXTENSION

        # the templates contain non-ASCII characters, so write UTF-8 explicitly
        with open(view_file, "w", encoding="utf-8") as f:
            f.write(view_html)

    parent = os.path.join(os.path.dirname(root.replace(cwd, "")), INDEX)
    nav_html = LIST_NAV_TEMPLATE.format(parent)

    list_html = LIST_TEMPLATE.format(
        nav_html,
        preview_html)
    index_file = os.path.join(root, INDEX)

    with open(index_file, "w", encoding="utf-8") as f:
        f.write(list_html)
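To see what the ImageMagick command does to a photo's geometry, here is a small Python sketch of the arithmetic. It is an approximation (ImageMagick's own rounding may differ by a pixel): "150x150^" scales the image so its shorter side becomes 150 pixels, and "-gravity center -extent 150x150" then crops the longer side evenly from both ends. The function name is hypothetical.

```python
def thumbnail_geometry(width, height, size=150):
    """Approximate "-thumbnail SxS^ -gravity center -extent SxS".

    Returns ((scaled_width, scaled_height), (crop_x, crop_y)): the size after
    scaling to fill a size x size box, and the top-left offset of the centred
    size x size crop taken from the scaled image.
    """
    if width >= height:
        # landscape or square: the height becomes `size`, the width overflows
        scaled = (round(width * size / height), size)
    else:
        # portrait: the width becomes `size`, the height overflows
        scaled = (size, round(height * size / width))
    crop = ((scaled[0] - size) // 2, (scaled[1] - size) // 2)
    return scaled, crop
```

For a 4000x3000 photo this gives a 200x150 intermediate image, cropped from x = 25 to a 150x150 square.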

nginx config to resize and cache images:

# keys_zone=name:size names the shared memory zone that holds the cache keys
proxy_cache_path /var/www/html/cache levels=1:2 keys_zone=resized:10m;

server {
        listen 80 default_server;
        listen [::]:80 default_server;

        root /var/www/html;

        index index.html;

        server_name _;

        location / {
                try_files $uri $uri/ =404;
        }

        location ~ ^/.*(\.jpg|\.JPG)$ {
                proxy_cache resized;
                proxy_cache_valid 200 10d;
                proxy_pass http://127.0.0.1:9001;
        }
}

server {
        listen 9001;
        allow 127.0.0.1;
        deny all;

        root /var/www/html;

        location ~ ^/.*(\.jpg|\.JPG)$ {
                image_filter_buffer 10M;
                image_filter resize 1920 1080;
        }
}
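The image_filter resize 1920 1080 directive in the internal server proportionally reduces each JPEG to fit within a 1920x1080 box. Here is a Python sketch of that arithmetic, assuming images already inside the box are left alone and ignoring nginx's exact rounding; the function name is hypothetical.

```python
def fit_within(width, height, max_width=1920, max_height=1080):
    """Scale (width, height) down, keeping the aspect ratio, to fit the box."""
    if width <= max_width and height <= max_height:
        # already small enough: never enlarge
        return width, height
    # compare aspect ratios with integer arithmetic to pick the limiting side
    if width * max_height > height * max_width:
        # relatively wider than the box: width is the limiting dimension
        return max_width, height * max_width // width
    # relatively taller (or the same shape): height is the limiting dimension
    return width * max_height // height, max_height
```

A 4000x3000 photo comes out at about 1440x1080, and a 6000x2000 panorama at about 1920x640.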

Source Code

Wednesday, 18 October 2017

Use JAXB to generate classes from FHIR XSD schema

Running the FHIR XSD schemas through JAXB throws a bunch of exceptions, for example:

com.sun.istack.SAXParseException2; systemId: file:../xsd/fhir-xhtml.xsd; lineNumber: 283; columnNumber: 52; Property "Lang" is already defined. Use <jaxb:property> to resolve this conflict.

com.sun.istack.SAXParseException2; systemId: file:../xsd/fhir-xhtml.xsd; lineNumber: 1106; columnNumber: 58; Property "Lang" is already defined. Use <jaxb:property> to resolve this conflict.

org.xml.sax.SAXParseException; systemId: file:../xsd/fhir-single.xsd; lineNumber: 81; columnNumber: 31; A class/interface with the same name "org.adrianwalker.fhir.resources.Code" is already in use. Use a class customization to resolve this conflict.

org.xml.sax.SAXParseException; systemId: file:../xsd/fhir-single.xsd; lineNumber: 1173; columnNumber: 34; A class/interface with the same name "org.adrianwalker.fhir.resources.Address" is already in use. Use a class customization to resolve this conflict.

Without modifying the original FHIR XSD files, the JAXB conflicts can be resolved using JAXB bindings:

fhir-xhtml.xjb

<bindings xmlns="http://java.sun.com/xml/ns/jaxb"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:xs="http://www.w3.org/2001/XMLSchema"
          version="2.1">
  <bindings schemaLocation="../xsd/fhir-xhtml.xsd" version="1.0">

    <!--
    Fixes:-

    com.sun.istack.SAXParseException2; systemId: file:../xsd/fhir-xhtml.xsd;
    lineNumber: 283; columnNumber: 52; Property "Lang" is already defined. Use
    <jaxb:property> to resolve this conflict.
    -->
    <bindings node="//xs:attributeGroup[@name='i18n']">
      <bindings node=".//xs:attribute[@name='lang']">
        <property name="xml:lang"/>
      </bindings>
    </bindings>

    <!--
    Fixes:-

    com.sun.istack.SAXParseException2; systemId: file:../xsd/fhir-xhtml.xsd;
    lineNumber: 1106; columnNumber: 58; Property "Lang" is already defined. Use
    <jaxb:property> to resolve this conflict.
    -->
    <bindings node="//xs:element[@name='bdo']">
      <bindings node=".//xs:attribute[@name='lang']">
        <property name="xml:lang"/>
      </bindings>
    </bindings>
  </bindings>
</bindings>

fhir-single.xjb

<bindings xmlns="http://java.sun.com/xml/ns/jaxb"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:xs="http://www.w3.org/2001/XMLSchema"
          version="2.1">
  <bindings schemaLocation="../xsd/fhir-single.xsd" version="1.0">

    <!--
    Fixes:-

    org.xml.sax.SAXParseException; systemId: file:../xsd/fhir-single.xsd;
    lineNumber: 81; columnNumber: 31; A class/interface with the same name
    "org.adrianwalker.fhir.resources.Code" is already in use. Use a class
    customization to resolve this conflict.
    -->
    <bindings node="//xs:complexType[@name='code']">
      <class name="CodeString" />
    </bindings>

    <!--
    Fixes:-

    org.xml.sax.SAXParseException; systemId: file:../xsd/fhir-single.xsd;
    lineNumber: 1173; columnNumber: 34; A class/interface with the same name
    "org.adrianwalker.fhir.resources.Address" is already in use. Use a class
    customization to resolve this conflict.
    -->
    <bindings node="//xs:complexType[@name='Address']">
      <class name="PostalAddress" />
    </bindings>

  </bindings>
</bindings>

I've used the org.jvnet.jaxb2.maven2 maven-jaxb2-plugin Maven plugin, configured with the net.java.dev.jaxb2-commons jaxb-fluent-api plugin, to generate the resource classes with fluent API mutators for method chaining.

pom.xml

...
<build>
  <plugins>
    <plugin>
      <groupId>org.jvnet.jaxb2.maven2</groupId>
      <artifactId>maven-jaxb2-plugin</artifactId>
      <version>0.13.2</version>
      <configuration>
        <extension>true</extension>
        <args>
          <arg>-Xfluent-api</arg>
        </args>
        <schemaDirectory>src/main/xsd</schemaDirectory>
        <bindingDirectory>src/main/xjb</bindingDirectory>
        <generatePackage>org.adrianwalker.fhir.resources</generatePackage>
        <plugins>
          <plugin>
            <groupId>net.java.dev.jaxb2-commons</groupId>
            <artifactId>jaxb-fluent-api</artifactId>
            <version>2.1.8</version>
          </plugin>
        </plugins>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>generate</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
...

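For example usage of the generated classes, and some minimal unit tests, see PatientExampleTest.java. Note that FHIR's string type is generated as a class named String in the org.adrianwalker.fhir.resources package, which shadows java.lang.String inside that package; that is why the test refers to java.lang.String by its fully qualified name, while new String() creates the FHIR type: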

PatientExampleTest.java

package org.adrianwalker.fhir.resources;

import java.io.ByteArrayOutputStream;
import java.io.File;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.transform.stream.StreamSource;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/*
 * Patient Example xml from: https://www.hl7.org/fhir/patient-example.xml.html
 */
public final class PatientExampleTest {

  private static Unmarshaller unmarshaller;
  private static Marshaller marshaller;

  @BeforeClass
  public static void setUp() throws JAXBException {

    JAXBContext context = JAXBContext.newInstance(Patient.class);
    unmarshaller = context.createUnmarshaller();
    marshaller = context.createMarshaller();
  }

  @Test
  public void testXmlToPatient() throws JAXBException {

    Patient patient = unmarshalPatient("src/test/resources/patient-example.xml");

    Assert.assertEquals("example", patient.getId().getValue());
    Assert.assertEquals("Chalmers", patient.getName().get(0).getFamily().getValue());
    Assert.assertEquals("Peter", patient.getName().get(0).getGiven().get(0).getValue());
    Assert.assertEquals("James", patient.getName().get(0).getGiven().get(1).getValue());
  }

  @Test
  public void testPatientToXml() throws JAXBException {

    Patient patient = new Patient()
            .withId(new Id().withValue("test"))
            .withName(new HumanName()
                    .withGiven(new String().withValue("Adrian"))
                    .withFamily(new String().withValue("Walker")));

    Assert.assertEquals(
            "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"
            + "<Patient xmlns=\"http://hl7.org/fhir\" xmlns:ns2=\"http://www.w3.org/1999/xhtml\">"
            + "<id value=\"test\"/>"
            + "<name>"
            + "<family value=\"Walker\"/>"
            + "<given value=\"Adrian\"/>"
            + "</name>"
            + "</Patient>",
            marshalPatient(patient));
  }

  private Patient unmarshalPatient(final java.lang.String filename) throws JAXBException {

    JAXBElement<Patient> element = unmarshaller.unmarshal(
            new StreamSource(new File(filename)), Patient.class);

    return element.getValue();
  }

  private java.lang.String marshalPatient(final Patient patient) throws JAXBException {

    JAXBElement<Patient> element = new ObjectFactory().createPatient(patient);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    marshaller.marshal(element, baos);

    return baos.toString();
  }
}
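For a quick cross-check outside Java, the same values the JUnit assertions look at can be read from the XML with Python's standard library. This is a sketch, not part of the project: FHIR XML stores primitive values in a value attribute (for example <family value="Walker"/>), all elements live in the http://hl7.org/fhir namespace, and, like the test, it only reads the first name element. The function name is hypothetical.

```python
import xml.etree.ElementTree as ET

FHIR_NS = {"fhir": "http://hl7.org/fhir"}


def patient_summary(xml_document):
    """Return (id, family, [given, ...]) from a FHIR Patient XML document."""
    root = ET.fromstring(xml_document)
    patient_id = root.find("fhir:id", FHIR_NS).get("value")
    # first <name> only, mirroring getName().get(0) in the JUnit test
    name = root.find("fhir:name", FHIR_NS)
    family = name.find("fhir:family", FHIR_NS).get("value")
    given = [g.get("value") for g in name.findall("fhir:given", FHIR_NS)]
    return patient_id, family, given
```

Run against the string marshalPatient produces in testPatientToXml, it returns ("test", "Walker", ["Adrian"]).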

Source Code

Build and Test

The project is a standard Maven project which can be built with:

mvn clean install

Sunday, 24 September 2017

FTP files into Apache Cassandra with Apache FtpServer

Apache FtpServer provides an API that lets you implement your own file system to back file uploads and downloads. Using the native file system implementation as a guide, this project builds on a previous blog post, Another Apache Cassandra File System, which implements a chunked file system that reads and writes files directly to and from Apache Cassandra.

Make sure you clone and build the file system project, and stand up the Cassandra database, from this git repository before using the code in this blog post - it's a required dependency.

To create an alternative file system, you need to implement three interfaces from the FtpServer ftplet-api: FileSystemFactory, FileSystemView and FtpFile.

CassandraFileSystemFactory.java

package org.adrianwalker.ftpserver.filesystem;

import org.adrianwalker.cassandra.filesystem.controller.FileSystemController;
import org.apache.ftpserver.ftplet.FileSystemFactory;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CassandraFileSystemFactory implements FileSystemFactory {

  private static final Logger LOGGER = LoggerFactory.getLogger(CassandraFileSystemFactory.class);

  private final FileSystemController controller;

  public CassandraFileSystemFactory(final FileSystemController controller) {

    LOGGER.debug("controller = {}", controller);

    if (null == controller) {
      throw new IllegalArgumentException("controller is null");
    }

    this.controller = controller;
  }

  @Override
  public FileSystemView createFileSystemView(final User user) throws FtpException {

    LOGGER.debug("user = {}", user);

    if (null == user) {
      throw new IllegalArgumentException("user is null");
    }

    return new CassandraFileSystemView(user, controller);
  }
}

CassandraFileSystemView.java

package org.adrianwalker.ftpserver.filesystem;

import static java.io.File.separator;

import org.adrianwalker.cassandra.filesystem.controller.FileSystemController;
import org.adrianwalker.cassandra.filesystem.entity.File;
import org.apache.ftpserver.ftplet.FileSystemView;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.Paths;

public final class CassandraFileSystemView implements FileSystemView {

  private static final Logger LOGGER = LoggerFactory.getLogger(CassandraFileSystemView.class);

  private final User user;
  private final FileSystemController controller;

  private final String homeDirectory;
  private String workingDirectory;

  public CassandraFileSystemView(final User user, final FileSystemController controller) {

    LOGGER.debug("user = {}, controller = {}", user, controller);

    if (null == user) {
      throw new IllegalArgumentException("user is null");
    }

    if (null == controller) {
      throw new IllegalArgumentException("controller is null");
    }

    this.user = user;
    this.controller = controller;

    this.homeDirectory = user.getHomeDirectory();
    this.workingDirectory = homeDirectory;
  }

  @Override
  public FtpFile getHomeDirectory() throws FtpException {

    LOGGER.debug("homeDirectory = {}", homeDirectory);

    FtpFile file = getFile(homeDirectory);

    if (!file.doesExist()) {
      file = createDirectory(homeDirectory);
    }

    return file;
  }

  @Override
  public FtpFile getWorkingDirectory() throws FtpException {

    LOGGER.debug("workingDirectory = {}", workingDirectory);

    FtpFile file = getFile(workingDirectory);

    if (!file.doesExist()) {
      file = createDirectory(workingDirectory);
    }

    return file;
  }

  @Override
  public boolean changeWorkingDirectory(final String workingDirectory) throws FtpException {

    LOGGER.debug("workingDirectory = {}", workingDirectory);

    FtpFile file = getFile(workingDirectory);

    // only change into paths that exist and are directories
    boolean isDirectory = file.doesExist() && file.isDirectory();

    if (isDirectory) {
      this.workingDirectory = file.getAbsolutePath();
    }

    return isDirectory;
  }

  @Override
  public FtpFile getFile(final String name) throws FtpException {

    LOGGER.debug("name = {}", name);

    if (null == name) {
      throw new IllegalArgumentException("name is null");
    }

    String path = normalize(name);
    File file = controller.getFile(path);

    return new CassandraFtpFile(user, path, file, controller);
  }

  @Override
  public boolean isRandomAccessible() throws FtpException {

    return false;
  }

  @Override
  public void dispose() {
  }

  private String normalize(final String name) {

    LOGGER.debug("name = {}", name);

    Path path;
    if (name.startsWith(separator)) {
      path = Paths.get(name);
    } else {
      path = Paths.get(workingDirectory, name);
    }

    String normalizedName = path
            .normalize()
            .toString();

    LOGGER.debug("normalizedName = {}", normalizedName);

    return normalizedName;
  }

  private FtpFile createDirectory(final String path) {

    LOGGER.debug("path = {}", path);

    File directory = new File();
    directory.setName(Paths.get(path).getFileName().toString());
    directory.setDirectory(true);
    directory.setOwner(user.getName());
    directory.setGroup(user.getName());
    directory.setModified(System.currentTimeMillis());

    controller.saveFile(path, directory);

    return new CassandraFtpFile(user, path, directory, controller);
  }
}
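The path handling in normalize() is easy to get wrong, so here is the same rule as a small Python sketch for experimenting with: absolute names are used as-is, relative names are joined to the working directory, and "." and ".." segments are collapsed. It uses posixpath.normpath as a stand-in for Java's Path.normalize(); the two should agree for ordinary FTP paths, but this is an illustration, not the project's code.

```python
import posixpath


def normalize(name, working_directory):
    """Resolve an FTP path against the working directory."""
    if name.startswith("/"):
        path = name
    else:
        path = posixpath.join(working_directory, name)
    # collapse "." and ".." segments; ".." at the root stays at the root
    return posixpath.normpath(path)
```

For example, normalize("../other", "/testuser") resolves to "/other", which is one reason the writability check needs to look at the normalized path.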

CassandraFtpFile.java

package org.adrianwalker.ftpserver.filesystem;

import static java.util.stream.Collectors.toList;

import org.adrianwalker.cassandra.filesystem.controller.FileSystemController;
import org.adrianwalker.cassandra.filesystem.entity.File;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.List;

public final class CassandraFtpFile implements FtpFile {

  private static final Logger LOGGER = LoggerFactory.getLogger(CassandraFtpFile.class);

  private final User user;
  private final String path;
  private File file;
  private final FileSystemController controller;

  public CassandraFtpFile(
          final User user,
          final String path,
          final File file,
          final FileSystemController controller) {

    LOGGER.debug("user = {}, path = {}, file = {}, controller = {}", user, path, file, controller);

    if (null == user) {
      throw new IllegalArgumentException("user is null");
    }

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    if (null == controller) {
      throw new IllegalArgumentException("controller is null");
    }

    this.user = user;
    this.path = path;
    this.file = file;
    this.controller = controller;
  }

  @Override
  public String getAbsolutePath() {

    LOGGER.debug("path = {}", path);

    return path;
  }

  @Override
  public String getName() {

    String name = file.getName();
    LOGGER.debug("name = {}", name);

    return name;
  }

  @Override
  public boolean isHidden() {

    boolean hidden = file.isHidden();
    LOGGER.debug("hidden = {}", hidden);

    return hidden;
  }

  @Override
  public boolean isDirectory() {

    boolean directory = file.isDirectory();
    LOGGER.debug("directory = {}", directory);

    return directory;
  }

  @Override
  public boolean isFile() {

    boolean file = !isDirectory();
    LOGGER.debug("file = {}", file);

    return file;
  }

  @Override
  public boolean doesExist() {

    boolean exists = file != null;
    LOGGER.debug("exists = {}", exists);

    return exists;
  }

  @Override
  public boolean isReadable() {

    boolean readable = doesExist();
    LOGGER.debug("readable = {}", readable);

    return readable;
  }

  @Override
  public boolean isWritable() {

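    // compare whole path elements, so "/testuser2" is not treated as inside "/testuser"
    boolean writable = Paths.get(path).startsWith(Paths.get(user.getHomeDirectory()));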
    LOGGER.debug("writable = {}", writable);

    return writable;
  }

  @Override
  public boolean isRemovable() {

    boolean removable = doesExist() && isWritable();
    LOGGER.debug("removable = {}", removable);

    return removable;
  }

  @Override
  public String getOwnerName() {

    String owner = file.getOwner();
    LOGGER.debug("owner = {}", owner);

    return owner;
  }

  @Override
  public String getGroupName() {

    String group = file.getGroup();
    LOGGER.debug("group = {}", group);

    return group;
  }

  @Override
  public int getLinkCount() {

    int linkCount = file.isDirectory() ? 2 : 1;
    LOGGER.debug("linkCount = {}", linkCount);

    return linkCount;
  }

  @Override
  public long getLastModified() {

    long lastModified = file.getModified();
    LOGGER.debug("lastModified = {}", lastModified);

    return lastModified;
  }

  @Override
  public boolean setLastModified(final long lastModified) {

    LOGGER.debug("lastModified = {}", lastModified);
    file.setModified(lastModified);

    return controller.saveFile(path, file);
  }

  @Override
  public long getSize() {

    long size = file.getSize();
    LOGGER.debug("size = {}", size);

    return size;
  }

  @Override
  public Object getPhysicalFile() {

    LOGGER.debug("file = {}", file);

    return file;
  }

  @Override
  public boolean mkdir() {

    LOGGER.debug("path = {}", path);

    File directory = new File();
    directory.setName(Paths.get(path).getFileName().toString());
    directory.setDirectory(true);
    directory.setOwner(user.getName());
    directory.setGroup(user.getName());
    directory.setModified(System.currentTimeMillis());

    return controller.saveFile(path, directory);
  }

  @Override
  public boolean delete() {

    LOGGER.debug("path = {}", path);

    return controller.deleteFile(path);
  }

  @Override
  public boolean move(final FtpFile ftpFile) {

    LOGGER.debug("ftpFile = {}", ftpFile);

    if (null == ftpFile) {
      throw new IllegalArgumentException("ftpFile is null");
    }

    return controller.moveFile(path, ftpFile.getAbsolutePath());
  }

  @Override
  public List<CassandraFtpFile> listFiles() {

    LOGGER.debug("path = {}", path);

    return controller.listFiles(path)
            .stream().map(file -> new CassandraFtpFile(
            user, Paths.get(path, file.getName()).toString(), file, controller))
            .collect(toList());
  }

  @Override
  public OutputStream createOutputStream(final long offset) throws IOException {

    LOGGER.debug("offset = {}", offset);

    if (offset != 0) {
      throw new IllegalArgumentException("non-zero offset unsupported");
    }

    if (null == file) {
      file = new File();
      file.setName(Paths.get(path).getFileName().toString());
      file.setDirectory(false);
      file.setOwner(user.getName());
      file.setGroup(user.getName());
      file.setModified(System.currentTimeMillis());

      controller.saveFile(path, file);
    }

    return new BufferedOutputStream(controller.createOutputStream(file));
  }

  @Override
  public InputStream createInputStream(final long offset) throws IOException {

    LOGGER.debug("offset = {}", offset);

    if (offset != 0) {
      throw new IllegalArgumentException("non-zero offset unsupported");
    }

    return new BufferedInputStream(controller.createInputStream(file));
  }
}

Example usage with an embedded FTP server:

private void exampleUsage() throws FtpException {

  ListenerFactory listenerFactory = new ListenerFactory();
  listenerFactory.setPort(8021);

  FtpServerFactory serverFactory = new FtpServerFactory();
  serverFactory.addListener("default", listenerFactory.createListener());

  Cluster cluster = new Cluster.Builder()
          .addContactPoints("127.0.0.1")
          .withPort(9042)
          .build();
  Session session = cluster.connect("filesystem");
  FileSystemController controller = new FileSystemController(session);

  serverFactory.setFileSystem(new CassandraFileSystemFactory(controller));

  PropertiesUserManagerFactory userManagerFactory = new PropertiesUserManagerFactory();
  // fully qualified to avoid clashing with the File entity class
  userManagerFactory.setFile(new java.io.File("users.properties"));
  serverFactory.setUserManager(userManagerFactory.createUserManager());

  FtpServer server = serverFactory.createServer();
  server.start();
}

The users properties file defines a test user with the username testuser and the password ‘password’, stored as an MD5 hash:

users.properties

ftpserver.user.testuser.homedirectory=/testuser
ftpserver.user.testuser.userpassword=5f4dcc3b5aa765d61d8327deb882cf99
ftpserver.user.testuser.maxloginnumber=3
ftpserver.user.testuser.writepermission=true
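The userpassword value is the lowercase hex MD5 digest of the plain-text password. Assuming PropertiesUserManagerFactory is left on its default MD5 password encryptor, a hash for a different password can be generated with a small sketch using the JDK's MessageDigest; the class name Md5Password is hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class Md5Password {

  // Returns the lowercase hex MD5 digest of the password.
  // Assumes the FTP user manager compares against unsalted MD5 hex digests.
  public static String md5Hex(final String password) throws NoSuchAlgorithmException {

    MessageDigest digest = MessageDigest.getInstance("MD5");
    byte[] hash = digest.digest(password.getBytes(StandardCharsets.UTF_8));

    StringBuilder hex = new StringBuilder();
    for (byte b : hash) {
      hex.append(String.format("%02x", b & 0xFF));
    }
    return hex.toString();
  }

  public static void main(final String[] args) throws NoSuchAlgorithmException {
    // prints 5f4dcc3b5aa765d61d8327deb882cf99
    System.out.println(md5Hex("password"));
  }
}
```

Running main prints 5f4dcc3b5aa765d61d8327deb882cf99, the value used for testuser above.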

Source Code

Build and Test

The project is a standard Maven project which can be built with:

mvn clean install

Sunday, 17 September 2017

Java Turing Machine

Here is a Turing machine implemented in Java, as described in the Wikipedia article:
https://en.wikipedia.org/wiki/Turing_machine

The copy subroutine test is taken from:
https://en.wikipedia.org/wiki/Turing_machine_examples

Tape.java

package org.adrianwalker.turingmachine;

import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static java.util.stream.IntStream.rangeClosed;

import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

public final class Tape {

  private final TreeMap<Integer, String> cells;
  private final String blank;

  public Tape(final String blank) {

    this.cells = new TreeMap<>();
    this.blank = blank;
  }

  public List<String> getCells() {

    // an untouched tape has no keys, and firstKey() would throw
    if (cells.isEmpty()) {
      return Collections.emptyList();
    }

    return rangeClosed(cells.firstKey(), cells.lastKey())
            .boxed()
            .map(this::getCell)
            .collect(toList());
  }

  public void putCells(final List<String> symbols) {

    range(0, symbols.size())
            .boxed()
            .forEach(i -> putCell(i, symbols.get(i)));
  }

  public String getCell(final int position) {

    return cells.getOrDefault(position, blank);
  }

  public void putCell(final int position, final String symbol) {

    cells.put(position, symbol);
  }
}

Head.java

package org.adrianwalker.turingmachine;

public final class Head {

  private final Tape tape;
  private final String leftSymbol;
  private final String rightSymbol;
  private final String noOpSymbol;
  private int position = 0;

  public Head(
          final Tape tape,
          final String leftSymbol, final String rightSymbol, final String noOpSymbol) {

    this.tape = tape;
    this.leftSymbol = leftSymbol;
    this.rightSymbol = rightSymbol;
    this.noOpSymbol = noOpSymbol;
  }

  public void move(final String symbol) {

    if (noOpSymbol.equals(symbol)) {
      return;
    }

    if (leftSymbol.equals(symbol)) {
      position -= 1;
    } else if (rightSymbol.equals(symbol)) {
      position += 1;
    }
  }

  public String read() {

    return tape.getCell(position);
  }

  public void write(final String symbol) {

    if (noOpSymbol.equals(symbol)) {
      return;
    }

    tape.putCell(position, symbol);
  }
}

StateRegister.java

package org.adrianwalker.turingmachine;

public final class StateRegister {

  private final String haltState;
  private String state;

  public StateRegister(final String haltState, final String startState) {

    this.haltState = haltState;
    this.state = startState;
  }

  public boolean isHaltState() {

    return state.equals(haltState);
  }

  public String getState() {

    return state;
  }

  public void setState(final String state) {

    this.state = state;
  }
}

Table.java

package org.adrianwalker.turingmachine;

import java.util.HashMap;
import java.util.Map;

public final class Table {

  public static final class Entry {

    private final String state;
    private final String symbol;
    private final String writeSymbol;
    private final String moveTape;
    private final String nextState;

    public Entry(
            final String state, final String symbol,
            final String writeSymbol, final String moveTape, final String nextState) {

      this.state = state;
      this.symbol = symbol;
      this.writeSymbol = writeSymbol;
      this.moveTape = moveTape;
      this.nextState = nextState;
    }

    public String getState() {
      return state;
    }

    public String getSymbol() {
      return symbol;
    }

    public String getWriteSymbol() {
      return writeSymbol;
    }

    public String getMoveTape() {
      return moveTape;
    }

    public String getNextState() {
      return nextState;
    }
  }

  private static final String SEPARATOR = "_";

  private final Map<String, Entry> table;

  public Table() {

    table = new HashMap<>();
  }

  public void put(
          final String state, final String symbol,
          final String writeSymbol, final String moveTape, final String nextState) {

    table.put(
            state + SEPARATOR + symbol,
            new Entry(state, symbol, writeSymbol, moveTape, nextState));
  }

  public Entry get(final String state, final String symbol) {

    return table.get(state + SEPARATOR + symbol);
  }
}

TuringMachine.java

package org.adrianwalker.turingmachine;

import org.adrianwalker.turingmachine.Table.Entry;

public final class TuringMachine {

  private final Head head;
  private final StateRegister stateRegister;
  private final Table table;

  public TuringMachine(final Head head, final StateRegister stateRegister, final Table table) {

    this.head = head;
    this.stateRegister = stateRegister;
    this.table = table;
  }

  public long execute() {

    long steps = 0;

    while (!stateRegister.isHaltState()) {

      steps++;

      String state = stateRegister.getState();
      String symbol = head.read();

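      Entry entry = table.get(state, symbol);

      // a missing entry means the table has no rule for this state and symbol
      if (null == entry) {
        throw new IllegalStateException(
                "no table entry for state " + state + " and symbol " + symbol);
      }
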
      head.write(entry.getWriteSymbol());
      head.move(entry.getMoveTape());
      stateRegister.setState(entry.getNextState());
    }

    return steps;
  }
}

TuringMachineTest.java

package org.adrianwalker.turingmachine;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import org.junit.Test;

public final class TuringMachineTest {

  private static final String BLANK = "0";
  private static final String MOVE_LEFT = "L";
  private static final String MOVE_RIGHT = "R";
  private static final String NO_OP = "N";
  private static final String HALT_STATE = "H";
  private static final String START_STATE = "A";

  @Test
  public void testBusyBeaver() {

    Tape tape = new Tape(BLANK);
    Head head = new Head(tape, MOVE_LEFT, MOVE_RIGHT, NO_OP);
    StateRegister stateRegister = new StateRegister(HALT_STATE, START_STATE);

    Table table = new Table();
    table.put("A", "0", "1", "R", "B");
    table.put("A", "1", "1", "L", "C");
    table.put("B", "0", "1", "L", "A");
    table.put("B", "1", "1", "R", "B");
    table.put("C", "0", "1", "L", "B");
    table.put("C", "1", "1", "N", "H");

    TuringMachine machine = new TuringMachine(head, stateRegister, table);
    long steps = machine.execute();

    assertEquals(13, steps);
    assertEquals(asList("1", "1", "1", "1", "1", "1"), tape.getCells());
  }

  @Test
  public void testCopySubroutine() {

    Tape tape = new Tape(BLANK);
    tape.putCells(asList("1", "1", "1"));

    Head head = new Head(tape, MOVE_LEFT, MOVE_RIGHT, NO_OP);
    StateRegister stateRegister = new StateRegister(HALT_STATE, START_STATE);

    Table table = new Table();
    table.put("A", "0", "N", "N", "H");
    table.put("A", "1", "0", "R", "B");
    table.put("B", "0", "0", "R", "C");
    table.put("B", "1", "1", "R", "B");
    table.put("C", "0", "1", "L", "D");
    table.put("C", "1", "1", "R", "C");
    table.put("D", "0", "0", "L", "E");
    table.put("D", "1", "1", "L", "D");
    table.put("E", "0", "1", "R", "A");
    table.put("E", "1", "1", "L", "E");

    TuringMachine machine = new TuringMachine(head, stateRegister, table);
    long steps = machine.execute();

    assertEquals(28, steps);
    assertEquals(asList("1", "1", "1", "0", "1", "1", "1"), tape.getCells());
  }
}
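To see how the machine reaches the 13 steps asserted in testBusyBeaver, here is a compact, self-contained sketch that runs the same busy beaver table and prints one line per step. It deliberately does not use the classes above; BusyBeaverTrace and its string-array table are hypothetical simplifications for tracing only:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public final class BusyBeaverTrace {

  // Outcome of a run: number of steps and the tape cells, left to right.
  public static final class Result {
    public final int steps;
    public final String tape;

    Result(final int steps, final String tape) {
      this.steps = steps;
      this.tape = tape;
    }
  }

  // Runs the 3-state, 2-symbol busy beaver from testBusyBeaver, printing each step.
  public static Result run() {

    // key = state + symbol read; value = {symbol to write, move, next state}
    Map<String, String[]> table = new HashMap<>();
    table.put("A0", new String[] {"1", "R", "B"});
    table.put("A1", new String[] {"1", "L", "C"});
    table.put("B0", new String[] {"1", "L", "A"});
    table.put("B1", new String[] {"1", "R", "B"});
    table.put("C0", new String[] {"1", "L", "B"});
    table.put("C1", new String[] {"1", "N", "H"});

    TreeMap<Integer, String> tape = new TreeMap<>(); // sparse tape, blank = "0"
    String state = "A";
    int position = 0;
    int steps = 0;

    while (!"H".equals(state)) {
      String symbol = tape.getOrDefault(position, "0");
      String[] entry = table.get(state + symbol);
      if (entry == null) {
        throw new IllegalStateException("no rule for " + state + symbol);
      }

      steps++;
      System.out.printf("%2d: state %s at %2d reads %s, writes %s, moves %s, next %s%n",
          steps, state, position, symbol, entry[0], entry[1], entry[2]);

      tape.put(position, entry[0]);
      if ("L".equals(entry[1])) {
        position--;
      } else if ("R".equals(entry[1])) {
        position++;
      }
      state = entry[2];
    }

    // collect every cell between the leftmost and rightmost written positions
    StringBuilder cells = new StringBuilder();
    for (int i = tape.firstKey(); i <= tape.lastKey(); i++) {
      cells.append(tape.getOrDefault(i, "0"));
    }
    return new Result(steps, cells.toString());
  }

  public static void main(final String[] args) {
    Result result = run();
    System.out.println(result.steps + " steps, tape " + result.tape);
  }
}
```

Running main traces the head wandering left to position -3 before heading right again, and finishes with "13 steps, tape 111111", the same step count and six 1s that testBusyBeaver checks.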

Source Code

Build and Test

The project is a standard Maven project which can be built with:

mvn clean install

Sunday, 10 September 2017

Another Apache Cassandra File System

I want to be able to store files in Apache Cassandra from a Java application, using something like CFS or DSEFS, but both of those appear to be proprietary: part of DataStax Enterprise and closed source.

SnackFS, written in Scala, seems like a good open source alternative, but I have a specific use case in mind and want control over the implementation, so I decided to roll my own.

The schema for the file system contains four tables in a keyspace called ‘filesystem’. The CQL looks like this:

filesystem.cql

CREATE TABLE filesystem.file (
  id uuid,
  name text,
  size bigint,
  modified bigint,
  group text,
  owner text,
  hidden boolean,
  directory boolean,
  PRIMARY KEY (id)
);

CREATE TABLE filesystem.chunk (
  file_id uuid,
  chunk_number int,
  content blob,
  PRIMARY KEY (file_id, chunk_number)
);

CREATE TABLE filesystem.path (
  path text,
  file_id uuid,
  PRIMARY KEY (path)
);

CREATE TABLE filesystem.parent_path (
  path text,
  file_id uuid,
  PRIMARY KEY (path, file_id)
);

file table

The file table has a UUID primary/partitioning key, the ‘id’ column, so a random UUID can be used to enable even distribution around the ring and eliminate hot spots. The file table also contains information about the file: the file name, size in bytes, last modified time, group name, owner name, hidden flag and directory flag. The file table does not contain any information about the file’s absolute path or the contents of the file.

chunk table

The chunk table stores chunks of file content as a BLOB in the ‘content’ column. The DataStax site recommends using a relatively small BLOB size:

"The maximum theoretical size for a blob is 2 GB. The practical limit on blob size, however, is less than 1 MB."

The chunk table’s primary/partitioning key, the ‘file_id’ column, is a UUID and is intended to be equal to the corresponding file ID in the file table. This means a file’s details and content can be read using the same ID, and the file record and the corresponding chunk record(s) will reside on the same node in the ring. The chunk table also has a clustering column, ‘chunk_number’, which holds sequential file content chunk index numbers. This allows chunks to be read in the correct order and allows all the chunks for a given file ID to be deleted with one query.
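A minimal in-memory sketch of how file content maps onto chunk rows: the content is cut into fixed-size pieces numbered from 0, and reading them back in chunk_number order reproduces the file. ChunkSplitter is a hypothetical helper for illustration, not part of the project; FileSystemController does the equivalent work inside its streams, using 1 * 1024 * 1024 byte chunks.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public final class ChunkSplitter {

  // Number of chunks needed for a file of the given size (ceiling division).
  public static long chunkCount(final long size, final int chunkSize) {
    return (size + chunkSize - 1) / chunkSize;
  }

  // Splits content into chunks of at most chunkSize bytes; the list index
  // plays the role of the chunk_number clustering column.
  public static List<ByteBuffer> split(final byte[] content, final int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<ByteBuffer> chunks = new ArrayList<>();
    for (int offset = 0; offset < content.length; offset += chunkSize) {
      int length = Math.min(chunkSize, content.length - offset);
      chunks.add(ByteBuffer.wrap(content, offset, length).slice());
    }
    return chunks;
  }

  // Reassembles the chunks in chunk_number order.
  public static byte[] join(final List<ByteBuffer> chunks) {
    int total = 0;
    for (ByteBuffer chunk : chunks) {
      total += chunk.remaining();
    }
    ByteBuffer out = ByteBuffer.allocate(total);
    for (ByteBuffer chunk : chunks) {
      out.put(chunk.duplicate());
    }
    return out.array();
  }
}
```

For example, a 2.5 MB file (2,621,440 bytes) needs chunkCount(2621440, 1048576) = 3 chunks: two full 1 MB chunks and a final 0.5 MB chunk.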

path and parent_path tables

The path table is used as an inverted index to map an absolute file path to a file ID. Its primary/partitioning key, the ‘path’ column, contains the file’s absolute path, and the ‘file_id’ column is the corresponding file UUID.

The parent_path table is used as an inverted index to map an absolute directory path to multiple file IDs. Its primary/partitioning key, the ‘path’ column, contains the absolute path of the file’s parent directory. The parent_path table also has a clustering column, ‘file_id’, which allows a directory to be listed by querying on the ‘path’ partition key and returning all the file IDs contained in that directory.

Moving files is accomplished by inserting new ‘path’ and ‘parent_path’ entries for the destination and deleting the old ones. The ‘chunk’ table is untouched, and in the ‘file’ table only the file’s name and modified time are updated, so no file content is copied.
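The two inverted indexes can be modelled with plain maps to show how lookup, directory listing and moves fit together. This is a hypothetical in-memory sketch (PathIndexSketch is not part of the project, and it assumes a Unix-style default file system for Paths), not the Cassandra-backed implementation:

```java
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public final class PathIndexSketch {

  // path table: absolute path -> file ID
  private final Map<String, UUID> path = new HashMap<>();
  // parent_path table: parent directory path -> file IDs (the clustering column)
  private final Map<String, Set<UUID>> parentPath = new HashMap<>();

  // "/testdir/testfile.txt" -> "/testdir"
  public static String parentOf(final String absolutePath) {
    java.nio.file.Path parent = Paths.get(absolutePath).getParent();
    if (parent == null) {
      throw new IllegalArgumentException("invalid path");
    }
    return parent.toString();
  }

  public void add(final String absolutePath, final UUID fileId) {
    path.put(absolutePath, fileId);
    parentPath.computeIfAbsent(parentOf(absolutePath), k -> new HashSet<>()).add(fileId);
  }

  public UUID lookup(final String absolutePath) {
    return path.get(absolutePath);
  }

  // Directory listing: every file ID stored under the directory's path.
  public Set<UUID> list(final String directory) {
    return parentPath.getOrDefault(directory, Collections.emptySet());
  }

  // Move = write the new index entries, then drop the old ones; content is untouched.
  public boolean move(final String fromPath, final String toPath) {
    UUID fileId = path.get(fromPath);
    if (fileId == null) {
      return false;
    }

    String fromParent = parentOf(fromPath);
    String toParent = parentOf(toPath);
    add(toPath, fileId);

    if (!fromPath.equals(toPath)) {
      path.remove(fromPath);
    }
    if (!fromParent.equals(toParent)) {
      parentPath.get(fromParent).remove(fileId);
    }
    return true;
  }
}
```

Only the index entries change on a move; the file ID, and therefore every chunk keyed by it, stays the same.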

The entity classes mapped to the tables are:

File.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.core.utils.UUIDs;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.util.UUID;

@Table(keyspace = "filesystem", name = "file")
public final class File {

  private UUID id;
  private String name;
  private long size;
  private long modified;
  private String group;
  private String owner;
  private boolean hidden;
  private boolean directory;

  public File() {
  }

  @PartitionKey
  @Column(name = "id")
  public UUID getId() {

    if (null == id) {
      id = UUIDs.random();
    }

    return id;
  }

  public void setId(final UUID id) {

    this.id = id;
  }

  @Column(name = "name")
  public String getName() {

    return name;
  }

  public void setName(final String name) {

    this.name = name;
  }

  @Column(name = "size")
  public long getSize() {

    return size;
  }

  public void setSize(final long size) {

    this.size = size;
  }

  @Column(name = "modified")
  public long getModified() {

    return modified;
  }

  public void setModified(final long modified) {

    this.modified = modified;
  }

  @Column(name = "group")
  public String getGroup() {

    return group;
  }

  public void setGroup(final String group) {

    this.group = group;
  }

  @Column(name = "owner")
  public String getOwner() {

    return owner;
  }

  public void setOwner(final String owner) {

    this.owner = owner;
  }

  @Column(name = "hidden")
  public boolean isHidden() {

    return hidden;
  }

  public void setHidden(final boolean hidden) {

    this.hidden = hidden;
  }

  @Column(name = "directory")
  public boolean isDirectory() {

    return directory;
  }

  public void setDirectory(final boolean directory) {

    this.directory = directory;
  }
}

Chunk.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.nio.ByteBuffer;
import java.util.UUID;

@Table(keyspace = "filesystem", name = "chunk")
public final class Chunk {

  private UUID fileId;
  private int chunkNumber;
  private ByteBuffer content;

  public Chunk() {
  }

  @PartitionKey
  @Column(name = "file_id")
  public UUID getFileId() {

    return fileId;
  }

  public void setFileId(final UUID fileId) {

    this.fileId = fileId;
  }

  @ClusteringColumn
  @Column(name = "chunk_number")
  public int getChunkNumber() {

    return chunkNumber;
  }

  public void setChunkNumber(final int chunkNumber) {

    this.chunkNumber = chunkNumber;
  }

  @Column(name = "content")
  public ByteBuffer getContent() {

    return content;
  }

  public void setContent(final ByteBuffer content) {

    this.content = content;
  }
}

Path.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.util.UUID;

@Table(keyspace = "filesystem", name = "path")
public final class Path {

  private String path;
  private UUID fileId;

  public Path() {
  }

  public Path(final String path, final UUID fileId) {

    this.path = path;
    this.fileId = fileId;
  }

  @PartitionKey
  @Column(name = "path")
  public String getPath() {

    return path;
  }

  public void setPath(final String path) {

    this.path = path;
  }

  @Column(name = "file_id")
  public UUID getFileId() {

    return fileId;
  }

  public void setFileId(final UUID fileId) {

    this.fileId = fileId;
  }
}

ParentPath.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.util.UUID;

@Table(keyspace = "filesystem", name = "parent_path")
public final class ParentPath {

  private String path;
  private UUID fileId;

  public ParentPath() {
  }

  public ParentPath(final String path, final UUID fileId) {

    this.path = path;
    this.fileId = fileId;
  }

  @PartitionKey
  @Column(name = "path")
  public String getPath() {

    return path;
  }

  public void setPath(final String path) {

    this.path = path;
  }

  @ClusteringColumn
  @Column(name = "file_id")
  public UUID getFileId() {

    return fileId;
  }

  public void setFileId(final UUID fileId) {

    this.fileId = fileId;
  }
}

The class for controlling file operations and creating chunked data input and output streams:

FileSystemController.java

package org.adrianwalker.cassandra.filesystem.controller;

import static java.util.Collections.EMPTY_LIST;
import static java.util.stream.Collectors.toList;

import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.Result;
import com.datastax.driver.mapping.annotations.Accessor;
import com.datastax.driver.mapping.annotations.Param;
import com.datastax.driver.mapping.annotations.Query;
import org.adrianwalker.cassandra.filesystem.entity.Chunk;
import org.adrianwalker.cassandra.filesystem.entity.File;
import org.adrianwalker.cassandra.filesystem.entity.ParentPath;
import org.adrianwalker.cassandra.filesystem.entity.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.List;
import java.util.UUID;

public final class FileSystemController {

  @Accessor
  private interface ParentPathAccessor {

    @Query("SELECT * FROM parent_path WHERE path = :path")
    Result<ParentPath> selectParentPathByPath(@Param("path") String path);
  }

  @Accessor
  private interface FileAccessor {

    @Query("SELECT * FROM file WHERE id IN :ids")
    Result<File> selectFilesByIds(@Param("ids") List<UUID> ids);
  }

  @Accessor
  private interface ChunkAccessor {

    @Query("DELETE FROM chunk WHERE file_id = :file_id")
    void deleteChunksByFileId(@Param("file_id") UUID fileId);
  }

  private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemController.class);

  private final Mapper<ParentPath> parentPathMapper;
  private final Mapper<Path> pathMapper;
  private final Mapper<File> fileMapper;
  private final Mapper<Chunk> chunkMapper;

  private final ParentPathAccessor parentPathAccessor;
  private final FileAccessor fileAccessor;
  private final ChunkAccessor chunkAccessor;

  public FileSystemController(final Session session) {

    LOGGER.debug("session = {}", session);

    if (null == session) {
      throw new IllegalArgumentException("session is null");
    }

    MappingManager manager = new MappingManager(session);

    parentPathMapper = manager.mapper(ParentPath.class);
    pathMapper = manager.mapper(Path.class);
    fileMapper = manager.mapper(File.class);
    chunkMapper = manager.mapper(Chunk.class);

    parentPathAccessor = manager.createAccessor(ParentPathAccessor.class);
    fileAccessor = manager.createAccessor(FileAccessor.class);
    chunkAccessor = manager.createAccessor(ChunkAccessor.class);
  }

  public File getFile(final String path) {

    LOGGER.debug("path = {}", path);

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    Path filePath = pathMapper.get(path);
    if (null == filePath) {
      return null;
    }

    return fileMapper.get(filePath.getFileId());
  }

  public File saveFile(final String path, final File file) {

    LOGGER.debug("path = {}, file = {}", path, file);

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    if (null == file) {
      throw new IllegalArgumentException("file is null");
    }

    if (null == file.getId()) {
      file.setId(UUID.randomUUID());
    }

    pathMapper.save(new Path(path, file.getId()));
    parentPathMapper.save(new ParentPath(getParent(path), file.getId()));

    file.setModified(System.currentTimeMillis());
    fileMapper.save(file);

    return file;
  }

  public boolean deleteFile(final String path) {

    LOGGER.debug("path = {}", path);

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    File file = getFile(path);

    if (null == file) {
      return false;
    }

    pathMapper.delete(path);

    String parentPath = getParent(path);
    parentPathMapper.delete(parentPath, file.getId());

    chunkAccessor.deleteChunksByFileId(file.getId());

    fileMapper.delete(file.getId());

    return true;
  }

  public List<File> listFiles(final String parentPath) {

    LOGGER.debug("parentPath = {}", parentPath);

    if (null == parentPath) {
      throw new IllegalArgumentException("parentPath is null");
    }

    List<UUID> ids = getFileIds(parentPath);

    List<File> files;
    if (ids.isEmpty()) {
      files = EMPTY_LIST;
    } else {
      files = fileAccessor.selectFilesByIds(ids).all();
    }

    return files;
  }

  public void moveFile(final String fromPath, final String toPath) {

    LOGGER.debug("fromPath = {}, toPath = {}", fromPath, toPath);

    if (null == fromPath) {
      throw new IllegalArgumentException("fromPath is null");
    }

    if (null == toPath) {
      throw new IllegalArgumentException("toPath is null");
    }

    File file = getFile(fromPath);

    if (null == file) {
      return;
    }

    String toParentPath = getParent(toPath);
    pathMapper.save(new Path(toPath, file.getId()));
    parentPathMapper.save(new ParentPath(toParentPath, file.getId()));

    String fromParentPath = getParent(fromPath);

    if (!fromPath.equals(toPath)) {
      pathMapper.delete(fromPath);
    }

    if (!fromParentPath.equals(toParentPath)) {
      parentPathMapper.delete(fromParentPath, file.getId());
    }

    file.setName(getFileName(toPath));
    file.setModified(System.currentTimeMillis());
    fileMapper.save(file);
  }

  public OutputStream createOutputStream(final File file) {

    LOGGER.debug("file = {}", file);

    if (null == file) {
      throw new IllegalArgumentException("file is null");
    }

    chunkAccessor.deleteChunksByFileId(file.getId());

    return new OutputStream() {

      private static final int CAPACITY = 1 * 1024 * 1024;

      private Chunk chunk = null;
      private int chunkNumber = 0;
      private long bytesWritten = 0;

      @Override
      public void write(final int b) throws IOException {

        if (null == chunk) {
          chunk = new Chunk();
          chunk.setFileId(file.getId());
          chunk.setChunkNumber(chunkNumber);
          chunk.setContent(ByteBuffer.allocate(CAPACITY));
        }

        ByteBuffer content = chunk.getContent();
        content.put((byte) (b & 0xFF));

        if (content.position() == content.limit()) {
          save(content);
        }
      }

      @Override
      public void close() throws IOException {

        if (null != chunk) {
          ByteBuffer content = chunk.getContent();
          save(content);
        }

        file.setSize(bytesWritten);
        file.setModified(System.currentTimeMillis());
        fileMapper.save(file);
      }

      private void save(final ByteBuffer content) {

        content.flip();
        chunkMapper.save(chunk);

        chunk = null;
        chunkNumber++;
        bytesWritten += content.limit();
      }
    };
  }

  public InputStream createInputStream(final File file) {

    LOGGER.debug("file = {}", file);

    if (null == file) {
      throw new IllegalArgumentException("file is null");
    }

    return new InputStream() {

      private Chunk chunk = null;
      private int chunkNumber = 0;
      private long bytesRead = 0;

      @Override
      public int read() throws IOException {

        if (bytesRead == file.getSize()) {
          return -1;
        }

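        if (null == chunk) {
          chunk = chunkMapper.get(file.getId(), chunkNumber);

          if (null == chunk) {
            throw new IOException("missing chunk " + chunkNumber + " for file " + file.getId());
          }
        }

        ByteBuffer content = chunk.getContent();
        byte b = content.get();
        bytesRead++;

        // move on to the next chunk once this one is exhausted
        if (!content.hasRemaining()) {
          chunk = null;
          chunkNumber++;
        }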

        return b & 0xFF;
      }
    };
  }

  private List<UUID> getFileIds(final String parentPath) {

    return parentPathAccessor.selectParentPathByPath(parentPath)
            .all()
            .stream()
            .map(pp -> pp.getFileId())
            .collect(toList());
  }

  private String getParent(final String path) {

    java.nio.file.Path parent = Paths.get(path).getParent();

    if (null == parent) {
      throw new IllegalArgumentException("invalid path");
    }

    return parent.toString();
  }

  private String getFileName(final String path) {

    java.nio.file.Path fileName = Paths.get(path).getFileName();

    if (null == fileName) {
      throw new IllegalArgumentException("invalid path");
    }

    return fileName.toString();
  }
}
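The chunking logic in createOutputStream and createInputStream can be exercised without a Cassandra cluster. The following is a hypothetical in-memory stand-in (InMemoryChunkStreams is not part of the project): a List<ByteBuffer> plays the role of the chunk table, and the list index plays the role of chunk_number. It counts every byte it reads, so the end of the stream is detected from the recorded size rather than from buffer positions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public final class InMemoryChunkStreams {

  // Stand-in for the chunk table: list index = chunk_number.
  private final List<ByteBuffer> chunks = new ArrayList<>();
  private final int capacity;
  private long size = 0;

  public InMemoryChunkStreams(final int capacity) {
    this.capacity = capacity;
  }

  // Buffers bytes into a chunk and "saves" it whenever it fills up, or on close.
  public OutputStream createOutputStream() {
    chunks.clear();
    size = 0;
    return new OutputStream() {
      private ByteBuffer buffer = null;

      @Override
      public void write(final int b) {
        if (buffer == null) {
          buffer = ByteBuffer.allocate(capacity);
        }
        buffer.put((byte) b);
        if (!buffer.hasRemaining()) {
          save();
        }
      }

      @Override
      public void close() {
        if (buffer != null) {
          save();
        }
      }

      private void save() {
        buffer.flip();
        chunks.add(buffer);
        size += buffer.limit();
        buffer = null;
      }
    };
  }

  // Reads chunks back in order, counting every byte so the end is detected by size.
  public InputStream createInputStream() {
    return new InputStream() {
      private ByteBuffer buffer = null;
      private int chunkNumber = 0;
      private long bytesRead = 0;

      @Override
      public int read() {
        if (bytesRead == size) {
          return -1;
        }
        if (buffer == null) {
          buffer = chunks.get(chunkNumber).duplicate();
        }
        int b = buffer.get() & 0xFF;
        bytesRead++;
        if (!buffer.hasRemaining()) {
          buffer = null;
          chunkNumber++;
        }
        return b;
      }
    };
  }

  // Writes data through the chunked output stream and reads it back.
  public static byte[] roundTrip(final byte[] data, final int capacity) {
    InMemoryChunkStreams store = new InMemoryChunkStreams(capacity);
    ByteArrayOutputStream copy = new ByteArrayOutputStream();
    try (OutputStream out = store.createOutputStream()) {
      out.write(data);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    try (InputStream in = store.createInputStream()) {
      for (int b = in.read(); b != -1; b = in.read()) {
        copy.write(b);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return copy.toByteArray();
  }
}
```

For example, roundTrip("hello world".getBytes(), 4) stores the 11 bytes as chunks of 4, 4 and 3 bytes and returns the original bytes.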

A simple example to create a directory, create a file, write to the file, list directory contents and read from the file:

private void exampleUsage() throws IOException {

  Cluster cluster = new Cluster.Builder()
          .addContactPoints("localhost")
          .build();
  Session session = cluster.connect("filesystem");

  FileSystemController controller = new FileSystemController(session);

  // create a directory
  File dir = new File();
  dir.setName("testdir");
  dir.setDirectory(true);
  dir.setOwner("test");
  dir.setGroup("test");
  dir.setHidden(false);
  controller.saveFile("/testdir", dir);

  // create a file
  File file = new File();
  file.setName("testfile.txt");
  file.setDirectory(false);
  file.setOwner("test");
  file.setGroup("test");
  file.setHidden(false);
  file = controller.saveFile("/testdir/testfile.txt", file);

  // write contents to file
  OutputStream os = new BufferedOutputStream(controller.createOutputStream(file));
  os.write("test content".getBytes());
  os.flush();
  os.close();

  // list files
  controller.listFiles("/testdir").forEach(f -> {
    System.out.println(f.getId() + "\t" + f.getName() + "\t" + f.getSize());
  });

  // read contents of file
  InputStream in = new BufferedInputStream(controller.createInputStream(file));
  BufferedReader reader = new BufferedReader(new InputStreamReader(in));
  System.out.println(reader.readLine());
  reader.close();
  in.close();

  session.close();
  cluster.close();
}

Source Code

Build and Test

The project is a standard Maven project which can be built with:

mvn clean install