Replicating the MongoDB oplog to Elasticsearch using Apache Flink
I have been tinkering with RxJava and reactive streams lately, and I decided to revisit one of my older
projects that tailed the MongoDB oplog and used RabbitMQ to broadcast changes out of a sharded MongoDB cluster. In this
installment of tailing the oplog I decided not to use RabbitMQ; instead I decided to try out Apache Flink. Flink
appears to play in the same domain as Spark Streaming. I am not endorsing one over the other in this blog;
it is always important to keep evaluating different technologies to make sure you are making the right decisions.
Here, I am using Apache Flink to wire a streaming data source (the MongoDB oplog tail) to an Elasticsearch sink.
<p>All source code for this project may be found
<a href="https://github.com/JaiHirsch/flink-mingo-tail" target="new">here</a>. Also, before we get started: the code used here
is not production quality. It was not test driven and was written as a thought experiment on a rainy
Friday afternoon as a way to learn more about
<a href="http://www.reactive-streams.org" target="new">reactive streams</a>,
<a href="https://github.com/ReactiveX/RxJava" target="new">RxJava</a>,
<a href="https://flink.apache.org" target="new">Apache Flink</a>, and
<a href="https://www.elastic.co" target="new">Elasticsearch</a>.
</p>
I will not go deep into the dirty details of oplog tailing other than to say it is much easier with the reactive
driver. Let's, however, take a quick look at getting an event publisher using the reactive driver and some of the
syntax differences in the new generation of the MongoDB driver.
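A minimal sketch of that, assuming the mongodb-driver-reactivestreams API (the filter argument comes from the getQueryFilter() call discussed below):
<pre>
// Classes come from com.mongodb.reactivestreams.client and org.bson.
// Build an oplog publisher for a single mongod; this is called once per host.
public FindPublisher<Document> getOplogPublisher(String host, int port, Bson filter) {
    MongoCollection<Document> oplog = MongoClients.create("mongodb://" + host + ":" + port)
            .getDatabase("local")
            .getCollection("oplog.rs");
    // find() hands back a reactive streams Publisher instead of a blocking cursor;
    // TailableAwait keeps the cursor open, waiting for new oplog entries.
    return oplog.find(filter).cursorType(CursorType.TailableAwait);
}
</pre>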
This snippet of code is inside a loop that is getting a publisher for every replica set inside of a sharded
cluster.
It is also important to understand the filters that are generated through the getQueryFilter() call:
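Here is a rough sketch of what that filter might look like; the tailer.opsTime namespace and the getLastOpTimeForHost() helper are stand-ins for the real thing:
<pre>
// Filters is com.mongodb.client.model.Filters.
private Bson getQueryFilter(String host) {
    return Filters.and(
            // 1. Skip the collection we write our own timestamps to, or we would
            //    feed our own writes straight back into the stream.
            Filters.ne("ns", "tailer.opsTime"),
            // 2. Skip "no-op" entries; we only care about CRUD operations.
            Filters.ne("op", "n"),
            // 3. Skip operations generated by chunk migrations between shards.
            Filters.exists("fromMigrate", false),
            // 4. Only read operations newer than the last timestamp recorded for this host.
            Filters.gt("ts", getLastOpTimeForHost(host)));
}
</pre>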
In this example I am using the same MongoDB cluster to track the last operation timestamps, so filtering out
the collection these timestamps are written to is VERY important; otherwise this creates a pretty badass infinite
loop. On a side note, it is probably not a good idea to keep your ops time collection on the same server you
are tailing, as it takes up a lot of space in the oplog.
The next filter removes the “no-ops” that show up in the oplog; this keeps down the clutter, as we are
only looking for CRUD operations.
Next we filter out fromMigrate; this removes operations produced by chunk migration and prevents false positives
while tailing.
Finally, we filter on the last operation timestamp, which keeps the entire oplog from replaying every time we
connect. If you notice, this timestamp is looked up in MongoDB by host, corresponding to each host's server info
that comes from the sharded cluster.
The MongoDB reactive driver uses the Reactive Streams standard. Reactive Streams is rumored to be included in the Java 9
release. This posed a bit of an issue while I was trying to bind it to RxJava, but there is a great translation
library that wires the two approaches together:
RxJavaReactiveStreams.
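The bridge itself is essentially a one-liner; something along these lines, where handleOperation() is just a placeholder for the counting logic described next:
<pre>
// RxReactiveStreams comes from the io.reactivex:rxjava-reactive-streams artifact.
// Wrap the driver's Publisher in an RxJava Observable and attach a subscriber.
Observable<Document> oplogObservable = RxReactiveStreams.toObservable(publisher);
oplogObservable.subscribe(op -> handleOperation(op));
</pre>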
The bindPublisherToObservable method is called in a loop and attaches a subscriber onto an oplog tail for each mongod
that has been passed in, creating a thread per mongod. The code then uses a ConcurrentHashMap to
count the number of times a given operation is observed. Once the count is equal to the replica set depth
(the number of replicas per shard), the given operation has fully replicated across the replica set. At this point
the operation is placed on an ArrayBlockingQueue (opsQueue) and removed from the map. The opsQueue is used by the
Flink data source to collect the operation and place it on the Flink workflow.
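A sketch of that counting logic, with illustrative names; the real key derivation and queue sizing may differ:
<pre>
private final ConcurrentHashMap<String, Integer> opCounts = new ConcurrentHashMap<>();
private final ArrayBlockingQueue<Document> opsQueue = new ArrayBlockingQueue<>(4096);

// Called by each per-mongod subscriber for every oplog document it observes.
private void onOplogDocument(Document op, int replicaDepth) throws InterruptedException {
    // Key the operation by something stable across replica set members,
    // for example its timestamp plus namespace.
    String key = op.get("ts") + "|" + op.getString("ns");
    int seen = opCounts.merge(key, 1, Integer::sum);
    if (seen == replicaDepth) {
        // Every member has reported this operation, so hand it to the Flink source.
        opCounts.remove(key);
        opsQueue.put(op);
    }
}
</pre>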
The code to run the oplog tail to Elasticsearch replication is fairly simple:
first we create the stream execution environment, next we add the MongoDB oplog source, and finally the print and
Elasticsearch sinks are added. The PrintSinkFunction was used to show how easy it is to attach multiple
sinks to the execution environment.
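A sketch of that wiring; MongoDBOplogSource and elasticsearchSink() stand in for the project's source function and sink builder:
<pre>
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The source drains the opsQueue that the oplog subscribers fill
    // ("localhost", 27017 are placeholders for the mongos host and port).
    DataStream<String> oplogStream = env.addSource(new MongoDBOplogSource("localhost", 27017));

    // Multiple sinks can hang off the same stream: one simply prints,
    // the other indexes the documents into Elasticsearch.
    oplogStream.addSink(new PrintSinkFunction<>());
    oplogStream.addSink(elasticsearchSink());

    env.execute("MongoDB oplog to Elasticsearch");
}
</pre>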
For this example I have not fully fleshed out the Elasticsearch sink. It is actually indexing the oplog documents
rather than creating a synchronized copy of the MongoDB collections. I decided not to fully implement the Elasticsearch
code as there are already a few decent projects out there that do this. Also, I was only able to get the
Apache Flink connector for Elasticsearch 1.7.x working, as I was unable to find the 2.x connector on Maven Central.
I had quite a bit of fun taking the reactive driver for MongoDB for a spin around the block. Stream and event
driven programming is something we all need to be aware of, and it is just as important in the server side and
big data worlds as it is for Android and other front end development.
I have been working on several large scale GridFS projects lately and the topic of finding, removing, and preventing
duplicate files keeps coming up. As such, here are some of my findings and code samples around this topic.
First of all, what is GridFS?
A convention for storing large files in a MongoDB database. All of the official MongoDB drivers support this
convention, as does the mongofiles program.
GridFS places the collections in a common bucket by prefixing each with the bucket name. By default, GridFS uses two
collections with a bucket named fs: fs.files, which holds the file metadata, and fs.chunks, which holds the binary chunks.
Within the files collection, GridFS gives you the md5 hash of the file when it is uploaded.
This appears to be
a perfect candidate for identifying duplicate files, and when you search Google on this topic you will quickly find
examples, gists, and Stack Overflow posts on how to use the md5 to identify duplicate files.
Example file header:
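The values below are placeholders rather than a real document, but a files entry looks roughly like this:
<pre>
{
    "_id" : ObjectId("<object id>"),
    "filename" : "<file name>",
    "length" : NumberLong("<file size in bytes>"),
    "chunkSize" : <chunk size in bytes>,
    "uploadDate" : ISODate("<upload time>"),
    "md5" : "<32-character hex digest>"
}
</pre>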
Here is an example using the aggregation pipeline that groups by md5, pushes the _ids into an array, and matches
where there are two or more documents sharing an md5:
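Expressed with the Java driver's aggregation builders, that pipeline could look like this (the names here are mine, not the project's):
<pre>
// Uses static imports from com.mongodb.client.model.Aggregates,
// Accumulators, and Filters.
public void printDuplicateMd5Groups(MongoDatabase database) {
    MongoCollection<Document> files = database.getCollection("fs.files");
    // Group on md5, collect the matching _ids, and keep groups with two or more members.
    for (Document duplicates : files.aggregate(Arrays.asList(
            group("$md5", push("ids", "$_id"), sum("count", 1)),
            match(gte("count", 2))))) {
        System.out.println(duplicates.toJson());
    }
}
</pre>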
It is also important to note that, for this aggregation to work well, there should be an index on the md5 field.
This could be a very costly index on a large implementation.
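For reference, creating that index from the Java driver is a one-liner:
<pre>
// Indexes is com.mongodb.client.model.Indexes.
database.getCollection("fs.files").createIndex(Indexes.ascending("md5"));
</pre>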
So if there are so many posts out there, and if this is a solved problem, then why am I writing about it? Because
you should never trust an md5 hash, especially for binary files. Let's take a moment for a closer look at
MD5.
The MD5 message-digest algorithm is a widely used cryptographic hash function producing a 128-bit (16-byte) hash
value, typically expressed in text format as a 32 digit hexadecimal number. MD5 has been utilized in a wide variety
of cryptographic applications, and is also commonly used to verify data integrity.
There are many scholarly works, written by much smarter people than I, available on the probability of md5 collisions,
so I will not spend much time going over it. I have, however, encountered the consequences of hash
collisions before, and it is never fun.
Unfortunately, the files I have dealt with that had collisions are proprietary and I cannot share them
here, so I took to the googles and found some interesting utilities and articles.
To perform my GridFS experiment I used a few jpg files that were created by Nat McHugh and may be found on his
blog post:
How I created two images
with the same MD5 hash (He created the third one later)
You just can't go wrong with White, Brown, and Black. These images are obviously different, yet they all have the
same md5 hash. What is more, they also show the same chunkSize and length information in the GridFS files document.
Because these three fields are identical, we cannot use the aggregation framework as described thus far
to identify that they are, in fact, not duplicates.
There are many other ways to identify whether two files are duplicates. Let's take a look at one such approach that
also takes advantage of the file streaming aspect of GridFS.
The following example code is written in Java and should be considered demo quality only.
First, you must get the input stream from the GridFS file:
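With the newer GridFSBucket API that might look like this (the post's original code may well use an older GridFS class instead):
<pre>
// GridFSBuckets and GridFSBucket come from com.mongodb.client.gridfs.
public InputStream openGridFsStream(MongoDatabase database, ObjectId fileId) {
    GridFSBucket bucket = GridFSBuckets.create(database);
    // The returned stream pulls chunks from fs.chunks on demand
    // rather than materializing the whole file in memory.
    return bucket.openDownloadStream(fileId);
}
</pre>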
Once we have both of the input streams we want to compare, we can do so as follows:
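This is a sketch rather than the post's original code: a buffered, byte-by-byte comparison of the two streams.
<pre>
// Returns true only if both streams yield exactly the same bytes and end together.
public static boolean streamsMatch(InputStream first, InputStream second) throws IOException {
    try (BufferedInputStream a = new BufferedInputStream(first);
         BufferedInputStream b = new BufferedInputStream(second)) {
        int byteA;
        int byteB;
        do {
            byteA = a.read();
            byteB = b.read();
            // A mismatch, or one stream ending before the other, means different files.
            if (byteA != byteB) {
                return false;
            }
        } while (byteA != -1);
        return true;
    }
}
</pre>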
I will be the first to admit this code is rather ugly, but it works well enough for this purpose. Using streams has the
advantage that you do not have to materialize the files in order to check whether they are duplicates. The downside is that
it takes linear time to scan both files.
The GridFS documentation does mention ranged queries and the ability to skip to arbitrary locations in the file.
This should allow us to write a threaded file validator, so look forward to part two of the post on optimization.