HEX
Server: Apache
System: Linux vpshost0650.publiccloud.com.br 4.4.79-grsec-1.lc.x86_64 #1 SMP Wed Aug 2 14:18:21 -03 2017 x86_64
User: bandeirantesbomb3 (10068)
PHP: 8.0.7
Disabled: apache_child_terminate,dl,escapeshellarg,escapeshellcmd,exec,link,mail,openlog,passthru,pcntl_alarm,pcntl_exec,pcntl_fork,pcntl_get_last_error,pcntl_getpriority,pcntl_setpriority,pcntl_signal,pcntl_signal_dispatch,pcntl_sigprocmask,pcntl_sigtimedwait,pcntl_sigwaitinfo,pcntl_strerror,pcntl_wait,pcntl_waitpid,pcntl_wexitstatus,pcntl_wifexited,pcntl_wifsignaled,pcntl_wifstopped,pcntl_wstopsig,pcntl_wtermsig,php_check_syntax,php_strip_whitespace,popen,proc_close,proc_open,shell_exec,symlink,system
Upload Files
File: //opt/puppetlabs/puppet/lib/ruby/vendor_ruby/mcollective/client.rb
module MCollective
  # Helpers for writing clients that can talk to agents, do discovery and so forth
  class Client
    attr_accessor :options, :stats, :discoverer, :connection_timeout

    def initialize(options)
      @config = Config.instance
      @options = nil

      if options.is_a?(String)
        # String is the path to a config file
        @config.loadconfig(options) unless @config.configured
      elsif options.is_a?(Hash)
        @config.loadconfig(options[:config]) unless @config.configured
        @options = options
        @connection_timeout = options[:connection_timeout]
      else
        raise "Invalid parameter passed to Client constructor. Valid types are Hash or String"
      end

      @connection_timeout ||= @config.connection_timeout

      @connection = PluginManager["connector_plugin"]
      @security = PluginManager["security_plugin"]

      @security.initiated_by = :client
      @subscriptions = {}

      @discoverer = Discovery.new(self)

      # Time box the connection if a timeout has been specified
      # connection_timeout defaults to nil which means it will try forever if
      # not specified
      begin
        Timeout::timeout(@connection_timeout, ClientTimeoutError) do
          @connection.connect
        end
      rescue ClientTimeoutError => e
        Log.error("Timeout occured while trying to connect to middleware")
        raise e
      end
    end

    @@request_sequence = 0
    def self.request_sequence
      @@request_sequence
    end

    # Returns the configured main collective if no
    # specific collective is specified as options
    def collective
      if @options[:collective].nil?
        @config.main_collective
      else
        @options[:collective]
      end
    end

    # Disconnects cleanly from the middleware
    def disconnect
      Log.debug("Disconnecting from the middleware")
      @connection.disconnect
    end

    # Sends a request and returns the generated request id, doesn't wait for
    # responses and doesn't execute any passed in code blocks for responses
    def sendreq(msg, agent, filter = {})
      request = createreq(msg, agent, filter)
      publish(request)
      request.requestid
    end

    def createreq(msg, agent, filter ={})
      if msg.is_a?(Message)
        request = msg
        agent = request.agent
      else
        ttl = @options[:ttl] || @config.ttl
        request = Message.new(msg, nil, {:agent => agent, :type => :request, :collective => collective, :filter => filter, :ttl => ttl})
        request.reply_to = @options[:reply_to] if @options[:reply_to]
      end

      @@request_sequence += 1

      request.encode!
      subscribe(agent, :reply) unless request.reply_to
      request
    end

    def subscribe(agent, type)
      unless @subscriptions.include?(agent)
        subscription = Util.make_subscriptions(agent, type, collective)
        Log.debug("Subscribing to #{type} target for agent #{agent}")

        Util.subscribe(subscription)
        @subscriptions[agent] = 1
      end
    end

    def unsubscribe(agent, type)
      if @subscriptions.include?(agent)
        subscription = Util.make_subscriptions(agent, type, collective)
        Log.debug("Unsubscribing #{type} target for #{agent}")

        Util.unsubscribe(subscription)
        @subscriptions.delete(agent)
      end
    end
    # Blocking call that waits for ever for a message to arrive.
    #
    # If you give it a requestid this means you've previously send a request
    # with that ID and now you just want replies that matches that id, in that
    # case the current connection will just ignore all messages not directed at it
    # and keep waiting for more till it finds a matching message.
    def receive(requestid = nil)
      reply = nil

      begin
        reply = @connection.receive
        reply.type = :reply
        reply.expected_msgid = requestid

        reply.decode!

        unless reply.requestid == requestid
          raise(MsgDoesNotMatchRequestID, "Message reqid #{reply.requestid} does not match our reqid #{requestid}")
        end

        Log.debug("Received reply to #{reply.requestid} from #{reply.payload[:senderid]}")
      rescue SecurityValidationFailed => e
        Log.warn("Ignoring a message that did not pass security validations")
        retry
      rescue MsgDoesNotMatchRequestID => e
        Log.debug("Ignoring a message for some other client : #{e.message}")
        retry
      end

      reply
    end

    # Performs a discovery of nodes matching the filter passed
    # returns an array of nodes
    #
    # An integer limit can be supplied this will have the effect
    # of the discovery being cancelled soon as it reached the
    # requested limit of hosts
    def discover(filter, timeout, limit=0)
      @discoverer.discover(filter.merge({'collective' => collective}), timeout, limit)
    end

    # Send a request, performs the passed block for each response
    #
    # times = req("status", "mcollectived", options, client) {|resp|
    #   pp resp
    # }
    #
    # It returns a hash of times and timeouts for discovery and total run is taken from the options
    # hash which in turn is generally built using MCollective::Optionparser
    def req(body, agent=nil, options=false, waitfor=[], &block)
      if body.is_a?(Message)
        agent = body.agent
        waitfor = body.discovered_hosts || []
        @options = body.options
      end

      @options = options if options
      threaded = @options[:threaded]
      timeout = @discoverer.discovery_timeout(@options[:timeout], @options[:filter])
      request = createreq(body, agent, @options[:filter])
      publish_timeout = @options[:publish_timeout] || @config.publish_timeout
      stat = {:starttime => Time.now.to_f, :discoverytime => 0, :blocktime => 0, :totaltime => 0}
      STDOUT.sync = true
      hosts_responded = 0

      begin
        if threaded
          hosts_responded = threaded_req(request, publish_timeout, timeout, waitfor, &block)
        else
          hosts_responded = unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
        end
      rescue Interrupt => e
      ensure
        unsubscribe(agent, :reply)
      end

      return update_stat(stat, hosts_responded, request.requestid)
    end

    # Starts the client receiver and publisher unthreaded.
    # This is the default client behaviour.
    def unthreaded_req(request, publish_timeout, timeout, waitfor, &block)
      start_publisher(request, publish_timeout)
      start_receiver(request.requestid, waitfor, timeout, &block)
    end

    # Starts the client receiver and publisher in threads.
    # This is activated when the 'threader_client' configuration
    # option is set.
    def threaded_req(request, publish_timeout, timeout, waitfor, &block)
      Log.debug("Starting threaded client")
      publisher = Thread.new do
        start_publisher(request, publish_timeout)
      end

      # When the client is threaded we add the publishing timeout to
      # the agent timeout so that the receiver doesn't time out before
      # publishing has finished in cases where publish_timeout >= timeout.
      total_timeout = publish_timeout + timeout
      hosts_responded = 0

      receiver = Thread.new do
        hosts_responded = start_receiver(request.requestid, waitfor, total_timeout, &block)
      end

      receiver.join
      hosts_responded
    end

    # Starts the request publishing routine
    def start_publisher(request, publish_timeout)
      Log.debug("Starting publishing with publish timeout of #{publish_timeout}")
      begin
        Timeout.timeout(publish_timeout) do
          publish(request)
        end
      rescue Timeout::Error => e
        Log.warn("Could not publish all messages. Publishing timed out.")
      end
    end

    def publish(request)
      Log.info("Sending request #{request.requestid} for agent '#{request.agent}' with ttl #{request.ttl} in collective '#{request.collective}'")
      request.publish
    end

    # Starts the response receiver routine
    # Expected to return the amount of received responses.
    def start_receiver(requestid, waitfor, timeout, &block)
      Log.debug("Starting response receiver with timeout of #{timeout}")
      hosts_responded = 0

      if (waitfor.is_a?(Array))
        unfinished = Hash.new(0)
        waitfor.each {|w| unfinished[w] += 1}
      else
        unfinished = []
      end

      begin
        Timeout.timeout(timeout) do
          loop do
            resp = receive(requestid)

            if block.arity == 2
              yield resp.payload, resp
            else
              yield resp.payload
            end

            hosts_responded += 1

            if (waitfor.is_a?(Array))
              sender = resp.payload[:senderid]
              if unfinished[sender] <= 1
                unfinished.delete(sender)
              else
                unfinished[sender] -= 1
              end

              break if !waitfor.empty? && unfinished.empty?
            else
              break unless waitfor == 0 || hosts_responded < waitfor
            end
          end
        end
      rescue Timeout::Error => e
        if waitfor.is_a?(Array)
          if !unfinished.empty?
            Log.warn("Could not receive all responses. Did not receive responses from #{unfinished.keys.join(', ')}")
          end
        elsif (waitfor > hosts_responded)
          Log.warn("Could not receive all responses. Expected : #{waitfor}. Received : #{hosts_responded}")
        end
      end

      hosts_responded
    end

    def update_stat(stat, hosts_responded, requestid)
      stat[:totaltime] = Time.now.to_f - stat[:starttime]
      stat[:blocktime] = stat[:totaltime] - stat[:discoverytime]
      stat[:responses] = hosts_responded
      stat[:noresponsefrom] = []
      stat[:unexpectedresponsefrom] = []
      stat[:requestid] = requestid

      @stats = stat
    end

    def discovered_req(body, agent, options=false)
      raise "Client#discovered_req has been removed, please port your agent and client to the SimpleRPC framework"
    end

    # Prints out the stats returns from req and discovered_req in a nice way
    def display_stats(stats, options=false, caption="stomp call summary")
      options = @options unless options

      if options[:verbose]
        puts("\n---- #{caption} ----")

        if stats[:discovered]
          puts("           Nodes: #{stats[:discovered]} / #{stats[:responses]}")
        else
          puts("           Nodes: #{stats[:responses]}")
        end

        printf("      Start Time: %s\n", Time.at(stats[:starttime]))
        printf("  Discovery Time: %.2fms\n", stats[:discoverytime] * 1000)
        printf("      Agent Time: %.2fms\n", stats[:blocktime] * 1000)
        printf("      Total Time: %.2fms\n", stats[:totaltime] * 1000)

      else
        if stats[:discovered]
          printf("\nFinished processing %d / %d hosts in %.2f ms\n\n", stats[:responses], stats[:discovered], stats[:blocktime] * 1000)
        else
          printf("\nFinished processing %d hosts in %.2f ms\n\n", stats[:responses], stats[:blocktime] * 1000)
        end
      end

      if stats[:noresponsefrom].size > 0
        puts("\nNo response from:\n")

        stats[:noresponsefrom].each do |c|
          puts if c % 4 == 1
          printf("%30s", c)
        end

        puts
      end

      if stats[:unexpectedresponsefrom].size > 0
        puts("\nUnexpected response from:\n")

        stats[:unexpectedresponsefrom].each do |c|
          puts if c % 4 == 1
          printf("%30s", c)
        end

        puts
      end
    end
  end
end