Sunday 10 September 2017

Another Apache Cassandra File System

I want to store files in Apache Cassandra from a Java application using something like CFS or DSEFS, but both appear to be proprietary: they are part of DataStax Enterprise and closed source.

SnackFS looks like a good open source alternative, written in Scala, but I have a specific use case in mind and want control over the implementation, so I decided to roll my own.

The schema for the file system contains four tables in a keyspace called ‘filesystem’. The CQL looks like this:

filesystem.cql

CREATE TABLE filesystem.file (
  id uuid,
  name text,
  size bigint,
  modified bigint,
  group text,
  owner text,
  hidden boolean,
  directory boolean,
  PRIMARY KEY (id)
);

CREATE TABLE filesystem.chunk (
  file_id uuid,
  chunk_number int,
  content blob,
  PRIMARY KEY (file_id, chunk_number)
);

CREATE TABLE filesystem.path (
  path text,
  file_id uuid,
  PRIMARY KEY (path)
);

CREATE TABLE filesystem.parent_path (
  path text,
  file_id uuid,
  PRIMARY KEY (path, file_id)
);

file table

The file table has a UUID primary/partitioning key, the ‘id’ column, so random UUIDs can be used as keys to spread rows evenly around the ring and avoid hot spots. The file table also stores metadata about the file: its name, size in bytes, last modified time, group name, owner name, hidden flag and directory flag. It does not contain the file’s absolute path or its contents.

chunk table

The chunk table stores chunks of file content as BLOBs in the ‘content’ column. The DataStax site recommends keeping BLOBs relatively small:

"The maximum theoretical size for a blob is 2 GB. The practical limit on blob size, however, is less than 1 MB."

The chunk table’s primary/partitioning key, the ‘file_id’ column, is a UUID intended to equal the corresponding file ID in the file table. This means a file’s details and content can be read using the same ID, and because both tables are in the same keyspace and share the same key value, the file record and its chunk records hash to the same token and are stored on the same replica nodes. The chunk table also has a clustering column, ‘chunk_number’, which holds sequential chunk index numbers. This allows chunks to be read in the correct order and allows all the chunks for a given file ID to be deleted with one query.
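Because ‘chunk_number’ is a dense, zero-based index and every chunk except the last one is full, the chunk holding any byte of a file can be computed arithmetically. The following is a minimal sketch, not part of the project: ChunkMath and its methods are hypothetical helpers, and the 1 MB chunk size is an assumption taken from the CAPACITY constant in the controller's output stream, comfortably under the practical blob limit quoted above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper showing how byte positions map onto chunk_number values.
public final class ChunkMath {

  private ChunkMath() {
  }

  // number of chunk rows needed for a file of `size` bytes (ceiling division)
  public static long chunkCount(final long size, final int chunkSize) {
    return (size + chunkSize - 1) / chunkSize;
  }

  // chunk_number of the chunk holding the byte at absolute position `position`
  public static int chunkNumber(final long position, final int chunkSize) {
    return (int) (position / chunkSize);
  }

  // offset of that byte inside its chunk's content blob
  public static int offsetInChunk(final long position, final int chunkSize) {
    return (int) (position % chunkSize);
  }

  // split content into blobs; list index i corresponds to chunk_number i
  public static List<ByteBuffer> split(final byte[] content, final int chunkSize) {
    List<ByteBuffer> chunks = new ArrayList<>();
    for (int start = 0; start < content.length; start += chunkSize) {
      int length = Math.min(chunkSize, content.length - start);
      // slice() gives each chunk its own position 0 and a capacity of `length`
      chunks.add(ByteBuffer.wrap(content, start, length).slice());
    }
    return chunks;
  }

  public static void main(final String[] args) {
    int chunkSize = 1024 * 1024; // assumed 1 MB, matching CAPACITY in the controller
    long size = 2_500_000L;
    System.out.println(chunkCount(size, chunkSize));          // 3
    System.out.println(chunkNumber(2_000_000L, chunkSize));   // 1
    System.out.println(offsetInChunk(2_000_000L, chunkSize)); // 951424
  }
}
```

Arithmetic like this would let a reader seek to an offset by fetching a single chunk row, rather than streaming from chunk 0 as the controller's input stream does.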

path and parent_path tables

The path table is used as an inverted index to map an absolute file path to a file ID. Its primary/partitioning key, the ‘path’ column, contains the file’s absolute path, and the ‘file_id’ column holds the corresponding file UUID.

The parent_path table is used as an inverted index to map an absolute directory path to multiple file IDs. Its primary/partitioning key, the ‘path’ column, contains the absolute path of the file’s parent directory. The parent_path table also has a clustering column, ‘file_id’, which allows a directory to be listed by querying on the ‘path’ partition key and returning all the file IDs the directory contains.
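To make the two lookups concrete, here is a minimal in-memory sketch in which maps stand in for the path and parent_path tables; PathIndex and its methods are hypothetical. One difference from the controller is deliberate: FileSystemController derives parent directories with java.nio.file.Paths, which formats results with the platform's separator (a backslash on Windows), whereas this sketch uses plain string operations and assumes '/' separators and normalized paths without trailing slashes.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical in-memory stand-in for the path and parent_path tables.
public final class PathIndex {

  // path table: absolute path -> file id (one row per path)
  private final Map<String, UUID> paths = new TreeMap<>();

  // parent_path table: directory path -> file ids (the clustering column)
  private final Map<String, Set<UUID>> parentPaths = new TreeMap<>();

  // parent directory using string operations and '/' as the separator
  public static String parentOf(final String path) {
    int index = path.lastIndexOf('/');
    if (index < 0 || path.length() == 1) {
      // "/" has no parent, mirroring the controller's "invalid path" check
      throw new IllegalArgumentException("invalid path: " + path);
    }
    return index == 0 ? "/" : path.substring(0, index);
  }

  // equivalent of the two index inserts performed when a file is saved
  public void index(final String path, final UUID fileId) {
    paths.put(path, fileId);
    parentPaths.computeIfAbsent(parentOf(path), k -> new TreeSet<>()).add(fileId);
  }

  // point lookup on the path table's partition key
  public UUID lookup(final String path) {
    return paths.get(path);
  }

  // directory listing: read a single parent_path partition
  public Set<UUID> list(final String directory) {
    return parentPaths.getOrDefault(directory, new TreeSet<>());
  }
}
```

Saving "/testdir/testfile.txt" writes one path row keyed by the full path and one parent_path row in the "/testdir" partition, so listing "/testdir" is a single-partition read.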

Moving a file is accomplished by inserting its new ‘path’ and ‘parent_path’ entries and deleting the old ones. The ‘chunk’ table is not touched, and the only change to the ‘file’ table is updating the file’s name and modified time, so no file content is copied.
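The move described above touches only index rows plus the name column. The following in-memory sketch (MoveSketch is hypothetical, with maps standing in for the tables) follows the same order as moveFile: insert the new rows first, then delete the old ones. A plausible reason for that order is that the writes are separate statements rather than one atomic operation, so a failure part-way leaves the file reachable under both paths rather than under neither.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical in-memory model showing that a move only rewrites index rows.
public final class MoveSketch {

  final Map<String, UUID> path = new HashMap<>();            // path table
  final Map<String, Set<UUID>> parentPath = new HashMap<>(); // parent_path table
  final Map<UUID, String> fileName = new HashMap<>();        // name column of the file table

  static String parentOf(final String p) {
    int i = p.lastIndexOf('/');
    return i <= 0 ? "/" : p.substring(0, i);
  }

  static String nameOf(final String p) {
    return p.substring(p.lastIndexOf('/') + 1);
  }

  void save(final String p, final UUID id) {
    path.put(p, id);
    parentPath.computeIfAbsent(parentOf(p), k -> new HashSet<>()).add(id);
    fileName.put(id, nameOf(p));
  }

  // same order as moveFile: write the new index rows, then remove the old ones
  void move(final String from, final String to) {
    UUID id = path.get(from);
    if (id == null) {
      return;
    }
    path.put(to, id);
    parentPath.computeIfAbsent(parentOf(to), k -> new HashSet<>()).add(id);
    if (!from.equals(to)) {
      path.remove(from);
    }
    if (!parentOf(from).equals(parentOf(to))) {
      parentPath.get(parentOf(from)).remove(id);
    }
    // only the name changes in the file table; chunk rows keyed by id are untouched
    fileName.put(id, nameOf(to));
  }
}
```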

The entity classes mapped to the tables are:

File.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.core.utils.UUIDs;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.util.UUID;

@Table(keyspace = "filesystem", name = "file")
public final class File {

  private UUID id;
  private String name;
  private long size;
  private long modified;
  private String group;
  private String owner;
  private boolean hidden;
  private boolean directory;

  public File() {
  }

  @PartitionKey
  @Column(name = "id")
  public UUID getId() {

    if (null == id) {
      id = UUIDs.random();
    }

    return id;
  }

  public void setId(final UUID id) {

    this.id = id;
  }

  @Column(name = "name")
  public String getName() {

    return name;
  }

  public void setName(final String name) {

    this.name = name;
  }

  @Column(name = "size")
  public long getSize() {

    return size;
  }

  public void setSize(final long size) {

    this.size = size;
  }

  @Column(name = "modified")
  public long getModified() {

    return modified;
  }

  public void setModified(final long modified) {

    this.modified = modified;
  }

  @Column(name = "group")
  public String getGroup() {

    return group;
  }

  public void setGroup(final String group) {

    this.group = group;
  }

  @Column(name = "owner")
  public String getOwner() {

    return owner;
  }

  public void setOwner(final String owner) {

    this.owner = owner;
  }

  @Column(name = "hidden")
  public boolean isHidden() {

    return hidden;
  }

  public void setHidden(final boolean hidden) {

    this.hidden = hidden;
  }

  @Column(name = "directory")
  public boolean isDirectory() {

    return directory;
  }

  public void setDirectory(final boolean directory) {

    this.directory = directory;
  }
}

Chunk.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.nio.ByteBuffer;
import java.util.UUID;

@Table(keyspace = "filesystem", name = "chunk")
public final class Chunk {

  private UUID fileId;
  private int chunkNumber;
  private ByteBuffer content;

  public Chunk() {
  }

  @PartitionKey
  @Column(name = "file_id")
  public UUID getFileId() {

    return fileId;
  }

  public void setFileId(final UUID fileId) {

    this.fileId = fileId;
  }

  @ClusteringColumn
  @Column(name = "chunk_number")
  public int getChunkNumber() {

    return chunkNumber;
  }

  public void setChunkNumber(final int chunkNumber) {

    this.chunkNumber = chunkNumber;
  }

  @Column(name = "content")
  public ByteBuffer getContent() {

    return content;
  }

  public void setContent(final ByteBuffer content) {

    this.content = content;
  }
}

Path.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.util.UUID;

@Table(keyspace = "filesystem", name = "path")
public final class Path {

  private String path;
  private UUID fileId;

  public Path() {
  }

  public Path(final String path, final UUID fileId) {

    this.path = path;
    this.fileId = fileId;
  }

  @PartitionKey
  @Column(name = "path")
  public String getPath() {

    return path;
  }

  public void setPath(final String path) {

    this.path = path;
  }

  @Column(name = "file_id")
  public UUID getFileId() {

    return fileId;
  }

  public void setFileId(final UUID fileId) {

    this.fileId = fileId;
  }
}

ParentPath.java

package org.adrianwalker.cassandra.filesystem.entity;

import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

import java.util.UUID;

@Table(keyspace = "filesystem", name = "parent_path")
public final class ParentPath {

  private String path;
  private UUID fileId;

  public ParentPath() {
  }

  public ParentPath(final String path, final UUID fileId) {

    this.path = path;
    this.fileId = fileId;
  }

  @PartitionKey
  @Column(name = "path")
  public String getPath() {

    return path;
  }

  public void setPath(final String path) {

    this.path = path;
  }

  @ClusteringColumn
  @Column(name = "file_id")
  public UUID getFileId() {

    return fileId;
  }

  public void setFileId(final UUID fileId) {

    this.fileId = fileId;
  }
}

The class for controlling file operations and creating chunked data input and output streams:

FileSystemController.java

package org.adrianwalker.cassandra.filesystem.controller;

import static java.util.Collections.EMPTY_LIST;
import static java.util.stream.Collectors.toList;

import com.datastax.driver.core.Session;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.datastax.driver.mapping.Result;
import com.datastax.driver.mapping.annotations.Accessor;
import com.datastax.driver.mapping.annotations.Param;
import com.datastax.driver.mapping.annotations.Query;
import org.adrianwalker.cassandra.filesystem.entity.Chunk;
import org.adrianwalker.cassandra.filesystem.entity.File;
import org.adrianwalker.cassandra.filesystem.entity.ParentPath;
import org.adrianwalker.cassandra.filesystem.entity.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.List;
import java.util.UUID;

public final class FileSystemController {

  @Accessor
  private interface ParentPathAccessor {

    @Query("SELECT * FROM parent_path WHERE path = :path")
    Result<ParentPath> selectParentPathByPath(@Param("path") String path);
  }

  @Accessor
  private interface FileAccessor {

    @Query("SELECT * FROM file WHERE id IN :ids")
    Result<File> selectFilesByIds(@Param("ids") List<UUID> ids);
  }

  @Accessor
  private interface ChunkAccessor {

    @Query("DELETE FROM chunk WHERE file_id = :file_id")
    void deleteChunksByFileId(@Param("file_id") UUID fileId);
  }

  private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemController.class);

  private final Mapper<ParentPath> parentPathMapper;
  private final Mapper<Path> pathMapper;
  private final Mapper<File> fileMapper;
  private final Mapper<Chunk> chunkMapper;

  private final ParentPathAccessor parentPathAccessor;
  private final FileAccessor fileAccessor;
  private final ChunkAccessor chunkAccessor;

  public FileSystemController(final Session session) {

    LOGGER.debug("session = {}", session);

    if (null == session) {
      throw new IllegalArgumentException("session is null");
    }

    MappingManager manager = new MappingManager(session);

    parentPathMapper = manager.mapper(ParentPath.class);
    pathMapper = manager.mapper(Path.class);
    fileMapper = manager.mapper(File.class);
    chunkMapper = manager.mapper(Chunk.class);

    parentPathAccessor = manager.createAccessor(ParentPathAccessor.class);
    fileAccessor = manager.createAccessor(FileAccessor.class);
    chunkAccessor = manager.createAccessor(ChunkAccessor.class);
  }

  public File getFile(final String path) {

    LOGGER.debug("path = {}", path);

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    Path filePath = pathMapper.get(path);
    if (null == filePath) {
      return null;
    }

    return fileMapper.get(filePath.getFileId());
  }

  public File saveFile(final String path, final File file) {

    LOGGER.debug("path = {}, file = {}", path, file);

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    if (null == file) {
      throw new IllegalArgumentException("file is null");
    }

    if (null == file.getId()) {
      file.setId(UUID.randomUUID());
    }

    pathMapper.save(new Path(path, file.getId()));
    parentPathMapper.save(new ParentPath(getParent(path), file.getId()));

    file.setModified(System.currentTimeMillis());
    fileMapper.save(file);

    return file;
  }

  public boolean deleteFile(final String path) {

    LOGGER.debug("path = {}", path);

    if (null == path) {
      throw new IllegalArgumentException("path is null");
    }

    File file = getFile(path);

    if (null == file) {
      return false;
    }

    pathMapper.delete(path);

    String parentPath = getParent(path);
    parentPathMapper.delete(parentPath, file.getId());

    chunkAccessor.deleteChunksByFileId(file.getId());

    fileMapper.delete(file.getId());

    return true;
  }

  public List<File> listFiles(final String parentPath) {

    LOGGER.debug("parentPath = {}", parentPath);

    if (null == parentPath) {
      throw new IllegalArgumentException("parentPath is null");
    }

    List<UUID> ids = getFileIds(parentPath);

    List<File> files;
    if (ids.isEmpty()) {
      files = EMPTY_LIST;
    } else {
      files = fileAccessor.selectFilesByIds(ids).all();
    }

    return files;
  }

  public void moveFile(final String fromPath, final String toPath) {

    LOGGER.debug("fromPath = {}, toPath = {}", fromPath, toPath);

    if (null == fromPath) {
      throw new IllegalArgumentException("fromPath is null");
    }

    if (null == toPath) {
      throw new IllegalArgumentException("toPath is null");
    }

    File file = getFile(fromPath);

    if (null == file) {
      return;
    }

    String toParentPath = getParent(toPath);
    pathMapper.save(new Path(toPath, file.getId()));
    parentPathMapper.save(new ParentPath(toParentPath, file.getId()));

    String fromParentPath = getParent(fromPath);

    if (!fromPath.equals(toPath)) {
      pathMapper.delete(fromPath);
    }

    if (!fromParentPath.equals(toParentPath)) {
      parentPathMapper.delete(fromParentPath, file.getId());
    }

    file.setName(getFileName(toPath));
    file.setModified(System.currentTimeMillis());
    fileMapper.save(file);
  }

  public OutputStream createOutputStream(final File file) {

    LOGGER.debug("file = {}", file);

    if (null == file) {
      throw new IllegalArgumentException("file is null");
    }

    chunkAccessor.deleteChunksByFileId(file.getId());

    return new OutputStream() {

      private static final int CAPACITY = 1 * 1024 * 1024;

      private Chunk chunk = null;
      private int chunkNumber = 0;
      private long bytesWritten = 0;

      @Override
      public void write(final int b) throws IOException {

        if (null == chunk) {
          chunk = new Chunk();
          chunk.setFileId(file.getId());
          chunk.setChunkNumber(chunkNumber);
          chunk.setContent(ByteBuffer.allocate(CAPACITY));
        }

        ByteBuffer content = chunk.getContent();
        content.put((byte) (b & 0xFF));

        if (content.position() == content.limit()) {
          save(content);
        }
      }

      @Override
      public void close() throws IOException {

        if (null != chunk) {
          ByteBuffer content = chunk.getContent();
          save(content);
        }

        file.setSize(bytesWritten);
        file.setModified(System.currentTimeMillis());
        fileMapper.save(file);
      }

      private void save(final ByteBuffer content) {

        content.flip();
        chunkMapper.save(chunk);

        chunk = null;
        chunkNumber++;
        bytesWritten += content.limit();
      }
    };
  }

  public InputStream createInputStream(final File file) {

    LOGGER.debug("file = {}", file);

    if (null == file) {
      throw new IllegalArgumentException("file is null");
    }

    return new InputStream() {

      private Chunk chunk = null;
      private int chunkNumber = 0;
      private long bytesRead = 0;

      @Override
      public int read() throws IOException {

        if (bytesRead == file.getSize()) {
          return -1;
        }

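        if (null == chunk) {
          chunk = chunkMapper.get(file.getId(), chunkNumber);

          // a missing chunk means the stored size and the chunk rows disagree
          if (null == chunk) {
            throw new IOException("missing chunk " + chunkNumber + " for file " + file.getId());
          }
        }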

        ByteBuffer content = chunk.getContent();
        byte b = content.get();

        if (content.position() == content.limit()) {
          chunk = null;
          chunkNumber++;
          bytesRead += content.position();
        }

        return b & 0xFF;
      }
    };
  }

  private List<UUID> getFileIds(final String parentPath) {

    return parentPathAccessor.selectParentPathByPath(parentPath)
            .all()
            .stream()
            .map(ParentPath::getFileId)
            .collect(toList());
  }

  private String getParent(final String path) {

    java.nio.file.Path parent = Paths.get(path).getParent();

    if (null == parent) {
      throw new IllegalArgumentException("invalid path");
    }

    return parent.toString();
  }

  private String getFileName(final String path) {

    java.nio.file.Path fileName = Paths.get(path).getFileName();

    if (null == fileName) {
      throw new IllegalArgumentException("invalid path");
    }

    return fileName.toString();
  }
}
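The chunking in createOutputStream and createInputStream can be exercised without a cluster. The following is a minimal in-memory sketch, not part of the project: ChunkedStreams is a hypothetical class, a TreeMap stands in for one partition of the chunk table, and the capacity is a constructor parameter so small chunks are easy to observe. It follows the same flip-then-save and read-until-size logic as the controller.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory version of the controller's chunked streams.
public final class ChunkedStreams {

  // stands in for one chunk table partition: chunk_number -> content
  private final Map<Integer, ByteBuffer> chunks = new TreeMap<>();
  private final int capacity;
  private long size; // stands in for the file table's size column

  public ChunkedStreams(final int capacity) {
    this.capacity = capacity;
  }

  public long size() {
    return size;
  }

  public int chunkCount() {
    return chunks.size();
  }

  public OutputStream createOutputStream() {
    chunks.clear(); // like deleteChunksByFileId before rewriting a file
    size = 0;
    return new OutputStream() {
      private ByteBuffer buffer = null;
      private int chunkNumber = 0;

      @Override
      public void write(final int b) {
        if (null == buffer) {
          buffer = ByteBuffer.allocate(capacity);
        }
        buffer.put((byte) b);
        if (!buffer.hasRemaining()) {
          save(); // chunk is full
        }
      }

      @Override
      public void close() {
        if (null != buffer) {
          save(); // final, possibly partial, chunk
        }
      }

      private void save() {
        buffer.flip(); // position = 0, limit = bytes written
        chunks.put(chunkNumber++, buffer);
        size += buffer.limit();
        buffer = null;
      }
    };
  }

  public InputStream createInputStream() {
    return new InputStream() {
      private ByteBuffer buffer = null;
      private int chunkNumber = 0;
      private long bytesRead = 0;

      @Override
      public int read() throws IOException {
        if (bytesRead == size) {
          return -1; // the stored size, not a missing chunk, marks end of file
        }
        if (null == buffer) {
          ByteBuffer stored = chunks.get(chunkNumber);
          if (null == stored) {
            throw new IOException("missing chunk " + chunkNumber);
          }
          buffer = stored.duplicate(); // independent position, like a freshly read blob
        }
        byte b = buffer.get();
        bytesRead++;
        if (!buffer.hasRemaining()) {
          buffer = null;
          chunkNumber++;
        }
        return b & 0xFF;
      }
    };
  }
}
```

With a capacity of 4 bytes, writing "test content" (12 bytes) produces chunks 0, 1 and 2, and reading stops once 12 bytes have been returned. Like the controller, this sketch only overrides the single-byte methods; overriding write(byte[], int, int) to copy into the buffer with ByteBuffer.put(byte[], int, int) would be one way to avoid per-byte calls.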

A simple example to create a directory, create a file, write to the file, list directory contents and read from the file:

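import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import org.adrianwalker.cassandra.filesystem.controller.FileSystemController;
import org.adrianwalker.cassandra.filesystem.entity.File;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public final class ExampleUsage {

  public static void main(final String[] args) throws IOException {

    // Cluster and Session are Closeable, so both are closed when the block exits
    try (Cluster cluster = Cluster.builder().addContactPoints("localhost").build();
         Session session = cluster.connect("filesystem")) {

      FileSystemController controller = new FileSystemController(session);

      // create a directory
      File dir = new File();
      dir.setName("testdir");
      dir.setDirectory(true);
      dir.setOwner("test");
      dir.setGroup("test");
      dir.setHidden(false);
      controller.saveFile("/testdir", dir);

      // create a file
      File file = new File();
      file.setName("testfile.txt");
      file.setDirectory(false);
      file.setOwner("test");
      file.setGroup("test");
      file.setHidden(false);
      file = controller.saveFile("/testdir/testfile.txt", file);

      // write contents to file; close() flushes the buffer, saves the last chunk and the file size
      try (OutputStream os = new BufferedOutputStream(controller.createOutputStream(file))) {
        os.write("test content".getBytes(StandardCharsets.UTF_8));
      }

      // list files
      controller.listFiles("/testdir").forEach(f ->
          System.out.println(f.getId() + "\t" + f.getName() + "\t" + f.getSize()));

      // read contents of file
      try (BufferedReader reader = new BufferedReader(new InputStreamReader(
          new BufferedInputStream(controller.createInputStream(file)), StandardCharsets.UTF_8))) {
        System.out.println(reader.readLine());
      }
    }
  }
}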

Source Code

Build and Test

The project is a standard Maven project which can be built with:

mvn clean install