본문 바로가기

FrameWorks/Spring

Spring - ActiveMQ & JMS를 이용한 메시지 서비스 (펌)



Spring - ActiveMQ & JMS를 이용한 메시지 서비스 (펌)


 ★기본 용어

1. JMS : Java Message Service

2. MDB : Message Driven Bean

3. POJO : Plain Old Java Object

 

# 출처 : http://www.mimul.com/pebble/default/2011/03/15/1300195687681.html

 

 

업무나 프로젝트를 하다보면 비동기적인 처리를 요하는 잡들이 생길겁니다. 이런 일들은 FIFO를 통해 순서 보장되는 큐를 이용하면 좋은 효과를 봅니다.
또한, 클라우드 환경에서도 프로비저닝 등을 이용할 때 활용되곤 합니다.

1. ActiveMQ
매커니즘
ActiveMQ
queue topic이라는 구조를 지원합니다
- queue(Sender
Receiver 1:1 관계)




- topic(Publisher와 Subscriber 1:n 관계)



Sender
Publisher message queue topic에 던지고 바로 응답을 받고 ActiveMQ가 이 message Receiver Subscriber에 전달한다. Receiver Subscriber가 죽어서 전달 대상이 없을 경우 유지를 하게 된다.

2. ActiveMQ
설치
- ApacheMQ
다운로드 : http://activemq.apache.org/download.html
-
제가 설치할 버전 5.4.2.
-
압축해제
-
서버 구동 : D:\Work\Data\apache-activemq-5.4.2\bin\activemq.bat

3.
관련 샘플
- MQExceptionListener(
익셉션 리스너)



public class MQExceptionListener implements ExceptionListener 
{
	private Logger logger = LoggerFactory.
	   getLogger(MQExceptionListener.class);
	private CachingConnectionFactory cachingConnectionFactory;

	public void onException(JMSException jmse) {
		if (logger.isDebugEnabled())
			logger.debug("onException() is called!");
		cachingConnectionFactory.onException(jmse);
	}

	public CachingConnectionFactory getCachingConnectionFactory() {
		return cachingConnectionFactory;
	}

	public void setCachingConnectionFactory(
			CachingConnectionFactory cachingConnectionFactory) {
		this.cachingConnectionFactory = cachingConnectionFactory;
	}
}



- MQController.java(메세지 전송 요청 REST API)




public class MQController {
	private Logger logger = LoggerFactory.getLogger(MQController.class);
	@Autowired
	private MQService mqService;
	
	@RequestMapping(method = RequestMethod.GET)
	public void send(HttpServletRequest request, ModelMap model) 
	{
		User user = null;
		Random rnd = null;
		
		try {
			user = new User();
			rnd = new Random();
			user.setId("id_" + rnd.nextInt(10));
			user.setUsername("홍길동_" + rnd.nextInt(10));
			user.setCredential("pwd_" + rnd.nextInt(10));
			user.setGrade("G");
			user.setEmail("pepsi@paran.com");
			user.setCelltel("010-1000-1000");
			if (logger.isDebugEnabled())
				logger.debug(user.toString() + " ... Sended!");
			
			mqService.send(user);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}




- User.java(회원 정보 객체)




public class User 
{
	private String id;
	private String username;
	private String credential;
	private String grade;
	private String email;
	private String celltel;
	
	public String getId() {
		return id;
	}
	
	public void setId(String id) {
		this.id = id;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getCredential() {
		return credential;
	}

	public void setCredential(String credential) {
		this.credential = credential;
	}

	public String getGrade() {
		return grade;
	}

	public void setGrade(String grade) {
		this.grade = grade;
	}

	public String getEmail() {
		return email;
	}

	public void setEmail(String email) {
		this.email = email;
	}

	public String getCelltel() {
		return celltel;
	}

	public void setCelltel(String celltel) {
		this.celltel = celltel;
	}

	@Override
	public String toString() {
		return "User:id=" + id + "&username=" + username + 
		  "&credential=" + credential + "&grade=" + grade +
		  "&email=" + email + "&celltel=" + celltel;
	}
}




- MQDelegator.java(큐 델리게이터)



public class MQDelegator 
{
	private Logger logger = LoggerFactory.getLogger(MQDelegator.class);
	private MQService mqService;
	
	public void setMqService(MQService mqService) {
		this.mqService = mqService;
	}

	public void handleMessage(User user) 
	{
		try {
		   if (logger.isDebugEnabled())
		      logger.debug("Queue 에 등록된 메세지 : " + user);
			
			Thread.sleep(1000);
			
			mqService.receive(user);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		
	}
}





- MQMessageConverter.java(메세지 변환 객체)



public class MQMessageConverter implements MessageConverter 
{
	private Logger logger = 
	  LoggerFactory.getLogger(MQMessageConverter.class);

    public Object fromMessage(Message message) 
      throws JMSException, MessageConversionException {
    	User user = null;
    	MapMessage mapMessage = null;
    	
    	try {
	        if(!(message instanceof MapMessage)) {
	        	logger.error("Null MapMessage");
	            throw new MessageConversionException("Null MapMessage");
	        }
	       
	        mapMessage = (MapMessage) message;
	        user = new User();
	        user.setId(mapMessage.getString("id"));
	        user.setUsername(mapMessage.getString("username"));
	        user.setCredential(mapMessage.getString("credential"));
	        user.setGrade(mapMessage.getString("grade"));
	        user.setEmail(mapMessage.getString("email"));
	        user.setCelltel(mapMessage.getString("celltel"));
    	} catch (Exception e) {
    		e.printStackTrace();
    	}
        return user;
    }

    public Message toMessage(Object object, Session session)
    	 throws JMSException, MessageConversionException {
    	
    	User user = null;
    	MapMessage message = null;
    	
    	try {
	        if(!(object instanceof User)) {
	        	logger.error(Null User");
	            throw new MessageConversionException("Null User");
	        }
	       
	        user = (User) object;
	        message = session.createMapMessage();
	        
	        message.setString("id", user.getId());
	        message.setString("username", user.getUsername());
	        message.setString("credential", user.getCredential());
	        message.setString("grade", user.getGrade());
	        message.setString("email", user.getEmail());
	        message.setString("celltel", user.getCelltel());
    	} catch (Exception e) {
    		e.printStackTrace();
    	} 	
        return message;
    }
}





- MQService.java



public interface MQService {
	
	public void send(User user);
	public void receive(User user);
	
}

- MQServiceImpl.java(메세지 송/수신 서비스)
public class MQServiceImpl implements MQService 
{
	private Logger logger = LoggerFactory.getLogger(MQServiceImpl.class);
	@Autowired
	private JmsTemplate jmsTemplate;
	
	/ * 
	 * 메시지 수신
	 */
	public void receive(User user) {
		if (logger.isDebugEnabled())
	  	  logger.debug("Queue에 들어온 메시지 : " + user.toString());
	}

	/ * 
	 * 메시지 송신
	 */
	public void send(User user) {
		try {
		    if (logger.isDebugEnabled())
	              logger.debug("Queue 로 보내는 메세지 : " + user.toString());
		   //jmsTemplate.setDeliveryPersistent(true);
		   jmsTemplate.convertAndSend(user);
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("send()", e);
		}
	}
}


ㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇㅁㄴㅇ


- mqContext.xml(Spring 설정)


   











	



		



			



		



	



	 







	



	



	



	







	







  



      



  



  



  



  



  	



  



  



  



  



  







  



      



      



      



      



       



  



  



  	



  	



  	



  



     



  







	



	



	



	



	



	



	










 

 


4.
실행 결과



5. JMX
를 사용한 Apache Active MQ를 모니터링
- %JAVA_HOME%\bin\jconsole.exe
클릭
- Local Connections
목록에서 "run.jar start" 문자열이 있는 목록을 클릭
- org.apache.activemq>localhost>Queue>test_queue
를 통해 확인 가능
- http://localhost:8161/admin/browse.jsp?JMSDestination=test_queue
에서 해당 큐의 정보를 확인할 수 있음




6.
성능 튜닝 포인트
- activemq.xml
.
큐 메모리 조정 : <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb" optimizedDispatch="true">
. Consumers queue Prefetch : <policyEntry queue=">"queuePrefetch="1">
. Consumers topic Prefetch : <policyEntry topic=">"topicPrefetch="1000">

- activemq.bat
.
필요시 heap memory 용량 조정, 쓰레드 풀 사용 : ACTIVEMQ_OPTS="-Xmx512M -Dorg.apache.activemq.UseDedicatedTaskRunner=false"

-
기타
. socketBuffer
사이즈 증가 : tcp://localhost:61616?socketBufferSize=131072
. ioBuffer
사이즈 증가 : tcp://localhost:61616?ioBufferSize=16384
. Non-persistent
가 성능 좋음

7.
큐 활용가능한 비즈니스(비동기)
-
쇼핑몰 주문 : 결제, 배송의 프로세스를 백엔드기능
-
가입, 결제 등의 메일 발송 기능
-
배치 작업(월말 정산을 한다던지, 리포트 생성 등)
-
채팅 서비스
- Cloud
의 프로비저닝, 이미지 썸네일, 동영상 인코딩 등
-
네트워크(클라이언트/서버) 프로그래밍의 브로드 캐스팅

[
참조 사이트]