Spring - ActiveMQ & JMS를 이용한 메시지 서비스 (펌)
★기본 용어
1. JMS : Java Message Service
2. MDB : Message Driven Bean
3. POJO : Plain Old Java Object
# 출처 : http://www.mimul.com/pebble/default/2011/03/15/1300195687681.html
업무나 프로젝트를 하다보면
비동기적인 처리를 요하는 잡들이 생길겁니다. 이런 일들은 FIFO를
통해 순서 보장되는 큐를 이용하면 좋은 효과를 봅니다.
또한, 클라우드 환경에서도 프로비저닝 등을 이용할 때 활용되곤 합니다.
1. ActiveMQ 매커니즘
ActiveMQ는 queue와 topic이라는
구조를 지원합니다
- queue(Sender와 Receiver가 1:1 관계)
- topic(Publisher와 Subscriber가 1:n 관계)
Sender나 Publisher는 message를 queue와 topic에 던지고 바로 응답을 받고 ActiveMQ가 이 message를 Receiver나 Subscriber에 전달한다. Receiver나 Subscriber가 죽어서 전달 대상이 없을 경우 유지를 하게 된다.
2. ActiveMQ 설치
- ApacheMQ 다운로드 : http://activemq.apache.org/download.html
- 제가 설치할 버전 5.4.2.
- 압축해제
- 서버 구동 : D:\Work\Data\apache-activemq-5.4.2\bin\activemq.bat
3. 관련 샘플
- MQExceptionListener(익셉션 리스너)
public class MQExceptionListener implements ExceptionListener { private Logger logger = LoggerFactory. getLogger(MQExceptionListener.class); private CachingConnectionFactory cachingConnectionFactory; public void onException(JMSException jmse) { if (logger.isDebugEnabled()) logger.debug("onException() is called!"); cachingConnectionFactory.onException(jmse); } public CachingConnectionFactory getCachingConnectionFactory() { return cachingConnectionFactory; } public void setCachingConnectionFactory( CachingConnectionFactory cachingConnectionFactory) { this.cachingConnectionFactory = cachingConnectionFactory; } }
- MQController.java(메세지 전송 요청 REST API)
public class MQController { private Logger logger = LoggerFactory.getLogger(MQController.class); @Autowired private MQService mqService; @RequestMapping(method = RequestMethod.GET) public void send(HttpServletRequest request, ModelMap model) { User user = null; Random rnd = null; try { user = new User(); rnd = new Random(); user.setId("id_" + rnd.nextInt(10)); user.setUsername("홍길동_" + rnd.nextInt(10)); user.setCredential("pwd_" + rnd.nextInt(10)); user.setGrade("G"); user.setEmail("pepsi@paran.com"); user.setCelltel("010-1000-1000"); if (logger.isDebugEnabled()) logger.debug(user.toString() + " ... Sended!"); mqService.send(user); } catch (Exception e) { e.printStackTrace(); } } }
- User.java(회원 정보 객체)
public class User { private String id; private String username; private String credential; private String grade; private String email; private String celltel; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getCredential() { return credential; } public void setCredential(String credential) { this.credential = credential; } public String getGrade() { return grade; } public void setGrade(String grade) { this.grade = grade; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } public String getCelltel() { return celltel; } public void setCelltel(String celltel) { this.celltel = celltel; } @Override public String toString() { return "User:id=" + id + "&username=" + username + "&credential=" + credential + "&grade=" + grade + "&email=" + email + "&celltel=" + celltel; } }
- MQDelegator.java(큐 델리게이터)
public class MQDelegator { private Logger logger = LoggerFactory.getLogger(MQDelegator.class); private MQService mqService; public void setMqService(MQService mqService) { this.mqService = mqService; } public void handleMessage(User user) { try { if (logger.isDebugEnabled()) logger.debug("Queue 에 등록된 메세지 : " + user); Thread.sleep(1000); mqService.receive(user); } catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
- MQMessageConverter.java(메세지 변환 객체)
public class MQMessageConverter implements MessageConverter { private Logger logger = LoggerFactory.getLogger(MQMessageConverter.class); public Object fromMessage(Message message) throws JMSException, MessageConversionException { User user = null; MapMessage mapMessage = null; try { if(!(message instanceof MapMessage)) { logger.error("Null MapMessage"); throw new MessageConversionException("Null MapMessage"); } mapMessage = (MapMessage) message; user = new User(); user.setId(mapMessage.getString("id")); user.setUsername(mapMessage.getString("username")); user.setCredential(mapMessage.getString("credential")); user.setGrade(mapMessage.getString("grade")); user.setEmail(mapMessage.getString("email")); user.setCelltel(mapMessage.getString("celltel")); } catch (Exception e) { e.printStackTrace(); } return user; } public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { User user = null; MapMessage message = null; try { if(!(object instanceof User)) { logger.error(Null User"); throw new MessageConversionException("Null User"); } user = (User) object; message = session.createMapMessage(); message.setString("id", user.getId()); message.setString("username", user.getUsername()); message.setString("credential", user.getCredential()); message.setString("grade", user.getGrade()); message.setString("email", user.getEmail()); message.setString("celltel", user.getCelltel()); } catch (Exception e) { e.printStackTrace(); } return message; } }
- MQService.java
public interface MQService { public void send(User user); public void receive(User user); } - MQServiceImpl.java(메세지 송/수신 서비스) public class MQServiceImpl implements MQService { private Logger logger = LoggerFactory.getLogger(MQServiceImpl.class); @Autowired private JmsTemplate jmsTemplate; / * * 메시지 수신 */ public void receive(User user) { if (logger.isDebugEnabled()) logger.debug("Queue에 들어온 메시지 : " + user.toString()); } / * * 메시지 송신 */ public void send(User user) { try { if (logger.isDebugEnabled()) logger.debug("Queue 로 보내는 메세지 : " + user.toString()); //jmsTemplate.setDeliveryPersistent(true); jmsTemplate.convertAndSend(user); } catch (Exception e) { e.printStackTrace(); logger.error("send()", e); } } }
ㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇ
- mqContext.xml(Spring 설정)
4. 실행 결과
5. JMX를 사용한 Apache Active MQ를 모니터링
- %JAVA_HOME%\bin\jconsole.exe 클릭
- Local Connections 목록에서 "run.jar start" 문자열이 있는 목록을 클릭
- org.apache.activemq>localhost>Queue>test_queue 를 통해 확인 가능
- http://localhost:8161/admin/browse.jsp?JMSDestination=test_queue 에서 해당 큐의 정보를 확인할 수 있음
6. 성능 튜닝 포인트
- activemq.xml
. 큐 메모리 조정 : <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb" optimizedDispatch="true">
. Consumers queue Prefetch : <policyEntry queue=">"queuePrefetch="1">
. Consumers topic Prefetch : <policyEntry topic=">"topicPrefetch="1000">
- activemq.bat
. 필요시 heap memory 용량 조정, 쓰레드 풀 사용 : ACTIVEMQ_OPTS="-Xmx512M -Dorg.apache.activemq.UseDedicatedTaskRunner=false"
- 기타
. socketBuffer 사이즈 증가 : tcp://localhost:61616?socketBufferSize=131072
. ioBuffer 사이즈 증가 : tcp://localhost:61616?ioBufferSize=16384
. Non-persistent 가 성능 좋음
7. 큐 활용가능한 비즈니스(비동기)
- 쇼핑몰 주문 : 결제, 배송의 프로세스를 백엔드기능
- 가입, 결제 등의 메일 발송 기능
- 배치 작업(월말 정산을 한다던지, 리포트 생성 등)
- 채팅 서비스
- Cloud의 프로비저닝, 이미지 썸네일, 동영상 인코딩 등
- 네트워크(클라이언트/서버) 프로그래밍의 브로드 캐스팅
[참조 사이트]
'FrameWorks > Spring' 카테고리의 다른 글
이클립스에서 Spring2.5+iBatis2.3 연동하기 (0) | 2012.08.15 |
---|---|
Spring - STS(SpringSource Tool Suite) Download (0) | 2012.03.13 |
Spring - Spring >> ActiveMQ 접근 방법 (0) | 2011.09.01 |
Spring - Spring Messaging Service :: 준비 (0) | 2011.08.31 |