Spring - ActiveMQ & JMS를 이용한 메시지 서비스 (펌)
★기본 용어
1. JMS : Java Message Service
2. MDB : Message Driven Bean
3. POJO : Plain Old Java Object
# 출처 : http://www.mimul.com/pebble/default/2011/03/15/1300195687681.html
업무나 프로젝트를 하다보면
비동기적인 처리를 요하는 잡들이 생길겁니다. 이런 일들은 FIFO를
통해 순서 보장되는 큐를 이용하면 좋은 효과를 봅니다.
또한, 클라우드 환경에서도 프로비저닝 등을 이용할 때 활용되곤 합니다.
1. ActiveMQ 매커니즘
ActiveMQ는 queue와 topic이라는
구조를 지원합니다
- queue(Sender와 Receiver가 1:1 관계)
- topic(Publisher와 Subscriber가 1:n 관계)
Sender나 Publisher는 message를 queue와 topic에 던지고 바로 응답을 받고 ActiveMQ가 이 message를 Receiver나 Subscriber에 전달한다. Receiver나 Subscriber가 죽어서 전달 대상이 없을 경우 유지를 하게 된다.
2. ActiveMQ 설치
- ApacheMQ 다운로드 : http://activemq.apache.org/download.html
- 제가 설치할 버전 5.4.2.
- 압축해제
- 서버 구동 : D:\Work\Data\apache-activemq-5.4.2\bin\activemq.bat
3. 관련 샘플
- MQExceptionListener(익셉션 리스너)
public class MQExceptionListener implements ExceptionListener
{
private Logger logger = LoggerFactory.
getLogger(MQExceptionListener.class);
private CachingConnectionFactory cachingConnectionFactory;
public void onException(JMSException jmse) {
if (logger.isDebugEnabled())
logger.debug("onException() is called!");
cachingConnectionFactory.onException(jmse);
}
public CachingConnectionFactory getCachingConnectionFactory() {
return cachingConnectionFactory;
}
public void setCachingConnectionFactory(
CachingConnectionFactory cachingConnectionFactory) {
this.cachingConnectionFactory = cachingConnectionFactory;
}
}
- MQController.java(메세지 전송 요청 REST API)
public class MQController {
private Logger logger = LoggerFactory.getLogger(MQController.class);
@Autowired
private MQService mqService;
@RequestMapping(method = RequestMethod.GET)
public void send(HttpServletRequest request, ModelMap model)
{
User user = null;
Random rnd = null;
try {
user = new User();
rnd = new Random();
user.setId("id_" + rnd.nextInt(10));
user.setUsername("홍길동_" + rnd.nextInt(10));
user.setCredential("pwd_" + rnd.nextInt(10));
user.setGrade("G");
user.setEmail("pepsi@paran.com");
user.setCelltel("010-1000-1000");
if (logger.isDebugEnabled())
logger.debug(user.toString() + " ... Sended!");
mqService.send(user);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- User.java(회원 정보 객체)
public class User
{
private String id;
private String username;
private String credential;
private String grade;
private String email;
private String celltel;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getCredential() {
return credential;
}
public void setCredential(String credential) {
this.credential = credential;
}
public String getGrade() {
return grade;
}
public void setGrade(String grade) {
this.grade = grade;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public String getCelltel() {
return celltel;
}
public void setCelltel(String celltel) {
this.celltel = celltel;
}
@Override
public String toString() {
return "User:id=" + id + "&username=" + username +
"&credential=" + credential + "&grade=" + grade +
"&email=" + email + "&celltel=" + celltel;
}
}
- MQDelegator.java(큐 델리게이터)
public class MQDelegator
{
private Logger logger = LoggerFactory.getLogger(MQDelegator.class);
private MQService mqService;
public void setMqService(MQService mqService) {
this.mqService = mqService;
}
public void handleMessage(User user)
{
try {
if (logger.isDebugEnabled())
logger.debug("Queue 에 등록된 메세지 : " + user);
Thread.sleep(1000);
mqService.receive(user);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- MQMessageConverter.java(메세지 변환 객체)
public class MQMessageConverter implements MessageConverter
{
private Logger logger =
LoggerFactory.getLogger(MQMessageConverter.class);
public Object fromMessage(Message message)
throws JMSException, MessageConversionException {
User user = null;
MapMessage mapMessage = null;
try {
if(!(message instanceof MapMessage)) {
logger.error("Null MapMessage");
throw new MessageConversionException("Null MapMessage");
}
mapMessage = (MapMessage) message;
user = new User();
user.setId(mapMessage.getString("id"));
user.setUsername(mapMessage.getString("username"));
user.setCredential(mapMessage.getString("credential"));
user.setGrade(mapMessage.getString("grade"));
user.setEmail(mapMessage.getString("email"));
user.setCelltel(mapMessage.getString("celltel"));
} catch (Exception e) {
e.printStackTrace();
}
return user;
}
public Message toMessage(Object object, Session session)
throws JMSException, MessageConversionException {
User user = null;
MapMessage message = null;
try {
if(!(object instanceof User)) {
logger.error(Null User");
throw new MessageConversionException("Null User");
}
user = (User) object;
message = session.createMapMessage();
message.setString("id", user.getId());
message.setString("username", user.getUsername());
message.setString("credential", user.getCredential());
message.setString("grade", user.getGrade());
message.setString("email", user.getEmail());
message.setString("celltel", user.getCelltel());
} catch (Exception e) {
e.printStackTrace();
}
return message;
}
}
- MQService.java
public interface MQService {
public void send(User user);
public void receive(User user);
}
- MQServiceImpl.java(메세지 송/수신 서비스)
public class MQServiceImpl implements MQService
{
private Logger logger = LoggerFactory.getLogger(MQServiceImpl.class);
@Autowired
private JmsTemplate jmsTemplate;
/ *
* 메시지 수신
*/
public void receive(User user) {
if (logger.isDebugEnabled())
logger.debug("Queue에 들어온 메시지 : " + user.toString());
}
/ *
* 메시지 송신
*/
public void send(User user) {
try {
if (logger.isDebugEnabled())
logger.debug("Queue 로 보내는 메세지 : " + user.toString());
//jmsTemplate.setDeliveryPersistent(true);
jmsTemplate.convertAndSend(user);
} catch (Exception e) {
e.printStackTrace();
logger.error("send()", e);
}
}
}
ㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇ
- mqContext.xml(Spring 설정)
4. 실행 결과
5. JMX를 사용한 Apache Active MQ를 모니터링
- %JAVA_HOME%\bin\jconsole.exe 클릭
- Local Connections 목록에서 "run.jar start" 문자열이 있는 목록을 클릭
- org.apache.activemq>localhost>Queue>test_queue 를 통해 확인 가능
- http://localhost:8161/admin/browse.jsp?JMSDestination=test_queue 에서 해당 큐의 정보를 확인할 수 있음
6. 성능 튜닝 포인트
- activemq.xml
. 큐 메모리 조정 : <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb" optimizedDispatch="true">
. Consumers queue Prefetch : <policyEntry queue=">"queuePrefetch="1">
. Consumers topic Prefetch : <policyEntry topic=">"topicPrefetch="1000">
- activemq.bat
. 필요시 heap memory 용량 조정, 쓰레드 풀 사용 : ACTIVEMQ_OPTS="-Xmx512M -Dorg.apache.activemq.UseDedicatedTaskRunner=false"
- 기타
. socketBuffer 사이즈 증가 : tcp://localhost:61616?socketBufferSize=131072
. ioBuffer 사이즈 증가 : tcp://localhost:61616?ioBufferSize=16384
. Non-persistent 가 성능 좋음
7. 큐 활용가능한 비즈니스(비동기)
- 쇼핑몰 주문 : 결제, 배송의 프로세스를 백엔드기능
- 가입, 결제 등의 메일 발송 기능
- 배치 작업(월말 정산을 한다던지, 리포트 생성 등)
- 채팅 서비스
- Cloud의 프로비저닝, 이미지 썸네일, 동영상 인코딩 등
- 네트워크(클라이언트/서버) 프로그래밍의 브로드 캐스팅
[참조 사이트]
'FrameWorks > Spring' 카테고리의 다른 글
| 이클립스에서 Spring2.5+iBatis2.3 연동하기 (0) | 2012.08.15 |
|---|---|
| Spring - STS(SpringSource Tool Suite) Download (0) | 2012.03.13 |
| Spring - Spring >> ActiveMQ 접근 방법 (0) | 2011.09.01 |
| Spring - Spring Messaging Service :: 준비 (0) | 2011.08.31 |