redis에 추가된 SUBSCRIBE, UNSUBSCRIBE 그리고 PUBLISH는 Publish/Subscribe 메시지 패러다임을 구현한 기능이다. sender(publisher)들은 특별한 receiver(subscriber)에게 값을 전달하는게 아니라 해당 채널에 메시지를 전달하면 그 메시지를 구독하고 있는 subscribe에게 메시지를 전송한다. subscribers는 하나 또는 그 이상의 채널에 구독을 요청하고 publisher가 누구인지 상관 없이 해당 채널에 들어온 모든 메시지를 읽게된다.

이 subscriber와 publisher의 decoupling은 확장성있는 성장을 가져올 수 있다.

 

Redis-Cli로 기능 사용하기


subscriber
redis-cli를 열고 SUBSCRIBE 채널1 채널2 ... 를 입력한다.

 

publisher
마찬가지로 redis-cli를 열고 PUBLISH 채널 메시지 를 입력해서 전송한다.

그럼 이를 구독하고 있던 subscriber 콘솔에 다음과 같이 출력된다.

 

Spring Boot 2.1.7에 적용하기


그럼 이 방식을 Spring boot에 적용하여 sub와 pub를 이용한 개발을 해보자.

우선 필요한 libaray는 다음과 같다.

spring-boot-starter-data-redis
spring-boot-starter-web
lettuce-core (기본적으로 탑재된 jedis보다 좋다고 하여 변경)
lombok
spring-boot-starter-test

 

라이브러리를 maven이나 gradle 통해 넣어주고 configuration을 통해서 지정해보다. 기본적으로 redisTemplate의 connection은 application.properties에 spring.redis.host, spring.redis.port에 지정해주면 그에 맞게 생성되기 때문에 별도로 설정해주지 않고 그대로 사용한다.

그리고 RedisSubscriber Listener를 구현해서 적용해주는데 RedisMessageListenerContainer를 설정해준다. 속성 값으로 MessageListenerAdapter를 부여해주는데 이 Adapter에는 MessageListener인터페이스를 구현하고 onMessage를 재정의하여 전달 받은 메시지에 대한 처리를 지정한다.

Configuration

    private RedisTemplate<String, String> redisTemplate;

    @Bean
    MessageListenerAdapter messageListener() {
        return new MessageListenerAdapter(new RedisMessageSubscriber());
    }

    @Bean
    RedisMessageListenerContainer redisContainer() {
        final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisTemplate.getConnectionFactory());
        container.addMessageListener(messageListener(), topic());
        return container;
    }

RedisMessageSubScriber

package com.study.redis.config;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**
 * spring-boot-study
 *
 * @author wedul
 * @since 2019-08-21
 **/
@Service
public class RedisMessageSubscriber implements MessageListener {

    public static List<String> messageList = new ArrayList<>();

    @Override
    public void onMessage(final Message message, final byte[] pattern) {
        messageList.add(message.toString());
        System.out.println("Message received: " + new String(message.getBody()));

    }
}

그럼 기동해보고 redis-cli를 통해서 PUBLISH를 날려보면 위에 onMessage에 정의한 대로 콘솔로그가 찍히는지 보자.

그리고 Publisher도 설정하고 Test 코드를 작성하여 redis-cli처럼 결과가 나오는지 확인해보자.

우선 Publisher에서 사용되는 RedisMessagePublisher를 정의해준다.

Configuration

    @Bean
    RedisMessagePublisher redisPublisher() {
        return new RedisMessagePublisher(redisTemplate, topic());
    }

    @Bean
    ChannelTopic topic() {
        return new ChannelTopic("wedul");
    }

Test

package com.study.redis;

import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@NoArgsConstructor
public class RedisApplicationTests {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Test
    public void contextLoads() {
        redisTemplate.convertAndSend("wedul", "No I'm genius");
    }

}

결과도 잘 나왔다. 굿굿 

ElasticCache를 사용하고 있다면 별도의 카프카와 같은 메시지큐 없이도 레디스를 사용해도 되지 않나 싶기도 하다.

 

자세한 코드는 여기에 redis 모듈 참고

https://github.com/weduls/spring5

 

weduls/spring5

study. Contribute to weduls/spring5 development by creating an account on GitHub.

github.com

 

  1. Favicon of https://coding-start.tistory.com BlogIcon 여성게 2019.08.23 14:36 신고

    저는 메시지큐 쓰려고 무거운 카프카를 사용했었고 다른 용도로 레디스도 사용했는데, 복잡한 메시지큐 기능이 필요하지 않으면 레디스 펍/섭 기능도 가볍게 쓰기 좋겠내요 ㅎㅎ

기존에 공부삼아서 개발중이던 wedulpos에 spring batch를 추가해보려고 했다.

그래서 공통으로 mono 프로젝트로 되어있던 wedulpos를 multi module로 수정했다.

 

그랬더니 이상하게 servlet context에서 jsp를 로드하지 못했다.

그래서 계속해서 ServletException not include... jsp 또는 ServletException not jsp found 오류가 발생했다.

 

그래서 엄청난 구글링을 2틀동안했다. 집에서 그리고 약속장소에서 기다리면서 노트북으로 그리고 퇴근하고 오늘..

정말 가지가지한 방법을 다해봤었다. 기본적으로 embed-tomcat의 경우 jasper를 가지고 있지 못해서 별도의 모듈을 추가하고 servlet jspl 추가했고, compileOnly, provieded 별 난리를 다했다 ㅋㅋㅋ

하지만 tiles, url resolver 모두 bean이 등록되어있고 잘 동작하는데 jsp를 못찾는 해결하지 못했다.

최후에 방법으로 검색해본 키워드 intellij에서 정답을 찾았다. 

 

intellij에서 module 안에 웹 모듈을 실행시킬때는 working directory를 해당 모듈로 설정해줘야 한다. 그렇지 않으면 최상의 root로 working directory가 지정되기 때문이다.

https://stackoverflow.com/questions/44794588/intellij-run-configuration-spring-boot-vs-maven-issues

그래서 이 글을 보고 바로 지정해봤다.

 

결과는 성공 ㅋㅋㅋㅋㅋㅋㅋ

 

ㅋㅋㅋㅋ

너무 행복하다.  이 맛에 구글링하고 개발하는거 같다.

 

이제 내일부터는 spring batch를 공부해서 하나하나 정리하고 간단하게 batch를 만들어보자. 휴

git 주소 : https://github.com/weduls/wedulpos_boot

  1. 김병관 2019.08.06 09:00

    이 글을 보고 암이 나았습니다.감사합니다.하마터면죽을뻔했네요

간단하게 Kafka 설치

docker-compose.yml 생성 후 docker-compose up -d를 통해 설치

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: wedul.pos
      KAFKA_CREATE_TOPICS: "test:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
cs


설치된 프로세스 확인


생성된 토픽 확인

- test 토픽이 파티션 1와 replication 1로 생성되었는지 여부 확인

1
2
3
$ docker exec -it simple_kafka_1 bash
$ cd /opt/kafka/bin
$ kafka-topics.sh --describe --topic test --zookeeper simple_zookeeper_1
cs


Spring Boot 프로젝트 생성


카프카 설정

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.kafka.study.configuration;
 
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * 카프카 설정
 *
 * @author wedul
 * @since 2019-01-24
 **/
@Configuration
@EnableKafka
@PropertySource("classpath:kafka.properties")
public class KafkaConfiguration {
 
  @Autowired
  private Environment env;
 
  private Map<String, Object> producerConfig() {
    Map<String, Object> config = new HashMap<>();
 
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("bootstrap.servers"));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
    return config;
  }
 
  @Bean
  public KafkaTemplate<StringString > kafkaTemplate() {
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig()));
  }
 
}
 
cs


카프카 컨트롤러

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.kafka.study.ctrl;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-01-24
 **/
@RestController
@Slf4j
public class KafkaCtrl {
 
  private final KafkaTemplate kafkaTemplate;
 
  public KafkaCtrl(KafkaTemplate kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }
 
  @PostMapping("/send")
  public ResponseEntity<String> sendMessage(String message) {
    if(!StringUtils.isEmpty(message)) kafkaTemplate.send("test""Message is " + message);
    log.info(message);
    return ResponseEntity.ok("");
  }
}
 
cs


Console-consumer 모니터링 모드

카프카에 메시지를 send 했을 때 모니터링하기 위한 모드 스크립트 실행

1
bash-4.4# kafka-console-consumer.sh --bootstrap-server wedul.pos:9092 --topic test
cs


실행해보면 콘솔에 메시지가 전송된것을 확인할 수 있다.


Kafka Licenser 설정 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.kafka.study.configuration;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
/**
 * study
 *
 * @author wedul
 * @since 2019-01-25
 **/
@Slf4j
@Component
public class ReceiveConfiguration {
 
  @KafkaListener(topics = "test", groupId = "console-consumer-1970")
  public void receive(String payload) {
    log.info("received payload='{}'", payload);
  }
 
}
 
cs


보내면 바로 consumer에서 메시지를 받을 수 있도록 리스너를 설정해보자.

그리고 테스트!

1
2
3
4
5
2019-01-25 00:09:43.033  INFO 1760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-01-25 00:09:43.034  INFO 1760 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-01-25 00:09:43.041  INFO 1760 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: 8gNzLx__SHq-4p0b_WsydA
2019-01-25 00:09:43.047  INFO 1760 --- [nio-8080-exec-1] com.kafka.study.ctrl.KafkaCtrl           : babo
2019-01-25 00:09:43.069  INFO 1760 --- [ntainer#0-0-C-1] c.k.s.c.ReceiveConfiguration             : received payload='Message is babo'
cs


git 저장소 : https://github.com/weduls/kafka_example

바로 직전 https://wedul.tistory.com/562?category=595982 에서 Custom validation을 만들어서 입력된 값에 validation을 체크하는 방법을 알아봤다.

그럼 이 validation체크를 통해서 front에 상황에 맞는 에러를 보내줄 수 있도록 조치를 취해보자.

우선 @valid 처리를 했었던 컨트롤러에서 에러 메시지를 수집해야한다. 


1. Controller

Spring에서 Validation 작업을 진행할 시 validation에 문제가 발생하면 에러 내용을 묶어서 BindingResult로 처리할 수 있도록 제공해준다. 이를 사용하기 위해서 parameter로 BindingResult값을 추가해준다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * 회원가입
 *
 * @param reqDto
 * @return
 * @throws Exception
 */
@RequestMapping("/join")
public ResponseEntity<?> join(@Valid UserDto reqDto, BindingResult bindingResult) throws Exception {
    // check constraint rules
    this.checkConstraintRule(bindingResult);
 
    return ResponseEntity.ok(userService.insertUser(reqDto));
}
cs


2. 에러 처리

BindingResult에 validation을 체크하고 발생한 에러들에 대한 내용을 하나씩 뽑아서 국제화 메시지로 변경해주고 \n으로 데이터를 묶어서 view에 전달할 수 있도록 데이터를 바꿔 준다. 공통적으로 사용하것이기 때문에 공통 Controller 클래스를 하나 만들어서 사용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.wedul.common.controller;
 
import com.wedul.common.error.BadRequestException;
import com.wedul.common.util.MessageBundleUtil;
import lombok.AllArgsConstructor;
import org.apache.commons.lang.StringUtils;
import org.springframework.validation.BindingResult;
 
import java.util.stream.Collectors;
 
/**
 * wedulpos
 *
 * @author wedul
 * @since 2018-12-24
 **/
@AllArgsConstructor
public class BaseController {
 
  private final MessageBundleUtil messageBundleUtil;
 
  protected void checkConstraintRule(BindingResult bindingResult) throws BadRequestException {
    String msg = null;
    if (bindingResult.hasErrors()) {
       msg = bindingResult.getFieldErrors()
              .stream()
              .map(error -> messageBundleUtil.getMessage(error.getDefaultMessage()))
              .collect(Collectors.joining("\n"));
    }
 
    if(StringUtils.isNotBlank(msg)) {
      throw new BadRequestException(msg);
    }
 
    return;
  }
 
}
 
cs


3. 에러 핸들링

나는 에러를 에러 코드와 메시지로 전달해주는 방식을 좋아한다. 사실 다른 정보를 다 전달해줘봐야 프론트에서 처리하기도 어렵고 나머지는 로그로써 확인하는게 더 편하다. 그래서 전달하는 값을 정제하기 위해서 @ControllerAdvice를 통해 출력되는 에러를 재정의 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.wedul.common.config;
 
import com.wedul.common.enums.EnumErrorType;
import com.wedul.common.error.BadRequestException;
import com.wedul.common.error.ForbiddenException;
import com.wedul.common.error.NotFoundException;
import com.wedul.common.error.InternalServerException;
import lombok.Builder;
import lombok.Data;
import org.apache.commons.lang.StringUtils;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
 
/**
 * 에러 유형을 나타내는 Config
 *
 * @author wedul
 * @Date 2017. 07. 09
 */
@ControllerAdvice
public class ExceptionConfig {
 
  @Data
  @Builder
  private static class ErrorResponse {
    private int errCode;
    private String msg;
  }
 
  @ExceptionHandler({Exception.class})
  @ResponseBody
  public ErrorResponse errorHandler(Exception ex) {
    if(ex instanceof BadRequestException) {
      return this.getError(400, ex);
    } else if(ex instanceof ForbiddenException) {
      return this.getError(403, ex);
    } else if(ex instanceof NotFoundException) {
      return this.getError(404, ex);
    } else if(ex instanceof InternalServerException) {
      return this.getError(500, ex);
    } else {
      return ErrorResponse.builder().errCode(500).msg(ex.getMessage()).build();
    }
  }
 
  /**
   * 기본 에러 내용 출력
   *
   * @param errorCode
   * @param ex
   * @return
   */
  private ErrorResponse getError(int errorCode, Exception ex) {
    String message = ex.getMessage();
    if(StringUtils.isBlank(message)) {
      message = EnumErrorType.getErrorMsg(errorCode);
    }
 
    return ErrorResponse.builder().errCode(errorCode).msg(message).build();
  }
 
  /**
   * Error code 만들기
   *
   * @return String
   * @date 2017. 7. 9.
   * @author wedul
   */
  private String makeErrorCode(Exception ex) {
    StackTraceElement[] ste = ex.getStackTrace();
    StringBuffer sb = new StringBuffer();
    StackTraceElement[] arrayOfStackTraceElement1;
    int j = (arrayOfStackTraceElement1 = ste).length;
    for (int i = 0; i < j; i++) {
      StackTraceElement el = arrayOfStackTraceElement1[i];
      String className = el.getClassName();
      if (className.startsWith("com.wedul.wedulpos")) {
        sb.append(className.substring(className.lastIndexOf("."+ 1).toUpperCase()).append("[");
        sb.append(el.getLineNumber()).append("]");
        break;
      }
    }
    if (StringUtils.isBlank(sb.toString())) {
      return ex.getStackTrace()[0].getClassName();
    }
    return sb.toString();
  }
}
 
cs


4. 테스트 진행

1) @NotBlank, @Email 확인


2) Custom Validation인 password check



소스코드 : https://github.com/weduls/wedulpos_boot

참고 : https://meetup.toast.com/posts/147

  1. 이정욱 2019.08.12 18:53

    정말 깔끔하고 좋은 코드 감사합니다.

엔티티 매핑에서 사용될 컬럼의 필드 유형을 설정하는 매핑 어노테이션을 정리해보자.

@Column
테이블에서 사용 되는 컬럼이라는 필드를 지정해줄때 사용하며 name, nullable(기본이 true) 등의 설정을 해줄 수 있다. 

1
2
@Column(name = "NAME", length = 10, nullable = true)
private String userName;
cs


@Enumerated
자바의 enum 타입을 매핑할 때 사용한다. 속성으로 EnumType.ORDINAL과 EnumType.STRING이 존재하는데 이름 그대로 ORDINAL은 순서를 STRING은 Enum의 이름을 저장한다

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Member {
 
  @Id
  @GeneratedValue
  @Column(name = "ID")
  private String id;
 
  @Column(name = "NAME"length = 10, nullable = true)
  private String userName;
 
  // 매핑 정보가 없는 필드
  private int age;
 
  @Enumerated(EnumType.STRING)
  Gender gender;
 
}
 
enum Gender {
  Men, Women;
 
  private Gender() {
 
  }
}
 
// 위와 같이 설정하면 데이터베이스에 Men으로 들어간다.
member.setGender(Gender.Men);
cs


@Temporal
java.util.Date와 java.util.Calendar 값을 매핑 할 때 사용한다.

1
2
3
4
5
6
7
8
9
10
11
// 2018-04-02 형태 (데이터베이스 DATE와 매핑)
@Temporal(TemporalType.DATE)
private Date birthDate;
 
// 12:11:11 (데이터베이스 TIME과 매핑)
@Temporal(TemporalType.TIME)
private Date birthTime;
 
// 2013-10-21 12:11:11 (데이터베이스 TIME과 매핑)
@Temporal(TemporalType.TIMESTAMP)
private Date birthTimeStamp;
cs


@LOB
데이터베이스 BLOB, CLOB 타입과 매핑 된다. CLOB(String, char[], java.sql.CLOB)은 문자, BLOB(byte[], java.sql.BLOB)은 나머지가 매핑된다.

@Transient
저장 조회에 사용되지도 않고 그냥 단순 값을 가지고 있고 싶을때 사용.

1
2
@Transient
private String tempStr;
cs


@Access
데이터베이스에 엔티티에 값이 저장될 때 필드(AccessType.FIELD)의 값을 직접 접근해서 사용할 것인가 아니면 메서드에 직접(AccessType.PROPERTY) 접근할 것 인가를 설정하는 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Access(AccessType.FIELD)
public class Member {
 
  @Id
  @GeneratedValue
  @Column(name = "ID")
  private String id;
 
  @Column(name = "NAME"length = 10, nullable = true)
  private String userName;
 
  // 매핑 정보가 없는 필드
  private int age;
 
  @Enumerated(EnumType.STRING)
  Gender gender;
 
  // 2018-04-02 형태 (데이터베이스 DATE와 매핑)
  @Temporal(TemporalType.DATE)
  private Date birthDate;
 
  // 12:11:11 (데이터베이스 TIME과 매핑)
  @Temporal(TemporalType.TIME)
  private Date birthTime;
 
  // 2013-10-21 12:11:11 (데이터베이스 TIME과 매핑)
  @Temporal(TemporalType.TIMESTAMP)
  private Date birthTimeStamp;
 
  @Transient
  private String tempStr;
 
}
cs


- @Access 필드를 생략하고 @Id 필드를 사용하면 AccessType.FIELD로 설정된 것과 같다.
- 나머지 필드는 @Id를 사용하여 AccessType.FIELD로 사용하고 특정 값만 AccessType.PROPERTY로 설정할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Access(AccessType.FIELD)
public class Member {
 
  @Id
  @GeneratedValue
  @Column(name = "ID")
  private String id;
 
  @Column(name = "NAME"length = 10, nullable = true)
  private String userName;
 
  // 매핑 정보가 없는 필드
  private int age;
 
  @Enumerated(EnumType.STRING)
  Gender gender;
 
  // 2018-04-02 형태 (데이터베이스 DATE와 매핑)
  @Temporal(TemporalType.DATE)
  private Date birthDate;
 
  // 12:11:11 (데이터베이스 TIME과 매핑)
  @Temporal(TemporalType.TIME)
  private Date birthTime;
 
  // 2013-10-21 12:11:11 (데이터베이스 TIME과 매핑)
  @Temporal(TemporalType.TIMESTAMP)
  private Date birthTimeStamp;
 
  @Transient
  private String tempStr;
 
  @Access(AccessType.PROPERTY)
  public String getFullName() {
    return "dbsafer" + this.userName;
  }
 
}
cs


+ Recent posts