Duplicated Movements

BOS is generating duplicated movements.

We must find a way to prevent this behavior and implement a solution that stops BOS from failing again in these situations.

Problem Description

Some BOS flows are not protected against the creation of multiple objects. This violates several constraints and leads to incorrect balance values.

Example:

Money movements are represented by two models:

  • Asset for internal movements (between BOS balances).

    • Two people from operations can have the same view open at the same time and make two movements at once. These movements are currently not protected, so there is a risk of duplications when BOS users operate the platform manually. For example, with several BOS users working on the client's detail page:

      • In the payments list, the first user presses 'Fund from balance' on a payment.

      • On the same page, another user presses 'Fund from balance' on the same payment.

  • ReconMatch for external movements (incoming/outgoing BOS balances).

    • Automatic matching can overwrite changes made by operations if the automatic process is running at the same time. Manual match authorization (by BOS users) can also lead the system to produce duplicated movements.

Several processes are trying to modify the same entry of a database table. In this particular case:

  • There are at least two locked processes waiting to write to table settlements__recon_match.

  • Once this table is unblocked:

    • The first process is able to write to the table. This operation includes the creation of a money movement (an entry in table settlements__currency_account_entry).

    • The second process generates its own entry in table settlements__currency_account_entry, and the one created by the first process is left orphaned.
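The sequence above can be reproduced with a simplified, single-threaded model. The list and dict below are hypothetical stand-ins for the two tables, and the sketch assumes each process decides to create an entry based on a read it made before blocking on the lock.

```python
# Simplified model of the interleaving; the data structures are hypothetical.
currency_account_entries = []  # rows of settlements__currency_account_entry
recon_match = {"id": 1, "currency_account_entry_id": None}  # one settlements__recon_match row


def read_match():
    # Both processes read the row before blocking on the lock, so both see no entry.
    return dict(recon_match)


def write_match(snapshot, amount):
    # Runs once the process gets the lock; it trusts its earlier (stale) snapshot.
    if snapshot["currency_account_entry_id"] is None:
        entry_id = len(currency_account_entries) + 1
        currency_account_entries.append({"id": entry_id, "amount": amount})
        recon_match["currency_account_entry_id"] = entry_id


snapshot_first = read_match()
snapshot_second = read_match()
write_match(snapshot_first, 100)   # first process: creates entry 1 and links it
write_match(snapshot_second, 100)  # second process: creates entry 2 and overwrites the link

orphans = [entry for entry in currency_account_entries
           if entry["id"] != recon_match["currency_account_entry_id"]]
print(len(currency_account_entries), [entry["id"] for entry in orphans])  # 2 [1]
```

The lock itself works; the problem is that the second process acts on data read before it got the lock, so nothing stops it from creating a second entry.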

Background

There are a couple of documents analyzing this situation:

  • Real duplicate entries (from the support team)
  • Candidate flows that may be causing duplications

Solution

The selected solution should be generic enough to remain valid for future developments.

Because of how BOS flows modify the models, external and internal movements need different strategies to avoid duplications.

The proposal is:

External Movements (ReconMatch)

Duplications can be controlled at database level by creating an index that guarantees only one non-amended ReconMatch per bank account entry:

Prototype

-- amended is a boolean column; CONCURRENTLY cannot run inside a transaction block
CREATE UNIQUE INDEX CONCURRENTLY unique_reconmatch_bank_account_entry_id_amended
ON reconciliation_reconmatch (bank_account_entry_id, amended)
WHERE amended = false;
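SQLite also supports partial unique indexes, so the behavior of this index can be tried locally with Python's built-in sqlite3 module. This is a sketch only: the table is reduced to the two relevant columns, and because SQLite has no CONCURRENTLY and stores booleans as 0/1, the condition is written as amended = 0 here. The insert_match helper is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE reconciliation_reconmatch ("
    " id INTEGER PRIMARY KEY,"
    " bank_account_entry_id INTEGER NOT NULL,"
    " amended INTEGER NOT NULL DEFAULT 0)"
)
# Same index as above, minus CONCURRENTLY (not available in SQLite).
conn.execute(
    "CREATE UNIQUE INDEX unique_reconmatch_bank_account_entry_id_amended "
    "ON reconciliation_reconmatch (bank_account_entry_id, amended) WHERE amended = 0"
)


def insert_match(bank_account_entry_id, amended=0):
    """Return True if the row was stored, False if the partial index rejected it."""
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "INSERT INTO reconciliation_reconmatch (bank_account_entry_id, amended) VALUES (?, ?)",
                (bank_account_entry_id, amended),
            )
        return True
    except sqlite3.IntegrityError:
        return False


print(insert_match(10))             # True: first active match for entry 10
print(insert_match(10))             # False: a second active match is rejected
print(insert_match(10, amended=1))  # True: amended rows are outside the index
print(insert_match(10, amended=1))  # True: any number of amended rows is allowed
```

Only rows satisfying the WHERE clause take part in the uniqueness check, which is why amended matches can keep accumulating for the same bank account entry.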

Before adding this index, we need to check whether any existing data would violate the constraint, and fix it:

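from django.db.models import Count
# bank_account_entry_id values with more than one non-amended ReconMatch
dupes = ReconMatch.objects.filter(amended=False).values('bank_account_entry_id').annotate(matches=Count('id')).order_by().filter(matches__gt=1)
# Rows that would violate the index: review and fix them before creating it
duplicated_matches = ReconMatch.objects.filter(amended=False, bank_account_entry_id__in=[item['bank_account_entry_id'] for item in dupes])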

The equivalent definition in the ORM (this code has to stay commented out until we upgrade Django):

# To be activated in Django 2.2
# constraints = [
#     models.UniqueConstraint(fields=['bank_account_entry_id', 'amended'],
#                             condition=Q(amended=False),
#                             name='unique_reconmatch_bank_account_entry_id_amended'),
# ]
#
# Or to be activated in Django 1.11 with django-partial-index library
# indexes = [
#    PartialIndex(fields=['bank_account_entry_id', 'amended'], unique=True, where=PQ(amended=False)),
# ]

And the migration will look like this:

# CREATE/DROP INDEX CONCURRENTLY cannot run inside a transaction block,
# so both functions must run outside the migration transaction
@non_atomic_migration
def create_index(apps, schema_editor):
    index_sql = 'CREATE UNIQUE INDEX CONCURRENTLY "unique_reconmatch_bank_account_entry_id_amended" ON ' \
                '"reconciliation_reconmatch" ("bank_account_entry_id", "amended") WHERE amended = False;'
    with schema_editor.connection.cursor() as cursor:
        cursor.execute(index_sql)

@non_atomic_migration
def drop_index(apps, schema_editor):
    index_sql = 'DROP INDEX CONCURRENTLY "unique_reconmatch_bank_account_entry_id_amended";'
    with schema_editor.connection.cursor() as cursor:
        cursor.execute(index_sql)

Internal Movements (Assets)

Taking advantage of the fact that Asset creation is isolated in the MoneyWorkflowService class, we can implement a chain of movements that prevents updates to an object that is being modified.

Prototype

What does the class look like?

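from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.db import models

class MovementChain(models.Model):
    '''Entities can be payments, trades, etc, ... '''
    # A GenericForeignKey needs a content type field and an object id field behind it
    source_content_type = models.ForeignKey(ContentType, on_delete=models.PROTECT, related_name='+')
    source_object_id = models.PositiveIntegerField()
    source = GenericForeignKey('source_content_type', 'source_object_id')
    destination_content_type = models.ForeignKey(ContentType, on_delete=models.PROTECT, related_name='+')
    destination_object_id = models.PositiveIntegerField()
    destination = GenericForeignKey('destination_content_type', 'destination_object_id')
    parent = models.ForeignKey('MovementChain', null=True, on_delete=models.PROTECT)

    class Meta:
        unique_together = [('source_content_type', 'source_object_id', 'destination_content_type', 'destination_object_id', 'parent')]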

Adding a movement chain per entity type would reduce deadlocks in the database, since locking one table would not affect all movements. If we choose this option, we could avoid GenericForeignKey and instead use a ForeignKey to the content type table plus a plain IntegerField, without referential integrity, for the primary key of the entity. For example, there would be a PaymentMovementChain independent of everything else, even of the payments themselves.

How to control duplicated movements?

def create_movement_chain_entry(source, destination, parent_movement_id=None):
    return MovementChain.objects.create(source=source, destination=destination, parent_id=parent_movement_id)

How it's called in a view

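from django.db import IntegrityError, transaction

try:
    with transaction.atomic():
        movement = create_movement_chain_entry(source, destination, request.POST.get('parent_movement_id', None))
except IntegrityError:
    # Another movement already used this parent. Depending on the nature of the view: log error message, notifications, send emails, ...
    ...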

The parent id will be empty for the first movement; for every later movement involving the object, it will be the id of the last movement for this client that existed when the view was loaded. If two people try to create a movement for the same parent, one of them will get an exception, so that user has to be notified and try again. This guarantees that a person can only create a new movement if their view had the latest data, preventing the system from creating duplicated movements.

  • For templates, a hidden field containing the last movement id will be added to the corresponding form.
  • Periodic tasks will be prevented from modifying an object by querying the last movement made on it. The MoneyWorkflow service is a good candidate for this logic because elements (trade, payment, ...) are retrieved just before the movement is executed.
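The chain's protection comes entirely from the unique constraint, so it can be tried out with an in-memory SQLite table. This is a hedged sketch: the movement_chain table, its column names and the (type, id) tuples are hypothetical stand-ins for the Django model and its generic foreign keys, and this version of create_movement_chain_entry returns None instead of raising, to keep the example short.

```python
import sqlite3

# In-memory stand-in for the MovementChain table (hypothetical column names).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE movement_chain ("
    " id INTEGER PRIMARY KEY,"
    " source_type TEXT NOT NULL, source_id INTEGER NOT NULL,"
    " destination_type TEXT NOT NULL, destination_id INTEGER NOT NULL,"
    " parent_id INTEGER REFERENCES movement_chain (id),"
    " UNIQUE (source_type, source_id, destination_type, destination_id, parent_id))"
)


def create_movement_chain_entry(source, destination, parent_movement_id=None):
    """Insert a movement; return its id, or None if the parent was already used."""
    try:
        with conn:  # commit on success, roll back on error
            cursor = conn.execute(
                "INSERT INTO movement_chain"
                " (source_type, source_id, destination_type, destination_id, parent_id)"
                " VALUES (?, ?, ?, ?, ?)",
                (source[0], source[1], destination[0], destination[1], parent_movement_id),
            )
        return cursor.lastrowid
    except sqlite3.IntegrityError:
        return None  # stale view: notify the user and ask them to refresh


balance, payment = ("balance", 1), ("payment", 7)
first = create_movement_chain_entry(balance, payment)
# Two users loaded the page after `first`, so both forms carry it as parent_movement_id.
user_a = create_movement_chain_entry(balance, payment, first)
user_b = create_movement_chain_entry(balance, payment, first)
print(first, user_a, user_b)  # 1 2 None

# NULLs are distinct in UNIQUE constraints (in PostgreSQL too), so another
# "first movement" without a parent is NOT rejected.
print(create_movement_chain_entry(balance, payment))  # 3
```

The last line shows a gap worth keeping in mind: two concurrent first movements both have a NULL parent and do not collide. Covering that case would need something extra, for example a partial unique index on the other four columns WHERE parent_id IS NULL.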

Positive points

  • Easy to implement.
  • Easy to maintain.
  • No concurrency control involved.

Negative

  • Requires the identification and modification of every template and view where movements are executed.
  • Hard to deal with the exceptions generated by the system. For example:
    • Notifications for templates.
    • Logs for views, daemons, ...
    • Emails for critical parts of the code.
  • This solution also rejects movements that are not duplicates, e.g. a legitimate movement submitted from a view that is no longer up to date.

Alternatives

An alternative that also covers the requirements is based on registering and protecting movements. It takes three steps:

  1. Isolate critical functionality.

    The first step would be to isolate the creation of assets and recon matches in a service. Previous developments moved those creations to the MoneyWorkflowFacade facade and its corresponding service, but other parts of the code still perform those actions, and they should all be centralized in the same service.

    Once the resource is unlocked, the next process acquiring the lock has to check the conditions of the movement (client has enough balance, trade totally funded, ...), and the current MoneyWorkflowFacade does not do this. Because of this, it is necessary to create a layer wrapping MoneyWorkflow (for now, let's call it BusinessLayer) with these responsibilities:

      • Lock resources.
      • Check movement conditions.
      • Call MoneyWorkflow to move the money.
      • Release resources.

    This way, once the first movement is finished, the next process will check the conditions, and duplicated movements will be skipped and logged properly.

  2. At this point, movements need a unique identifier and a system that uses it to skip duplications. There are two different approaches:

    1. Redis cache.

      As mentioned in point 1, extra protection will be needed at code level. To achieve this, the BusinessLayer would use an external tool to track the objects being modified. A Redis cache (let's call it movements_being_executed) could be the solution:

        • The key is a unique identifier of the movement being created, e.g. ContentTypeOrigin + ObjectIdOrigin + ContentTypeTarget + ObjectIdTarget.
        • The value is a dictionary containing:
          • pid: the id of the process executing the movement (used to remove the key from the cache when this process is no longer active).
          • renewals_number: how many times the TTL has been renewed (30 seconds each time).
          • ttl_factor: once the TTL has been renewed 16 times (8 minutes), it is renewed dynamically so that the key eventually expires: new_ttl = total_timeout / (2 * ttl_factor), until new_ttl is less than 59 seconds.

    2. Model Movement

      Create a Django model Movement to store movements:

        • This table will have a unique ID representing the movement.
        • When the system tries to create a movement with an existing ID, an exception will be raised.

  3. Control Time To Live.

    The TTL of the elements stored in the movements_being_executed cache could be set as a constant, discarding them once it is reached.

    Another possibility is to renew the timeout dynamically. No existing implementation has been found, so the proposal is:

      • A periodic task (executed every 5 seconds) manages the TTL of each element stored in the movements_being_executed cache.
      • When a new key is stored in movements_being_executed, the key is also stored in a cache used to control the TTL, with: {key, ttl, timeout}.
      • Every 5 seconds, the task updates this info:
        • Updates the TTL.
        • If TTL < 5 seconds:
          • If the process is still running: renew the TTL by 30 seconds up to 8 minutes (16 renewals), then renew it with the T/2 strategy. The 30-second strategy is proposed to avoid long waiting periods when the process is not alive.
          • Otherwise (the movement is stored in the cache but the process is not running), something went wrong and the element will expire gracefully.

    The proposed times were chosen after looking at Kibana panels: the latest database locks we had in BOS took between 1 and 11 minutes.

    [Diagram: lock system]
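The four BusinessLayer responsibilities from step 1 can be sketched in-process. This is a minimal sketch under stated assumptions: a threading.Lock per resource stands in for whatever lock BOS would really use (database row lock or the Redis cache from step 2), move_money stands in for MoneyWorkflow, and BusinessLayer, fund_from_balance and move_money are hypothetical names.

```python
import threading
from collections import defaultdict


def move_money(balance, payment, amount):
    """Hypothetical stand-in for MoneyWorkflow: moves `amount` from balance to payment."""
    balance["available"] -= amount
    payment["funded"] += amount


class BusinessLayer:
    """Hypothetical wrapper around MoneyWorkflow: lock, check, move, release."""

    def __init__(self, money_workflow):
        self._money_workflow = money_workflow
        self._locks = defaultdict(threading.Lock)  # one lock per resource key
        self._locks_guard = threading.Lock()       # protects creation of per-key locks
        self.skipped = []                          # stand-in for proper logging

    def _lock_for(self, key):
        with self._locks_guard:
            return self._locks[key]

    def fund_from_balance(self, balance, payment):
        # Simplification: only the payment is locked here.
        key = ("payment", payment["id"])
        with self._lock_for(key):  # 1. lock resources (4. released when the block exits)
            # 2. check movement conditions on data seen *after* acquiring the lock
            pending = payment["amount"] - payment["funded"]
            if pending <= 0 or balance["available"] < pending:
                self.skipped.append(key)  # duplicated or not fundable: skip and log
                return False
            # 3. call MoneyWorkflow to move the money
            self._money_workflow(balance, payment, pending)
            return True


# Two BOS users press 'Fund from balance' on the same payment at the same time.
balance = {"available": 500}
payment = {"id": 7, "amount": 300, "funded": 0}
layer = BusinessLayer(move_money)
threads = [threading.Thread(target=layer.fund_from_balance, args=(balance, payment)) for _ in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(balance["available"], payment["funded"], layer.skipped)  # 200 300 [('payment', 7)]
```

Whichever thread wins, the other one re-checks the payment after acquiring the lock, finds nothing left to fund, and is skipped instead of producing a second movement.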

Prototype

This is just an approach to the possible solution in python:

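# Setting values: register the movement key with its initial ttl_factor

import json, os, redis

TTL_RENEWAL_SECONDS = 30

r = redis.StrictRedis('server_name')
pid = os.getpid()
movement_info_dict = {'pid': pid, 'ttl_factor': 1, 'renewals_number': 0}
movement_info_json = json.dumps(movement_info_dict)
# nx=True: only store the key if no other process is already executing this movement
# ex: the key expires by itself if nobody renews it
registered = r.set('trade00001payment0002', movement_info_json, nx=True, ex=TTL_RENEWAL_SECONDS)
# registered is None when the movement is already being executed: skip it and log it

# Getting values and updating TTL (periodic task)

import json, psutil, redis

INITIAL_TIMEOUT = 480             # 8 minutes
SECONDS_TO_LAST_RENEWAL = 59
MINIMUM_SECONDS_TO_UPDATE = 5
TTL_RENEWAL_SECONDS = 30
MAX_NUMBER_OF_RENEWALS = 15       # 15 renewals + the initial TTL = 16 x 30 seconds

r = redis.StrictRedis('server_name')
# scan_iter avoids blocking Redis the way KEYS does; assumes a database dedicated to movements_being_executed
for key in r.scan_iter():
    movement_ttl = r.ttl(key)
    if movement_ttl < MINIMUM_SECONDS_TO_UPDATE:
        movement_info_json = r.get(key)
        if movement_info_json is None:  # the key expired in the meantime
            continue
        movement_info_dict = json.loads(movement_info_json)
        pid = movement_info_dict['pid']
        # pid_exists only sees processes on this host; a dead process is not renewed and its key expires
        if psutil.pid_exists(pid):
            number_of_renewals = movement_info_dict['renewals_number']
            if number_of_renewals < MAX_NUMBER_OF_RENEWALS:
                movement_info_dict['renewals_number'] = number_of_renewals + 1
                r.set(key, json.dumps(movement_info_dict), ex=TTL_RENEWAL_SECONDS, xx=True)
            else:
                ttl_factor = movement_info_dict['ttl_factor']
                new_ttl = INITIAL_TIMEOUT // (2 * ttl_factor)
                if new_ttl > SECONDS_TO_LAST_RENEWAL:
                    # assumption: the factor grows by one on each dynamic renewal
                    movement_info_dict['ttl_factor'] = ttl_factor + 1
                    r.set(key, json.dumps(movement_info_dict), ex=new_ttl, xx=True)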
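To check the arithmetic behind these constants, the renewal rules of the periodic task can be replayed without Redis. The sketch assumes the process stays alive the whole time and, like the prototype, that ttl_factor grows by one per dynamic renewal (an assumption, since the original code does not say how it changes). It ignores the few seconds each period loses because renewal only happens once fewer than MINIMUM_SECONDS_TO_UPDATE seconds remain. ttl_schedule is a hypothetical helper.

```python
INITIAL_TIMEOUT = 480
SECONDS_TO_LAST_RENEWAL = 59
TTL_RENEWAL_SECONDS = 30
MAX_NUMBER_OF_RENEWALS = 15


def ttl_schedule():
    """TTLs a key receives while its process stays alive (first one: the initial SET)."""
    ttls = [TTL_RENEWAL_SECONDS]
    renewals_number, ttl_factor = 0, 1
    while True:
        if renewals_number < MAX_NUMBER_OF_RENEWALS:
            renewals_number += 1
            ttls.append(TTL_RENEWAL_SECONDS)
        else:
            new_ttl = INITIAL_TIMEOUT // (2 * ttl_factor)
            if new_ttl <= SECONDS_TO_LAST_RENEWAL:
                return ttls  # no more renewals: the key expires gracefully
            ttl_factor += 1
            ttls.append(new_ttl)


schedule = ttl_schedule()
print(sum(schedule[:16]), schedule[16:], sum(schedule))  # 480 [240, 120, 80, 60] 980
```

Under these assumptions a live process keeps its key for about 480 + 500 = 980 seconds (a bit over 16 minutes), which covers the 1 to 11 minute locks seen in Kibana. If ttl_factor were doubled instead (a strict T/2 halving), the tail would be 240, 120, 60.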

Positive things

  • An external microservice could take care of this.
  • Isolating the creation of ReconMatch will give us better control over it.

Negative things

  • Creating a system for TTL updates is delicate.
  • Hard to find unique ids for some movements.
  • Hard to detect that a process is dead in order to update the cache.

Other alternatives considered:

  • Protect the database at a low level using triggers, constraints, and other PostgreSQL utilities.

    • BOS database control is in the hands of the Django ORM. Mixing two different ways of controlling the database will cause new problems in the future. It also requires extra maintenance.
  • Modify model relationships

    • ReconMatch model: change the type of the field currency_account_entry from ForeignKey to OneToOneField

      • Only fixes a particular case.
    • Create a double pointer ReconMatch <-> CurrencyAccountEntry

      • Hard to maintain.
      • Against database normalization.
  • Use select_for_update to control access to rows:

    with transaction.atomic():
        recon_matches = ReconMatch.objects.select_for_update().filter(id=reconmatch.id)
        if recon_matches:
            existing_entry = recon_matches[0].currency_account_entry
            if not existing_entry:
                reconmatch.currency_account_entry = new_entry
            else:
                reconmatch.currency_account_entry = existing_entry
            reconmatch.save()

    Previous investigation

  • Implement optimistic approach with post-check in Django models.

    Described here

    Rejected because optimistic locking works well for updating records, but not for inserting them. To do optimistic locking when inserting new records, you have to say: "I want to create a new record, and the version of the data that I'm creating it for is N", where N is the number of records that already exist (or some other significant condition, perhaps the ID of the last created record). If the database sees that you're trying to create a new record based on an older version of the data, it stops you.

Caveats

  • Daemons and tasks. The behavior of automatic processes acting on locked objects, e.g. automatic fund from balance.

  • EBO. The probability of collision is low, but there are some cases, e.g. clients with more than one contact managing the same trade at the same time, or collisions with automatic processes running in the background.

  • API. There will be an impact on endpoints involving objects under modification.

  • Frontend. The GUI will be affected by this solution, and end users will be notified when they try to act on objects under modification.

  • Multipayment. Pay special attention to testing how this solution affects multipayment funding.

  • Chain of Movements concerns:

    • If a movement should be in a certain place in the chain but is not, do we need to rebuild the chain?
    • Will we never modify the chain, always making amendments at the top of the chain or by starting a new chain?

Operation

BOS users will be affected:

  • Operations users will see some of their movements rejected. They will get a notification telling them that the object is being modified and that they need to refresh the view to get the correct data.

Security Impact

No sensitive data such as tokens, keys, or user data is involved in this project.

No API modifications that may impact security, such as a new way to access sensitive information or a new way to log in.

No cryptography or hashing.

No change involves using or parsing user-provided data.

Performance Impact

  • Performance will be affected slightly because the new table will be queried every time a new movement is created.
  • Possible performance issues caused by an increasing number of locks on the new table.

Developer Impact

  • Every time we implement new functionality involving a movement, we must use the system that controls duplications.
  • Developers must be aware of rollbacks in the cases covered by this new mechanism.

Data Consumer Impact

No changes in the current data structure are needed.

Deployment

  • New index on table reconciliation_reconmatch.
  • Migrations related to the new model.

Dependencies

The system that controls the TTL of locked movements has to be implemented before we start using the lock (section Solution: point 4 depends on point 3).

References

  • https://docs.djangoproject.com/en/1.8/ref/models/querysets/#select-for-update
  • https://redis.io/commands/ttl
  • https://dev.mysql.com/doc/refman/8.0/en/innodb-locking-reads.html
  • https://fxsolutions.atlassian.net/wiki/spaces/TEAM/pages/377127109/Concurrency+control
  • https://medium.com/@hakibenita/how-to-manage-concurrency-in-django-models-b240fed4ee2
  • https://www.postgresql.org/docs/9.1/sql-createindex.html
  • Estimations document (pending)