Replicating the MongoDB oplog to Elasticsearch using Apache Flink

2016-05-09 00:00:00 +0000

I have been tinkering with RxJava and reactive streams lately, and I decided to revisit one of my older projects that tailed the MongoDB oplog and used RabbitMQ to broadcast out the changes made to a sharded MongoDB cluster. In this installment of tailing the oplog I decided to drop RabbitMQ and try out Apache Flink instead. Flink plays in much the same domain as Spark Streaming; I am not endorsing one over the other in this post, but it is always important to keep evaluating different technologies to make sure you are making the right decisions. Here I am using Apache Flink to wire a streaming data source (the MongoDB oplog tail) to an Elasticsearch sink.

All source code for this project may be found at https://github.com/JaiHirsch/flink-mingo-tail. Also, before we get started: the code used here is not production quality. It was not test driven and was written as a thought experiment on a rainy Friday afternoon as a way to learn more about reactive streams (http://www.reactive-streams.org), RxJava (https://github.com/ReactiveX/RxJava), Apache Flink (https://flink.apache.org), and Elasticsearch (https://www.elastic.co).

I will not go deep into the dirty details of oplog tailing other than to say that it is much easier with the reactive driver. Let's, however, take a quick look at getting an event publisher using the reactive driver and some of the syntax differences in the new generation of the MongoDB driver.

This snippet of code is inside a loop that is getting a publisher for every replica set inside of a sharded cluster.

FindPublisher<Document> oplogPublisher = client.getClient().getDatabase("local")
    .getCollection("oplog.rs").find().filter(getQueryFilter(tsCollection, client))
    .sort(new Document("$natural", 1)).cursorType(CursorType.TailableAwait);
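
Since the reactive driver hands back a plain reactive streams Publisher, you could also consume it directly with a bare Subscriber before layering anything else on top. Here is a minimal sketch of that idea (not from the project; it assumes the oplogPublisher from the snippet above and the org.reactivestreams.Subscriber and Subscription types):

// Minimal, hypothetical consumer of the publisher above using plain reactive streams.
// Backpressure is handled by requesting one document at a time.
oplogPublisher.subscribe(new Subscriber<Document>() {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription s) {
        subscription = s;
        subscription.request(1); // ask for the first oplog entry
    }

    @Override
    public void onNext(Document op) {
        System.out.println(op.toJson()); // handle the operation
        subscription.request(1);         // then ask for the next one
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        // a tailable-await cursor normally never completes
    }
});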

It is also important to understand the filters that are generated through the getQueryFilter() call:

  • In this example I am using the same MongoDB cluster to track the last operation timestamps, so filtering out the collection they are written to is VERY important; otherwise this creates a pretty bad ass infinite loop. On a side note, it is probably not a good idea to have your ops time collection on the same server you are tailing, as it takes up a lot of space in the oplog.
  • The next filter removes the "no-ops" that show up in the oplog. This keeps down the clutter, as we are only looking for CRUD operations.
  • Next we filter out fromMigrate. This removes operations caused by chunk migrations and prevents false positives while tailing.
  • Finally we filter on the last operation timestamp. This prevents the entire oplog from replaying every time we connect.

private Bson getQueryFilter(MongoCollection<Document> tsCollection, MongoClientWrapper client) {
    return and(ne("ns", "time_d.repl_time"), ne("op", "n"), exists("fromMigrate", false),
            getFilterLastTimeStamp(tsCollection, client));
}

The timestamp filter keeps us from replaying the entire oplog every time we connect. Notice that it queries MongoDB by host; this corresponds to the server info of each host in the sharded cluster.

private Bson getFilterLastTimeStamp(MongoCollection<Document> tsCollection, MongoClientWrapper client) {
    Document lastTimeStamp = tsCollection.find(new Document("host", client.getHost())).limit(1).first();
    return getTimeQuery(lastTimeStamp == null ? null : (BsonTimestamp) lastTimeStamp.get("ts"));
}

private Bson getTimeQuery(BsonTimestamp lastTimeStamp) {
    return lastTimeStamp == null ? new Document() : gt("ts", lastTimeStamp);
}

The MongoDB reactive driver uses the reactive streams standard, which is rumored to be implemented in the Java 9 release. This posed a bit of an issue while I was trying to bind it to RxJava, but there is a great translation library that wires the two approaches together: RxJavaReactiveStreams. The bindPublisherToObservable method is called in a loop and attaches a subscriber to an oplog tail for each mongod that has been passed in, which creates a thread per mongod. The code then uses a ConcurrentHashMap to count the number of times a given operation is observed. Once the count equals the replica set depth (the number of replicas per shard), the given operation has fully replicated to all replica sets. At this point the operation is placed on an ArrayBlockingQueue, opsQueue, and removed from the map. The opsQueue is used by the Flink data source to collect the operation and place it on the Flink work flow.

private static final String OPLOG_TIMESTAMP = "ts";
private static final String OPLOG_ID = "h";

...

private void bindPublisherToObservable(Entry<String, FindPublisher<Document>> oplogPublisher,
    ExecutorService executor, MongoCollection<Document> tsCollection) {
    RxReactiveStreams.toObservable(oplogPublisher.getValue())
      .subscribeOn(Schedulers.from(executor)).subscribe(t -> {
        try {
            putOperationOnOpsQueue(oplogPublisher, tsCollection, t);
        } catch (InterruptedException e) {
            // restore the interrupt status rather than silently swallowing it
            Thread.currentThread().interrupt();
        }
    });
}

private void putOperationOnOpsQueue(Entry<String, FindPublisher<Document>> publisher,
    MongoCollection<Document> tsCollection, Document t) throws InterruptedException {
    updateHostOperationTimeStamp(tsCollection, t.get(OPLOG_TIMESTAMP, BsonTimestamp.class), publisher.getKey());
    putOperationOnOpsQueueIfFullyReplicated(t);
}

private void putOperationOnOpsQueueIfFullyReplicated(Document t) throws InterruptedException {
    Long opKey = t.getLong(OPLOG_ID);
    documentCounter.putIfAbsent(opKey, new AtomicInteger(1));
    if (documentCounter.get(opKey).getAndIncrement() >= replicaDepth) {
        opsQueue.put(t);
        documentCounter.remove(opKey);
    }
}
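
The updateHostOperationTimeStamp method called above is not reproduced in the post. Assuming it simply upserts the most recent timestamp seen for each host into the tracking collection that getFilterLastTimeStamp reads, a rough sketch could look like this (UpdateOptions comes from the synchronous driver's com.mongodb.client.model package):

// Hypothetical sketch: record the latest oplog timestamp per host so that the
// timestamp filter only replays newer operations on the next connect.
private void updateHostOperationTimeStamp(MongoCollection<Document> tsCollection,
    BsonTimestamp ts, String host) {
    tsCollection.replaceOne(new Document("host", host),
        new Document("host", host).append("ts", ts),
        new UpdateOptions().upsert(true));
}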

The code to run the oplog-tail-to-Elasticsearch replication is fairly simple:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Document> ds = see.addSource(new MongoDBOplogSource("host", port));
    ds.addSink(new PrintSinkFunction<Document>());
    ds.addSink(new ElasticsearchEmbeddedNodeSink("cluster.name").getElasticSink());
    see.execute("MongoDB Sharded Oplog Tail");
}

First we create the stream execution environment, next we add the MongoDB oplog source, and finally the print and Elasticsearch sinks are added. The PrintSinkFunction was included to show how easy it is to attach multiple sinks to the execution environment.
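
The MongoDBOplogSource itself is not shown in this post; conceptually it just drains the opsQueue described earlier and hands each document to Flink. A rough sketch of that idea, assuming Flink's SourceFunction interface (this is not the project's exact class, and the connection bootstrap is omitted):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.bson.Document;

// Sketch of a Flink source that drains the opsQueue filled by the oplog subscribers.
public class MongoDBOplogSource implements SourceFunction<Document> {

    private volatile boolean running = true;

    // hypothetical: in the real project this queue is fed by
    // putOperationOnOpsQueueIfFullyReplicated(...)
    private final ArrayBlockingQueue<Document> opsQueue = new ArrayBlockingQueue<>(1024);

    public MongoDBOplogSource(String host, int port) {
        // the real source would use host/port here to connect to the cluster and
        // start the oplog tailing threads that feed opsQueue; omitted in this sketch
    }

    @Override
    public void run(SourceContext<Document> ctx) throws Exception {
        while (running) {
            // poll with a timeout so cancel() is honoured even when the oplog is quiet
            Document op = opsQueue.poll(100, TimeUnit.MILLISECONDS);
            if (op != null) {
                ctx.collect(op);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}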

For this example I have not fully fleshed out the Elasticsearch sink. It is actually indexing the oplog documents themselves rather than creating a synchronized copy of the MongoDB collections. I decided not to fully implement the Elasticsearch code as there are already a few decent projects out there that do this. Also, I was only able to get the Apache Flink connector for Elasticsearch 1.7.x working, as I was unable to find the 2.x connector on Maven Central.

I had quite a bit of fun taking the reactive driver for MongoDB for a spin around the block. Stream and event driven programming is something we all need to be aware of, and it is just as important in the server side and big data worlds as it is for Android and other front end development.

Finding Duplicate Files in GridFS

2016-02-15 00:00:00 +0000

I have been working on several large scale GridFS projects lately and the topic of finding, removing, and preventing duplicate files keeps coming up. As such, here are some of my findings and code samples around this topic.


First of all, what is GridFS?

A convention for storing large files in a MongoDB database. All of the official MongoDB drivers support this convention, as does the mongofiles program.

...

GridFS stores files in two collections. GridFS places the collections in a common bucket by prefixing each with the bucket name. By default, GridFS uses two collections with a bucket named fs:
  • fs.files
  • fs.chunks

-- MongoDB Manual

Within the files collection, GridFS gives you the md5 hash of the file when it is uploaded. This appears to be a perfect candidate for identifying duplicate files, and when you search Google on this topic you will quickly find examples, gists, and Stack Overflow posts on how to use the md5 to identify duplicates.

Example file header:

{
   "_id" : ObjectId("56bf88d782419304bf212014"),
   "filename" : "foo.txt",
   "aliases" : null,
   "chunkSize" : NumberLong(261120),
   "uploadDate" : ISODate("2016-02-13T19:49:43.575Z"),
   "length" : NumberLong(541),
   "contentType" : null,
   "md5" : "3813e09a727083f74b45fe7f5b253c07"
}

Here is an example using the aggregation pipeline that groups by md5, pushes the _ids into an array, and matches where there are two or more md5 matches:

    db.fs.files.aggregate([{$group:{_id:'$md5',count:{$sum:1},ids:{$push:'$_id'}}},{$match:{count:{$gte:2}}}])
    
It is also important to note that for this aggregation to perform well there should be an index on the md5 field. This could be a very costly index on a large implementation.
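
For reference, the same index and aggregation can be expressed with the MongoDB Java driver. A small sketch (the connection details, database name, and default fs bucket are assumptions for illustration):

import java.util.Arrays;

import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Indexes;

public class DuplicateMd5Finder {

    public static void main(String[] args) {
        MongoClient client = new MongoClient("localhost");
        MongoCollection<Document> files = client.getDatabase("test").getCollection("fs.files");

        // the aggregation below groups on md5, so index it up front
        files.createIndex(Indexes.ascending("md5"));

        // group by md5, count the files per hash, and keep only hashes seen two or more times
        AggregateIterable<Document> duplicates = files.aggregate(Arrays.asList(
            Aggregates.group("$md5",
                Accumulators.sum("count", 1),
                Accumulators.push("ids", "$_id")),
            Aggregates.match(Filters.gte("count", 2))));

        for (Document group : duplicates) {
            System.out.println(group.toJson());
        }

        client.close();
    }
}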

So if there are so many posts out there, and if this is a solved problem, then why am I writing about it? Because you should never trust an md5 hash, especially for binary files. Let's take a moment for a closer look at MD5.

The MD5 message-digest algorithm is a widely used cryptographic hash function producing a 128-bit (16-byte) hash value, typically expressed in text format as a 32 digit hexadecimal number. MD5 has been utilized in a wide variety of cryptographic applications, and is also commonly used to verify data integrity.

-- Wikipedia

There are many scholarly works, written by much smarter people than I, on the probability of md5 collisions, so I will not spend much time going over it. I have, however, encountered the consequences of hash collisions before, and it is never fun.

Unfortunately, the files with collisions that I have dealt with before are proprietary and I cannot share them here, so I took to the googles and found some interesting utilities and articles.

Here are two that I liked in particular:

To perform my GridFS experiment I used a few jpg files that were created by Nat McHugh; they may be found on his blog post "How I created two images with the same MD5 hash" (he created the third one later).

[Images: the three visually different jpg files, all sharing the same MD5 hash]

You just can't go wrong with White, Brown, and Black. These images are obviously different, yet they all have the same md5 hash. What is more, they also show the same chunkSize and length values in the GridFS files document. Because these three fields are identical, the aggregation framework approach shown above cannot tell that they are not, in fact, duplicates.

There are many other ways to identify whether two files are duplicates. Let's take a look at one such approach that also takes advantage of the file streaming aspect of GridFS.

The following example code is written in Java and should be considered demo quality only.

First you must get the input stream from the GridFS file:

// findOne returns a single GridFSDBFile for the query (the legacy find(DBObject) returns a list)
GridFSDBFile findFile1 = gridFS.findOne(document);
InputStream is1 = findFile1.getInputStream();

Once we get both of the input streams we want to compare we can do so as follows:

public boolean isDuplicateStream(InputStream is1, InputStream is2) {
   boolean isSame = true;
   int val1, val2 = -1;
   do {
      try {
         val1 = is1.read();
         val2 = is2.read();
         if (val1 != val2) {
            isSame = false;
            break;
         }
      } catch (IOException e) {
         logger.warn(e.getMessage());
         isSame = false;
         break;
      }

   } while (val1 != -1 && val2 != -1);  // important to check both to make sure both are completely read
   return isSame;
}

I will be the first to admit, this code is rather ugly, but it works well enough for this. Using streams has the advantage that you do not have to materialize the files in order to check whether they are duplicates. The downside is that it will take linear time to scan both files.
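
Tying it together, a hypothetical caller might pull two files flagged by the md5 aggregation and compare their streams (the gridFS handle and the two _id values here are placeholders):

// Hypothetical usage: two files that share an md5, compared byte for byte.
GridFSDBFile first = gridFS.findOne(new BasicDBObject("_id", firstId));
GridFSDBFile second = gridFS.findOne(new BasicDBObject("_id", secondId));

boolean reallyDuplicates = isDuplicateStream(first.getInputStream(), second.getInputStream());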

The GridFS documentation does mention ranged queries and the ability to skip to arbitrary locations in the file. This should allow us to write a threaded file validator, so look forward to part two of the post on optimization.

The full project I used may be found here