dotsinspace
dotsinspace

Reputation: 691

Node-Smpp in Cluster mode

I have node-smpp module which could be found on npm node-smpp now weird thing is that when i run my node.js app using pm2 with -i max for anything greater then 0 causing issue where i can't stop node-smpp with intention of completely destroyed instance. but in cluster mode it simply doesn't work. Your findings can help me out to overcome this frustration. with one simple question does running node-smpp in cluster effect on how binds are managed.

I am attaching video link to this problem so that you can better understand what i am dealing with. and following is code of Smpp Manager

Code

/*
 * IMPORTS
 */
import Joi from 'joi' // Npm: Validation library.
import Enum from 'enum' // Npm: Enum library.
import Stringg from 'string' // Npm: String library.
import _ from 'underscore' // Npm: Utility library.
import { Buffer } from 'buffer' // Core: Buffer library.


/*
 * PACKAGES
 */
import Tag from 'tag'


/*
 * SIBLINGS
 */
import Smpp from './Session'
import { context, ExecuteGraph } from '../'


/*
 * GLOBALS
 */
const _SmppSessions = new Map()
const _messages = {}


/*
 * EXPORTS
 */
export default async function SmppManager($options) {
  // Constructor call check.
  if (this instanceof SmppManager) {
    try {
      // Local variable.
      let _JoiObject

      // Const assignment.
      const _functionName = 'Context -> SmppManager'
      const _this_ = this

      // Joi validate if username, password, ip, rxPort and txPort is defined.
      _JoiObject = (Joi.object().keys({
        'username': Joi.string().required(),
        'password': Joi.string().required(),
        'ip': Joi.string().required(),
        'rxPort': Joi.number().required(),
        'txPort': Joi.number().required()
      })).validate($options)

      /*
       * If validation caught exception
       * then report failure.
       *
       */
      if (_JoiObject instanceof Error) return _JoiObject

      // Properties assignment.
      this.$options = $options
      this.id = $options.id
      this.username = $options.username
      this.password = $options.password
      this.ip = $options.ip
      this.rxPort = $options.rxPort
      this.txPort = $options.txPort
      this.tpsDuration = 1000
      this.isPaused = false
      this.isSticky = $options.isSticky ?? false
      this.isDeliveredSmListening = false
      this.debug = global.WORKING_ENV.DEVELOPMENT.value === global.CONFIG_RC.env
      this.maxReconnectDelay = 10000
      this.displayName = $options.displayName ?? SmppManager.prototype.displayName
      this.tps = $options.tps ?? SmppManager.prototype.tps
      this.enquireLinkInterval = $options.enquireLinkInterval ?? SmppManager.prototype.enquireLinkInterval
      this.sessionTimeoutInterval = $options.sessionTimeoutInterval ?? SmppManager.prototype.sessionTimeoutInterval
      this.sessionAllowed = $options.sessionAllowed ?? SmppManager.prototype.sessionAllowed
      this.maxReconnectAttempts = $options.maxReconnectAttempts ?? SmppManager.prototype.maxReconnectAttempts
      this.initialReconnectDelay = $options.initialReconnectDelay ?? SmppManager.prototype.initialReconnectDelay
      this.reconnectBackoffFactor = $options.reconnectBackoffFactor ?? SmppManager.prototype.reconnectBackoffFactor

      // Report failure if context is empty.
      if (_.isEmpty(context) && context.isContext) return new Error('MISSING_CONTEXT')

      /*
       * Report failure if tps is greater than
       * tps of server.
       */
      if (this.tps > CONFIG_RC.tps) return new Error('TPS_LIMIT_BREACHED')

      /*
       * Validate all context with joi
       * to check if parameter type is correct
       * or not.
       */
      _JoiObject = await (Joi.object().keys({
        'id': Joi.string().required(),
        'username': Joi.string().optional(),
        'password': Joi.string().optional(),
        'ip': Joi.string().required(),
        'txPort': Joi.number().required(),
        'rxPort': Joi.number().required()
      })).validate(this)

      /*
       * If validation caught exception
       * then report failure.
       */
      if (_JoiObject instanceof Error) return _JoiObject

      // Style guide.
      context.Debug({ 'message': `Checking smpp manager with id: ${this.id} in HM.` }, _functionName)

      // Const assignment.
      const _SmppSessionsGet = _SmppSessions.get(Tag.Smpp.id(_this_.id))

      // If session is not empty then return it.
      if (!_.isEmpty(_SmppSessionsGet)) {
        // Style guide.
        context.Debug({ 'message': `Smpp manager with id: ${this.id} already initialized.` }, _functionName)

        // Return context.
        return _SmppSessionsGet
      }

      // Style guide.
      context.Debug({ 'message': `Initializing sessions.` }, _functionName)

      /*
       * Initialize tx port if it is different
       * from tx port and same with rx port.
       */
      if (this.txPort) {
        // Style guide.
        context.Debug({ 'message': 'Initializing tx session.' }, _functionName)

        // Initialize tx session.
        const _TxSession = await this.NetInitialization('txSession', _this_.rxPort === _this_.txPort ? 'TRX' : 'TX')

        // If smpp session caught exception then report failure.
        if (_TxSession instanceof Error) {
          // Style guide.
          context.Debug({ 'message': `Failed to initialize txSession.`, 'error': _TxSession }, _functionName)

          // Report failure.
          return _TxSession
        }

        // Style guide.
        context.Debug({ 'message': `Successfully initialized txSession.` }, _functionName)

        // Update mode of smpp.
        this.mode = _this_.rxPort === _this_.txPort ? 'UNIFIED' : 'SEPARATE'
      }
      if (_this_.rxPort) {
        // Style guide.
        context.Debug({ 'message': `rxPort is defined initializing rxSession.` }, _functionName)

        // Assign changes made for the session.
        if (_this_.rxPort === _this_.txPort) {
          // Style guide.
          context.Debug({ 'message': `rxPort is same as txPort skipping initialization.` }, _functionName)

          // Update this with client.
          this.rxSession = this.txSession

          // Update mode style.
          this.mode = 'UNIFIED'
        } else {
          // Style guide.
          context.Debug({ 'message': `rxPort is different from txPort.` }, _functionName)

          // Initialize rx session.
          const _RxSession = await this.NetInitialization('rxSession', 'RX')

          // If smpp session caught exception then report failure.
          if (_RxSession instanceof Error) {
            // Style guide.
            context.Debug({ 'message': `Failed to initialize rxSession.`, 'error': _RxSession }, _functionName)

            // Report failure.
            return _RxSession
          }

          // Style guide.
          context.Debug({ 'message': `Successfully initialized rxSession.` }, _functionName)

          // Update mode of smpp.
          this.mode = 'SEPARATE'
        }

        // Style guide.
        context.Debug({ 'message': `Successfully initialized smpp manager with id: ${this.id}.` }, _functionName)
      }

      // Style guide.
      context.Debug({ 'message': `Successfully initialized smpp manager with id: ${this.id}.` }, _functionName)
      context.Debug({ 'message': `Creating pool for smpp manager with max session allowed: ${_this_.sessionAllowed}` }, _functionName)

      // Style guide.
      context.Debug({ 'message': `Successfully created pool for smpp manager with max session allowed: ${_this_.sessionAllowed}` }, _functionName)
      context.Debug({ 'message': `Creating entry for smpp manager with id: ${this.id} in global sessions map.` }, _functionName)

      /*
       * Create entry for given smpp in
       * smpp clients with object pool.
       */
      _SmppSessions.set(Tag.Smpp.Session(_this_.ip), this)
      _SmppSessions.set(Tag.Smpp.id(_this_.id), this)

      // Style guide.
      context.Debug({ 'message': `Successfully created entry for smpp manager with id: ${this.id} in global sessions map.` }, _functionName)
      context.Debug({ 'message': `Loading configuration for smpp manager with id: ${this.id}.` }, _functionName)

      // Load configuration for given routePlan.
      await this.StatusUpdate(this.status.ACTIVE.value)
      await this.LoadConfiguration()

      // Object assignment.
      const _SubscribeChannel = async __id => {
        // Const assignment.
        const _EventSource = context.Pubsub.subscribe(Tag.Smpp.id(__id))

        // Add pubsub to this routePlan.
        for await (const value of _EventSource) {
          // Style guide.
          await context.Debug({ 'message': `Smpp got updates. Loading new configuration.` }, _functionName)

          // Load configuration.
          await this.LoadConfiguration(value)

          // Style guide.
          await context.Debug({ 'message': `Successfully loaded new configuration for smpp manager with id: ${this.id}.` }, _functionName)
          await context.Debug({ 'message': `Making sure that session are bounded.` }, _functionName)
        }
      }; _SubscribeChannel(this.id)

      // Style guide.
      context.Debug({ 'message': `Successfully loaded configuration for smpp manager with id: ${this.id}.` }, _functionName)

      // Object assign context updates.
      Object.assign(this, _this_)

      // Return context
      return this
    } catch (error) {
      // Report failure.
      return error
    }
  }
}
export { _SmppSessions as SmppManagers }


/*
 * PROTOTYPE
 */
SmppManager.prototype = {
  // Properties.
  'mode': 'UNIFIED',
  'readConfigurationTimeout': 900000,
  'configuration': {
    'displayName': 'SMPP',
    'enquireLinkInterval': 45000,
    'sessionTimeoutInterval': 300000,
    'sessionAllowed': 1,
    'maxReconnectAttempts': 5,
    'tps': 10
  },
  'queueConfiguration': CONFIG_RC.queueConfiguration,
  'status': new Enum({
    'ACTIVE': 'ACTIVE',
    'IN_ACTIVE': 'IN_ACTIVE'
  }),

  // Load configuration from database.
  async LoadConfiguration() {
    // Const assignment.
    const _functionName = 'Context -> SmppManager -> LoadConfiguration'

    // Style guide.
    context.Debug({ 'message': `Loading configuration for smpp ${this.id}.` }, _functionName)

    // Get configuration of current smpp.
    const _SmppFindUnique = await ExecuteGraph(`
      query SmppReadUniqueQuery {
        SmppReadUnique(smppId: "${this.id}") {
          id
          ip
          username
          password
          rxPort,
          txPort,
          isSticky,
          isPaused,
          mode,
          displayName
          enquireLinkInterval
          sessionTimeoutInterval
          sessionAllowed
          maxReconnectAttempts
          reconnectBackoffFactor,
          initialReconnectDelay,
          tps
          ContentTranslation {
            id
            strategy
            regex
            translationKeyword
          }
        }
      }
    `, context)

    // If getting smpp caught exception then report failure.
    if (0 < _SmppFindUnique?.errors?.length) {
      // Style guide.
      context.Debug({ 'message': `Failed to load configuration for smpp ${this.id}.` }, _functionName)

      // Report failure.
      return _SmppFindUnique
    }
    if (_.isEmpty(_SmppFindUnique?.data?.SmppReadUnique)) {
      // Style guide.
      context.Debug({ 'message': `Failed to load configuration for smpp id: ${this.id}.` }, _functionName)

      // Report failure.
      return new Error('SMPP_NOT_FOUND')
    }

    /*
     * If queue and session is paused then resume
     * if isPaused is false else keep it too paused.
     */
    if (this.isPaused !== _SmppFindUnique?.data?.SmppReadUnique?.isPaused) {
      // Style guide.
      context.Debug({ 'message': `Checking if session is: ${_SmppFindUnique?.data?.SmppReadUnique?.isPaused}` }, _functionName)

      // If session is paused then resume it.
      if (_SmppFindUnique?.data?.SmppReadUnique?.isPaused) {
        // Style guide.
        context.Debug({ 'message': `Pausing smpp manager with id: ${this.id}.` }, _functionName)

        // Pause smpp.
        await this.Pause()
      } else {
        // Style guide.
        context.Debug({ 'message': `Resuming smpp manager with id: ${this.id}.` }, _functionName)

        // Resume smpp.
        await this.Resume()

        // Style guide.
        context.Debug({ 'message': `Successfully resumed smpp manager with id: ${this.id}.` }, _functionName)
      }
    }

    // Update configuration.
    Object.assign(this, _SmppFindUnique?.data?.SmppReadUnique)

    // Style guide.
    context.Debug({ 'message': `Successfully loaded configuration for smpp id: ${this.id}.` }, _functionName)

    // Return context.
    return this
  },


  // Status Update of current SmppManager.
  StatusUpdate(__status) {
    // Const assignment.
    const _functionName = 'Context -> SmppManager -> StatusUpdate'

    // Style guide.
    context.Debug({ 'message': `Updating status for smpp manager with id: ${this.id}.` }, _functionName)

    // Mark route plan as active.
    return ExecuteGraph(`
      mutation SmppStatusUpdateMutation {
        SmppStatusUpdate(smppId: "${this.id}", isActive: ${__status === this.status.ACTIVE.value}) {
          id
          status
          message
        }
      }
    `, context)
  },


  /*
   * Handler for sending sms through
   * given smpp manager.
   */
  async NetInitialization(__sessionName, __mode = 'TRX') {
    // Local variable.
    let i

    // Const assignment.
    const _functionName = 'Context -> SmppManager -> NetInitialization'

    // Style guide.
    context.Debug({ 'message': 'HealthCheck is in progress..' }, _functionName)

    // Health check for given ip.
    const _HealthCheck = await context.Utility.HealthCheck(this.ip, this[__sessionName?.replace('Session', 'Port')])

    /*
     * If health check caught exception
     * then report failure.
     */
    if (_HealthCheck instanceof Error) {
      // Style guide.
      context.Debug({ 'message': `Failed to health check ip: ${this.ip}.`, 'error': _HealthCheck }, _functionName)

      // Report failure.
      return _HealthCheck
    }

    // Style guide.
    context.Debug({ 'message': `Successfully health checked ip: ${this.ip}.` }, _functionName)
    context.Debug({ 'message': `Initializing ${__sessionName}.` }, _functionName)

    // Variable assignment.
    this[__sessionName] = []

    // Loop over session allowed and create session.
    for (i = 0; i < this.sessionAllowed; i++) {
      // Style guide.
      context.Debug({ 'message': `Creating new smpp manager with id: ${this.id}.` }, _functionName)

      // Const assignment.
      const _connectedSession = Smpp.connect({
        'auto_enquire_link_period': this.enquireLinkInterval,
        'connectTimeout': this.sessionTimeoutInterval,
        'debug': this.debug,
        'username': this.username,
        'ip': this.ip,
        'id': this.id,
        'port': this[__sessionName?.replace('Session', 'Port')],
        'password': this.password,
        'sessionAllowed': this.sessionAllowed,
        'maxReconnectAttempts': this.maxReconnectAttempts,
        'initialReconnectDelay': this.initialReconnectDelay,
        'reconnectBackoffFactor': this.reconnectBackoffFactor,
        'tps': this.tps,
        'tpsDuration': this.tpsDuration,
        'mode': __mode,
        'onSubmitSm': __args => _connectedSession.submit_sm(__args, _messages?.[__args.destination_addr]?.onSuccess, r => { _messages?.[__args.destination_addr]?.onSend?.(r); _connectedSession.isActive = false }, _messages?.[__args.destination_addr]?.onFailure),
        'onDeliverSm': this.onDeliverSm,
        'sticky': this.isSticky
      }, () => {
        // Update smpp status.
        !_connectedSession.isActive && this.StatusUpdate(this.status.ACTIVE.value)
      })

      // Add event listener for session.
      _connectedSession.on('connect', () => { context.Pubsub.publish(Tag.Smpp.ErrorIpPort('GLOBAL_SMPP_NOTIFICATION'), { 'message': `IP: ${this.ip} got connected.`, 'status': 'READ_SUCCESSFUL' }) })
      _connectedSession.on('pdu', j => 0 === j.command_status ? void 0 : context.Debug({ 'message': JSON.stringify({ 'ip': this.ip, ...j }) && context.Pubsub.publish(Tag.Smpp.ErrorIpPort('GLOBAL_SMPP_NOTIFICATION'), { 'message': JSON.stringify({ 'ip': this.ip, ...j }), 'status': 'READ_SUCCESSFUL' }) }, _functionName))
      _connectedSession.on('debug', (e, j, k) => context.Pubsub.publish(Tag.Smpp.DebuggingIpPort(this.ip, this[__sessionName?.replace('Session', 'Port')]), { 'logs': { 'type': e, 'msg': j, 'payload': k } }))
      _connectedSession.on('error', e => { context.Debug({ 'message': `Smpp session with id: ${this.ip} got error.`, 'error': e }, _functionName); context.Pubsub.publish(Tag.Smpp.ErrorIpPort('GLOBAL_SMPP_NOTIFICATION'), { 'message': `IP: ${this.ip} got error on socket. Destroying current smpp bind will going to be created next time short message comes in.`, 'status': 'READ_SUCCESSFUL' }) })
      _connectedSession.socket.on('timeout', () => { context.Debug({ 'message': `Smpp session with id: ${this.ip} got timeout.` }, _functionName); context.Pubsub.publish(Tag.Smpp.ErrorIpPort('GLOBAL_SMPP_NOTIFICATION'), { 'message': `IP: ${this.ip} got closed on socket. Destroying current smpp bind will going to be created next time short message comes in.`, 'status': 'READ_SUCCESSFUL' }) })
      _connectedSession.socket.on('close', () => { context.Debug({ 'message': `Smpp session with id: ${this.ip} got closed.` }, _functionName); context.Pubsub.publish(Tag.Smpp.ErrorIpPort('GLOBAL_SMPP_NOTIFICATION'), { 'message': `IP: ${this.ip} got closed on socket. Destroying current smpp bind will going to be created next time short message comes in.`, 'status': 'READ_SUCCESSFUL' }) })
      _connectedSession.socket.on('error', e => { context.Debug({ 'message': `Smpp session with id: ${this.ip} got error.`, 'error': e }, _functionName); context.Pubsub.publish(Tag.Smpp.ErrorIpPort('GLOBAL_SMPP_NOTIFICATION'), { 'message': `IP: ${this.ip} got error on socket. Destroying current smpp and creating new one.`, 'status': 'READ_SUCCESSFUL' }) })

      // Update this with client.
      this[__sessionName].push(_connectedSession)
    }

    // Style guide.
    context.Debug({ 'message': `Successfully initialized ${__sessionName}.` }, _functionName)

    // Return context.
    return this[__sessionName]
  },

  // Method for getting smpp session from the pool.
  GetSession(__sessionName) {
    // Const assignment.
    const _functionName = 'Context -> SmppManager -> GetSession'

    // Style guide.
    context.Debug({ 'message': `Getting session for smpp manager with id: ${this.id}.` }, _functionName)

    // Loop over and check free session.
    for (const _SmppSession of this[__sessionName]) {
      // Check if session is free.
      if (!_SmppSession.isActive) {
        // Style guide.
        context.Debug({ 'message': `Found free session for smpp manager with id: ${this.id}.` }, _functionName)

        // Return context.
        return _SmppSession
      }
    }

    // Style guide.
    context.Debug({ 'message': `Failed to get session for smpp manager with id: ${this.id}.` }, _functionName)

    // Return context.
    return new Error('SESSION_LIMIT_REACHED')
  },

  // Method for handling deliver sm when they arrive.
  onDeliverSm(__pdu) {
    // Variable assignment.
    __pdu = JSON.parse(__pdu)

    // Const assignment.
    const _functionName = 'Context -> SmppManager -> onDeliverSm'

    // Style guide.
    context.Debug({ 'message': 'Deliver sm received.' }, _functionName)

    // Const assignment.
    const _receiptedMessageId = Stringg(__pdu.short_message.message).between('id:', ' sub:').s
    const _submitTime = Stringg(__pdu.short_message.message).between('submit date:', ' done date:').s
    const _doneTime = Stringg(__pdu.short_message.message).between('done date:', ' stat').s
    const _error = Stringg(__pdu.short_message.message).between('err:', '  text:').s
    const _messageStates = {
      '0': 'SCHEDULED',
      '1': 'ENROUTE',
      '2': 'DELIVERED',
      '3': 'EXPIRED',
      '4': 'DELETED',
      '5': 'UNDELIVERABLE'
    }

    // Search for the message.
    return ExecuteGraph(`
      query SmsReadUniqueQuery {
        SmsReadUnique(receiptId: "${_receiptedMessageId}") {
          id,
          remoteIpAddress,
          message,
          status
        }
      }`).then(_SmsFindUnique => {
      // If getting smpp caught exception then report failure.
      if (0 < _SmsFindUnique?.errors?.length) {
      // Style guide.
        context.Debug({ 'message': 'Something went wrong during short message search.', 'error': _SmsFindUnique?.errors }, _functionName)

        // Report failure.
        return _SmsFindUnique
      }
      if (_.isEmpty(_SmsFindUnique?.data?.SmsReadUnique)) {
      // Const assignment.
        const _errorr = new Error('SMS_NOT_FOUND')

        // Style guide.
        context.Debug({ 'message': 'Given short message not found', 'error': _errorr }, _functionName)

        // Report failure.
        return _errorr
      }

      // Style guide.
      context.Debug({ 'message': 'Found given short message.' }, _functionName)
      context.Debug({ 'message': 'Updating database with dlr report.' }, _functionName)

      // Object assignment.
      const _ConvertToDate = parameter => parameter ? new Date(`20${parameter.substring(0, 2)}`, parameter.substring(2, 4) - 1, parameter.substring(4, 6), parameter.substring(6, 8), parameter.substring(8, 10)) : new Date()

      // Update database with dlr report.
      return ExecuteGraph(`
      mutation SmsUpdateMutation {
        SmsUpdate(receiptId: "${_receiptedMessageId}", status: ${_messageStates[__pdu.message_state] ?? 'REJECTED'}, registeredDelivery: ${[0, 1].includes(parseInt(__pdu.registered_delivery, 10)) ? global.SMPP_REGISTERED_DELIVERY.REQUESTED.value : [2].includes(parseInt(__pdu.registered_delivery, 10)) ? global.SMPP_REGISTERED_DELIVERY.REQUESTED_ON_FAILURE.value : global.SMPP_REGISTERED_DELIVERY.REQUESTED_ON_SUCCESS.value}, error: "${_.isEmpty(_error) ? 'NONE' : _error}", submittedAt: "${new Date(_ConvertToDate(_submitTime))?.toISOString()}", doneAt: "${new Date(_ConvertToDate(_doneTime))?.toISOString()}", dlr: "${__pdu.short_message.message}") {
          id,
          message,
          status,
        }
      }
    `).then(_SmsUpdate => {
        // If updating smpp caught exception then report failure.
        if (0 < _SmsUpdate?.errors?.length) {
          // Style guide.
          context.Debug({ 'message': 'Failed to update database with dlr report.', 'error': _SmsUpdate?.errors }, _functionName)

          // Report failure.
          return _SmsUpdate
        }

        // Style guide.
        return context.Debug({ 'message': 'Successfully updated database with dlr report.' }, _functionName)
      })
    })
  },

  // SOME OF THE CODE IS REMOVED WHICH IS NOT RELATED TO THIS PROBLEM.

  /*
   * Handler for destroying this SMPP manager instance.
   * This will close and clean up all associated resources.
   */
  async Destroy() {
    // Const assignment.
    const _functionName = 'Context -> SmppManager -> Destroy'
    const _this_ = this

    // Style guide.
    context.Debug({ 'message': `Checking if smpp manager with id ${_this_.id} is already have session closed.` }, _functionName)

    /*
     * Loop over all xSession and close
     * destroy them.
     */
    for await (const jack of Object.entries(this)) {
      // Close the session.
      if (jack[0].includes('xSession')) {
        // Style guide.
        context.Debug({ 'message': `Closing smpp session with id ${_this_.id}.` }, _functionName)

        // Loop over each pool of xSession.
        for await (const jam of jack[1]) {
          // Style guide.
          context.Debug({ 'message': `Destroying smpp session with id ${_this_.id}.` }, _functionName)

          // Stop the session.
          await this.StatusUpdate(this.status.IN_ACTIVE.value)

          // After the session is closed, destroy it.
          await jam.onDestroy()

          // Style guide.
          context.Debug({ 'message': 'Successfully removed the smpp session.' }, _functionName)
        }
      }
    }

    // Remove the SMPP manager instance from the global sessions map.
    _SmppSessions.delete(Tag.Smpp.Session(_this_.ip))
    _SmppSessions.delete(Tag.Smpp.id(_this_.id))

    // Style guide.
    return context.Debug({ 'message': `Deleting smpp from sessions map stored in global.` }, _functionName)
  }
}

Video Link: https://vimeo.com/938197118?share=copy

I am expecting node.js to behave same way in any mode ( single instance or cluster mode ). but i can only kill smpp instance only when server is running in single instance as shown in video.

SOLUTION -> Running Smpp Server in cluster Mode. But how to build reliable system which can handle binds (which are stored in new Map()) and server should use one source from where it can fetch given bind and send message over it and same for Destroy

Upvotes: 0

Views: 77

Answers (0)

Related Questions