700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Kafka实战 - 02 Kafka生产者发送消息至topic实现数据上报

Kafka实战 - 02 Kafka生产者发送消息至topic实现数据上报

时间:2021-02-14 05:09:41

相关推荐

Kafka实战 - 02 Kafka生产者发送消息至topic实现数据上报

文章目录

1. 项目背景2. 依赖和配置3. 生产者配置 KafkaConfiguration4. 同步数据Topic枚举 SyncDataTopicEnum5. 请求体 DataSyncQo6. 同步数据控制层 AppSyncDataController7. 同步数据业务层 XdrDataSyncServiceImpl

1. 项目背景

资产可能会遭受各种网络攻击,安全事件和安全告警就是已经被攻击的资产产生的日志,一条攻击链路可能会经过多个资产,由此产生的日志为安全事件,而具体某一个被攻击的资产产生的日志为安全告警。一个安全事件关联多个安全告警,安全事件存在数据库mongodb中,安全告警存在数据库ElasticSearch中。

SIR和XDR是两个不同的产品,SIR平台是安全事件协同响应平台,能够根据安全告警和安全事件日志对已经遭受攻击的资产进行处置闭环,处置完成后需要修改安全事件和安全告警日志的处置状态。

但是XDR产品无法对安全事件和安全告警处置闭环,因此需要将XDR平台的安全告警和安全事件数据接入到SIR平台处置闭环,待处置完成后将数据的处置状态同步给XDR平台,保证两个平台的数据的处置状态一致;

所以下面要做的就是XDR平台的数据上报:

将XDR平台的安全告警和安全事件数据发送到指定的topic中,SIR平台通过Python脚本从topic中取出数据将安全事件存在数据库mongodb中,安全告警存在数据库ElasticSearch中。

2. 依赖和配置

相关文章:Kafka实战 - 01 自定义 SpringBoot Starter 实现 Kafka 的自动配置

在 ngsoc-open 服务中引入自定义的 ngsoc-common-kafka 的依赖:

<dependency><groupId>com.hh</groupId><artifactId>ngsoc-common-kafka</artifactId><version>3.0.1</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>

kafka的相关配置:项目的配置中心使用的confd,#{{}}是相关配置文件语法,不影响

ngsoc:kafka:clusters:- name: ngsoc#{{$data := json (getv "/ngsoc/kafka/common/cluster/conn_info")}}bootstrap-servers: #{{range $data.route}}- '127.0.0.1:9092' #{{end}}topics:- name: NGSOC_APP_ALARMpartition: 1replication: 1- name: NGSOC_APP_INCIDENTpartition: 1replication: 1

3. 生产者配置 KafkaConfiguration

@Configurationpublic class KafkaConfiguration {@Bean("jsonKafkaTemplate")public KafkaTemplate<String, Object> jsonKafkaTemplate(ProducerFactory<String, Object> pf) {Map<String, Object> config = Map.of(// 3次重试ProducerConfig.RETRIES_CONFIG, "3",// 5ms批量发送ProducerConfig.LINGER_MS_CONFIG, "500",// JSON序列化ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return new KafkaTemplate<>(pf, config );}}

4. 同步数据Topic枚举 SyncDataTopicEnum

public enum SyncDataTopicEnum {/*** 安全告警*/ALARM(0, "NGSOC_APP_ALARM"),/*** 安全事件*/INCIDENT(1, "NGSOC_APP_INCIDENT");private final int type;private final String name;SyncDataTopicEnum(int type, String name) {this.type = type;this.name = name;}public static String getTopicNameByType(int type) {for (SyncDataTopicEnum constants : SyncDataTopicEnum.values()) {if (type == constants.type) {return constants.name;}}return null;}public static boolean contains(int type) {for (SyncDataTopicEnum value : SyncDataTopicEnum.values()) {if (type == value.type) {return true;}}return false;}}

5. 请求体 DataSyncQo

@Datapublic class DataSyncQo implements ValidateAble {@ApiModelProperty("数据类型,0-安全事件,1-安全告警")@NotNull(message = "data.type.must.be.not.null")private Integer type;@ApiModelProperty("具体数据")@NotEmpty(message = "data.must.not.empty")private List<Object> data;private final int DATA_MAX_SIZE = 200;@Overridepublic void validate() throws ValidateException {if (!SyncDataTopicEnum.contains(type)) {throw new ValidateException("type.of.data.is.not.valid");}if (data.size() > DATA_MAX_SIZE) {throw new ValidateException("data.size.limit");}}}

6. 同步数据控制层 AppSyncDataController

@RestController@RequestMapping("/api/v1/app")public class AppSyncDataController {@Setter(onMethod_ = @Autowired)private IXdrDataSyncService incidentUploadService;@OpenApi@PostMapping("/syncData")@CheckValidateAble@OperateLog(target = "app.data", action = "xdr.data.sync")public ApiResponse<Object> upload(@RequestHeader("Authorization") String key, @Validated @RequestBody DataSyncQo qo) {return incidentUploadService.upload(key, qo);}}

7. 同步数据业务层 XdrDataSyncServiceImpl

@Slf4j@Servicepublic class XdrDataSyncServiceImpl implements IXdrDataSyncService {@Autowired@Qualifier("jsonKafkaTemplate")private KafkaTemplate<String, Object> kafkaTemplate;@Overridepublic ApiResponse<Object> upload(String key, DataSyncQo qo) {// 根据同步数据类型获取topic名称String topicName = Objects.requireNonNull(SyncDataTopicEnum.getTopicNameByType(qo.getType()));// 将同步数据发送到topic中pushToTopic(topicName, qo);return ApiResponse.ok();}private void pushToTopic(String topicName, DataSyncQo qo) {log.info("[XDR数据上报]XDR数据[{}]正在进行上报,数据长度为:{}", topicName, qo.getData().size());for (Object data : qo.getData()) {ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, data);future.addCallback(new ListenableFutureCallback<>() {@Overridepublic void onFailure(@NotNull Throwable t) {log.error("[XDR数据上报]设备上传的数据打入Kafka异常:", t);}@Overridepublic void onSuccess(SendResult<String, Object> stringObjectSendResult) {// 正确不做记录}});}log.info("[XDR数据上报]XDR数据[{}]上报行为执行完毕", topicName);}}

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。