MongoDB Transactions Demo

2018-08-08 00:00:00 +0000

MongoDB Transactions Demo

One of the major new features in the MongoDB 4.0 release is ACID transactions. For a quick refresher here is what Wikipedia has to say on ACID transactions at the time of this article:

In computer science, ACID ( Atomicity, Consistency, Isolation, Durability ) is a set of properties of database transactions intended to guarantee validity even in the event of errors, power failures, etc. In the context of databases, a sequence of database operations that satisfies the ACID properties, and thus can be perceived as a single logical operation on the data, is called a transaction. For example, a transfer of funds from one bank account to another, even involving multiple changes such as debiting one account and crediting another, is a single transaction.

In this post I am going to cover three topics. First, a brief review of transaction syntax in other databases. Next, a quick overview of MongoDB transactions. Finally, a demonstration of MongoDB transactions using the Java driver.

A brief review of transaction syntax

If you are unfamiliar with database transactions it may be helpful to review how some of the popular relational databases handle them before we get into the MongoDB transactions. Let's take a look at the structure around MySQL, Oracle, and PostgreSQL for transactions. I will leave the more advanced topics for the reader to review. Perhaps I will prepare a deep dive in a later post if there is interest.

Example from the MySQL docs:

    START TRANSACTION;
    SELECT @A:=SUM(salary) FROM table1 WHERE type=1;
    UPDATE table2 SET summary=@A WHERE type=1;
    COMMIT;
    

Example from the Oracle docs:

    ...

    COMMIT;
    --  This statement ends any existing transaction in the session.

    SET TRANSACTION NAME 'sal_update2';
    -- This statement begins a new transaction in the session and names it sal_update2.

    UPDATE employees
    SET salary = 7050
    WHERE last_name = 'Banda';
    -- This statement updates the salary for Banda to 7050.

    UPDATE employees
    SET salary = 10950
    WHERE last_name = 'Greene';
    -- This statement updates the salary for Greene to 10950.

    COMMIT;
    -- This statement commits all changes made in transaction sal_update2, ending the transaction. The commit
    guarantees that the changes are saved in the online redo log files.

    

Example from the PostgreSQL docs:

    BEGIN;
    UPDATE accounts SET balance = balance - 100.00
    WHERE name = 'Alice';
    SAVEPOINT my_savepoint;
    UPDATE accounts SET balance = balance + 100.00
    WHERE name = 'Bob';
    -- oops ... forget that and use Wally's account
    ROLLBACK TO my_savepoint;
    UPDATE accounts SET balance = balance + 100.00
    WHERE name = 'Wally';
    COMMIT;
    


A quick overview of MongoDB transactions

Now, let us turn our attention to MongoDB transactions. For more information on MongoDB transactions, please see the official documentation.
There are a few things from the documentation that I feel are very important to keep in mind.

For transactions:
  • You can specify read/write (CRUD) operations on existing collections. The collections can be in different databases.
  • You cannot read/write to collections in the config, admin, or local databases.
  • You cannot write to system.* collections.
  • You cannot return the supported operation’s query plan (i.e. explain).
  • For cursors created outside of transactions, you cannot call getMore inside a transaction.
  • For cursors created in a transaction, you cannot call getMore outside the transaction.
The following operations are not allowed in multi-document transactions:
  • Operations that affect the database catalog, such as creating or dropping a collection or an index. For example, a multi-document transaction cannot include an insert operation that would result in the creation of a new collection.
  • The listCollections and listIndexes commands and their helper methods are also excluded.
  • Non-CRUD and non-informational operations, such as createUser, getParameter, count, etc. and their helpers.

The following code examples are from a MongoDB 4.0 transactions demo I wrote during the 3.7 beta testing and may be found here. This code base should be considered as demo/example quality.

First, let’s take a look at the transaction structure as we have with the other databases. In this case, I am going to use the MongoDB java driver syntax as there is not a direct SQL translation.

    try (ClientSession clientSession = mongoClient.startSession()) {

        clientSession.startTransaction(TransactionOptions.builder()
            .writeConcern(WriteConcern.MAJORITY).build());

        inventoryCollection.updateOne(clientSession,
            Filters.eq("sku", "abc123"), Updates.inc("qty", amount));

        shipmentCollection.insertOne(clientSession, new Document("sku"
            , "abc123").append("qty", -amount)
            .append("tname", threadName));

        clientSession.commitTransaction();

    } catch (MongoException e) {
        throw new RuntimeException("Transaction failed: " + e);

    

Once you get past the Java syntax, the structure is pretty much the same as in the other databases. Let's take a few of the key elements out and display them in less verbose pseudo code. (And yes, the MySQL, Oracle, or Postgres Java code is just as ugly if you are using the odbc/jdbc drivers directly.)

    start transaction
        update an inventory record
        insert a shipment record
    commit transaction
    

The structure of MongoDB transactions follows the same structure as the other examples.

A demonstration of MongoDB transactions using the Java driver

We will begin by taking a look at the main class that runs the demo. One line to note is:
DemoMongoConnector dmc = new DemoMongoConnector()
This class wraps MongoClient for convenience. The code may be found here.

import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import org.bson.assertions.Assertions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.mongodb.client.model.Filters.eq;

public class MultiThreadTransactionRunner {

    private static final ExecutorService changeStreamExecutorService = Executors
        .newFixedThreadPool(2);
    private static final ExecutorService trnsactionExecutorService = Executors
        .newFixedThreadPool(10);

public static void main(String[] args) throws IOException {
        try (DemoMongoConnector dmc = new DemoMongoConnector()) {
            setUpMongoForTransactionTest(dmc);
            changeStreamExecutorService.submit(new ChangeStreamWatcher(dmc
                .getDatabase()));
            launchThreadsAndRunTransactions(dmc);
            Document skuAbc123 = (Document) dmc.getInventory().find(eq("sku",
                "abc123")).first();

            System.out.println("++++++++++ " + skuAbc123);

            Assertions.isTrue("qty should have been 500",
                500 == skuAbc123.getInteger("qty"));
            trnsactionExecutorService.shutdown();
        }
        changeStreamExecutorService.shutdown();
    }

The first thing we are doing after instantiating the connector is attaching a change stream watcher:

    changeStreamExecutorService.submit(new ChangeStreamWatcher(dmc
    .getDatabase()));
    
Change streams were introduced in MongoDB 3.6 and allow one to have an open query against the changes in the database. They also make several of my blog posts on oplog tailing mostly old news. (sad face)

Change streams are outside the scope of this post, but we will use them to watch when the database accepts the writes during transactions. The change stream code may be found here.

Next, we set up the database environment for the transactional tests. The Collections to be used during a transaction must be created prior to the start of the transaction. For this testing scenario we will begin by dropping the existing "test" database and then recreating inventory and shipment collections. We will then insert an inventory document with a quantity value of 500.

    private static void setUpMongoForTransactionTest(DemoMongoConnector dmc) {
        MongoDatabase db = dmc.getMongoClient().getDatabase("test");
        db.drop();
        db.createCollection("inventory");
        db.createCollection("shipment");
        dmc.getInventory().insertOne(new Document("sku", "abc123").append("qty", 500));
    }

Now it’s time to begin the transactions! The launch method uses the subsequent submit method to do the actual work, but the forEach calling future.get() is important here. This is a blocking call on the main thread that will cause the main thread to be blocked until each of the transaction threads have completed.

    private static void launchThreadsAndRunTransactions(DemoMongoConnector dmc) {
        submitTransactionThreads(dmc).forEach(future -> {
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

We’re almost to the good part, I swear! The submitTransactionThreads method is creating four transaction threads with the TransactionRetryModule that are going to increase the quantity of our item by one hundred, and four that will decrease the value by one hundred. Thus, a total of eight transaction threads will be created and placed into the changeStreamExecutorService. The second argument to the iterateTransactions will be used in a loop to fire the transaction multiple times. In this case, each thread will do its transaction five times. With the two combined arguments, each thread will increase or decrease the value by five hundred. This is done for testing only.

    private static List<Future/> submitTransactionThreads(DemoMongoConnector dmc) {
        List<Future/> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            futures.add(changeStreamExecutorService.submit(new TransactionRetryModule()
                .iterateTransactions(100, 5)));
        }
        for (int i = 0; i < 4; i++) {
            futures.add(changeStreamExecutorService.submit(new TransactionRetryModule()
                .iterateTransactions(-100, 5)));
        }
        return futures;
    }

Retries are very important for the MongoDB transaction logic. The documentation gives examples for multiple languages.

From the MongoDB transaction documentation:

The individual write operations inside the transaction are not retryable, regardless of whether retryWrites is set to true.

If an operation encounters an error, the returned error may have an errorLabels array field. If the error is a transient error, the errorLabels array field contains "TransientTransactionError" as an element and the transaction as a whole can be retried.
I highly recommend reading up on the circuit breaker pattern if you implement retry patterns in production code. The Netflix Hystrix library is one of the more popular implementations that I have worked with. It is, however, rather heavy-weight for this demo. A good post from DZone on retries can be found here.

We are finally ready to review the TransactionRetryModule. The entry point into the class is the iterateTransactions method. This method creates a runnable lambda that is used in our multi-threaded model. Next, we create a new DemoMongoConnector. While one could use the connector from the main class, I wanted to imitate the transactions coming from multiple distinct applications. Next, we generate a random name for logging to show the order in which the transactions are being executed. Then, we loop and fire a new transaction for the number of iterations we passed into the runnable. Finally, the method calls

    handleTransactionClientSession(amount, dmc, threadName);
    
which will begin the transaction session.

import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.ClientSession;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.apache.commons.lang3.RandomStringUtils;
import org.bson.Document;
import org.mongo.utils.Retry;

public class TransactionRetryModule {

    private static final int MAX_RETRIES = 10;
    private static final long DELAY_BETWEEN_RETRIES_MILLIS = 30L;

    public Runnable iterateTransactions(final int amount, final int iterations) {
        return () -> {
            try (DemoMongoConnector dmc = new DemoMongoConnector()) {
                String threadName = RandomStringUtils.randomNumeric(4);
                for (int i = 0; i < iterations; i++) {
                    handleTransactionClientSession(amount, dmc, threadName);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

    }

The handleTransactionClientSession method starts the ClientSession and enters the transaction retry loop.

private void handleTransactionClientSession(int amount, DemoMongoConnector dmc,
        String threadName) {
    try (ClientSession clientSession = dmc.getMongoClient().startSession()) {
        transactionRetryLoop(amount, dmc, threadName, clientSession);
    } catch (Exception e) {

        throw new RuntimeException("Transaction failed: " + e);
    }
}

The transactionRetryLoop method begins by instantiating a Retry object that is used to control the loop. Next, we enter the loop and attempt the transaction. If successful, we mark the retry loop as complete. If an error is thrown from the transaction, the transaction will be aborted and retried. The sysouts are for demo only. Friends don't let friends sysout in production. Finally, if the retry loop exits unsuccessfully after maximum attempts, an error will be thrown.

private void transactionRetryLoop(int amount, DemoMongoConnector dmc, String threadName,
        ClientSession clientSession) {
    Retry retryLoop = new Retry().withAttempts(MAX_RETRIES)
        .withDelay(DELAY_BETWEEN_RETRIES_MILLIS);
    while (retryLoop.shouldContinue()) try {

        System.out.println(threadName + " : " + retryLoop.getTimesAttempted());

        doTransaction(amount, dmc, threadName, clientSession);
        retryLoop.markAsComplete();

        System.out.println(threadName + " complete : " + retryLoop.completedOk());

    } catch (Exception e) {
        retryLoop.takeException(e);
        if (e instanceof MongoException && ((MongoException)e).hasErrorLabel(
          MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
            System.out.println(threadName + " Aborting transaction: " + e.getMessage());
            clientSession.abortTransaction();
        } else throw e;
    }
    if (!retryLoop.completedOk()) {
        throw new RuntimeException("Transaction failed after " + MAX_RETRIES
            + " retries.", retryLoop.getLastException());
    }
}

We made it! The doTransaction method is where the magic finally happens. We begin the transaction, we perform an update on the inventory collection, and make an insert to the shipment collection. Finally, we commit the transaction. In the preceding code block we checked for a MongoException with a specific label MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL in the catch statement. This condition signifies that the transaction has failed due to a TransientTransactionError.

Please note the following code block that updates a document in the inventory collection:

    dmc.getInventory().updateOne(clientSession, Filters.eq("sku", "abc123"),
        Updates.inc("qty", amount));
    
This is the "shared" document for all of the transactions in this demo and is the key to raising the transaction errors we are checking for as each thread attempts to update the qty field for sku "abc123".

private void doTransaction(int amount, DemoMongoConnector dmc, String threadName,
        ClientSession clientSession) {

   clientSession.startTransaction(TransactionOptions.builder()
        .writeConcern(WriteConcern.MAJORITY).build());

    dmc.getInventory().updateOne(clientSession, Filters.eq("sku", "abc123"),
        Updates.inc("qty", amount));
    dmc.getShipment().insertOne(clientSession, new Document("sku", "abc123")
        .append("qty", -amount).append("tname", threadName));

    clientSession.commitTransaction();
}

The standard out for a run of the demo can be found here. It is rather verbose to paste into this post, but if examine it you will see the flow of aborted transactions and how the change streams show the successful ones.

Conclusion

Transactions are a big step forward for MongoDB. In the 4.0 release, they are limited to replica sets, but sharded clusters are on the road map. As with most new additions to a technology, I highly recommend doing extensive testing yourself prior to running it in production. Remember, even with transactions, it is your responsibility to ensure the integrity of your data. Any time your data model becomes more complex, so does the array of issues that can arise.

I hope this review has been helpful, and feel free to test out the code for yourself.

Mocking the MongoDB Java driver

2016-07-25 00:00:00 +0000

Mocking the MongoDB Java driver

While at MongoDB World 2016 I was involved in multiple conversations about unit testing and test driven development of data access objects that wrap the MongoDB Java driver. Not two days after returning to work from MongoDB World one of the teams I work with was complaining that the 3.x Java driver broke a lot of their testing patterns and that they were having difficulties mocking out the driver. I had a few meetings canceled so I decided to sit down and create a testing pattern that could be easily implemented and extended using the Mockito libraries.

A side note before we get started. If you are mocking at the driver layer you may want to take a step back and consider why you are doing it and much of the following code violates the principle of 'Do no mock types you don't own'. However, while I am not going to dive deep into the philosophy of unit testing, I think these testing patterns may be of use to some. My general rule of thumb is to only test logic, avoid the file system, and assume that the external systems work as advertised. Avoid the pitfalls of only testing mocks for the sake of testing and make sure you set up integration layers to verify that the external systems do, in fact, work as advertised. That being said, all the code for the following examples may be found here.

Lets begin with setting up our base mocks and injecting them into the class we will be working on.

@Mock
private MongoClient mockClient;
@Mock
private MongoCollection mockCollection;
@Mock
private MongoDatabase mockDB;

@InjectMocks
private DriverWrapper wrapper;

@Before
public void initMocks() {
   when(mockClient.getDatabase(anyString())).thenReturn(mockDB);
   when(mockDB.getCollection(anyString())).thenReturn(mockCollection);
   wrapper.init();
   MockitoAnnotations.initMocks(this);
}

I am using the Mockito's InjectMocks which will inject the mock MongoClient into the DriverWrapper class. DriverWrapper is the class under test in this case. Using the Junit @Before annotation reduces the amount boilerplate code needed in each test as this method will run before each test.

For the first test lets try something simple with a test for a method that finds documents by a last name field

@Test
public void findBob() {
   FindIterable iterable = mock(FindIterable.class);
   MongoCursor cursor = mock(MongoCursor.class);
   Document bob = new Document("_id",new ObjectId("579397d20c2dd41b9a8a09eb"))
      .append("firstName", "Bob")
      .append("lastName", "Bobberson");

   when(mockCollection.find(new Document("lastName", "Bobberson")))
      .thenReturn(iterable);
   when(iterable.iterator()).thenReturn(cursor);
   when(cursor.hasNext()).thenReturn(true).thenReturn(false);
   when(cursor.next()).thenReturn(bob);

   List<Document> found = wrapper.findByLastName("Bobberson");

   assertEquals(bob, found.get(0));
}

And here is the method under test:

public List<Document> findByLastName(String lastName) {
   List<Document> people = new ArrayList<>();
   FindIterable<Document> documents = collection.find(new Document("lastName", lastName));
   MongoCursor<Document> iterator = documents.iterator();
   while (iterator.hasNext()) {
      Document document = iterator.next();
      people.add(document);
   }
   return people;
}

So this works, but the code feels pretty bulky. Following the red/green/refactor principles lets clean up the implementation of the findByLastName method. Looking over the Java api there is a into method that will will replace the body of the code above:

public List<Document> findByLastNameUsingInto(String lastName) {
   return collection.find(new Document("lastName", lastName)).into(new ArrayList<>());
}

Well, that reduced the line count of the method by a bit... It did however break the test as we need to mock out different parts of the code and needs to be updated to reflect the new implementation:

@Test
public void findBobUsingInto() {
   FindIterable iterable = mock(FindIterable.class);
   MongoCursor cursor = mock(MongoCursor.class);
   Document bob = new Document("_id",new ObjectId("579397d20c2dd41b9a8a09eb"))
      .append("firstName", "Bob")
      .append("lastName", "Bobberson");

   when(mockCollection.find(new Document("lastName", "Bobberson"))).thenReturn(iterable);
   when(iterable.into(new ArrayList<>())).thenReturn(asList(bob));

   List<Document> found = wrapper.findByLastNameUsingInto("Bobberson");
   assertEquals(bob, found.get(0));
}

While the code has been reduced down to one line the test still has a lot of boilerplate that will need to be "duplicated" in the next test. I am a fan of the builder pattern for problems like this so lets take a look at a way to implement this pattern to create a more generic tool for setting up the tests.

public class MockCursorBuilder {

    private FindIterable iterable = mock(FindIterable.class);
    private MongoCursor cursor = mock(MongoCursor.class);
    private MongoCollection mockCollection;

    public MockCursorBuilder(MongoCollection mockCollection) {

        this.mockCollection = mockCollection;
        when(iterable.iterator()).thenReturn(cursor);
    }

    public MockCursorBuilder withQuery(Document query) {
        when(mockCollection.find(query)).thenReturn(iterable);
        return this;
    }

    public MockCursorBuilder usingInto(Document... documents) {
        when(iterable.into(new ArrayList<>())).thenReturn(asList(documents));
        return this;
    }
}

Using this approach we are passing the mock MongoCollection we created in the test as a constructor parameter. We then set collection as a field. We internalize the mock MongoCursor and FindIterable classes that we needed to set up per test. Finally we use the builder pattern to define the mocking of the find and into methods. The test may now be rewritten as:

@Test
public void findBobUsingIntoAndMockBuilder() {
    Document bob = new Document("_id",new ObjectId("579397d20c2dd41b9a8a09eb"))
        .append("firstName", "Bob").append("lastName", "Bobberson");
    new MockCursorBuilder(mockCollection)
        .withQuery(new Document("lastName", "Bobberson")).usingInto(bob);
    assertEquals(bob,wrapper.findByLastNameUsingInto("Bobberson").get(0));
}

Using this new builder it becomes much easier to create a test with multiple Documents returned by the query. The main difference in this test is the use of the Hamcrest library's IsIterableContainingInOrder class. This allows us to test the order of a returned list against an expected list in a direct assertion.

@Test
public void findMultipleUsers() {
    Document bob = new Document("_id",new ObjectId())
        .append("firstName", "Bob").append("lastName", "Bobberson");
    Document robert = new Document("_id",new ObjectId())
        .append("firstName", "Robert").append("lastName", "Bobberson");
    Document joe = new Document("_id",new ObjectId())
        .append("firstName", "JoeBob").append("lastName", "Bobberson");
    Document mark = new Document("_id",new ObjectId())
        .append("firstName", "MarkBob").append("lastName", "Bobberson");
    new MockCursorBuilder(mockCollection)
        .withQuery(new Document("lastName", "Bobberson"))
        .usingInto(bob, robert, joe, mark);
    Assert.assertThat(wrapper.findByLastNameUsingInto("Bobberson"),
        IsIterableContainingInOrder.contains(bob, robert, joe, mark));
}

This concludes my basic implementation of using Mockito to test implementations of the MongoDB Java driver. I hope you have found it useful. There are some other patterns and tests in the source code that I am working on and my publish more on this later.