비동기 메시지 처리 시스템(4) - RabbitMQ tutorial(2) - Spring Integration + RabbitMQ

@see
https://github.com/airlee00/rabbitmq-tutorial/tree/master/RabbitMQTutorial

 1. context xml (context/rabbitmq/rabbitmq-async-text-context.xml)
 
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
 xmlns:int="http://www.springframework.org/schema/integration"
 xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
 xmlns:task="http://www.springframework.org/schema/task"
 xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp 
  http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
  http://www.springframework.org/schema/integration 
  http://www.springframework.org/schema/integration/spring-integration.xsd
  http://www.springframework.org/schema/rabbit 
  http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  http://www.springframework.org/schema/task 
  http://www.springframework.org/schema/task/spring-task.xsd
  http://www.springframework.org/schema/beans 
  http://www.springframework.org/schema/beans/spring-beans.xsd">

 <!--
  Spring Integration context that wires an application-side channel pair to RabbitMQ:
    toRabbit   -> outbound adapter -> exchange "si.test.exchange" (routing key "si.test.binding")
    queue "si.test.queue" -> inbound adapter -> fromRabbit -> receiveActivator.receive(..)
  Section 3 declares the connection, template, queue, exchange and binding used by both paths.
 -->

 <!-- ###1.Send -->
 <!-- Queue-backed channel (capacity 400). The sender application looks this channel up
      by the bean id "toRabbit" and sends its messages here. -->
 <int:channel id="toRabbit">
  <int:queue capacity="400" />
 </int:channel>

 <!-- Publishes messages taken from "toRabbit" through "amqpTemplate" to the exchange
      "si.test.exchange" with routing key "si.test.binding".
      NOTE(review): no poller is declared on this adapter; presumably it falls back to the
      default poller declared in the Receive section - confirm against the Spring Integration docs. -->
 <int-amqp:outbound-channel-adapter
  channel="toRabbit" amqp-template="amqpTemplate" exchange-name="si.test.exchange"
  routing-key="si.test.binding" />



 <!-- ###2.Receive -->
 <!-- Consumes from the RabbitMQ queue "si.test.queue" using "connectionFactory" and
      forwards what it receives to the "fromRabbit" channel. -->
 <int-amqp:inbound-channel-adapter
  channel="fromRabbit" queue-names="si.test.queue" connection-factory="connectionFactory" />

 <!-- Queue-backed channel (capacity 1000) with a wire-tap interceptor that targets
      "loggingChannel". -->
 <int:channel id="fromRabbit">
  <int:queue capacity="1000" />
  <int:interceptors>
   <int:wire-tap channel="loggingChannel" />
  </int:interceptors>
 </int:channel>

 <!-- Logging endpoint registered under the id "loggingChannel" (the wire-tap target above);
      logs the full message at DEBUG level. -->
 <int:logging-channel-adapter id="loggingChannel"
  log-full-message="true" level="DEBUG" />

 <!-- Default poller: fixed rate 500 (presumably milliseconds - confirm), at most 400
      messages per poll, executed on "taskExecutor". -->
 <int:poller default="true" fixed-rate="500"
  max-messages-per-poll="400" task-executor="taskExecutor" />
 <!-- Executor shared by both pollers: pool size 100, task queue capacity 100,
      rejection policy ABORT. -->
 <task:executor id="taskExecutor" pool-size="100"
  queue-capacity="100" rejection-policy="ABORT" />

 <!-- Invokes receiveActivator.receive(..) for messages on "fromRabbit". Declares its own
      poller (fixed rate 500, at most 500 messages per poll) on "taskExecutor" instead of
      relying on the default poller. -->
 <int:service-activator id="rabbitServiceActivator"
  input-channel="fromRabbit" ref="receiveActivator" method="receive">
  <int:poller fixed-rate="500" max-messages-per-poll="500"
   task-executor="taskExecutor" />
 </int:service-activator>

 <!-- Handler bean referenced by the service activator above. -->
 <bean id="receiveActivator" class="com.toms.mq.rabbitmq.tutorial.part2.AsyncRecv" />
 
 
 <!-- ###3.RabbitMQ config -->

 <!-- Connection factory pointing at a broker on localhost (no port, credentials or
      virtual host are configured here). -->
 <rabbit:connection-factory id="connectionFactory"
  addresses="localhost" />

 <!-- Template used by the outbound adapter to publish messages. -->
 <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

 <!-- NOTE(review): presumably this admin declares the queue, exchange and binding below
      on the broker at startup - confirm against the Spring AMQP docs. -->
 <rabbit:admin connection-factory="connectionFactory" />

 <!-- Queue read by the inbound adapter. -->
 <rabbit:queue name="si.test.queue" />

 <!-- Direct exchange targeted by the outbound adapter; binds "si.test.queue" under the
      key "si.test.binding", which matches the adapter's routing key. -->
 <rabbit:direct-exchange name="si.test.exchange">
  <rabbit:bindings>
   <rabbit:binding queue="si.test.queue" key="si.test.binding" />
  </rabbit:bindings>
 </rabbit:direct-exchange>

</beans>

2. AsyncRecv.java

 
 package com.toms.mq.rabbitmq.tutorial.part2;

import org.apache.log4j.Logger;

/**
 * Message handler that logs every payload it receives.
 *
 * <p>Each log line carries the payload and the number of milliseconds elapsed
 * since this instance was created, which gives a rough view of how long the
 * messages take to arrive.
 */
public class AsyncRecv {

   /** Logger for this handler; kept as a public instance field so existing accessors still work. */
   public final Logger log = Logger.getLogger(AsyncRecv.class);

   /** Creation time of this instance (epoch milliseconds); baseline for the elapsed value in each log line. */
   private final long t = System.currentTimeMillis();

   /**
    * Logs the payload at DEBUG level together with the milliseconds elapsed
    * since this instance was created.
    *
    * @param msg the message payload
    */
   public void receive(String msg) {
    // Build the log line only when DEBUG is enabled, so the string
    // concatenation is not paid for every message when it would be discarded.
    if (log.isDebugEnabled()) {
     log.debug(">>>>>>>>>" + msg + "====" + (System.currentTimeMillis() - t));
    }
   }
}

3. AsyncSender.java

 
package com.toms.mq.rabbitmq.tutorial.part2;

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;

/**
 * Command-line sender: loads the Spring context, pushes 1000 text messages
 * into the {@code toRabbit} channel, waits briefly and shuts the context down.
 */
public class AsyncSender {

 /**
  * Sends 1000 numbered test messages through the {@code toRabbit} channel.
  *
  * @param args unused
  * @throws InterruptedException if the thread is interrupted during the final wait
  */
 public static void main(String[] args) throws InterruptedException {
  final AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:context/rabbitmq/rabbitmq-async-text-context.xml");
  try {
   // Typed lookup: fails with a descriptive Spring exception instead of a
   // ClassCastException if the bean is not a MessageChannel.
   MessageChannel channel = context.getBean("toRabbit", MessageChannel.class);
   for (int i = 0; i < 1000; i++) {
    Message<String> msg = MessageBuilder.withPayload(
      "RabbitMQ async message test --" + i).build();
    // send(..) reports failure through its return value; surface it
    // instead of silently dropping the message.
    if (!channel.send(msg)) {
     System.err.println("Failed to send message " + i);
    }
   }

   // Give the asynchronous pollers time to move the queued messages
   // before the context (and its executors) is shut down.
   Thread.sleep(3000);
  } finally {
   // Always release the context, even when lookup, send or sleep throws.
   context.close();
  }
 }
}

댓글

이 블로그의 인기 게시물

Charset 변환 ( EUC-KR, UTF-8, MS949, CP933 )

GZipUtils- gzip을 통한 압축시 charset처리

ESAPI ( XSS, Sql Injection )